# 3. Fun√ß√µes Utilizadas

## 3.1. An√°lises gerais

In [None]:
def verificar_duplicatas(df, col_id="msno", col_safra="safra", visualizar=False):
    """
    Identifica e conta linhas duplicadas para a mesma chave (ID + Safra).
    """
    # Define a janela abrangendo todas as linhas do grupo
    window_spec = Window.partitionBy(col_id, col_safra)\
                        .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    
    # Adiciona contagem e filtra duplicados
    df_duplicados = df.withColumn("contagem_janela", F.count("*").over(window_spec)).filter("contagem_janela > 1")
    
    if visualizar:
        df_duplicados.show(10)
    
    # Retorna a quantidade total de linhas duplicadas encontradas
    return df_duplicados.count()

In [None]:
def verificar_mudanca_estado(df, col_alvo, col_id="msno", col_safra="safra", visualizar=False):
    """
    Verifica mudan√ßas de estado em uma vari√°vel ao longo do tempo (safras).
    """
    # Definir a especifica√ß√£o da janela
    window_spec = Window.partitionBy(col_id).orderBy(col_safra)

    # Lag para criar as colunas de estado anterior da variavel e detec√ß√£o de mudan√ßa
    df_historico = (
        df.withColumn(f"{col_alvo}_anterior", F.lag(col_alvo).over(window_spec))
          .withColumn("mudou", 
              F.when(F.col(f"{col_alvo}_anterior").isNull(), False)
               .otherwise(F.col(col_alvo) != F.col(f"{col_alvo}_anterior"))))

    # Caso o usuario deseje visualizar as mudancas
    if visualizar:
        # Mostrar quantas mudancas ocorreram por safra
        print("Contagem de mudan√ßas por safra:")
        df_historico.filter(F.col("mudou") == True).groupBy(col_safra, "mudou").count().orderBy(col_safra, "mudou").show(30, truncate=False)
        # Mostrar quantas vezes o usuario mudou
        print("Mudan√ßas por usu√°rio:")
        df_historico.filter(F.col("mudou") == True).groupBy(col_id).agg(F.sum(F.col("mudou").cast("int")).alias("total_mudancas")).show(10, truncate=False)

In [None]:
def calcular_distribuicao(df, colunas_alvo, col_id="msno", n_show=10, agrupar_por_safra=False):
    """
    Gera agrega√ß√£o de contagem e percentual sobre uma vari√°vel alvo.
    """
    # 1. Calcula o total da base para o percentual global
    total_base = df.count()

    if agrupar_por_safra:
        # 2. Agrupamento inicial por safra e alvo
        df_result = df.groupBy("safra", *colunas_alvo).agg(
            F.count(col_id).alias("total")
        )
        
        # 3. Define a janela para calcular o total por safra (denominador do pct_safra)
        window_safra = Window.partitionBy("safra")
        
        # 4. Adiciona os c√°lculos de percentual
        df_result = df_result.withColumn(
            "pct_safra", 
            F.round((F.col("total") / F.sum("total").over(window_safra)) * 100, 2)
        ).withColumn(
            "pct_total", 
            F.round((F.col("total") / F.lit(total_base)) * 100, 2)
        ).orderBy("safra", F.desc("total"))
        
    else:
        # Resultado geral sem parti√ß√£o de safra
        df_result = df.groupBy(colunas_alvo).agg(
            F.count(col_id).alias("total"),
            F.round((F.count(col_id) / F.lit(total_base)) * 100, 2).alias("pct_total")
        ).orderBy(F.desc("total"))
    
    df_result.show(n_show, truncate=False)
    return df_result # Retornar o DF √© uma boa pr√°tica para an√°lises posteriores

