In [7]:
import warnings
# Suppress every Python warning for the remainder of the kernel session.
warnings.filterwarnings("ignore")

In [8]:
import numpy as np
import pandas as pd
import optuna
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from pathlib import Path
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
from sklearn.model_selection import KFold
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from tabkanet.models import TabKANet



In [17]:
# Load the raw churn CSV and turn it into an all-numeric float32 table
path_file = './data/customer_churn_telecom_services.csv'
df = pd.read_csv(path_file)

# 1. One-hot encode the multi-level categorical columns
colunas = ['PaymentMethod', 'Contract', 'InternetService']
ohe = OneHotEncoder(dtype=int, drop='if_binary')
colunas_ohe = ohe.fit_transform(df[colunas]).toarray()
df_ohe = pd.DataFrame(colunas_ohe, columns=ohe.get_feature_names_out(colunas))

# 2. Swap the original categorical columns for their encoded counterparts
data = pd.concat([df.drop(columns=colunas), df_ohe], axis=1)

# 3. Map the remaining string answers to numeric codes across the whole frame
replace_dict = {
    'Yes': 1, 'No': 0,
    'Female': 1, 'Male': 0,
    'No internet service': 0,
    'No phone service': -1,
}
data = data.replace(replace_dict)

# 4. Cast every column to float32, then fill missing values with 0
data = data.astype(np.float32).fillna(0)

# Show the resulting column dtypes
print(data.dtypes)

# Split into the feature frame and the label array
X = data.drop(columns='Churn')
y = data['Churn'].to_numpy()


gender                                     float32
SeniorCitizen                              float32
Partner                                    float32
Dependents                                 float32
tenure                                     float32
PhoneService                               float32
MultipleLines                              float32
OnlineSecurity                             float32
OnlineBackup                               float32
DeviceProtection                           float32
TechSupport                                float32
StreamingTV                                float32
StreamingMovies                            float32
PaperlessBilling                           float32
MonthlyCharges                             float32
TotalCharges                               float32
Churn                                      float32
PaymentMethod_Bank transfer (automatic)    float32
PaymentMethod_Credit card (automatic)      float32
PaymentMethod_Electronic check 

In [18]:
# Per-feature quantile bin edges
def get_quantile_bins(x_cont, n_bins=4):
    """Compute quantile-based bin edges for every column of a 2-D table.

    Args:
        x_cont: 2-D collection of continuous features laid out as
            (rows, features). A pandas DataFrame, a NumPy array (or anything
            ``np.asarray`` can convert) or a torch tensor is accepted.
        n_bins: Number of bins per feature; ``n_bins + 1`` edges are produced.

    Returns:
        A float32 CPU tensor of shape ``(features, n_bins + 1)`` whose row ``i``
        holds the evenly spaced quantiles (0 to 1 inclusive) of column ``i``.

    Raises:
        ValueError: If ``x_cont`` is not 2-dimensional.
    """
    # Convert the whole table once instead of once per column.
    if isinstance(x_cont, torch.Tensor):
        values = x_cont.detach().to(device='cpu', dtype=torch.float32)
    else:
        values = torch.tensor(np.asarray(x_cont), dtype=torch.float32)

    if values.dim() != 2:
        raise ValueError(
            f"x_cont must be 2-dimensional (rows, features), got {values.dim()} dimension(s)"
        )

    feature_dim = values.shape[1]
    # The quantile grid is identical for every feature, so build it once.
    probs = torch.linspace(0, 1, n_bins + 1)
    bins = torch.zeros(feature_dim, n_bins + 1)
    for i in range(feature_dim):
        bins[i] = torch.quantile(values[:, i], probs)
    return bins

# Quantile edges (default 4 bins per feature) are taken from the raw, unscaled
# feature table of the whole dataset. X is still a pandas DataFrame at this point;
# it is converted to an ndarray in the following cell.
bins = get_quantile_bins(X)

In [19]:
# Switch to a plain NumPy array so folds can be selected with positional index arrays.
# np.asarray returns an ndarray unchanged, so re-running this cell is harmless,
# whereas `X.values` raises AttributeError once X is already an ndarray.
X = np.asarray(X)

In [33]:
# Dataset wrapper
class ChurnDataset(Dataset):
    """Index-aligned pairing of a feature collection with its labels."""

    def __init__(self, features, labels):
        # Both collections are stored as given; nothing is copied or converted.
        self.labels = labels
        self.features = features

    def __getitem__(self, idx):
        """Return the ``(features, label)`` pair stored at position ``idx``."""
        sample = self.features[idx]
        target = self.labels[idx]
        return sample, target

    def __len__(self):
        """Number of samples, taken from the feature collection."""
        return len(self.features)

