In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score



In [3]:
# Dataset && DataLoader
class HouseDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X.values, dtype=torch.float32)  # Convert DataFrame to tensor
        self.y = torch.tensor(y.values, dtype=torch.long)  # Convert pd.Series to numpy.ndarray and then to tensor

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]


# PyTorch
class MLP(nn.Module):
    def __init__(self, input_size, output_size, dropout_prob=0.5, l2_reg=0.001):
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(input_size, 128)
        self.bn1 = nn.BatchNorm1d(128)
        self.dropout1 = nn.Dropout(dropout_prob)

        self.fc2 = nn.Linear(128, 64)
        self.bn2 = nn.BatchNorm1d(64)
        self.dropout2 = nn.Dropout(dropout_prob)

        self.fc3 = nn.Linear(64, output_size)

        self.l2_reg = l2_reg ## Adjust the value of l2_reg to find the suitable parameter
        
        self.best_loss = float('inf')
        self.patience = 5
        self.current_patience = 0

    # Function: Relu(), using 0.15 dropout in every layer to reduce fit
    def forward(self, x):
        x = F.relu(self.bn1(self.fc1(x)))
        x = self.dropout1(x)
        x = F.relu(self.bn2(self.fc2(x)))
        x = self.dropout2(x)
        x = self.fc3(x)

        return x

    def l2_regularization(self):
        l2_loss = 0
        for param in self.parameters():
            l2_loss += torch.norm(param, p=2) ** 2
        return self.l2_reg * l2_loss
    
    def early_stop(self, val_loss):
        if val_loss < self.best_loss:
            self.best_loss = val_loss
            self.current_patience = 0
        else:
            self.current_patience += 1
            if self.current_patience >= self.patience:
                return True
        return False

In [4]:
def train_model(model, criterion, optimizer, scheduler, train_loader, test_loader, num_epochs):
    losses = []
    val_losses = []
    ## If the pc has cuda, using it to calculate so that the speed is faster
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0
        for i, (inputs, labels) in enumerate(train_loader):
            # Move inputs and labels to the same device as the model
            inputs, labels = inputs.to(device), labels.to(device)
            
            outputs = model(inputs)
            loss = criterion(outputs, labels) + model.l2_regularization()

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        scheduler.step() #Using the scheduler to adjust the learning rate every epoch
        train_loss = running_loss / len(train_loader) # Calculate the training loss
        losses.append(train_loss)

        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

        val_loss /= len(test_loader)
        val_losses.append(val_loss)

        print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")

    return losses, val_losses

In [5]:
# Prediction every fold
def predict_fold(model, test_loader):
    fold_true_labels = []
    fold_predictions = []
    
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            fold_true_labels.extend(labels.cpu().numpy())
            fold_predictions.extend(predicted.cpu().numpy())

    return fold_true_labels, fold_predictions

In [6]:
# Evaluation function, return accuracy, precision, recall and F1 score to evaluate the model
def evaluate_fold(y_true, y_pred):
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, average='binary')
    recall = recall_score(y_true, y_pred, average='binary')
    f1 = f1_score(y_true, y_pred, average='binary')
    return accuracy, precision, recall, f1

In [8]:
# Load the data from Bank1, Bank2, Bank3
data1 = pd.read_csv('../data/Bank1_New.csv')
data2 = pd.read_csv('../data/Bank2_New.csv')
data3 = pd.read_csv('../data/Bank3_New.csv')

# Combine them and reset the index of the data
data = pd.concat([data1, data2, data3], ignore_index=True)
data = data.sample(frac=1, random_state=42).reset_index(drop=True)

# Separate features and target variable
X = data.iloc[:, :-1]  # Features
y = data.iloc[:, -1]  # Target variable

# Normalize features (except the second to last column)
scaler = MinMaxScaler()
X_normalized = X.iloc[:, :-1]  # Exclude the second to last column (L-3)
X_normalized = scaler.fit_transform(X_normalized)

# Add the PCA result column back to the normalized features
X_normalized = pd.DataFrame(X_normalized, columns=X.columns[:-1])
X_normalized["L-3"] = X.iloc[:, -2]  # Add the PCA result column