In [None]:
def correlation_matrix(df, colunas, plot=True, top_n=15, figsize=(20, 8)):
    """
    Calcula e visualiza matriz de correla√ß√£o com layout otimizado.
    
    Par√¢metros:
    -----------
    df : Spark DataFrame
    colunas : Lista com nomes das colunas num√©ricas
    plot : bool - Se True, gera visualiza√ß√£o
    top_n : int - N√∫mero m√°ximo de features a exibir (ordenadas por vari√¢ncia ou IV)
    figsize : tuple - Tamanho da figura
    """
    n = len(colunas)
    
    # ========================================================================
    # CASO 1: Par de vari√°veis (Scatter Plot)
    # ========================================================================
    if n == 2:
        col1, col2 = colunas
        
        pearson_val = df.corr(col1, col2, method="pearson")
        spearman_val = df.corr(col1, col2, method="spearman")
        
        print(f"üìä Correla√ß√£o de Pearson:  {pearson_val:+.4f}")
        print(f"üìä Correla√ß√£o de Spearman: {spearman_val:+.4f}")

        if plot:
            amostra_pd = df.select(col1, col2).sample(
                False, 
                fraction=min(1.0, 50000/df.count())
            ).toPandas()
            
            fig, ax = plt.subplots(figsize=(10, 7))
            
            # Scatter com densidade
            scatter = ax.scatter(
                amostra_pd[col1], 
                amostra_pd[col2], 
                alpha=0.4, 
                s=20,
                c=amostra_pd[col1],
                cmap='viridis',
                edgecolors='none'
            )
            
            # Linha de tend√™ncia
            z = np.polyfit(amostra_pd[col1], amostra_pd[col2], 1)
            p = np.poly1d(z)
            ax.plot(
                amostra_pd[col1].sort_values(), 
                p(amostra_pd[col1].sort_values()), 
                "r--", 
                alpha=0.8, 
                linewidth=2,
                label=f'Tend√™ncia: y = {z[0]:.3f}x + {z[1]:.3f}'
            )
            
            ax.set_xlabel(col1, fontsize=12, fontweight='bold')
            ax.set_ylabel(col2, fontsize=12, fontweight='bold')
            ax.set_title(
                f"Dispers√£o: {col1} vs {col2}\n"
                f"Pearson: {pearson_val:+.3f}  |  Spearman: {spearman_val:+.3f}",
                fontsize=14,
                fontweight='bold',
                pad=20
            )
            ax.grid(True, alpha=0.3, linestyle='--')
            ax.legend(loc='best', fontsize=10)
            
            plt.colorbar(scatter, ax=ax, label=col1)
            plt.tight_layout()
            plt.show()
            plt.close()
    
    # ========================================================================
    # CASO 2: Matriz de correla√ß√£o (> 2 vari√°veis)
    # ========================================================================
    else:
        # Limitar n√∫mero de features se necess√°rio
        if n > top_n:
            print(f"‚ö†Ô∏è  Muitas features ({n}). Exibindo apenas top {top_n} por vari√¢ncia.")
            
            # Calcular vari√¢ncia de cada coluna
            variancias = []
            for col in colunas:
                var = df.select(F.variance(col)).collect()[0][0]
                variancias.append((col, var if var else 0))
            
            # Ordenar por vari√¢ncia e pegar top_n
            colunas_top = [col for col, _ in sorted(variancias, key=lambda x: x[1], reverse=True)[:top_n]]
        else:
            colunas_top = colunas
        
        # Preparar vetor
        assembler = VectorAssembler(
            inputCols=colunas_top, 
            outputCol="features", 
            handleInvalid="skip"
        )
        df_vector = assembler.transform(df).select("features")
        
        # Calcular matrizes
        def get_corr_matrix(method):
            matrix = Correlation.corr(df_vector, "features", method=method).collect()[0][0]
            return pd.DataFrame(matrix.toArray(), index=colunas_top, columns=colunas_top)
        
        print("üîÑ Calculando matriz de Pearson...")
        matrix_pearson = get_corr_matrix("pearson")
        
        print("üîÑ Calculando matriz de Spearman...")
        matrix_spearman = get_corr_matrix("spearman")
        
        if plot:
            # ============================================================
            # VISUALIZA√á√ÉO OTIMIZADA
            # ============================================================
            
            # Truncar nomes longos para melhor legibilidade
            labels_curtos = [
                col[:30] + '...' if len(col) > 30 else col 
                for col in colunas_top
            ]
            
            fig, axes = plt.subplots(1, 2, figsize=figsize)
            
            # -------------------- PEARSON --------------------
            mask_pearson = np.triu(np.ones_like(matrix_pearson, dtype=bool), k=1)
            
            sns.heatmap(
                matrix_pearson,
                annot=True,
                fmt=".2f",
                cmap='RdBu_r',  # Vermelho = positivo, Azul = negativo
                center=0,
                vmin=-1,
                vmax=1,
                square=True,
                linewidths=0.5,
                cbar_kws={
                    "shrink": 0.8,
                    "label": "Correla√ß√£o"
                },
                ax=axes[0],
                xticklabels=labels_curtos,
                yticklabels=labels_curtos,
                annot_kws={"size": 8}
            )
            
            axes[0].set_title(
                "Matriz de Pearson\n(Correla√ß√£o Linear)",
                fontsize=14,
                fontweight='bold',
                pad=15
            )
            axes[0].set_xticklabels(
                axes[0].get_xticklabels(),
                rotation=45,
                ha='right',
                fontsize=9
            )
            axes[0].set_yticklabels(
                axes[0].get_yticklabels(),
                rotation=0,
                fontsize=9
            )
            
            # -------------------- SPEARMAN --------------------
            mask_spearman = np.triu(np.ones_like(matrix_spearman, dtype=bool), k=1)
            
            sns.heatmap(
                matrix_spearman,
                annot=True,
                fmt=".2f",
                cmap='PiYG',  # Rosa/Verde para Spearman
                center=0,
                vmin=-1,
                vmax=1,
                square=True,
                linewidths=0.5,
                cbar_kws={
                    "shrink": 0.8,
                    "label": "Correla√ß√£o"
                },
                ax=axes[1],
                xticklabels=labels_curtos,
                yticklabels=labels_curtos,
                annot_kws={"size": 8}
            )
            
            axes[1].set_title(
                "Matriz de Spearman\n(Correla√ß√£o Monot√¥nica)",
                fontsize=14,
                fontweight='bold',
                pad=15
            )
            axes[1].set_xticklabels(
                axes[1].get_xticklabels(),
                rotation=45,
                ha='right',
                fontsize=9
            )
            axes[1].set_yticklabels(
                axes[1].get_yticklabels(),
                rotation=0,
                fontsize=9
            )
            
            plt.tight_layout()
            plt.show()
            plt.close()
            
            # ============================================================
            # VISUALIZA√á√ÉO ALTERNATIVA: Apenas Tri√¢ngulo Superior
            # ============================================================
            
            print("\nüìä Gerando visualiza√ß√£o alternativa (tri√¢ngulo superior)...\n")
            
            fig, axes = plt.subplots(1, 2, figsize=figsize)
            
            # M√°scara para tri√¢ngulo superior
            mask = np.triu(np.ones_like(matrix_pearson, dtype=bool))
            
            # Pearson (tri√¢ngulo inferior)
            sns.heatmap(
                matrix_pearson,
                mask=mask,
                annot=True,
                fmt=".2f",
                cmap='RdBu_r',
                center=0,
                vmin=-1,
                vmax=1,
                square=True,
                linewidths=0.5,
                cbar_kws={"shrink": 0.8},
                ax=axes[0],
                xticklabels=labels_curtos,
                yticklabels=labels_curtos,
                annot_kws={"size": 8}
            )
            
            axes[0].set_title(
                "Pearson (Tri√¢ngulo Inferior)",
                fontsize=14,
                fontweight='bold',
                pad=15
            )
            axes[0].set_xticklabels(
                axes[0].get_xticklabels(),
                rotation=45,
                ha='right',
                fontsize=9
            )
            axes[0].set_yticklabels(
                axes[0].get_yticklabels(),
                rotation=0,
                fontsize=9
            )
            
            # Spearman (tri√¢ngulo inferior)
            sns.heatmap(
                matrix_spearman,
                mask=mask,
                annot=True,
                fmt=".2f",
                cmap='PiYG',
                center=0,
                vmin=-1,
                vmax=1,
                square=True,
                linewidths=0.5,
                cbar_kws={"shrink": 0.8},
                ax=axes[1],
                xticklabels=labels_curtos,
                yticklabels=labels_curtos,
                annot_kws={"size": 8}
            )
            
            axes[1].set_title(
                "Spearman (Tri√¢ngulo Inferior)",
                fontsize=14,
                fontweight='bold',
                pad=15
            )
            axes[1].set_xticklabels(
                axes[1].get_xticklabels(),
                rotation=45,
                ha='right',
                fontsize=9
            )
            axes[1].set_yticklabels(
                axes[1].get_yticklabels(),
                rotation=0,
                fontsize=9
            )
            
            plt.tight_layout()
            plt.show()
            plt.close()
        
        return matrix_pearson, matrix_spearman