def train_model(model, train_loader, optimizer, criterion, device, epochs=100):
    """Run a plain mini-batch training loop on ``model``.

    Args:
        model: Module invoked as ``model(categorical_x=[], continuous_x=batch)``.
        train_loader: Iterable yielding ``(inputs, labels)`` tensor batches.
        optimizer: Optimizer whose ``zero_grad``/``step`` drive the updates.
        criterion: Loss invoked as ``criterion(outputs, labels)``.
        device: Device every batch is moved to before the forward pass.
        epochs: Number of full passes over ``train_loader``.

    Returns:
        None. ``model`` is updated in place.
    """
    model.train()
    for _ in range(epochs):
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            # Drop only the trailing dimension. A bare squeeze() would also
            # remove the batch dimension of a one-sample batch, leaving a 0-d
            # output that no longer matches labels of shape (1,).
            outputs = model(categorical_x=[], continuous_x=inputs).squeeze(-1)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

def evaluate_model(model, loader, device):
    """Score ``model`` on ``loader`` with standard binary-classification metrics.

    Args:
        model: Module invoked as ``model(categorical_x=[], continuous_x=batch)``
            whose output is passed through a sigmoid.
        loader: Iterable yielding ``(inputs, labels)`` tensor batches.
        device: Device the inputs are moved to before the forward pass.

    Returns:
        Dict with keys ``accuracy``, ``precision``, ``recall``, ``f1`` (computed
        on predictions thresholded at 0.5) and ``auc_roc`` (computed on the
        sigmoid outputs).
    """
    model.eval()
    y_true, y_pred, y_proba = [], [], []
    with torch.no_grad():
        for inputs, labels in loader:
            inputs = inputs.to(device)
            # Drop only the trailing dimension. A bare squeeze() would turn a
            # one-sample batch into a 0-d tensor, which list.extend() cannot
            # iterate over.
            logits = model(categorical_x=[], continuous_x=inputs).squeeze(-1)
            outputs = torch.sigmoid(logits)
            y_true.extend(labels.cpu().numpy())
            y_pred.extend((outputs > 0.5).float().cpu().numpy())
            y_proba.extend(outputs.cpu().numpy())

    return {
        'accuracy': accuracy_score(y_true, y_pred),
        'precision': precision_score(y_true, y_pred),
        'recall': recall_score(y_true, y_pred),
        'f1': f1_score(y_true, y_pred),
        'auc_roc': roc_auc_score(y_true, y_proba),
    }

