# Intro to AI 4 - CW3 Artifical Neural Networks (ANN)

### Setup

In [None]:
import torch
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from torch.utils.data import Dataset, DataLoader
import random
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import itertools


# Set up seeds (reproduceability)
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)

if (torch.cuda.is_available()):
    print("CUDA available. Using device: ", torch.cuda.get_device_name())
else:
    print("CUDA is not available. Using device: ", torch.device("cpu"))

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


#### Data import

In [None]:
DATA_PATH = r"./interest-rates/index.csv"
df = pd.read_csv(DATA_PATH)

df["date"] = pd.to_datetime(df[["Year", "Month", "Day"]])
df = df.sort_values("date")

print(df.shape)
display(df.head())

In [None]:
def prepare_quarterly_data(df_in: pd.DataFrame, add_lags: bool = False, max_lag: int = 2):
    target_col = "Real GDP (Percent Change)"
    base_feature_cols = [
        "Effective Federal Funds Rate",
        "Unemployment Rate",
        "Inflation Rate",
    ]

    # using a copy of the data
    temp = df_in.copy()
    temp = temp.set_index("date").sort_index()

    # FF between GDP and keep only relevant
    temp[target_col] = temp[target_col].ffill()
    cols_to_keep = [target_col] + base_feature_cols
    temp = temp[cols_to_keep]

    # Interpolate features in time, then back-fill any remaining NAs
    temp[base_feature_cols] = temp[base_feature_cols].interpolate(method="time")
    temp[base_feature_cols] = temp[base_feature_cols].bfill()

    df_q = temp.resample("QS", label="left").mean()

    # create feature lags on this quarterly index.
    feature_cols = base_feature_cols.copy()

    if add_lags:
        for col in base_feature_cols:
            for lag in range(1, max_lag + 1):
                lag_name = f"{col}_lag{lag}q"
                df_q[lag_name] = df_q[col].shift(lag)
                feature_cols.append(lag_name)

        df_q = df_q.dropna(subset=feature_cols + [target_col]) # Drop rows with inital NAs from the lags

    return df_q, feature_cols


In [None]:
# quarterly data with lags
pipeline_input_down_lag, feature_cols_down_lag = prepare_quarterly_data(
    df, add_lags=True, max_lag=2
)

print("Downsampled quarterly + feature lags shape:", pipeline_input_down_lag.shape)

display(pipeline_input_down_lag.head())

In [None]:

data = pipeline_input_down_lag.sort_index()
data = data.reset_index(drop=True)

train_ratio = 0.70
val_ratio   = 0.15

N = len(data)

train_end = int(N * train_ratio)
val_end   = int(N * (train_ratio + val_ratio))

#Split!
train_df = data.iloc[:train_end]
val_df   = data.iloc[train_end:val_end]
test_df  = data.iloc[val_end:]

X_train = train_df[feature_cols_down_lag].values.astype(np.float32)
y_train = train_df["Real GDP (Percent Change)"].values.astype(np.float32)

X_val   = val_df[feature_cols_down_lag].values.astype(np.float32)
y_val   = val_df["Real GDP (Percent Change)"].values.astype(np.float32)

X_test  = test_df[feature_cols_down_lag].values.astype(np.float32)
y_test  = test_df["Real GDP (Percent Change)"].values.astype(np.float32)



#### Standardisation

In [None]:
scaler = StandardScaler()
scaler.fit(X_train)

# aplly transformation to all other feature sets
X_train_scaled = scaler.transform(X_train).astype(np.float32)
X_val_scaled   = scaler.transform(X_val).astype(np.float32)
X_test_scaled  = scaler.transform(X_test).astype(np.float32)

# Pytorch dataset class
class ff_dataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)

        # Ensure y is always shape (N, 1)
        if len(y.shape) == 1:              # shape (N,)
            y = y.reshape(-1, 1)           # -> shape (N, 1)

        self.y = torch.tensor(y, dtype=torch.float32)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]


train_dataset = ff_dataset(X_train_scaled, y_train)
val_dataset   = ff_dataset(X_val_scaled, y_val)
test_dataset  = ff_dataset(X_test_scaled, y_test)

