#### Library & Package Imports

In [None]:
!pip install skorch torch scikit-learn
import xgboost as xgb
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np
from sklearn.metrics import (
    confusion_matrix,
    classification_report,
    ConfusionMatrixDisplay,
    roc_auc_score,
    roc_curve,
    precision_score,
    recall_score,
    f1_score,
    precision_recall_curve,
    accuracy_score,
    log_loss,
    PrecisionRecallDisplay,
    make_scorer,
    RocCurveDisplay
)
import matplotlib.pyplot as plt
from sklearn.model_selection import GridSearchCV, RepeatedStratifiedKFold, RandomizedSearchCV
from sklearn.metrics import make_scorer, average_precision_score
from sklearn.linear_model import LogisticRegression
import seaborn as sns
import imblearn
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import KFold
from sklearn.model_selection import GridSearchCV, StratifiedKFold
from xgboost import cv
from xgboost import XGBClassifier
import scipy as stats
from scipy.spatial.distance import mahalanobis
from numpy.linalg import inv
from scipy.spatial import distance
from skorch import NeuralNetClassifier
from skorch.callbacks import EarlyStopping, Checkpoint, EpochScoring
from skorch.helper import predefined_split
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [None]:
# Install and import Optuna (hyperparameter optimisation framework) for a more efficient parameter search
!pip install optuna
import optuna

Collecting optuna
  Downloading optuna-4.5.0-py3-none-any.whl.metadata (17 kB)
Collecting alembic>=1.5.0 (from optuna)
  Downloading alembic-1.16.4-py3-none-any.whl.metadata (7.3 kB)
Collecting colorlog (from optuna)
  Downloading colorlog-6.9.0-py3-none-any.whl.metadata (10 kB)
