In [106]:
import numpy as np
import pandas as pd
import optuna
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from pathlib import Path
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, roc_curve
from sklearn.model_selection import KFold
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from kan import KAN

In [107]:

# Carregar e pré-processar dados
path_file = './../data/customer_churn_telecom_services.csv'
df = pd.read_csv(path_file)

# 1. Primeiro aplicar o OneHotEncoder nas colunas categóricas
colunas = ['PaymentMethod', 'Contract', 'InternetService']
ohe = OneHotEncoder(dtype=int, drop='if_binary')
colunas_ohe = ohe.fit_transform(df[colunas]).toarray()

# Criar DataFrame com as colunas codificadas
df_ohe = pd.DataFrame(colunas_ohe, columns=ohe.get_feature_names_out(colunas))

# 2. Concatenar com o DataFrame original (removendo as colunas originais)
data = pd.concat([df.drop(colunas, axis=1), df_ohe], axis=1)

# 3. Agora fazer as substituições nos dados combinados
replace_dict = {
    'Yes': 1,
    'No': 0,
    'Female': 1,
    'Male': 0,
    'No internet service': 0,
    'No phone service': -1
}

data.replace(replace_dict, inplace=True)

# 4. Converter todas as colunas para float32
data = data.astype(np.float32).fillna(0)

# Verificar os tipos de dados
print(data.dtypes)

# Preparar dados
X = data.drop('Churn', axis=1).values
y = data['Churn'].values


gender                                     float32
SeniorCitizen                              float32
Partner                                    float32
Dependents                                 float32
tenure                                     float32
PhoneService                               float32
MultipleLines                              float32
OnlineSecurity                             float32
OnlineBackup                               float32
DeviceProtection                           float32
TechSupport                                float32
StreamingTV                                float32
StreamingMovies                            float32
PaperlessBilling                           float32
MonthlyCharges                             float32
TotalCharges                               float32
Churn                                      float32
PaymentMethod_Bank transfer (automatic)    float32
PaymentMethod_Credit card (automatic)      float32
PaymentMethod_Electronic check 

  data.replace(replace_dict, inplace=True)


In [None]:
def kolmogorov_smirnov(y_true, y_proba):
    # Usa a função `roc_curve` do sklearn para obter os thresholds
    fpr, tpr, thresholds = roc_curve(y_true, y_proba)
    ks_statistic = np.max(np.abs(tpr - fpr))  # KS = max(TPR - FPR)
    return ks_statistic * 100  # Retorna em escala 0-100

# Classe Dataset
class ChurnDataset(Dataset):
    def __init__(self, features, labels):
        self.features = features
        self.labels = labels

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        return self.features[idx], self.labels[idx]

def train_model(model, train_loader, optimizer, criterion, device, epochs=100):
    model.train()
    for epoch in range(epochs):
        for inputs, labels in train_loader:
            inputs = inputs.detach()  # Isso remove gradientes!
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs).squeeze()
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

def evaluate_model(model, loader, device):
    model.eval()
    y_true, y_pred, y_proba = [], [], []
    with torch.no_grad():
        for inputs, labels in loader:
            inputs = inputs.to(device)
            outputs = torch.sigmoid(model(inputs).squeeze())
            y_true.extend(labels.cpu().numpy())
            y_pred.extend((outputs > 0.5).float().cpu().numpy())
            y_proba.extend(outputs.cpu().numpy())

    return {
        'accuracy': accuracy_score(y_true, y_pred),
        'precision': precision_score(y_true, y_pred),
        'recall': recall_score(y_true, y_pred),
        'f1': f1_score(y_true, y_pred),
        'auc_roc': roc_auc_score(y_true, y_proba),
        'ks': kolmogorov_smirnov(y_true, y_proba)
    }

