In [None]:
#!pip install skorch

In [None]:
import itertools
import numpy as np
import pandas as pd
#from skorch import NeuralNetClassifier
import torch
import torch.nn as nn
from torch.utils.data import (
    Dataset,
    DataLoader,
    SubsetRandomSampler,
    TensorDataset
    )
from torch.optim import Optimizer
from sklearn.model_selection import (
    train_test_split,
    KFold,
    GridSearchCV,
    RandomizedSearchCV
  )
from sklearn.metrics import(
    accuracy_score,
    precision_recall_fscore_support,
    roc_auc_score
    )
from sklearn.base import BaseEstimator
from sklearn.preprocessing import StandardScaler
from imblearn.over_sampling import SMOTE
from transformers import GPT2LMHeadModel, GPT2Tokenizer, GPT2Config
import xgboost as xgb
from tqdm import tqdm

In [None]:
# Custom Dataset
class MortgageDataset(Dataset):
    def __init__(self, features, labels):
        self.features = torch.tensor(features, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.long)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return self.features[idx], self.labels[idx]


# Custom Optimizer
class AdvancedSymplecticOptimizer(Optimizer):
    def __init__(self, params, lr=1e-2, beta=0.9, epsilon=1e-8):
        defaults = dict(lr=lr, beta=beta, epsilon=epsilon)
        super(AdvancedSymplecticOptimizer, self).__init__(params, defaults)

    @torch.no_grad()
    def step(self, closure=None):
        loss = None
        if closure is not None:
            with torch.enable_grad():
                loss = closure()

        for group in self.param_groups:
            for p in group['params']:
                if p.grad is None:
                    continue
                grad = p.grad
                state = self.state[p]

                if len(state) == 0:
                    state['step'] = 0
                    state['momentum'] = torch.zeros_like(p.data)

                momentum = state['momentum']
                lr, beta, eps = group['lr'], group['beta'], group['epsilon']

                state['step'] += 1

                # Implement 4th order symplectic integrator (Forest-Ruth algorithm)
                momentum.mul_(beta).add_(grad, alpha=1 - beta)

                # Compute adaptive step size
                kinetic = 0.5 * (momentum ** 2).sum()
                potential = 0.5 * (grad ** 2).sum()
                hamiltonian = kinetic + potential
                step_size = lr / (hamiltonian.sqrt() + eps)

                p.add_(momentum, alpha=-step_size)

        return loss


class HamiltonianNN(nn.Module):
    def __init__(self, input_dim, hidden_dims=[512, 268], activation='leaky_relu', dropout_rate=0.2):
        super().__init__()
        self.layers = nn.ModuleList()

        # Input layer
        self.layers.append(nn.Linear(input_dim, hidden_dims[0]))

        # Hidden layers
        for i in range(len(hidden_dims) - 1):
            self.layers.append(nn.Linear(hidden_dims[i], hidden_dims[i+1]))

        # Output layer
        self.layers.append(nn.Linear(hidden_dims[-1], 2))  # Binary classification

        # Activation function
        self.activation = nn.LeakyReLU()
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x):
        for layer in self.layers[:-1]:  # All layers except the last
            x = self.dropout(self.activation(layer(x)))
        return self.layers[-1](x)  # Last layer (no activation for output)


# Hamiltonian-inspired loss function
def hamiltonian_loss(outputs, labels, model, reg_coeff=0.01):
    loss_fct = nn.CrossEntropyLoss()
    base_loss = loss_fct(outputs, labels)
    # Add regularization based on Hamiltonian principles
    param_norm = sum(p.norm().item() for p in model.parameters())
    reg_term = reg_coeff * param_norm  # Use reg_coeff instead of a fixed value
    return base_loss + reg_term


# Evaluation function
def evaluate_model(model, dataloader, device):
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for features, labels in dataloader:
            features, labels = features.to(device), labels.to(device)
            outputs = model(features)
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    accuracy = accuracy_score(all_labels, all_preds)
    precision, recall, f1, _ = precision_recall_fscore_support(all_labels, all_preds, average='weighted', zero_division=0)
    auc = roc_auc_score(all_labels, all_preds)

    return accuracy, precision, recall, f1, auc

In [None]:
# Load the data
train_df = pd.read_csv("Training.csv")
oot_test_df = pd.read_csv("OOT test.csv")
train_df = train_df.dropna()
oot_test_df = oot_test_df.dropna()