### Batches

With another helper function so that I can use it later in hyperparameter tuning

In [None]:
batch_size = 16

def loaders_w_batch_size(batch_size):
    train_dataset = ff_dataset(X_train_scaled, y_train)
    val_dataset   = ff_dataset(X_val_scaled,   y_val)
    test_dataset  = ff_dataset(X_test_scaled, y_test)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
    val_loader   = DataLoader(val_dataset,   batch_size=batch_size, shuffle=False)
    test_loader  = DataLoader(test_dataset,  batch_size=batch_size, shuffle=False)

    return train_loader, val_loader, test_loader

train_loader, val_loader, test_loader = loaders_w_batch_size(batch_size)

### Helper functions

I'll use these later for training, evaluating and plotting the model.

In [None]:
def train_one_epoch(model, loader, optimiser, criterion, device):
    model.train()
    running_loss = 0.0
    n_samples = 0

    for X_batch, y_batch in loader:
        X_batch = X_batch.to(device)
        y_batch = y_batch.to(device)

        optimiser.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimiser.step()

        batch_size = X_batch.size(0)
        running_loss += loss.item() * batch_size
        n_samples += batch_size

    return running_loss / n_samples

def evaluate(model, loader, criterion, device):
    model.eval()
    running_loss = 0.0
    n_samples = 0

    with torch.no_grad():
        for X_batch, y_batch in loader:
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)

            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)

            batch_size = X_batch.size(0)
            running_loss += loss.item() * batch_size
            n_samples += batch_size

    return running_loss / n_samples

def plot_loss_curves(train_losses, val_losses, model_name="Model"):
    epochs = range(1, len(train_losses) + 1)

    plt.figure(figsize=(10, 5))
    plt.plot(epochs, train_losses, label="Training Loss (MSE)", linewidth=2)
    plt.plot(epochs, val_losses, label="Validation Loss (MSE)", linewidth=2)

    plt.xlabel("Epoch", fontsize=12)
    plt.ylabel("Loss (MSE)", fontsize=12)
    plt.title(f"{model_name} Loss Curves", fontsize=14)
    plt.grid(True, which="both", linestyle="--", alpha=0.5)
    plt.legend()
    plt.tight_layout()
    plt.show()


### Baseline MLP Model

#### Model definition

In [None]:
input_dim = X_train_scaled.shape[1]   # define training features size
output_dim = 1                        # only output we want is GDP

class BaselineMLP(nn.Module):
    def __init__(self, in_features, out_features, activation_name="relu"):
        super().__init__()

        # Choose activation
        if activation_name.lower() == "relu":
            act = nn.ReLU()
        elif activation_name.lower() == "tanh":
            act = nn.Tanh()
        elif activation_name.lower() in ["leaky_relu", "leaky-relu", "lrelu"]:
            act = nn.LeakyReLU()
        else:
            raise ValueError(f"Unknown activation_name: {activation_name}")

        self.net = nn.Sequential(
            nn.Linear(in_features, 16),
            act,
            nn.Linear(16, out_features)
        )

    def forward(self, x):
        return self.net(x)

model = BaselineMLP(input_dim, output_dim).to(device)
print(model)


#### Loss function and optimiser

In [None]:
criterion = nn.MSELoss()
optimiser = optim.Adam(model.parameters(), lr=1e-3)

#### Training and evaluation helper functions

These functions can be used with any of the models and will be reused when running through other non-baseline models too.

#### Training loop

In [None]:
num_epochs = 200 # max number of epochs
patience = 20  # if validation loss doesnt improve after this many epochs, stop.

best_val_loss = float("inf")
best_model_state = None
epochs_no_improve = 0

train_losses = []
val_losses = []

