## Setup Inicial

In [None]:
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error
from scipy.stats import pearsonr
import time
import warnings

from sklearn.datasets import load_iris
from sklearn.impute import KNNImputer
from sklearn.preprocessing import LabelEncoder
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.ensemble import RandomForestRegressor

from iscak_core import ISCAkCore

# Configurações
warnings.filterwarnings('ignore')
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', None)

# Configuração de runs
N_RUNS = 50
BASE_SEED = 42
MISSING_RATES = [0.10, 0.20, 0.30, 0.40]
MECHANISMS = ['MCAR', 'MAR', 'MNAR']
METHOD_ORDER = ['ISCA-k', 'KNN', 'MICE', 'MissForest']

# Gerar seeds dinamicamente
SEEDS = [BASE_SEED + run for run in range(N_RUNS)]

# Cores consistentes para métodos (Plotly)
METHOD_COLORS = {
    'ISCA-k': '#2E86AB',      # Azul
    'KNN': '#A23B72',         # Roxo
    'MICE': '#F18F01',        # Laranja
    'MissForest': '#C73E1D'   # Vermelho
}

print("Setup inicial completo")
print(f"N_RUNS: {N_RUNS}")
print(f"Seeds: {BASE_SEED} a {BASE_SEED + N_RUNS - 1}")
print(f"Missing rates: {[int(r*100) for r in MISSING_RATES]}%")
print(f"Mecanismos: {MECHANISMS}")

## Gerar MCAR

In [None]:
def introduce_mcar(data: pd.DataFrame, missing_rate: float, seed: int) -> pd.DataFrame:
    """
    Introduz missings MCAR (Missing Completely At Random).
    
    Args:
        data: DataFrame original
        missing_rate: Proporção de missings (0 a 1)
        seed: Seed para reprodutibilidade
    
    Returns:
        DataFrame com missings introduzidos
    """
    np.random.seed(seed)
    data_missing = data.copy()
    mask = np.random.rand(*data.shape) < missing_rate
    data_missing = data_missing.mask(mask)
    return data_missing


## Métricas de Avaliação

In [None]:
def calculate_metrics(data_original: pd.DataFrame, 
                     data_imputed: pd.DataFrame, 
                     missing_mask: pd.DataFrame) -> dict:
    """
    Calcula métricas de avaliação para imputação.
    Retorna tanto métricas agregadas quanto métricas por coluna.
    
    Args:
        data_original: DataFrame original (ground truth)
        data_imputed: DataFrame após imputação
        missing_mask: Máscara booleana dos valores que eram missing
    
    Returns:
        Dict com:
        - R2, Pearson, NRMSE (numéricas) e Accuracy (categóricas) - AGREGADOS
        - column_metrics: dict com métricas individuais por coluna
    """
    numeric_cols = data_original.select_dtypes(include=[np.number]).columns
    categorical_cols = data_original.select_dtypes(exclude=[np.number]).columns
    
    metrics = {
        'R2': np.nan,
        'Pearson': np.nan,
        'NRMSE': np.nan,
        'Accuracy': np.nan,
        'column_metrics': {}
    }
    
    # ===== MÉTRICAS NUMÉRICAS =====
    r2_scores = []
    pearson_scores = []
    nrmse_scores = []
    
    for col in numeric_cols:
        col_mask = missing_mask[col]
        
        if not col_mask.any():
            continue
        
        orig_vals = data_original.loc[col_mask, col].values
        imp_vals = data_imputed.loc[col_mask, col].values
        
        # Remover possíveis NaN remanescentes
        valid = ~(pd.isna(orig_vals) | pd.isna(imp_vals))
        orig_vals = orig_vals[valid]
        imp_vals = imp_vals[valid]
        
        if len(orig_vals) < 2:
            continue
        
        # Inicializar métricas desta coluna
        col_metrics = {
            'type': 'numeric',
            'R2': np.nan,
            'NRMSE': np.nan,
            'Accuracy': np.nan
        }
        
        # R²
        if np.var(orig_vals) > 1e-10:
            col_r2 = r2_score(orig_vals, imp_vals)
            r2_scores.append(col_r2)
            col_metrics['R2'] = col_r2
        
        # Pearson
        if len(orig_vals) >= 3 and np.var(orig_vals) > 1e-10 and np.var(imp_vals) > 1e-10:
            pearson_scores.append(pearsonr(orig_vals, imp_vals)[0])
        
        # NRMSE
        rmse = np.sqrt(mean_squared_error(orig_vals, imp_vals))
        value_range = orig_vals.max() - orig_vals.min()
        if value_range > 1e-10:
            col_nrmse = rmse / value_range
            nrmse_scores.append(col_nrmse)
            col_metrics['NRMSE'] = col_nrmse
        
        metrics['column_metrics'][col] = col_metrics
    
    if r2_scores:
        metrics['R2'] = np.mean(r2_scores)
    if pearson_scores:
        metrics['Pearson'] = np.mean(pearson_scores)
    if nrmse_scores:
        metrics['NRMSE'] = np.mean(nrmse_scores)
    
    # ===== MÉTRICAS CATEGÓRICAS =====
    accuracy_scores = []
    
    for col in categorical_cols:
        col_mask = missing_mask[col]
        
        if not col_mask.any():
            continue
        
        orig_vals = data_original.loc[col_mask, col]
        imp_vals = data_imputed.loc[col_mask, col]
        
        # Remover possíveis NaN remanescentes
        valid = ~(orig_vals.isna() | imp_vals.isna())
        if valid.sum() < 1:
            continue
        
        orig_vals = orig_vals[valid]
        imp_vals = imp_vals[valid]
        
        col_accuracy = (orig_vals == imp_vals).mean()
        accuracy_scores.append(col_accuracy)
        
        # Guardar métricas desta coluna
        metrics['column_metrics'][col] = {
            'type': 'categorical',
            'R2': np.nan,
            'NRMSE': np.nan,
            'Accuracy': col_accuracy
        }
    
    if accuracy_scores:
        metrics['Accuracy'] = np.mean(accuracy_scores)
    
    return metrics

