In [None]:
!pip install scikit-learn numpy pandas imbalanced-learn optuna tab-transformer-pytorch --quiet

In [None]:
import pandas as pd

def _ratio_to(numerator, denominator):
    """Return a function of a frame computing `numerator / (denominator + 1)`."""
    return lambda df: df[numerator] / (df[denominator] + 1)


# Read the CSV and discard the listed columns (silently skipped when absent).
data = pd.read_csv("datasets/IBM Dataset 1.csv").drop(
    columns=["EmployeeCount", "EmployeeNumber", "StandardHours", "Over18"],
    errors="ignore",
)

# (new column name, function of the frame) pairs; every ratio adds 1 to its
# denominator before dividing.
engineered_features = [
    ("IncomePerJobLevel", _ratio_to("MonthlyIncome", "JobLevel")),
    ("TotalWorkingYearsToJobLevelRatio", _ratio_to("TotalWorkingYears", "JobLevel")),
    ("YearsAtCompanyToAgeRatio", _ratio_to("YearsAtCompany", "Age")),
    ("YearsAtCompanyToYearsInCurrentRoleRatio", _ratio_to("YearsAtCompany", "YearsInCurrentRole")),
]

# Each feature function reads only columns that already exist in the loaded
# frame, so they can all be evaluated against the same frame and appended
# together, in list order.
data = data.assign(**{name: func(data) for name, func in engineered_features})

# Numeric (int64/float64) feature names, excluding the target if it is numeric.
numeric_frame = data.select_dtypes(include=["int64", "float64"])
numerical_columns = [col for col in numeric_frame.columns if col != "Attrition"]

# Object-dtype column names (kept as a pandas Index).
categorical_columns = data.select_dtypes(include=["object"]).columns

nominal_columns = ["Department", "EducationField", "JobRole", "MaritalStatus"]


In [None]:
# training code follows the example: https://github.com/lucidrains/tab-transformer-pytorch/issues/6

# Number of Optuna trials run for each sampling technique's study
# (passed as `n_trials` to `study.optimize` further down).
N_TRIALS = 50

from tab_transformer_pytorch import TabTransformer
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import roc_auc_score, f1_score, precision_score, recall_score, confusion_matrix, precision_recall_curve, auc
from sklearn.model_selection import StratifiedKFold, train_test_split
from imblearn.over_sampling import SMOTE, ADASYN, BorderlineSMOTE
from imblearn.combine import SMOTETomek
import optuna
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from io import StringIO
from optuna.importance import get_param_importances

# Resampling strategies compared in this experiment, keyed by display name.
# "None" is the no-resampling baseline; every sampler is seeded.
sampling_techniques = {
    "None": None,
    "SMOTE": SMOTE(random_state=42),
    "BorderlineSMOTE": BorderlineSMOTE(random_state=42),
    "SMOTETomek": SMOTETomek(random_state=42),
    "ADASYN": ADASYN(random_state=42),
}

# Categorical *feature* columns only. Filtering the target out up front (instead
# of after it has been read through `X_categ`) keeps this cell re-runnable: on a
# second execution `categorical_columns` no longer contains "Attrition".
categorical_columns = [col for col in categorical_columns if col != "Attrition"]

## using label encoder for tab transformer
# One encoder per categorical feature plus one for the target, so that
# `label_encoders[col].classes_` is available for every encoded column.
label_encoders = {col: LabelEncoder() for col in [*categorical_columns, "Attrition"]}

# Explicit copy: the encoded values are written into `X_categ`, and writing into
# a slice of `data` triggers pandas' chained-assignment warning.
X_categ = data[categorical_columns].copy()
for col in categorical_columns:
    X_categ[col] = label_encoders[col].fit_transform(X_categ[col])
X_categ = X_categ.values.astype(np.int64)

X_cont = data[numerical_columns].values

# Target encoded straight from the source frame.
y = label_encoders["Attrition"].fit_transform(data["Attrition"]).astype(np.int64)

