# Validação de Robustez - Modelos AML

Este notebook realiza testes de robustez dos modelos de detecção de AML em diferentes cenários futuros e análise de concept drift.

## Objetivos
- Testar modelos em cenários sintéticos de stress
- Avaliar sensibilidade a concept drift
- Simular ataques adversariais
- Identificar vulnerabilidades e pontos de melhoria

In [None]:
# INITIAL SETUP
import sys
import os
from pathlib import Path
import pickle
import json
from datetime import datetime

# Add the parent of the current working directory to the import path.
# NOTE(review): presumably this is the project root when the notebook is run
# from a sub-folder such as `notebooks/` — confirm against the repo layout.
project_root = Path.cwd().parent
sys.path.append(str(project_root))

# Third-party imports
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import precision_recall_curve, auc, f1_score, precision_score, recall_score

# Plotting and display settings
plt.style.use('default')
sns.set_palette("husl")
pd.set_option('display.max_columns', None)  # never truncate columns when displaying frames

# Directories: artifacts are read from and written to <project_root>/artifacts,
# which is created here if it does not exist yet.
artifacts_dir = project_root / 'artifacts'
artifacts_dir.mkdir(exist_ok=True)

print(f"Diret√≥rio de artefatos: {artifacts_dir}")
print(f"Python path configurado: {project_root}")

## Carregamento dos Dados e Modelos

In [None]:
# LOAD PROCESSED DATA
# Reads the feature matrix `X` and the label vector `y` from the artifacts
# directory and checks that they describe the same number of rows.
print("Carregando dados processados...")

# Both CSV files live side by side in the artifacts directory.
features_path, target_path = (
    artifacts_dir / csv_name for csv_name in ('X_processed.csv', 'y_processed.csv')
)

# Feature matrix
X = pd.read_csv(features_path)
print(f"Features carregadas: {X.shape}")

# Labels: only the first column of the target file is used.
target_frame = pd.read_csv(target_path)
y = target_frame.iloc[:, 0]
print(f"Target carregado: {len(y)} amostras")

# Consistency check between features and labels
n_feature_rows, n_target_rows = len(X), len(y)
assert n_feature_rows == n_target_rows, "Inconsist√™ncia entre features e target"

overall_fraud_rate = y.mean()
print(f"Taxa de fraude: {overall_fraud_rate:.3%}")

In [None]:
# LOAD TUNED MODELS
# Each model is expected at <artifacts_dir>/<lowercased name>_extended.pkl.
# Loading is best-effort: a model that fails to load is reported and skipped,
# so `models` only contains the ones that were read successfully.
model_names = ['XGBoost', 'LightGBM', 'RandomForest', 'Ensemble']
models = {}

for model_label in model_names:
    pickle_path = artifacts_dir / f'{model_label.lower()}_extended.pkl'
    try:
        # NOTE(review): pickle.load executes arbitrary code from the file;
        # only load artifacts produced by a trusted pipeline.
        with pickle_path.open('rb') as handle:
            models[model_label] = pickle.load(handle)
        print(f"‚úÖ {model_label} carregado")
    except Exception as load_error:
        print(f"‚ùå Erro ao carregar {model_label}: {load_error}")

print(f"\nModelos carregados: {list(models.keys())}")

## Criação de Cenários de Teste

In [None]:
# ROBUSTNESS TEST SCENARIOS
#
# Builds `scenarios`, a dict mapping a scenario name to either a feature frame
# (baseline, evaluated against the original `y`) or an `(X, y)` tuple.
#
# Changes versus the previous version of this cell:
#   * all randomness comes from a fixed seed, so the scenarios are identical
#     across runs and results can be compared between executions;
#   * missing values are injected with `Series.mask` instead of writing NaN
#     through `.loc` (which relies on implicit dtype upcasting for int/bool
#     columns), and non-numeric columns are imputed too instead of being left
#     with NaN that the models would receive as-is;
#   * the extra fraud labels reuse the dtype of `y`, so an empty sample does
#     not turn the concatenated label series into `object` dtype.
SCENARIO_SEED = 42
scenario_rng = np.random.default_rng(SCENARIO_SEED)

print("üîÆ CRIANDO CEN√ÅRIOS DE TESTE...")