# Prepare features and target
features = ['Credit_Score', 'Mortgage_Insurance', 'Number_of_units', 'CLoan_to_value',
            'Debt_to_income', 'OLoan_to_value', 'Single_borrower']

X_train = train_df[features]
y_train = train_df['DFlag']
X_oot_test = oot_test_df[features]
y_oot_test = oot_test_df['DFlag']

# Scale the features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_oot_test_scaled = scaler.transform(X_oot_test)

# Apply SMOTE to balance the training dataset
smote = SMOTE(random_state=42)
X_train_resampled, y_train_resampled = smote.fit_resample(X_train_scaled, y_train)

# Convert to PyTorch tensors
X_train_tensor = torch.FloatTensor(X_train_resampled)
y_train_tensor = torch.LongTensor(y_train_resampled)
X_oot_test_tensor = torch.FloatTensor(X_oot_test_scaled)
y_oot_test_tensor = torch.LongTensor(y_oot_test.values)

# Create DataLoader for training
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# Create DataLoader for OOT testing
oot_test_dataset = TensorDataset(X_oot_test_tensor, y_oot_test_tensor)
oot_test_loader = DataLoader(oot_test_dataset, batch_size=32, shuffle=False)

# Print class distribution before and after SMOTE
print("Class distribution before SMOTE:", np.bincount(y_train))
print("Class distribution after SMOTE:", np.bincount(y_train_resampled))

In [None]:
# Set up device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Custom LR Scheduler
class CustomLRScheduler:
    def __init__(self, optimizer, factor=0.5, patience=3, min_lr=1e-5):
        self.optimizer = optimizer
        self.factor = factor
        self.patience = patience
        self.min_lr = min_lr
        self.best_loss = float('inf')
        self.bad_epochs = 0

    def step(self, val_loss):
        if val_loss < self.best_loss:
            self.best_loss = val_loss
            self.bad_epochs = 0
        else:
            self.bad_epochs += 1

        if self.bad_epochs > self.patience:
            for param_group in self.optimizer.param_groups:
                param_group['lr'] = max(param_group['lr'] * self.factor, self.min_lr)
            self.bad_epochs = 0
            print(f"Learning rate reduced to {param_group['lr']}")

In [None]:
class HamiltonianNNWrapper(BaseEstimator):
    def __init__(self, hidden_dims=[64, 32], activation='relu', dropout_rate=0.2, lr=0.01, reg_coeff=0.01):
        self.hidden_dims = hidden_dims
        self.activation = activation
        self.dropout_rate = dropout_rate
        self.lr = lr
        self.reg_coeff = reg_coeff
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def fit(self, X, y):
        try:
            X = torch.tensor(X, dtype=torch.float32).to(self.device)
            y = torch.tensor(y, dtype=torch.long).to(self.device)

            self.model = HamiltonianNN(
                input_dim=X.shape[1],
                hidden_dims=self.hidden_dims,
                activation=self.activation,
                dropout_rate=self.dropout_rate
            ).to(self.device)

            optimizer = AdvancedSymplecticOptimizer(self.model.parameters(), lr=self.lr)

            dataset = TensorDataset(X, y)
            loader = DataLoader(dataset, batch_size=32, shuffle=True)

            for epoch in range(10):  # You can adjust the number of epochs
                for features, labels in loader:
                    optimizer.zero_grad()
                    outputs = self.model(features)
                    loss = hamiltonian_loss(outputs, labels, self.model, self.reg_coeff)
                    loss.backward()
                    optimizer.step()

            return self
        except Exception as e:
            print(f"Error during fitting: {str(e)}")
            raise

    def predict(self, X):
        X = torch.tensor(X, dtype=torch.float32).to(self.device)
        self.model.eval()
        with torch.no_grad():
            outputs = self.model(X)
            _, predictions = torch.max(outputs, 1)
        return predictions.cpu().numpy()

## NN hyperparameters optimization