for epoch in range(1, num_epochs + 1):
    train_loss = train_one_epoch(model, train_loader, optimiser, criterion, device)
    val_loss = evaluate(model, val_loader, criterion, device)

    train_losses.append(train_loss)
    val_losses.append(val_loss)

    print(f"[Baseline] Epoch {epoch:03d} | Train MSE: {train_loss:.6f} | Val MSE: {val_loss:.6f}")

    if val_loss < best_val_loss - 1e-6:  # check for improvement accounting for potential noise
        best_val_loss = val_loss
        best_model_state = model.state_dict()
        epochs_no_improve = 0
    else:
        epochs_no_improve += 1

    # early stopping
    if epochs_no_improve >= patience:
        print(f"[Baseline] Early stopping triggered after {epoch} epochs.")
        break

# load best model weights
if best_model_state is not None:
    model.load_state_dict(best_model_state)
    print("[Baseline] Loaded best model (lowest val MSE).")

test_mse = evaluate(model, test_loader, criterion, device)
print(f"\n[Baseline] Final Test MSE (Baseline MLP): {test_mse:.6f}")

plot_loss_curves(train_losses, val_losses, "Baseline MLP")


The baseline MLP model therefore shows an MSE of around 25, translating to around 4-6 % points. This therefore is not great but works as a good baseline for the model to be developed onto, which is what will be done with the next models. The high MSE value shows that quite a bit of underfitting is taking place in this model. However, before taking this as the baseline's best performance, I will attempt to tune the hyperparameters before moving onto model B

### Baseline MLP tuning