# Split the dataset into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X_normalized, y, test_size=0.2, random_state=42)

# Initialize the model, loss function and optimizer
input_size = X.shape[1]
output_size = 2
model = MLP(input_size, output_size)
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=32, gamma=0.1)


# Training the model
num_splits = 5 # K = 5
skf = StratifiedKFold(n_splits=num_splits, shuffle=True, random_state=42)

metrics_per_fold = []

for fold, (train_index, test_index) in enumerate(skf.split(X, y)):
    print(f"Fold {fold + 1}")
    X_train, X_test = X_normalized.iloc[train_index], X_normalized.iloc[test_index]
    y_train, y_test = y.iloc[train_index], y.iloc[test_index]

    train_dataset = HouseDataset(X_train, y_train)
    test_dataset = HouseDataset(X_test, y_test)

    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

    model = MLP(input_size, output_size)

    # Cross-entropy is used to determine how close the actual output is to the desired output
    # It is suitable for classification models
    criterion = nn.CrossEntropyLoss() 
    optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=32, gamma=0.1)

    num_epochs = 15
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    losses, val_losses = train_model(model, criterion, optimizer, scheduler, train_loader, test_loader, num_epochs)

    fold_true_labels, fold_predictions = predict_fold(model, test_loader)
    # Get the evaluation rates every FOLD
    fold_accuracy, fold_precision, fold_recall, fold_f1 = evaluate_fold(fold_true_labels, fold_predictions)
    print(f"Fold {fold + 1} Accuracy: {fold_accuracy:.4f}")
    print(f"Fold {fold + 1} Precision: {fold_precision:.4f}")
    print(f"Fold {fold + 1} Recall: {fold_recall:.4f}")
    print(f"Fold {fold + 1} F1 Score: {fold_f1:.4f}")

    metrics_per_fold.append((fold_accuracy, fold_precision, fold_recall, fold_f1))
        
    fold_confusion_matrix = confusion_matrix(fold_true_labels, fold_predictions)
    print(f"Fold {fold + 1} Confusion Matrix:\n{fold_confusion_matrix}")

# Final results
average_metrics = np.mean(metrics_per_fold, axis=0)
print(f"Average Accuracy: {average_metrics[0]:.4f}")
print(f"Average Precision: {average_metrics[1]:.4f}")
print(f"Average Recall: {average_metrics[2]:.4f}")
print(f"Average F1 Score: {average_metrics[3]:.4f}")



Fold 1
Epoch [1/15], Loss: 0.5938, Val Loss: 0.2568
Epoch [2/15], Loss: 0.4880, Val Loss: 0.2429
Epoch [3/15], Loss: 0.4343, Val Loss: 0.2335
Epoch [4/15], Loss: 0.4063, Val Loss: 0.2299
Epoch [5/15], Loss: 0.3780, Val Loss: 0.2255
Epoch [6/15], Loss: 0.3543, Val Loss: 0.2262
Epoch [7/15], Loss: 0.3395, Val Loss: 0.2256
Epoch [8/15], Loss: 0.3301, Val Loss: 0.2229
Epoch [9/15], Loss: 0.3211, Val Loss: 0.2260
Epoch [10/15], Loss: 0.3122, Val Loss: 0.2258
Epoch [11/15], Loss: 0.3086, Val Loss: 0.2259
Epoch [12/15], Loss: 0.3019, Val Loss: 0.2260
Epoch [13/15], Loss: 0.2962, Val Loss: 0.2238
Epoch [14/15], Loss: 0.2962, Val Loss: 0.2259
Epoch [15/15], Loss: 0.2891, Val Loss: 0.2236
Fold 1 Accuracy: 0.9112
Fold 1 Precision: 0.7944
Fold 1 Recall: 0.6043
Fold 1 F1 Score: 0.6864
Fold 1 Confusion Matrix:
[[1649   51]
 [ 129  197]]
Fold 2
Epoch [1/15], Loss: 0.5732, Val Loss: 0.2489
Epoch [2/15], Loss: 0.4789, Val Loss: 0.2353
Epoch [3/15], Loss: 0.4347, Val Loss: 0.2333
Epoch [4/15], Loss: 0.4

KeyboardInterrupt: 