In [None]:
def plot_correlation_clusters(matrix_pearson, threshold=0.85, figsize=(16, 12), max_pairs=30):
    """
    Visualiza apenas pares com alta correla√ß√£o (|r| >= threshold).
    Vers√£o otimizada para evitar sobreposi√ß√£o de labels.
    
    Par√¢metros:
    -----------
    matrix_pearson : pd.DataFrame - Matriz de correla√ß√£o
    threshold : float - Threshold m√≠nimo de correla√ß√£o
    figsize : tuple - Tamanho da figura
    max_pairs : int - N√∫mero m√°ximo de pares a exibir
    """
    
    # Extrair pares com alta correla√ß√£o
    pares_altos = []
    n = len(matrix_pearson)
    
    for i in range(n):
        for j in range(i+1, n):
            corr_val = matrix_pearson.iloc[i, j]
            if abs(corr_val) >= threshold:
                pares_altos.append({
                    'feature_1': matrix_pearson.index[i],
                    'feature_2': matrix_pearson.columns[j],
                    'correlacao': corr_val
                })
    
    if not pares_altos:
        print(f"‚úÖ Nenhum par com |correla√ß√£o| >= {threshold}")
        return
    
    # Criar DataFrame e ordenar
    df_pares = pd.DataFrame(pares_altos).sort_values('correlacao', key=abs, ascending=False)
    
    # Limitar n√∫mero de pares se necess√°rio
    if len(df_pares) > max_pairs:
        print(f"‚ö†Ô∏è  Limitando visualiza√ß√£o aos top {max_pairs} pares (de {len(df_pares)} encontrados)")
        df_pares = df_pares.head(max_pairs)
    
    print(f"\nüîó Encontrados {len(pares_altos)} pares com |correla√ß√£o| >= {threshold}\n")
    print(df_pares.to_string(index=False))
    
    # ========================================================================
    # VISUALIZA√á√ÉO MELHORADA
    # ========================================================================
    
    fig, ax = plt.subplots(figsize=figsize)
    
    # Criar labels mais limpos
    def truncar_nome(nome, max_len=35):
        """Trunca nome mantendo in√≠cio e fim"""
        if len(nome) <= max_len:
            return nome
        else:
            # Manter in√≠cio e fim
            metade = (max_len - 3) // 2
            return f"{nome[:metade]}...{nome[-metade:]}"
    
    # Labels formatados (um por linha, sem "vs")
    labels = []
    for _, row in df_pares.iterrows():
        feat1 = truncar_nome(row['feature_1'], max_len=40)
        feat2 = truncar_nome(row['feature_2'], max_len=40)
        labels.append(f"{feat1}\n  ‚ü∑  {feat2}")
    
    # Cores por intensidade
    def get_color(corr):
        abs_corr = abs(corr)
        if abs_corr >= 0.99:
            return '#8B0000'  # Vermelho escuro (perfeita)
        elif abs_corr >= 0.95:
            return '#DC143C'  # Vermelho (cr√≠tica)
        elif abs_corr >= 0.90:
            return '#FF8C00'  # Laranja (muito alta)
        else:
            return '#32CD32'  # Verde (alta)
    
    colors = [get_color(c) for c in df_pares['correlacao']]
    
    # Criar barras horizontais
    y_pos = np.arange(len(df_pares))
    bars = ax.barh(y_pos, df_pares['correlacao'], color=colors, alpha=0.85, height=0.7)
    
    # Configurar eixos
    ax.set_yticks(y_pos)
    ax.set_yticklabels(labels, fontsize=9, family='monospace')
    ax.set_xlabel('Correla√ß√£o de Pearson', fontsize=13, fontweight='bold')
    ax.set_xlim([-1.05, 1.15])  # Espa√ßo extra para valores
    
    # T√≠tulo
    ax.set_title(
        f'Pares com Alta Correla√ß√£o (|r| ‚â• {threshold})\n'
        f'Total: {len(pares_altos)} pares | Exibindo: {len(df_pares)} pares',
        fontsize=15,
        fontweight='bold',
        pad=20
    )
    
    # Linhas de refer√™ncia
    ax.axvline(x=0, color='black', linestyle='-', linewidth=1.2, alpha=0.7)
    ax.axvline(x=threshold, color='red', linestyle='--', linewidth=1.5, alpha=0.4, 
               label=f'Threshold = ¬±{threshold}')
    ax.axvline(x=-threshold, color='red', linestyle='--', linewidth=1.5, alpha=0.4)
    ax.axvline(x=0.95, color='orange', linestyle=':', linewidth=1.2, alpha=0.4, 
               label='Cr√≠tico = ¬±0.95')
    ax.axvline(x=-0.95, color='orange', linestyle=':', linewidth=1.2, alpha=0.4)
    
    # Grid
    ax.grid(axis='x', alpha=0.3, linestyle='--', linewidth=0.5)
    ax.set_axisbelow(True)
    
    # Adicionar valores nas barras
    for i, (bar, val) in enumerate(zip(bars, df_pares['correlacao'])):
        # Posi√ß√£o do texto
        x_pos = val + 0.03 if val > 0 else val - 0.03
        ha = 'left' if val > 0 else 'right'
        
        # Cor do texto (branco se barra escura)
        text_color = 'white' if abs(val) >= 0.95 else 'black'
        
        # Texto dentro da barra se correla√ß√£o alta
        if abs(val) >= 0.90:
            x_pos = val - 0.03 if val > 0 else val + 0.03
            ha = 'right' if val > 0 else 'left'
            text_color = 'white'
        
        ax.text(
            x_pos,
            i,
            f'{val:.3f}',
            va='center',
            ha=ha,
            fontsize=10,
            fontweight='bold',
            color=text_color
        )
    
    # Legenda
    from matplotlib.patches import Patch
    legend_elements = [
        Patch(facecolor='#8B0000', label='Perfeita (‚â• 0.99)'),
        Patch(facecolor='#DC143C', label='Cr√≠tica (0.95 - 0.99)'),
        Patch(facecolor='#FF8C00', label='Muito Alta (0.90 - 0.95)'),
        Patch(facecolor='#32CD32', label='Alta (0.85 - 0.90)'),
    ]
    ax.legend(
        handles=legend_elements,
        loc='lower right',
        fontsize=10,
        framealpha=0.9,
        title='Intensidade'
    )
    
    plt.tight_layout()
    plt.show()
    plt.close()
    
    # ========================================================================
    # TABELA RESUMIDA POR CATEGORIA
    # ========================================================================
    
    print("\n" + "="*100)
    print("üìä RESUMO POR CATEGORIA DE CORRELA√á√ÉO")
    print("="*100)
    
    categorias = [
        ('üî¥ PERFEITA (‚â• 0.99)', 0.99, 1.01),
        ('üü† CR√çTICA (0.95 - 0.99)', 0.95, 0.99),
        ('üü° MUITO ALTA (0.90 - 0.95)', 0.90, 0.95),
        ('üü¢ ALTA (0.85 - 0.90)', 0.85, 0.90),
    ]
    
    for titulo, min_val, max_val in categorias:
        subset = df_pares[
            (df_pares['correlacao'].abs() >= min_val) & 
            (df_pares['correlacao'].abs() < max_val)
        ]
        
        if len(subset) > 0:
            print(f"\n{titulo} ({len(subset)} pares)")
            print("-"*100)
            
            for idx, row in subset.iterrows():
                feat1 = row['feature_1'][:45]
                feat2 = row['feature_2'][:45]
                corr = row['correlacao']
                
                print(f"  ‚Ä¢ {feat1:47s} ‚ü∑  {feat2:47s}  ‚îÇ  r = {corr:+.4f}")
    
    print("\n" + "="*100)

In [None]:
def calcular_cramer_v(df, col1, col2):
    # Verifica√ß√£o de seguran√ßa: n√£o rodar em colunas com muitos valores √∫nicos (num√©ricas)
    # Se a coluna tiver mais de 100 valores distintos, provavelmente n√£o √© uma categoria √∫til
    distinct_count = df.select(col1).distinct().count()
    if distinct_count > 100:
        print(f"Pulei {col1}: muitos valores √∫nicos ({distinct_count}). Verifique se √© quantitativa.")
        return np.nan

    # Indexa√ß√£o
    idx1, idx2 = col1 + "_idx", col2 + "_idx"
    df_indexed = StringIndexer(inputCol=col1, outputCol=idx1, handleInvalid="skip").fit(df).transform(df)
    df_indexed = StringIndexer(inputCol=col2, outputCol=idx2, handleInvalid="skip").fit(df_indexed).transform(df_indexed)

    # Assembler para o Teste
    assembler = VectorAssembler(inputCols=[idx1], outputCol="features")
    test_data = assembler.transform(df_indexed).select("features", idx2)

    # Chi-Square Test
    res = ChiSquareTest.test(test_data, "features", idx2).head()
    
    # --- CORRE√á√ÉO DO ACESSO AOS ATRIBUTOS ---
    chi2 = float(res.statistics[0]) # Estat√≠stica Chi2 correta
    n = test_data.count()
    
    # Resgate do n√∫mero de categorias pelos metadados ou contagem
    r = df_indexed.select(idx1).distinct().count()
    k = df_indexed.select(idx2).distinct().count()

    if n == 0 or min(r-1, k-1) <= 0: return 0.0
    
    v = np.sqrt(chi2 / (n * min(r-1, k-1)))
    return v