def objective(trial):
    """Optuna objective: mean 3-fold cross-validated ROC AUC of a TabKANet model.

    For each fold the features are standardised with statistics fitted on the
    training split only, a fresh model is trained with ``train_model`` and then
    scored with ``evaluate_model``. The fold-averaged value of every metric is
    stored on the trial as a user attribute.

    Args:
        trial: Optuna trial used to sample the hyperparameters.

    Returns:
        The fold-averaged ``auc_roc``.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Hyperparameters
    num_layers = trial.suggest_int('num_layers', 1, 3)
    embedding_dim = trial.suggest_int('embedding_dim', 8, 16, step=4)
    num_heads = trial.suggest_int('num_heads', 1, 2)
    dim_feedforward = trial.suggest_int('dim_feedforward', 32, 128, step=32)
    attn_dropout_rate = trial.suggest_float('attn_dropout_rate', 0.1, 0.5)
    fnn_dropout_rate = trial.suggest_float('fnn_dropout_rate', 0.1, 0.5)
    # NOTE(review): 'hidden_dim' is sampled but never reaches the model
    # (mlp_hidden_dims is fixed to [16] below). The call is kept so the recorded
    # trial parameters stay the same; either wire it in or drop it.
    trial.suggest_int('hidden_dim', 4, 12, step=4)
    lr = trial.suggest_float('lr', 1e-4, 1e-2, log=True)
    batch_size = trial.suggest_categorical('batch_size', [32, 64, 128])

    # KFold
    kf = KFold(n_splits=3, shuffle=True, random_state=42)
    metrics = []
    num_features = X.shape[1]

    for train_idx, val_idx in kf.split(X):
        # Split, then standardise using statistics from the training split only
        X_train, X_val = X[train_idx], X[val_idx]
        y_train, y_val = y[train_idx], y[val_idx]

        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_val = scaler.transform(X_val)

        # Datasets
        train_dataset = ChurnDataset(X_train, y_train)
        val_dataset = ChurnDataset(X_val, y_val)

        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=batch_size)

        # A fresh model per fold so no weights carry over between folds.
        # NOTE(review): `bins` is the module-level tensor; in this notebook it is
        # computed from the unscaled features of the full dataset, while the
        # model is fed standardised folds - confirm this is what TabKANet expects.
        model = TabKANet(
            output_dim= 1,
            vocabulary={},
            num_continuous_features=num_features,
            nhead=num_heads,
            dim_feedforward=dim_feedforward,
            embedding_dim=embedding_dim,
            num_layers=num_layers,
            attn_dropout_rate=attn_dropout_rate,
            mlp_hidden_dims=[16],
            activation="relu",
            ffn_dropout_rate=fnn_dropout_rate,
            learninable_noise=True,
            bins = bins
        ).to(device)

        optimizer = optim.AdamW(model.parameters(), lr=lr)
        criterion = nn.BCEWithLogitsLoss()

        # Training
        train_model(model, train_loader, optimizer, criterion, device)

        # Evaluation
        fold_metrics = evaluate_model(model, val_loader, device)
        metrics.append(fold_metrics)

    # Average every metric across folds and attach the averages to the trial
    avg_metrics = {k: np.mean([m[k] for m in metrics]) for k in metrics[0]}
    for k, v in avg_metrics.items():
        trial.set_user_attr(k, float(v))

    return avg_metrics['auc_roc']


In [32]:
# Run the hyperparameter search, maximising the value returned by `objective`
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=50, show_progress_bar=True)

# Persist the trial table
results_df = study.trials_dataframe()
results_df['param_observado'] = 'Maximize AUC'

file_name = './KAN/optuna_results_kan.xlsx'
Path('./KAN/').mkdir(parents=True, exist_ok=True)

# Append to an earlier export when one exists, dropping exact duplicate rows
if not Path(file_name).exists():
    results_df.to_excel(file_name, index=False)
else:
    existing_df = pd.read_excel(file_name)
    combined_df = pd.concat([existing_df, results_df], ignore_index=True).drop_duplicates()
    combined_df.to_excel(file_name, index=False)

print("\nMelhores hiperparâmetros:")
print(study.best_params)
print("\nMétricas médias:")
print(dict(study.best_trial.user_attrs))

[I 2025-04-13 00:03:24,340] A new study created in memory with name: no-name-b7c1c578-9e54-4103-ae09-0cbfa10a0ce9
  0%|          | 0/50 [02:34<?, ?it/s]

[W 2025-04-13 00:05:59,311] Trial 0 failed with parameters: {'num_layers': 3, 'embedding_dim': 8, 'num_heads': 1, 'dim_feedforward': 128, 'attn_dropout_rate': 0.1988509545479662, 'fnn_dropout_rate': 0.4415696674265611, 'hidden_dim': 12, 'lr': 0.0052409814200158125, 'batch_size': 128} because of the following error: TypeError("TabKANet.forward() missing 1 required positional argument: 'continuous_x'").
Traceback (most recent call last):
  File "/home/brunoadsb/Desktop/venvs/IA_venv/lib/python3.13/site-packages/optuna/study/_optimize.py", line 197, in _run_trial
    value_or_values = func(trial)
  File "/tmp/ipykernel_537421/4010084857.py", line 104, in objective
    fold_metrics = evaluate_model(model, val_loader, device)
  File "/tmp/ipykernel_537421/4010084857.py", line 30, in evaluate_model
    outputs = torch.sigmoid(model(inputs).squeeze())
                            ~~~~~^^^^^^^^
  File "/home/brunoadsb/Desktop/venvs/IA_venv/lib/python3.13/site-packages/torch/nn/modules/module.py




TypeError: TabKANet.forward() missing 1 required positional argument: 'continuous_x'

In [29]:
# Quick look at the label array (NumPy prints an abbreviated view).
print(y)

[0. 0. 1. ... 0. 1. 0.]


In [30]:
# Quick look at the feature matrix (NumPy prints an abbreviated view).
print(X)

[[1. 0. 1. ... 1. 0. 0.]
 [0. 0. 0. ... 1. 0. 0.]
 [0. 0. 0. ... 1. 0. 0.]
 ...
 [1. 0. 1. ... 1. 0. 0.]
 [0. 1. 1. ... 0. 1. 0.]
 [0. 0. 0. ... 0. 1. 0.]]