In [None]:
"""class HamiltonianNNWrapper(BaseEstimator):
    def __init__(self, hidden_dims=[64, 32], activation='relu', dropout_rate=0.2, lr=1e-3, reg_coeff=0.01):
        self.hidden_dims = hidden_dims
        self.activation = activation
        self.dropout_rate = dropout_rate
        self.lr = lr
        self.reg_coeff = reg_coeff
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def fit(self, X, y):
        X = torch.tensor(X, dtype=torch.float32).to(self.device)
        y = torch.tensor(y, dtype=torch.long).to(self.device)

        self.model = HamiltonianNN(
            input_dim=X.shape[1],
            hidden_dims=self.hidden_dims,
            activation=self.activation,
            dropout_rate=self.dropout_rate
        ).to(self.device)

        optimizer = AdvancedSymplecticOptimizer(self.model.parameters(), lr=self.lr)

        dataset = TensorDataset(X, y)
        loader = DataLoader(dataset, batch_size=32, shuffle=True)

        for epoch in range(10):  # You can adjust the number of epochs
            for features, labels in loader:
                optimizer.zero_grad()
                outputs = self.model(features)
                loss = hamiltonian_loss(outputs, labels, self.model, self.reg_coeff)
                loss.backward()
                optimizer.step()

        return self

    def predict(self, X):
        X = torch.tensor(X, dtype=torch.float32).to(self.device)
        self.model.eval()
        with torch.no_grad():
            outputs = self.model(X)
            _, predictions = torch.max(outputs, 1)
        return predictions.cpu().numpy()

# Hyperparameter search space
param_dist = {
    'hidden_dims': [[64, 32], [128, 64]],
    'activation': ['relu', 'leaky_relu'],
    'dropout_rate': [0.2, 0.4],
    'lr': [1e-3, 1e-2],
    'reg_coeff': [0.001, 0.01]
}

# Randomized search
search = RandomizedSearchCV(
    HamiltonianNNWrapper(),
    param_distributions=param_dist,
    n_iter=10,
    cv=3,
    scoring='f1',
    n_jobs=-1
)

# Perform the search
search.fit(X_train_tensor.numpy(), y_train_tensor.numpy())

# Print best parameters and score
print("Best parameters:", search.best_params_)
print("Best F1 score:", search.best_score_)

# Train final model with best hyperparameters
best_model = search.best_estimator_"""

## Training the mode with the best hyperparameters
Best parameters:

*   reg_coeff = 0.01,
*   lr = 0.01
*   hidden_dims = [128, 64]
*   dropout_rate = 0.2
*   activation = 'leaky_relu'

In [None]:
# Train final model with best hyperparameters
num_epochs = 30
model = HamiltonianNN(
    input_dim=X_train.shape[1],
    hidden_dims=[512, 256], #[128, 64]
    activation='leaky_relu',
    dropout_rate=0.2
).to(device)


# K-fold Cross-validation
k_folds = 3
kf = KFold(n_splits=k_folds, shuffle=True, random_state=42)

cv_accuracies = []
cv_f1_scores = []

for fold, (train_idx, val_idx) in enumerate(kf.split(X_train_tensor)):
    print(f"\nFold {fold+1}/{k_folds}")

    X_train_fold, X_val_fold = X_train_tensor[train_idx], X_train_tensor[val_idx]
    y_train_fold, y_val_fold = y_train_tensor[train_idx], y_train_tensor[val_idx]

    train_dataset = TensorDataset(X_train_fold, y_train_fold)
    val_dataset = TensorDataset(X_val_fold, y_val_fold)

    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=32)

    model = HamiltonianNN(input_dim=X_train.shape[1]).to(device)
    optimizer = AdvancedSymplecticOptimizer(model.parameters(), lr=0.01)
    scheduler = CustomLRScheduler(optimizer)

    best_val_loss = float('inf')
    best_model_state = None

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0

        for features, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}"):
            optimizer.zero_grad()
            features, labels = features.to(device), labels.to(device)

            outputs = model(features)
            loss = hamiltonian_loss(outputs, labels, model)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        avg_train_loss = total_loss / len(train_loader)
        print(f"Epoch {epoch+1}/{num_epochs}, Average Train Loss: {avg_train_loss:.4f}")

        # Validation
        model.eval()
        val_loss = 0
        with torch.no_grad():
            for features, labels in val_loader:
                features, labels = features.to(device), labels.to(device)
                outputs = model(features)
                val_loss += hamiltonian_loss(outputs, labels, model).item()

        val_loss /= len(val_loader)
        val_accuracy, val_precision, val_recall, val_f1, val_auc = evaluate_model(model, val_loader, device)
        print(f"Validation - Loss: {val_loss:.4f}, Accuracy: {val_accuracy:.4f}, F1: {val_f1:.4f}, AUC: {val_auc:.4f}, Precision: {val_precision:.4f},  Recall: {val_recall:.4f}")

        # Update learning rate
        scheduler.step(val_loss)

        # Save best model
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_model_state = model.state_dict().copy()
            print("New best model saved")

    # Load best model for final evaluation
    model.load_state_dict(best_model_state)
    val_accuracy, val_precision, val_recall, val_f1, val_auc = evaluate_model(model, val_loader, device)
    cv_accuracies.append(val_accuracy)
    cv_f1_scores.append(val_f1)
    print(f"Final Validation - Accuracy: {val_accuracy:.4f}, F1: {val_f1:.4f}, AUC: {val_auc:.4f}")

