In [None]:
import math, time, copy, os, sys
from dataclasses import dataclass
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, random_split, TensorDataset
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import torch
import numpy as np

# Reproducibility: seed torch's RNGs, request deterministic cuDNN behaviour
# and switch off the cuDNN benchmark (autotuner) mode.
SEED = 42
_cuda_available = torch.cuda.is_available()
torch.manual_seed(SEED)
if _cuda_available:
    torch.cuda.manual_seed_all(SEED)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
DEVICE = torch.device("cuda") if _cuda_available else torch.device("cpu")
print("Device:", DEVICE)

In [None]:
class MnistDataloader(object):
    """Loads MNIST images and labels from raw IDX files.

    The four constructor arguments are the file paths of the training images,
    training labels, test images and test labels. Nothing is read until
    `load_data` (or `read_images_labels`) is called.
    """

    def __init__(self, training_images_filepath, training_labels_filepath,
                 test_images_filepath, test_labels_filepath):
        self.training_images_filepath = training_images_filepath
        self.training_labels_filepath = training_labels_filepath
        self.test_images_filepath = test_images_filepath
        self.test_labels_filepath = test_labels_filepath

    def read_images_labels(self, images_filepath, labels_filepath):
        """Read one image file and its matching label file.

        Args:
            images_filepath: path to an IDX image file (magic number 2051).
            labels_filepath: path to an IDX label file (magic number 2049).

        Returns:
            `(images, labels)` where `images` is a uint8 array of shape
            `(count, rows, cols)` and `labels` is a uint8 array of length
            `count`. Both come from `np.frombuffer` over immutable bytes,
            so they are read-only views.

        Raises:
            AssertionError: if a file does not start with the expected magic number.
            ValueError: if the label payload length differs from the count in
                its header, if the image payload does not fit the header
                dimensions, or if the two files hold different item counts.
        """
        import struct

        with open(labels_filepath, 'rb') as file:
            # Header: two big-endian uint32 values (magic, item count)
            magic, label_count = struct.unpack(">II", file.read(8))
            assert magic == 2049, f'Expected 2049, got {magic}'
            labels = np.frombuffer(file.read(), dtype=np.uint8)
        if labels.shape[0] != label_count:
            raise ValueError(
                f'Label file declares {label_count} labels but contains {labels.shape[0]}'
            )

        with open(images_filepath, 'rb') as file:
            # Header: four big-endian uint32 values (magic, item count, rows, cols)
            magic, image_count, rows, cols = struct.unpack(">IIII", file.read(16))
            assert magic == 2051, f'Expected 2051, got {magic}'
            # reshape raises ValueError when the payload does not match the header
            images = np.frombuffer(file.read(), dtype=np.uint8).reshape(image_count, rows, cols)
        if image_count != label_count:
            raise ValueError(
                f'Image file holds {image_count} images but label file holds {label_count} labels'
            )

        return images, labels

    def load_data(self):
        """Return `((x_train, y_train), (x_test, y_test))` read from the configured paths."""
        x_train, y_train = self.read_images_labels(self.training_images_filepath, self.training_labels_filepath)
        x_test, y_test   = self.read_images_labels(self.test_images_filepath, self.test_labels_filepath)
        return (x_train, y_train), (x_test, y_test)

In [None]:
# ==== FIXED HYPERPARAMETERS (DO NOT CHANGE) ====
EPOCHS = 20        # number of passes over the training loader per optimizer
BATCH_SIZE = 256   # batch size shared by the train / val / test loaders

# Keyword arguments handed to each optimizer class, keyed by optimizer name
HP = dict(
    momentum=dict(lr=0.05, momentum=0.9),
    amsgrad=dict(lr=0.03, beta1=0.9, beta2=0.999, eps=1e-8),
    rmsprop=dict(lr=1e-3, alpha=0.99, eps=1e-8),
    adam=dict(lr=1e-3, beta1=0.9, beta2=0.999, eps=1e-8),
)

