<a href="https://colab.research.google.com/github/BENfrost-20/copilot-codespaces-vscode/blob/main/Cross_Validation_and_Tuning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Artificial Neural Networks and Deep Learning**

---

## **Lecture 3: Cross Validation and Tuning**

<img src="https://drive.google.com/uc?export=view&id=13uEsx8hOr-fJQ7EtjQgDynZ8QvHuTTBb" width="500"/>



## 🌐 **Google Drive Connection**

## ⚙️ **Libraries Import**

In [None]:
# Set seed for reproducibility
SEED = 42

# Import necessary libraries
import os

# Set environment variables before importing modules
os.environ['PYTHONHASHSEED'] = str(SEED)
os.environ['MPLCONFIGDIR'] = os.getcwd() + '/configs/'

# Suppress warnings
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=Warning)

# Import necessary modules
import logging
import random
import numpy as np

# Set seeds for random number generators in NumPy and Python
np.random.seed(SEED)
random.seed(SEED)

# Import PyTorch
import torch
torch.manual_seed(SEED)
from torch import nn
from torchsummary import summary
from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import TensorDataset, DataLoader
logs_dir = "tensorboard"
!pkill -f tensorboard
%load_ext tensorboard
!mkdir -p models

# Select the compute device: CUDA when available, otherwise CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
    # Seed the random number generators of all visible GPUs
    torch.cuda.manual_seed_all(SEED)
    # NOTE(review): benchmark mode lets cuDNN pick kernels by timing them, which
    # may work against the reproducibility goal stated at the top of this cell —
    # confirm this trade-off is intended
    torch.backends.cudnn.benchmark = True
else:
    device = torch.device("cpu")

print(f"PyTorch version: {torch.__version__}")
print(f"Device: {device}")

# Import other libraries
import copy
import shutil
from itertools import product
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from sklearn.model_selection import train_test_split

# Configure plot display settings
sns.set(font_scale=1.4)
sns.set_style('white')
plt.rc('font', size=14)
%matplotlib inline

## ⏳ **Data Loading**

In [None]:
# Load the Glass dataset from CSV file, downloading it first when it is not cached locally
# The file name and the download identifier are exported as environment variables
# so that the shell command below can read them
os.environ["DATASET_NAME"] = "glass_dataset.csv"
os.environ["DATASET_URL"] = "1xyZvjIw2nR5QtlfN9vumuPUjr3SFZLob"
if not os.path.exists(os.environ["DATASET_NAME"]):
    print("Downloading data...")
    # NOTE(review): the exit status of gdown is not checked, so the message below
    # is printed even if the download fails; the output file name is chosen by
    # gdown — presumably it matches DATASET_NAME, verify
    ! gdown -q ${DATASET_URL}
    print("Download completed")
else:
    print("Data already downloaded. Using cached data...")
# NOTE(review): the path is hard-coded here instead of being read from DATASET_NAME;
# keep the two in sync
data = pd.read_csv('glass_dataset.csv')

## 🔎 **Exploration and Data Analysis**

In [None]:
# Display the first 10 rows of the Glass dataset
data.head(10)

In [None]:
# Print the shape of the Glass dataset
print('Glass dataset shape', data.shape)

# Generate summary statistics for the Glass dataset
data.describe()

In [None]:
# Extract the class label column as a NumPy array
target = data['Glass Class'].values
print('Target shape', target.shape)

# Count how many samples carry each distinct label
unique, count = np.unique(target, return_counts=True)
print('Target labels:', unique)
for label, n_samples in zip(unique, count):
    print(f'Class {label} has {n_samples} samples')

In [None]:
# Plot pairwise relationships between features colored by glass class
sns.pairplot(
    data=data,
    hue='Glass Class',
    corner=True,        # Display only lower triangle for efficiency
    palette='tab10'     # Color palette suitable for 6 classes
)
plt.show()

## 🔄 **Data Preprocessing**

In [None]:
# Prepare features and labels as float32 and int64 arrays
# X: every column except the label, as a float32 matrix
X = data.drop('Glass Class', axis=1).astype(np.float32).values
# y: the labels extracted above, cast to int64
# NOTE(review): the number of output units is later derived as len(np.unique(y)),
# and CrossEntropyLoss expects labels in [0, num_classes) — confirm the labels in
# the CSV are already zero-based and contiguous
y = target.astype(np.int64)

In [None]:
def make_loader(ds, batch_size, shuffle, drop_last):
    """
    Build a DataLoader for ``ds`` with multi-process loading and prefetching.

    Args:
        ds (Dataset): Dataset to iterate over
        batch_size (int): Number of samples per batch
        shuffle (bool): Whether to reshuffle the samples at every epoch
        drop_last (bool): Whether to drop the final batch when it is incomplete

    Returns:
        DataLoader: Loader using between 2 and 4 worker processes
    """
    # One worker per CPU core, clamped to the range [2, 4]
    cpu_cores = os.cpu_count() or 2
    num_workers = max(2, min(4, cpu_cores))

    # Pinned host memory only speeds up host-to-GPU copies, so request it only
    # when CUDA is present instead of unconditionally
    use_cuda = torch.cuda.is_available()

    return DataLoader(
        ds,
        batch_size=batch_size,
        shuffle=shuffle,
        drop_last=drop_last,
        num_workers=num_workers,
        pin_memory=use_cuda,
        pin_memory_device="cuda" if use_cuda else "",
        prefetch_factor=4,  # Batches loaded in advance by each worker
    )

## 🛠️ **Model Building**