def objective(trial):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Hiperparâmetros
    grid_size = trial.suggest_int('grid_size', 5, 10)
    spline_order = trial.suggest_int('spline_order', 1, 3)
    width = [X.shape[1]] + [trial.suggest_int('width_'+str(i), 1, 5) for i in range(spline_order-2)] + [1]  
    batch_size = trial.suggest_categorical('batch_size', [16, 32])
    lr = trial.suggest_float('lr', 1e-5, 1e-2, log=True)
    

    # Salva o vetor width como atributo do trial
    trial.set_user_attr('width', str(width))
    
    # KFold
    kf = KFold(n_splits=3, shuffle=True, random_state=42)
    metrics = []


    for fold, (train_idx, val_idx) in enumerate(kf.split(X)):
        # Divisão e normalização
        X_train, X_val = X[train_idx], X[val_idx]
        y_train, y_val = y[train_idx], y[val_idx]

        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_val = scaler.transform(X_val)

        # Datasets
        train_dataset = ChurnDataset(X_train, y_train)
        val_dataset = ChurnDataset(X_val, y_val)

        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=batch_size)

        # Modelo KAN

        model = KAN(
            width=width,
            grid=grid_size,
            k=spline_order,
            noise_scale=noise_scale
        ).to(device)

        optimizer = optim.AdamW(model.parameters(), lr=lr)
        criterion = nn.BCEWithLogitsLoss()

        # Treinamento
        train_model(model, train_loader, optimizer, criterion, device)

        # Avaliação
        fold_metrics = evaluate_model(model, val_loader, device)
        metrics.append(fold_metrics)

    # Calcular médias
    avg_metrics = {k: np.mean([m[k] for m in metrics]) for k in metrics[0]}
    for k, v in avg_metrics.items():
        trial.set_user_attr(k, float(v))

    return avg_metrics['auc_roc']



In [110]:

study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=50, show_progress_bar=True)

# Salvar resultados
results_df = study.trials_dataframe()
results_df['param_observado'] = 'Maximize AUC'

file_name = './KAN/optuna_results_kan.xlsx'
Path('./KAN/').mkdir(parents=True, exist_ok=True)

if Path(file_name).exists():
    existing_df = pd.read_excel(file_name)
    combined_df = pd.concat([existing_df, results_df], ignore_index=True).drop_duplicates()
    combined_df.to_excel(file_name, index=False)
else:
    results_df.to_excel(file_name, index=False)

print("\nMelhores hiperparâmetros:")
print(study.best_params)
print("\nMétricas médias:")
print({k: v for k, v in study.best_trial.user_attrs.items()})

[I 2025-04-13 15:51:59,096] A new study created in memory with name: no-name-02b65e77-72a9-46fd-bcd8-c3d7a4ea3a0c
  0%|          | 0/50 [00:00<?, ?it/s]

checkpoint directory created: ./model
saving model version 0.0


  0%|          | 0/50 [02:29<?, ?it/s]


[W 2025-04-13 15:54:28,216] Trial 0 failed with parameters: {'grid_size': 9, 'spline_order': 3, 'width': 1, 'noise_scale': 0.05945262278848789, 'batch_size': 16, 'lr': 6.368045490180835e-05} because of the following error: KeyboardInterrupt().
Traceback (most recent call last):
  File "/home/brunoadsb/Desktop/venvs/IA_venv/lib/python3.13/site-packages/optuna/study/_optimize.py", line 197, in _run_trial
    value_or_values = func(trial)
  File "/tmp/ipykernel_3370/3736719345.py", line 96, in objective
    train_model(model, train_loader, optimizer, criterion, device)
    ~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/ipykernel_3370/3736719345.py", line 28, in train_model
    loss.backward()
    ~~~~~~~~~~~~~^^
  File "/home/brunoadsb/Desktop/venvs/IA_venv/lib/python3.13/site-packages/torch/_tensor.py", line 626, in backward
    torch.autograd.backward(
    ~~~~~~~~~~~~~~~~~~~~~~~^
        self, gradient, retain_graph, create_graph, inputs=inputs
        ^^^^

KeyboardInterrupt: 