In [57]:
import itertools
import math
import random
from dataclasses import dataclass

import torch
from torch import nn
from torch.utils.data import DataLoader, Subset, random_split
from torchvision import datasets, transforms
from sklearn.metrics import accuracy_score

import time
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import warnings

# Suppress every Python warning for the rest of the session. Note that this
# also hides library warnings that could point at real problems.
warnings.filterwarnings("ignore")

In [58]:
# Device-agnostic setup (works on macOS too): take the first backend that
# reports itself available, in the order CUDA -> Apple MPS -> CPU.

device = (
    "cuda" if torch.cuda.is_available()             # NVIDIA GPU
    else "mps" if torch.backends.mps.is_available()  # Apple Silicon GPU
    else "cpu"                                       # fallback when no GPU is found
)

device

'mps'

In [59]:
# 2) Data

# Preprocessing applied to every image: convert to a tensor, then normalise
# with mean 0.5 and std 0.5.
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))]
)

# Fashion-MNIST train and test splits stored under ./data (download=True asks
# torchvision to fetch them). Both splits share the same preprocessing.
train_full, test_ds = (
    datasets.FashionMNIST(
        root="./data"
        , train=use_train_split
        , download=True
        , transform=transform
    )
    for use_train_split in (True, False)
)

In [60]:
# Seed the three global RNGs this notebook imports (Python, NumPy, PyTorch)
# with the same value.
random.seed(27)
np.random.seed(27)
torch.manual_seed(27)


# 1) Model factory so we rebuild fresh models each run
class SimpleCNN(nn.Module):
    """Small CNN: two conv/ReLU/max-pool stages and a two-layer dense head.

    Args:
        channels: Output channels of the first convolution; the second
            convolution produces ``channels * 2``.
        dropout: Probability given to both ``nn.Dropout`` layers of the head.

    The network takes single-channel input and emits 10 logits. The first
    dense layer is sized for a 7x7 feature map, which is what the two 2x2
    poolings leave of a 28x28 image.
    """

    def __init__(self, channels=32, dropout=0.25):
        super().__init__()
        wide = channels * 2
        flat_features = wide * 7 * 7

        # Layers are created in the same order as they run, so submodule
        # indices (and therefore state_dict keys) follow this listing.
        conv_stack = [
            nn.Conv2d(1, channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),
            nn.Conv2d(channels, wide, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),
        ]
        dense_head = [
            nn.Flatten(),
            nn.Dropout(dropout),
            nn.Linear(flat_features, 128),
            nn.ReLU(inplace=True),
            nn.Dropout(dropout),
            nn.Linear(128, 10),
        ]

        self.features = nn.Sequential(*conv_stack)
        self.classifier = nn.Sequential(*dense_head)

    def forward(self, x):
        """Return the logits for the image batch ``x``."""
        feature_maps = self.features(x)
        return self.classifier(feature_maps)

In [61]:
# 3) Training + evaluation helpers
def train_one_epoch(model, loader, criterion, optimizer):
    """Run one optimisation pass over ``loader``.

    The model is switched to training mode; each ``(X, y)`` batch is moved to
    the notebook-level ``device`` and used for exactly one optimizer step.

    Returns:
        ``(loss, accuracy)`` where ``loss`` is the sum of
        ``criterion value * batch size`` divided by the number of samples, and
        ``accuracy`` is the fraction of samples whose argmax prediction equals
        the label (predictions come from the logits computed before the step).
    """
    model.train()
    loss_sum, n_correct, n_seen = 0.0, 0, 0

    for inputs, targets in loader:
        inputs, targets = inputs.to(device), targets.to(device)

        optimizer.zero_grad(set_to_none=True)
        outputs = model(inputs)
        batch_loss = criterion(outputs, targets)
        batch_loss.backward()
        optimizer.step()

        # Weight the batch loss by its size so batches of different sizes
        # contribute proportionally to the epoch figure.
        loss_sum += batch_loss.item() * inputs.size(0)
        n_correct += (outputs.argmax(dim=1) == targets).sum().item()
        n_seen += targets.size(0)

    return loss_sum / n_seen, n_correct / n_seen

@torch.no_grad()
def evaluate(model, loader, criterion=None):
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    for X, y in loader:
        X = X.to(device)
        y = y.to(device)
        logits = model(X)
        if criterion is not None:
            loss = criterion(logits, y)
            running_loss += loss.item() * X.size(0)
        preds = logits.argmax(dim=1)
        correct += (preds == y).sum().item()
        total += y.size(0)
    loss = running_loss / total if criterion is not None else math.nan
    acc = correct / total
    return loss, acc