## Testes estatisticos

In [None]:
from scipy.stats import wilcoxon, friedmanchisquare
from itertools import combinations

def pairwise_wilcoxon(results_dict: dict, mechanism: str, missing_rate: float, 
                      metric: str = 'NRMSE', reference_method: str = 'ISCA-k') -> pd.DataFrame:
    """
    Realiza testes de Wilcoxon signed-rank entre o método de referência e os outros.
    
    Args:
        results_dict: Dicionário com resultados do benchmark
        mechanism: 'MCAR', 'MAR' ou 'MNAR'
        missing_rate: Missing rate específico (e.g., 0.30)
        metric: Métrica a comparar ('R2', 'NRMSE', 'Pearson', 'Accuracy')
        reference_method: Método de referência (default: 'ISCA-k')
    
    Returns:
        DataFrame com comparações e p-values
    """
    comparisons = []
    
    # Obter scores do método de referência
    key_ref = (mechanism, missing_rate, reference_method)
    if key_ref not in results_dict:
        print(f"Método de referência {reference_method} não encontrado")
        return pd.DataFrame()
    
    ref_scores = [run[metric] for run in results_dict[key_ref] if not np.isnan(run[metric])]
    
    if len(ref_scores) < 2:
        print(f"Dados insuficientes para {reference_method}")
        return pd.DataFrame()
    
    # Comparar com outros métodos
    for method in METHOD_ORDER:
        if method == reference_method:
            continue
        
        key = (mechanism, missing_rate, method)
        if key not in results_dict:
            continue
        
        method_scores = [run[metric] for run in results_dict[key] if not np.isnan(run[metric])]
        
        if len(method_scores) < 2 or len(ref_scores) != len(method_scores):
            continue
        
        # Wilcoxon signed-rank test
        try:
            statistic, p_value = wilcoxon(ref_scores, method_scores)
            
            # Determinar significância
            if p_value < 0.001:
                significance = "***"
            elif p_value < 0.01:
                significance = "**"
            elif p_value < 0.05:
                significance = "*"
            else:
                significance = "n.s."
            
            # Calcular diferença média
            diff = np.mean(ref_scores) - np.mean(method_scores)
            
            # Para NRMSE, menor é melhor (inverter interpretação)
            if metric == 'NRMSE':
                better = reference_method if diff < 0 else method
            else:
                better = reference_method if diff > 0 else method
            
            comparisons.append({
                'Comparison': f"{reference_method} vs {method}",
                'Reference_Mean': f"{np.mean(ref_scores):.4f}",
                'Other_Mean': f"{np.mean(method_scores):.4f}",
                'Difference': f"{diff:.4f}",
                'p-value': f"{p_value:.4f}",
                'Significance': significance,
                'Better': better
            })
        
        except Exception as e:
            print(f"Erro no teste {reference_method} vs {method}: {e}")
            continue
    
    return pd.DataFrame(comparisons)


