In [None]:
from notebooks.evaluation_prototype import evaluate_nn_model
from notebooks.evaluation_prototype import evaluate_baseline
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.naive_bayes import BernoulliNB
from sklearn.tree import DecisionTreeClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier

# Create and train the baseline models, then validate their performance

def train_log_model(df, max_iter, X_train, X_val, y_train, y_val):
    """Fit a logistic-regression baseline and report it on the validation split.

    Args:
        df: Not used by this function.
        max_iter: Forwarded to ``LogisticRegression(max_iter=...)``.
        X_train, y_train: Data the classifier is fitted on.
        X_val, y_val: Data handed to ``evaluate_baseline``.

    Returns:
        The fitted ``LogisticRegression`` instance.
    """
    # ``fit`` returns the estimator itself, so construction and fitting chain.
    classifier = LogisticRegression(max_iter=max_iter).fit(X_train, y_train)
    evaluate_baseline(classifier, 'Logistic Regression', X_val, y_val)
    return classifier

def train_svm(df, kernel, C, X_train, X_val, y_train, y_val, probability=True):
    """Fit a support-vector classifier and report it on the validation split.

    Args:
        df: Not used by this function.
        kernel: Forwarded to ``SVC(kernel=...)``.
        C: Forwarded to ``SVC(C=...)``.
        X_train, y_train: Data the classifier is fitted on.
        X_val, y_val: Data handed to ``evaluate_baseline``.
        probability: Forwarded to ``SVC(probability=...)``. When true the
            fitted model exposes ``predict_proba``.

    Returns:
        The fitted ``SVC`` instance.
    """
    # The ``probability`` argument used to be accepted but silently dropped,
    # so the estimator was always built with scikit-learn's default.
    model = SVC(kernel=kernel, C=C, probability=probability)
    model.fit(X_train, y_train)
    evaluate_baseline(model, 'SVM', X_val, y_val)
    return model

def train_naive_bayes(df, X_train, X_val, y_train, y_val):
    """Fit a Bernoulli naive-Bayes baseline and report it on the validation split.

    Args:
        df: Not used by this function.
        X_train, y_train: Data the classifier is fitted on.
        X_val, y_val: Data handed to ``evaluate_baseline``.

    Returns:
        The fitted ``BernoulliNB`` instance.
    """
    # ``fit`` returns the estimator itself, so construction and fitting chain.
    classifier = BernoulliNB().fit(X_train, y_train)
    evaluate_baseline(classifier, 'Naive Bayes', X_val, y_val)
    return classifier

def train_knn(df, n_neighbors, X_train, X_val, y_train, y_val):
    """Fit a k-nearest-neighbours baseline and report it on the validation split.

    Args:
        df: Not used by this function.
        n_neighbors: Forwarded to ``KNeighborsClassifier(n_neighbors=...)``.
        X_train, y_train: Data the classifier is fitted on.
        X_val, y_val: Data handed to ``evaluate_baseline``.

    Returns:
        The fitted ``KNeighborsClassifier`` instance.
    """
    # ``fit`` returns the estimator itself, so construction and fitting chain.
    classifier = KNeighborsClassifier(n_neighbors=n_neighbors).fit(X_train, y_train)
    evaluate_baseline(classifier, 'k-NN', X_val, y_val)
    return classifier

def train_decision_tree(df, max_depth, X_train, X_val, y_train, y_val):
    """Fit a decision-tree baseline and report it on the validation split.

    Args:
        df: Not used by this function.
        max_depth: Forwarded to ``DecisionTreeClassifier(max_depth=...)``.
        X_train, y_train: Data the classifier is fitted on.
        X_val, y_val: Data handed to ``evaluate_baseline``.

    Returns:
        The fitted ``DecisionTreeClassifier`` instance (``random_state=42``).
    """
    tree_settings = {"max_depth": max_depth, "random_state": 42}
    # ``fit`` returns the estimator itself, so construction and fitting chain.
    classifier = DecisionTreeClassifier(**tree_settings).fit(X_train, y_train)
    evaluate_baseline(classifier, 'Decision Tree', X_val, y_val)
    return classifier

