In [None]:
from sklearn.feature_selection import mutual_info_regression
from npeet import entropy_estimators as ee
from collections import Counter, defaultdict
from scipy.stats import gaussian_kde
import pandas as pd
import numpy as np
import time

np.random.seed(0)

# --- Configurações Iniciais ---
n_samples_global = int(1e4) # Amostras para testes com dados uniformes
noise_scale_global = 1.5
k_values_knn = [3, 5, 10]
n_bins_values_binning = [10, 20, 40]
n_jobs_values_sklearn = [1]
n_features_to_test_uniform = [1, 3, 5] # Para o loop original com dados uniformes
n_features_to_test_gaussian = [1, 2] # Número de features em X para o teste Gaussiano
n_samples_gaussian_analytical = int(5e4) # Amostras para o teste Gaussiano
base_log_manual = np.e

In [None]:
overall_results_list = []
#==============================================================================
# TESTE ANALÍTICO COM DADOS GAUSSIANOS (X MULTIVARIADO, Y UNIVARIADO)
#==============================================================================
print("\n--- INICIANDO TESTE ANALÍTICO COM DADOS GAUSSIANOS (X MULTIVARIADO) ---")

for n_feat_x_analytical in n_features_to_test_gaussian:
    print(f"\n--- Teste Gaussiano com N_Features_X = {n_feat_x_analytical} ---")
    dim_Y_analytical = 1
    total_dims = n_feat_x_analytical + dim_Y_analytical

    # Construir a matriz de covariância Sigma
    # Sigma_XX: Identidade (features X_i são N(0,1) e independentes entre si)
    sigma_xx = np.eye(n_feat_x_analytical)
    # Sigma_YY: Variância de Y (N(0,1))
    sigma_yy_val = 1.0
    sigma_yy = np.array([[sigma_yy_val]])

    # Sigma_XY: Covariâncias entre X_i e Y
    # Vamos criar uma correlação modesta para cada X_i com Y, garantindo Sigma > 0
    # Ex: cov(X_i, Y) = c / sqrt(n_feat_x_analytical) para que sum(cov^2) não exploda
    # Para simplicidade, vamos definir uma correlação alvo para a primeira feature X1 com Y
    # e correlações menores ou zero para as outras, garantindo SPD.
    
    target_overall_correlation_strength = 0.6 # Ajuste este valor
    sigma_xy = np.zeros((n_feat_x_analytical, dim_Y_analytical))
    
    # Exemplo: correlação apenas com a primeira feature de X
    # sigma_xy[0, 0] = target_overall_correlation_strength 
    # Para garantir que a soma dos quadrados das correlações seja < 1 se Sigma_XX=I, Sigma_YY=1:
    # Distribuir a "força de correlação"
    # Se n_feat_x_analytical = 1, rho = target_overall_correlation_strength
    # Se > 1, podemos ter rho_i = target_overall_correlation_strength / sqrt(n_feat_x_analytical)
    # ou definir manualmente para controle
    
    if n_feat_x_analytical == 1:
        sigma_xy[0,0] = target_overall_correlation_strength
    elif n_feat_x_analytical == 2:
        sigma_xy[0,0] = 0.5 # corr(X1, Y)
        sigma_xy[1,0] = 0.3 # corr(X2, Y) -> 0.5^2 + 0.3^2 = 0.25 + 0.09 = 0.34 < 1
    elif n_feat_x_analytical == 3:
        sigma_xy[0,0] = 0.4
        sigma_xy[1,0] = 0.3
        sigma_xy[2,0] = 0.2 # 0.16 + 0.09 + 0.04 = 0.29 < 1
    else: # Default para mais features
         for i in range(n_feat_x_analytical):
            sigma_xy[i,0] = (target_overall_correlation_strength / np.sqrt(n_feat_x_analytical)) * (0.8 + 0.4*np.random.rand()) # Adiciona alguma variação


    # Montar a matriz de covariância completa Sigma
    # Sigma = [[Sigma_XX, Sigma_XY],
    #          [Sigma_YX, Sigma_YY]]
    sigma_yx = sigma_xy.T
    top_block = np.hstack((sigma_xx, sigma_xy))
    bottom_block = np.hstack((sigma_yx, sigma_yy))
    cov_matrix_full_gaussian = np.vstack((top_block, bottom_block))

    # Checar se é positiva definida (Cholesky falhará se não for)
    try:
        np.linalg.cholesky(cov_matrix_full_gaussian)
    except np.linalg.LinAlgError:
        print(f"AVISO: Matriz de covariância não é positiva definida para N_Features_X = {n_feat_x_analytical}. Pulando este caso.")
        # Pode ser necessário ajustar os valores em sigma_xy se isso acontecer
        # Exemplo alternativo mais robusto para Sigma_XY que garante SPD:
        # A = np.random.rand(total_dims, total_dims)
        # cov_matrix_full_gaussian = np.dot(A, A.transpose()) # Garante SPD
        # # E então normalizar para ter variâncias unitárias nas diagonais se desejado,
        # # e extrair/definir as correlações. Isso é mais complexo.
        # # Por ora, a construção manual de sigma_xy acima deve funcionar para os valores dados.
        continue


    mean_full_gaussian = np.zeros(total_dims)
    data_full_gaussian = np.random.multivariate_normal(mean_full_gaussian, cov_matrix_full_gaussian, n_samples_gaussian_analytical)

    X_gaussian_analytical = data_full_gaussian[:, :n_feat_x_analytical]
    Y_gaussian_analytical = data_full_gaussian[:, n_feat_x_analytical] # Y é a última coluna, 1D
    Y_gaussian_analytical_reshaped = Y_gaussian_analytical.reshape(-1, 1) # Para NPEET

    # Calcular MI Teórica I(X;Y)
    det_sigma_xx = np.linalg.det(sigma_xx)
    det_sigma_yy = sigma_yy_val # Já que Y é 1D, |Sigma_YY| = Var(Y)
    det_sigma_full = np.linalg.det(cov_matrix_full_gaussian)

    mi_theoretical_gaussian = np.nan
    if det_sigma_xx > 1e-9 and det_sigma_yy > 1e-9 and det_sigma_full > 1e-9: # Evitar log de <=0 ou divisão por zero
        # Garantir que o argumento do log seja > 0
        log_arg = (det_sigma_xx * det_sigma_yy) / det_sigma_full
        if log_arg > 1e-9:
             mi_theoretical_gaussian = 0.5 * np.log(log_arg)
        else:
            print(f"AVISO: Argumento do log para MI teórica <=0 ({log_arg}). MI será NaN.")
    else:
        print(f"AVISO: Determinante zero ou próximo de zero. MI teórica será NaN.")


    print(f"  MI Teórica (Gaussiana, X {n_feat_x_analytical}-dim): {mi_theoretical_gaussian:.4f} nats")
    param_string_theoretical = f"X_dims={n_feat_x_analytical}, Y_dims=1, CovXY approx {np.mean(np.abs(sigma_xy)):.2f}"


    overall_results_list.append({
        "Method": "Theoretical Gaussian",
        "N_Features_X": n_feat_x_analytical,
        "MI (nats, mean)": mi_theoretical_gaussian,
        "Parameters": param_string_theoretical,
        "Time (s)": 0.0
    })

    # --- 1. Scikit-learn com Dados Gaussianos ---
    # Nota: mutual_info_regression retorna MI(X_i; Y) para cada feature X_i.
    # Vamos calcular a média desses valores. Não é I(X_vec; Y).
    for k_sklearn in k_values_knn:
        for n_jobs_sklearn in n_jobs_values_sklearn:
            params_sklearn = f"k={k_sklearn}, n_jobs={n_jobs_sklearn} (Gaussian Test)"
            start_time = time.time()
            mi_sklearn_per_feature = mutual_info_regression(X_gaussian_analytical, Y_gaussian_analytical, n_neighbors=k_sklearn, n_jobs=n_jobs_sklearn, random_state=0)
            mi_sklearn_val = np.mean(mi_sklearn_per_feature) # Média das MIs feature-wise
            end_time = time.time()
            time_sklearn = end_time - start_time
            overall_results_list.append({
                "Method": f"Scikit-learn (n_jobs={n_jobs_sklearn})",
                "N_Features_X": n_feat_x_analytical,
                "MI (nats, mean)": mi_sklearn_val,
                "Parameters": params_sklearn,
                "Time (s)": time_sklearn
            })

    # --- 2. NPEET com Dados Gaussianos ---
    # Estima I(X_vec; Y_vec)
    for k_npeet in k_values_knn:
        params_npeet = f"k={k_npeet} (Gaussian Test)"
        start_time = time.time()
        try:
            # X_gaussian_analytical já é (samples, n_feat_x_analytical)
            # Y_gaussian_analytical_reshaped é (samples, 1)
            mi_npeet_val = ee.mi(X_gaussian_analytical, Y_gaussian_analytical_reshaped, k=k_npeet, base=base_log_manual)
        except Exception as e:
            print(f"  Erro no NPEET com dados Gaussianos (X {n_feat_x_analytical}-dim): {e}")
            mi_npeet_val = np.nan
        end_time = time.time()
        time_npeet = end_time - start_time
        overall_results_list.append({
            "Method": "NPEET",
            "N_Features_X": n_feat_x_analytical,
            "MI (nats, mean)": mi_npeet_val,
            "Parameters": params_npeet,
            "Time (s)": time_npeet
        })
    
    # --- 3. Binarização com Dados Gaussianos ---
    def discretizar_variavel_continua(variavel_continua, num_bins):
        if len(variavel_continua) == 0: return np.array([])
        min_val = np.min(variavel_continua)
        max_val = np.max(variavel_continua)
        if min_val == max_val: return np.zeros(len(variavel_continua), dtype=int)
        bins = np.linspace(min_val, max_val + 1e-9 * abs(max_val) if max_val != 0 else 1e-9, num_bins + 1)
        variavel_discretizada = np.digitize(variavel_continua, bins=bins) - 1
        return np.clip(variavel_discretizada, 0, num_bins - 1)

    def calcular_informacao_mutua_binning_1d(X_1d_continuo, Y_1d_continuo, num_bins_x, num_bins_y, base_log=base_log_manual):
        N = len(X_1d_continuo)
        if N == 0: return 0.0
        X_binned = discretizar_variavel_continua(X_1d_continuo, num_bins_x)
        Y_binned = discretizar_variavel_continua(Y_1d_continuo, num_bins_y)
        p_xy = defaultdict(float)
        for i in range(N): p_xy[(X_binned[i], Y_binned[i])] += 1.0
        for key in p_xy: p_xy[key] /= N
        p_x = Counter(X_binned); p_y = Counter(Y_binned)
        for key in p_x: p_x[key] /= N
        for key in p_y: p_y[key] /= N
        mutual_info = 0.0
        for (x_val, y_val), joint_prob in p_xy.items():
            if joint_prob > 1e-12: 
                marginal_x_prob = p_x.get(x_val, 0)
                marginal_y_prob = p_y.get(y_val, 0)
                if marginal_x_prob > 1e-12 and marginal_y_prob > 1e-12: 
                    mutual_info += joint_prob * np.log(joint_prob / (marginal_x_prob * marginal_y_prob))
        if base_log != np.e: mutual_info /= np.log(base_log)
        return mutual_info

    for num_bins_current in n_bins_values_binning:
        params_binning = f"bins={num_bins_current} (Gaussian Test)"
        mi_binning_feature_wise_gaussian = []
        time_binning_total_gaussian = 0
        for j in range(n_feat_x_analytical):
            start_time = time.time()
            mi_j = calcular_informacao_mutua_binning_1d(X_gaussian_analytical[:, j], Y_gaussian_analytical, num_bins_current, num_bins_current, base_log=base_log_manual)
            end_time = time.time()
            time_binning_total_gaussian += (end_time - start_time)
            mi_binning_feature_wise_gaussian.append(mi_j)
        
        mi_binning_val = np.mean(mi_binning_feature_wise_gaussian) if mi_binning_feature_wise_gaussian else np.nan
        
        overall_results_list.append({
            "Method": "Binarização",
            "N_Features_X": n_feat_x_analytical,
            "MI (nats, mean)": mi_binning_val,
            "Parameters": params_binning,
            "Time (s)": time_binning_total_gaussian
        })

    # --- 4. KDE com Dados Gaussianos ---
    # Estima I(X_vec; Y)
    epsilon_kde = 1e-10
    params_kde = "gaussian_kde default bw (Gaussian Test)"
    start_time = time.time()
    mi_kde_val = np.nan
    h_X_kde, h_Y_kde, h_XY_kde = np.nan, np.nan, np.nan
    try:
        # H(X_vec) - KDE espera (dims, samples)
        kde_X = gaussian_kde(X_gaussian_analytical.T)
        log_p_X = np.log(kde_X.pdf(X_gaussian_analytical.T) + epsilon_kde)
        h_X_kde = -np.mean(log_p_X)
        
        # H(Y)
        kde_Y = gaussian_kde(Y_gaussian_analytical) # Y é 1D
        log_p_Y = np.log(kde_Y.pdf(Y_gaussian_analytical) + epsilon_kde)
        h_Y_kde = -np.mean(log_p_Y)
        
        # H(X_vec, Y)
        # np.vstack espera uma tupla de arrays a serem empilhados verticalmente
        # X_gaussian_analytical.T é (n_feat_x_analytical, n_samples)
        # Y_gaussian_analytical (1D) precisa ser (1, n_samples) para vstack
        data_XY = np.vstack((X_gaussian_analytical.T, Y_gaussian_analytical.reshape(1, -1)))
        kde_XY = gaussian_kde(data_XY)
        log_p_XY = np.log(kde_XY.pdf(data_XY) + epsilon_kde)
        h_XY_kde = -np.mean(log_p_XY)
        
        mi_kde_val = h_X_kde + h_Y_kde - h_XY_kde
    except Exception as e:
        print(f"  Erro no KDE com dados Gaussianos (X {n_feat_x_analytical}-dim): {e}")
        mi_kde_val = np.nan
        # Se falhar, o tempo total não reflete o cálculo completo
        # No entanto, time.time() - start_time ainda dará o tempo até a falha.
        # Poderíamos definir time_kde = np.nan também, mas por ora deixamos o tempo parcial.
    end_time = time.time()
    time_kde = end_time - start_time
    overall_results_list.append({
        "Method": "KDE",
        "N_Features_X": n_feat_x_analytical,
        "MI (nats, mean)": mi_kde_val,
        "Parameters": params_kde,
        "Time (s)": time_kde
    })