In the code block below I will perform the hyperparameter tuning for the baseline MLP model. This will reuse a lot of the code above (I've decided to keep that above split up for understanding/explanation reasons).


#### Model running function

In [None]:
def run_modelA(learning_rate,
               batch_size,
               weight_decay,
               activation_name,
               num_epochs=200,
               patience=20,
               extra_info=False,
               evaluate_model=False):

    # new model for each run
    model = BaselineMLP(input_dim, output_dim, activation_name=activation_name).to(device)

    # create loaders
    train_loader, val_loader, test_loader = loaders_w_batch_size(batch_size)

    criterion  = nn.MSELoss()
    optimiser  = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

    best_val_loss      = float("inf")
    best_train_loss    = float("inf")
    best_model_state = None
    epochs_no_improve  = 0

    if evaluate_model:
        train_losses = []  # only used if evaluate_model=True
        val_losses   = []  # only used if evaluate_model=True

    for epoch in range(1, num_epochs + 1):
        train_loss = train_one_epoch(model, train_loader, optimiser, criterion, device)
        val_loss   = evaluate(model, val_loader, criterion, device)

        if evaluate_model:
            train_losses.append(train_loss)
            val_losses.append(val_loss)

        if extra_info:
            print(
                f"[Baseline tuning] (lr={learning_rate}, bs={batch_size}, "
                f"wd={weight_decay}, act={activation_name}) "
                f"Epoch {epoch:03d} | Train MSE: {train_loss:.6f} | Val MSE: {val_loss:.6f}"
            )

        if val_loss < best_val_loss - 1e-6:  # check for improvement accounting for potential noise
            best_val_loss   = val_loss
            best_model_state_B = model_B.state_dict()
            best_train_loss = train_loss
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1

        if epochs_no_improve >= patience:
            break


    if best_model_state_B is not None:
        model_B.load_state_dict(best_model_state_B)

    if evaluate_model:
        test_mse = evaluate(model, test_loader, criterion, device)
        return val_losses, train_losses, test_mse

    return best_val_loss, best_train_loss


#### Hypereparameter grid search

In [None]:
learning_rate_grid = [1e-2, 3e-3, 1e-3]
batch_size_grid    = [16]
weight_decay_grid  = [0.0, 1e-4, 1e-3]
activation_grid    = ["relu", "tanh", "leaky_relu"]

hyperparameter_grid = list(itertools.product(
    learning_rate_grid,
    batch_size_grid,
    weight_decay_grid,
    activation_grid,
))

print(f"[Baseline tuning] Total combinations to test for Model A: {len(hyperparameter_grid)}")

n_runs_per_setting = 3  # evaluate each hyperparameter set 3 times

results = []

for learning_rate, batch_size, weight_decay, activation_name in hyperparameter_grid:
    val_mses   = []
    train_mses = []

    for run_idx in range(n_runs_per_setting):
        # Optional: vary seed for each repeat
        torch.manual_seed(42 + run_idx)
        np.random.seed(42 + run_idx)
        random.seed(42 + run_idx)

        best_val_mse, best_train_mse = run_modelA(
            learning_rate=learning_rate,
            batch_size=batch_size,
            weight_decay=weight_decay,
            activation_name=activation_name,
            num_epochs=200,
            patience=20,
            extra_info=False
        )

        val_mses.append(best_val_mse)
        train_mses.append(best_train_mse)

    mean_val_mse = float(np.mean(val_mses))
    std_val_mse  = float(np.std(val_mses))

    results.append({
        "learning_rate": learning_rate,
        "batch_size": batch_size,
        "weight_decay": weight_decay,
        "activation": activation_name,
        "mean_best_val_mse": mean_val_mse,
        "std_best_val_mse": std_val_mse,
    })

    print(
        f"[Baseline tuning] Done: lr={learning_rate}, bs={batch_size}, "
        f"wd={weight_decay}, act={activation_name} "
        f"-> mean best val MSE = {mean_val_mse:.4f} (std={std_val_mse:.4f})"
    )

results_df_A = pd.DataFrame(results)
display(results_df_A.sort_values("mean_best_val_mse").head(10))


#### Analyse tuning results and evaluate best model
The following code will use the top row of the output table (sorted by best val MSE) and use that to retrain and evaluate that specific model.

In [None]:
best_row_A = results_df_A.sort_values("mean_best_val_mse").iloc[0]

best_lr  = float(best_row_A["learning_rate"])
best_bs  = int(best_row_A["batch_size"])
best_wd  = float(best_row_A["weight_decay"])
best_act = str(best_row_A["activation"])

print("[Baseline tuning] Best hyperparameters:")
print(f"  lr        = {best_lr}")
print(f"  batch_size= {best_bs}")
print(f"  weight_decay = {best_wd}")
print(f"  activation   = {best_act}")

model_A_val_losses, model_A_train_losses, model_A_tuned_test_mse = run_modelA(
    learning_rate=best_lr,
    batch_size=best_bs,
    weight_decay=best_wd,
    activation_name=best_act,
    evaluate_model=True,
    extra_info=True
)

print(f"\n[Baseline tuning] Final Tuned Test MSE (Baseline MLP): {model_A_tuned_test_mse:.6f}")

plot_loss_curves(model_A_train_losses, model_A_val_losses, "Baseline MLP (Tuned)")


This hyperparameter tuned model (using LR=0.01, BS=8, WD=0.0001) has the best performance, and results in a test MSE of 18, improved from 25 without tuning.

### Wider + Deeper (Model B)

#### Class definition

In [None]:
class Wider_Deeper_MLP(nn.Module):
    def __init__(self, in_features, out_features):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(in_features, 32),  # hidden layer (wider)
            nn.ReLU(),
            nn.Linear(32, 16),           # hidden layer (deeper)
            nn.ReLU(),
            nn.Linear(16, out_features)  # output layer
        )

        # He (Kaiming) initialisation for ReLU layers
        for m in self.net:
            if isinstance(m, nn.Linear):
                nn.init.kaiming_uniform_(m.weight, nonlinearity="relu")
                nn.init.zeros_(m.bias)

    def forward(self, x):
        return self.net(x)

model_B = Wider_Deeper_MLP(input_dim, output_dim).to(device)
print(model_B)

#### Loss function and optimiser

In [None]:
criterion_B = nn.MSELoss()
optimiser_B = optim.Adam(model_B.parameters(), lr=1e-3)

#### Training loop

In [None]:
num_epochs_B = 200 # max number of epochs
patience_B = 20  # if validation loss doesnt improve after this many epochs, stop.

best_val_loss_B = float("inf")
best_model_state_B = None
epochs_no_improve_B = 0

train_losses_B = []
val_losses_B = []