# Numeric feature columns; a label column named 'is_fraud' is never perturbed.
numeric_cols = [
    col for col in X.select_dtypes(include=[np.number]).columns
    if col != 'is_fraud'
]
numeric_col_set = set(numeric_cols)

# Scenario 1: higher fraud rate.
# Fraud rows are resampled with replacement (50% of the fraud count) and
# appended to the original data with label 1.
fraud_indices = y[y == 1].index
additional_fraud = X.loc[fraud_indices].sample(
    frac=0.5, replace=True, random_state=SCENARIO_SEED
)
additional_fraud_y = pd.Series(1, index=additional_fraud.index, dtype=y.dtype)
fraud_increase = pd.concat([X, additional_fraud])
fraud_increase_y = pd.concat([y, additional_fraud_y])

# Scenario 2: shift in transaction values.
# Every numeric column whose name contains "amount" is scaled up by 20%.
value_shift = X.copy()
for col in numeric_cols:
    if 'amount' in col.lower():
        value_shift[col] = value_shift[col] * 1.2

# Scenario 3: noisy data.
# Zero-mean Gaussian noise with 10% of each column's standard deviation.
noisy_data = X.copy()
for col in numeric_cols:
    noise = scenario_rng.normal(0, noisy_data[col].std() * 0.1, len(noisy_data))
    noisy_data[col] = noisy_data[col] + noise

# Scenario 4: missing values followed by simple imputation.
# About 5% of the entries of every feature column are dropped at random.
missing_data = X.copy()
for col in missing_data.columns:
    if col == 'is_fraud':
        continue
    drop_mask = scenario_rng.random(len(missing_data)) < 0.05
    if col in numeric_col_set:
        # mask() upcasts integer columns to float cleanly; the gaps are then
        # filled with the median of the remaining values.
        with_gaps = missing_data[col].mask(drop_mask)
        missing_data[col] = with_gaps.fillna(with_gaps.median())
    else:
        # Non-numeric columns: replace the dropped entries directly with the
        # most frequent remaining value, which keeps the column dtype intact.
        observed_modes = missing_data.loc[~drop_mask, col].mode()
        if not observed_modes.empty:
            missing_data[col] = missing_data[col].where(
                ~drop_mask, observed_modes.iloc[0]
            )

scenarios = {
    'baseline': X.copy(),  # unmodified data, evaluated against the original y
    'fraud_increase': (fraud_increase, fraud_increase_y),
    'value_shift': (value_shift, y),
    'noisy_data': (noisy_data, y),
    'missing_data': (missing_data, y),
}

print("   ‚úÖ Cen√°rios criados:")
for name, data in scenarios.items():
    if data is not None:
        if isinstance(data, tuple):
            X_scenario, y_scenario = data
            fraud_rate = y_scenario.mean()
            print(f"      ‚Ä¢ {name}: {len(X_scenario):,} amostras ({fraud_rate:.3%} fraud)")
        else:
            print(f"      ‚Ä¢ {name}: {len(data):,} amostras")

## Avaliação de Robustez

In [None]:
# ROBUSTNESS EVALUATION
#
# Scores every loaded model on every scenario and stores the metrics in
# `robustness_results[scenario][model]`. A model that fails on a scenario is
# recorded as {'error': <message>} instead of aborting the whole run.
#
# Changes versus the previous version of this cell:
#   * the sub-sampling of large scenarios is seeded, and the generator is
#     re-created per scenario, so scenarios with the same number of rows are
#     evaluated on the SAME row positions (paired comparison) and results are
#     reproducible between runs;
#   * counts and metrics are converted to built-in int/float so that the JSON
#     report stores numbers rather than strings produced by `default=str`.
print("üõ°Ô∏è AVALIANDO ROBUSTEZ DOS MODELOS...")

EVAL_SEED = 42
MAX_EVAL_SAMPLES = 50000  # cap on rows per scenario, for a quick evaluation

robustness_results = {}