def train_random_forest(df, n_estimators, max_depth, X_train, X_val, y_train, y_val):
    """Fit a random-forest baseline and report it on the validation split.

    Args:
        df: Not used by this function.
        n_estimators: Forwarded to ``RandomForestClassifier(n_estimators=...)``.
        max_depth: Forwarded to ``RandomForestClassifier(max_depth=...)``.
        X_train, y_train: Data the classifier is fitted on.
        X_val, y_val: Data handed to ``evaluate_baseline``.

    Returns:
        The fitted ``RandomForestClassifier`` instance (``random_state=42``).
    """
    forest_settings = {
        "n_estimators": n_estimators,
        "max_depth": max_depth,
        "random_state": 42,
    }
    # ``fit`` returns the estimator itself, so construction and fitting chain.
    classifier = RandomForestClassifier(**forest_settings).fit(X_train, y_train)
    evaluate_baseline(classifier, 'Random Forest', X_val, y_val)
    return classifier




In [3]:
from sklearn.datasets import load_digits
import torch
from torch.utils.data import DataLoader, TensorDataset
    
# Convert to tensors
def to_tensor(X_train, X_val, X_test, y_train, y_val, y_test):
    """Convert the six data splits to ``float32`` tensors.

    Each argument is passed to ``torch.tensor(..., dtype=torch.float32)``.
    The shapes of the converted training features and training labels are
    printed as a quick sanity check.

    Returns:
        A 6-tuple of tensors in the same order as the arguments:
        ``(X_train, X_val, X_test, y_train, y_val, y_test)``.
    """
    splits = (X_train, X_val, X_test, y_train, y_val, y_test)
    converted = tuple(torch.tensor(split, dtype=torch.float32) for split in splits)

    # Index 0 is the training features, index 3 the training labels.
    print(converted[0].shape)
    print(converted[3].shape)

    return converted

def create_loaders(df, X_train, X_val, X_test, y_train, y_val, y_test, batch_size):
    """Wrap the train/validation/test tensors in ``DataLoader`` objects.

    Args:
        df: Not used by this function.
        X_train, y_train: Tensors paired into the training dataset.
        X_val, y_val: Tensors paired into the validation dataset.
        X_test, y_test: Tensors paired into the test dataset.
        batch_size: Batch size used by all three loaders.

    Returns:
        ``(train_loader, validation_loader, test_loader)``.
    """
    # Only the training loader shuffles and drops the trailing partial batch.
    train_loader = DataLoader(
        TensorDataset(X_train, y_train),
        batch_size=batch_size,
        shuffle=True,
        drop_last=True,
    )

    # Validation and test loaders iterate in order and keep every sample.
    validation_loader, test_loader = (
        DataLoader(TensorDataset(features, labels), batch_size=batch_size, shuffle=False)
        for features, labels in ((X_val, y_val), (X_test, y_test))
    )

    return train_loader, validation_loader, test_loader


In [8]:
import torch.nn as nn
import torch.nn.functional as F

# Registry mapping a lower-case activation name to a ready-to-call module.
# Every value must be an *instance*: a bare class such as ``nn.LogSigmoid``
# would be called as a constructor with the tensor as its argument and fail.
activation_functions = {
    "relu": nn.ReLU(),
    "leaky_relu": nn.LeakyReLU(),
    "sigmoid": nn.Sigmoid(),
    "tanh": nn.Tanh(),
    "softmax": nn.Softmax(dim=1),
    "elu": nn.ELU(),
    "p_relu": nn.PReLU(),
    "rrelu": nn.RReLU(),
    "log_sigmoid": nn.LogSigmoid(),
    "silu": nn.SiLU()
}