for epoch in range(1, num_epochs_B + 1):
    train_loss = train_one_epoch(model_B, train_loader, optimiser_B, criterion_B, device)
    val_loss = evaluate(model_B, val_loader, criterion_B, device)

    train_losses_B.append(train_loss)
    val_losses_B.append(val_loss)

    print(f"[Model B] Epoch {epoch:03d} | Train MSE: {train_loss:.6f} | Val MSE: {val_loss:.6f}")

    if val_loss < best_val_loss_B - 1e-6:  # check for improvement accounting for potential noise
        best_val_loss_B = val_loss
        best_model_state_B = model_B.state_dict()
        epochs_no_improve_B = 0
    else:
        epochs_no_improve_B += 1

    if epochs_no_improve_B >= patience_B:
        print(f"[Model B] Early stopping triggered after {epoch} epochs.")
        break

# Load best model weights for Model B
if best_model_state_B is not None:
    model_B.load_state_dict(best_model_state_B)
    print("[Model B] Loaded best model (lowest val MSE).")

test_mse_B = evaluate(model_B, test_loader, criterion_B, device)
print(f"\n[Model B] Final Test MSE: {test_mse_B:.6f}")
plot_loss_curves(train_losses_B, val_losses_B, "Wider + Deeper MLP")


### Wider + Deeper MLP (Model B) Tuning

(Similar hyperparameters as Model A, uses modified code from model A tuning)

#### Model running function

In [None]:
def run_modelB(learning_rate,batch_size,weight_decay, num_epochs=200, patience=20, extra_info=False, evaluate_model=False):

    # new model for each run
    model = Wider_Deeper_MLP(input_dim, output_dim).to(device)

    # create loaders
    train_loader, val_loader, test_loader = loaders_w_batch_size(batch_size)

    criterion  = nn.MSELoss()
    optimiser  = optim.Adam(model.parameters(),lr=learning_rate, weight_decay=weight_decay)

    best_val_loss = float("inf")
    best_train_loss = float("inf")
    best_model_state = None
    epochs_no_improve  = 0

    if evaluate_model:
        train_losses = [] # only used if evaluate_model=True
        val_losses = []  # only used if evaluate_model=True

    for epoch in range(1, num_epochs + 1):
        train_loss = train_one_epoch(model, train_loader, optimiser, criterion, device)
        val_loss   = evaluate(model, val_loader, criterion, device)

        if evaluate_model:
            train_losses.append(train_loss)
            val_losses.append(val_loss)

        if extra_info:
            print(
                f"[Model B tuning] (lr={learning_rate}, bs={batch_size}, wd={weight_decay}) "
                f"Epoch {epoch:03d} | Train MSE: {train_loss:.6f} | Val MSE: {val_loss:.6f}"
            )

        if val_loss < best_val_loss - 1e-6: # check for improvement accounting for potential noise
            best_val_loss     = val_loss
            best_train_loss   = train_loss
            best_model_state = model.state_dict()
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1

        if epochs_no_improve >= patience:
            break

    # Load best model weights for Model
    if best_model_state is not None:
        model.load_state_dict(best_model_state)
        print("[Model B] Loaded best model (lowest val MSE).")

    if evaluate_model:
        test_mse = evaluate(model, test_loader, criterion, device)

        return val_losses, train_losses, test_mse

    return best_val_loss, best_train_loss

#### Hyperparameter Grid Search

In [None]:
learning_rate_grid_B = [1e-2, 3e-3, 1e-3]
batch_size_grid_B    = [8, 16]
weight_decay_grid_B  = [0.0, 1e-4, 1e-3]

hyperparameter_grid_B = list(itertools.product(
    learning_rate_grid_B,
    batch_size_grid_B,
    weight_decay_grid_B,
))

print(f"Total combinations to test for Model B: {len(hyperparameter_grid_B)}")

results_B = []

for learning_rate, batch_size, weight_decay in hyperparameter_grid_B:
    best_val_mse, best_train_mse = run_modelB(
        learning_rate=learning_rate,
        batch_size=batch_size,
        weight_decay=weight_decay,
        num_epochs=200,
        patience=20
    )

    results_B.append({
        "learning_rate": learning_rate,
        "batch_size": batch_size,
        "weight_decay": weight_decay,
        "best_val_mse": best_val_mse,
        "train_mse_at_best_val": best_train_mse,
    })

    print(
        f"[Model B tuning] Done: lr={learning_rate}, bs={batch_size}, wd={weight_decay} "
        f"-> best val MSE = {best_val_mse:.4f}"
    )