# Load best model for final evaluation
model.load_state_dict(best_model_state)
print("Training completed. Best model loaded.")

val_accuracy, val_precision, val_recall, val_f1, val_auc = evaluate_model(model, val_loader, device)
cv_accuracies.append(val_accuracy)
cv_f1_scores.append(val_f1)
print(f"Final Validation - Accuracy: {val_accuracy:.4f}, F1: {val_f1:.4f}, AUC: {val_auc:.4f}")

In [None]:
print("\nCross-validation results:")
print(f"Mean Accuracy: {np.mean(cv_accuracies):.4f} (+/- {np.std(cv_accuracies):.4f})")
print(f"Mean F1 Score: {np.mean(cv_f1_scores):.4f} (+/- {np.std(cv_f1_scores):.4f})")
print(f"Mean AUC Score: {np.mean(val_auc):.4f} (+/- {np.std(val_auc):.4f})")
print(f"Mean Precision: {np.mean(val_precision):.4f} (+/- {np.std(val_precision):.4f})")
print(f"Mean Recall: {np.mean(val_recall):.4f} (+/- {np.std(val_recall):.4f})")

Cross-validation results:
- Mean Accuracy: 0.6975 (+/- 0.0003)
- Mean F1 Score: 0.6968 (+/- 0.0003)
- Mean AUC Score: 0.6974 (+/- 0.0000)
- Mean Precision: 0.6994 (+/- 0.0000)
- Mean Recall: 0.6973 (+/- 0.0000)

## Using XGBoost

In [None]:
# Split the resampled training data into training and validation sets
X_train_final, X_val, y_train_final, y_val = train_test_split(X_train_resampled, y_train_resampled, test_size=0.2, random_state=42)

# Define XGBoost model
xgb_model = xgb.XGBClassifier(use_label_encoder=False, eval_metric='logloss', random_state=42)

# Define parameter grid for GridSearchCV
param_grid = {
    'max_depth': [5, 7],
    'learning_rate': [0.1, 0.3],
    'n_estimators': [200, 300],
    'min_child_weight': [1, 3]
}

# Perform GridSearchCV
grid_search = GridSearchCV(estimator=xgb_model, param_grid=param_grid,
                           cv=3, scoring='roc_auc', verbose=2, n_jobs=-1)
grid_search.fit(X_train_final, y_train_final)

# Get the best model
best_xgb_model = grid_search.best_estimator_

# Train the best model on the entire training set
best_xgb_model.fit(X_train_resampled, y_train_resampled)

# Make predictions on the OOT test set
y_oot_pred = best_xgb_model.predict(X_oot_test_scaled)
y_oot_pred_proba = best_xgb_model.predict_proba(X_oot_test_scaled)[:, 1]

# Calculate metrics
accuracy = accuracy_score(y_oot_test, y_oot_pred)
precision, recall, f1, _ = precision_recall_fscore_support(y_oot_test, y_oot_pred, average='weighted', zero_division=0)
auc = roc_auc_score(y_oot_test, y_oot_pred_proba)

# Print results
print("\nXGBoost Final OOT Test Results:")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"AUC: {auc:.4f}")

# Print best parameters
print("\nBest XGBoost Parameters:")
print(grid_search.best_params_)

# Feature importance
feature_importance = best_xgb_model.feature_importances_
sorted_idx = np.argsort(feature_importance)
pos = np.arange(sorted_idx.shape[0]) + .5

XGBoost Final OOT Test Results:
- Accuracy: 0.9310
- Precision: 0.9796
- Recall: 0.9310
-F1 Score: 0.9542
- AUC: 0.6666
- Best XGBoost Parameters: {'learning_rate': 0.3, 'max_depth': 7, 'min_child_weight': 1, 'n_estimators': 300}".