def make_optimizer(name, params, lr, weight_decay=0.0):
    """Build the optimizer selected by ``name``.

    Args:
        name: ``"adam"`` for ``torch.optim.Adam`` or ``"sgd"`` for
            ``torch.optim.SGD``.
        params: Parameters to optimise, forwarded to the optimizer unchanged.
        lr: Learning rate.
        weight_decay: Weight-decay coefficient. Defaults to ``0.0`` (the
            default of both torch optimizers) so callers that do not
            regularise may omit it.

    Returns:
        The constructed optimizer.

    Raises:
        ValueError: If ``name`` is neither ``"adam"`` nor ``"sgd"``.
    """
    if name == "adam":
        return torch.optim.Adam(
            params
            , lr=lr
            , weight_decay=weight_decay
        )
    if name == "sgd":
        # Momentum is fixed at 0.9 for every SGD run; it is not a grid value.
        return torch.optim.SGD(
            params
            , lr=lr
            , momentum=0.9
            , weight_decay=weight_decay
        )
    raise ValueError(f"Unknown optimizer {name}")

In [62]:
# 4) K-fold split indices

def kfold_indices(n, k=3, seed=27):
    """Yield ``(train_idx, val_idx)`` index arrays for ``k``-fold splitting.

    ``range(n)`` is shuffled with a NumPy generator seeded by ``seed`` and cut
    into ``k`` nearly equal parts with ``np.array_split``. Each part serves as
    the validation fold once, while the remaining parts, concatenated in their
    original order, form the training indices.
    """
    shuffled = np.arange(n)
    np.random.default_rng(seed).shuffle(shuffled)
    parts = np.array_split(shuffled, k)

    for held_out, val_idx in enumerate(parts):
        kept = parts[:held_out] + parts[held_out + 1:]
        yield np.concatenate(kept), val_idx

In [66]:
# 5) Hyperparameter grid

# Values noted earlier (presumably learning-rate candidates; 6e-2 is not in
# the grid below): 6e-4, 1e-3, 6e-3, 1e-2, 6e-2

# Every key maps to the list of values to try; the grid search visits the full
# Cartesian product (2 * 3 * 4 * 2 * 1 * 2 * 1 = 96 combinations).
param_grid = dict(
    channels=[32, 64],
    dropout=[0.0, 0.25, 0.5],
    lr=[6e-4, 1e-3, 6e-3, 1e-2],
    weight_decay=[0.0, 1e-4],
    batch_size=[128],
    optimizer=["adam", "sgd"],
    epochs=[3],  # keep small for demo
)

def param_product(grid):
    """Yield one ``{name: value}`` dict per combination of the grid's values.

    Combinations follow ``itertools.product`` order over the value lists,
    taken in the grid's key order (so the last key varies fastest).
    """
    names = tuple(grid)
    value_lists = [grid[name] for name in names]
    for combo in itertools.product(*value_lists):
        yield {name: value for name, value in zip(names, combo)}

In [None]:
"""
param_grid = {
    "channels": [16, 32, 64]
    , "dropout": [0.0, 0.25, 0.5]
    , "lr": [1e-3, 3e-4]
    , "weight_decay": [0.0, 1e-4]
    , "batch_size": [64, 128]
    , "optimizer": ["adam", "sgd"]
    , "epochs": [5]  # keep small for demo
}

def product(grid):
    keys = list(grid.keys())
    for values in itertools.product(*(grid[k] for k in keys)):
        yield dict(zip(keys, values))
"""

In [67]:
import time
import tqdm.auto as tqdm
import itertools
# from itertools import product

In [68]:
%%time

# 6) Run grid search with K-fold CV

best_score = -1.0
best_params = None

for params in param_product(param_grid):
    cv_scores = []

    for train_idx, val_idx in kfold_indices(len(train_full), k=3, seed=27):
        train_ds = Subset(train_full, train_idx)
        val_ds = Subset(train_full, val_idx)

        train_loader = DataLoader(
            train_ds
            , batch_size=params["batch_size"]
            , shuffle=True
            , num_workers=2
            , pin_memory=(device == "mps")
        )
        val_loader = DataLoader(
            val_ds
            , batch_size=256
            , shuffle=False
            , num_workers=2
            , pin_memory=(device == "mps")
        )

        model = SimpleCNN(
            channels=params["channels"]
            , dropout=params["dropout"]
        ).to(device)

        criterion = nn.CrossEntropyLoss()
        optimizer = make_optimizer(
            params["optimizer"]
            , model.parameters()
            , lr=params["lr"]
            , weight_decay=params["weight_decay"]
        )

        for epoch in range(params["epochs"]):
            train_one_epoch(model, train_loader, criterion, optimizer)

        _, val_acc = evaluate(model, val_loader, criterion=None)
        cv_scores.append(val_acc)

    mean_cv = float(np.mean(cv_scores))

    if mean_cv > best_score:
        best_score = mean_cv
        best_params = params
        print("New best:", best_score, "with", best_params)