results_df_B = pd.DataFrame(results_B)
display(results_df_B.sort_values("best_val_mse").head(10))

#### Best tune plotter

In [None]:
best_row_B = results_df_B.sort_values("best_val_mse").iloc[0]

best_lr_B = float(best_row_B["learning_rate"])
best_bs_B = int(best_row_B["batch_size"])
best_wd_B = float(best_row_B["weight_decay"])

model_B_val_losses, model_B_train_losses, model_B_tuned_test_mse = run_modelB(best_lr_B, best_bs_B, best_wd_B, evaluate_model=True, extra_info=True)
print(f"\n[Model B tuning] Final Tuned Test MSE (Wider + Deeper MLP): {model_B_tuned_test_mse:.6f}")

plot_loss_curves(model_B_train_losses, model_B_val_losses, "Wider + Deeper (Tuned)")


### Regularisation MLP (Model C)

#### Class definition

In [None]:
class regularisation_MLP(nn.Module):
    def __init__(self, in_features, out_features, dropout_p=0.2):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(in_features, 32),
            nn.ReLU(),
            nn.Dropout(dropout_p),
            nn.Linear(32, 16),
            nn.ReLU(),
            nn.Dropout(dropout_p),
            nn.Linear(16, out_features),
        )

        # He (Kaiming) initialisation for ReLU layers
        for m in self.net:
            if isinstance(m, nn.Linear):
                nn.init.kaiming_uniform_(m.weight, nonlinearity="relu")
                nn.init.zeros_(m.bias)

    def forward(self, x):
        return self.net(x)

model_C = regularisation_MLP(input_dim, output_dim).to(device)
print(model_C)


#### Loss function and Optimisation

In [None]:
criterion_C = nn.MSELoss()
optimiser_C = optim.Adam(model_C.parameters(), lr=1e-3)

#### Training Loop

In [None]:
num_epochs_C = 200 # max number of epochs
patience_C = 20  # if validation loss doesnt improve after this many epochs, stop.

best_val_loss_C = float("inf")
best_model_state_C = None
epochs_no_improve_C = 0

train_losses_C = []
val_losses_C = []

for epoch in range(1, num_epochs_C + 1):
    train_loss = train_one_epoch(model_C, train_loader, optimiser_C, criterion_C, device)
    val_loss = evaluate(model_C, val_loader, criterion_C, device)

    train_losses_C.append(train_loss)
    val_losses_C.append(val_loss)

    print(f"[Model C] Epoch {epoch:03d} | Train MSE: {train_loss:.6f} | Val MSE: {val_loss:.6f}")

    if val_loss < best_val_loss_C - 1e-6:  # check for improvement accounting for potential noise
        best_val_loss_C = val_loss
        best_model_state_C = model_C.state_dict()
        epochs_no_improve_C = 0
    else:
        epochs_no_improve_C += 1

    if epochs_no_improve_C >= patience_C:
        print(f"[Model C] Early stopping triggered after {epoch} epochs.")
        break

# Load best model weights for Model C
if best_model_state_C is not None:
    model_C.load_state_dict(best_model_state_C)
    print("[Model C] Loaded best model (lowest val MSE).")

test_mse_C = evaluate(model_C, test_loader, criterion_C, device)
print(f"\n[Model C] Final Test MSE: {test_mse_C:.6f}")
plot_loss_curves(train_losses_C, val_losses_C, "Wider + Deeper MLP")


### Regularisation MLP Tuning

#### Model running function

