# ISCA-k v2 - Benchmark de Avaliação

Este notebook é o **único ponto de avaliação** do método ISCA-k.

Cada vez que modificamos o método (adicionamos pesos, PDS, k adaptativo, etc.), corremos este notebook para medir o impacto.

## Estrutura
1. **Funções de Avaliação** - Métricas (RMSE, R², etc.)
2. **Funções de Missingness** - Introduzir MCAR, MAR, MNAR
3. **Datasets** - Carregar dados de teste
4. **Métodos de Imputação** - ISCA-k e baselines (KNN, MICE)
5. **Benchmark** - Executar e comparar
6. **Resultados** - Visualizar e analisar

In [None]:
import numpy as np
import pandas as pd
import time
import warnings
from typing import Tuple, Dict, List, Union, Optional

# sklearn
from sklearn.datasets import load_iris, load_diabetes, load_wine
from sklearn.impute import KNNImputer
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer

warnings.filterwarnings('ignore')
np.random.seed(42)

print("Imports OK")

---
## 1. Funções de Avaliação (Métricas)

Métricas para avaliar qualidade da imputação:
- **RMSE**: Root Mean Squared Error
- **NRMSE**: RMSE normalizado pelo range
- **MAE**: Mean Absolute Error
- **R²**: Coeficiente de determinação
- **Accuracy**: Para variáveis categóricas