Downloading optuna-4.5.0-py3-none-any.whl (400 kB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 400.9/400.9 kB 11.9 MB/s eta 0:00:00
Downloading alembic-1.16.4-py3-none-any.whl (247 kB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 247.0/247.0 kB 13.3 MB/s eta 0:00:00
Downloading colorlog-6.9.0-py3-none-any.whl (11 kB)
Installing collected packages: colorlog, alembic, optuna
Successfully installed alembic-1.16.4 colorlog-6.9.0 optuna-4.5.0


#### Import Data

In [None]:
import gdown  # used for every download below but missing from the notebook's import cell


def _download_csv(file_id, filename):
    """Download a shared Google Drive file to `filename` and load it with pandas.

    Args:
        file_id: Google Drive file id, inserted into the `uc?id=` download URL.
        filename: Local path the file is written to and then read from.

    Returns:
        The DataFrame produced by `pd.read_csv(filename)`.
    """
    gdown.download(f'https://drive.google.com/uc?id={file_id}', filename, quiet=False)
    return pd.read_csv(filename)


# One CSV per snapshot; the file_id_N names are kept for any later cell that reads them.
file_id_1 = '18c5DynpKSiey55WdTBkNE7Iwb7l_HL-k'
df1 = _download_csv(file_id_1, 'data2011.csv')

file_id_2 = '1bJsC9bUmrMHXlKIv82Gkl-Qxldy9D-KQ'
df2 = _download_csv(file_id_2, 'data2102.csv')

file_id_3 = '1BU41bihK6rCTVWmyUFr4gEmYwIclKeMD'
df3 = _download_csv(file_id_3, 'data2105.csv')

file_id_4 = '1VUA3AgnL7ouqCY3vrui7G6qr5RbbJwDQ'
df4 = _download_csv(file_id_4, 'data2108.csv')

file_id_5 = '1GSL8AOlv9fWylFU-HAKbIbOCxuN1b754'
df5 = _download_csv(file_id_5, 'data2111.csv')

#### Data Processing

In [None]:
## Rename Columns
def rename(df):
    """Return a copy of `df` whose coded column headers are replaced by readable names.

    Columns that do not appear in the mapping are left untouched, and the
    input frame itself is not modified (`DataFrame.rename` returns a new frame).
    """
    # Headers starting with RREL
    rrel_names = {
        'RREL16': 'primary_income',
        'RREL13': 'employment_status',
        'RREL27': 'loan_purpose',
        'RREL25': 'original_term',
        'RREL30': 'current_balance',
        'RREL29': 'original_balance',
        'RREL43': 'current_interest_rate',
        'RREL42': 'interest_type',
        'RREL69': 'account_status',
        'RREL39': 'payment_due',
        'RREL67': 'arrears_balance',
        'RREL68': 'days_in_arrears',
        'RREL71': 'default_amount',
    }
    # Headers starting with RREC
    rrec_names = {
        'RREC6': 'collateral_region',
        'RREC7': 'occupancy_type',
        'RREC9': 'property_type',
        'RREC16': 'original_ltv',
        'RREC17': 'original_valuation',
        'RREC12': 'current_ltv',
        'RREC13': 'current_valuation',
    }
    # Remaining headers, including the `_t_1` variants
    other_names = {
        'age': 'age',
        'PrepaymentFee': 'prepayment_fee',
        'PrepaymentHistory': 'prepayment_history',
        'RREL30_t_1': 'past_balance',
        'RREL39_t_1': 'past_payment_due',
        'RREL43_t_1': 'past_interest_rate',
        'RREC12_t_1': 'past_ltv',
        'RREC13_t_1': 'past_valuation',
        'incentive': 'incentive',
        'target': 'target',
    }
    header_map = {**rrel_names, **rrec_names, **other_names}
    return df.rename(columns=header_map)

In [None]:
## Embed Categorical columns
def embed(df):
    """Cast the categorical feature columns of `df` to pandas' `category` dtype.

    The frame is modified in place and the same object is returned, so the
    call can be used either for its side effect or in an assignment.
    """
    to_category = (
        'employment_status',
        'loan_purpose',
        'collateral_region',
        'occupancy_type',
        'property_type',
        'interest_type',
        'account_status',
        'prepayment_fee',
        'prepayment_history',
    )
    for column in to_category:
        df[column] = df[column].astype('category')
    return df

In [None]:
# Give every snapshot the readable column names
df1, df2, df3, df4, df5 = map(rename, (df1, df2, df3, df4, df5))

In [None]:
# Drop the single observation whose employment status is 'PNNR'.
# .copy() makes df5 own its data: the frame is modified in place by the
# category casts that follow, which otherwise triggers pandas'
# SettingWithCopyWarning because a boolean-filtered frame is flagged as a slice.
df5 = df5[df5['employment_status'] != 'PNNR'].copy()

In [None]:
# Cast the categorical columns of every snapshot (embed works in place and returns its argument)
df1, df2, df3, df4, df5 = (embed(frame) for frame in (df1, df2, df3, df4, df5))

In [None]:
# Separate each snapshot into a feature matrix (X) and a label vector (y).
# 'prepayment_fee' is excluded from the features together with the label itself.
non_feature_cols = ['target', 'prepayment_fee']
X1, y1 = df1.drop(columns=non_feature_cols), df1['target']
X2, y2 = df2.drop(columns=non_feature_cols), df2['target']
X3, y3 = df3.drop(columns=non_feature_cols), df3['target']
X4, y4 = df4.drop(columns=non_feature_cols), df4['target']
X5, y5 = df5.drop(columns=non_feature_cols), df5['target']

#### Mahalanobis Undersampling

In [None]:
from sklearn.preprocessing import StandardScaler  # used below but missing from the import cell

# ---------------------------------------------------------------------------
# Training set: undersample the non-prepaid class by Mahalanobis distance to
# the prepaid class, then re-attach the categorical features by ROW LABEL.
# ---------------------------------------------------------------------------

# Concatenate the training snapshots on a fresh 0..N-1 index
temp = pd.concat([df1, df2], ignore_index=True)
temp = temp.drop(['prepayment_fee'], axis=1)

# Categorical columns are set aside: the distance is computed on numeric features only
categorical_cols = ['employment_status', 'loan_purpose', 'collateral_region',
                   'occupancy_type', 'property_type', 'interest_type',
                   'account_status', 'prepayment_history']

# Categorical block, still carrying temp's row labels so it can be re-joined later
categorical_data = temp[categorical_cols].copy()
# NOTE(review): if the snapshots do not share identical category sets, pd.concat
# turns these columns into object dtype, which XGBoost's enable_categorical does
# not accept — confirm the dtypes of X_mah_train / X_mah_val before fitting.

# Separate numeric features and target (drop categoricals)
numeric_temp = temp.drop(categorical_cols, axis=1)
X_temp = numeric_temp.drop('target', axis=1)
y_temp = numeric_temp['target']

# Scale numeric features only; keep the row labels on the scaled frame
scaler = StandardScaler()
X_scaled = pd.DataFrame(scaler.fit_transform(X_temp), columns=X_temp.columns, index=X_temp.index)

# Reattach target to scaled numeric features
temp_scaled = pd.concat([X_scaled, y_temp], axis=1)

# Split into prepaid and non-prepaid
prepaid_df = temp_scaled.loc[temp_scaled['target'] == 1]
non_prepaid_df = temp_scaled.loc[temp_scaled['target'] == 0]

# Extract just the numeric features (drop target)
X_prepaid = prepaid_df.drop(columns=['target']).values
X_non_prepaid = non_prepaid_df.drop(columns=['target']).values

# Mean and (ridge-regularised) inverse covariance of the prepaid group
mean_vec = np.mean(X_prepaid, axis=0)
cov_matrix = np.cov(X_prepaid, rowvar=False)
inv_cov_matrix = inv(cov_matrix + np.eye(cov_matrix.shape[0]) * 1e-6)  # regularization

# Mahalanobis distance of every non-prepaid row to the prepaid mean, computed
# for all rows at once: sqrt(diag(C @ VI @ C.T)) with C the centred rows.
centered = X_non_prepaid - mean_vec
squared_dist = ((centered @ inv_cov_matrix) * centered).sum(axis=1)
# Clamp tiny negative values caused by rounding so sqrt never yields NaN for near-zero distances
mahal_distances = np.sqrt(np.maximum(squared_dist, 0.0))

# Add distances to non_prepaid_df
non_prepaid_df = non_prepaid_df.copy()
non_prepaid_df['mahal_dist'] = mahal_distances

# Keep the N closest non-prepaid samples (2x the number of prepaids)
n = len(prepaid_df) * 2
selected_non_prepaids = non_prepaid_df.nsmallest(n, 'mahal_dist').drop(columns=['mahal_dist'])

# Combine with prepaid WITHOUT ignore_index: the row labels are the only link
# back to the categorical block. (Resetting the index here made the later
# lookup select the first len(balanced) rows instead of the sampled ones.)
balanced_numeric_df = pd.concat([prepaid_df, selected_non_prepaids])
selected_indices = balanced_numeric_df.index

# Label-based lookup returns the categorical rows in exactly the same order as
# the numeric rows, so the two blocks stay aligned row for row.
selected_categorical_data = categorical_data.loc[selected_indices]

# Both blocks now share order; move them to a common fresh index and join column-wise
balanced_numeric_df = balanced_numeric_df.reset_index(drop=True)
selected_categorical_data = selected_categorical_data.reset_index(drop=True)
final_balanced_df = pd.concat([balanced_numeric_df, selected_categorical_data], axis=1)

# ---------------------------------------------------------------------------
# Validation set: all rows of the later snapshots, scaled with the TRAIN scaler
# ---------------------------------------------------------------------------
temp = pd.concat([df3, df4], ignore_index=True)
temp = temp.drop(['prepayment_fee'], axis=1)

# Categorical and numeric parts of the validation data (same row order)
categorical_data = temp[categorical_cols].copy()
numeric_temp = temp.drop(categorical_cols, axis=1)
X_temp = numeric_temp.drop('target', axis=1)
y_temp = numeric_temp['target']

# Scale numeric validation data with statistics learned on the training data
X_scaled = pd.DataFrame(scaler.transform(X_temp), columns=X_temp.columns, index=X_temp.index)

# No row selection happens here, so every categorical row is kept in place
X_scaled = X_scaled.reset_index(drop=True)
selected_categorical_data = categorical_data.reset_index(drop=True)

# Final validation set
X_mah_val = pd.concat([X_scaled, selected_categorical_data], axis=1)
y_mah_val = pd.concat([y3, y4], ignore_index=True)

# Final train set
X_mah_train = final_balanced_df.drop(['target'], axis=1)
y_mah_train = final_balanced_df['target']

#### R-GAN Oversampling

In [None]:
# Function to keep numeric variables
def keep(datasets):
    """Project every frame in `datasets` onto the numeric columns plus 'target'.

    Returns a list with one projected DataFrame per input frame, in input order.
    """
    numeric_fields = [
        'primary_income', 'original_term', 'current_balance', 'original_balance',
        'current_interest_rate', 'payment_due', 'arrears_balance', 'days_in_arrears',
        'default_amount', 'original_ltv', 'current_ltv', 'original_valuation',
        'current_valuation', 'age', 'past_balance', 'past_payment_due',
        'past_interest_rate', 'past_ltv', 'past_valuation', 'incentive',
    ]
    wanted = numeric_fields + ['target']
    projected = []
    for frame in datasets:
        projected.append(frame[wanted])
    return projected

X1_gan, X2_gan = keep(datasets=[df1, df2])

In [None]:
# Stack the two numeric training snapshots on a fresh index
X_gan = pd.concat([X1_gan, X2_gan], ignore_index=True)
# Minority class only: rows whose target equals 1
is_positive = X_gan['target'] == 1
X_gan_pos = X_gan.loc[is_positive]

In [None]:
from sklearn.preprocessing import StandardScaler  # used below but missing from the import cell

# Remove targets and scale numeric variables
X_gan_pos = X_gan_pos.drop(columns=["target"])
minority_data = X_gan_pos.to_numpy().astype(np.float32)
# NOTE: this rebinds the name `scaler`; from here on it holds statistics fitted
# on the minority (target == 1) rows only, and is fitted on a plain ndarray.
scaler = StandardScaler()
minority_data = scaler.fit_transform(minority_data)

In [None]:
# Define the Generator network: creates synthetic data from random noise
class Generator(nn.Module):
    """Fully connected generator mapping a noise vector to a synthetic feature row.

    The network widens input_dim -> 32 -> 128 -> 512 -> output_dim. Every linear
    layer is wrapped in spectral normalisation; LeakyReLU(0.2) follows each
    layer except the last, whose output is returned without an activation.
    """

    def __init__(self, input_dim=8, output_dim=30):
        super().__init__()
        widths = (input_dim, 32, 128, 512, output_dim)
        final_stage = len(widths) - 2
        stages = []
        for position, (fan_in, fan_out) in enumerate(zip(widths[:-1], widths[1:])):
            stages.append(nn.utils.spectral_norm(nn.Linear(fan_in, fan_out)))
            if position != final_stage:
                stages.append(nn.LeakyReLU(0.2))
        # Same module order (and therefore the same state_dict keys) as a literal Sequential
        self.model = nn.Sequential(*stages)

    def forward(self, z):
        """Return the generated features for the noise batch `z`."""
        return self.model(z)

# Define the Discriminator network: distinguishes real from synthetic data
class Discriminator(nn.Module):
    """Critic network that scores a feature row as real or synthetic.

    Four Linear -> LayerNorm -> CELU stages narrow the input
    (input_dim -> 512 -> 128 -> 32 -> 8); a final linear layer maps the
    8-dimensional hidden representation to a single unbounded score.
    """

    def __init__(self, input_dim=30):
        super().__init__()

        def stage(fan_in, fan_out):
            # One narrowing stage: affine map, layer normalisation, CELU activation
            return nn.Sequential(
                nn.Linear(fan_in, fan_out),
                nn.LayerNorm(fan_out),
                nn.CELU(),
            )

        self.layer1 = stage(input_dim, 512)
        self.layer2 = stage(512, 128)
        self.layer3 = stage(128, 32)
        self.layer4 = stage(32, 8)
        self.output = nn.Linear(8, 1)  # real/fake score

    def forward(self, x, return_hidden=False):
        """Score `x`; with `return_hidden=True` return `(score, hidden)` instead.

        `hidden` is the output of the fourth stage, i.e. the tensor fed to the
        final scoring layer.
        """
        hidden = x
        for block in (self.layer1, self.layer2, self.layer3, self.layer4):
            hidden = block(hidden)
        score = self.output(hidden)
        if not return_hidden:
            return score
        return score, hidden

# Custom loss function to ensure synthetic data matches real data statistics
def similarity_loss(real_hidden, fake_hidden):
    """Mean squared error between the batch-mean hidden features of real and fake data.

    Each input is averaged over dim 0 first, so only the per-feature means of
    the two batches are compared.
    """
    return F.mse_loss(real_hidden.mean(dim=0), fake_hidden.mean(dim=0))

from torch.utils.data import DataLoader, TensorDataset

# Training configuration parameters
z_dim = 8                            # Size of random noise vector input to generator
# Taken from the scaled minority matrix instead of a hard-coded 20, so the
# networks always match the number of numeric feature columns actually kept.
feature_dim = minority_data.shape[1]
batch_size = 64                      # Number of samples per training batch
epochs = 1000                        # Number of full passes over the minority data
lr = 0.0001                          # Learning rate for discriminator (generator uses 0.0002 below)
lambda_gp = 10                       # Weight for gradient penalty loss
eta_sim = 0.01                       # Weight for similarity loss

# Set up device (GPU if available, else CPU)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Initialize generator and discriminator networks
G = Generator(input_dim=z_dim, output_dim=feature_dim).to(device)
D = Discriminator(input_dim=feature_dim).to(device)

# Separate Adam optimizers; the generator runs at twice the discriminator's learning rate
g_opt = optim.Adam(G.parameters(), lr=0.0002, betas=(0.5, 0.9))  # Generator optimizer
d_opt = optim.Adam(D.parameters(), lr=lr, betas=(0.5, 0.9))      # Discriminator optimizer

# Real minority-class rows, reshuffled into mini-batches every epoch
real_dataset = TensorDataset(torch.tensor(minority_data, dtype=torch.float32))
real_loader = DataLoader(real_dataset, batch_size=batch_size, shuffle=True)

In [None]:
# Calculates the gradient penalty for WGAN-GP, which helps stabilize training
def gradient_penalty(D, real_data, fake_data, gp_weight=None):
    """Gradient penalty on random interpolations between real and fake rows.

    Args:
        D: Callable critic; must return one score per input row.
        real_data: Batch of real rows, shape (batch, features).
        fake_data: Batch of generated rows with the same shape as `real_data`.
        gp_weight: Multiplier applied to the penalty. When None, the
            module-level `lambda_gp` is used, as in the original call sites.

    Returns:
        Scalar tensor: gp_weight * mean((||dD/dx||_2 - 1)^2), differentiable
        with respect to D's parameters.
    """
    if gp_weight is None:
        gp_weight = lambda_gp

    # One mixing coefficient per row, broadcast across that row's features.
    # The coefficients follow the data's own device rather than a global one.
    alpha = torch.rand(real_data.size(0), 1).to(real_data.device)
    alpha = alpha.expand_as(real_data)
    interpolates = (alpha * real_data + ((1 - alpha) * fake_data)).requires_grad_(True)

    # Critic scores on the interpolated rows
    d_interpolates = D(interpolates)

    # Gradient of the scores with respect to the interpolated inputs;
    # create_graph=True so the penalty itself can be back-propagated.
    gradients = torch.autograd.grad(
        outputs=d_interpolates, inputs=interpolates,
        grad_outputs=torch.ones_like(d_interpolates),
        create_graph=True, retain_graph=True
    )[0]

    # Per-row L2 norm of the gradient
    grad_norm = gradients.norm(2, dim=1)

    # Penalize norms that deviate from 1 (Lipschitz constraint)
    return gp_weight * ((grad_norm - 1) ** 2).mean()

# Main training loop: for every real mini-batch, 4 critic updates followed by
# 4 generator updates.
for epoch in range(epochs):
    # Process data in batches
    for real_batch, in real_loader:
        real_batch = real_batch.to(device)
        n_rows = real_batch.size(0)

        # === Train Discriminator ===
        for _ in range(4):  # Discriminator steps
            # Generate fake data from random noise
            z = torch.randn(n_rows, z_dim).to(device)
            fake_data = G(z).detach()  # Detach to avoid training generator here

            # Critic scores for real and fake data
            d_real = D(real_batch)
            d_fake = D(fake_data)

            # Gradient penalty for stability
            gp = gradient_penalty(D, real_batch, fake_data)

            # Wasserstein loss with gradient penalty
            d_loss = -torch.mean(d_real) + torch.mean(d_fake) + gp

            # Update discriminator weights
            D.zero_grad()
            d_loss.backward()
            d_opt.step()

        # === Train Generator ===
        # D is not updated during the generator steps, so the hidden features
        # of the real batch are identical on every step: compute them once.
        # They do not depend on G, so no gradient graph is needed for them.
        with torch.no_grad():
            _, real_hidden = D(real_batch, return_hidden=True)

        for _ in range(4):  # Generator steps
            # Generate new fake data
            z = torch.randn(n_rows, z_dim).to(device)
            fake_data = G(z)

            # Critic score and hidden features of the fake batch
            d_fake_score, fake_hidden = D(fake_data, return_hidden=True)

            # Distance between the mean hidden features of real and fake rows
            sim_loss = similarity_loss(real_hidden, fake_hidden)

            # Total generator loss: fool the critic + match hidden-feature means
            g_loss = -torch.mean(d_fake_score) + eta_sim * sim_loss

            # Update generator weights
            G.zero_grad()
            g_loss.backward()
            g_opt.step()

    # Print progress every 100 epochs (losses of the last batch of the epoch)
    if epoch % 100 == 0:
        print(f"[Epoch {epoch}] D_loss: {d_loss.item():.4f}, G_loss: {g_loss.item():.4f}")

In [None]:
# Create function for synthetic data generation
def generate_synthetic(G, n_samples=1000, z_dim=8):
    """Draw `n_samples` rows from generator `G` and return them as a NumPy array.

    Args:
        G: Generator module; it is switched to eval mode and left in that mode.
        n_samples: Number of synthetic rows to produce.
        z_dim: Width of the noise vector; must equal the input size `G` expects.

    Returns:
        Array of shape (n_samples, <G's output width>) on the CPU.
    """
    # Send the noise to wherever the generator's weights live instead of relying
    # on a module-level `device`; fall back to CPU for a parameter-free module.
    first_param = next(G.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device('cpu')

    G.eval()
    with torch.no_grad():
        z = torch.randn(n_samples, z_dim).to(target_device)
        synth_data = G(z).cpu().numpy()
    return synth_data

# Create synthetic data. z_dim is passed explicitly so the noise width always
# matches the value the generator was built with, instead of silently relying
# on generate_synthetic's default of 8.
synthetic_minority = generate_synthetic(G, n_samples=10000, z_dim=z_dim)


In [None]:
# Numeric feature columns, in the order the GAN was trained on (no 'target')
columns_to_keep = ['primary_income', 'original_term', 'current_balance', 'original_balance', 'current_interest_rate',
'payment_due', 'arrears_balance', 'days_in_arrears', 'default_amount', 'original_ltv', 'current_ltv',
'original_valuation', 'current_valuation', 'age', 'past_balance', 'past_payment_due', 'past_interest_rate',
'past_ltv', 'past_valuation', 'incentive']

# The generator output is already in the scaler's standardized space, so it is used as-is
synth_df = pd.DataFrame(synthetic_minority, columns=columns_to_keep)
synth_df['target'] = 1

# Bring the real training rows into the same scaled space.
# The scaler was fitted on a plain ndarray, so it is given an ndarray here too;
# passing a DataFrame makes scikit-learn warn that X has feature names the
# scaler was fitted without.
X_gan_scaled = X_gan.copy()
X_gan_scaled[columns_to_keep] = scaler.transform(X_gan[columns_to_keep].to_numpy())

# Real (scaled) rows followed by the synthetic positives, on a fresh index
augmented_df = pd.concat([X_gan_scaled, synth_df], ignore_index=True)

In [None]:
# Features and labels of the augmented training set
y_gan_train = augmented_df['target']
X_gan_train = augmented_df.drop(columns='target')

In [None]:
# Keep numeric columns of the two held-out snapshots and stack them
X3_gan, X4_gan = keep([df3, df4])
gan_test = pd.concat([X3_gan, X4_gan], ignore_index=True)

# Split and scale, then convert back to DataFrame.
# The scaler was fitted on a plain ndarray, so it is fed an ndarray here as
# well; this avoids scikit-learn's "X has feature names" warning. Column names
# and index are restored on the result.
X_gan_test_raw = gan_test.drop(columns=["target"])
X_gan_test = pd.DataFrame(
    scaler.transform(X_gan_test_raw.to_numpy()),
    columns=X_gan_test_raw.columns,
    index=X_gan_test_raw.index
)
# Target
y_gan_test = gan_test["target"]

## R-GAN Model

In [None]:
# Candidate values sampled by the randomized search
param_grid = {
    'max_depth': [3, 5, 7],
    'learning_rate': [0.01, 0.05, 0.1],
    'subsample': [0.7, 0.8, 0.9],
    'colsample_bytree': [0.7, 0.8, 0.9],
    'gamma': [0, 0.1, 0.2],
    'min_child_weight': [1, 5, 10],
    'reg_alpha': [0, 0.1, 1],
    'reg_lambda': [0, 1, 10],
    'scale_pos_weight': [50, 100],
    'n_estimators': [500, 1000]
}

# Settings shared by the search estimator and the final model.
# `use_label_encoder` is intentionally not passed: it is deprecated in XGBoost
# and only produces a "parameters are not used" warning on current releases.
xgb_fixed_params = dict(
    objective='binary:logistic',
    eval_metric=['aucpr', 'logloss'],
    enable_categorical=True,
    random_state=42,
)

# Search estimator: no early stopping, because the search passes no eval_set
xgb_clf = xgb.XGBClassifier(verbosity=0, **xgb_fixed_params)

# 5-Fold Stratified Cross Validation
# NOTE: this rebinds the name `cv`, shadowing `from xgboost import cv`.
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

# Custom scoring for imbalanced data
scoring = {
    'precision': make_scorer(precision_score, zero_division=0),
    'recall': make_scorer(recall_score, zero_division=0),
    'f1': make_scorer(f1_score, zero_division=0),
    'aucpr': 'average_precision'
}

# Randomized search over param_grid; the configuration with the best
# cross-validated average precision is refitted on the full training set.
grid = RandomizedSearchCV(
    estimator=xgb_clf,
    param_distributions=param_grid,
    n_iter=30,  # Reduced for faster execution
    scoring=scoring,
    refit='aucpr',
    cv=cv,
    n_jobs=-1,
    verbose=1,
    random_state=42,
    return_train_score=True
)

# NOTE(review): the folds are drawn from data that already contains the
# synthetic positives, so validation folds include generated rows.
grid.fit(X_gan_train, y_gan_train)

# Retrain with the best parameters (n_estimators included), now with early stopping
best_params = grid.best_params_.copy()

final_model = xgb.XGBClassifier(
    **best_params,
    **xgb_fixed_params,
    early_stopping_rounds=50,
    verbosity=1,
)

# NOTE(review): early stopping monitors the same set that is later used for
# the reported metrics, so those metrics are optimistic; a separate
# validation split would avoid this.
final_model.fit(
    X_gan_train, y_gan_train,
    eval_set=[(X_gan_test, y_gan_test)],
    verbose=True
)
# Best model
print("Best Parameters:", grid.best_params_)

# Positive-class probabilities on the test set
y_proba = final_model.predict_proba(X_gan_test)[:, 1]

# Find the threshold that maximises F1.
# precision/recall contain one more entry than thresholds (the final
# precision=1, recall=0 point has no threshold), so that last point is dropped
# to keep the argmax a valid index into `thresholds`.
precision, recall, thresholds = precision_recall_curve(y_gan_test, y_proba)
f1_scores = 2 * (precision[:-1] * recall[:-1]) / (precision[:-1] + recall[:-1] + 1e-9)
optimal_idx = np.argmax(f1_scores)
optimal_threshold = thresholds[optimal_idx]
y_pred = (y_proba >= optimal_threshold).astype(int)

# Metrics
# NOTE(review): the threshold is tuned on the same data these metrics are
# computed on, so precision/recall/F1 below are optimistic.
print(f"\nOptimal Threshold: {optimal_threshold:.4f}")
print(f"Confusion Matrix:\n{confusion_matrix(y_gan_test, y_pred)}")
print(f"Precision: {precision_score(y_gan_test, y_pred, zero_division=0):.4f}")
print(f"Recall:    {recall_score(y_gan_test, y_pred, zero_division=0):.4f}")
print(f"F1 Score:  {f1_score(y_gan_test, y_pred, zero_division=0):.4f}")
print(f"ROC AUC:   {roc_auc_score(y_gan_test, y_proba):.4f}")
print(f"PR AUC:    {average_precision_score(y_gan_test, y_proba):.4f}")

# Precision-Recall Curve
# The display is drawn on an explicitly created axes: without `ax`, scikit-learn
# opens its own figure, leaving the sized figure empty and the figsize unused.
fig, ax = plt.subplots(figsize=(8, 6))
disp = PrecisionRecallDisplay.from_estimator(final_model, X_gan_test, y_gan_test, ax=ax)
ax.set_title('Precision-Recall Curve')
ax.grid(True)
plt.show()

# Learning curves: per-round metrics recorded on the eval_set during fitting
results = final_model.evals_result()
plt.figure(figsize=(10, 4))
plt.plot(results['validation_0']['logloss'], label='Test Log Loss')
plt.plot(results['validation_0']['aucpr'], label='Test AUC-PR')
plt.xlabel('Iterations')
plt.ylabel('Metric Value')
plt.title('Learning Curves')
plt.legend()
plt.grid(True)
plt.show()

# 1. ROC Curve (Enhanced)
# Draw on an explicitly created axes; without `ax` the display opens a second
# figure and the sized one stays empty.
fig, ax = plt.subplots(figsize=(8, 6))
roc_auc = roc_auc_score(y_gan_test, y_proba)

roc_display = RocCurveDisplay.from_estimator(
    final_model,
    X_gan_test,
    y_gan_test,
    ax=ax
)
# Diagonal reference line of a random classifier
chance_line, = ax.plot([0, 1], [0, 1], 'k--', label='Random (AUC = 0.5)')
ax.set_title('Oversampled ROC Curve', fontsize=12, pad=20)
ax.grid(True, alpha=0.3)

# Custom legend bound to explicit handles, so each label is attached to the
# intended line regardless of what else the display draws on the axes.
ax.legend(
    [roc_display.line_, chance_line],
    [f"XGBoost (AUC = {roc_auc:.2f})", "Random Classifier"],
    loc='lower right',
    framealpha=1
)
plt.show()

# 2. Precision-Recall Curve (Enhanced)
# Draw on an explicitly created axes; without `ax` the display opens a second
# figure and the sized one stays empty.
fig, ax = plt.subplots(figsize=(8, 6))
ap_score = average_precision_score(y_gan_test, y_proba)

pr_display = PrecisionRecallDisplay.from_predictions(
    y_gan_test,
    y_proba,
    name="XGBoost",
    ax=ax
)
ax.set_title('Oversampled Precision-Recall Curve', fontsize=12, pad=20)
ax.grid(True, alpha=0.3)

# Replace the automatic legend with a custom label for the curve
ax.legend(
    [pr_display.line_],
    [f"XGBoost (AP = {ap_score:.2f})"],
    loc='upper right',
    framealpha=1
)
plt.show()

# 3. Enhanced Learning Curves: both eval-set metrics per boosting round on one axes
fig, ax = plt.subplots(figsize=(10, 5))
eval_history = results['validation_0']
curve_specs = (
    ('logloss', 'Validation Log Loss', '#1f77b4'),
    ('aucpr', 'Validation PR-AUC', '#ff7f0e'),
)
for metric_key, curve_label, curve_color in curve_specs:
    ax.plot(eval_history[metric_key], label=curve_label, color=curve_color, linewidth=2)
ax.set_xlabel('Boosting Rounds', fontsize=10)
ax.set_ylabel('Metric Value', fontsize=10)
ax.set_title('(c) XGBoost Learning Curves', fontsize=12, pad=20)
ax.legend(fontsize=10)
ax.grid(True, alpha=0.3)
plt.show()

## Mahalanobis Model

In [None]:
# Candidate values sampled by the randomized search
param_grid = {
    'max_depth': [3, 5, 7],
    'learning_rate': [0.01, 0.05, 0.1],
    'subsample': [0.7, 0.8, 0.9],
    'colsample_bytree': [0.7, 0.8, 0.9],
    'gamma': [0, 0.1, 0.2],
    'min_child_weight': [1, 5, 10],
    'reg_alpha': [0, 0.1, 1],
    'reg_lambda': [0, 1, 10],
    'scale_pos_weight': [50, 100],
    'n_estimators': [500, 1000]
}

# Settings shared by the search estimator and the final model.
# `use_label_encoder` is intentionally not passed: it is deprecated in XGBoost
# and only produces a "parameters are not used" warning on current releases.
xgb_fixed_params = dict(
    objective='binary:logistic',
    eval_metric=['aucpr', 'logloss'],
    enable_categorical=True,
    random_state=42,
)

# Search estimator: no early stopping, because the search passes no eval_set
xgb_clf = xgb.XGBClassifier(verbosity=0, **xgb_fixed_params)

# 5-Fold Stratified Cross Validation
# NOTE: this rebinds the name `cv`, shadowing `from xgboost import cv`.
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

# Custom scoring for imbalanced data
scoring = {
    'precision': make_scorer(precision_score, zero_division=0),
    'recall': make_scorer(recall_score, zero_division=0),
    'f1': make_scorer(f1_score, zero_division=0),
    'aucpr': 'average_precision'
}

# Randomized search over param_grid; the configuration with the best
# cross-validated average precision is refitted on the full training set.
grid = RandomizedSearchCV(
    estimator=xgb_clf,
    param_distributions=param_grid,
    n_iter=30,  # Reduced for faster execution
    scoring=scoring,
    refit='aucpr',
    cv=cv,
    n_jobs=-1,
    verbose=1,
    random_state=42,
    return_train_score=True
)

# Fit without early stopping in the search
grid.fit(X_mah_train, y_mah_train)

# Retrain with the best parameters, now with early stopping.
# best_params contains every searched key, n_estimators included (it is part
# of param_grid), so it acts as the upper bound on boosting rounds.
best_params = grid.best_params_.copy()

final_model = xgb.XGBClassifier(
    **best_params,
    **xgb_fixed_params,
    early_stopping_rounds=50,
    verbosity=1,
)

# NOTE(review): early stopping monitors the same set that is later used for
# the reported metrics, so those metrics are optimistic; a separate
# validation split would avoid this.
final_model.fit(
    X_mah_train, y_mah_train,
    eval_set=[(X_mah_val, y_mah_val)],
    verbose=True
)
# Best model
print("Best Parameters:", grid.best_params_)

# Positive-class probabilities on the validation set
y_proba = final_model.predict_proba(X_mah_val)[:, 1]

# Find the threshold that maximises F1.
# precision/recall contain one more entry than thresholds (the final
# precision=1, recall=0 point has no threshold), so that last point is dropped
# to keep the argmax a valid index into `thresholds`.
precision, recall, thresholds = precision_recall_curve(y_mah_val, y_proba)
f1_scores = 2 * (precision[:-1] * recall[:-1]) / (precision[:-1] + recall[:-1] + 1e-9)
optimal_idx = np.argmax(f1_scores)
optimal_threshold = thresholds[optimal_idx]
y_pred = (y_proba >= optimal_threshold).astype(int)

# Metrics
# NOTE(review): the threshold is tuned on the same data these metrics are
# computed on, so precision/recall/F1 below are optimistic.
print(f"\nOptimal Threshold: {optimal_threshold:.4f}")
print(f"Confusion Matrix:\n{confusion_matrix(y_mah_val, y_pred)}")
print(f"Precision: {precision_score(y_mah_val, y_pred, zero_division=0):.4f}")
print(f"Recall:    {recall_score(y_mah_val, y_pred, zero_division=0):.4f}")
print(f"F1 Score:  {f1_score(y_mah_val, y_pred, zero_division=0):.4f}")
print(f"ROC AUC:   {roc_auc_score(y_mah_val, y_proba):.4f}")
print(f"PR AUC:    {average_precision_score(y_mah_val, y_proba):.4f}")

# Precision-Recall Curve
# The display is drawn on an explicitly created axes: without `ax`, scikit-learn
# opens its own figure, leaving the sized figure empty and the figsize unused.
fig, ax = plt.subplots(figsize=(8, 6))
disp = PrecisionRecallDisplay.from_estimator(final_model, X_mah_val, y_mah_val, ax=ax)
ax.set_title('Precision-Recall Curve')
ax.grid(True)
plt.show()

# Learning curves: per-round metrics recorded on the eval_set during fitting
results = final_model.evals_result()
plt.figure(figsize=(10, 4))
plt.plot(results['validation_0']['logloss'], label='Test Log Loss')
plt.plot(results['validation_0']['aucpr'], label='Test AUC-PR')
plt.xlabel('Iterations')
plt.ylabel('Metric Value')
plt.title('Learning Curves')
plt.legend()
plt.grid(True)
plt.show()