print("--- FIM DO TESTE ANALÍTICO COM DADOS GAUSSIANOS ---")


#==============================================================================
# LOOP ORIGINAL PARA TESTAR COM DIFERENTES NÚMEROS DE FEATURES (DADOS UNIFORMES)
#==============================================================================
if n_features_to_test_uniform: # Apenas executa se a lista não estiver vazia
    print("\n--- INICIANDO TESTES COM DADOS UNIFORMES E COMBINAÇÃO LINEAR ---")
    for n_features_X in n_features_to_test_uniform: # Renomeado para clareza
        # --- Gerar Dados Sintéticos ---
        X = np.random.uniform(0, 10, (n_samples_global, n_features_X))
        if n_features_X > 0:
            weights = np.random.rand(n_features_X)
            Y = np.dot(X, weights) + np.random.uniform(-1, 1, n_samples_global) * noise_scale_global
        else:
            X = np.zeros((n_samples_global, 1))
            Y = np.random.uniform(-1, 1, n_samples_global) * noise_scale_global
        Y_reshaped = Y.reshape(-1, 1)

        print(f"--- Geração de Dados ({n_features_X} features) ---")
        print(f"Número de amostras: {n_samples_global}")
        print(f"Escala do ruído: {noise_scale_global:.1f}")

        # --- 1. Cálculo de MI com Scikit-learn ---
        for k_sklearn in k_values_knn:
            for n_jobs_sklearn in n_jobs_values_sklearn:
                params_sklearn = f"k={k_sklearn}, n_jobs={n_jobs_sklearn}"
                start_time = time.time()
                if n_features_X > 0:
                    mi_sklearn_values_per_feature = mutual_info_regression(X, Y, n_neighbors=k_sklearn, n_jobs=n_jobs_sklearn, random_state=0)
                    mi_sklearn_mean = np.mean(mi_sklearn_values_per_feature)
                else:
                    mi_sklearn_mean = np.nan
                end_time = time.time()
                time_sklearn = end_time - start_time
                overall_results_list.append({
                    "Method": f"Scikit-learn (n_jobs={n_jobs_sklearn})",
                    "N_Features_X": n_features_X,
                    "MI (nats, mean)": mi_sklearn_mean,
                    "Parameters": params_sklearn,
                    "Time (s)": time_sklearn
                })

        # --- 2. Cálculo de MI com NPEET (feature-wise para dados uniformes) ---
        # No loop de dados uniformes, n_features_X refere-se às features de X.
        # Se n_features_X > 1, calculamos a média de I(X_i; Y).
        # Se n_features_X = 1, é I(X;Y).
        # Para NPEET, vamos manter a lógica de média feature-wise para consistência com
        # Scikit-learn e Binarização neste loop de dados uniformes.
        # Se quiséssemos I(X_vec; Y) para dados uniformes com NPEET, chamaríamos uma vez.
        
        # DECISÃO: Para o loop de dados uniformes, manter a lógica feature-wise para todos
        # os métodos que o suportam, para comparar "apples-to-apples" o que cada um faz
        # quando X tem múltiplas features e Y é 1D. O teste Gaussiano acima
        # é onde NPEET e KDE estimam o I(X_vec, Y) teórico.

        for k_npeet in k_values_knn:
            params_npeet = f"k={k_npeet}"
            mi_npeet_values_per_feature = []
            time_npeet_total = 0
            try:
                if n_features_X == 0:
                     mi_npeet_values_per_feature.append(np.nan)
                else:
                    for j in range(n_features_X): # Feature-wise
                        X_j_reshaped = X[:, j].reshape(-1, 1)
                        start_time_feat = time.time()
                        mi_j = ee.mi(X_j_reshaped, Y_reshaped, k=k_npeet, base=base_log_manual)
                        end_time_feat = time.time()
                        time_npeet_total += (end_time_feat - start_time_feat)
                        mi_npeet_values_per_feature.append(mi_j)
                if mi_npeet_values_per_feature and not all(np.isnan(mi_npeet_values_per_feature)):
                    mi_npeet_mean = np.mean(mi_npeet_values_per_feature)
                else:
                    mi_npeet_mean = np.nan
            except Exception as e:
                print(f"Erro ao calcular MI com NPEET para uma feature (dados uniformes): {e}")
                mi_npeet_mean = np.nan
                time_npeet_total = np.nan
            overall_results_list.append({
                "Method": "NPEET",
                "N_Features_X": n_features_X,
                "MI (nats, mean)": mi_npeet_mean,
                "Parameters": params_npeet,
                "Time (s)": time_npeet_total
            })

        # --- 3. Cálculo de MI com Método de Binarização (Binning) (feature-wise) ---
        for num_bins_current in n_bins_values_binning:
            params_binning = f"bins={num_bins_current}"
            mi_binning_values_per_feature = []
            time_binning_total = 0
            if n_features_X == 0:
                 mi_binning_values_per_feature.append(np.nan)
            else:
                for j in range(n_features_X): # Feature-wise
                    start_time_feat = time.time()
                    mi_j = calcular_informacao_mutua_binning_1d(X[:, j], Y, num_bins_current, num_bins_current, base_log=base_log_manual)
                    end_time_feat = time.time()
                    time_binning_total += (end_time_feat - start_time_feat)
                    mi_binning_values_per_feature.append(mi_j)
            if mi_binning_values_per_feature and not all(np.isnan(mi_binning_values_per_feature)):
                mi_binning_mean = np.mean(mi_binning_values_per_feature)
            else:
                mi_binning_mean = np.nan
            overall_results_list.append({
                "Method": "Binarização",
                "N_Features_X": n_features_X,
                "MI (nats, mean)": mi_binning_mean,
                "Parameters": params_binning,
                "Time (s)": time_binning_total
            })

        # --- 4. Cálculo de MI com Estimativa de Densidade por Kernel (KDE) (feature-wise) ---
        epsilon_kde = 1e-10
        params_kde = "gaussian_kde default bw"
        mi_kde_values_per_feature = []
        time_kde_total = 0
        kde_calculation_successful_for_at_least_one_feature = False
        h_Y_kde_val = np.nan
        try:
            kde_Y_global = gaussian_kde(Y)
            log_p_Y_global = np.log(kde_Y_global.pdf(Y) + epsilon_kde)
            h_Y_kde_val = -np.mean(log_p_Y_global)
        except Exception as e_hy:
            print(f"Erro ao calcular H(Y) com KDE (dados uniformes): {e_hy}")

        if not np.isnan(h_Y_kde_val):
            if n_features_X == 0:
                pass
            else:
                for j in range(n_features_X): # Feature-wise
                    X_j = X[:, j]
                    h_Xj_kde, h_XjY_kde, mi_j_kde = np.nan, np.nan, np.nan
                    try:
                        start_time_feature = time.time()
                        kde_Xj = gaussian_kde(X_j)
                        log_p_Xj = np.log(kde_Xj.pdf(X_j) + epsilon_kde)
                        h_Xj_kde = -np.mean(log_p_Xj)
                        data_XjY = np.vstack([X_j, Y])
                        kde_XjY = gaussian_kde(data_XjY)
                        log_p_XjY = np.log(kde_XjY.pdf(data_XjY) + epsilon_kde)
                        h_XjY_kde = -np.mean(log_p_XjY)
                        mi_j_kde = h_Xj_kde + h_Y_kde_val - h_XjY_kde
                        end_time_feature = time.time()
                        time_kde_total += (end_time_feature - start_time_feature)
                        mi_kde_values_per_feature.append(mi_j_kde)
                        kde_calculation_successful_for_at_least_one_feature = True
                    except Exception as e_feature:
                        print(f"Erro ao calcular MI com KDE para feature {j} (dados uniformes): {e_feature}")
                        mi_kde_values_per_feature.append(np.nan)
        if kde_calculation_successful_for_at_least_one_feature and mi_kde_values_per_feature:
            mi_kde_mean = np.nanmean(mi_kde_values_per_feature)
        else:
            mi_kde_mean = np.nan
            time_kde_total = np.nan
        overall_results_list.append({
            "Method": "KDE",
            "N_Features_X": n_features_X,
            "MI (nats, mean)": mi_kde_mean,
            "Parameters": params_kde,
            "Time (s)": time_kde_total
        })