def display_statistical_tests(results_dict: dict, mechanism: str, missing_rate: float,
                              dataset_name: str, metric: str = 'NRMSE'):
    """
    Exibe tabela de testes estatísticos com Plotly.
    
    Args:
        results_dict: Dicionário com resultados
        mechanism: 'MCAR', 'MAR' ou 'MNAR'
        missing_rate: Missing rate específico
        dataset_name: Nome do dataset
        metric: Métrica a analisar
    """
    df = pairwise_wilcoxon(results_dict, mechanism, missing_rate, metric, reference_method='ISCA-k')
    
    if len(df) == 0:
        print(f"Sem dados para testes estatísticos - {mechanism} {int(missing_rate*100)}%")
        return
    
    # Criar tabela Plotly
    fig = go.Figure(data=[go.Table(
        header=dict(
            values=list(df.columns),
            fill_color='#2E86AB',
            font=dict(color='white', size=12, family='Arial'),
            align='center',
            height=35,
            line_color='white'
        ),
        cells=dict(
            values=[df[col] for col in df.columns],
            fill_color=[['#f8f9fa', 'white'] * len(df)],
            font=dict(size=11, family='Arial'),
            align='center',
            height=28,
            line_color='#e0e0e0'
        )
    )])
    
    fig.update_layout(
        title=f"{dataset_name} - {mechanism} {int(missing_rate*100)}% - Statistical Tests ({metric})",
        title_font_size=14,
        height=min(400, 150 + len(df) * 30),
        margin=dict(l=20, r=20, t=60, b=20)
    )
    
    fig.show()
    
    # Interpretação textual
    print(f"\nInterpretação dos resultados ({metric}):")
    print("Significância: *** p<0.001, ** p<0.01, * p<0.05, n.s. = não significativo")
    
    significant_wins = df[df['Significance'] != 'n.s.']
    if len(significant_wins) > 0:
        print(f"\nISCA-k apresenta diferenças significativas em {len(significant_wins)}/{len(df)} comparações:")
        for _, row in significant_wins.iterrows():
            print(f"  - vs {row['Comparison'].split('vs')[1].strip()}: p={row['p-value']} {row['Significance']}")
    else:
        print("\nNenhuma diferença estatisticamente significativa detectada.")


def friedman_test_all_methods(results_dict: dict, mechanism: str, missing_rate: float,
                               metric: str = 'NRMSE') -> dict:
    """
    Realiza teste de Friedman para comparar múltiplos métodos.
    
    Args:
        results_dict: Dicionário com resultados
        mechanism: 'MCAR', 'MAR' ou 'MNAR'
        missing_rate: Missing rate específico
        metric: Métrica a analisar
    
    Returns:
        Dict com estatística e p-value
    """
    # Coletar scores de todos os métodos
    all_scores = []
    methods_with_data = []
    
    for method in METHOD_ORDER:
        key = (mechanism, missing_rate, method)
        if key not in results_dict:
            continue
        
        scores = [run[metric] for run in results_dict[key] if not np.isnan(run[metric])]
        
        if len(scores) >= 2:
            all_scores.append(scores)
            methods_with_data.append(method)
    
    if len(all_scores) < 3:
        return {'error': 'Dados insuficientes (necessário ≥3 métodos)'}
    
    # Verificar se todos têm o mesmo número de runs
    if len(set(len(s) for s in all_scores)) > 1:
        return {'error': 'Número diferente de runs entre métodos'}
    
    try:
        statistic, p_value = friedmanchisquare(*all_scores)
        
        # Calcular rankings médios
        # Transformar para array (n_runs x n_methods)
        data_array = np.array(all_scores).T
        
        # Calcular ranks: menor valor = rank 1 para NRMSE, maior valor = rank 1 para R²
        if metric == 'NRMSE':
            # Menor é melhor: rank 1 para o menor valor
            ranks = np.apply_along_axis(lambda x: np.argsort(np.argsort(x)) + 1, 1, data_array)
        else:
            # Maior é melhor (R², Pearson, Accuracy): rank 1 para o maior valor
            ranks = np.apply_along_axis(lambda x: np.argsort(np.argsort(-x)) + 1, 1, data_array)
        
        mean_ranks = ranks.mean(axis=0)
        
        return {
            'statistic': statistic,
            'p_value': p_value,
            'methods': methods_with_data,
            'mean_ranks': mean_ranks,
            'significant': p_value < 0.05
        }
    
    except Exception as e:
        return {'error': str(e)}


def display_friedman_results(results_dict: dict, mechanism: str, missing_rate: float,
                             dataset_name: str, metric: str = 'NRMSE'):
    """
    Exibe resultados do teste de Friedman.
    
    Args:
        results_dict: Dicionário com resultados
        mechanism: 'MCAR', 'MAR' ou 'MNAR'
        missing_rate: Missing rate específico
        dataset_name: Nome do dataset
        metric: Métrica a analisar
    """
    result = friedman_test_all_methods(results_dict, mechanism, missing_rate, metric)
    
    if 'error' in result:
        print(f"Teste de Friedman: {result['error']}")
        return
    
    print(f"\n{'='*80}")
    print(f"TESTE DE FRIEDMAN - {dataset_name}")
    print(f"{mechanism} {int(missing_rate*100)}% - {metric}")
    print('='*80)
    
    print(f"\nEstatística: {result['statistic']:.4f}")
    print(f"p-value: {result['p_value']:.4f}")
    
    if result['significant']:
        print("Resultado: DIFERENÇAS SIGNIFICATIVAS entre métodos (p < 0.05)")
    else:
        print("Resultado: Sem diferenças significativas entre métodos")
    
    print("\nRanking médio (1=melhor):")
    for method, rank in zip(result['methods'], result['mean_ranks']):
        print(f"  {method}: {rank:.2f}")
    
    # Ordenar por ranking
    sorted_methods = sorted(zip(result['methods'], result['mean_ranks']), key=lambda x: x[1])
    print(f"\nOrdem: {' > '.join([m for m, r in sorted_methods])}")