# TO DO

1. Finish the objective function: look at the code on ChatGPT for how to call the DataLoaders and our custom classes.
2. Plot train loss vs. validation loss for the best model.
3. Report accuracy, hit rate, F1 and ROC AUC, and plot the ROC curve.
4. Output the best hyperparameter combination.
5. Check the paper for other tips and tricks they used.
6. Look into cell-based architectures (like those used in NASNet or EfficientNet): a more structured approach to dynamic architectures that offers a balance between flexibility and control.



---

# Intro

**Plan**: Import credit card fraud data. Use an encoder-only transformer network to classify time series credit card data.

**Purpose/Intro**: The task is to develop a transformer architecture proof of concept for a potential application at work: detecting fraud. In a normal data science project it might be considered best practice to begin with more interpretable models, but this project is solely for assessing the viability of a transformer for this task.

**Hypothesis**: The attention mechanism of the transformer, when combined with an appropriate positional embedding method, is able to capture both long-term and short-term dependencies in time series credit-card fraud data.
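
As a point of reference for the positional embedding mentioned in the hypothesis, here is a minimal NumPy sketch of the fixed sinusoidal encoding from the original Transformer paper ("Attention Is All You Need"). It is only one option and is not necessarily the method used in Source 1; the function name `sinusoidal_positional_encoding` and the window and embedding sizes are illustrative assumptions.

```python
import numpy as np

def sinusoidal_positional_encoding(seq_len, embed_dim):
    """Return a (seq_len, embed_dim) array of fixed sinusoidal position encodings."""
    if embed_dim % 2 != 0:
        raise ValueError("embed_dim must be even for this sketch")
    positions = np.arange(seq_len)[:, None]        # shape (seq_len, 1)
    dims = np.arange(0, embed_dim, 2)[None, :]     # shape (1, embed_dim / 2)
    angles = positions / np.power(10000.0, dims / embed_dim)
    encoding = np.zeros((seq_len, embed_dim))
    encoding[:, 0::2] = np.sin(angles)  # even feature indices
    encoding[:, 1::2] = np.cos(angles)  # odd feature indices
    return encoding

# Hypothetical window of 8 consecutive transactions embedded in 16 dimensions.
pe = sinusoidal_positional_encoding(seq_len=8, embed_dim=16)
print(pe.shape)
```

The encoding would be added to the embedded transactions before attention. It only has an effect once each sample is a sequence of several transactions rather than a single row.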

**Methodology**: Use cross-validation, followed by evaluation on a held-out test set, to calculate appropriate performance metrics (adjusting for the significant class imbalance in the dataset), with the aim of assessing the viability of transformer networks for fraud classification.
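
To illustrate why plain accuracy is a poor guide under heavy class imbalance, the sketch below scores a classifier that always predicts "not fraud". The labels are made up for illustration (998 legitimate rows and 2 frauds, not counts from the dataset), and the helper names `confusion_counts` and `summary_metrics` are assumptions.

```python
def confusion_counts(y_true, y_pred):
    """Count true/false positives and negatives for binary labels (1 = fraud)."""
    tp = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 1)
    fp = sum(1 for t, p in zip(y_true, y_pred) if t == 0 and p == 1)
    fn = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 0)
    tn = sum(1 for t, p in zip(y_true, y_pred) if t == 0 and p == 0)
    return tp, fp, fn, tn

def summary_metrics(y_true, y_pred):
    """Accuracy, recall (hit rate), precision and F1, with 0.0 for undefined ratios."""
    tp, fp, fn, tn = confusion_counts(y_true, y_pred)
    accuracy = (tp + tn) / len(y_true)
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
    return {"accuracy": accuracy, "recall": recall, "precision": precision, "f1": f1}

# Made-up labels: 998 legitimate transactions and 2 frauds.
y_true = [0] * 998 + [1] * 2
always_legit = [0] * 1000  # a useless model that never flags fraud
metrics = summary_metrics(y_true, always_legit)
print(metrics)
```

The useless model scores 99.8% accuracy while catching no fraud at all, which is why hit rate (recall), F1 and ROC AUC are the more informative metrics here.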

Credit to the below paper, **referred to as Source 1**, for the methodology design: Yu, C., Xu, Y., Cao, J., Zhang, Y., Jin, Y. and Zhu, M. (2024) 'Credit Card Fraud Detection Using Advanced Transformer Model', arXiv preprint arXiv:2406.03733. Available at: https://arxiv.org/abs/2406.03733 (Accessed: 18 December 2024)

This paper demonstrated the utility of the kind of transformer we are about to build by comparing it with various shallow learning techniques. In future projects I aim to validate this myself.





---

# Data Sourcing and Package loading



In [None]:

# Import packages

import os
os.environ['NUMBAPRO_NVVM'] = '/usr/local/cuda/nvvm/lib64/libnvvm.so'
os.environ['NUMBAPRO_LIBDEVICE'] = '/usr/local/cuda/nvvm/libdevice/'

# Detect Colab and mount Google Drive only when running there
try:
    from google.colab import drive
    IN_COLAB = True
except ImportError:
    IN_COLAB = False

if IN_COLAB and not os.path.exists('/content/drive'):
    drive.mount('/content/drive')

# Basics
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
!pip install optuna
import optuna

# Table one
!pip install tableone
from tableone import TableOne

# Torch
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, TensorDataset

# imbalanced-learn
from imblearn.over_sampling import RandomOverSampler, SMOTE

# sklearn
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.model_selection import StratifiedKFold, GridSearchCV, KFold, RepeatedStratifiedKFold
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.metrics import accuracy_score, roc_auc_score, precision_score, recall_score, f1_score

In [None]:
data_set_filepath = '/content/drive/MyDrive/Colab_Notebooks/Data/creditcard.feather'

df = pd.read_feather(data_set_filepath)

columns = df.columns.tolist()

print(f"The dataset length is {len(df)}")
print(f"The number of columns is {len(columns)}")
print(f"The column names are {columns}")
df.head(10)

table1 = TableOne(df, columns=columns, groupby= 'Class', pval=True)
print(table1)

data = df







---


# Data loading and preprocessing:
In Source 1 (listed above), removing outliers was found to improve performance, possibly because it reduces overfitting. The source also suggests there is value in oversampling the minority class, which may be due to the unique challenges of such a large class imbalance. Both steps will be applied to the training data only, so that the validation and test data stay untouched and no information leaks between the splits.

In addition, we will fit a min-max scaler on the training set and apply the same scaling to the validation and test data.
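
The order of operations described above can be sketched on synthetic data. This is a minimal NumPy illustration, not the notebook's pipeline: the data is random, the split is done by hand, and plain random oversampling stands in for SMOTE to keep the example dependency-free.

```python
import numpy as np

rng = np.random.default_rng(0)

# Synthetic stand-in data: 200 rows, 3 features, 10 positives (not the real dataset).
X = rng.normal(size=(200, 3))
y = np.zeros(200, dtype=int)
y[:10] = 1

# Hand-made stratified split: 8 positives and 152 negatives for training, the rest for validation.
pos_idx, neg_idx = np.flatnonzero(y == 1), np.flatnonzero(y == 0)
train_idx = np.concatenate([pos_idx[:8], neg_idx[:152]])
val_idx = np.concatenate([pos_idx[8:], neg_idx[152:]])
X_train, y_train = X[train_idx], y[train_idx]
X_val, y_val = X[val_idx], y[val_idx]

# Min-max statistics come from the training split only.
col_min = X_train.min(axis=0)
col_range = X_train.max(axis=0) - col_min
col_range[col_range == 0] = 1.0  # guard against constant columns
X_train_scaled = (X_train - col_min) / col_range
X_val_scaled = (X_val - col_min) / col_range  # may fall slightly outside [0, 1]

# Oversample the minority class in the training split only.
minority = np.flatnonzero(y_train == 1)
majority = np.flatnonzero(y_train == 0)
extra = rng.choice(minority, size=len(majority) - len(minority), replace=True)
keep = np.concatenate([majority, minority, extra])
X_train_bal, y_train_bal = X_train_scaled[keep], y_train[keep]

print(np.bincount(y_train_bal), np.bincount(y_val))
```

The validation split keeps its original class ratio, so the validation metrics reflect the imbalance the model will face in practice.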

In [None]:
# **Set device for GPU acceleration**
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# **Error warning if no GPU is detected**
if device.type != 'cuda':
    print("WARNING: GPU is not available. The model will run on the CPU, which might be slower.")
else:
    print("Cuda setup successful")
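
X = data.iloc[:, :-1]  # Features (all columns except the last one)
y = data.iloc[:, -1]   # Labels (the last column)

# Hold out a stratified test set before any tuning; the 80/20 split is an assumption.
# From here on, X and y refer to the development (train + validation) portion only.
X, X_test, y, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)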

# Data Preprocessing Transformation Class
class DataPreprocessingTransform:
    def __init__(self):
        self.scaler = MinMaxScaler()

    def fit_transform(self, X_train):
        """
        Fit scaler on the training set and transform it.
        """
        return pd.DataFrame(self.scaler.fit_transform(X_train), columns=X_train.columns)

    def transform(self, X):
        """
        Apply the scaling transformation based on the training set scaling.
        """
        return pd.DataFrame(self.scaler.transform(X), columns=X.columns)


# Custom PyTorch Dataset
class CustomDataset(Dataset):
    def __init__(self, X, y, transform=None, resample=False):
        """
        Custom dataset to handle transformations and optional resampling.

        Parameters:
        - X: Features (pandas DataFrame)
        - y: Labels (pandas Series)
        - transform: Transformation function for features
        - resample: Whether to apply SMOTE for resampling the minority class
        """
        self.transform = transform

        if resample:
            X, y = self._apply_smote(X, y)

        self.X = X.reset_index(drop=True)  # Ensure consistent indexing
        self.y = y.reset_index(drop=True)

    def __len__(self):
        return len(self.X)

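    def __getitem__(self, index):
        sample_X = self.X.iloc[index]
        sample_y = self.y.iloc[index]

        if self.transform:
            sample_X = self.transform(sample_X)

        # Return float32 tensors: the model weights are float32 and BCELoss expects float targets
        features = torch.tensor(np.asarray(sample_X, dtype=np.float32))
        label = torch.tensor(float(sample_y), dtype=torch.float32)
        return features, label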

    def _apply_smote(self, X, y):
        """
        Applies SMOTE to the data to resample the minority class.
        """
        smote = SMOTE(sampling_strategy='auto', random_state=42)
        X_resampled, y_resampled = smote.fit_resample(X, y)
        return pd.DataFrame(X_resampled, columns=X.columns), pd.Series(y_resampled)


# Function to remove outliers using IQR
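def remove_outliers(X):
    """
    Removes outliers from the dataset based on the IQR method.

    For a DataFrame, a row is kept only if every column lies within its
    IQR bounds. For a Series, values outside the bounds are dropped.
    """
    Q1 = X.quantile(0.25)
    Q3 = X.quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    in_bounds = (X >= lower_bound) & (X <= upper_bound)
    if isinstance(X, pd.DataFrame):
        in_bounds = in_bounds.all(axis=1)  # collapse to one keep/drop flag per row
    return X[in_bounds]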





---
# Transformer Model



We are going to implement the transformer and optimise the hyperparameters using the Optuna package. As per the paper listed above, we will resample from the minority class when training.

In [None]:

# **Transformer Model with Batch Normalization and Residual Connections**
class FraudDetectionTransformer(nn.Module):

    # init method dynamically builds model architecture
    def __init__(self, input_dim, embed_dim, num_heads, ff_dim_base, dropout, ff_dropout, activation_function, num_ff_layers, use_batchnorm, use_layernorm):
        super().__init__()

        self.embedding = nn.Linear(input_dim, embed_dim)  # Embedding layer
        self.use_batchnorm = use_batchnorm  # Store whether to use batchnorm

        self.batch_norm = nn.BatchNorm1d(input_dim) if self.use_batchnorm else None  # Batch Normalization before embedding

        # Store the activation function name ("relu", "tanh" or "sigmoid")
        self.activation_function = activation_function

        # Multi-head Attention Layer (embed_dim must be divisible by num_heads)
        self.multihead_attn = nn.MultiheadAttention(embed_dim, num_heads, dropout=dropout)

        # Dynamically create feedforward network; layer i has width ff_dim_base * 2**i
        ff_network = []
        in_dim = embed_dim
        for i in range(num_ff_layers):
            ff_dim = ff_dim_base * (2 ** i)
            ff_network.extend([
                nn.Linear(in_dim, ff_dim),
                self.get_activation_function(),
                nn.Dropout(ff_dropout)
            ])
            in_dim = ff_dim
        # Project back to embed_dim so the residual connection in forward() has matching shapes
        ff_network.append(nn.Linear(in_dim, embed_dim))

        self.ff_network = nn.Sequential(*ff_network)  # Store the created network

        # Layer Normalization after the dynamic feedforward network
        self.layer_norm = nn.LayerNorm(embed_dim) if use_layernorm else None

        self.pooling = nn.AdaptiveAvgPool1d(1)  # Global average pooling
        self.fc = nn.Linear(embed_dim, 1)  # Fully connected layer for classification
        self.sigmoid = nn.Sigmoid()  # Sigmoid activation for binary classification

    def get_activation_function(self):
        """Map the stored activation name to a fresh nn.Module instance."""
        activations = {"relu": nn.ReLU, "tanh": nn.Tanh, "sigmoid": nn.Sigmoid}
        if self.activation_function not in activations:
            raise ValueError(f"Unsupported activation function: {self.activation_function}")
        return activations[self.activation_function]()

    def forward(self, x):
        # Apply batch normalization if specified
        if self.batch_norm is not None:
            x = self.batch_norm(x)

        # Apply embedding layer
        embedded_x = self.embedding(x)

        # Add sequence dimension for the transformer (each row is treated as a sequence of length 1)
        embedded_x = embedded_x.unsqueeze(1)

        # Transpose for multi-head attention (sequence_length, batch_size, embed_dim)
        embedded_x = embedded_x.permute(1, 0, 2)

        # Pass through multi-head attention and add residual connection
        attn_output, _ = self.multihead_attn(embedded_x, embedded_x, embedded_x)
        attn_output = attn_output + embedded_x

        # Pass through feedforward layers with residual connection
        attn_output = attn_output.permute(1, 0, 2)  # (batch_size, sequence_length, embed_dim)
        transformer_output = self.ff_network(attn_output) + attn_output

        # Layer normalization if specified
        if self.layer_norm is not None:
            transformer_output = self.layer_norm(transformer_output)

        # Global pooling over the sequence length dimension; squeeze only that dimension
        # so a batch of size 1 keeps its batch axis
        pooled_output = self.pooling(transformer_output.permute(0, 2, 1)).squeeze(-1)

        # Final fully connected layer and sigmoid activation, shape (batch_size, 1)
        fc_output = self.fc(pooled_output)
        return self.sigmoid(fc_output)


import copy

# **Training Function with Early Stopping**
def train_model_with_early_stopping(model, train_loader, val_loader, epochs, lr, patience):

    model = model.to(device) #switch to GPU
    optimizer = torch.optim.Adam(model.parameters(), lr=lr) #select Adam for optimization
    criterion = nn.BCELoss() #set loss funct

    #history
    train_losses = []
    val_losses = []
    best_val_loss = float('inf')
    patience_counter = 0
    best_model = None

    for epoch in range(epochs):
        model.train() #set model to training mode
        train_loss = 0.0

        for inputs, labels in train_loader:
            # Move each batch to the model's device; BCELoss needs float inputs and targets
            inputs, labels = inputs.to(device).float(), labels.to(device).float()
            optimizer.zero_grad()
            outputs = model(inputs).squeeze(-1) #[batch_size,1] -> [batch_size]
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        train_losses.append(train_loss / len(train_loader))

        # Validation phase
        model.eval()
        val_loss = 0.0

        with torch.no_grad(): #no gradient needed for val phase
            for inputs, labels in val_loader: #unpack data
                inputs, labels = inputs.to(device).float(), labels.to(device).float()
                outputs = model(inputs).squeeze(-1) #[batch_size,1] -> [batch_size]
                loss = criterion(outputs, labels)
                val_loss += loss.item()
        val_losses.append(val_loss / len(val_loader)) #avg loss

        # Early stopping check
        if val_losses[-1] < best_val_loss:
            best_val_loss = val_losses[-1]
            # Deep copy: state_dict() returns references that later epochs would overwrite
            best_model = copy.deepcopy(model.state_dict())
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print("Early stopping triggered.")
                break

        print(f"Epoch {epoch + 1}/{epochs} - Train Loss: {train_losses[-1]:.4f} - Val Loss: {val_losses[-1]:.4f}")

    if best_model is not None:
        model.load_state_dict(best_model)  # Load the best model before returning
    return train_losses, val_losses, best_model

# **Validation Metrics Calculation**
def evaluate(model, data_loader):
    model.eval() #set to eval mode
    y_true = []
    y_pred = []

    with torch.no_grad():
        for inputs, labels in data_loader:
            # Move the batch to the model's device and match its float32 dtype
            outputs = model(inputs.to(device).float()).squeeze(-1)
            y_true.extend(labels.cpu().numpy()) #extend is like appending for multiple elements at once
            y_pred.extend(outputs.cpu().numpy())

    # Convert predictions to binary format for accuracy and hit rate
    y_pred_binary = [1 if pred >= 0.5 else 0 for pred in y_pred]

    # Metrics
    accuracy = accuracy_score(y_true, y_pred_binary)
    hit_rate = np.sum(np.logical_and(np.array(y_true) == 1, np.array(y_pred_binary) == 1)) / np.sum(np.array(y_true) == 1)
    roc_auc = roc_auc_score(y_true, y_pred)

    return accuracy, hit_rate, roc_auc

# **Plotting Function**
def plot_losses(train_losses, val_losses):
    plt.figure(figsize=(10, 6))
    plt.plot(train_losses, label="Training Loss")
    plt.plot(val_losses, label="Validation Loss")
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plt.title("Training and Validation Loss")
    plt.legend()
    plt.show()

# Objective function for Optuna: returns the mean validation ROC AUC across folds
def objective(trial):
    # Define hyperparameters to tune
    num_heads = trial.suggest_int("num_heads", low=4, high=8, step=2)
    # embed_dim must be divisible by num_heads; a step of 24 (the lowest common
    # multiple of 4, 6 and 8) guarantees this for every head count
    embed_dim = trial.suggest_int("embed_dim", low=72, high=504, step=24)
    ff_dim_base = trial.suggest_int("ff_dim_base", low=64, high=512, step=64)
    dropout = trial.suggest_float("dropout", low=0.1, high=0.5, step=0.1)
    ff_dropout = trial.suggest_float("ff_dropout", low=0.1, high=0.5, step=0.1)
    activation_function = trial.suggest_categorical("activation_function", ["relu", "tanh", "sigmoid"])
    num_ff_layers = trial.suggest_int("num_ff_layers", low=1, high=3, step=1)
    use_batchnorm = trial.suggest_categorical("use_batchnorm", [True, False])
    use_layernorm = trial.suggest_categorical("use_layernorm", [True, False])
    batch_size = trial.suggest_int("batch_size", 64, 512, step=64)
    # The learning rate was undefined in the original draft; this search range is an assumption
    lr = trial.suggest_float("lr", 1e-5, 1e-2, log=True)

    # Print the hyperparameters being trialed
    print(f"Trial {trial.number}:")
    for name, value in trial.params.items():
        print(f"  {name}: {value}")
    print("-" * 50)

    # Stratified K-Fold keeps the fraud rate similar in every fold
    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    fold_scores = []
    best_val_score = -float('inf')
    best_model_state_dict = None

    for train_index, val_index in skf.split(X, y):
        X_train, X_val = X.iloc[train_index], X.iloc[val_index]
        y_train, y_val = y.iloc[train_index], y.iloc[val_index]

        # Scale: fit on the training fold only, then apply to the validation fold
        transform = DataPreprocessingTransform()
        X_train_scaled = transform.fit_transform(X_train).reset_index(drop=True)
        X_val_scaled = transform.transform(X_val)

        # Remove outliers from the training fold only, keeping labels aligned by position.
        # Note: IQR filtering across all columns can discard many fraud rows, so check
        # the class counts before resampling.
        X_train_filtered = remove_outliers(X_train_scaled).dropna()
        y_train_filtered = y_train.reset_index(drop=True).loc[X_train_filtered.index]

        # Create PyTorch datasets (SMOTE is applied to the training fold only)
        train_dataset = CustomDataset(X_train_filtered, y_train_filtered, resample=True)
        val_dataset = CustomDataset(X_val_scaled, y_val)

        # Create DataLoaders (drop_last avoids a final batch of size 1, which BatchNorm1d rejects in training)
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
        val_loader = DataLoader(val_dataset, batch_size=batch_size)

        # Initialize the model
        model = FraudDetectionTransformer(
            input_dim=X.shape[1],
            embed_dim=embed_dim,
            num_heads=num_heads,
            ff_dim_base=ff_dim_base,
            dropout=dropout,
            ff_dropout=ff_dropout,
            activation_function=activation_function,
            num_ff_layers=num_ff_layers,
            use_batchnorm=use_batchnorm,
            use_layernorm=use_layernorm
        ).to(device)

        # Train with early stopping; epochs=50 and patience=5 are assumed defaults
        train_model_with_early_stopping(model, train_loader, val_loader, epochs=50, lr=lr, patience=5)
        _, _, val_score = evaluate(model, val_loader)  # ROC AUC is used as the validation score

        # Keep track of the best fold model within this trial
        if val_score > best_val_score:
            best_val_score = val_score
            best_model_state_dict = model.state_dict()

        fold_scores.append(val_score)

    avg_fold_score = sum(fold_scores) / len(fold_scores)

    # Overwrite 'best_model.pth' only when this trial beats every completed trial,
    # so the file always holds weights from the best trial so far
    try:
        is_best_trial = avg_fold_score > trial.study.best_value
    except ValueError:  # raised when no trial has completed yet
        is_best_trial = True
    if is_best_trial and best_model_state_dict is not None:
        torch.save(best_model_state_dict, 'best_model.pth')

    return avg_fold_score


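# **Run the hyperparameter search**
study = optuna.create_study(direction="maximize")  # maximise mean validation ROC AUC
study.optimize(objective, n_trials=20)  # n_trials=20 is an assumed budget

best_params = study.best_params
print(f"Best mean validation ROC AUC: {study.best_value:.4f}")
print(f"Best hyperparameters: {best_params}")


#=============================================== Test on Test Set ===============================================#

# Load the best model
best_model = FraudDetectionTransformer(
    input_dim=X.shape[1],
    embed_dim=best_params["embed_dim"],  # Use the best values from the Optuna study
    num_heads=best_params["num_heads"],
    ff_dim_base=best_params["ff_dim_base"],
    dropout=best_params["dropout"],
    ff_dropout=best_params["ff_dropout"],
    activation_function=best_params["activation_function"],
    num_ff_layers=best_params["num_ff_layers"],
    use_batchnorm=best_params["use_batchnorm"],
    use_layernorm=best_params["use_layernorm"]
).to(device)

best_model.load_state_dict(torch.load('best_model.pth', map_location=device))

# Scale the held-out test set with a scaler fitted on the development data only.
# X_test and y_test are the hold-out split set aside before tuning. The saved weights
# come from a single fold, so refitting the scaler on all development data is an approximation.
test_transform = DataPreprocessingTransform()
test_transform.fit_transform(X)
test_dataset = CustomDataset(test_transform.transform(X_test), y_test)

# Evaluate on test set
test_loader = DataLoader(test_dataset, batch_size=32)
test_accuracy, test_hit_rate, test_roc_auc = evaluate(best_model, test_loader)
print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"Test Hit Rate: {test_hit_rate:.4f}")
print(f"Test ROC AUC: {test_roc_auc:.4f}")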




---

# Auto commit to github

In [None]:
import datetime
import os

# Navigate to the repository directory (if not already there)
%cd /content/drive/MyDrive/Colab_Notebooks/Deep_Learning_Practice

with open('/content/drive/MyDrive/IAM/PAT.txt', 'r') as file:
      github_pat = file.read().strip()
os.environ['GITHUB_PAT'] = github_pat

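# Add the remote only if it is not configured yet (avoids "remote origin already exists")
!git remote get-url origin > /dev/null 2>&1 || git remote add origin "https://github.com/archiegoodman2/machine_learning_practice"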

# Replace with your actual username and email (or configure globally)
USERNAME="archiegoodman2"
EMAIL="archiegoodman2011@gmail.com"

# Set global username and email configuration
!git config --global user.name "$USERNAME"
!git config --global user.email "$EMAIL"

now = datetime.datetime.now()
current_datetime = now.strftime("%Y-%m-%d %H:%M")

# Set remote URL using the PAT from environment variable
!git remote set-url origin https://{os.environ['GITHUB_PAT']}@github.com/archiegoodman2/machine_learning_practice.git

# Replace with your desired commit message
COMMIT_MESSAGE = f"{current_datetime} began code for HP tuning"

# Stage all changes
!git add .

# Commit the changes
!git commit -m "$COMMIT_MESSAGE"

# Push to origin
!git push origin master


/content/drive/MyDrive/Colab_Notebooks/Deep_Learning_Practice
error: remote origin already exists.