else:
    print("\nNenhum teste com dados uniformes será executado pois 'n_features_to_test_uniform' está vazio.")


# --- Criar e Mostrar DataFrame com os Resultados Finais ---
df_results = pd.DataFrame(overall_results_list)

if not df_results.empty:
    cols_order = ["Method", "N_Features_X", "MI (nats, mean)", "Parameters", "Time (s)"]
    existing_cols_order = [col for col in cols_order if col in df_results.columns]
    df_results = df_results[existing_cols_order]

    # Lidar com NaNs na coluna 'Time (s)' antes de rankear para evitar erros com alguns tipos de NaN
    df_results['Time (s)'] = pd.to_numeric(df_results['Time (s)'], errors='coerce')


    df_results['Processing_Time_Rank'] = df_results.groupby('N_Features_X')['Time (s)'] \
                                                 .rank(method='dense', na_option='bottom')
    
    df_results_ranked_sorted = df_results.sort_values(by=['N_Features_X', 'MI (nats, mean)', 'Processing_Time_Rank'], ascending=[True, False, True]) # Ordenar por N_Features, depois por MI (decrescente), depois por Rank (crescente)
    
    print("\n--- Resultados Rankeados (Dentro de Cada N_Features_X, Ordenado por MI e Tempo) ---")
    pd.set_option('display.max_rows', None) 
    pd.set_option('display.max_columns', None)
    pd.set_option('display.width', 1000) 

    for n_feat, group_df in df_results_ranked_sorted.groupby('N_Features_X'):
        print(f"\n>>> Resultados para N_Features_X = {n_feat} <<<")
        # Mostrar colunas relevantes, incluindo o MI e o Rank
        print(group_df[['Method', 'Parameters', 'MI (nats, mean)', 'Time (s)', 'Processing_Time_Rank']].to_string(index=False))
else:
    print("Nenhum resultado para exibir.")