class FraudClassifier(nn.Module):
    """Feed-forward network of ``Linear`` + activation pairs emitting raw logits.

    Args:
        input_size: Number of input features of the first linear layer.
        output_size: Number of units in the final linear layer.
        hidden_layers: Sequence with the width of each hidden layer.
        activations: Sequence of activation names (case-insensitive keys of
            ``activation_functions``), one per hidden layer.

    Raises:
        ValueError: If ``hidden_layers`` and ``activations`` differ in length.
        KeyError: If an activation name is not in ``activation_functions``.
    """

    def __init__(self, input_size, output_size, hidden_layers, activations):
        import copy  # local import: only needed while assembling the network

        super().__init__()
        # Input validation
        if len(hidden_layers) != len(activations):
            raise ValueError("Each hidden layer must have a corresponding activation function")

        self.layers = nn.ModuleList()
        # A ModuleList (rather than a plain list) registers the activations as
        # sub-modules, so learnable ones such as PReLU follow ``.to(device)``,
        # appear in ``parameters()`` and are stored in ``state_dict()``.
        self.activations = nn.ModuleList()

        prev_size = input_size
        for size, act in zip(hidden_layers, activations):
            self.layers.append(nn.Linear(prev_size, size))

            entry = activation_functions[act.lower()]
            if isinstance(entry, type):
                # Registry entry is a class: build a fresh instance.
                activation = entry()
            else:
                # Registry entry is a shared instance: copy it so layers and
                # models never share (or move/train) the same module object.
                activation = copy.deepcopy(entry)
            self.activations.append(activation)
            prev_size = size

        # Output layer producing ``output_size`` logits per sample
        self.output_layer = nn.Linear(prev_size, output_size)

    def forward(self, x):
        """Run ``x`` through every hidden layer and return flattened raw logits."""
        for layer, activation in zip(self.layers, self.activations):
            x = activation(layer(x))
        x = self.output_layer(x)  # raw logits
        return x.view(-1)  # flatten for BCEWithLogitsLoss

In [5]:
from tqdm import tqdm

# Train the neural network model for the specified number of epochs
def train_neural_network(model, train_loader, loss_function, optimizer, device, threshold, epochs):
    """Train ``model`` for ``epochs`` passes over ``train_loader``.

    Args:
        model: Module whose forward pass returns logits; moved to ``device``.
        train_loader: Iterable yielding ``(batch_x, batch_y)`` tensor pairs.
        loss_function: Callable taking ``(logits, targets)`` and returning a
            scalar loss tensor.
        optimizer: Optimizer already bound to ``model``'s parameters.
        device: Device to train on (``torch.device`` or string). ``None``
            selects CUDA when available, otherwise CPU.
        threshold: Sigmoid-probability cut-off used for the accuracy figure.
        epochs: Number of passes over ``train_loader``.

    Returns:
        The trained model (the same object after ``model.to(device)``).

    Raises:
        ValueError: If ``train_loader`` yields no samples during an epoch.
    """
    # Per-epoch metrics; collected here but not part of the return value.
    loss_history = []
    accuracy_history = []

    # Honour the caller's device; it used to be overwritten unconditionally.
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    else:
        device = torch.device(device)
    model = model.to(device)

    # loop through each epoch
    for epoch in tqdm(range(epochs), desc="Training", unit="epoch"):
        model.train()  # Put the model in training mode
        running_loss = 0.0  # Sample-weighted loss accumulated over the epoch
        correct = 0  # Count of correct predictions
        total = 0  # Total number of samples seen

        # Iterate over batches from the training data loader
        for batch_x, batch_y in train_loader:
            batch_x = batch_x.to(device)
            batch_y = batch_y.to(device).view(-1)
            optimizer.zero_grad()  # Reset gradients
            logits = model(batch_x).view(-1)  # Forward pass
            loss = loss_function(logits, batch_y)  # Compute loss
            loss.backward()  # Backpropagation
            optimizer.step()  # Update weights

            # Weight by batch size so the epoch mean is per-sample
            running_loss += loss.item() * batch_x.size(0)
            preds = (torch.sigmoid(logits) >= threshold).float()  # Predicted classes
            correct += (preds == batch_y).sum().item()  # Count correct predictions
            total += batch_y.size(0)  # Update total samples

        # An empty loader would otherwise surface as a bare ZeroDivisionError.
        if total == 0:
            raise ValueError(
                "train_loader produced no samples; check the dataset size against "
                "batch_size when drop_last=True"
            )

        # Compute average loss and accuracy for the epoch
        epoch_loss = running_loss / total
        epoch_accuracy = correct / total
        loss_history.append(epoch_loss)
        accuracy_history.append(epoch_accuracy)

        print(f"Epoch {epoch+1}: Loss = {epoch_loss:.4f}, Accuracy = {epoch_accuracy:.4f}")

    return model