In [None]:
# Fully connected classifier used throughout the notebook
class FeedForwardNet(nn.Module):
    """
    Multi-layer perceptron built from Linear -> (Dropout) -> ReLU stages.

    The stack is one input stage mapping ``in_features`` to ``hidden_size``,
    followed by ``hidden_layers`` further stages of width ``hidden_size`` and a
    final linear layer producing ``num_classes`` outputs. A Dropout module is
    inserted in each stage only when ``dropout_rate`` is positive.
    """

    def __init__(self, in_features, hidden_layers, hidden_size, dropout_rate, num_classes):
        super().__init__()

        def stage(n_inputs):
            # One Linear -> (Dropout) -> ReLU group
            group = [nn.Linear(n_inputs, hidden_size)]
            if dropout_rate > 0:
                group.append(nn.Dropout(dropout_rate))
            group.append(nn.ReLU())
            return group

        # Input width of every stage: the first reads the raw features,
        # the remaining ones read the previous hidden representation
        stage_inputs = [in_features] + [hidden_size] * hidden_layers
        layers = [module for n_inputs in stage_inputs for module in stage(n_inputs)]

        # Output layer
        layers.append(nn.Linear(hidden_size, num_classes))
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Return the output of the layer stack for the batch ``x``."""
        return self.net(x)

## 🧮 **Network Parameters**

## 🧠 **Model Training**

In [None]:
# @title train_one_epoch()
def train_one_epoch(model, train_loader, criterion, optimizer, scaler, device, l1_lambda=0, l2_lambda=0):
    """
    Perform one complete training epoch through the entire training dataset.

    Args:
        model (nn.Module): The neural network model to train
        train_loader (DataLoader): PyTorch DataLoader containing training data batches
        criterion (nn.Module): Loss function (e.g., CrossEntropyLoss, MSELoss)
        optimizer (torch.optim): Optimization algorithm (e.g., Adam, SGD)
        scaler (GradScaler): PyTorch's gradient scaler for mixed precision training
        device (torch.device): Computing device ('cuda' for GPU, 'cpu' for CPU)
        l1_lambda (float): Lambda for L1 regularization
        l2_lambda (float): Lambda for L2 regularization

    Returns:
        tuple: (average_loss, f1 score) - Per-sample average of the training loss
        (including any L1/L2 penalty) and weighted F1 score for this epoch
    """
    model.train()  # Set model to training mode

    running_loss = 0.0
    all_predictions = []
    all_targets = []

    # Iterate through training batches
    for inputs, targets in train_loader:
        # Move data to device (GPU/CPU)
        inputs, targets = inputs.to(device), targets.to(device)

        # Clear gradients from previous step
        optimizer.zero_grad(set_to_none=True)

        # Forward pass with mixed precision (if CUDA available)
        with torch.amp.autocast(device_type=device.type, enabled=(device.type == 'cuda')):
            logits = model(inputs)
            loss = criterion(logits, targets)

            # Add each penalty only when its coefficient is non-zero, so that
            # unregularised runs do not reduce over every parameter on every batch
            if l1_lambda:
                loss = loss + l1_lambda * sum(p.abs().sum() for p in model.parameters())
            if l2_lambda:
                loss = loss + l2_lambda * sum(p.pow(2).sum() for p in model.parameters())

        # Backward pass with gradient scaling
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # Accumulate metrics (loss is weighted by the batch size so that the
        # epoch average is per sample even when the last batch is smaller)
        running_loss += loss.item() * inputs.size(0)
        predictions = logits.argmax(dim=1)
        all_predictions.append(predictions.cpu().numpy())
        all_targets.append(targets.cpu().numpy())

    # Calculate epoch metrics
    epoch_loss = running_loss / len(train_loader.dataset)
    epoch_f1 = f1_score(
        np.concatenate(all_targets),
        np.concatenate(all_predictions),
        average='weighted'
    )

    return epoch_loss, epoch_f1

In [None]:
# @title validate_one_epoch()
def validate_one_epoch(model, val_loader, criterion, device):
    """
    Evaluate the model on every batch of the validation loader.

    Args:
        model (nn.Module): The neural network model to evaluate (switched to eval mode here)
        val_loader (DataLoader): PyTorch DataLoader containing validation data batches
        criterion (nn.Module): Loss function used to calculate validation loss
        device (torch.device): Computing device ('cuda' for GPU, 'cpu' for CPU)

    Returns:
        tuple: (average_loss, f1 score) - Per-sample average validation loss and
        weighted F1 score for this epoch

    Note:
        Gradient computation is disabled for the whole pass, and no
        regularisation penalty is added to the reported loss.
    """
    model.eval()  # Set model to evaluation mode

    loss_sum = 0.0
    predicted_batches, target_batches = [], []

    # No gradients are needed for evaluation
    with torch.no_grad():
        for inputs, targets in val_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)

            # Forward pass with mixed precision (if CUDA available)
            with torch.amp.autocast(device_type=device.type, enabled=(device.type == 'cuda')):
                logits = model(inputs)
                loss = criterion(logits, targets)

            # Weight the batch loss by its size to obtain a per-sample average later
            loss_sum += loss.item() * inputs.size(0)
            predicted_batches.append(logits.argmax(dim=1).cpu().numpy())
            target_batches.append(targets.cpu().numpy())

    # Aggregate over the whole validation set
    epoch_loss = loss_sum / len(val_loader.dataset)
    epoch_f1 = f1_score(
        np.concatenate(target_batches),
        np.concatenate(predicted_batches),
        average='weighted'
    )

    return epoch_loss, epoch_f1

In [None]:
# @title log_metrics_to_tensorboard()
def log_metrics_to_tensorboard(writer, epoch, train_loss, train_f1, val_loss, val_f1, model):
    """
    Write the epoch's scalar metrics and parameter histograms to TensorBoard.

    Args:
        writer (SummaryWriter): TensorBoard SummaryWriter object for logging
        epoch (int): Current epoch number (used as the step of every entry)
        train_loss (float): Training loss for this epoch
        train_f1 (float): Training f1 score for this epoch
        val_loss (float): Validation loss for this epoch
        val_f1 (float): Validation f1 score for this epoch
        model (nn.Module): The neural network model whose parameters are logged

    Note:
        Histograms are written only for trainable, non-empty parameters; a
        gradient histogram is added when the parameter currently holds a
        non-empty gradient.
    """
    # Scalar metrics, grouped by tag prefix
    scalar_entries = (
        ('Loss/Training', train_loss),
        ('Loss/Validation', val_loss),
        ('F1/Training', train_f1),
        ('F1/Validation', val_f1),
    )
    for tag, value in scalar_entries:
        writer.add_scalar(tag, value, epoch)

    # Weight and gradient distributions of every trainable parameter
    for name, param in model.named_parameters():
        if not param.requires_grad:
            continue
        if param.numel() > 0:
            writer.add_histogram(f'{name}/weights', param.data, epoch)
        grad = param.grad
        if grad is not None and grad.numel() > 0:
            writer.add_histogram(f'{name}/gradients', grad.data, epoch)

In [None]:
# @title fit()
def fit(model, train_loader, val_loader, epochs, criterion, optimizer, scaler, device,
        l1_lambda=0, l2_lambda=0, patience=0, evaluation_metric="val_f1", mode='max',
        restore_best_weights=True, writer=None, verbose=10, experiment_name=""):
    """
    Train the neural network model on the training data and validate on the validation data.

    Args:
        model (nn.Module): The neural network model to train
        train_loader (DataLoader): PyTorch DataLoader containing training data batches
        val_loader (DataLoader): PyTorch DataLoader containing validation data batches
        epochs (int): Number of training epochs
        criterion (nn.Module): Loss function (e.g., CrossEntropyLoss, MSELoss)
        optimizer (torch.optim): Optimization algorithm (e.g., Adam, SGD)
        scaler (GradScaler): PyTorch's gradient scaler for mixed precision training
        device (torch.device): Computing device ('cuda' for GPU, 'cpu' for CPU)
        l1_lambda (float): L1 regularization coefficient (default: 0)
        l2_lambda (float): L2 regularization coefficient (default: 0)
        patience (int): Number of epochs to wait for improvement before early stopping;
            0 disables early stopping (default: 0)
        evaluation_metric (str): Key of the training history monitored for early stopping:
            'train_loss', 'val_loss', 'train_f1' or 'val_f1' (default: "val_f1")
        mode (str): 'max' for maximizing the metric, anything else minimizes it (default: 'max')
        restore_best_weights (bool): Whether to restore model weights from best epoch (default: True)
        writer (SummaryWriter, optional): TensorBoard SummaryWriter object for logging;
            it is closed before returning (default: None)
        verbose (int, optional): Frequency of printing training progress, 0 to stay silent (default: 10)
        experiment_name (str, optional): Experiment name for saving models (default: "")

    Returns:
        tuple: (model, training_history) - Trained model and metrics history

    Note:
        Weights are written to ``models/<experiment_name>_model.pt``: at every
        improving epoch when early stopping is active, otherwise once after the
        last epoch. The parent directory must already exist.
    """

    # Initialize metrics tracking
    training_history = {
        'train_loss': [], 'val_loss': [],
        'train_f1': [], 'val_f1': []
    }

    # Single location used for every save and load of this experiment's weights
    checkpoint_path = "models/"+experiment_name+'_model.pt'

    # Configure early stopping if patience is set
    if patience > 0:
        patience_counter = 0
        best_metric = float('-inf') if mode == 'max' else float('inf')
        best_epoch = 0  # 0 means that no checkpoint has been written by this call

    # Main training loop: iterate through epochs
    for epoch in range(1, epochs + 1):

        # Forward pass through training data, compute gradients, update weights
        train_loss, train_f1 = train_one_epoch(
            model, train_loader, criterion, optimizer, scaler, device, l1_lambda, l2_lambda
        )

        # Evaluate model on validation data without updating weights
        val_loss, val_f1 = validate_one_epoch(
            model, val_loader, criterion, device
        )

        # Store metrics for plotting and analysis
        training_history['train_loss'].append(train_loss)
        training_history['val_loss'].append(val_loss)
        training_history['train_f1'].append(train_f1)
        training_history['val_f1'].append(val_f1)

        # Write metrics to TensorBoard for visualization
        if writer is not None:
            log_metrics_to_tensorboard(
                writer, epoch, train_loss, train_f1, val_loss, val_f1, model
            )

        # Print progress every N epochs or on first epoch
        if verbose > 0:
            if epoch % verbose == 0 or epoch == 1:
                print(f"Epoch {epoch:3d}/{epochs} | "
                    f"Train: Loss={train_loss:.4f}, F1 Score={train_f1:.4f} | "
                    f"Val: Loss={val_loss:.4f}, F1 Score={val_f1:.4f}")

        # Early stopping logic: monitor metric and save best model
        if patience > 0:
            current_metric = training_history[evaluation_metric][-1]
            is_improvement = (current_metric > best_metric) if mode == 'max' else (current_metric < best_metric)

            if is_improvement:
                best_metric = current_metric
                best_epoch = epoch
                torch.save(model.state_dict(), checkpoint_path)
                patience_counter = 0
            else:
                patience_counter += 1
                if patience_counter >= patience:
                    if verbose > 0:
                        print(f"Early stopping triggered after {epoch} epochs.")
                    break

    # Restore best model weights if early stopping was used
    if restore_best_weights and patience > 0:
        if best_epoch > 0:
            model.load_state_dict(torch.load(checkpoint_path))
            if verbose > 0:
                print(f"Best model restored from epoch {best_epoch} with {evaluation_metric} {best_metric:.4f}")
        elif verbose > 0:
            # No epoch improved on the initial best (e.g. NaN metric or epochs=0):
            # a file at checkpoint_path, if any, is left over from an earlier
            # run and must not be loaded
            print("No improving epoch was recorded; keeping the current model weights.")

    # Save final model if no early stopping
    if patience == 0:
        torch.save(model.state_dict(), checkpoint_path)

    # Close TensorBoard writer
    if writer is not None:
        writer.close()

    return model, training_history

In [None]:
def k_shuffle_split_cross_validation_round(X, y, epochs, criterion, scaler, device,
                            k, test_size, batch_size, hidden_layers, hidden_size, learning_rate, dropout_rate,
                            l1_lambda=0, l2_lambda=0, patience=0, evaluation_metric="val_f1", mode='max',
                            restore_best_weights=True, writer=None, verbose=10, seed=SEED, experiment_name=""):
    """
    Evaluate one hyperparameter configuration on K random stratified splits.

    For every split the data is divided into train/validation/test with two
    successive stratified splits of proportion ``test_size``, features are
    min-max scaled with training-set statistics, and a FeedForwardNet starting
    from the same initial weights is trained with AdamW. The test portion is
    held out and not used here.

    Args:
        X (np.ndarray): Feature matrix (samples on the first axis)
        y (np.ndarray): Integer class labels
        epochs (int): Maximum number of training epochs per split
        criterion (nn.Module): Loss function
        scaler (GradScaler): Gradient scaler for mixed precision training
        device (torch.device): Computing device
        k (int): Number of random splits
        test_size (float): Proportion used for both the test and the validation split
        batch_size (int): Batch size of the data loaders
        hidden_layers (int): Forwarded to FeedForwardNet
        hidden_size (int): Forwarded to FeedForwardNet
        learning_rate (float): AdamW learning rate
        dropout_rate (float): Forwarded to FeedForwardNet
        l1_lambda (float): L1 penalty coefficient added to the training loss (default: 0)
        l2_lambda (float): L2 coefficient, applied as AdamW weight decay (default: 0)
        patience (int): Early stopping patience forwarded to fit() (default: 0)
        evaluation_metric (str): Metric monitored by fit() (default: "val_f1")
        mode (str): 'max' or 'min', forwarded to fit() (default: 'max')
        restore_best_weights (bool): Forwarded to fit() (default: True)
        writer (SummaryWriter, optional): TensorBoard writer forwarded to fit(),
            which closes it at the end of each split (default: None)
        verbose (int): Print frequency, 0 to stay silent (default: 10)
        seed (int): Base random seed; split ``i`` uses ``seed + i`` (default: SEED)
        experiment_name (str): Sub-directory of ``models/`` receiving the checkpoints (default: "")

    Returns:
        tuple: (fold_losses, fold_metrics, best_scores) - validation loss history
        and validation F1 history per split (keys ``split_<i>``), and the best
        validation F1 of every split together with their ``mean`` and ``std``
    """

    # Initialise containers for results across all splits
    fold_losses = {}
    fold_metrics = {}
    best_scores = {}

    # Initialise model architecture
    in_features = X.shape[1]
    num_classes = len(np.unique(y))
    model = FeedForwardNet(in_features, hidden_layers=hidden_layers, hidden_size=hidden_size,
                           dropout_rate=dropout_rate, num_classes=num_classes).to(device)

    # Store initial weights to reset model for each split
    initial_state = copy.deepcopy(model.state_dict())

    # Create directory for model checkpoints (once; no shell interpolation involved)
    os.makedirs(os.path.join("models", experiment_name), exist_ok=True)

    # Iterate through K random splits
    for split_idx in range(k):

        if verbose > 0:
            print(f"Split {split_idx+1}/{k}")

        # Derive the split seed from the `seed` argument rather than the global constant
        split_seed = seed + split_idx

        # Create train-val-test split with stratification; the test part is
        # carved out first and intentionally left unused
        X_train_val, _, y_train_val, _ = train_test_split(
            X, y,
            test_size=test_size,
            random_state=split_seed,
            stratify=y
        )
        X_train, X_val, y_train, y_val = train_test_split(
            X_train_val, y_train_val,
            test_size=test_size,
            random_state=split_seed,
            stratify=y_train_val
        )

        # Normalise features using training set statistics (epsilon guards constant columns)
        train_min = X_train.min(axis=0)
        train_range = X_train.max(axis=0) - train_min + 1e-8
        X_train = (X_train - train_min) / train_range
        X_val = (X_val - train_min) / train_range

        # Create PyTorch datasets
        train_ds = TensorDataset(torch.from_numpy(X_train), torch.from_numpy(y_train))
        val_ds   = TensorDataset(torch.from_numpy(X_val), torch.from_numpy(y_val))

        # Create data loaders
        train_loader = make_loader(train_ds, batch_size=batch_size, shuffle=True, drop_last=False)
        val_loader   = make_loader(val_ds, batch_size=batch_size, shuffle=False, drop_last=False)

        # Reset model to initial weights for fair comparison across splits
        model.load_state_dict(initial_state)
        optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=l2_lambda)

        # Train model on current split. L2 is already applied through the
        # optimizer's weight decay, so only the L1 coefficient is forwarded as
        # an explicit loss penalty (forwarding l2_lambda too would apply it twice).
        model, training_history = fit(
            model=model,
            train_loader=train_loader,
            val_loader=val_loader,
            epochs=epochs,
            criterion=criterion,
            optimizer=optimizer,
            scaler=scaler,
            device=device,
            l1_lambda=l1_lambda,
            patience=patience,
            evaluation_metric=evaluation_metric,
            mode=mode,
            restore_best_weights=restore_best_weights,
            writer=writer,
            verbose=verbose,
            experiment_name=experiment_name+"/split_"+str(split_idx)
        )

        # Store results for this split
        fold_losses[f"split_{split_idx}"] = training_history['val_loss']
        fold_metrics[f"split_{split_idx}"] = training_history['val_f1']
        best_scores[f"split_{split_idx}"] = max(training_history['val_f1'])

    # Compute mean and standard deviation of best scores across splits.
    # The per-split values are captured first so that the "mean" entry is not
    # counted as an additional sample when computing "std".
    split_scores = list(best_scores.values())
    best_scores["mean"] = np.mean(split_scores)
    best_scores["std"] = np.std(split_scores)

    if verbose > 0:
        print(f"Best score: {best_scores['mean']:.4f}±{best_scores['std']:.4f}")

    return fold_losses, fold_metrics, best_scores

In [None]:
# Cross-validation
K = 5                    # Number of random stratified splits (5 and 10 are considered good values)
TEST_SIZE = 0.2          # Proportion held out at each of the two successive splits (test first, then validation)

# Training
EPOCHS = 100              # Maximum epochs (increase to improve performance)
PATIENCE = 20             # Early stopping patience in epochs; 0 disables early stopping (increase to improve performance)
VERBOSE = 10             # Print progress every VERBOSE epochs; 0 silences the output

# Optimisation
LEARNING_RATE = 1e-3     # Learning rate
BATCH_SIZE = 64          # Batch size

# Architecture
HIDDEN_LAYERS = 2        # Hidden stages added after the first one (the network always has at least one hidden layer)
HIDDEN_SIZE = 128        # Neurons per layer

# Regularisation
DROPOUT_RATE = 0         # Dropout probability (0 inserts no Dropout modules)
L1_LAMBDA = 0            # L1 penalty
L2_LAMBDA = 0            # L2 penalty (used as AdamW weight decay during cross-validation)

# Training utilities
# Gradient scaler for mixed precision; only enabled when running on CUDA
scaler = torch.cuda.amp.GradScaler(enabled=(device.type == 'cuda'))
# Classification loss applied to the raw network outputs
criterion = nn.CrossEntropyLoss()

### **Baseline**

In [None]:
%%time
# Execute K rounds of stratified shuffle-split cross-validation with the baseline configuration
# losses / metrics: validation loss and F1 history of every split (keys 'split_<i>')
# best_scores: best validation F1 of every split plus their 'mean' and 'std'
losses, metrics, best_scores = k_shuffle_split_cross_validation_round(
    X=X,
    y=y,
    epochs=EPOCHS,
    criterion=criterion,
    scaler=scaler,
    device=device,
    k=K,
    test_size=TEST_SIZE,
    batch_size=BATCH_SIZE,
    hidden_layers=HIDDEN_LAYERS,
    hidden_size=HIDDEN_SIZE,
    learning_rate=LEARNING_RATE,
    dropout_rate=DROPOUT_RATE,
    l1_lambda=L1_LAMBDA,
    l2_lambda=L2_LAMBDA,
    verbose=VERBOSE,
    patience=PATIENCE,
    seed=SEED,
    experiment_name="baseline"
)

In [None]:
# @title Plot History
# Create figure with two subplots sharing x axis
fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(18, 5), sharex=True)

# Color palette for K splits (pyplot.get_cmap is used because
# matplotlib.cm.get_cmap is no longer available in recent Matplotlib releases)
colors = plt.get_cmap('tab10', K)

# Plot validation loss and validation F1 score for each split
for split in range(K):
    key = f'split_{split}'

    # Show each curve up to and including its best epoch (first maximum of the
    # validation F1). Slicing off a fixed PATIENCE-sized tail would empty the
    # curve when PATIENCE is 0 and would cut real data when training ran to the
    # last epoch without triggering early stopping.
    best_len = int(np.argmax(metrics[key])) + 1 if len(metrics[key]) > 0 else 0

    axes[0].plot(losses[key][:best_len], label=f'Split {split+1}',
                 color=colors(split), alpha=0.6)
    axes[1].plot(metrics[key][:best_len], label=f'Split {split+1}',
                 color=colors(split), alpha=0.6)

axes[0].set_title('Validation Loss per Split')
axes[0].set_ylabel('Loss')
axes[0].set_xlabel('Epoch')
axes[0].grid(alpha=0.3)

axes[1].set_title('Validation F1 Score per Split')
axes[1].set_ylabel('F1 Score')
axes[1].set_xlabel('Epoch')
axes[1].grid(alpha=0.3)

# Add shared legend on the right
handles, labels = axes[0].get_legend_handles_labels()
fig.legend(handles, labels, loc='center left', bbox_to_anchor=(1, 0.5), fontsize='small')

# Adjust layout
plt.tight_layout()
plt.subplots_adjust(right=0.975)
plt.show()

## **Hyperparameters Tuning**

In [None]:
def grid_search_cv(X, y, param_grid, fixed_params, cv_params, verbose=True):
    """
    Exhaustively evaluate every combination of ``param_grid`` with shuffle-split cross-validation.

    Args:
        X, y: Data and labels passed to every cross-validation run
        param_grid: Dict mapping a parameter name to the list of values to try,
            e.g. {'batch_size': [16, 32, 64]}
        fixed_params: Dict of hyperparameters kept constant (hidden_size, learning_rate, etc.)
        cv_params: Dict of CV settings (epochs, k, patience, criterion, scaler, device, etc.)
        verbose: Print each configuration and its score

    Returns:
        results: Dict mapping the configuration name to its score dict
        best_config: Dict with the combination that reached the highest mean score
            (None when no configuration beat -inf)
        best_score: Highest mean F1 score achieved
    """
    from itertools import product

    # Cartesian product of all candidate values, in the grid's key order
    names = list(param_grid)
    grid = list(product(*param_grid.values()))
    n_configs = len(grid)

    results = {}
    best_config = None
    best_score = -np.inf

    for position, values in enumerate(grid, start=1):
        # Name of the run, also used as the experiment (checkpoint) name
        current_config = dict(zip(names, values))
        config_str = "_".join(f"{name}_{value}" for name, value in current_config.items())

        if verbose:
            print(f"\nConfiguration {position}/{n_configs}:")
            for name, value in current_config.items():
                print(f"  {name}: {value}")

        # Values from the grid take precedence over fixed parameters of the same name
        _, _, fold_scores = k_shuffle_split_cross_validation_round(
            X=X, y=y,
            experiment_name=config_str,
            **{**fixed_params, **current_config},
            **cv_params
        )
        results[config_str] = fold_scores

        # Keep the first configuration that reaches the highest mean score
        mean_score = fold_scores["mean"]
        if mean_score > best_score:
            best_score = mean_score
            best_config = dict(current_config)
            if verbose:
                print("  NEW BEST SCORE!")

        if verbose:
            print(f"  F1 Score: {fold_scores['mean']:.4f}±{fold_scores['std']:.4f}")

    return results, best_config, best_score


def _readable_config_label(config_name):
    # Turn an experiment name such as 'batch_size_16_learning_rate_0.001'
    # into a compact multi-line tick label ('BS=16\nLR=0.001')
    abbreviations = {
        'batch_size_': 'BS=',
        'learning_rate_': '\nLR=',
        'hidden_layers_': '\nHL=',
        'hidden_size_': '\nHS=',
        'dropout_rate_': '\nDR=',
        'l1_lambda_': '\nL1=',
        'l2_lambda_': '\nL2='
    }
    label = config_name
    for prefix, short in abbreviations.items():
        label = label.replace(prefix, short)

    # Drop the '_' separators left between parameters, and the leading line
    # break that appears when the name does not start with the batch size
    return label.replace('_', '').lstrip('\n')


def plot_top_configurations(results, k_splits, top_n=5, figsize=(12, 6)):
    """
    Visualise top N configurations with boxplots of F1 scores across CV splits.

    Args:
        results: Dict of results from grid_search_cv
        k_splits: Number of CV splits used
        top_n: Number of top configurations to display
        figsize: Figure size tuple

    Returns:
        None. Nothing is drawn when there is no configuration to show.
    """
    # Rank configurations by mean score, best first, and keep the top N
    ranked = sorted(results.items(), key=lambda item: item[1]['mean'], reverse=True)
    top_configs = [(name, scores['mean']) for name, scores in ranked[:top_n]]

    if not top_configs:
        print("No configurations to plot.")
        return

    # Prepare boxplot data
    boxplot_data = []
    labels = []

    for config_name, mean_score in top_configs:
        # Collect the best score of each split that is present in the results
        config_results = results[config_name]
        split_scores = [config_results[f'split_{i}'] for i in range(k_splits)
                        if f'split_{i}' in config_results]
        boxplot_data.append(split_scores)

        # Verify we have the expected number of splits
        if len(split_scores) != k_splits:
            print(f"Warning: Config {config_name} has {len(split_scores)} splits, expected {k_splits}")

        labels.append(f"{_readable_config_label(config_name)}\n(μ={mean_score:.3f})")

    # Create plot; tick labels are set on the axis afterwards instead of through
    # boxplot's `labels` keyword, which newer Matplotlib releases deprecate
    fig, ax = plt.subplots(figsize=figsize)
    bp = ax.boxplot(boxplot_data, patch_artist=True,
                    showmeans=True, meanline=True)
    ax.set_xticklabels(labels)

    # Styling
    for patch in bp['boxes']:
        patch.set_facecolor('lightblue')
        patch.set_alpha(0.7)

    # Highlight best configuration
    ax.get_xticklabels()[0].set_fontweight('bold')

    ax.set_ylabel('F1 Score')
    ax.set_xlabel('Configuration')
    ax.set_title(f'Top {len(top_configs)} Configurations - F1 Score Distribution Across {k_splits} Splits')
    ax.grid(alpha=0.3, axis='y')

    plt.tight_layout()
    plt.show()

In [None]:
VERBOSE = 0             # Print frequency; 0 silences the per-split and per-epoch output during the searches below

### **Batch Size**

In [None]:
%%time
# Define parameters to search
# Only the batch size varies: 5 values -> 5 configurations, each run on K splits
param_grid = {
    'batch_size': [4, 16, 32, 64, 128]
}

# Fixed hyperparameters (not being tuned), taken from the baseline constants
fixed_params = {
    'hidden_layers': HIDDEN_LAYERS,
    'hidden_size': HIDDEN_SIZE,
    'learning_rate': LEARNING_RATE,
    'dropout_rate': DROPOUT_RATE,
    'l1_lambda': L1_LAMBDA,
    'l2_lambda': L2_LAMBDA
}

# Cross-validation settings, forwarded unchanged to every cross-validation run
cv_params = {
    'epochs': EPOCHS,
    'criterion': criterion,
    'scaler': scaler,
    'device': device,
    'k': K,
    'test_size': TEST_SIZE,
    'patience': PATIENCE,
    'verbose': VERBOSE,
    'seed': SEED
}

# Execute search
# results / best_config / best_score are reassigned by every search cell
results, best_config, best_score = grid_search_cv(
    X=X, y=y,
    param_grid=param_grid,
    fixed_params=fixed_params,
    cv_params=cv_params
)

In [None]:
# Print the results
print(results)

In [None]:
# Print the best configuration found
print(best_config)

In [None]:
# Print the best performance achieved
print(best_score)

In [None]:
# Visualise results
plot_top_configurations(results, k_splits=K, top_n=5)

### **Learning Rate**

In [None]:
%%time
# Define parameters to search
# Only the learning rate varies: 5 values -> 5 configurations, each run on K splits
param_grid = {
    'learning_rate': [1e-4, 5e-4, 1e-3, 5e-3, 1e-2]
}

# Fixed hyperparameters (not being tuned), taken from the baseline constants
fixed_params = {
    'hidden_layers': HIDDEN_LAYERS,
    'hidden_size': HIDDEN_SIZE,
    'batch_size': BATCH_SIZE,
    'dropout_rate': DROPOUT_RATE,
    'l1_lambda': L1_LAMBDA,
    'l2_lambda': L2_LAMBDA
}

# Cross-validation settings, forwarded unchanged to every cross-validation run
cv_params = {
    'epochs': EPOCHS,
    'criterion': criterion,
    'scaler': scaler,
    'device': device,
    'k': K,
    'test_size': TEST_SIZE,
    'patience': PATIENCE,
    'verbose': VERBOSE,
    'seed': SEED
}

# Execute search (overwrites the results of the previous search)
results, best_config, best_score = grid_search_cv(
    X=X, y=y,
    param_grid=param_grid,
    fixed_params=fixed_params,
    cv_params=cv_params
)

In [None]:
# Visualise results
plot_top_configurations(results, k_splits=K, top_n=4)

### **Model Complexity**

In [None]:
%%time
# Define parameters to search
# Depth and width vary together: 2 x 3 = 6 configurations, each run on K splits
# hidden_layers counts the stages added after the first hidden layer
param_grid = {
    'hidden_layers': [1, 2],
    'hidden_size': [32, 128, 512]
}

# Fixed hyperparameters (not being tuned), taken from the baseline constants
fixed_params = {
    'learning_rate': LEARNING_RATE,
    'batch_size': BATCH_SIZE,
    'dropout_rate': DROPOUT_RATE,
    'l1_lambda': L1_LAMBDA,
    'l2_lambda': L2_LAMBDA
}

# Cross-validation settings, forwarded unchanged to every cross-validation run
cv_params = {
    'epochs': EPOCHS,
    'criterion': criterion,
    'scaler': scaler,
    'device': device,
    'k': K,
    'test_size': TEST_SIZE,
    'patience': PATIENCE,
    'verbose': VERBOSE,
    'seed': SEED
}

# Execute search (overwrites the results of the previous search)
results, best_config, best_score = grid_search_cv(
    X=X, y=y,
    param_grid=param_grid,
    fixed_params=fixed_params,
    cv_params=cv_params
)

In [None]:
# Visualise results
plot_top_configurations(results, k_splits=K, top_n=3)

### **Batch Size vs Learning Rate vs Model Complexity**

In [None]:
%%time
# Define parameters to search
# Joint search over optimisation and architecture: 2 x 2 x 2 x 2 = 16 configurations
param_grid = {
    'batch_size': [16, 32],
    'learning_rate': [1e-3, 5e-3],
    'hidden_layers': [1, 2],
    'hidden_size': [128, 512]
}

# Fixed hyperparameters (not being tuned): regularisation stays at the baseline constants
fixed_params = {
    'dropout_rate': DROPOUT_RATE,
    'l1_lambda': L1_LAMBDA,
    'l2_lambda': L2_LAMBDA
}

# Cross-validation settings, forwarded unchanged to every cross-validation run
cv_params = {
    'epochs': EPOCHS,
    'criterion': criterion,
    'scaler': scaler,
    'device': device,
    'k': K,
    'test_size': TEST_SIZE,
    'patience': PATIENCE,
    'verbose': VERBOSE,
    'seed': SEED
}

# Execute search (overwrites the results of the previous search)
results, best_config, best_score = grid_search_cv(
    X=X, y=y,
    param_grid=param_grid,
    fixed_params=fixed_params,
    cv_params=cv_params
)

In [None]:
# Visualise results
plot_top_configurations(results, k_splits=K, top_n=5)

### **Model Regularisation**



In [None]:
%%time
# Define parameters to search
# Regularisation only: 3 x 3 x 3 = 27 configurations, each run on K splits
param_grid = {
    'dropout_rate': [0, 0.2, 0.5],
    'l1_lambda': [0, 1e-4, 1e-2],
    'l2_lambda': [0, 1e-4, 1e-2]
}

# Fixed hyperparameters (not being tuned)
# NOTE(review): these literals are presumably the winners of the previous joint
# search — confirm against its output before reusing them
fixed_params = {
    'batch_size': 16,
    'learning_rate': 0.005,
    'hidden_layers': 2,
    'hidden_size': 512
}

# Cross-validation settings, forwarded unchanged to every cross-validation run
cv_params = {
    'epochs': EPOCHS,
    'criterion': criterion,
    'scaler': scaler,
    'device': device,
    'k': K,
    'test_size': TEST_SIZE,
    'patience': PATIENCE,
    'verbose': VERBOSE,
    'seed': SEED
}

# Execute search (overwrites the results of the previous search)
results, best_config, best_score = grid_search_cv(
    X=X, y=y,
    param_grid=param_grid,
    fixed_params=fixed_params,
    cv_params=cv_params
)

In [None]:
# Plot the five best-ranked configurations from the search above
plot_top_configurations(
    results,
    k_splits=K,
    top_n=5,
)

### **Batch Size vs Learning Rate vs Model Complexity vs Model Regularisation**

In [None]:
%%time
# Define parameters to search
param_grid = {
    'batch_size': [16, 32],
    'learning_rate': [1e-3, 5e-3],
    'hidden_layers': [1, 2],
    'hidden_size': [128, 512],
    'dropout_rate': [0, 0.2, 0.5],
    'l1_lambda': [0, 1e-4, 1e-2],
    'l2_lambda': [0, 1e-4, 1e-2]
}

# Fixed hyperparameters (not being tuned)
fixed_params = {}

# Cross-validation settings
cv_params = {
    'epochs': EPOCHS,
    'criterion': criterion,
    'scaler': scaler,
    'device': device,
    'k': K,
    'test_size': TEST_SIZE,
    'patience': PATIENCE,
    'verbose': VERBOSE,
    'seed': SEED
}

# Execute search
results, best_config, best_score = grid_search_cv(
    X=X, y=y,
    param_grid=param_grid,
    fixed_params=fixed_params,
    cv_params=cv_params
)

In [None]:
# Plot the five best-ranked configurations from the search above
plot_top_configurations(
    results,
    k_splits=K,
    top_n=5,
)

## 🕹️ **Inference**

In [None]:
"""
Configuration 143/432:
  batch_size: 16
  learning_rate: 0.005
  hidden_layers: 1
  hidden_size: 512
  dropout_rate: 0
  l1_lambda: 0.01
  l2_lambda: 0.0001
  F1 Score: 0.7582±0.0602