print("Best CV accuracy:", best_score)
print("Best params:", best_params)

New best: 0.88835 with {'channels': 32, 'dropout': 0.0, 'lr': 0.0006, 'weight_decay': 0.0, 'batch_size': 128, 'optimizer': 'adam', 'epochs': 3}
New best: 0.8964166666666666 with {'channels': 32, 'dropout': 0.0, 'lr': 0.001, 'weight_decay': 0.0, 'batch_size': 128, 'optimizer': 'adam', 'epochs': 3}
New best: 0.8977666666666666 with {'channels': 32, 'dropout': 0.0, 'lr': 0.006, 'weight_decay': 0.0001, 'batch_size': 128, 'optimizer': 'adam', 'epochs': 3}


KeyboardInterrupt: 

In [None]:
# 7) Retrain on full training set with best params
final_model = SimpleCNN(
    channels=best_params["channels"]
    , dropout=best_params["dropout"]
).to(device)

final_train_loader = DataLoader(
    train_full
    , batch_size=best_params["batch_size"]
    , shuffle=True
    , num_workers=2
    # Pinned host memory speeds up host-to-CUDA copies; it is not supported
    # for MPS, so enable it only when training on CUDA.
    , pin_memory=(device == "cuda")
)

criterion = nn.CrossEntropyLoss()
# Pass the cross-validated weight_decay explicitly so the final model is
# trained with the same regularisation that was selected by the grid search.
optimizer = make_optimizer(
    best_params["optimizer"]
    , final_model.parameters()
    , lr=best_params["lr"]
    , weight_decay=best_params["weight_decay"]
)

for epoch in range(best_params["epochs"]):
    train_one_epoch(final_model, final_train_loader, criterion, optimizer)

In [None]:
# 8) Test evaluation
# Score the retrained model on the held-out test split.
test_loader = DataLoader(
    test_ds
    , batch_size=256
    , shuffle=False
    , num_workers=2
    # Pinned host memory speeds up host-to-CUDA copies; it is not supported
    # for MPS, so enable it only when evaluating on CUDA.
    , pin_memory=(device == "cuda")
)
# No criterion is passed, so evaluate() returns NaN for the loss; only the
# accuracy is reported.
_, test_acc = evaluate(final_model, test_loader)
print("Test accuracy:", test_acc)

In [55]:
%%time

# Count total epochs to train for a single global bar
total_epochs = 0
for params in product(param_grid):
    for _ in kfold_indices(len(train_full), k=3, seed=27):
        total_epochs += params["epochs"]

# Reset the product iterator (it was exhausted by counting)
best_score = -1.0
best_params = None

with tqdm(total=total_epochs, desc="Training (all runs)") as pbar:
    for params in product(param_grid):
        cv_scores = []

        for train_idx, val_idx in kfold_indices(len(train_full), k=3, seed=27):
            train_ds = Subset(train_full, train_idx)
            val_ds = Subset(train_full, val_idx)

            train_loader = DataLoader(
                train_ds,
                batch_size=params["batch_size"],
                shuffle=True,
                num_workers=2,
                pin_memory=(device == "mps"),
            )
            val_loader = DataLoader(
                val_ds,
                batch_size=256,
                shuffle=False,
                num_workers=2,
                pin_memory=(device == "mps"),
            )

            model = SimpleCNN(
                channels=params["channels"],
                dropout=params["dropout"],
            ).to(device)

            criterion = nn.CrossEntropyLoss()
            optimizer = make_optimizer(
                params["optimizer"],
                model.parameters(),
                lr=params["lr"],
                weight_decay=params["weight_decay"],
            )

            for epoch in range(params["epochs"]):
                train_one_epoch(model, train_loader, criterion, optimizer)
                pbar.update(1)  # advance global bar by one epoch

            _, val_acc = evaluate(model, val_loader, criterion=None)
            cv_scores.append(val_acc)

        mean_cv = float(np.mean(cv_scores))

        if mean_cv > best_score:
            best_score = mean_cv
            best_params = params
            tqdm.write(f"New best: {best_score} with {best_params}")

print("Best CV accuracy:", best_score)
print("Best params:", best_params)

TypeError: tuple indices must be integers or slices, not str