In [None]:
def matriz_cramer_v_spark_optimized(df, colunas_categoricas, figsize=(20, 16)):
    """
    Constr√≥i uma matriz de correla√ß√£o Cramer's V para vari√°veis categ√≥ricas.
    Otimizada para visualiza√ß√£o de at√© 20+ vari√°veis.
    
    Parameters:
    -----------
    df : pyspark.sql.DataFrame
        DataFrame com as vari√°veis categ√≥ricas
    colunas_categoricas : list
        Lista de nomes das colunas categ√≥ricas
    figsize : tuple
        Tamanho da figura (width, height)
    
    Returns:
    --------
    pd.DataFrame : Matriz de correla√ß√£o Cramer's V
    """
    
    n_cols = len(colunas_categoricas)
    matrix = np.ones((n_cols, n_cols))
    
    print(f"üîÑ Iniciando indexa√ß√£o de {n_cols} colunas categ√≥ricas...")
    
    # ‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê
    # 1. INDEXA√á√ÉO EM BATCH (Performance)
    # ‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê
    indexers = [
        StringIndexer(
            inputCol=c, 
            outputCol=c+"_idx", 
            handleInvalid="keep"  # Mant√©m categorias novas como "unknown"
        ) 
        for c in colunas_categoricas
    ]
    
    df_indexed = df
    models = []
    for idx, indexer in enumerate(indexers):
        model = indexer.fit(df_indexed)
        df_indexed = model.transform(df_indexed)
        models.append(model)
    
    print(f"‚úÖ Indexa√ß√£o conclu√≠da!\n")
    
    # ‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê
    # 2. C√ÅLCULO DO CRAMER'S V (Tri√¢ngulo Superior)
    # ‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê
    total_pairs = (n_cols * (n_cols - 1)) // 2
    current_pair = 0
    
    for i in range(n_cols):
        for j in range(i + 1, n_cols):
            current_pair += 1
            col1 = colunas_categoricas[i]
            col2 = colunas_categoricas[j]
            col1_idx = col1 + "_idx"
            col2_idx = col2 + "_idx"
            
            print(f"Calculando: {col1} ‚Üî {col2}")
            
            # Preparar dados para ChiSquareTest
            assembler = VectorAssembler(inputCols=[col1_idx], outputCol="features")
            test_data = assembler.transform(df_indexed).select("features", col2_idx)
            
            # Executar teste Qui-Quadrado
            try:
                res = ChiSquareTest.test(test_data, "features", col2_idx).head()
                chi2 = float(res.statistics[0])
                n = test_data.count()
                
                # N√∫mero de categorias (r e k)
                r = len(models[i].labels)
                k = len(models[j].labels)
                
                # C√°lculo do V de Cramer
                if n > 0 and min(r-1, k-1) > 0:
                    v = np.sqrt(chi2 / (n * min(r-1, k-1)))
                else:
                    v = 0.0
                    
            except Exception as e:
                print(f"  ‚ö†Ô∏è Erro ao calcular {col1} vs {col2}: {e}")
                v = 0.0
                
            # Preencher matriz (sim√©trica)
            matrix[i, j] = v
            matrix[j, i] = v
    
    print(f"\n‚úÖ C√°lculo conclu√≠do!\n")
    
    # ‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê
    # 3. CRIAR DATAFRAME E VISUALIZA√á√ÉO OTIMIZADA
    # ‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê
    cramer_df = pd.DataFrame(
        matrix, 
        index=colunas_categoricas, 
        columns=colunas_categoricas
    )
    
    # Plotar Heatmap Otimizado
    fig, ax = plt.subplots(figsize=figsize)
    
    # M√°scara para o tri√¢ngulo superior (opcional, deixa mais limpo)
    mask = np.triu(np.ones_like(cramer_df, dtype=bool), k=1)
    
    sns.heatmap(
        cramer_df, 
        annot=True,           # Mostrar valores
        fmt='.2f',            # 2 casas decimais
        cmap='RdYlGn_r',      # Colormap: Vermelho (alta correla√ß√£o) -> Verde (baixa)
        vmin=0, 
        vmax=1,
        center=0.5,           # Centro da escala
        square=True,          # C√©lulas quadradas
        linewidths=0.5,       # Linhas entre c√©lulas
        cbar_kws={
            "shrink": 0.8,
            "label": "Cramer's V"
        },
        mask=mask,            # Oculta tri√¢ngulo superior (opcional)
        ax=ax
    )
    
    # Ajustes de visualiza√ß√£o
    plt.title(
        "Matriz de Associa√ß√£o: Cramer's V (Vari√°veis Categ√≥ricas)", 
        fontsize=16, 
        fontweight='bold',
        pad=20
    )
    plt.xlabel("", fontsize=12)
    plt.ylabel("", fontsize=12)
    
    # Rotacionar labels para melhor legibilidade
    plt.xticks(rotation=45, ha='right', fontsize=10)
    plt.yticks(rotation=0, fontsize=10)
    
    plt.tight_layout()
    plt.show()
    
    return cramer_df

In [None]:
def matriz_cramer_v_spark(df, colunas_categoricas):
    """
    Constr√≥i uma matriz de correla√ß√£o Cramer's V para uma lista de colunas.
    """
    n_cols = len(colunas_categoricas)
    # Inicializa a matriz com 1s na diagonal (correla√ß√£o de uma var com ela mesma)
    matrix = np.ones((n_cols, n_cols))
    
    print(f"Iniciando indexa√ß√£o de {n_cols} colunas...")
    
    # 1. Indexar todas as colunas de uma vez para ganhar performance
    indexers = [StringIndexer(inputCol=c, outputCol=c+"_idx", handleInvalid="skip") for c in colunas_categoricas]
    
    # Aplicamos a indexa√ß√£o em cadeia
    df_indexed = df
    models = []
    for indexer in indexers:
        model = indexer.fit(df_indexed)
        df_indexed = model.transform(df_indexed)
        models.append(model)
    
    # 2. Iterar sobre os pares (apenas o tri√¢ngulo superior para economizar processamento)
    for i in range(n_cols):
        for j in range(i + 1, n_cols):
            col1 = colunas_categoricas[i]
            col2 = colunas_categoricas[j]
            col1_idx = col1 + "_idx"
            col2_idx = col2 + "_idx"
            
            print(f"Calculando Cramer's V: {col1} vs {col2}...")
            
            # Preparar dados para o ChiSquareTest
            assembler = VectorAssembler(inputCols=[col1_idx], outputCol="features")
            test_data = assembler.transform(df_indexed).select("features", col2_idx)
            
            # Executar teste
            res = ChiSquareTest.test(test_data, "features", col2_idx).head()
            chi2 = float(res.statistics[0])
            n = test_data.count()
            
            # N√∫mero de categorias de cada coluna (r e k) extra√≠dos dos modelos indexadores
            r = len(models[i].labels)
            k = len(models[j].labels)
            
            # C√°lculo do V de Cramer
            if n > 0 and min(r-1, k-1) > 0:
                v = np.sqrt(chi2 / (n * min(r-1, k-1)))
            else:
                v = 0.0
                
            # Preencher a matriz de forma sim√©trica
            matrix[i, j] = v
            matrix[j, i] = v
            
    # Criar DataFrame Pandas para facilitar a visualiza√ß√£o
    cramer_df = pd.DataFrame(matrix, index=colunas_categoricas, columns=colunas_categoricas)
    
    # Plotar o Heatmap
    plt.figure(figsize=(10, 8))
    sns.heatmap(cramer_df, annot=True, cmap='YlGnBu', fmt='.2f', vmin=0, vmax=1)
    plt.title("Matriz de Associa√ß√£o: Cramer's V (Vari√°veis Categ√≥ricas)")
    plt.show()
    
    return cramer_df