In [None]:
def rmse(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Root Mean Squared Error."""
    y_true = np.asarray(y_true, dtype=np.float64)
    y_pred = np.asarray(y_pred, dtype=np.float64)
    return float(np.sqrt(np.mean((y_true - y_pred) ** 2)))


def nrmse(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Normalized RMSE (pelo range)."""
    y_true = np.asarray(y_true, dtype=np.float64)
    value_range = float(np.max(y_true) - np.min(y_true))
    if value_range == 0:
        return np.nan
    return rmse(y_true, y_pred) / value_range


def mae(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Mean Absolute Error."""
    y_true = np.asarray(y_true, dtype=np.float64)
    y_pred = np.asarray(y_pred, dtype=np.float64)
    return float(np.mean(np.abs(y_true - y_pred)))


def r2_score(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Coeficiente de determinação R²."""
    y_true = np.asarray(y_true, dtype=np.float64)
    y_pred = np.asarray(y_pred, dtype=np.float64)
    
    if len(y_true) < 2:
        return np.nan
    
    ss_res = np.sum((y_true - y_pred) ** 2)
    ss_tot = np.sum((y_true - np.mean(y_true)) ** 2)
    
    if ss_tot == 0:
        return np.nan
    
    return float(1 - ss_res / ss_tot)


def accuracy(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Accuracy para variáveis categóricas."""
    matches = sum(1 for t, p in zip(y_true, y_pred) if t == p)
    return float(matches / len(y_true))


def compute_metrics(original: np.ndarray, imputed: np.ndarray, 
                    mask: np.ndarray, is_categorical: bool = False) -> Dict:
    """Calcula métricas para uma coluna imputada."""
    y_true = original[mask]
    y_pred = imputed[mask]
    
    if is_categorical:
        return {'accuracy': accuracy(y_true, y_pred), 'n': len(y_true)}
    
    y_true = y_true.astype(np.float64)
    y_pred = y_pred.astype(np.float64)
    
    return {
        'rmse': rmse(y_true, y_pred),
        'nrmse': nrmse(y_true, y_pred),
        'mae': mae(y_true, y_pred),
        'r2': r2_score(y_true, y_pred),
        'n': len(y_true)
    }


# Teste rápido
y_true = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
y_pred = np.array([1.1, 2.0, 3.1, 4.0, 5.0])
print(f"RMSE teste: {rmse(y_true, y_pred):.4f} (esperado: ~0.063)")
print(f"R² teste: {r2_score(y_true, y_pred):.4f} (esperado: ~0.99)")
print("Métricas OK ✓")

---
## 2. Funções de Missingness

Introduzir valores em falta com diferentes padrões:
- **MCAR**: Missing Completely At Random
- **MAR**: Missing At Random (depende de outras variáveis)
- **MNAR**: Missing Not At Random (depende do próprio valor)

In [None]:
def introduce_mcar(data: np.ndarray, missing_rate: float, 
                   random_state: int = 42) -> Tuple[np.ndarray, np.ndarray]:
    """
    Introduz missings MCAR (Missing Completely At Random).
    
    Returns:
        (data_missing, mask) - mask é True onde valor foi removido
    """
    rng = np.random.RandomState(random_state)
    data = np.array(data, dtype=np.float64).copy()
    n_rows, n_cols = data.shape
    
    # Gerar máscara
    mask = rng.random((n_rows, n_cols)) < missing_rate
    
    # Garantir pelo menos 1 valor por linha
    for i in range(n_rows):
        if mask[i].all():
            mask[i, rng.randint(n_cols)] = False
    
    # Garantir pelo menos 1 valor por coluna
    for j in range(n_cols):
        if mask[:, j].all():
            mask[rng.randint(n_rows), j] = False
    
    data[mask] = np.nan
    return data, mask


def introduce_mar(data: np.ndarray, missing_rate: float,
                  random_state: int = 42, driver_col: int = 0) -> Tuple[np.ndarray, np.ndarray]:
    """
    Introduz missings MAR (Missing At Random).
    Probabilidade depende da coluna driver.
    """
    rng = np.random.RandomState(random_state)
    data = np.array(data, dtype=np.float64).copy()
    n_rows, n_cols = data.shape
    
    driver = data[:, driver_col]
    median = np.nanmedian(driver)
    
    prob_high = min(missing_rate * 1.5, 0.95)
    prob_low = max(missing_rate * 0.5, 0.0)
    
    mask = np.zeros((n_rows, n_cols), dtype=bool)
    
    for i in range(n_rows):
        prob = prob_high if driver[i] > median else prob_low
        for j in range(n_cols):
            if j != driver_col and rng.random() < prob:
                mask[i, j] = True
    
    # Garantias
    for i in range(n_rows):
        if mask[i].all():
            mask[i, rng.randint(n_cols)] = False
    for j in range(n_cols):
        if mask[:, j].all():
            mask[rng.randint(n_rows), j] = False
    
    data[mask] = np.nan
    return data, mask


def introduce_mnar(data: np.ndarray, missing_rate: float,
                   random_state: int = 42) -> Tuple[np.ndarray, np.ndarray]:
    """
    Introduz missings MNAR (Missing Not At Random).
    Probabilidade depende do próprio valor.
    """
    rng = np.random.RandomState(random_state)
    data = np.array(data, dtype=np.float64).copy()
    n_rows, n_cols = data.shape
    
    prob_high = min(missing_rate * 1.5, 0.95)
    prob_low = max(missing_rate * 0.5, 0.0)
    
    mask = np.zeros((n_rows, n_cols), dtype=bool)
    
    for j in range(n_cols):
        col = data[:, j]
        median = np.nanmedian(col)
        for i in range(n_rows):
            prob = prob_high if col[i] > median else prob_low
            if rng.random() < prob:
                mask[i, j] = True
    
    # Garantias
    for i in range(n_rows):
        if mask[i].all():
            mask[i, rng.randint(n_cols)] = False
    for j in range(n_cols):
        if mask[:, j].all():
            mask[rng.randint(n_rows), j] = False
    
    data[mask] = np.nan
    return data, mask


# Teste rápido
test_data = np.random.randn(100, 5)
data_missing, mask = introduce_mcar(test_data, 0.2)
actual_rate = mask.sum() / mask.size
print(f"Taxa de missings: {actual_rate:.2%} (alvo: 20%)")
print(f"Linhas 100% vazias: {(mask.all(axis=1)).sum()} (esperado: 0)")
print(f"Colunas 100% vazias: {(mask.all(axis=0)).sum()} (esperado: 0)")
print("Missingness OK ✓")

---
## 3. Datasets

Datasets de teste:
- **Iris**: 150 × 4 (pequeno, baixa dimensionalidade)
- **Wine**: 178 × 13 (médio)
- **Diabetes**: 442 × 10 (JÁ NORMALIZADO!)

In [None]:
def load_datasets():
    """Carrega todos os datasets de teste."""
    datasets = {}
    
    # Iris
    iris = load_iris()
    datasets['iris'] = {
        'data': iris.data,
        'names': iris.feature_names,
        'is_normalized': False,
        'description': 'Iris (150×4)'
    }
    
    # Wine
    wine = load_wine()
    datasets['wine'] = {
        'data': wine.data,
        'names': wine.feature_names,
        'is_normalized': False,
        'description': 'Wine (178×13)'
    }
    
    # Diabetes (ATENÇÃO: já normalizado!)
    diabetes = load_diabetes()
    datasets['diabetes'] = {
        'data': diabetes.data,
        'names': diabetes.feature_names,
        'is_normalized': True,  # IMPORTANTE!
        'description': 'Diabetes (442×10, JÁ NORMALIZADO)'
    }
    
    return datasets


# Carregar
DATASETS = load_datasets()

print("Datasets carregados:")
for name, info in DATASETS.items():
    data = info['data']
    print(f"  {name}: {data.shape[0]} amostras × {data.shape[1]} features")
    if info['is_normalized']:
        print(f"    ⚠️  JÁ NORMALIZADO (mean≈{data.mean():.3f}, std≈{data.std():.3f})")

---
## 4. Métodos de Imputação

- **KNN (sklearn)**: Baseline
- **MICE (sklearn)**: Baseline iterativo
- **ISCA-k**: Nosso método (a ser implementado)

In [None]:
def impute_knn(data_missing: np.ndarray, k: int = 5) -> np.ndarray:
    """Imputa usando KNN do sklearn."""
    imputer = KNNImputer(n_neighbors=k)
    return imputer.fit_transform(data_missing)


def impute_mice(data_missing: np.ndarray, max_iter: int = 10) -> np.ndarray:
    """Imputa usando MICE (IterativeImputer) do sklearn."""
    imputer = IterativeImputer(max_iter=max_iter, random_state=42)
    return imputer.fit_transform(data_missing)


def impute_iscak(data_missing: np.ndarray) -> np.ndarray:
    """
    Imputa usando ISCA-k.
    
    VERSÃO ACTUAL: KNN simples (baseline)
    TODO: Adicionar componentes progressivamente
    """
    # Por agora, igual ao KNN básico
    # Vamos substituir gradualmente
    imputer = KNNImputer(n_neighbors=5)
    return imputer.fit_transform(data_missing)


# Dicionário de métodos
METHODS = {
    'KNN': impute_knn,
    'MICE': impute_mice,
    'ISCA-k': impute_iscak
}

print(f"Métodos disponíveis: {list(METHODS.keys())}")

---
## 5. Benchmark

Executa o benchmark completo:
- Múltiplos datasets
- Múltiplas taxas de missing
- Múltiplos padrões (MCAR, MAR)
- Múltiplos métodos

In [None]:
def run_single_experiment(data_original: np.ndarray, 
                          method_fn, 
                          missing_rate: float,
                          pattern: str = 'MCAR',
                          random_state: int = 42) -> Dict:
    """
    Executa uma experiência única.
    
    Returns:
        Dict com métricas agregadas
    """
    # Introduzir missings
    if pattern == 'MCAR':
        data_missing, mask = introduce_mcar(data_original, missing_rate, random_state)
    elif pattern == 'MAR':
        data_missing, mask = introduce_mar(data_original, missing_rate, random_state)
    elif pattern == 'MNAR':
        data_missing, mask = introduce_mnar(data_original, missing_rate, random_state)
    else:
        raise ValueError(f"Padrão desconhecido: {pattern}")
    
    # Imputar
    start = time.time()
    try:
        data_imputed = method_fn(data_missing)
        elapsed = time.time() - start
    except Exception as e:
        return {'error': str(e)}
    
    # Calcular métricas por coluna
    all_metrics = []
    for j in range(data_original.shape[1]):
        col_mask = mask[:, j]
        if col_mask.sum() > 0:
            m = compute_metrics(data_original[:, j], data_imputed[:, j], col_mask)
            all_metrics.append(m)
    
    # Agregar
    if not all_metrics:
        return {'error': 'Sem valores para avaliar'}
    
    result = {
        'rmse': np.nanmean([m['rmse'] for m in all_metrics]),
        'nrmse': np.nanmean([m['nrmse'] for m in all_metrics]),
        'mae': np.nanmean([m['mae'] for m in all_metrics]),
        'r2': np.nanmean([m['r2'] for m in all_metrics]),
        'time': elapsed,
        'actual_missing_rate': mask.sum() / mask.size
    }
    
    return result


def run_benchmark(datasets: Dict = None,
                  methods: Dict = None,
                  missing_rates: List[float] = [0.1, 0.2, 0.3, 0.4],
                  patterns: List[str] = ['MCAR', 'MAR'],
                  n_runs: int = 3) -> pd.DataFrame:
    """
    Executa benchmark completo.
    
    Args:
        datasets: Dict de datasets (usa DATASETS global se None)
        methods: Dict de métodos (usa METHODS global se None)
        missing_rates: Lista de taxas de missing
        patterns: Lista de padrões
        n_runs: Número de repetições (com seeds diferentes)
    
    Returns:
        DataFrame com resultados
    """
    if datasets is None:
        datasets = DATASETS
    if methods is None:
        methods = METHODS
    
    results = []
    total = len(datasets) * len(methods) * len(missing_rates) * len(patterns) * n_runs
    current = 0
    
    for ds_name, ds_info in datasets.items():
        data = ds_info['data']
        
        for pattern in patterns:
            for rate in missing_rates:
                for method_name, method_fn in methods.items():
                    for run in range(n_runs):
                        current += 1
                        seed = 42 + run
                        
                        print(f"\r[{current}/{total}] {ds_name} | {pattern} | {rate:.0%} | {method_name} | run {run+1}", end="")
                        
                        metrics = run_single_experiment(
                            data, method_fn, rate, pattern, seed
                        )
                        
                        results.append({
                            'dataset': ds_name,
                            'pattern': pattern,
                            'missing_rate': f"{int(rate*100)}%",
                            'method': method_name,
                            'run': run,
                            **metrics
                        })
    
    print("\nConcluído!")
    return pd.DataFrame(results)

---
## 6. Executar Benchmark

In [None]:
# Configuração
MISSING_RATES = [0.1, 0.2, 0.3, 0.4]
PATTERNS = ['MCAR', 'MAR']  # Ignoramos MNAR por agora
N_RUNS = 3

# Executar
results_df = run_benchmark(
    missing_rates=MISSING_RATES,
    patterns=PATTERNS,
    n_runs=N_RUNS
)

# Guardar
results_df.to_csv('benchmark_results.csv', index=False)
print(f"\nResultados guardados em benchmark_results.csv")
print(f"Total de experiências: {len(results_df)}")

---
## 7. Resultados

In [None]:
# Resumo por método
print("=" * 60)
print("RESUMO POR MÉTODO")
print("=" * 60)

summary = results_df.groupby('method').agg({
    'r2': ['mean', 'std'],
    'nrmse': ['mean', 'std'],
    'time': 'mean'
}).round(4)

display(summary)

In [None]:
# Resumo por dataset
print("\n" + "=" * 60)
print("RESUMO POR DATASET")
print("=" * 60)

for ds in results_df['dataset'].unique():
    print(f"\n--- {ds.upper()} ---")
    ds_data = results_df[results_df['dataset'] == ds]
    summary_ds = ds_data.groupby('method').agg({
        'r2': 'mean',
        'nrmse': 'mean'
    }).round(4)
    display(summary_ds)

In [None]:
# Resumo por taxa de missing
print("\n" + "=" * 60)
print("RESUMO POR TAXA DE MISSING")
print("=" * 60)

for rate in results_df['missing_rate'].unique():
    print(f"\n--- {rate} ---")
    rate_data = results_df[results_df['missing_rate'] == rate]
    summary_rate = rate_data.groupby('method').agg({
        'r2': 'mean',
        'nrmse': 'mean'
    }).round(4)
    display(summary_rate)

---
## 8. Comparação ISCA-k vs Baselines

In [None]:
print("=" * 60)
print("ISCA-k vs BASELINES")
print("=" * 60)

iscak_results = results_df[results_df['method'] == 'ISCA-k']['r2'].mean()
knn_results = results_df[results_df['method'] == 'KNN']['r2'].mean()
mice_results = results_df[results_df['method'] == 'MICE']['r2'].mean()

print(f"\nR² médio:")
print(f"  ISCA-k: {iscak_results:.4f}")
print(f"  KNN:    {knn_results:.4f}")
print(f"  MICE:   {mice_results:.4f}")

print(f"\nDiferença ISCA-k vs KNN:  {iscak_results - knn_results:+.4f}")
print(f"Diferença ISCA-k vs MICE: {iscak_results - mice_results:+.4f}")

if iscak_results >= knn_results:
    print("\n✅ ISCA-k ≥ KNN")
else:
    print("\n❌ ISCA-k < KNN (precisa melhorar)")