In [None]:
def run_modelC(learning_rate, batch_size, weight_decay, dropout_p,
                          num_epochs=200, patience=20, extra_info=False, evaluate_model=False):
    """
    Train Model C (ModelC_MLP) with a specific set of hyperparameters and
    return the best validation MSE and corresponding training MSE.
    """

    # Fresh Model C for this run (same architecture as Model B, but with dropout)
    model_C = regularisation_MLP(input_dim, output_dim, dropout_p=dropout_p).to(device)

    # Fresh loaders in case batch size changes
    train_loader, val_loader, test_loader = loaders_w_batch_size(batch_size)

    criterion  = nn.MSELoss()
    optimiser  = optim.Adam(
        model_C.parameters(),
        lr=learning_rate,
        weight_decay=weight_decay,
    )

    best_val_loss      = float("inf")
    best_train_loss    = float("inf")
    best_model_state   = None
    epochs_no_improve  = 0

    if evaluate_model:
        train_losses = [] # only used if evaluate_model=True
        val_losses = []  # only used if evaluate_model=True

    for epoch in range(1, num_epochs + 1):
        train_loss = train_one_epoch(model_C, train_loader, optimiser, criterion, device)
        val_loss   = evaluate(model_C, val_loader, criterion, device)

        if evaluate_model:
            train_losses.append(train_loss)
            val_losses.append(val_loss)

        if extra_info:
            print(
                f"[Model C tuning] (lr={learning_rate}, bs={batch_size}, wd={weight_decay}, p={dropout_p} "
                f"Epoch {epoch:03d} | Train MSE: {train_loss:.6f} | Val MSE: {val_loss:.6f}"
            )

        # Early stopping based on validation loss
        if val_loss < best_val_loss - 1e-6:  # small delta to avoid floating-point noise
            best_val_loss     = val_loss
            best_train_loss   = train_loss
            best_model_state  = model_C.state_dict()
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1

        if epochs_no_improve >= patience:
            break

    # Optionally reload best weights if you want to reuse the model afterwards
    if best_model_state is not None:
        model_C.load_state_dict(best_model_state)

    if evaluate_model:
        test_mse = evaluate(model, test_loader, criterion, device)

        return val_losses, train_losses, test_mse

    return best_val_loss, best_train_loss

#### Hyperparameter Grid

In [None]:
learning_rate_grid_C = [1e-3, 3e-4]
batch_size_grid_C    = [8, 16]
weight_decay_grid_C  = [0.0, 1e-4]
dropout_grid_C       = [0.1, 0.2, 0.3]

hyperparameter_grid_C = list(itertools.product(
    learning_rate_grid_C,
    batch_size_grid_C,
    weight_decay_grid_C,
    dropout_grid_C,
))

print(f"Total combinations to test for Model C: {len(hyperparameter_grid_C)}")

results_C = []

for learning_rate, batch_size, weight_decay, dropout_p in hyperparameter_grid_C:
    best_val_mse, best_train_mse = run_modelC(
        learning_rate=learning_rate,
        batch_size=batch_size,
        weight_decay=weight_decay,
        dropout_p=dropout_p,
        num_epochs=200,
        patience=20
    )

    results_C.append({
        "learning_rate": learning_rate,
        "batch_size": batch_size,
        "weight_decay": weight_decay,
        "dropout_p": dropout_p,
        "best_val_mse": best_val_mse,
        "train_mse_at_best_val": best_train_mse,
    })

    print(
        f"[Model C tuning] Done: lr={learning_rate}, bs={batch_size}, wd={weight_decay}, p={dropout_p} "
        f"-> best val MSE = {best_val_mse:.4f}"
    )

results_df_C = pd.DataFrame(results_C)
display(results_df_C.sort_values("best_val_mse").head(10))

#### Best tune plotter

In [None]:
best_row_C = results_df_C.sort_values("best_val_mse").iloc[0]

best_lr_C = float(best_row_C["learning_rate"])
best_bs_C = int(best_row_C["batch_size"])
best_wd_C = float(best_row_C["weight_decay"])
best_do_C = float(best_row_C["dropout_p"])

model_C_val_losses, model_C_train_losses, model_C_tuned_test_mse = run_modelC(best_lr_C, best_bs_C, best_wd_C, best_do_C, evaluate_model=True, extra_info=True)
print(f"\n[Regularisation tuning] Final Tuned Test MSE (Regularisation MLP): {model_C_tuned_test_mse:.6f}")

plot_loss_curves(model_C_train_losses, model_C_val_losses, "Regularisation (Tuned)")