# Stratified 80/20 hold-out split, applied jointly to all three arrays.
X_train_categ, X_test_categ, X_train_cont, X_test_cont, y_train, y_test = train_test_split(
    X_categ, X_cont, y, test_size=0.2, stratify=y, random_state=42
)

## change type to int, avoids library error
X_test_categ = X_test_categ.astype(np.int64)
X_test_cont = X_test_cont.astype(np.float32)

## convert everything to correct tensor type
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
X_test_categ_tensor = torch.tensor(X_test_categ, dtype=torch.long).to(device)
# NOTE: these are the raw (unscaled) continuous test values; apply the scaler
# fitted on the training data before feeding a model trained on scaled inputs.
X_test_cont_tensor = torch.tensor(X_test_cont, dtype=torch.float).to(device)
y_test_tensor = torch.tensor(y_test, dtype=torch.long).to(device)
def objective(trial, sampling_technique, n_epochs=50):
    """Optuna objective: mean weighted F1 of a TabTransformer over 5-fold CV.

    Reads the module-level training arrays (`X_train_categ`, `X_train_cont`,
    `y_train`) as well as `categorical_columns`, `numerical_columns`,
    `label_encoders` and `device`.

    Args:
        trial: Optuna trial used to sample `attn_dropout`, `ff_dropout`, `lr`
            and the positive-class loss weight `weight`.
        sampling_technique: Object exposing `fit_resample(X, y)` that is applied
            to each training fold, or `None` to train on the fold as-is.
        n_epochs: Number of full-batch optimisation steps per fold.

    Returns:
        The mean of the per-fold weighted F1 scores on the validation folds.
    """
    # Architecture is fixed; only regularisation, learning rate and the
    # positive-class weight are searched.
    dim = 32
    depth = 6
    heads = 8
    dim_out = 1
    attn_dropout = trial.suggest_float("attn_dropout", 0.1, 0.5)
    ff_dropout = trial.suggest_float("ff_dropout", 0.1, 0.5)
    lr = trial.suggest_float("lr", 1e-5, 1e-2, log=True)
    weight = trial.suggest_int("weight", 1, 5, step=1)

    n_categ = len(categorical_columns)
    category_sizes = tuple(len(label_encoders[col].classes_) for col in categorical_columns)

    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    f1_scores = []

    for train_idx, val_idx in skf.split(X_train_categ, y_train):
        X_train_fold_categ = X_train_categ[train_idx].astype(np.int64)
        X_val_fold_categ = X_train_categ[val_idx].astype(np.int64)
        y_train_fold, y_val_fold = y_train[train_idx], y_train[val_idx]

        # Fit the scaler on the training fold only so no validation statistics leak.
        scaler = StandardScaler()
        X_train_fold_cont = scaler.fit_transform(X_train_cont[train_idx])
        X_val_fold_cont = scaler.transform(X_train_cont[val_idx])

        # Per-feature (mean, std) of the scaled training fold, handed to the model
        # as `continuous_mean_std`.
        cont_mean = torch.tensor(X_train_fold_cont.mean(0), dtype=torch.float)
        cont_std = torch.tensor(X_train_fold_cont.std(0), dtype=torch.float)
        # A feature that is constant within the fold has std 0; use 1 instead so a
        # division by this value cannot produce NaNs.
        cont_std[cont_std == 0] = 1.0
        cont_mean_std = torch.stack([cont_mean, cont_std], dim=1).to(device)

        if sampling_technique is not None:
            # The sampler sees one matrix: categorical codes first, then continuous.
            X_train_combined = np.hstack((X_train_fold_categ, X_train_fold_cont))
            X_resampled, y_train_fold = sampling_technique.fit_resample(X_train_combined, y_train_fold)
            # Synthetic rows can carry fractional category codes. Round to the
            # nearest code instead of letting the integer cast truncate toward
            # zero, which would bias synthetic rows toward lower codes.
            # NOTE(review): a categorical-aware sampler (e.g. SMOTENC) would avoid
            # interpolating label codes altogether.
            X_train_fold_categ = np.rint(X_resampled[:, :n_categ]).astype(np.int64)
            X_train_fold_cont = X_resampled[:, n_categ:]

        X_train_categ_tensor = torch.tensor(X_train_fold_categ, dtype=torch.long).to(device)
        X_train_cont_tensor = torch.tensor(X_train_fold_cont, dtype=torch.float).to(device)
        # BCEWithLogitsLoss expects floating-point targets.
        y_train_tensor = torch.tensor(y_train_fold, dtype=torch.float).to(device)

        X_val_categ_tensor = torch.tensor(X_val_fold_categ, dtype=torch.long).to(device)
        X_val_cont_tensor = torch.tensor(X_val_fold_cont, dtype=torch.float).to(device)

        model = TabTransformer(
            categories=category_sizes,
            num_continuous=len(numerical_columns),
            dim=dim,
            depth=depth,
            heads=heads,
            dim_out=dim_out,
            attn_dropout=attn_dropout,
            ff_dropout=ff_dropout,
            mlp_act=nn.ReLU(),
            continuous_mean_std=cont_mean_std,
        ).to(device)

        # Float weight so it matches the dtype of the logits and targets.
        pos_weight = torch.tensor([weight], dtype=torch.float, device=device)
        criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
        optimizer = torch.optim.Adam(model.parameters(), lr=lr)

        # Full-batch training: one optimiser step per epoch.
        model.train()
        for _ in range(n_epochs):
            optimizer.zero_grad()
            logits = model(X_train_categ_tensor, X_train_cont_tensor).squeeze(1)
            loss = criterion(logits, y_train_tensor)
            loss.backward()
            optimizer.step()

        model.eval()
        with torch.no_grad():
            val_logits = model(X_val_categ_tensor, X_val_cont_tensor).squeeze(1)
            val_preds_binary = (torch.sigmoid(val_logits) > 0.5).long().cpu().numpy()
        f1_scores.append(f1_score(y_val_fold, val_preds_binary, average="weighted"))

    return np.mean(f1_scores)

