# Teste de Robustez do Scale Factor

Testes adicionais antes de implementar:
1. Robustez em taxas extremas (10-60%)
2. Normalização por dimensionalidade efectiva
3. Teste A/B com missings não aleatórios (viés)
4. Visualização de distribuição de pesos vs missings

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')

np.random.seed(42)
print("Imports OK")

## Funções Base

In [None]:
def pds_distance_extended(sample, donors, weights, scale_mode='linear'):
    """
    Calcula distâncias PDS com modos de scale factor estendidos.
    
    scale_mode:
        - 'sqrt': sqrt(n/overlap) - actual
        - 'none': 1.0
        - 'linear': n/overlap - normalização por feature
        - 'effective_dims': sqrt(n/effective_dims) onde effective = n - missing_both
        - 'capped_linear': min(n/overlap, 2.0) - linear com cap
        - 'adaptive': linear se overlap < 70%, senão none
    """
    n_features = len(sample)
    min_overlap = max(2, n_features // 2)
    
    distances = []
    overlaps = []
    scales_used = []
    
    sample_avail = ~np.isnan(sample)
    sample_missing = np.isnan(sample)
    
    for donor in donors:
        donor_avail = ~np.isnan(donor)
        donor_missing = np.isnan(donor)
        overlap_mask = sample_avail & donor_avail
        overlap = overlap_mask.sum()
        
        # Missing em ambos (para effective_dims)
        missing_both = (sample_missing & donor_missing).sum()
        effective_dims = n_features - missing_both
        
        if overlap < min_overlap:
            distances.append(np.inf)
            overlaps.append(overlap)
            scales_used.append(np.inf)
            continue
        
        # Calcular distância raw
        dist_sq = 0.0
        weight_sum = 0.0
        for j in range(n_features):
            if overlap_mask[j]:
                diff = sample[j] - donor[j]
                dist_sq += weights[j] * diff * diff
                weight_sum += weights[j]
        
        dist_raw = np.sqrt(dist_sq) if weight_sum > 0 else np.inf
        
        # Calcular scale factor
        ratio = n_features / overlap
        
        if scale_mode == 'sqrt':
            scale = np.sqrt(ratio)
        elif scale_mode == 'none':
            scale = 1.0
        elif scale_mode == 'linear':
            scale = ratio
        elif scale_mode == 'effective_dims':
            scale = np.sqrt(n_features / effective_dims) if effective_dims > 0 else 1.0
        elif scale_mode == 'capped_linear':
            scale = min(ratio, 2.0)  # Cap at 2x
        elif scale_mode == 'adaptive':
            # Linear se overlap < 70%, senão none
            if overlap / n_features < 0.7:
                scale = ratio
            else:
                scale = 1.0
        else:
            scale = 1.0
        
        distances.append(dist_raw * scale)
        overlaps.append(overlap)
        scales_used.append(scale)
    
    return np.array(distances), np.array(overlaps), np.array(scales_used)

In [None]:
def create_correlated_dataset(n_samples=150):
    """Dataset com features correlacionadas"""
    F1 = np.random.normal(0, 1, n_samples)
    F2 = F1 * 0.8 + np.random.normal(0, 0.5, n_samples)
    F3 = np.random.normal(0, 1, n_samples)
    F4 = F1 * 0.3 + F3 * 0.3 + np.random.normal(0, 0.7, n_samples)
    Target = F1 * 2 + F2 * 1.5 + F4 * 0.5 + np.random.normal(0, 1, n_samples)
    return pd.DataFrame({'F1': F1, 'F2': F2, 'F3': F3, 'F4': F4, 'Target': Target})

def introduce_mcar(df, rate=0.2, seed=42):
    """Missings completamente aleatórios"""
    np.random.seed(seed)
    mask = np.random.random(df.shape) < rate
    return df.mask(mask)

def introduce_biased_missings(df, rate=0.2, bias_cols=['F1', 'F2'], seed=42):
    """
    Missings com viés: remove sempre as mesmas colunas em metade das amostras.
    Simula padrão de missings não aleatório.
    """
    np.random.seed(seed)
    df_missing = df.copy()
    n = len(df)
    
    # Metade das amostras: remove bias_cols
    biased_rows = np.random.choice(n, n//2, replace=False)
    for col in bias_cols:
        if col in df.columns:
            df_missing.loc[biased_rows, col] = np.nan
    
    # Adicionar mais missings aleatórios para atingir rate
    current_rate = df_missing.isna().sum().sum() / (n * len(df.columns))
    if current_rate < rate:
        additional_rate = (rate - current_rate) / (1 - current_rate)
        mask = np.random.random(df.shape) < additional_rate
        df_missing = df_missing.mask(mask | df_missing.isna())
    
    return df_missing

In [None]:
def run_imputation_test(df_complete, df_missing, scale_mode, k=5):
    """Executa imputação e retorna MAE"""
    result = df_missing.copy()
    n_features = len(df_missing.columns)
    weights = np.ones(n_features) / n_features
    
    missing_positions = []
    
    for col in df_missing.columns:
        missing_mask = df_missing[col].isna()
        if not missing_mask.any():
            continue
        
        donor_mask = ~df_missing[col].isna()
        if donor_mask.sum() < 2:
            continue
        
        donor_indices = df_missing[donor_mask].index.tolist()
        donor_values = df_missing.loc[donor_mask, col].values
        
        for idx in df_missing[missing_mask].index:
            missing_positions.append((idx, col))
            
            sample = df_missing.loc[idx].values.astype(float)
            donors = df_missing.loc[donor_indices].values.astype(float)
            
            distances, _, _ = pds_distance_extended(sample, donors, weights, scale_mode)
            
            valid_mask = np.isfinite(distances)
            if valid_mask.sum() < 1:
                continue
            
            valid_distances = distances[valid_mask]
            valid_values = donor_values[valid_mask]
            
            k_actual = min(k, len(valid_distances))
            top_k_idx = np.argsort(valid_distances)[:k_actual]
            
            top_distances = valid_distances[top_k_idx]
            top_values = valid_values[top_k_idx]
            
            if np.any(top_distances < 1e-10):
                imputed = np.mean(top_values[top_distances < 1e-10])
            else:
                w = 1 / (top_distances + 1e-6)
                w = w / w.sum()
                imputed = np.average(top_values, weights=w)
            
            result.loc[idx, col] = imputed
    
    # Calcular MAE
    errors = []
    for idx, col in missing_positions:
        true_val = df_complete.loc[idx, col]
        imp_val = result.loc[idx, col]
        if pd.notna(imp_val):
            errors.append(abs(imp_val - true_val))
    
    return np.mean(errors) if errors else np.nan

---
## TESTE 1: Robustez em Taxas Extremas (10-60%)

In [None]:
print("="*70)
print("TESTE 1: ROBUSTEZ EM TAXAS EXTREMAS")
print("="*70)

df_complete = create_correlated_dataset(200)

scale_modes = ['sqrt', 'linear', 'none', 'capped_linear', 'adaptive', 'effective_dims']
missing_rates = [0.10, 0.20, 0.30, 0.40, 0.50, 0.60]

results_robustness = {mode: [] for mode in scale_modes}

for rate in missing_rates:
    print(f"\nTaxa: {rate*100:.0f}%", end=" ")
    
    for mode in scale_modes:
        maes = []
        for run in range(3):
            df_missing = introduce_mcar(df_complete, rate, seed=42+run)
            mae = run_imputation_test(df_complete, df_missing, mode)
            if pd.notna(mae):
                maes.append(mae)
        
        results_robustness[mode].append(np.mean(maes) if maes else np.nan)
    
    print("✓")

print("\nConcluído!")

In [None]:
# Tabela de resultados
print("\n" + "="*70)
print("RESULTADOS: MAE por Taxa de Missing")
print("="*70)

header = f"{'Mode':<15}" + "".join([f"{int(r*100)}%{'':>6}" for r in missing_rates]) + "Média"
print(header)
print("-" * 75)

for mode in scale_modes:
    vals = results_robustness[mode]
    mean_val = np.nanmean(vals)
    row = f"{mode:<15}" + "".join([f"{v:<10.4f}" if pd.notna(v) else f"{'N/A':<10}" for v in vals])
    row += f"{mean_val:.4f}"
    print(row)

# Encontrar melhor por taxa
print("\n--- MELHOR POR TAXA ---")
for i, rate in enumerate(missing_rates):
    best_mode = min(scale_modes, key=lambda m: results_robustness[m][i] if pd.notna(results_robustness[m][i]) else 999)
    best_val = results_robustness[best_mode][i]
    print(f"  {int(rate*100)}%: {best_mode} (MAE={best_val:.4f})")

In [None]:
# Gráfico de evolução
plt.figure(figsize=(12, 6))

for mode in scale_modes:
    vals = results_robustness[mode]
    plt.plot([int(r*100) for r in missing_rates], vals, marker='o', label=mode, linewidth=2)

plt.xlabel('Taxa de Missing (%)', fontsize=12)
plt.ylabel('MAE', fontsize=12)
plt.title('Evolução do MAE por Taxa de Missing', fontsize=14)
plt.legend(loc='upper left')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

# Verificar ponto de viragem
print("\n--- ANÁLISE DE PONTO DE VIRAGEM ---")
linear_vals = results_robustness['linear']
sqrt_vals = results_robustness['sqrt']

for i, rate in enumerate(missing_rates):
    diff = linear_vals[i] - sqrt_vals[i] if pd.notna(linear_vals[i]) and pd.notna(sqrt_vals[i]) else np.nan
    winner = "LINEAR" if diff < 0 else "SQRT" if diff > 0 else "EMPATE"
    print(f"  {int(rate*100)}%: linear={linear_vals[i]:.4f}, sqrt={sqrt_vals[i]:.4f} → {winner} (diff={diff:.4f})")

---
## TESTE 2: Missings com Viés (Não Aleatórios)

In [None]:
print("\n" + "="*70)
print("TESTE 2: MISSINGS COM VIÉS")
print("="*70)
print("\nViés: F1 e F2 (features mais importantes) removidas em metade das amostras")

df_complete = create_correlated_dataset(200)

# Testar com missings viesados
results_biased = {mode: [] for mode in scale_modes}
results_mcar = {mode: [] for mode in scale_modes}  # Comparação

for rate in [0.20, 0.30, 0.40]:
    print(f"\nTaxa: {rate*100:.0f}%")
    
    for mode in scale_modes:
        # Com viés
        maes_biased = []
        for run in range(3):
            df_biased = introduce_biased_missings(df_complete, rate, bias_cols=['F1', 'F2'], seed=42+run)
            mae = run_imputation_test(df_complete, df_biased, mode)
            if pd.notna(mae):
                maes_biased.append(mae)
        
        results_biased[mode].append(np.mean(maes_biased) if maes_biased else np.nan)
        
        # MCAR (comparação)
        maes_mcar = []
        for run in range(3):
            df_mcar = introduce_mcar(df_complete, rate, seed=42+run)
            mae = run_imputation_test(df_complete, df_mcar, mode)
            if pd.notna(mae):
                maes_mcar.append(mae)
        
        results_mcar[mode].append(np.mean(maes_mcar) if maes_mcar else np.nan)

In [None]:
print("\n--- COMPARAÇÃO: MCAR vs VIÉS ---")
print(f"\n{'Mode':<15} {'MCAR 20%':<12} {'Viés 20%':<12} {'MCAR 30%':<12} {'Viés 30%':<12} {'MCAR 40%':<12} {'Viés 40%':<12}")
print("-" * 90)

for mode in scale_modes:
    mcar_vals = results_mcar[mode]
    bias_vals = results_biased[mode]
    row = f"{mode:<15}"
    for i in range(3):
        row += f"{mcar_vals[i]:<12.4f}{bias_vals[i]:<12.4f}"
    print(row)

# Qual método é mais robusto ao viés?
print("\n--- ROBUSTEZ AO VIÉS (menor degradação MCAR→Viés) ---")
for mode in scale_modes:
    mcar_mean = np.nanmean(results_mcar[mode])
    bias_mean = np.nanmean(results_biased[mode])
    degradation = (bias_mean - mcar_mean) / mcar_mean * 100 if mcar_mean > 0 else np.nan
    print(f"  {mode:<15}: MCAR={mcar_mean:.4f}, Viés={bias_mean:.4f}, Degradação={degradation:+.1f}%")

---
## TESTE 3: Visualização de Distribuição de Pesos vs Missings

In [None]:
print("\n" + "="*70)
print("TESTE 3: DISTRIBUIÇÃO DE SCALE FACTORS vs OVERLAP")
print("="*70)

# Criar dataset com missings variados
df_complete = create_correlated_dataset(200)
df_missing = introduce_mcar(df_complete, rate=0.3, seed=42)

n_features = len(df_missing.columns)
weights = np.ones(n_features) / n_features

# Recolher dados de overlap e scale factors
overlap_data = {mode: [] for mode in scale_modes}
scale_data = {mode: [] for mode in scale_modes}
distance_data = {mode: [] for mode in scale_modes}

# Para cada par sample-donor, recolher overlap e scale usado
for idx in df_missing.index[:50]:  # Primeiras 50 amostras
    sample = df_missing.loc[idx].values.astype(float)
    donors = df_missing.values.astype(float)
    
    for mode in scale_modes:
        distances, overlaps, scales = pds_distance_extended(sample, donors, weights, mode)
        
        valid = np.isfinite(distances)
        overlap_data[mode].extend(overlaps[valid])
        scale_data[mode].extend(scales[valid])
        distance_data[mode].extend(distances[valid])

print(f"Recolhidos {len(overlap_data['linear'])} pares sample-donor")

In [None]:
# Visualização: Scale Factor vs Overlap
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
axes = axes.flatten()

for i, mode in enumerate(scale_modes):
    ax = axes[i]
    overlaps = np.array(overlap_data[mode])
    scales = np.array(scale_data[mode])
    
    # Scatter plot
    ax.scatter(overlaps, scales, alpha=0.3, s=10)
    
    # Linha de tendência
    unique_overlaps = np.unique(overlaps)
    mean_scales = [np.mean(scales[overlaps == o]) for o in unique_overlaps]
    ax.plot(unique_overlaps, mean_scales, 'r-', linewidth=2, label='Média')
    
    ax.set_xlabel('Overlap (n features)')
    ax.set_ylabel('Scale Factor')
    ax.set_title(f'{mode.upper()}')
    ax.grid(True, alpha=0.3)
    ax.legend()

plt.suptitle('Scale Factor vs Overlap por Método', fontsize=14)
plt.tight_layout()
plt.show()

In [None]:
# Correlação entre overlap e scale factor
print("\n--- CORRELAÇÃO: Overlap vs Scale Factor ---")
print("(Idealmente próximo de 0 para independência, ou negativo para penalizar baixo overlap)")

for mode in scale_modes:
    overlaps = np.array(overlap_data[mode])
    scales = np.array(scale_data[mode])
    
    # Remover infinitos
    valid = np.isfinite(scales)
    if valid.sum() > 10:
        corr = np.corrcoef(overlaps[valid], scales[valid])[0, 1]
        print(f"  {mode:<15}: r = {corr:+.4f}")
    else:
        print(f"  {mode:<15}: N/A (poucos dados)")

In [None]:
# Visualização: Distribuição de distâncias por método
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
axes = axes.flatten()

for i, mode in enumerate(scale_modes):
    ax = axes[i]
    distances = np.array(distance_data[mode])
    distances = distances[np.isfinite(distances)]  # Remover inf
    
    ax.hist(distances, bins=50, alpha=0.7, edgecolor='black')
    ax.axvline(np.median(distances), color='r', linestyle='--', label=f'Mediana: {np.median(distances):.2f}')
    ax.axvline(np.mean(distances), color='g', linestyle='--', label=f'Média: {np.mean(distances):.2f}')
    
    ax.set_xlabel('Distância')
    ax.set_ylabel('Frequência')
    ax.set_title(f'{mode.upper()}')
    ax.legend()
    ax.grid(True, alpha=0.3)

plt.suptitle('Distribuição de Distâncias por Método', fontsize=14)
plt.tight_layout()
plt.show()

---
## RESUMO FINAL

In [None]:
print("\n" + "="*70)
print("TESTE 4: OTIMIZAÇÃO DO THRESHOLD ADAPTIVE")
print("="*70)

def pds_distance_adaptive_threshold(sample, donors, weights, threshold):
    """
    Adaptive com threshold configurável.
    - overlap < threshold: usa linear (penalização forte)
    - overlap >= threshold: usa sqrt (penalização suave)
    """
    n_features = len(sample)
    min_overlap = max(2, n_features // 2)
    
    distances = []
    sample_avail = ~np.isnan(sample)
    
    for donor in donors:
        donor_avail = ~np.isnan(donor)
        overlap_mask = sample_avail & donor_avail
        overlap = overlap_mask.sum()
        
        if overlap < min_overlap:
            distances.append(np.inf)
            continue
        
        # Distância raw
        dist_sq = 0.0
        weight_sum = 0.0
        for j in range(n_features):
            if overlap_mask[j]:
                diff = sample[j] - donor[j]
                dist_sq += weights[j] * diff * diff
                weight_sum += weights[j]
        
        dist_raw = np.sqrt(dist_sq) if weight_sum > 0 else np.inf
        
        # Scale factor adaptive
        ratio = n_features / overlap
        overlap_ratio = overlap / n_features
        
        if overlap_ratio < threshold:
            scale = ratio  # linear
        else:
            scale = np.sqrt(ratio)  # sqrt
        
        distances.append(dist_raw * scale)
    
    return np.array(distances)


def run_imputation_adaptive_threshold(df_complete, df_missing, threshold, k=5):
    """Imputação com threshold específico"""
    result = df_missing.copy()
    n_features = len(df_missing.columns)
    weights = np.ones(n_features) / n_features
    
    missing_positions = []
    
    for col in df_missing.columns:
        missing_mask = df_missing[col].isna()
        if not missing_mask.any():
            continue
        
        donor_mask = ~df_missing[col].isna()
        if donor_mask.sum() < 2:
            continue
        
        donor_indices = df_missing[donor_mask].index.tolist()
        donor_values = df_missing.loc[donor_mask, col].values
        
        for idx in df_missing[missing_mask].index:
            missing_positions.append((idx, col))
            
            sample = df_missing.loc[idx].values.astype(float)
            donors = df_missing.loc[donor_indices].values.astype(float)
            
            distances = pds_distance_adaptive_threshold(sample, donors, weights, threshold)
            
            valid_mask = np.isfinite(distances)
            if valid_mask.sum() < 1:
                continue
            
            valid_distances = distances[valid_mask]
            valid_values = donor_values[valid_mask]
            
            k_actual = min(k, len(valid_distances))
            top_k_idx = np.argsort(valid_distances)[:k_actual]
            
            top_distances = valid_distances[top_k_idx]
            top_values = valid_values[top_k_idx]
            
            if np.any(top_distances < 1e-10):
                imputed = np.mean(top_values[top_distances < 1e-10])
            else:
                w = 1 / (top_distances + 1e-6)
                w = w / w.sum()
                imputed = np.average(top_values, weights=w)
            
            result.loc[idx, col] = imputed
    
    # MAE
    errors = []
    for idx, col in missing_positions:
        true_val = df_complete.loc[idx, col]
        imp_val = result.loc[idx, col]
        if pd.notna(imp_val):
            errors.append(abs(imp_val - true_val))
    
    return np.mean(errors) if errors else np.nan


# Testar thresholds
thresholds = [0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]  # 1.0 = sempre linear
missing_rates = [0.10, 0.20, 0.30, 0.40, 0.50]

df_complete = create_correlated_dataset(200)

results_threshold = {t: [] for t in thresholds}

for rate in missing_rates:
    print(f"\nTaxa: {rate*100:.0f}%", end=" ")
    
    for threshold in thresholds:
        maes = []
        for run in range(3):
            df_missing = introduce_mcar(df_complete, rate, seed=42+run)
            mae = run_imputation_adaptive_threshold(df_complete, df_missing, threshold)
            if pd.notna(mae):
                maes.append(mae)
        
        results_threshold[threshold].append(np.mean(maes) if maes else np.nan)
    
    print("✓")

print("\nConcluído!")

In [None]:
# Tabela de resultados por threshold
print("\n" + "="*70)
print("RESULTADOS: MAE por Threshold Adaptive")
print("="*70)

header = f"{'Threshold':<12}" + "".join([f"{int(r*100)}%{'':>6}" for r in missing_rates]) + "Média"
print(header)
print("-" * 70)

for threshold in thresholds:
    vals = results_threshold[threshold]
    mean_val = np.nanmean(vals)
    row = f"{threshold:<12.1f}" + "".join([f"{v:<10.4f}" if pd.notna(v) else f"{'N/A':<10}" for v in vals])
    row += f"{mean_val:.4f}"
    print(row)

# Encontrar melhor threshold global
best_threshold = min(thresholds, key=lambda t: np.nanmean(results_threshold[t]))
best_mae = np.nanmean(results_threshold[best_threshold])
print(f"\n>>> MELHOR THRESHOLD GLOBAL: {best_threshold} (MAE médio = {best_mae:.4f})")

# Melhor por taxa
print("\n--- MELHOR THRESHOLD POR TAXA ---")
for i, rate in enumerate(missing_rates):
    best_t = min(thresholds, key=lambda t: results_threshold[t][i] if pd.notna(results_threshold[t][i]) else 999)
    best_v = results_threshold[best_t][i]
    print(f"  {int(rate*100)}%: threshold={best_t} (MAE={best_v:.4f})")

In [None]:
# Gráfico: MAE vs Threshold por taxa de missing
plt.figure(figsize=(12, 6))

for rate_idx, rate in enumerate(missing_rates):
    maes = [results_threshold[t][rate_idx] for t in thresholds]
    plt.plot(thresholds, maes, marker='o', label=f'{int(rate*100)}% missing', linewidth=2)

plt.xlabel('Threshold (overlap ratio)', fontsize=12)
plt.ylabel('MAE', fontsize=12)
plt.title('MAE vs Threshold Adaptive por Taxa de Missing', fontsize=14)
plt.legend()
plt.grid(True, alpha=0.3)
plt.xticks(thresholds)
plt.tight_layout()
plt.show()

# Gráfico: MAE médio vs Threshold
plt.figure(figsize=(10, 5))

mean_maes = [np.nanmean(results_threshold[t]) for t in thresholds]
colors = ['green' if m == min(mean_maes) else 'steelblue' for m in mean_maes]

plt.bar([str(t) for t in thresholds], mean_maes, color=colors, edgecolor='black')
plt.xlabel('Threshold', fontsize=12)
plt.ylabel('MAE Médio', fontsize=12)
plt.title('MAE Médio por Threshold (verde = melhor)', fontsize=14)
plt.grid(True, alpha=0.3, axis='y')
plt.tight_layout()
plt.show()

print(f"\nNota: threshold=1.0 significa 'sempre linear' (nunca usa sqrt)")
print(f"      threshold=0.0 significaria 'sempre sqrt' (nunca usa linear)")

---
## CONCLUSÕES E RECOMENDAÇÕES

### Interpretação do Threshold:
- **threshold = 0.4**: Se overlap < 40% → linear, senão sqrt
- **threshold = 0.7**: Se overlap < 70% → linear, senão sqrt  
- **threshold = 1.0**: Sempre linear (sqrt nunca usado)

### O que esperamos:
- **Threshold baixo (0.4-0.5)**: Mais uso de sqrt → menos penalização → potencialmente pior em taxas altas
- **Threshold alto (0.9-1.0)**: Mais uso de linear → mais penalização → melhor discriminação de donors