for scenario_name, scenario_data in scenarios.items():
    if scenario_data is None:
        continue

    print(f"\nüîç Testando cen√°rio: {scenario_name}")

    # A scenario is either an (X, y) tuple or a bare feature frame that is
    # paired with the original labels.
    if isinstance(scenario_data, tuple):
        X_scenario, y_scenario = scenario_data
    else:
        X_scenario, y_scenario = scenario_data, y

    # Limit the size for a quick evaluation (positional sampling, so a
    # duplicated index in the scenario frame is not a problem).
    if len(X_scenario) > MAX_EVAL_SAMPLES:
        sample_indices = np.random.default_rng(EVAL_SEED).choice(
            len(X_scenario), MAX_EVAL_SAMPLES, replace=False
        )
        X_scenario = X_scenario.iloc[sample_indices]
        y_scenario = y_scenario.iloc[sample_indices]

    scenario_results = {}

    for model_name, model in models.items():
        try:
            # Predicted probability of the positive class, thresholded at 0.5
            y_pred_proba = model.predict_proba(X_scenario)[:, 1]
            y_pred = (y_pred_proba > 0.5).astype(int)

            # Threshold-dependent metrics
            precision = precision_score(y_scenario, y_pred, zero_division=0)
            recall = recall_score(y_scenario, y_pred, zero_division=0)
            f1 = f1_score(y_scenario, y_pred, zero_division=0)

            # Area under the precision-recall curve
            precision_curve, recall_curve, _ = precision_recall_curve(y_scenario, y_pred_proba)
            pr_auc = auc(recall_curve, precision_curve)

            scenario_results[model_name] = {
                'precision': float(precision),
                'recall': float(recall),
                'f1_score': float(f1),
                'pr_auc': float(pr_auc),
                'test_samples': int(len(y_scenario)),
                'fraud_cases': int(y_scenario.sum())
            }

            print(f"     üìä {model_name}: F1={f1:.4f}, PR-AUC={pr_auc:.4f}")

        except Exception as e:
            # Best-effort: keep evaluating the remaining models and scenarios.
            print(f"     ‚ùå Erro em {model_name}: {e}")
            scenario_results[model_name] = {'error': str(e)}

    robustness_results[scenario_name] = scenario_results

print(f"\nCen√°rios testados: {len(robustness_results)}")

## Análise de Concept Drift

In [None]:
# CONCEPT DRIFT ANALYSIS
#
# Scores every model on the original data (`baseline_results`) and expresses
# each scenario's PR-AUC / F1 as a percentage change against that baseline.
# Changes above 10% in PR-AUC are recorded as vulnerabilities ('medium'),
# above 20% as 'high'.
#
# Changes versus the previous version of this cell:
#   * a model whose baseline evaluation failed is skipped in the comparison
#     instead of raising KeyError on the missing 'pr_auc' / 'f1_score' keys;
#   * a baseline metric of exactly 0 yields NaN instead of dividing by zero;
#   * the baseline F1 uses zero_division=0, like the robustness evaluation.
print("üåä AN√ÅLISE DE CONCEPT DRIFT")
print("-" * 25)


def _percent_change(new_value, reference_value):
    """Return the change of new_value relative to reference_value, in percent.

    Returns NaN when the reference is 0, because the relative change is
    undefined in that case.
    """
    if reference_value == 0:
        return float('nan')
    return (new_value - reference_value) / reference_value * 100


# Baseline: the original X and y, without any perturbation
baseline_X, baseline_y = X, y
baseline_results = {}

for model_name, model in models.items():
    try:
        y_pred_proba = model.predict_proba(baseline_X)[:, 1]
        y_pred = (y_pred_proba > 0.5).astype(int)

        precision_curve, recall_curve, _ = precision_recall_curve(baseline_y, y_pred_proba)
        pr_auc = auc(recall_curve, precision_curve)
        f1 = f1_score(baseline_y, y_pred, zero_division=0)

        baseline_results[model_name] = {
            'pr_auc': pr_auc,
            'f1_score': f1
        }
    except Exception as e:
        baseline_results[model_name] = {'error': str(e)}

# Compare every scenario against the baseline
drift_analysis = {
    'baseline_performance': baseline_results,
    'drift_indicators': {},
    'vulnerabilities': []
}

