# MLP with GA hyperparameter tuning
---

In [None]:
from typing import List, Dict, Tuple
import pandas as pd
import numpy as np
import random

from deap import base, creator, tools, algorithms

import torch
from torch import nn, optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, random_split
import torchvision
import torchvision.transforms as transforms

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, roc_auc_score, confusion_matrix, roc_curve

import multiprocessing
from multiprocessing.pool import ThreadPool

import matplotlib.pyplot as plt
import seaborn as sns

import pickle

import sys
sys.path.append("../")

from config import config

## Basic Config
---

In [None]:
# Seed every RNG this notebook draws from, not just torch: the DEAP attribute
# generators and the custom mutate/crossover/select operators below all use
# Python's `random` module, so without this the GA search differs on every run.
random.seed(config.SEED)
np.random.seed(config.SEED)
torch.manual_seed(config.SEED)

# Load Data
---

In [None]:
# Image preprocessing: convert to a PyTorch tensor, then normalise each of the
# three channels with mean 0.5 and standard deviation 0.5.
channel_mean = (0.5, 0.5, 0.5)
channel_std = (0.5, 0.5, 0.5)
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize(channel_mean, channel_std)]
)

In [None]:
# CIFAR-10 training split, stored under ../data (downloaded when missing).
trainset = torchvision.datasets.CIFAR10("../data", train=True, transform=transform, download=True)
len(trainset)

In [None]:
# CIFAR-10 held-out test split, using the same preprocessing as the training data.
testset = torchvision.datasets.CIFAR10("../data", train=False, transform=transform, download=True)
len(testset)

In [None]:
# Look at the first training sample (tensor shape + label) and the class names.
first_sample = trainset[0]
img, label = first_sample

print(img.shape, label)
print(trainset.classes)

In [None]:
def build_dataloaders(
    trainset,
    testset,
    batch_size: int,
    seed: int,
    train_fraction: float = 0.8,
    num_workers: int = 2,
) -> Tuple[DataLoader, DataLoader, DataLoader]:
    """Split ``trainset`` into train/validation subsets and wrap everything in loaders.

    Args:
        trainset: Dataset that is split into a training and a validation part.
        testset: Dataset used as-is for the test loader.
        batch_size: Batch size shared by all three loaders.
        seed: Seed for the split and for each loader's private generator, so
            the same seed always yields the same split.
        train_fraction: Share of ``trainset`` kept for training (default 0.8,
            the previously hard-coded value); the remainder is the validation set.
        num_workers: Worker processes per loader (default 2, the previously
            hard-coded value).

    Returns:
        ``(train_loader, val_loader, test_loader)``. Only the training loader shuffles.

    Raises:
        ValueError: If ``train_fraction`` is not strictly between 0 and 1.
    """
    if not 0.0 < train_fraction < 1.0:
        raise ValueError(f"train_fraction must be in (0, 1), got {train_fraction}")

    n_total = len(trainset)
    train_size = int(train_fraction * n_total)
    val_size = n_total - train_size
    train_subset, val_subset = random_split(
        trainset,
        [train_size, val_size],
        generator=torch.Generator().manual_seed(seed),
    )

    def _make_loader(dataset, shuffle: bool) -> DataLoader:
        # Each loader gets its own seeded generator; besides driving the shuffle
        # order, DataLoader uses it to derive the worker base seed, so iterating
        # a loader does not consume the global torch RNG.
        return DataLoader(
            dataset,
            batch_size=batch_size,
            shuffle=shuffle,
            num_workers=num_workers,
            generator=torch.Generator().manual_seed(seed),
        )

    train_loader = _make_loader(train_subset, shuffle=True)
    val_loader = _make_loader(val_subset, shuffle=False)
    test_loader = _make_loader(testset, shuffle=False)

    return train_loader, val_loader, test_loader

## MLP
---