In [None]:
from torch import nn, optim

# Optimizer names selectable by position via ``optimizer_index`` in ``perform``.
optimizer_options = [
    "adam",
    "sgd",
    "momentum",
    "nesterov",
    "adagrad",
    "adadelta",
    "rmsprop",
]

def perform(df, train_loader, test_loader, input_size, output_size, hidden_layers, activations, pos_weight, threshold, epochs, optimizer_index, lr, momentum):
    """Build, train and evaluate a ``FraudClassifier``.

    Args:
        df: Not used by this function.
        train_loader: Loader passed to ``train_neural_network``.
        test_loader: Loader passed to ``evaluate_nn_model``.
        input_size, output_size, hidden_layers, activations: Forwarded to
            ``FraudClassifier``.
        pos_weight: Positive-class weight for ``nn.BCEWithLogitsLoss``.
        threshold: Decision threshold forwarded to training and evaluation.
        epochs: Number of training epochs.
        optimizer_index: Index into ``optimizer_options`` selecting the optimizer.
        lr: Learning rate for the selected optimizer.
        momentum: Momentum, used only by "momentum", "nesterov" and "rmsprop".

    Returns:
        The trained model.

    Raises:
        IndexError: If ``optimizer_index`` is outside ``optimizer_options``.
        ValueError: If the selected optimizer name has no matching branch.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Move the model to its device BEFORE constructing the optimizer: some
    # optimizers (e.g. Adagrad) allocate state tensors at construction time,
    # which would otherwise stay on the CPU while the parameters move to CUDA.
    model = FraudClassifier(
        input_size=input_size,
        output_size=output_size,
        hidden_layers=hidden_layers,
        activations=activations,
    ).to(device)

    # Get optimizer name from list
    optimizer_name = optimizer_options[optimizer_index].lower()

    # Select optimizer
    if optimizer_name == "adam":
        optimizer = optim.Adam(model.parameters(), lr=lr)
    elif optimizer_name == "sgd":
        optimizer = optim.SGD(model.parameters(), lr=lr)
    elif optimizer_name == "momentum":
        optimizer = optim.SGD(model.parameters(), lr=lr, momentum=momentum)
    elif optimizer_name == "nesterov":
        optimizer = optim.SGD(model.parameters(), lr=lr, momentum=momentum, nesterov=True)
    elif optimizer_name == "adagrad":
        optimizer = optim.Adagrad(model.parameters(), lr=lr)
    elif optimizer_name == "adadelta":
        optimizer = optim.Adadelta(model.parameters(), lr=lr)
    elif optimizer_name == "rmsprop":
        optimizer = optim.RMSprop(model.parameters(), lr=lr, momentum=momentum)
    else:
        raise ValueError(f"Unknown optimizer: {optimizer_name}")

    # pos_weight must be a tensor on the same device as the logits
    pos_weight_tensor = torch.tensor([pos_weight], dtype=torch.float).to(device)

    # Define the weighted loss
    loss_function = nn.BCEWithLogitsLoss(pos_weight=pos_weight_tensor)

    # Train
    fraud_nn = train_neural_network(
        model,
        train_loader,
        loss_function,
        optimizer,
        device,
        threshold,
        epochs
    )

    # Evaluate on test set (eval mode first)
    fraud_nn.eval()
    evaluate_nn_model(fraud_nn, test_loader, device, threshold)

    return fraud_nn