# For every sampling technique: tune hyper-parameters with Optuna, retrain one
# model on the full training split with the best parameters, and evaluate it on
# the held-out test split. One result row is appended per technique.
combined_results = []
for sampling_name, sampling_technique in sampling_techniques.items():
    print(f"Optimizing {sampling_name}")

    # Seed torch and the Optuna sampler so repeated runs explore the same trials
    # and start from the same weight initialisations.
    torch.manual_seed(42)
    study = optuna.create_study(
        direction="maximize",
        sampler=optuna.samplers.TPESampler(seed=42),
    )
    study.optimize(lambda trial: objective(trial, sampling_technique), n_trials=N_TRIALS)

    # Importances are a best-effort extra: report why they are missing instead
    # of silently swallowing every exception (including KeyboardInterrupt).
    try:
        importance = get_param_importances(study)
    except Exception as exc:
        print(f"Could not compute parameter importances for {sampling_name}: {exc}")
        importance = None

    best_params = study.best_params

    # Scale first, then resample - the same order the CV objective uses, so the
    # tuned hyper-parameters are applied to identically prepared data.
    scaler = StandardScaler()
    X_train_cont_final = scaler.fit_transform(X_train_cont)
    X_test_cont_scaled = scaler.transform(X_test_cont)

    # Per-feature (mean, std) of the scaled training data for `continuous_mean_std`.
    cont_mean = torch.tensor(X_train_cont_final.mean(0), dtype=torch.float)
    cont_std = torch.tensor(X_train_cont_final.std(0), dtype=torch.float)
    cont_std[cont_std == 0] = 1.0  # constant feature: avoid a zero divisor
    cont_mean_std = torch.stack([cont_mean, cont_std], dim=1).to(device)

    if sampling_technique is not None:
        n_categ = len(categorical_columns)
        X_train_combined = np.hstack((X_train_categ, X_train_cont_final))
        X_resampled, y_resampled = sampling_technique.fit_resample(X_train_combined, y_train)
        # Round fractional category codes of synthetic rows to the nearest code
        # rather than truncating them toward zero in the integer cast below.
        X_train_categ_final = np.rint(X_resampled[:, :n_categ])
        X_train_cont_final = X_resampled[:, n_categ:]
    else:
        X_train_categ_final, y_resampled = X_train_categ, y_train

    # Convert data to tensors
    X_train_categ_tensor = torch.tensor(X_train_categ_final.astype(np.int64), dtype=torch.long).to(device)
    X_train_cont_tensor = torch.tensor(X_train_cont_final.astype(np.float32), dtype=torch.float).to(device)
    y_train_tensor = torch.tensor(y_resampled, dtype=torch.float).to(device)
    # The model is trained on standardized inputs, so the test set must go
    # through the same fitted scaler before prediction.
    X_test_cont_scaled_tensor = torch.tensor(X_test_cont_scaled, dtype=torch.float).to(device)

    best_model = TabTransformer(
        categories=tuple(len(label_encoders[col].classes_) for col in categorical_columns),
        num_continuous=len(numerical_columns),
        dim=32,
        depth=6,
        heads=8,
        attn_dropout=best_params["attn_dropout"],
        ff_dropout=best_params["ff_dropout"],
        mlp_act=nn.ReLU(),
        continuous_mean_std=cont_mean_std,
    ).to(device)

    # Positive-class weight selected by the study (float to match the logits).
    pos_weight = torch.tensor([best_params["weight"]], dtype=torch.float, device=device)

    criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
    optimizer = torch.optim.Adam(best_model.parameters(), lr=best_params["lr"])

    # Full-batch training: one optimiser step per epoch.
    best_model.train()
    for epoch in range(50):
        optimizer.zero_grad()
        preds = best_model(X_train_categ_tensor, X_train_cont_tensor).squeeze(1)
        loss = criterion(preds, y_train_tensor)
        loss.backward()
        optimizer.step()

    best_model.eval()
    with torch.no_grad():
        test_preds = best_model(X_test_categ_tensor, X_test_cont_scaled_tensor).squeeze(1)
        test_preds_prob = torch.sigmoid(test_preds).cpu().numpy()
        test_preds_binary = (test_preds_prob > 0.5).astype(int)

    f1_weighted = f1_score(y_test, test_preds_binary, average="weighted")
    precision_weighted = precision_score(y_test, test_preds_binary, average="weighted")
    recall_weighted = recall_score(y_test, test_preds_binary, average="weighted")
    roc_auc = roc_auc_score(y_test, test_preds_prob)

    # Area under the precision-recall curve for the positive class.
    precision, recall, _ = precision_recall_curve(y_test, test_preds_prob, pos_label=1)
    pr_auc = auc(recall, precision)
    conf_matrix = confusion_matrix(y_test, test_preds_binary)
    accuracy = (test_preds_binary == y_test).mean()

    combined_results.append({
        "Model": "TabTransformer",
        "Sampling": sampling_name,
        "Validation F1-Score (CV)": study.best_value,
        "Test F1-Score (Weighted)": f1_weighted,
        "Test Precision (Weighted)": precision_weighted,
        "Test Recall (Weighted)": recall_weighted,
        "Test ROC-AUC": roc_auc,
        "Accuracy": accuracy,
        "Confusion Matrix": conf_matrix.tolist(),
        "Best Parameters": best_params,
        "importance": importance,
        # Previously computed but never reported; appended last so the existing
        # column order is unchanged.
        "Test PR-AUC": pr_auc,
    })


In [None]:
# Tabulate one row per sampling technique and write the table to disk
# (the default integer index is written as the first CSV column).
results_df = pd.DataFrame(data=combined_results)
results_df.to_csv(path_or_buf="tabtransformer_results.csv")