In [None]:
def identificar_outliers(df, coluna, negativo=True, minimo_definido=0):
    # Calcula os quantis de forma aproximada para performance
    quantis = df.approxQuantile(coluna, [0.25, 0.75], 0.05)
    q1, q3 = quantis[0], quantis[1]
    iqr = q3 - q1

    limite_superior = q3 + 1.5 * iqr
    
    # Se negativo for True, permite valores negativos no limite inferior --> dependendo da vari√°vel
    if negativo:
        limite_inferior = q1 - 1.5 * iqr
    else:
        limite_inferior = max(0, (q1 - 1.5 * iqr))

    if minimo_definido != 0:
        limite_inferior = minimo_definido
    
    outliers = df.filter((F.col(coluna) < limite_inferior) | (F.col(coluna) > limite_superior))
    print(f"Vari√°vel {coluna}:")
    print(f"Limites: [{limite_inferior}, {limite_superior}]")
    print(f"Total de outliers (#): {outliers.count()}")
    print(f"Total de outliers (%): {(outliers.count() / df.count()) * 100:.2f}%")

In [None]:
def plot_tendencia_temporal(df, col_valor, aggregation="mean", col_safra="safra", categories=None):
    """
    df: Spark DataFrame
    col_valor: Vari√°vel num√©rica para calcular a m√©dia
    col_safra: Coluna de tempo (ex: 'safra')
    categories: Nome da coluna categ√≥rica para quebrar as linhas (ex: 'gender') ou None
    """
    # Definir a lista de colunas para o agrupamento
    group_cols = [col_safra]
    if categories:
        group_cols.append(categories)
    
    # Agrega√ß√£o no Spark por safra (e categoria, se houver) e calculamos a m√©dia. Transformar em Pandas para plotar
    if aggregation == "mean":
        stats_safra = (df.groupBy(group_cols)
                        .agg(F.mean(col_valor).alias(f"{aggregation}"))
                        .orderBy(col_safra)
                        .toPandas())
    elif aggregation == "sum":
        stats_safra = (df.groupBy(group_cols)
                        .agg(F.sum(col_valor).alias(f"{aggregation}"))
                        .orderBy(col_safra)
                        .toPandas())
    elif aggregation == "max":
        stats_safra = (df.groupBy(group_cols)
                        .agg(F.max(col_valor).alias(f"{aggregation}"))
                        .orderBy(col_safra)
                        .toPandas())
    elif aggregation == "min":
        stats_safra = (df.groupBy(group_cols)
                        .agg(F.min(col_valor).alias(f"{aggregation}"))
                        .orderBy(col_safra)
                        .toPandas())
    elif aggregation == "med":
        stats_safra = (df.groupBy(group_cols)
                        .agg(F.expr("percentile_approx({}, 0.5)".format(col_valor)).alias(f"{aggregation}"))
                        .orderBy(col_safra)
                        .toPandas())
    
    # Tratar a safra como categoria (texto) 
    stats_safra[col_safra] = stats_safra[col_safra].astype(str)
    
    # 4. Configura√ß√£o do Plot
    plt.figure(figsize=(14, 6))
    
    # Se 'categories' for informada, o Seaborn cria uma linha para cada categoria usando 'hue'
    sns.lineplot(
        data=stats_safra, 
        x=col_safra, 
        y=f"{aggregation}", 
        hue=categories if categories else None, 
        marker='o', 
        sort=False # Garante que a ordem das safras do Spark seja mantida
    )
    
    # Ajustes finos de visualiza√ß√£o
    titulo = f"{aggregation} de {col_valor} por {col_safra}"
    if categories:
        titulo += f" (Quebrado por {categories})"
        plt.legend(title=categories, bbox_to_anchor=(1.02, 1), loc='upper left')
        
    plt.title(titulo)
    plt.xticks(rotation=45)
    plt.grid(True, axis='y', linestyle='--', alpha=0.7)
    plt.tight_layout()
    plt.show()

In [None]:
def integridade_entre_bases(df_base, df_comparacao, nome_relacao, chave=["msno", "safra"]):
    """
    Rastreia a perda de dados entre tabelas (ex: membros vs logs).
    Essencial para justificar a volumetria final do modelo.
    """
    total_base = df_base.select(chave).distinct().count()
    com_match = df_base.join(df_comparacao, chave, "inner").select(chave).distinct().count()
    perda = ((total_base - com_match) / total_base) * 100
    
    print(f"--- Integridade: {nome_relacao} ---")
    print(f"Registros na base: {total_base} | Registros correspondentes: {com_match}")
    print(f"Taxa de Perda: {perda:.2f}%\n")

In [None]:
def plot_boxplot(df, coluna_categorica, coluna_alvo, agrupar_por_safra=False, table=False):
    """
    Plota boxplots utilizando estat√≠sticas calculadas no Spark, incluindo a m√©dia.
    """
    
    # 1. Definimos as colunas de agrupamento
    cols_base = ['safra'] if agrupar_por_safra else []

    for col_cat in coluna_categorica:
        print(f"Processando estat√≠sticas para: {col_cat}...")
        
        # 2. Calculamos os quartis E a m√©dia no Spark
        df_stats = (df
                    .groupBy(cols_base + [col_cat])
                    .agg(
                        F.percentile_approx(coluna_alvo, [0.0, 0.25, 0.5, 0.75, 1.0], 10000).alias("stats"),
                        F.mean(coluna_alvo).alias("avg") # <-- Adicionado m√©dia
                    )
                    .select(*cols_base, col_cat,
                            F.col("stats")[0].alias("min"),
                            F.col("stats")[1].alias("q1"),
                            F.col("stats")[2].alias("med"),
                            F.col("stats")[3].alias("q3"),
                            F.col("stats")[4].alias("max"),
                            F.col("avg").alias("mean")) # <-- Selecionando a m√©dia
                    .toPandas())

        # 3. L√≥gica de Plotagem
        if agrupar_por_safra:
            safras = sorted(df_stats['safra'].unique())
            for s in safras:
                df_safra = df_stats[df_stats['safra'] == s]
                _desenhar_boxplot_estatistico(df_safra, col_cat, coluna_alvo, f"Safra {s}", show_table=table)
        else:
            _desenhar_boxplot_estatistico(df_stats, col_cat, coluna_alvo, "Vis√£o Consolidada", show_table=table)

def _desenhar_boxplot_estatistico(df_plot, col_cat, col_alvo, subtitulo, show_table=False):
    stats_list = []
    df_plot = df_plot.sort_values(by=col_cat)

    for _, row in df_plot.iterrows():
        stats_list.append({
            "label": str(row[col_cat]),
            "whislo": row["min"], 
            "q1": row["q1"],
            "med": row["med"],
            "q3": row["q3"],
            "whishi": row["max"],
            "mean": row["mean"] # <-- Matplotlib usa essa chave para desenhar a m√©dia
        })

    if not stats_list:
        return

    # Renderiza√ß√£o do Gr√°fico
    plt.figure(figsize=(12, 6))
    ax = plt.gca()
    # showmeans=True ativa a exibi√ß√£o do marcador da m√©dia
    ax.bxp(stats_list, showfliers=False, showmeans=True, 
           meanprops={"marker":"o", "markerfacecolor":"white", "markeredgecolor":"black", "markersize":"6"}) 
    
    plt.title(f"Boxplot: {col_alvo} por {col_cat} ({subtitulo})")
    plt.ylabel(col_alvo)
    plt.xticks(rotation=45)
    plt.grid(axis='y', linestyle='--', alpha=0.7)
    plt.tight_layout()
    plt.show()

    # Renderiza√ß√£o da Tabela
    if show_table:
        print(f"\n--- Estat√≠sticas: {col_cat} ({subtitulo}) ---")
        # Adicionado "mean" na visualiza√ß√£o da tabela
        cols_tabela = [col_cat, "min", "q1", "med", "mean", "q3", "max"]
        df_resumo = df_plot[cols_tabela].copy()
        
        try:
            from IPython.display import display
            display(df_resumo)
        except ImportError:
            print(df_resumo.to_string(index=False))
        print("\n" + "="*50 + "\n")