for scenario_name, scenario_results in robustness_results.items():
    if scenario_name == 'baseline':
        continue

    print(f"\nüîÑ Comparando {scenario_name} vs baseline:")

    scenario_drift = {}

    for model_name, baseline_metrics in baseline_results.items():
        scenario_metrics = scenario_results.get(model_name)

        # Only compare when both sides were evaluated successfully.
        if scenario_metrics is None or 'error' in scenario_metrics:
            continue
        if 'error' in baseline_metrics:
            continue

        # Relative differences in percent
        pr_auc_diff = _percent_change(scenario_metrics['pr_auc'], baseline_metrics['pr_auc'])
        f1_diff = _percent_change(scenario_metrics['f1_score'], baseline_metrics['f1_score'])

        scenario_drift[model_name] = {
            'pr_auc_change_percent': pr_auc_diff,
            'f1_change_percent': f1_diff,
            'baseline_pr_auc': baseline_metrics['pr_auc'],
            'scenario_pr_auc': scenario_metrics['pr_auc']
        }

        print(f"     üìä {model_name}: PR-AUC {pr_auc_diff:+.1f}%, F1 {f1_diff:+.1f}%")

        # Flag vulnerabilities; a NaN change compares False and is not flagged.
        if abs(pr_auc_diff) > 10:  # change > 10%
            severity = 'high' if abs(pr_auc_diff) > 20 else 'medium'
            drift_analysis['vulnerabilities'].append({
                'scenario': scenario_name,
                'model': model_name,
                'metric': 'pr_auc',
                'change_percent': pr_auc_diff,
                'severity': severity
            })

    drift_analysis['drift_indicators'][scenario_name] = scenario_drift

print(f"\nVulnerabilidades identificadas: {len(drift_analysis['vulnerabilities'])}")

## Simulação de Ataques Adversariais

In [None]:
# ADVERSARIAL ATTACK SIMULATION
#
# Perturbs a few numeric features with Gaussian noise and measures how much
# each model's predicted fraud probability moves. Results are stored in
# `attack_results[model]['perturbation_attack']`; a model that fails is
# recorded as {'error': <message>}.
#
# Changes versus the previous version of this cell:
#   * the perturbed sample is built ONCE, before the model loop, so every
#     model is attacked with exactly the same inputs (previously each model
#     got freshly drawn noise, which made the stability scores incomparable);
#   * sampling and noise are seeded for reproducibility;
#   * metrics are converted to built-in float, so float32 probabilities do not
#     end up as strings in the JSON report written with `default=str`;
#   * the redundant `if col in numeric_cols` check was removed (the perturbed
#     features are a slice of that very index).
print("üéØ SIMULA√á√ÉO DE ATAQUES ADVERSARIAIS")
print("-" * 35)

ATTACK_SEED = 42
attack_rng = np.random.default_rng(ATTACK_SEED)

attack_results = {}

# Use a smaller sample for the attacks
sample_size = min(10000, len(X))
sample_indices = attack_rng.choice(len(X), sample_size, replace=False)
X_attack = X.iloc[sample_indices]

# Attack: feature perturbation
X_perturbed = X_attack.copy()
numeric_cols = X_perturbed.select_dtypes(include=[np.number]).columns

# Simplification: the first five numeric columns stand in for the "important"
# features; no feature-importance ranking is computed here.
important_features = numeric_cols[:5]

# Zero-mean Gaussian noise with 50% of each column's standard deviation
for col in important_features:
    noise = attack_rng.normal(0, X_perturbed[col].std() * 0.5, len(X_perturbed))
    X_perturbed[col] = X_perturbed[col] + noise

for model_name, model in models.items():
    print(f"   üîÑ Testando ataques em {model_name}...")

    try:
        # Measure the impact on the predicted positive-class probability
        y_pred_original = model.predict_proba(X_attack)[:, 1]
        y_pred_perturbed = model.predict_proba(X_perturbed)[:, 1]

        pred_diff = np.abs(y_pred_original - y_pred_perturbed)
        avg_diff = float(pred_diff.mean())
        stability = 1 - avg_diff  # 1.0 means the predictions did not move at all

        attack_results[model_name] = {
            'perturbation_attack': {
                'avg_prediction_change': avg_diff,
                'max_prediction_change': float(pred_diff.max()),
                'prediction_stability': stability
            }
        }

        print(f"     üìä Ataque de perturba√ß√£o: mudan√ßa m√©dia = {avg_diff:.4f}, estabilidade = {stability:.4f}")

    except Exception as e:
        # Best-effort: keep testing the remaining models.
        print(f"     ‚ùå Erro no ataque para {model_name}: {e}")
        attack_results[model_name] = {'error': str(e)}

print(f"\nModelos testados contra ataques: {len(attack_results)}")