"""

# Hyperparameters transcribed from the search-log excerpt quoted above.
# NOTE(review): these values are hard-coded rather than read from the
# search output, so they must be updated by hand if the search is re-run.
best_configuration = {
    'batch_size': 16,
    'learning_rate': 0.005,
    'hidden_layers': 1,
    'hidden_size': 512,
    'dropout_rate': 0,
    'l1_lambda': 0.01,
    'l2_lambda': 0.0001,
}

In [None]:
# Retrieve the best hyperparameters
best_batch_size = best_configuration['batch_size']
best_learning_rate = best_configuration['learning_rate']
best_hidden_layers = best_configuration['hidden_layers']
best_hidden_size = best_configuration['hidden_size']
best_dropout_rate = best_configuration['dropout_rate']
best_l1_lambda = best_configuration['l1_lambda']
best_l2_lambda = best_configuration['l2_lambda']

# Per-split test metrics, plus pooled targets/predictions used for the
# aggregated confusion matrix at the end of this cell
test_accuracies = []
test_precisions = []
test_recall_scores = []
test_f1_scores = []
all_test_targets = []
all_test_preds = []

# Checkpoint directory name for the chosen configuration. It depends only on
# the hyperparameters, so it is built once instead of on every split.
# NOTE(review): assumes the search saved its checkpoints under this naming
# scheme — confirm against the code that writes the 'models' directory.
config_name = f"batch_size_{best_batch_size}_learning_rate_{best_learning_rate}_hidden_layers_{best_hidden_layers}_hidden_size_{best_hidden_size}_dropout_rate_{best_dropout_rate}_l1_lambda_{best_l1_lambda}_l2_lambda_{best_l2_lambda}"


for split in range(K):
    print(f"Evaluating Split {split+1}/{K}")

    # Regenerate the train+val / test partition for this split; the seed is
    # offset by the split index so every split gets a different test set.
    # NOTE(review): presumably mirrors the partitioning used during the
    # search (same SEED+split and TEST_SIZE) — confirm.
    X_train_val, X_test, y_train_val, y_test = train_test_split(
        X,
        y,
        test_size=TEST_SIZE,
        random_state=SEED+split,
        stratify=y
    )
    # Split train+val again; only X_train is used below, to derive the
    # normalisation statistics
    X_train, X_val, y_train, y_val = train_test_split(
        X_train_val,
        y_train_val,
        test_size=TEST_SIZE,
        random_state=SEED+split, # Use same random state for consistent splitting
        stratify=y_train_val
    )

    # Min-max statistics come from this split's training portion only and are
    # then applied to the test features
    max_df = X_train.max(axis=0)
    min_df = X_train.min(axis=0)
    # Add a small epsilon to avoid division by zero in case of constant features
    X_test_normalized = (X_test - min_df) / (max_df - min_df + 1e-8)


    # Create a TensorDataset and DataLoader for the test set
    test_ds  = TensorDataset(torch.from_numpy(X_test_normalized), torch.from_numpy(y_test))
    test_loader  = make_loader(test_ds, batch_size=best_batch_size, shuffle=False, drop_last=False)

    # Initialize the model with the best hyperparameters found
    model = FeedForwardNet(in_features=X.shape[1],
                           hidden_layers=best_hidden_layers,
                           hidden_size=best_hidden_size,
                           dropout_rate=best_dropout_rate,
                           num_classes=len(np.unique(y))).to(device)

    # Path to the saved weights for this specific split
    model_path = f"models/{config_name}/split_{split}_model.pt"

    # Load the model weights. map_location remaps the stored tensors onto the
    # current device, so a checkpoint written on a GPU still loads when the
    # notebook is running on CPU (and vice versa).
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval() # Set model to evaluation mode


    # Make predictions on the test set for this split
    split_test_preds, split_test_targets = [], []
    with torch.no_grad():  # Disable gradient computation for inference
        for xb, yb in test_loader:
            xb = xb.to(device)

            # Forward pass: the predicted class is the index of the largest logit
            logits = model(xb)
            preds = logits.argmax(dim=1).cpu().numpy()

            # Store batch results (targets are never moved to the device)
            split_test_preds.append(preds)
            split_test_targets.append(yb.numpy())

    # Combine all batches into single arrays for this split
    split_test_preds = np.concatenate(split_test_preds)
    split_test_targets = np.concatenate(split_test_targets)

    # Calculate metrics for this split's test set (class-weighted averages)
    split_test_acc = accuracy_score(split_test_targets, split_test_preds)
    split_test_prec = precision_score(split_test_targets, split_test_preds, average='weighted')
    split_test_rec = recall_score(split_test_targets, split_test_preds, average='weighted')
    split_test_f1 = f1_score(split_test_targets, split_test_preds, average='weighted')

    # Print F1 score for the current split
    print(f"  Test F1 Score for Split {split+1}: {split_test_f1:.4f}")


    # Store metrics for calculating average later
    test_accuracies.append(split_test_acc)
    test_precisions.append(split_test_prec)
    test_recall_scores.append(split_test_rec)
    test_f1_scores.append(split_test_f1)

    # Extend the overall lists for the confusion matrix
    all_test_targets.extend(split_test_targets)
    all_test_preds.extend(split_test_preds)


# Calculate and print average metrics across all splits on the test set
print("\nAverage metrics across all splits on the test set:")
print(f"Mean Accuracy: {np.mean(test_accuracies):.4f} ± {np.std(test_accuracies):.4f}")
print(f"Mean Precision: {np.mean(test_precisions):.4f} ± {np.std(test_precisions):.4f}")
print(f"Mean Recall: {np.mean(test_recall_scores):.4f} ± {np.std(test_recall_scores):.4f}")
print(f"Mean F1 score: {np.mean(test_f1_scores):.4f} ± {np.std(test_f1_scores):.4f}")


# Generate confusion matrix for the concatenated test sets (across all splits)
cm = confusion_matrix(all_test_targets, all_test_preds)

# Create numeric labels for heatmap annotation
labels = np.array([f"{num}" for num in cm.flatten()]).reshape(cm.shape)

# Visualise confusion matrix
plt.figure(figsize=(8, 7))
sns.heatmap(cm, annot=labels, fmt='',
            cmap='Blues')
plt.xlabel('Predicted labels')
plt.ylabel('True labels')
plt.title('Aggregated Confusion Matrix — Test Sets Across Splits')
plt.tight_layout()
plt.show()

In [None]:
def clean_models_directory(scores, best_configuration):
    """
    Delete everything inside the 'models' directory except the directory
    (or directories) belonging to the best configuration.

    Two directory naming schemes are recognised for the best configuration:
    the abbreviated form ``bs_<..>_lr_<..>_hl_<..>_hs_<..>_dr_<..>_l1_<..>_l2_<..>``
    and the full-key form
    ``batch_size_<..>_learning_rate_<..>_hidden_layers_<..>_hidden_size_<..>_dropout_rate_<..>_l1_lambda_<..>_l2_lambda_<..>``.
    Every matching path that exists is preserved. If neither exists, nothing
    is deleted and the function returns after printing an error.

    Args:
        scores (dict): Dictionary containing the results of the hyperparameter
                       search. Currently unused; kept so existing calls keep
                       working.
        best_configuration (dict): Dictionary containing the hyperparameters
                                   of the best performing configuration. Must
                                   provide the keys 'batch_size',
                                   'learning_rate', 'hidden_layers',
                                   'hidden_size', 'dropout_rate', 'l1_lambda'
                                   and 'l2_lambda'.

    Raises:
        KeyError: If one of the required keys is missing from
                  best_configuration.
    """
    models_dir = "models"
    if not os.path.exists(models_dir):
        print(f"Models directory '{models_dir}' not found.")
        return

    # (abbreviation, full key) pairs, in the order they appear in directory names
    name_parts = [
        ('bs', 'batch_size'),
        ('lr', 'learning_rate'),
        ('hl', 'hidden_layers'),
        ('hs', 'hidden_size'),
        ('dr', 'dropout_rate'),
        ('l1', 'l1_lambda'),
        ('l2', 'l2_lambda'),
    ]
    short_dir_name = "_".join(f"{tag}_{best_configuration[key]}" for tag, key in name_parts)
    long_dir_name = "_".join(f"{key}_{best_configuration[key]}" for _, key in name_parts)
    candidate_paths = [
        os.path.join(models_dir, short_dir_name),
        os.path.join(models_dir, long_dir_name),
    ]

    # Keep every candidate that is actually present on disk
    keep_paths = [path for path in candidate_paths if os.path.exists(path)]

    # Refuse to delete anything when the best configuration cannot be located
    if not keep_paths:
        print(f"Error: Best model directory '{candidate_paths[0]}' not found. Cannot clean directory safely.")
        print(f"Also checked: '{candidate_paths[1]}'")
        return

    for kept in keep_paths:
        print(f"Keeping the best model directory: {kept}")

    # Remove every other entry in the models directory
    for item in os.listdir(models_dir):
        item_path = os.path.join(models_dir, item)
        if item_path in keep_paths:
            continue

        if os.path.isdir(item_path):
            print(f"Deleting directory: {item_path}")
            shutil.rmtree(item_path)
        elif os.path.isfile(item_path):
            print(f"Deleting file: {item_path}")
            os.remove(item_path)

    print("Models directory cleaned.")

# clean_models_directory(scores, best_configuration)

#  
<img src="https://airlab.deib.polimi.it/wp-content/uploads/2019/07/airlab-logo-new_cropped.png" width="350">

##### Connect with us:
- <img src="https://upload.wikimedia.org/wikipedia/commons/thumb/8/81/LinkedIn_icon.svg/2048px-LinkedIn_icon.svg.png" width="14"> **LinkedIn:**  [AIRLab Polimi](https://www.linkedin.com/company/airlab-polimi/)
- <img src="https://upload.wikimedia.org/wikipedia/commons/thumb/9/95/Instagram_logo_2022.svg/800px-Instagram_logo_2022.svg.png" width="14"> **Instagram:** [airlab_polimi](https://www.instagram.com/airlab_polimi/)

##### Contributors:
- **Eugenio Lomurno**: eugenio.lomurno@polimi.it
- **Alberto Archetti**: alberto.archetti@polimi.it
- **Roberto Basla**: roberto.basla@polimi.it
- **Carlo Sgaravatti**: carlo.sgaravatti@polimi.it

```
   Copyright 2025 Eugenio Lomurno, Alberto Archetti, Roberto Basla, Carlo Sgaravatti

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
```