## 3.2. Tratamento de dados

### 3.2.1. Agrupamento

In [None]:
def agregar_logs(df_logs):
    """
    Agrega a tabela de logs no n√≠vel (msno, safra).
    
    Premissa: A base de logs j√° vem agregada por m√™s, mas esta fun√ß√£o
    garante robustez caso surjam duplicatas em produ√ß√£o.
    
    Regra de agrega√ß√£o:
    - Vari√°veis num√©ricas de uso: SUM (acumula√ß√£o de comportamento)
    
    Args:
        df_logs: DataFrame Spark com logs brutos
        
    Returns:
        DataFrame agregado no n√≠vel (msno, safra)
    """
    
    # Colunas esperadas (contrato de dados)
    expected_cols = ["msno", "safra", "num_25", "num_50", "num_75", "num_985", "num_100", "num_unq", "total_secs"]
    
    # Selecionar apenas colunas esperadas (prote√ß√£o contra novas features)
    df_logs = df_logs.select(*expected_cols)
    
    # Verificar duplicatas
    duplicates = df_logs.groupBy("msno", "safra").count().filter("count > 1")
    
    if duplicates.count() > 0:
        print(f"‚ö†Ô∏è LOGS: {duplicates.count()} duplicatas detectadas. Aplicando agrega√ß√£o...")
        
        df_logs_agg = (
            df_logs
            .groupBy("msno", "safra")
            .agg(
                # Vari√°veis de uso: acumula√ß√£o (SUM)
                F.sum("num_25").alias("num_25"),
                F.sum("num_50").alias("num_50"),
                F.sum("num_75").alias("num_75"),
                F.sum("num_985").alias("num_985"),
                F.sum("num_100").alias("num_100"),
                F.sum("num_unq").alias("num_unq"),
                F.sum("total_secs").alias("total_secs")))
        
        print("‚úÖ LOGS: Agrega√ß√£o conclu√≠da.")
        return df_logs_agg
    
    else:
        print("‚úÖ LOGS: Base j√° est√° no n√≠vel (msno, safra). Nenhuma agrega√ß√£o necess√°ria.")
        return df_logs

In [None]:
def agregar_transactions(df_transactions):
    """
    Agrega a tabela de transa√ß√µes no n√≠vel (msno, safra).
    
    Premissa: Podem existir m√∫ltiplas transa√ß√µes por cliente no mesmo m√™s
    (upgrades, downgrades, reprocessamentos).
    
    Regras de agrega√ß√£o:
    - Flags: MAX (ocorreu ao menos uma vez?)
    - Valores monet√°rios: SUM (acumula√ß√£o financeira)
    - Atributos de plano e categ√≥ricas: LAST (√∫ltimo estado observado)
    - Datas: MAX para transaction_date, LAST para membership_expire_date
    
    Args:
        df_transactions: DataFrame Spark com transa√ß√µes brutas
        
    Returns:
        DataFrame agregado no n√≠vel (msno, safra)
    """
    
    # Colunas esperadas (contrato de dados)
    expected_cols = ["msno", "safra", "payment_method_id", "payment_plan_days", "plan_list_price",
        "actual_amount_paid", "is_auto_renew", "transaction_date", "membership_expire_date", "is_cancel"]
    
    # Selecionar apenas colunas esperadas (prote√ß√£o contra novas features)
    df_transactions = df_transactions.select(*expected_cols)
    
    # Verificar duplicatas
    duplicates = df_transactions.groupBy("msno", "safra").count().filter("count > 1")
    
    if duplicates.count() > 0:
        print(f"‚ö†Ô∏è TRANSACTIONS: {duplicates.count()} duplicatas detectadas. Aplicando agrega√ß√£o...")
        
        # Criar ranking por data (√∫ltima transa√ß√£o = 1)
        w_order = Window.partitionBy("msno", "safra").orderBy(F.col("transaction_date").desc())
        df_transactions = df_transactions.withColumn("_rank", F.row_number().over(w_order))
        
        df_tx_agg = (
            df_transactions
            .groupBy("msno", "safra")
            .agg(
                # Flags: MAX (ocorr√™ncia)
                F.max("is_auto_renew").alias("is_auto_renew"),
                F.max("is_cancel").alias("is_cancel"),
                
                # Valores monet√°rios: SUM (acumula√ß√£o)
                F.sum("actual_amount_paid").alias("actual_amount_paid"),
                
                # Atributos de plano: LAST (√∫ltimo estado)
                F.max(F.when(F.col("_rank") == 1, F.col("payment_method_id"))).alias("payment_method_id"),
                F.max(F.when(F.col("_rank") == 1, F.col("payment_plan_days"))).alias("payment_plan_days"),
                F.max(F.when(F.col("_rank") == 1, F.col("plan_list_price"))).alias("plan_list_price"),
                
                # Datas
                F.max("transaction_date").alias("transaction_date"),
                F.max(F.when(F.col("_rank") == 1, F.col("membership_expire_date"))).alias("membership_expire_date")
            ))
        
        print("‚úÖ TRANSACTIONS: Agrega√ß√£o conclu√≠da.")
        return df_tx_agg
    
    else:
        print("‚úÖ TRANSACTIONS: Base j√° est√° no n√≠vel (msno, safra). Nenhuma agrega√ß√£o necess√°ria.")
        return df_transactions

In [None]:
def agregar_members(df_members):
    """
    Agrega a tabela de membros no n√≠vel (msno, safra).
    
    Premissa: Atributos de membros s√£o majoritariamente est√°ticos, mas podem
    existir m√∫ltiplas linhas por cliente em casos de mudan√ßa de cidade, 
    reativa√ß√£o, ou inconsist√™ncias de dados.
    
    Regras de agrega√ß√£o:
    - Atributos demogr√°ficos: LAST (√∫ltimo estado observado)
    - Flag de status: MAX (esteve ativo ao menos uma vez?)
    - Data de registro: MIN (primeira ocorr√™ncia)
    
    Args:
        df_members: DataFrame Spark com dados de membros
        
    Returns:
        DataFrame agregado no n√≠vel (msno, safra)
    """
    
    # Colunas esperadas (contrato de dados)
    expected_cols = ["msno", "safra","registration_init_time", "city", "bd", "gender", "registered_via", "is_ativo"]
    
    # Selecionar apenas colunas esperadas (prote√ß√£o contra novas features)
    df_members = df_members.select(*expected_cols)
    
    # Converter safra para integer (est√° como string no schema)
    df_members = df_members.withColumn("safra", F.col("safra").cast("integer"))
    
    # Verificar duplicatas
    duplicates = df_members.groupBy("msno", "safra").count().filter("count > 1")
    
    if duplicates.count() > 0:
        print(f"‚ö†Ô∏è MEMBERS: {duplicates.count()} duplicatas detectadas. Aplicando agrega√ß√£o...")
        
        # Criar ranking por data de registro (mais recente = 1)
        # Se n√£o houver coluna de atualiza√ß√£o, usamos registration_init_time como proxy
        w_order = Window.partitionBy("msno", "safra").orderBy(F.col("registration_init_time").desc())
        df_members = df_members.withColumn("_rank", F.row_number().over(w_order))
        
        df_members_agg = (
            df_members
            .groupBy("msno", "safra")
            .agg(
                # Data de registro: MIN (primeira ocorr√™ncia)
                F.min("registration_init_time").alias("registration_init_time"),
                
                # Atributos demogr√°ficos: LAST (√∫ltimo estado)
                F.max(F.when(F.col("_rank") == 1, F.col("city"))).alias("city"),
                F.max(F.when(F.col("_rank") == 1, F.col("bd"))).alias("bd"),
                F.max(F.when(F.col("_rank") == 1, F.col("gender"))).alias("gender"),
                F.max(F.when(F.col("_rank") == 1, F.col("registered_via"))).alias("registered_via"),
                
                # Flag de status: MAX (esteve ativo?)
                F.max("is_ativo").alias("is_ativo")))
        
        print("‚úÖ MEMBERS: Agrega√ß√£o conclu√≠da.")
        return df_members_agg
    
    else:
        print("‚úÖ MEMBERS: Base j√° est√° no n√≠vel (msno, safra). Nenhuma agrega√ß√£o necess√°ria.")
        return df_members