In [None]:
class MLP(nn.Module):
    """Fully connected classifier whose hidden width halves at every layer.

    Layout: ``input_dim -> hidden_dim -> hidden_dim/2 -> ... -> output_dim``,
    with an activation and dropout after every layer except the output layer.

    Args:
        input_dim: Number of input features per sample (after flattening).
        hidden_dim: Width of the first hidden layer.
        output_dim: Number of output units (raw logits are returned).
        dropout_p: Dropout probability used after every hidden activation.
        hidden_layers: Number of hidden layers (>= 1).
        activation_functions: Activation *classes* (e.g. ``nn.ReLU``), one per
            hidden layer; each is instantiated without arguments.

    Raises:
        ValueError: If ``hidden_layers`` < 1, fewer activations than hidden
            layers are supplied, or halving would shrink a layer to width 0.
    """

    def __init__(self, input_dim: int, hidden_dim: int, output_dim: int, dropout_p: float, hidden_layers, activation_functions: list):
        super().__init__()
        if hidden_layers < 1:
            raise ValueError(f"hidden_layers must be >= 1, got {hidden_layers}")
        if len(activation_functions) < hidden_layers:
            raise ValueError(
                f"need {hidden_layers} activation functions, got {len(activation_functions)}"
            )
        if hidden_dim // (2 ** (hidden_layers - 1)) < 1:
            raise ValueError(
                f"hidden_dim={hidden_dim} is too small to be halved across {hidden_layers} layers"
            )

        self.layers = nn.ModuleList()

        # Input layer
        self.layers.append(nn.Linear(input_dim, hidden_dim))
        self.layers.append(activation_functions[0]())
        self.layers.append(nn.Dropout(dropout_p))

        # Hidden layers: layer i maps hidden_dim / 2**i -> hidden_dim / 2**(i + 1)
        for i in range(hidden_layers - 1):
            self.layers.append(nn.Linear(hidden_dim // (2 ** i), hidden_dim // (2 ** (i + 1))))
            self.layers.append(activation_functions[i + 1]())
            self.layers.append(nn.Dropout(dropout_p))

        # Output layer
        self.layers.append(nn.Linear(hidden_dim // (2 ** (hidden_layers - 1)), output_dim))

    def forward(self, x) -> torch.Tensor:
        """Return logits for ``x``.

        Inputs with more than two dimensions (e.g. image batches shaped
        ``(batch, channels, height, width)``) are flattened to
        ``(batch, features)`` first; 1-D and 2-D inputs pass through unchanged.
        """
        if x.dim() > 2:
            x = torch.flatten(x, start_dim=1)
        for layer in self.layers:
            x = layer(x)
        return x

    def get_params(self) -> Tuple:
        """Return the network's layer modules, in forward order, as a tuple."""
        return tuple(layer for layer in self.layers)

In [None]:
class CNN(nn.Module):
    """Nine-layer convolutional classifier with a three-layer dense head.

    Three stages of three 3x3 convolutions (padding 1) are each followed by a
    2x2 max-pool. Batch norm is applied only after the first convolution of
    each stage. The head maps 8192 features to 10 logits.
    """

    def __init__(self):
        super().__init__()
        # Stage 1: 3 -> 64 -> 128 -> 128 channels
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32 * 2, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(in_channels=32 * 2, out_channels=64 * 2, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(in_channels=64 * 2, out_channels=64 * 2, kernel_size=3, padding=1)
        # Stage 2: 128 -> 256 -> 256 -> 256 channels
        self.conv4 = nn.Conv2d(in_channels=64 * 2, out_channels=128 * 2, kernel_size=3, padding=1)
        self.conv5 = nn.Conv2d(in_channels=128 * 2, out_channels=128 * 2, kernel_size=3, padding=1)
        self.conv6 = nn.Conv2d(in_channels=128 * 2, out_channels=128 * 2, kernel_size=3, padding=1)
        # Stage 3: 256 -> 512 -> 512 -> 512 channels
        self.conv7 = nn.Conv2d(in_channels=128 * 2, out_channels=256 * 2, kernel_size=3, padding=1)
        self.conv8 = nn.Conv2d(in_channels=256 * 2, out_channels=256 * 2, kernel_size=3, padding=1)
        self.conv9 = nn.Conv2d(in_channels=256 * 2, out_channels=256 * 2, kernel_size=3, padding=1)

        # Batch norm for the outputs of conv1, conv4 and conv7 respectively
        self.bn1 = nn.BatchNorm2d(32 * 2)
        self.bn2 = nn.BatchNorm2d(128 * 2)
        self.bn3 = nn.BatchNorm2d(256 * 2)

        # Shared (stateless) pooling and dropout modules, reused across stages
        self.maxpool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.dropout = nn.Dropout(0.2)

        # Dense head. fc1 expects 8192 input features, i.e. 512 channels x 4 x 4;
        # NOTE(review): that matches 32x32 inputs after three 2x2 pools — confirm
        # before feeding images of another size.
        self.fc1 = nn.Linear(4096 * 2, 4096 * 2)
        self.fc2 = nn.Linear(4096 * 2, 2048 * 2)
        self.fc3 = nn.Linear(2048 * 2, 10)
        self.relu = nn.ReLU()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run the three conv stages and the dense head; returns raw logits (10 per sample)."""

        # Stage 1 (no dropout after this stage)
        x = self.relu(self.bn1(self.conv1(x)))
        x = self.relu(self.conv2(x))
        x = self.relu(self.conv3(x))
        x = self.maxpool(x)

        # Stage 2
        x = self.relu(self.bn2(self.conv4(x)))
        x = self.relu(self.conv5(x))
        x = self.relu(self.conv6(x))
        x = self.maxpool(x)
        x = self.dropout(x)

        # Stage 3
        x = self.relu(self.bn3(self.conv7(x)))
        x = self.relu(self.conv8(x))
        x = self.relu(self.conv9(x))
        x = self.maxpool(x)
        x = self.dropout(x)

        # Dense head: flatten everything but the batch dimension
        x = torch.flatten(x, start_dim=1)
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = self.dropout(x)
        x = self.fc3(x)
        return x

In [None]:
# Build the CNN, then move its parameters to the configured device.
model = CNN()
model = model.to(config.DEVICE)

In [None]:
def calculate_metrics(logits: torch.Tensor, labels: torch.Tensor) -> dict:
    """Compute classification accuracy from raw logits.

    Args:
        logits: Per-class scores; the predicted class is the argmax over dim 1.
        labels: Integer class labels, one per sample. May live on any device;
            they are moved to the CPU here so the comparison with the CPU
            predictions cannot fail with a device mismatch.

    Returns:
        ``{"accuracy": float}`` — fraction of correct predictions, or ``0.0``
        for an empty batch (instead of raising ``ZeroDivisionError``).
    """
    n_samples = labels.shape[0]
    if n_samples == 0:
        return {"accuracy": 0.0}

    predicted = torch.argmax(logits, dim=1).cpu()
    labels = labels.detach().cpu()
    accuracy = (predicted.eq(labels).sum().item()) / n_samples
    return {"accuracy": accuracy}

In [None]:
@torch.no_grad()
def evaluate(model: nn.Module, loader: DataLoader, criterion) -> Tuple[float, float]:
    """
    Run ``model`` in eval mode over ``loader`` without tracking gradients.

    Returns ``(mean_loss, mean_accuracy)``, both averaged per sample (each
    batch is weighted by its size).
    """
    model.eval()
    loss_sum = 0.0
    acc_sum = 0.0
    seen = 0
    for inputs, targets in loader:
        inputs = inputs.to(config.DEVICE)
        targets = targets.to(config.DEVICE)

        outputs = model(inputs)
        batch_loss = criterion(outputs, targets)
        batch_acc = calculate_metrics(outputs, targets.cpu())["accuracy"]

        # Weight by batch size so a smaller final batch does not skew the mean.
        count = inputs.size(0)
        loss_sum += batch_loss.item() * count
        acc_sum += batch_acc * count
        seen += count
    return loss_sum / seen, acc_sum / seen

In [None]:
# Multi-class loss on raw logits, and Adam with a small L2 penalty for the CNN.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-3, weight_decay=1e-5)

In [None]:
train_loader, val_loader, test_loader = build_dataloaders(trainset, testset, 16, config.SEED)

# Early-stopping state. `best_epoch` is initialised up front so it exists even
# if no epoch ever improves on the starting accuracy, and `best_state` holds a
# CPU copy of the best weights so they can be restored after the loop.
best_val_acc = 0.0
best_epoch = 0
best_state = None
patience = 10
patience_counter = 0

# Per-epoch history for later plotting
train_losses = []
train_accuracies = []
val_losses = []
val_accuracies = []

for epoch in range(config.EPOCHS):
    model.train()
    train_loss = 0.0
    correct_train = 0
    total_train = 0
    for x, y in train_loader:
        x, y = x.to(config.DEVICE), y.to(config.DEVICE)
        logits = model(x)
        loss = criterion(logits, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        # Compare on the device the tensors already live on; only the scalar
        # count is transferred back to Python.
        predicted = torch.argmax(logits, dim=1)
        total_train += y.size(0)
        correct_train += (predicted == y).sum().item()

    # Training loss is the mean of the per-batch losses.
    train_losses.append(train_loss / len(train_loader))
    train_accuracies.append(correct_train / total_train)

    val_loss, val_acc = evaluate(model, val_loader, criterion)
    val_losses.append(val_loss)
    val_accuracies.append(val_acc)
    print(f"Epoch {epoch+1}/{config.EPOCHS} - Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

    if val_acc > best_val_acc:
        best_val_acc = val_acc
        best_epoch = epoch + 1
        patience_counter = 0
        # Snapshot on the CPU so the checkpoint does not take accelerator memory.
        best_state = {name: tensor.detach().cpu().clone() for name, tensor in model.state_dict().items()}
    else:
        patience_counter += 1
        if patience_counter >= patience:
            print(f"Early stopping triggered at epoch {epoch+1}")
            break

# Without this, the model evaluated afterwards would be the one from the last
# epoch (up to `patience` epochs past the best one), not the best one.
if best_state is not None:
    model.load_state_dict(best_state)

print(f"\nBest validation accuracy: {best_val_acc:.4f}")
print(f"Best epoch: {best_epoch}")

In [None]:
# Final check of the CNN on the held-out test set.
test_metrics = evaluate(model, test_loader, criterion)
test_loss, test_acc = test_metrics
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}")

## Genetic Algorithm Hyperparameter Optimization (DEAP)
---

In [None]:
# Single-objective maximisation (validation accuracy); individuals are dicts of
# hyperparameters carrying a `fitness` attribute.
# Guarded with hasattr so re-running this notebook cell does not re-create the
# classes: DEAP warns when a creator class is overwritten, and individuals built
# before the re-run would then belong to a stale class.
if not hasattr(creator, "FitnessMax"):
    creator.create("FitnessMax", base.Fitness, weights=(1.0,))
if not hasattr(creator, "Individual"):
    creator.create("Individual", dict, fitness=creator.FitnessMax)

In [None]:
toolbox = base.Toolbox()

# Search space: one list of candidate values per discrete hyperparameter.
BATCH_SIZE_OPTIONS = [16, 32, 64, 128, 256, 512, 1024]
HIDDEN_SIZE_OPTIONS = [16, 32, 64, 128, 256]
ACTIVATION_OPTIONS = [nn.Tanh, nn.ReLU, nn.LeakyReLU]
OPTIMIZER_OPTIONS = [optim.SGD, optim.Adam, optim.RMSprop]
LR_OPTIONS = [0.001, 0.01, 0.005, 0.05, 0.003, 0.03]
WEIGHT_DECAY_OPTIONS = [0.0, 1e-5, 1e-4, 1e-3]

# Attribute generators: each call draws one random value for that gene.
toolbox.register("attr_batch_size", random.choice, BATCH_SIZE_OPTIONS)
toolbox.register("attr_hidden_size", random.choice, HIDDEN_SIZE_OPTIONS)
toolbox.register("attr_hidden_layers", random.randint, 1, 5)  # inclusive range
toolbox.register("attr_activation_function", random.choice, ACTIVATION_OPTIONS)
toolbox.register("attr_dropout_p", random.uniform, 0.0, 0.3)
toolbox.register("attr_optimizer", random.choice, OPTIMIZER_OPTIONS)
toolbox.register("attr_lr", random.choice, LR_OPTIONS)
toolbox.register("attr_weight_decay", random.choice, WEIGHT_DECAY_OPTIONS)

In [None]:
import inspect

# Scratch cell: show which arguments RMSprop's constructor takes. Calling
# `optim.RMSprop()` with no parameters raises TypeError and aborts "Run all",
# so inspect the signature instead of instantiating the optimizer.
inspect.signature(optim.RMSprop)

In [None]:
def create_individual():
    """Draw one random hyperparameter set from the toolbox's attribute generators.

    The number of hidden layers is drawn first so that exactly one activation
    function can be drawn per hidden layer.
    """
    n_layers = toolbox.attr_hidden_layers()

    genome = {}
    genome["batch_size"] = toolbox.attr_batch_size()
    genome["hidden_size"] = toolbox.attr_hidden_size()
    genome["hidden_layers"] = n_layers

    activations = []
    for _ in range(n_layers):
        activations.append(toolbox.attr_activation_function())
    genome["activation_functions"] = activations

    genome["dropout_p"] = toolbox.attr_dropout_p()
    genome["optimizer"] = toolbox.attr_optimizer()
    genome["lr"] = toolbox.attr_lr()
    genome["weight_decay"] = toolbox.attr_weight_decay()
    return genome

In [None]:
# Sanity check: draw one random individual and display it.
sample_individual = create_individual()
sample_individual

In [None]:
# Structure initializers
# `individual` wraps the dict returned by create_individual() in creator.Individual;
# `population` builds a list of such individuals (size given as n=... at call time).
toolbox.register("individual", tools.initIterate, creator.Individual, create_individual)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)

In [None]:
def eval_mlp(individual: creator.Individual) -> Tuple[float]:
    """Fitness function: train an MLP with the individual's hyperparameters.

    Trains for up to ``config.EPOCHS`` epochs with early stopping (patience 5)
    on validation accuracy.

    Args:
        individual: Hyperparameter dict as produced by ``create_individual``.

    Returns:
        A 1-tuple holding the best validation accuracy seen during training
        (a tuple because the fitness is declared with a weights tuple).
    """
    hidden_size = individual["hidden_size"]
    hidden_layers = individual["hidden_layers"]
    activation_functions = individual["activation_functions"]
    dropout_p = individual["dropout_p"]
    batch_size = individual["batch_size"]
    optimizer_cls = individual["optimizer"]
    lr = individual["lr"]
    weight_decay = individual["weight_decay"]

    # Instantiate model
    model = MLP(
        input_dim=config.INPUT_DIM,
        hidden_dim=hidden_size,
        output_dim=config.OUTPUT_DIM,
        dropout_p=dropout_p,
        hidden_layers=hidden_layers,
        activation_functions=activation_functions
    ).to(config.DEVICE)

    # The dataset yields integer class labels and accuracy is computed via
    # argmax over the logits, so the matching loss is cross-entropy;
    # BCEWithLogitsLoss rejects integer targets of shape (batch,).
    # NOTE(review): assumes config.OUTPUT_DIM equals the number of classes — confirm.
    criterion = nn.CrossEntropyLoss()
    # Named `opt` so the local does not shadow the imported `optim` module.
    opt = optimizer_cls(model.parameters(), lr=lr, weight_decay=weight_decay)

    # build_dataloaders needs the datasets as its first two arguments.
    train_loader, val_loader, _ = build_dataloaders(trainset, testset, batch_size, config.SEED)

    # Training loop
    best_val_acc = 0.0
    patience = 5
    patience_counter = 0
    for epoch in range(config.EPOCHS):
        model.train()
        for x, y in train_loader:
            x, y = x.to(config.DEVICE), y.to(config.DEVICE)
            logits = model(x)
            loss = criterion(logits, y)
            opt.zero_grad()
            loss.backward()
            opt.step()

        _, val_acc = evaluate(model, val_loader, criterion)
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= patience:
                break

    return (best_val_acc,)

In [None]:
def mutate(individual: creator.Individual, indpb: float) -> Tuple[creator.Individual]:
    """Mutate ``individual`` in place and return it as a 1-tuple.

    Each scalar gene, and each entry of ``activation_functions``, is re-drawn
    from its attribute generator with independent probability ``indpb``. The
    activation list is then trimmed or extended so its length equals
    ``hidden_layers``.
    """
    # Scalar genes; each one is re-sampled by the generator toolbox.attr_<gene>.
    scalar_genes = (
        "batch_size",
        "hidden_size",
        "hidden_layers",
        "dropout_p",
        "optimizer",
        "lr",
        "weight_decay",
    )
    for gene in scalar_genes:
        if random.uniform(0.0, 1.0) < indpb:
            individual[gene] = getattr(toolbox, "attr_" + gene)()

    activations = individual["activation_functions"]
    for position, _ in enumerate(activations):
        if random.uniform(0.0, 1.0) < indpb:
            activations[position] = toolbox.attr_activation_function()

    # Re-align the activation list with the (possibly mutated) layer count.
    depth = individual["hidden_layers"]
    while len(activations) > depth:
        activations.pop()
    while len(activations) < depth:
        activations.append(toolbox.attr_activation_function())

    return (individual,)

In [None]:
def crossover(ind1: creator.Individual, ind2: creator.Individual) -> Tuple[creator.Individual]:
    """Swap one randomly chosen gene between the two individuals, in place.

    Afterwards each individual's ``activation_functions`` list is trimmed or
    extended so its length equals its ``hidden_layers``. Returns both individuals.
    """
    gene = random.choice(list(ind1.keys()))
    from_second = ind2[gene]
    from_first = ind1[gene]
    ind1[gene] = from_second
    ind2[gene] = from_first

    # Swapping either `hidden_layers` or `activation_functions` on its own can
    # leave the two out of sync, so re-align them for both children.
    for child in (ind1, ind2):
        depth = child["hidden_layers"]
        activations = child["activation_functions"]
        while len(activations) > depth:
            activations.pop()
        while len(activations) < depth:
            activations.append(toolbox.attr_activation_function())

    return ind1, ind2

In [None]:
def select(population: List[creator.Individual], k: int, tournsize: int) -> List[creator.Individual]:
    """Tournament selection: run ``k`` tournaments and return the ``k`` winners.

    Each tournament samples ``tournsize`` distinct individuals from
    ``population`` and keeps the one with the highest fitness values.
    """
    def fitness_of(candidate):
        return candidate.fitness.values

    winners = [
        max(random.sample(population, tournsize), key=fitness_of)
        for _ in range(k)
    ]
    return winners

In [None]:
# Operators
# Wire the custom fitness function and genetic operators into the toolbox under
# the aliases `evaluate`, `mate`, `mutate` and `select`.
toolbox.register("evaluate", eval_mlp)
toolbox.register("mate", crossover)
# indpb is the per-gene mutation probability inside mutate(); note that the same
# config.MUTATION_PROB is also passed as `mutpb` to eaSimple in GA().
toolbox.register("mutate", mutate, indpb=config.MUTATION_PROB)
toolbox.register("select", select, tournsize=3)

In [None]:
def GA() -> Tuple[List[creator.Individual], tools.Logbook]:
    """Run the genetic hyperparameter search with DEAP's ``eaSimple``.

    Fitness evaluations are mapped over a thread pool with one thread per CPU.

    Returns:
        ``(population, logbook)`` — the final population and the per-generation
        statistics (avg/std/min/max fitness).
    """
    pool = ThreadPool(multiprocessing.cpu_count())
    toolbox.register("map", pool.map)

    try:
        population = toolbox.population(n=config.POPULATION_SIZE)
        hof = tools.HallOfFame(1)

        stats = tools.Statistics(lambda ind: ind.fitness.values)
        stats.register("avg", np.mean)
        stats.register("std", np.std)
        stats.register("min", np.min)
        stats.register("max", np.max)

        population, logbook = algorithms.eaSimple(
            population,
            toolbox,
            cxpb=config.CROSSOVER_PROB,
            mutpb=config.MUTATION_PROB,
            ngen=config.NUMBER_OF_GENERATIONS,
            stats=stats,
            halloffame=hof,
            verbose=True,
        )
    finally:
        # Always release the pool, even if an evaluation raised, and put the
        # builtin `map` back so the shared toolbox is not left pointing at a
        # closed pool.
        toolbox.register("map", map)
        pool.close()
        pool.join()

    return population, logbook

In [None]:
# Run the search; `pop` is the final population and `log` the DEAP logbook.
if __name__ == "__main__":
    pop, log = GA()

In [None]:
# Highest-fitness individual of the final population.
top_individuals = tools.selBest(pop, 1)
best_individual = top_individuals[0]
best_individual

In [None]:
# Fitness of the best individual: the value eval_mlp returned for it
# (its best validation accuracy).
best_individual.fitness.values[0]

In [None]:
# Average and maximum fitness per generation, read from the logbook.
gen, fit_avg, fit_max = log.select("gen", "avg", "max")

for series, legend_label in ((fit_avg, "Average Fitness"), (fit_max, "Max Fitness")):
    plt.plot(gen, series, label=legend_label)
plt.xlabel("Generation")
plt.ylabel("Fitness")
plt.title("Fitness over Generations")
plt.legend()
plt.show()

In [None]:
# Pickle files for the final GA population and the DEAP logbook.
pop_file = "../results/final_pop.pkl"
logbook_file = "../results/logbook.pkl"

In [None]:
# Save
import os

# Create the target directories first; open(..., "wb") does not create missing
# parent folders and would fail with FileNotFoundError on a fresh checkout.
for _target in (pop_file, logbook_file):
    os.makedirs(os.path.dirname(_target) or ".", exist_ok=True)

with open(pop_file, "wb") as f:
    pickle.dump(pop, f)

with open(logbook_file, "wb") as f:
    pickle.dump(log, f)

In [None]:
# Load
# NOTE: pickle can execute arbitrary code while loading — only load files this
# notebook wrote itself. The creator classes (FitnessMax / Individual) must
# already be defined, otherwise the individuals cannot be unpickled.
with open(pop_file, "rb") as f:
    pop = pickle.load(f)

with open(logbook_file, "rb") as f:
    log = pickle.load(f)

# The notebook reads the logbook through `log`; keep `logbook` as an alias for
# code that used the old name.
logbook = log

# Rebuild the best individual so the cells below also work in a fresh session
# that starts from the saved results instead of re-running the GA.
best_individual = tools.selBest(pop, 1)[0]

In [None]:
batch_size = best_individual["batch_size"]
hidden_size = best_individual["hidden_size"]
hidden_layers = best_individual["hidden_layers"]
activation_functions = best_individual["activation_functions"]
dropout_p = best_individual["dropout_p"]
optimizer = best_individual["optimizer"]
lr = best_individual["lr"]
weight_decay = best_individual["weight_decay"]

train_loader, val_loader, test_loader = build_dataloaders(batch_size, config.SEED)

model = MLP(
    input_dim=config.INPUT_DIM,
    hidden_dim=hidden_size,
    output_dim=config.OUTPUT_DIM,
    dropout_p=dropout_p,
    hidden_layers=hidden_layers,
    activation_functions=activation_functions,
).to(config.DEVICE)

criterion = nn.BCEWithLogitsLoss()
optim = optimizer(model.parameters(), lr=lr, weight_decay=weight_decay)

In [None]:
best_val_acc = 0.0
patience = 10
patience_counter = 0
history = {"val_loss": [], "val_acc": []}
for epoch in range(config.EPOCHS):
    model.train()
    for x, y in train_loader:
        x, y = x.to(config.DEVICE), y.to(config.DEVICE)
        logits = model(x)
        loss = criterion(logits, y)
        optim.zero_grad()
        loss.backward()
        optim.step()

    val_loss, val_acc = evaluate(model, val_loader, criterion)
    history["val_loss"].append(val_loss)
    history["val_acc"].append(val_acc)

    print(f"Epoch {epoch+1}/{config.EPOCHS} - Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

    if val_acc > best_val_acc:
        best_val_acc = val_acc
        patience_counter = 0
    else:
        patience_counter += 1
        if patience_counter >= patience:
            break

In [None]:
model.eval()
test_loss, test_acc = evaluate(model, test_loader, criterion)

# Collect class probabilities and true labels over the whole test set.
# The previous binary logic (sigmoid, squeeze(1), average="binary") fails on
# multi-class data: labels are 1-D, so squeeze(1) raises, and sklearn rejects
# average="binary" for more than two classes.
y_true = []
y_prob = []
with torch.no_grad():
    for x, y in test_loader:
        x = x.to(config.DEVICE)
        logits = model(x)
        # Softmax in float64 on the CPU so each row sums to 1 within the
        # tolerance roc_auc_score checks for multi-class scores.
        probs = torch.softmax(logits.cpu().double(), dim=1).numpy()
        y_prob.extend(probs.tolist())
        y_true.extend(y.reshape(-1).numpy().tolist())

y_true = np.array(y_true).astype(int)
y_prob = np.array(y_prob)          # shape (n_samples, n_classes)
y_pred = y_prob.argmax(axis=1)

acc = accuracy_score(y_true, y_pred)
# Macro average: every class counts equally.
prec, rec, f1, _ = precision_recall_fscore_support(y_true, y_pred, average="macro", zero_division=0)
try:
    # One-vs-rest AUC; raises ValueError e.g. when a class is missing from y_true.
    auc = roc_auc_score(y_true, y_prob, multi_class="ovr")
except ValueError:
    auc = float("nan")
cm = confusion_matrix(y_true, y_pred)

print({
    "test_loss": round(test_loss, 4),
    "test_acc": round(acc, 4),
    "precision": round(prec, 4),
    "recall": round(rec, 4),
    "f1": round(f1, 4),
    "roc_auc": round(auc, 4) if not np.isnan(auc) else auc,
})

In [None]:
# Confusion matrix as an annotated heatmap (integer counts, no colour bar).
heatmap_options = dict(annot=True, fmt="d", cmap="Blues", cbar=False)

plt.figure(figsize=(6, 5))
sns.heatmap(cm, **heatmap_options)
plt.title("Confusion Matrix")
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.show()

In [None]:
# roc_curve only accepts binary targets, so a 2-D score matrix (one column per
# class) is drawn as one one-vs-rest curve per class. A 1-D score vector is
# still plotted as a single binary ROC curve, as before.
plt.figure()
if np.ndim(y_prob) == 1:
    fpr, tpr, _ = roc_curve(y_true, y_prob)
    plt.plot(fpr, tpr, label=f"ROC AUC = {auc:.3f}")
else:
    for class_idx in range(y_prob.shape[1]):
        is_class = (y_true == class_idx).astype(int)
        # ROC is undefined when the class is absent from, or the only class in, y_true.
        if is_class.min() == is_class.max():
            continue
        fpr, tpr, _ = roc_curve(is_class, y_prob[:, class_idx])
        class_auc = roc_auc_score(is_class, y_prob[:, class_idx])
        plt.plot(fpr, tpr, label=f"class {class_idx} (AUC = {class_auc:.3f})")
plt.plot([0, 1], [0, 1], "r--")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve")
plt.legend()
plt.show()