In [None]:
# Root folder of the raw IDX files. Defaults to the Kaggle "mnist-dataset"
# input layout; set the MNIST_DIR environment variable to point elsewhere.
MNIST_DIR = os.environ.get("MNIST_DIR", "/kaggle/input/mnist-dataset")

loader = MnistDataloader(
    f"{MNIST_DIR}/train-images-idx3-ubyte/train-images-idx3-ubyte",
    f"{MNIST_DIR}/train-labels-idx1-ubyte/train-labels-idx1-ubyte",
    f"{MNIST_DIR}/t10k-images-idx3-ubyte/t10k-images-idx3-ubyte",
    f"{MNIST_DIR}/t10k-labels-idx1-ubyte/t10k-labels-idx1-ubyte"
)

(x_train, y_train), (x_test, y_test) = loader.load_data()

# Convert to torch tensors: add a channel dimension and scale pixels to [0, 1]
x_train = torch.tensor(x_train, dtype=torch.float32).unsqueeze(1) / 255.0
y_train = torch.tensor(y_train, dtype=torch.long)
x_test  = torch.tensor(x_test, dtype=torch.float32).unsqueeze(1) / 255.0
y_test  = torch.tensor(y_test, dtype=torch.long)

# Full train dataset
full_train = TensorDataset(x_train, y_train)

# Make a train/val split (55k / 5k for the standard 60k training set).
# A dedicated, seeded generator keeps the split identical every time this cell
# runs, independent of how much of the global RNG stream was consumed before.
VAL_SIZE = 5000
split_generator = torch.Generator().manual_seed(SEED)
train_set, val_set = random_split(
    full_train,
    [len(full_train) - VAL_SIZE, VAL_SIZE],
    generator=split_generator,
)

# Test dataset
test_set = TensorDataset(x_test, y_test)

# Dataloaders (only the training data is shuffled)
train_loader = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True)
val_loader   = DataLoader(val_set,   batch_size=BATCH_SIZE, shuffle=False)
test_loader  = DataLoader(test_set,  batch_size=BATCH_SIZE, shuffle=False)

print(len(train_set), len(val_set), len(test_set))