### 3.2.2. Outliers e Categoriza√ß√£o Estat√≠stica

In [None]:
def aplicar_winsorizacao(df, colunas, p_inf=0.01, p_sup=0.99):
    for col in colunas:
        # Calcula os limites baseados em percentis
        limites = df.approxQuantile(col, [p_inf, p_sup], 0.001) # 0.001 √© a precis√£o/erro aceit√°vel
        inf, sup = limites[0], limites[1]
        
        print(f"Coluna {col}: Limite Inferior={inf}, Limite Superior={sup}")
        
        # Aplica o Capping
        df = df.withColumn(f"{col}_win", 
            F.when(F.col(col) < inf, inf)
             .when(F.col(col) > sup, sup)
             .otherwise(F.col(col))
        )
    return df

In [None]:
def segment_by_percentile(df, col_to_group, table_name, num_buckets=4):
    """
    Categoriza uma vari√°vel dinamicamente com base em percentis calculados no Spark.
    
    Par√¢metros:
    - df: DataFrame do Spark.
    - col_to_group: Nome da coluna num√©rica a ser segmentada (ex: 'total_plays').
    - table_name: Nome da tabela de origem para compor a flag (ex: 'logs').
    - num_buckets: N√∫mero de divis√µes desejadas. 
                   4 para Quartis (25%, 50%, 75%, 100%)
                   10 para Decis√µes (10%, 20%, ..., 100%)
    
    Vari√°veis esperadas no DataFrame:
    - f"flag_has_{table_name}": Coluna bin√°ria (0/1) que indica presen√ßa de dados.
    - {col_to_group}: A vari√°vel num√©rica que ser√° processada.
    """
    
    # 1. Definimos a coluna de flag
    flag_col = f"flag_has_{table_name}"
    
    # 2. Calculo dinamico dos percentis
    # Criamos uma lista de probabilidades: ex para 4 buckets -> [0.25, 0.50, 0.75, 1.0]
    probabilities = [i / num_buckets for i in range(1, num_buckets + 1)]
    
    # Coletamos os valores de corte (stats)
    # Nota: Filtramos apenas onde a flag √© 1 para n√£o enviesar os percentis com zeros/nulos
    cut_off_points = (df
                      .filter(F.col(flag_col).isin(1))
                      .agg(F.percentile_approx(col_to_group, probabilities).alias("pts"))
                      .collect()[0]["pts"])
    
    # 3. A l√≥gica do .when dinamico, comecando com a condicao base para desconhecidos/inativos
    case_expression = F.when(F.col(flag_col).isin(0), "unknown")
    
    # Iteramos sobre os pontos de corte para criar as faixas
    # Ex: para Quartis, teremos Tier 1 (at√© 25%), Tier 2 (at√© 50%), etc.
    for i, point in enumerate(cut_off_points):
        tier_label = f"tier_{i+1}"
        # A primeira faixa pega do menor valor at√© o primeiro ponto
        if i == 0:
            case_expression = case_expression.when(F.col(col_to_group) <= point, tier_label)
        else:
            # As demais pegam entre o ponto anterior e o atual
            case_expression = case_expression.when(
                (F.col(col_to_group) > cut_off_points[i-1]) & (F.col(col_to_group) <= point), 
                tier_label)
            
    # Caso escape de alguma l√≥gica (fallback)
    case_expression = case_expression.otherwise(f"tier_{num_buckets}")

    # Retornamos o dataframe com a nova coluna
    return df.withColumn(f"{col_to_group}_group", case_expression)

### 3.2.3. Tend√™ncia - Janela temporal (3 e 6 meses)

In [None]:
def features_de_tendencia_num(df, variables, id_col="msno", time_col="safra", windows=[3, 6], keep_intermediate=False):
    original_cols = df.columns
    generated_cols = []

    for var in variables:
        # manter raw
        raw_col = f"{var}_raw"
        df = df.withColumn(raw_col, F.col(var))
        generated_cols.append(raw_col)

        for n in windows:
            w = Window.partitionBy(id_col).orderBy(time_col)

            # lags
            lags = []
            for i in range(n):
                lag_col = f"{var}_lag_{i}"
                df = df.withColumn(lag_col, F.lag(var, i).over(w))
                lags.append(lag_col)

            # contadores
            cnt_null = f"{var}_cnt_null_{n}"
            cnt_zero = f"{var}_cnt_zero_{n}"

            df = (df
                .withColumn(cnt_null, sum(F.col(c).isNull().cast("int") for c in lags))
                .withColumn(cnt_zero, sum((F.col(c) == 0).cast("int") for c in lags)))

            ref = f"{var}_lag_0"
            arr = f"array({','.join(lags)})"

            # m√©tricas raw
            mean_raw = f"{var}_mean_{n}_raw"
            min_raw = f"{var}_min_{n}_raw"
            max_raw = f"{var}_max_{n}_raw"

            df = (df
                .withColumn(
                    mean_raw,
                    F.expr(f"""
                        aggregate(filter({arr}, x -> x is not null), 0D, (acc, x) -> acc + x) / size(filter({arr}, x -> x is not null))
                        """))
                .withColumn(min_raw, F.expr(f"array_min(filter({arr}, x -> x is not null))"))
                .withColumn(max_raw, F.expr(f"array_max(filter({arr}, x -> x is not null))")))

            # m√©tricas finais (com sentinela)
            for metric, raw_metric in zip(
                ["mean", "min", "max"],
                [mean_raw, min_raw, max_raw]
            ):
                final_col = f"{var}_{metric}_{n}"
                generated_cols.append(final_col)

                df = df.withColumn(final_col,
                    F.when(F.col(cnt_null) == n, F.lit(-99998))
                     .when(F.col(cnt_zero) == n, F.lit(-99999))
                     .when((F.col(ref) == 0) & (F.col(cnt_null) < n), F.lit(-99995))
                     .when((F.col(ref).isNotNull()) & (F.col(cnt_null) == n - 1), F.lit(-99996))
                     .when((F.col(ref).isNotNull()) & (F.col(cnt_zero) == n - 1), F.lit(-99997))
                     .otherwise(F.col(raw_metric)))

            # raz√µes
            for metric in ["mean", "min", "max"]:
                denom = f"{var}_{metric}_{n}"
                ratio = f"{var}_ratio_ref_{metric}_{n}"
                generated_cols.append(ratio)

                df = df.withColumn(ratio,
                    F.when(F.col(denom) <= 0, F.col(denom))
                    .otherwise(F.col(ref) / F.col(denom)))

    # =========================
    # CONTROLE DE SA√çDA
    # =========================
    if not keep_intermediate:
        final_cols = list(dict.fromkeys(
            original_cols + generated_cols))
        df = df.select(final_cols)

    return df