## Relatório de Robustez

In [None]:
# BUILD ROBUSTNESS REPORT
# Collects the results of the previous cells into one dictionary, picks the
# model whose PR-AUC moved least across scenarios, writes the report as JSON
# into the artifacts directory and prints an executive summary.
print("üìã GERANDO RELAT√ìRIO DE ROBUSTEZ...")

vulnerabilities = len(drift_analysis.get('vulnerabilities', []))

monitoring_actions = [
    'Implementar monitoramento cont√≠nuo de performance',
    'Alertas autom√°ticos para degrada√ß√£o de performance',
    'Re-treinamento peri√≥dico baseado em thresholds',
    'Valida√ß√£o cruzada temporal em produ√ß√£o',
]
hardening_actions = [
    'Considerar ensemble methods para maior robustez',
    'Implementar detec√ß√£o de concept drift',
    'Adicionar valida√ß√£o de entrada de dados',
    'Desenvolver estrat√©gias de fallback',
]
key_findings = {
    'overall_robustness': 'Modelos mostram robustez vari√°vel por cen√°rio',
    'vulnerabilities_identified': vulnerabilities,
    'most_robust_model': None,  # filled in below when it can be determined
    'drift_sensitivity': 'An√°lise de sensibilidade a concept drift realizada',
}

robustness_report = {
    'timestamp': datetime.now().isoformat(),
    'phase': 'Valida√ß√£o de Robustez',
    'scenarios_tested': list(robustness_results.keys()),
    'robustness_results': robustness_results,
    'concept_drift_analysis': drift_analysis,
    'adversarial_attacks': attack_results,
    'key_findings': key_findings,
    'recommendations': {
        'monitoring': monitoring_actions,
        'robustness_improvements': hardening_actions,
    },
}

# Most robust model: highest mean of (1 - |scenario PR-AUC - baseline PR-AUC|)
# over all non-baseline scenarios. Only attempted when the baseline scenario
# produced results.
if robustness_results.get('baseline'):
    model_stability = {}
    for candidate, reference_metrics in baseline_results.items():
        scores = [
            1 - abs(per_model[candidate]['pr_auc'] - reference_metrics['pr_auc'])
            for scenario_label, per_model in robustness_results.items()
            if scenario_label != 'baseline'
            and candidate in per_model
            and 'pr_auc' in per_model[candidate]
            and 'pr_auc' in reference_metrics
        ]
        if scores:
            model_stability[candidate] = np.mean(scores)

    if model_stability:
        key_findings['most_robust_model'] = max(model_stability, key=model_stability.get)

# Save the report; default=str stringifies any value json cannot encode.
report_path = artifacts_dir / 'robustness_analysis_notebook.json'
with open(report_path, 'w') as f:
    json.dump(robustness_report, f, indent=2, default=str)

print(f"   üíæ Relat√≥rio salvo: {report_path}")

# Executive summary
print("\nüõ°Ô∏è RESUMO EXECUTIVO - VALIDA√á√ÉO DE ROBUSTEZ:")
print("   üõ°Ô∏è TESTES DE ROBUSTEZ CONCLU√çDOS:")
print(f"   ‚Ä¢ Cen√°rios testados: {len(robustness_results)}")
print("   ‚Ä¢ An√°lise de concept drift: Realizada")
print("   ‚Ä¢ Ataques adversariais: Simulados")

print(
    f"   ‚ö†Ô∏è Vulnerabilidades identificadas: {vulnerabilities}"
    if vulnerabilities > 0
    else "   ‚úÖ Nenhuma vulnerabilidade cr√≠tica identificada"
)

most_robust = robustness_report['key_findings'].get('most_robust_model')
if most_robust:
    print(f"   üèÜ Modelo mais robusto: {most_robust}")

for summary_line in (
    "\nüí° PR√ìXIMAS A√á√ïES RECOMENDADAS:",
    "   1. Implementar monitoramento cont√≠nuo de performance",
    "   2. Configurar alertas para degrada√ß√£o de m√©tricas",
    "   3. Preparar estrat√©gias de re-treinamento",
    "   4. Finalizar documenta√ß√£o e reprodutibilidade",
    "\n‚úÖ VALIDA√á√ÉO DE ROBUSTEZ CONCLU√çDA!",
):
    print(summary_line)