Importação da etapa de pré-processamento

In [11]:
import sys
import os
sys.path.append(os.path.abspath('..'))
sys.path.append(os.path.abspath('../STab'))

In [12]:
from pre_processing import ChurnDataProcessor
from STab import MainModel, Num_Cat

Importação das bibliotecas necessárias

In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn.functional as F
import optuna
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix
from scipy.stats import ks_2samp
import keras4torch
from keras4torch.callbacks import EarlyStopping, ModelCheckpoint

In [None]:
# Configuração de semente para reprodutibilidade
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)

In [None]:
file_path = os.path.abspath('../../data/customer_churn_telecom_services.csv')
processor = ChurnDataProcessor(file_path)
processor.split_and_balance() # realizar apenas a divisão dos dados, porque vamos usar técnicas de tratamento diferentes para o STab

In [None]:
#Log de verificação dos dados
print(f"Dados carregados e divididos.")
print(f"Treino: {processor.train_df.shape}")
print(f"Validação: {processor.validation_df.shape}")
print(f"Teste: {processor.test_df.shape}")

In [None]:
def prepare_data_for_stab(processor):
    """
    Prepara os dados do processor para o formato específico do STab:
    - Numéricas: StandardScaler
    - Categóricas: LabelEncoder (Inteiros)
    - Retorno: Listas [X_num, X_cat] compatíveis com Num_Cat
    """
    # Cópias para não alterar o original
    train_df = processor.train_df.copy()
    val_df = processor.validation_df.copy()
    test_df = processor.test_df.copy()

    # Tratamento de Nulos
    for df in [train_df, val_df, test_df]:
        df['TotalCharges'] = df['TotalCharges'].fillna(0.0)

    # Definição de Colunas
    target_col = 'Churn'
    num_cols = ['tenure', 'MonthlyCharges', 'TotalCharges']
    # Todas as outras colunas (exceto target e numéricas) são categóricas
    cat_cols = [c for c in train_df.columns if c not in num_cols + [target_col]]

    # 1. Processamento Numérico (StandardScaler)
    scaler = StandardScaler()
    X_train_num = scaler.fit_transform(train_df[num_cols]).astype(np.float32)
    X_val_num = scaler.transform(val_df[num_cols]).astype(np.float32)
    X_test_num = scaler.transform(test_df[num_cols]).astype(np.float32)

    # 2. Processamento Categórico (LabelEncoder)
    cat_cardinalities = []
    
    # Matrizes para guardar os índices inteiros
    X_train_cat = np.zeros((len(train_df), len(cat_cols)), dtype=np.int64)
    X_val_cat = np.zeros((len(val_df), len(cat_cols)), dtype=np.int64)
    X_test_cat = np.zeros((len(test_df), len(cat_cols)), dtype=np.int64)

    for i, col in enumerate(cat_cols):
        le = LabelEncoder()
        # Treina com todos os dados possíveis para não dar erro de categoria desconhecida
        all_data = pd.concat([train_df[col], val_df[col], test_df[col]])
        le.fit(all_data.astype(str))
        
        X_train_cat[:, i] = le.transform(train_df[col].astype(str))
        X_val_cat[:, i] = le.transform(val_df[col].astype(str))
        X_test_cat[:, i] = le.transform(test_df[col].astype(str))
        
        # Guarda a cardinalidade (+1 para segurança)
        cat_cardinalities.append(len(le.classes_) + 1)

    # 3. Targets
    le_target = LabelEncoder()
    y_train = torch.tensor(le_target.fit_transform(train_df[target_col]), dtype=torch.long)
    y_val = torch.tensor(le_target.transform(val_df[target_col]), dtype=torch.long)
    y_test = torch.tensor(le_target.transform(test_df[target_col]), dtype=torch.long)

    return (
        [X_train_num, X_train_cat], y_train,
        [X_val_num, X_val_cat], y_val,
        [X_test_num, X_test_cat], y_test,
        cat_cardinalities,
        len(num_cols)
    )

# Executa a preparação
X_train, y_train, X_val, y_val, X_test, y_test, categories_list, num_continuous = prepare_data_for_stab(processor)

print(f"Dados transformados para STab.")
print(f"Cardinalidades das categorias: {categories_list}")

In [None]:
def objective(trial):
    # --- Espaço de Busca ---
    params = {
        'dim': trial.suggest_categorical('dim', [8, 16, 32, 64]),
        'depth': trial.suggest_int('depth', 1, 6),
        'heads': trial.suggest_categorical('heads', [2, 4, 8]),
        'attn_dropout': trial.suggest_float('attn_dropout', 0.0, 0.5),
        'ff_dropout': trial.suggest_float('ff_dropout', 0.0, 0.5),
        'U': trial.suggest_int('U', 1, 4), 
        'cases': trial.suggest_categorical('cases', [8, 16]),
        'lr': trial.suggest_float('lr', 1e-4, 1e-2, log=True),
        'weight_decay': trial.suggest_float('weight_decay', 1e-5, 1e-3, log=True),
        'batch_size': trial.suggest_categorical('batch_size', [32, 64, 128])
    }

    # Instanciando o Modelo Base (MainModel)
    base_model = MainModel(
        categories=tuple(categories_list),
        num_continuous=num_continuous,
        dim=params['dim'],
        dim_out=2, # Binário
        depth=params['depth'],
        heads=params['heads'],
        attn_dropout=params['attn_dropout'],
        ff_dropout=params['ff_dropout'],
        U=params['U'],
        cases=params['cases']
    )

    # Wrapper para Keras4Torch (Num_Cat)
    full_model = Num_Cat(base_model, num_number=num_continuous, classes=2, Sample_size=params['dim'])
    
    # Construção do Modelo
    model = keras4torch.Model(full_model).build([num_continuous, len(categories_list)])
    optimizer = torch.optim.AdamW(model.parameters(), lr=params['lr'], weight_decay=params['weight_decay'])
    
    model.compile(optimizer=optimizer, loss=F.cross_entropy, metrics=['accuracy'])

    # Critério de Parada (Patience=20 conforme PDF)
    es = EarlyStopping(monitor='val_loss', patience=20)
    
    # Treinamento (verbose=0 para não poluir o output)
    history = model.fit(
        X_train, y_train,
        epochs=200, # Limite seguro para o Optuna
        batch_size=params['batch_size'],
        validation_data=(X_val, y_val),
        callbacks=[es],
        verbose=0 
    )
    
    # Retorna o melhor loss de validação
    return es.best_score