In [None]:
def features_de_tendencia_cat(df, flags, id_col="msno", time_col="safra", windows=[3, 6]):
    for flag in flags:
        df = df.withColumn(f"{flag}_raw", F.col(flag))

        for n in windows:
            w = Window.partitionBy(id_col).orderBy(time_col)

            lags = []
            for i in range(n):
                lag_col = f"{flag}_lag_{i}"
                df = df.withColumn(lag_col, F.lag(flag, i).over(w))
                lags.append(lag_col)

            arr = f"array({','.join(lags)})"

            df = (df
                .withColumn(f"{flag}_sum_{n}",
                    F.expr(f"aggregate(filter({arr}, x -> x is not null), 0, (acc, x) -> acc + x)"))
                .withColumn(f"{flag}_mean_{n}", F.col(f"{flag}_sum_{n}") / F.lit(n))
                .withColumn(f"{flag}_max_{n}", F.expr(f"array_max(filter({arr}, x -> x is not null))")))

    return df

#### Objetivo geral
O objetivo desta fun√ß√£o √© capturar **comportamentos temporais dos usu√°rios**, indo al√©m do valor pontual do m√™s de refer√™ncia.  
Para cada cliente (`msno`) e safra (`yyyyMM`), s√£o constru√≠das vari√°veis que representam:
- o **n√≠vel atual** de uma m√©trica,
- o **hist√≥rico recente** dessa m√©trica,
- e a **rela√ß√£o entre o comportamento atual e o passado**.

Essa abordagem permite ao modelo identificar:
- padr√µes de crescimento ou queda,
- usu√°rios novos vs recorrentes,
- mudan√ßas abruptas de comportamento,
- aus√™ncia estrutural de informa√ß√£o.

---

#### Estrutura geral da fun√ß√£o
A fun√ß√£o recebe:
- uma lista de vari√°veis cont√≠nuas (ex: `num_100`, `total_secs`, `avg_secs_per_play`);
- janelas temporais fixas (`n = 3` e `n = 6` meses);
- e gera automaticamente **todas as transforma√ß√µes** para todas as vari√°veis listadas.

Para cada vari√°vel e cada janela temporal, s√£o criados:
1. valores defasados (lags),
2. m√©tricas estat√≠sticas hist√≥ricas,
3. raz√µes entre o valor atual e o hist√≥rico,
4. vers√µes com e sem c√≥digos sentinela,
5. mantendo sempre o valor original (raw).

---

#### Lags temporais
Para cada vari√°vel `X`, s√£o criados:
- `X_lag_0`: valor no m√™s de refer√™ncia
- `X_lag_1`: valor no m√™s anterior
- ...
- `X_lag_(n-1)`: valor n‚àí1 meses atr√°s

Esses lags formam a base para todas as transforma√ß√µes seguintes.

---

#### Vari√°veis `*_raw`
As vari√°veis com sufixo `_raw` representam o **valor matem√°tico puro**, sem qualquer regra sem√¢ntica adicional.

Exemplos:
- `num_100_raw`
- `num_100_mean_6_raw`
- `total_secs_max_3_raw`

Caracter√≠sticas:
- ignoram valores nulos quando poss√≠vel;
- n√£o utilizam c√≥digos sentinela;
- podem assumir valores nulos ou zero naturalmente.

Uso principal:
- regress√£o linear,
- an√°lises estat√≠sticas,
- inspe√ß√£o e debug.

---

#### M√©tricas hist√≥ricas criadas
Para cada vari√°vel `X` e janela `n`, s√£o criadas:

- `X_mean_n_raw`: m√©dia dos √∫ltimos `n` meses
- `X_min_n_raw`: m√≠nimo dos √∫ltimos `n` meses
- `X_max_n_raw`: m√°ximo dos √∫ltimos `n` meses

Essas m√©tricas descrevem o **n√≠vel t√≠pico**, o **pior cen√°rio** e o **melhor cen√°rio recente** do usu√°rio.

---

#### Vari√°veis sem `_raw` (com sem√¢ntica)
As vari√°veis sem o sufixo `_raw` aplicam **regras sem√¢nticas** para diferenciar situa√ß√µes estruturalmente distintas, que numericamente poderiam parecer iguais.

Essas vari√°veis utilizam **c√≥digos sentinela negativos**, que carregam significado comportamental:

| C√≥digo | Significado |
|------|-------------|
| `-99998` | N√£o h√° registros em nenhum dos √∫ltimos `n` meses |
| `-99999` | Todos os valores nos √∫ltimos `n` meses s√£o zero |
| `-99995` | Valor atual √© zero, mas havia hist√≥rico v√°lido |
| `-99996` | Valor atual existe, mas hist√≥rico √© todo nulo |
| `-99997` | Valor atual existe, mas hist√≥rico √© todo zero |

Esses c√≥digos permitem ao modelo (especialmente √°rvores):
- distinguir aus√™ncia de uso vs aus√™ncia de dados,
- identificar usu√°rios novos,
- detectar interrup√ß√µes de comportamento.

---

#### Raz√µes (vari√°veis de mudan√ßa de comportamento)
Para cada m√©trica hist√≥rica, s√£o criadas raz√µes do tipo:

- `X_ratio_ref_mean_n`
- `X_ratio_ref_min_n`
- `X_ratio_ref_max_n`

Essas vari√°veis comparam o valor atual (`lag_0`) com o hist√≥rico recente.

Interpreta√ß√£o:
- valor ‚âà 1 ‚Üí comportamento est√°vel
- valor > 1 ‚Üí aumento recente
- valor < 1 ‚Üí queda recente

Caso o denominador seja inv√°lido (zero, nulo ou sentinela), a raz√£o herda o c√≥digo sentinela, evitando divis√µes inv√°lidas.

---

#### Separa√ß√£o entre estat√≠stica e sem√¢ntica
Um ponto central do design √© **n√£o misturar estat√≠stica com sem√¢ntica**.

Por isso:
- vari√°veis `_raw` s√£o mantidas limpas e cont√≠nuas;
- vari√°veis sem `_raw` carregam contexto comportamental.

Essa separa√ß√£o permite:
- uso seguro em regress√£o linear (via `_raw`);
- explora√ß√£o rica de padr√µes em modelos baseados em √°rvore (via sentinelas).

---

#### Vari√°veis de flags (bin√°rias)
Para vari√°veis categ√≥ricas/bin√°rias (ex: `is_cancel`, `is_auto_renew`), s√£o criadas transforma√ß√µes espec√≠ficas:

- `flag_sum_n`: n√∫mero de ocorr√™ncias nos √∫ltimos `n` meses
- `flag_mean_n`: frequ√™ncia relativa
- `flag_max_n`: ocorreu pelo menos uma vez

Essas vari√°veis capturam **persist√™ncia, recorr√™ncia e reincid√™ncia** de eventos.

---

#### Benef√≠cios da abordagem
Essa arquitetura de features:
- reduz depend√™ncia de um √∫nico m√™s,
- melhora estabilidade temporal,
- captura mudan√ßas de comportamento,
- √© robusta a dados faltantes,
- funciona bem tanto para regress√£o quanto para √°rvores.

Al√©m disso, permite sele√ß√£o posterior de vari√°veis sem refazer o pipeline, mantendo rastreabilidade e flexibilidade.

---

#### Observa√ß√£o final
Nem todas as vari√°veis criadas necessariamente entrar√£o no modelo final.  
A fun√ß√£o foi desenhada para **gerar um espa√ßo rico de candidatos**, permitindo que a sele√ß√£o seja feita de forma emp√≠rica, baseada em valida√ß√£o e performance.