In [None]:
# ==== MODEL (fixed small CNN) ====
class MNISTNet(nn.Module):
    """Small CNN: two unpadded 3x3 convolutions, max-pool, two linear layers.

    `forward` returns raw logits with 10 entries per sample (no softmax).
    `fc1` expects 64 * 12 * 12 flattened features, which is what a 28x28
    single-channel input produces (28 -> 26 -> 24 after the convs, 12 after pooling).
    """

    def __init__(self):
        super().__init__()
        # Feature extractor: 1 -> 32 -> 64 channels, no padding
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, padding=0)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=0)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        # Dropout after pooling and after the hidden linear layer
        self.drop1 = nn.Dropout(p=0.25)
        self.drop2 = nn.Dropout(p=0.5)
        # Classifier head on the flattened 64 x 12 x 12 feature map
        flat_features = 64 * 12 * 12
        self.fc1 = nn.Linear(flat_features, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        """Map a batch of images to a `(batch, 10)` tensor of logits."""
        feats = F.relu(self.conv1(x))
        feats = F.relu(self.conv2(feats))
        feats = self.drop1(self.pool(feats))
        hidden = F.relu(self.fc1(feats.flatten(start_dim=1)))
        return self.fc2(self.drop2(hidden))

def accuracy(logits, targets):
    """Return the fraction of rows whose arg-max over dim 1 equals the target, as a float."""
    predicted = torch.argmax(logits, dim=1)
    hits = torch.eq(predicted, targets)
    return hits.float().mean().item()

# Module-level network instance, moved to the selected device.
# NOTE(review): train_one_optimizer builds its own MNISTNet for every run, so
# none of the visible cells appear to read this global — confirm before relying on it.
model = MNISTNet()
model = model.to(DEVICE)

In [None]:
class BaseOptimizer:
    """Minimal optimizer base: holds the trainable parameters and per-parameter state.

    Subclasses implement `step`; `zero_grad` is shared.
    """

    def __init__(self, params):
        # Keep only parameters that take part in training (requires_grad=True)
        trainable = []
        for candidate in params:
            if candidate.requires_grad:
                trainable.append(candidate)
        self.params = trainable
        self.state = {}  # maps a parameter to its dict of buffers

    @torch.no_grad()
    def zero_grad(self):
        """Zero every existing gradient in place; parameters without a grad are skipped."""
        for param in self.params:
            grad = param.grad
            if grad is None:
                continue
            grad.zero_()

    def step(self):
        """Apply one parameter update; must be overridden by subclasses."""
        raise NotImplementedError

In [None]:
class MomentumSGD(BaseOptimizer):
    """SGD with a momentum (velocity) buffer.

    Per step, for every parameter that has a gradient:
        v <- momentum * v + lr * grad
        theta <- theta - v
    """

    def __init__(self, params, lr=0.05, momentum=0.9):
        super().__init__(params)
        self.lr = lr              # step size (eta)
        self.momentum = momentum  # velocity decay (gamma)
        # One zero-initialised velocity buffer per trainable parameter
        self.state.update((p, {"v": torch.zeros_like(p)}) for p in self.params)

    @torch.no_grad()
    def step(self):
        """Update every parameter in place; parameters without a gradient are skipped."""
        for param in self.params:
            grad = param.grad
            if grad is None:
                continue
            velocity = self.state[param]["v"]
            # Decay the old velocity, then add the scaled gradient
            velocity.mul_(self.momentum)
            velocity.add_(grad, alpha=self.lr)
            # Move against the velocity
            param.add_(velocity, alpha=-1.0)

In [None]:
class AMSGrad(BaseOptimizer):
    """Adam variant that divides by a running element-wise maximum of the second moment.

    Both moment estimates are bias-corrected; the maximum is taken over the
    bias-corrected second moment, and `eps` is added after the square root.
    """

    def __init__(self, params, lr=0.03, beta1=0.9, beta2=0.999, eps=1e-8):
        super().__init__(params)
        self.lr, self.beta1, self.beta2, self.eps = lr, beta1, beta2, eps
        for p in self.params:
            self.state[p] = self._fresh_state(p)

    @staticmethod
    def _fresh_state(p):
        """Build the initial buffers for one parameter: step counter, m, v and running max."""
        return {
            "t": 0,
            "m": torch.zeros_like(p),
            "v": torch.zeros_like(p),
            "vmax": torch.zeros_like(p),
        }

    @torch.no_grad()
    def step(self):
        """Update every parameter in place; parameters without a gradient are skipped."""
        for param in self.params:
            grad = param.grad
            if grad is None:
                continue
            buf = self.state[param]
            buf["t"] += 1
            step_idx = buf["t"]
            first, second, running_max = buf["m"], buf["v"], buf["vmax"]

            # Exponential moving averages of the gradient and its square
            first.mul_(self.beta1)
            first.add_(grad, alpha=1 - self.beta1)
            second.mul_(self.beta2)
            second.addcmul_(grad, grad, value=1 - self.beta2)

            # Bias correction for the zero-initialised averages
            first_hat = first / (1 - self.beta1 ** step_idx)
            second_hat = second / (1 - self.beta2 ** step_idx)

            # Keep the largest corrected second moment seen so far (written into the buffer)
            torch.maximum(running_max, second_hat, out=running_max)

            # theta <- theta - lr * m_hat / (sqrt(v_max) + eps)
            denom = running_max.sqrt()
            denom.add_(self.eps)
            param.addcdiv_(first_hat, denom, value=-self.lr)

In [None]:
class RMSProp(BaseOptimizer):
    """RMSProp: scales each gradient by a running root-mean-square of past gradients.

    Per step: E <- alpha * E + (1 - alpha) * grad^2, then
    theta <- theta - lr * grad / sqrt(E + eps).
    Note that `eps` is added inside the square root here.
    """

    def __init__(self, params, lr=1e-3, alpha=0.9, eps=1e-8):
        super().__init__(params)
        self.lr = lr        # step size
        self.alpha = alpha  # decay of the squared-gradient average
        self.eps = eps      # stabiliser added before the square root
        # One zero-initialised squared-gradient average per trainable parameter
        self.state.update((p, {"Eg2": torch.zeros_like(p)}) for p in self.params)

    @torch.no_grad()
    def step(self):
        """Update every parameter in place; parameters without a gradient are skipped."""
        for param in self.params:
            grad = param.grad
            if grad is None:
                continue
            sq_avg = self.state[param]["Eg2"]
            # Running average of the squared gradient
            sq_avg.mul_(self.alpha)
            sq_avg.addcmul_(grad, grad, value=1 - self.alpha)
            # Divide the gradient by sqrt(average + eps) and take the step
            rms = torch.sqrt(sq_avg + self.eps)
            param.addcdiv_(grad, rms, value=-self.lr)

In [None]:
class Adam(BaseOptimizer):
    """Adam: bias-corrected first and second moment estimates of the gradient.

    Per step: theta <- theta - lr * m_hat / (sqrt(v_hat) + eps), with `eps`
    added after the square root.
    """

    def __init__(self, params, lr=1e-3, beta1=0.9, beta2=0.999, eps=1e-8):
        super().__init__(params)
        self.lr, self.beta1, self.beta2, self.eps = lr, beta1, beta2, eps
        # Step counter plus zero-initialised moment buffers for each parameter
        for p in self.params:
            zeros_m = torch.zeros_like(p)
            zeros_v = torch.zeros_like(p)
            self.state[p] = {"t": 0, "m": zeros_m, "v": zeros_v}

    @torch.no_grad()
    def step(self):
        """Update every parameter in place; parameters without a gradient are skipped."""
        for param in self.params:
            grad = param.grad
            if grad is None:
                continue
            buf = self.state[param]
            buf["t"] += 1
            step_idx = buf["t"]
            first, second = buf["m"], buf["v"]

            # Exponential moving averages of the gradient and its square
            first.mul_(self.beta1)
            first.add_(grad, alpha=1 - self.beta1)
            second.mul_(self.beta2)
            second.addcmul_(grad, grad, value=1 - self.beta2)

            # Bias correction for the zero-initialised averages
            first_hat = first / (1 - self.beta1 ** step_idx)
            second_hat = second / (1 - self.beta2 ** step_idx)

            # Parameter update
            denom = second_hat.sqrt()
            denom.add_(self.eps)
            param.addcdiv_(first_hat, denom, value=-self.lr)

In [None]:
def run_epoch(model, loader, optimizer=None):
    """Run one pass over `loader`, training when an optimizer is supplied.

    Args:
        model: module called as `model(x)` to produce logits.
        loader: iterable yielding `(x, y)` batches; both are moved to DEVICE.
        optimizer: object with `zero_grad()` and `step()`. When given, the model
            is put in training mode and updated after every batch; when None,
            the model is put in eval mode and only evaluated.

    Returns:
        `(mean_loss, accuracy)`, both averaged over all samples seen (each
        batch's mean cross-entropy is weighted by its batch size).

    Raises:
        ZeroDivisionError: if `loader` yields no samples.
    """
    is_train = optimizer is not None
    model.train(is_train)
    total_loss, total_correct, total_count = 0.0, 0.0, 0

    # Track gradients only when they will be used: evaluation passes then skip
    # building the autograd graph, saving memory and time without changing results.
    with torch.set_grad_enabled(is_train):
        for x, y in loader:
            x, y = x.to(DEVICE), y.to(DEVICE)
            if is_train:
                optimizer.zero_grad()
            logits = model(x)
            loss = F.cross_entropy(logits, y)
            if is_train:
                loss.backward()
                optimizer.step()

            bs = y.size(0)
            total_loss += loss.item() * bs
            total_correct += (logits.argmax(dim=1) == y).float().sum().item()
            total_count += bs

    return total_loss / total_count, total_correct / total_count

def train_one_optimizer(opt_name, OptClass, hp, seed=None):
    """Train a fresh MNISTNet for EPOCHS epochs with one optimizer.

    The global torch RNG is re-seeded before the model is created, so every
    optimizer being compared starts from the same initial weights and draws the
    same random stream (batch shuffling, dropout). Without this, differences
    between optimizers are mixed with differences between random initialisations.

    Args:
        opt_name: label used in the progress printout.
        OptClass: optimizer class, constructed as `OptClass(model.parameters(), **hp)`.
        hp: dict of keyword arguments for `OptClass`.
        seed: RNG seed for this run; None (default) uses the notebook-wide SEED.

    Returns:
        `(model, history)` where `history` maps "time", "train_loss",
        "train_acc", "val_loss" and "val_acc" to per-epoch lists. "time" is the
        cumulative wall-clock time in seconds, including the validation passes.

    Side effects:
        Resets the global torch RNG state and prints one line per epoch.
    """
    torch.manual_seed(SEED if seed is None else seed)

    model = MNISTNet().to(DEVICE)
    opt = OptClass(model.parameters(), **hp)
    history = {"time": [], "train_loss": [], "train_acc": [], "val_loss": [], "val_acc": []}

    # perf_counter is monotonic, so elapsed time is immune to system clock changes
    t0 = time.perf_counter()
    for epoch in range(1, EPOCHS + 1):
        tr_loss, tr_acc = run_epoch(model, train_loader, opt)
        va_loss, va_acc = run_epoch(model, val_loader, None)

        t = time.perf_counter() - t0
        history["time"].append(t)
        history["train_loss"].append(tr_loss); history["train_acc"].append(tr_acc)
        history["val_loss"].append(va_loss);   history["val_acc"].append(va_acc)

        print(f"[{opt_name}] epoch {epoch}/{EPOCHS}  "
              f"train_loss={tr_loss:.4f} acc={tr_acc:.4f}  "
              f"val_loss={va_loss:.4f} acc={va_acc:.4f}  time={t:.1f}s")

    return model, history

In [None]:
# Display name -> (optimizer class, keyword arguments taken from HP)
optimizers = dict(
    Momentum=(MomentumSGD, HP["momentum"]),
    AMSGrad=(AMSGrad, HP["amsgrad"]),
    RMSProp=(RMSProp, HP["rmsprop"]),
    Adam=(Adam, HP["adam"]),
)

# Train once per optimizer and keep only the per-epoch history of each run
all_hist = {}
for name in optimizers:
    OptClass, hp = optimizers[name]
    _, hist = train_one_optimizer(name, OptClass, hp)
    all_hist[name] = hist

In [None]:
# === Plot: one page with 2x2 subplots; each subplot shows Train/Val Loss (left y) and Accuracy (right y) vs Epoch ===
fig, axes = plt.subplots(2, 2, figsize=(12, 8))
order = ["Momentum", "AMSGrad", "RMSProp", "Adam"]
twin_axes = []  # right-hand (accuracy) axes, kept so the legend can read their handles

for ax, name in zip(axes.ravel(), order):
    hist = all_hist[name]
    epochs = list(range(1, len(hist["train_loss"]) + 1))
    # Left y: Loss
    ax.plot(epochs, hist["train_loss"], label="Train Loss")
    ax.plot(epochs, hist["val_loss"], label="Val Loss")
    ax.set_xlabel("Epoch")
    ax.set_ylabel("Loss")
    ax.set_title(name)

    # Right y: Accuracy
    ax2 = ax.twinx()
    ax2.plot(epochs, hist["train_acc"], linestyle="--", label="Train Acc")
    ax2.plot(epochs, hist["val_acc"], linestyle="--", label="Val Acc")
    ax2.set_ylabel("Accuracy")
    twin_axes.append(ax2)

# Single legend outside, built from the first subplot's loss AND accuracy lines.
# The accuracy handles must come from the twin axis created in the loop:
# calling twinx() again would create a new, empty axis with no handles.
handles1, labels1 = axes[0, 0].get_legend_handles_labels()
handles2, labels2 = twin_axes[0].get_legend_handles_labels()
fig.legend(handles1 + handles2, labels1 + labels2,
           loc="upper center", bbox_to_anchor=(0.5, 0.95), ncol=4)
fig.suptitle("MNIST: Loss & Accuracy vs Epoch for Four Optimizers", y=0.98)
# Reserve the top of the figure for the title and the legend placed just below it
fig.tight_layout(rect=[0, 0, 1, 0.9])
plt.show()

In [None]:
# === Page-2 Summary Helper ===
import pandas as pd

# Output column -> history key whose last (final-epoch) entry fills that column
final_columns = {
    "final_train_acc": "train_acc",
    "final_val_acc": "val_acc",
    "final_train_loss": "train_loss",
    "final_val_loss": "val_loss",
    "total_time_s": "time",
}

# One row per optimizer, in a fixed order
rows = []
for name in ["Momentum", "AMSGrad", "RMSProp", "Adam"]:
    h = all_hist[name]
    row = {"optimizer": name}
    for column, key in final_columns.items():
        row[column] = h[key][-1]
    rows.append(row)

summary_df = pd.DataFrame(rows)
summary_df

In [None]:
# === Plotting (single figure with 4 subplots) ===
def plot_all(histories, save_path=None):
    """Draw a 2x2 figure comparing all optimizers on four metrics.

    Panels: training accuracy, validation accuracy, training loss, validation
    loss. Each panel has one line per entry of `histories`, plotted against
    1-based epoch numbers.

    Args:
        histories: dict mapping an optimizer name to a history dict with the
            keys "train_acc", "val_acc", "train_loss" and "val_loss" (lists
            with one value per epoch).
        save_path: optional file path; when given, the figure is saved there
            and any error from saving propagates to the caller.

    Side effects:
        Also saves a copy to /kaggle/working and /mnt/data when those
        directories exist (best effort: a failed write is reported, not raised),
        then shows the figure.
    """
    # (history key, y-axis label, panel title), in panel order
    panels = [
        ("train_acc", "Train Acc", "Training Accuracy"),
        ("val_acc", "Val Acc", "Validation Accuracy"),
        ("train_loss", "Train Loss", "Training Loss"),
        ("val_loss", "Val Loss", "Validation Loss"),
    ]

    fig, axes = plt.subplots(2, 2, figsize=(12, 9))
    for ax, (key, ylabel, title) in zip(axes.ravel(), panels):
        for name, h in histories.items():
            values = h[key]
            # 1-based epochs, so the x-axis really is the epoch number
            epochs = list(range(1, len(values) + 1))
            ax.plot(epochs, values, label=name)
        ax.set_xlabel("Epoch")
        ax.set_ylabel(ylabel)
        ax.set_title(title)
        if histories:
            # Skip the legend when there is nothing to label
            ax.legend()

    fig.tight_layout()

    if save_path is not None:
        fig.savefig(save_path, dpi=200, bbox_inches="tight")

    # Convenience copies for hosted environments; only attempted where the
    # target directory exists, and a failed write never aborts the plot.
    for extra_path in ("/kaggle/working/hw4_all_plots.png", "/mnt/data/hw4_all_plots.png"):
        if not os.path.isdir(os.path.dirname(extra_path)):
            continue
        try:
            fig.savefig(extra_path, dpi=200, bbox_inches="tight")
        except OSError as exc:
            print(f"Could not save figure to {extra_path}: {exc}")

    plt.show()

# If histories dict 'all_hist' exists, plot now.
# The name is checked explicitly instead of catching NameError around the call,
# so a genuine NameError raised inside plot_all is not silently discarded.
if "all_hist" in globals():
    plot_all(all_hist, save_path="hw4_all_plots.png")