In [None]:
print("--- Iniciando Otimização de Hiperparâmetros (Optuna) ---")
# Cria o estudo para MINIMIZAR a perda (loss)
study = optuna.create_study(direction='minimize')

# Executa 20 tentativas (trials) - Ajuste se tiver tempo/GPU sobrando
study.optimize(objective, n_trials=20)

print("\n--- Otimização Concluída ---")
print("Melhores Hiperparâmetros encontrados:")
print(study.best_params)

In [None]:
best_p = study.best_params

print(f"Treinando modelo final com: {best_p}")

# Recria a arquitetura com os melhores parâmetros
final_base = MainModel(
    categories=tuple(categories_list),
    num_continuous=num_continuous,
    dim=best_p['dim'],
    dim_out=2,
    depth=best_p['depth'],
    heads=best_p['heads'],
    attn_dropout=best_p['attn_dropout'],
    ff_dropout=best_p['ff_dropout'],
    U=best_p['U'],
    cases=best_p['cases']
)

final_wrapper = Num_Cat(final_base, num_number=num_continuous, classes=2, Sample_size=best_p['dim'])
final_model = keras4torch.Model(final_wrapper).build([num_continuous, len(categories_list)])

optimizer = torch.optim.AdamW(final_model.parameters(), lr=best_p['lr'], weight_decay=best_p['weight_decay'])
final_model.compile(optimizer=optimizer, loss=F.cross_entropy, metrics=['accuracy'])

# Callbacks para o modelo final
# Salva o melhor modelo da história do treinamento
cp = ModelCheckpoint('best_stab_model.pt', monitor='val_loss', save_best_only=True)
es_final = EarlyStopping(monitor='val_loss', patience=20)

history = final_model.fit(
    X_train, y_train,
    epochs=500, # Mais épocas para o modelo final
    batch_size=best_p['batch_size'],
    validation_data=(X_val, y_val),
    callbacks=[es_final, cp],
    verbose=1
)

# Carrega os pesos da melhor época
final_model.load_weights('best_stab_model.pt')
print("Melhor modelo carregado.")

In [None]:
# --- Predições no Conjunto de Teste ---
logits = final_model.predict(X_test)
# Aplica Softmax para ter probabilidades
probs = F.softmax(torch.tensor(logits), dim=1).numpy()
y_pred_class = np.argmax(probs, axis=1)
y_prob_churn = probs[:, 1] # Probabilidade da classe 1 (Churn)

# 1. Relatório de Classificação
print("\n=== Relatório de Classificação (Teste) ===")
print(classification_report(y_test.numpy(), y_pred_class))

# 2. Cálculo do KS (Kolmogorov-Smirnov) - MÉTRICA PRINCIPAL
class_0_probs = y_prob_churn[y_test.numpy() == 0]
class_1_probs = y_prob_churn[y_test.numpy() == 1]

ks_stat, p_val = ks_2samp(class_0_probs, class_1_probs)

print(f"\n=== Métrica KS (Kolmogorov-Smirnov) ===")
print(f"KS Statistic: {ks_stat:.4f}")
print(f"P-value: {p_val}")

if ks_stat > 0.4:
    print(">> Resultado: BOM/MUITO BOM (KS > 0.4)")
else:
    print(">> Resultado: REGULAR (KS < 0.4)")

# 3. Gráfico da Curva KS (Opcional, mas recomendado no PDF)
def plot_ks_curve(y_true, y_probs):
    plt.figure(figsize=(10, 6))
    # Ordena probabilidades
    thresholds = np.sort(y_probs)
    
    # Calcula CDFs empíricas
    tpr = [] # True Positive Rate (acumulado classe 1)
    fpr = [] # False Positive Rate (acumulado classe 0)
    
    n_pos = np.sum(y_true)
    n_neg = len(y_true) - n_pos
    
    # Truque rápido para plotar KS
    for th in thresholds:
        tp = np.sum((y_probs >= th) & (y_true == 1))
        fp = np.sum((y_probs >= th) & (y_true == 0))
        tpr.append(tp / n_pos)
        fpr.append(fp / n_neg)
        
    plt.plot(thresholds, tpr, label='Classe 1 (Churn)')
    plt.plot(thresholds, fpr, label='Classe 0 (Não Churn)')
    plt.title(f'Curva KS - Estatística: {ks_stat:.4f}')
    plt.xlabel('Threshold (Probabilidade)')
    plt.ylabel('Proporção Acumulada')
    plt.legend()
    plt.grid(True)
    plt.show()

plot_ks_curve(y_test.numpy(), y_prob_churn)