<a href="https://colab.research.google.com/github/JinzhiT/5750-project-3/blob/main/project3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Math 5750/6880: Mathematics of Data Science \
Project 3

# 1. Fashion-MNIST image classification using sklearn

In [None]:
from tensorflow.keras.datasets import fashion_mnist
from sklearn.preprocessing import StandardScaler

# Fetch the Fashion-MNIST train/test splits.
# Label ids 0-9 map to: T-shirt/top, Trouser, Pullover, Dress, Coat, Sandal, Shirt, Sneaker, Bag, Ankle boot
(X_train, y_train), (X_test, y_test) = fashion_mnist.load_data()

# Flatten every image into one feature row per sample.
X_train, X_test = (
    images.reshape(images.shape[0], -1) for images in (X_train, X_test)
)

# Standardize the features: statistics are estimated on the training rows
# only and then applied unchanged to both splits.
scaler = StandardScaler().fit(X_train)
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)

In [2]:
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

import os, json, time, itertools, shutil
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from tensorflow.keras.datasets import fashion_mnist
from sklearn.preprocessing import StandardScaler
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.model_selection import train_test_split

OUTDIR = "results"
BACKUP_DIR = OUTDIR + "_prev"
if os.path.exists(OUTDIR):
    # Keep the previous run as a backup instead of deleting it. Only the most
    # recent backup is retained, so an older one is dropped first.
    if os.path.exists(BACKUP_DIR):
        shutil.rmtree(BACKUP_DIR)
    shutil.move(OUTDIR, BACKUP_DIR)
# Creating the confusion-matrix sub-directory also creates OUTDIR itself.
os.makedirs(os.path.join(OUTDIR, "cm"), exist_ok=True)

# Human-readable label names; the list position is the integer class id.
CLASS_NAMES = [
    "T-shirt/top",
    "Trouser",
    "Pullover",
    "Dress",
    "Coat",
    "Sandal",
    "Shirt",
    "Sneaker",
    "Bag",
    "Ankle boot",
]

print("[INFO] Loading Fashion-MNIST ...")
(X_train, y_train), (X_test, y_test) = fashion_mnist.load_data()

# Flatten each image into one float32 feature row.
X_train, X_test = (
    images.reshape(images.shape[0], -1).astype("float32")
    for images in (X_train, X_test)
)

# Hold out a stratified 10% of the training rows as a validation set; it is
# scored separately from the test set when each experiment is evaluated.
X_train, X_val, y_train, y_val = train_test_split(
    X_train,
    y_train,
    test_size=0.1,
    stratify=y_train,
    random_state=42,
)

# Standardize: statistics come from the training rows only and are then
# applied to train, validation and test alike.
scaler = StandardScaler().fit(X_train)
X_train, X_val, X_test = (
    scaler.transform(part) for part in (X_train, X_val, X_test)
)


def _mlp_config(name, hidden_layer_sizes, **overrides):
    """Build one experiment dict from shared defaults plus per-run overrides.

    The defaults are the settings most runs share (ReLU, Adam, lr 0.001,
    alpha 0.0001, batch 256, early stopping, 50 iterations). Overrides replace
    existing keys in place, so every config keeps the same key order.
    """
    cfg = dict(
        name=name,
        hidden_layer_sizes=hidden_layer_sizes,
        activation="relu",
        solver="adam",
        learning_rate_init=0.001,
        alpha=0.0001,
        batch_size=256,
        early_stopping=True,
        max_iter=50,
        tol=1e-4,
        n_iter_no_change=5,
    )
    cfg.update(overrides)
    return cfg


experiments = [
    # Baseline (small): short run without early stopping
    _mlp_config("baseline_100relu_adam", (100,), early_stopping=False, max_iter=20),
    # Deeper/wider
    _mlp_config("deep_256x128x64_relu_adam", (256, 128, 64)),
    # Tanh
    _mlp_config("wide_512_tanh_adam", (512,), activation="tanh"),
    # LBFGS solver (quasi-Newton), no mini-batches, ignores early_stopping
    _mlp_config(
        "lbfgs_200_relu",
        (200,),
        solver="lbfgs",
        batch_size=200,
        early_stopping=False,
        max_iter=100,
        tol=1e-5,
        n_iter_no_change=10,
    ),
    # Higher regularization
    _mlp_config("relu_adam_alpha_0p01", (256, 128), alpha=0.01),
    # Lower learning rate
    _mlp_config("relu_adam_lr_0p0003", (256, 128), learning_rate_init=3e-4),
]

def train_one(cfg):
    """Train, evaluate and persist one ``MLPClassifier`` experiment.

    The model is fitted on the module-level ``X_train``/``y_train`` and scored
    on ``X_val``/``y_val`` and ``X_test``/``y_test``.

    Parameters
    ----------
    cfg : dict
        Experiment settings. Must contain ``name`` plus every hyper-parameter
        key forwarded to ``MLPClassifier`` below.

    Returns
    -------
    dict
        A copy of ``cfg`` extended with ``train_time_sec``, ``val_accuracy``,
        ``test_accuracy``, ``classes`` and ``confusion_matrix_path``.

    Side effects
    ------------
    Writes three files under ``OUTDIR``: the test confusion-matrix image
    (``cm/cm_<name>.png``), the test classification report
    (``cls_report_<name>.txt``) and the run metadata (``meta_<name>.json``).
    """
    print(f"\n[RUN] {cfg['name']}")
    clf = MLPClassifier(
        hidden_layer_sizes=cfg["hidden_layer_sizes"],
        activation=cfg["activation"],
        solver=cfg["solver"],
        learning_rate_init=cfg["learning_rate_init"],
        alpha=cfg["alpha"],
        batch_size=cfg["batch_size"],
        early_stopping=cfg["early_stopping"],
        max_iter=cfg["max_iter"],
        tol=cfg["tol"],
        n_iter_no_change=cfg["n_iter_no_change"],
        random_state=42,
        verbose=False,
    )

    # Only the fit itself is timed; prediction and plotting are excluded.
    tic = time.perf_counter()
    clf.fit(X_train, y_train)
    train_time = time.perf_counter() - tic

    # Evaluate on validation & test
    y_val_pred = clf.predict(X_val)
    y_test_pred = clf.predict(X_test)

    # Cast to built-in floats so the JSON dump below never depends on how a
    # NumPy scalar type happens to serialize.
    val_acc = float(accuracy_score(y_val, y_val_pred))
    test_acc = float(accuracy_score(y_test, y_test_pred))

    # Confusion matrix (test set); axes are labelled with integer class ids.
    class_ids = list(range(len(CLASS_NAMES)))
    cm = confusion_matrix(y_test, y_test_pred, labels=class_ids)
    cm_path = os.path.join(OUTDIR, "cm", f"cm_{cfg['name']}.png")
    fig, ax = plt.subplots(figsize=(6,5))
    try:
        ax.imshow(cm, interpolation="nearest")
        ax.set_title(f"Confusion Matrix — {cfg['name']}")
        ax.set_xlabel("Predicted")
        ax.set_ylabel("True")
        ax.set_xticks(class_ids)
        ax.set_xticklabels(class_ids)
        ax.set_yticks(class_ids)
        ax.set_yticklabels(class_ids)
        fig.tight_layout()
        fig.savefig(cm_path, dpi=160)
    finally:
        # Always release the figure, even if drawing or saving fails, so a
        # failed run does not leave open figures behind.
        plt.close(fig)

    # Classification report
    report = classification_report(y_test, y_test_pred, target_names=CLASS_NAMES, digits=4)

    # Save per-run artifacts
    with open(os.path.join(OUTDIR, f"cls_report_{cfg['name']}.txt"), "w", encoding="utf-8") as f:
        f.write(report)

    # Copy so the caller's config dict is not mutated.
    meta = dict(cfg)
    meta.update(dict(
        train_time_sec=round(train_time,4),
        val_accuracy=round(val_acc,4),
        test_accuracy=round(test_acc,4),
        classes=CLASS_NAMES,
        confusion_matrix_path=cm_path
    ))
    with open(os.path.join(OUTDIR, f"meta_{cfg['name']}.json"), "w", encoding="utf-8") as f:
        json.dump(meta, f, ensure_ascii=False, indent=2)
    return meta

# Train every configuration in order and collect one metadata row per run.
rows = []
for cfg in experiments:
    rows.append(train_one(cfg))

# Column order of the summary table ("classes" is intentionally left out).
summary_columns = [
    "name",
    "hidden_layer_sizes",
    "activation",
    "solver",
    "learning_rate_init",
    "alpha",
    "batch_size",
    "early_stopping",
    "max_iter",
    "tol",
    "n_iter_no_change",
    "train_time_sec",
    "val_accuracy",
    "test_accuracy",
    "confusion_matrix_path",
]
df = pd.DataFrame(rows)[summary_columns]

csv_path = os.path.join(OUTDIR, "results.csv")
df.to_csv(csv_path, index=False)
print("\n[DONE] Wrote:", csv_path)
print("[HINT] Confusion matrices in:", os.path.join(OUTDIR, "cm"))


[INFO] Loading Fashion-MNIST ...

[RUN] baseline_100relu_adam





[RUN] deep_256x128x64_relu_adam

[RUN] wide_512_tanh_adam

[RUN] lbfgs_200_relu


STOP: TOTAL NO. OF ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
  self.n_iter_ = _check_optimize_result("lbfgs", opt_res, self.max_iter)



[RUN] relu_adam_alpha_0p01

[RUN] relu_adam_lr_0p0003

[DONE] Wrote: results/results.csv
[HINT] Confusion matrices in: results/cm


# 3. Fashion-MNIST image classification using PyTorch

In [None]:
import numpy as np
from tensorflow.keras.datasets import fashion_mnist
import torch
from torch.utils.data import TensorDataset, DataLoader

# Fetch Fashion-MNIST.
# Label ids 0-9 map to: T-shirt/top, Trouser, Pullover, Dress, Coat, Sandal, Shirt, Sneaker, Bag, Ankle boot
(X_train, y_train), (X_test, y_test) = fashion_mnist.load_data()

# Divide pixel values by 255 and insert a channel axis: (N, 28, 28) -> (N, 1, 28, 28)
X_train = (X_train.astype("float32") / 255.0)[:, np.newaxis, :, :]
X_test = (X_test.astype("float32") / 255.0)[:, np.newaxis, :, :]

# Labels are stored as int64.
y_train, y_test = y_train.astype(np.int64), y_test.astype(np.int64)

# Train/validation split: the first 50k samples train, the remaining ones validate.
n_fit = 50000
X_tr, y_tr = X_train[:n_fit], y_train[:n_fit]
X_val, y_val = X_train[n_fit:], y_train[n_fit:]


def _as_tensor_dataset(images, labels):
    """Wrap a pair of NumPy arrays as a TensorDataset."""
    return TensorDataset(torch.from_numpy(images), torch.from_numpy(labels))


# Wrap each split in a TensorDataset and a DataLoader; only training is shuffled.
train_ds = _as_tensor_dataset(X_tr, y_tr)
val_ds = _as_tensor_dataset(X_val, y_val)
test_ds = _as_tensor_dataset(X_test, y_test)

train_loader = DataLoader(train_ds, shuffle=True, batch_size=128)
val_loader = DataLoader(val_ds, shuffle=False, batch_size=256)
test_loader = DataLoader(test_ds, shuffle=False, batch_size=256)

In [None]:
import torch.nn as nn
import torch.optim as optim

import os, time, json, shutil, math, random
from pathlib import Path
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, transforms, models
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report
import pandas as pd

# In Colab, change the runtime type to GPU to get CUDA; otherwise this falls back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

def seed_all(seed=42):
    """Seed every random number generator used in this notebook.

    Seeds Python's ``random``, NumPy's global generator and PyTorch (the
    default generator and all CUDA devices), then sets the cuDNN flags
    ``deterministic = True`` and ``benchmark = False``.

    Parameters
    ----------
    seed : int
        Seed applied to all generators.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

seed_all(42)

# Output locations for the PyTorch runs; any earlier results are wiped first.
OUTDIR = Path("pt_results")
CM_DIR = OUTDIR.joinpath("cm")
if OUTDIR.exists():
    shutil.rmtree(OUTDIR)
CM_DIR.mkdir(exist_ok=True, parents=True)

# Human-readable label names; the list position is the integer class id.
CLASS_NAMES = [
    "T-shirt/top",
    "Trouser",
    "Pullover",
    "Dress",
    "Coat",
    "Sandal",
    "Shirt",
    "Sneaker",
    "Bag",
    "Ankle boot",
]
NUM_CLASSES = len(CLASS_NAMES)

# Normalization constants for the single grayscale channel
# (presumably the Fashion-MNIST pixel mean/std -- TODO confirm).
FM_MEAN, FM_STD = 0.2860406, 0.35302424
tfm_base = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize((FM_MEAN,), (FM_STD,)),
    ]
)


def _fashion_mnist(train, transform):
    """Return one Fashion-MNIST split stored under ``data/``, downloading it if needed."""
    return datasets.FashionMNIST(root="data", train=train, download=True, transform=transform)


def _split_train_val(full_train):
    """Randomly split into ``train_size``/``val_size`` parts using a generator seeded with 42."""
    return random_split(
        full_train,
        [train_size, val_size],
        generator=torch.Generator().manual_seed(42),
    )


def _make_loader(dataset, batch_size, shuffle):
    """Build a DataLoader with the worker / pinned-memory settings shared by every loader here."""
    return DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=shuffle,
        num_workers=2,
        pin_memory=torch.cuda.is_available(),
    )


train_full = _fashion_mnist(train=True, transform=tfm_base)
test_ds = _fashion_mnist(train=False, transform=tfm_base)

# Validation split: 6k of the training samples are held out (54k train / 6k val).
val_size = 6000
train_size = len(train_full) - val_size
train_ds, val_ds = _split_train_val(train_full)

train_loader = _make_loader(train_ds, batch_size=128, shuffle=True)
val_loader = _make_loader(val_ds, batch_size=256, shuffle=False)
test_loader = _make_loader(test_ds, batch_size=256, shuffle=False)

# Transfer-learning pipeline (ResNet): resize to 224, replicate the gray
# channel to 3 channels, and normalize with the ImageNet statistics.
tfm_tl = transforms.Compose(
    [
        transforms.Resize(224),
        transforms.Grayscale(num_output_channels=3),  # 1 -> 3 channels
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.485,0.456,0.406), std=(0.229,0.224,0.225)),  # ImageNet stats
    ]
)
train_full_tl = _fashion_mnist(train=True, transform=tfm_tl)
test_ds_tl = _fashion_mnist(train=False, transform=tfm_tl)
train_ds_tl, val_ds_tl = _split_train_val(train_full_tl)

train_loader_tl = _make_loader(train_ds_tl, batch_size=128, shuffle=True)
val_loader_tl = _make_loader(val_ds_tl, batch_size=256, shuffle=False)
test_loader_tl = _make_loader(test_ds_tl, batch_size=256, shuffle=False)

class MLP(nn.Module):
    """Fully connected classifier with two hidden ReLU layers.

    The input is flattened to 28*28 features. Each hidden layer is followed by
    dropout (p=0.2, active only in training mode), and the final linear layer
    emits ``NUM_CLASSES`` raw logits.

    Parameters
    ----------
    hidden : sequence of int
        Widths of the two hidden layers; only the first two entries are used.
    """

    def __init__(self, hidden=(256,128)):
        super().__init__()
        first_width, second_width = hidden[0], hidden[1]
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(28 * 28, first_width)
        self.fc2 = nn.Linear(first_width, second_width)
        self.out = nn.Linear(second_width, NUM_CLASSES)

    def forward(self, x):
        """Return the class logits for a batch of images."""
        h = self.flatten(x)
        # Both hidden layers share the same pattern: linear -> ReLU -> dropout.
        for layer in (self.fc1, self.fc2):
            h = F.dropout(F.relu(layer(h)), p=0.2, training=self.training)
        return self.out(h)

class SmallCNN(nn.Module):
    """Two-block convolutional classifier for 1x28x28 inputs.

    Shape flow: 1x28x28 -> conv1 -> 16x28x28 -> pool -> 16x14x14
    -> conv2 -> 32x14x14 -> pool2 -> 32x7x7 -> fc1 (128) -> ``NUM_CLASSES`` logits.
    Dropout (p=0.3) follows the hidden dense layer and is active only in
    training mode.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 16, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(32 * 7 * 7, 128)
        self.out = nn.Linear(128, NUM_CLASSES)

    def forward(self, x):
        """Return the class logits for a batch of images."""
        feats = self.pool(F.relu(self.conv1(x)))
        feats = self.pool2(F.relu(self.conv2(feats)))
        feats = torch.flatten(feats, 1)
        hidden = F.relu(self.fc1(feats))
        hidden = F.dropout(hidden, p=0.3, training=self.training)
        return self.out(hidden)

def make_resnet18_finetune():
    """Build a pretrained ResNet-18 whose classifier head outputs ``NUM_CLASSES`` logits.

    Uses the weights-enum API (torchvision >= 0.13 style) when it is available
    and falls back to the legacy ``pretrained=True`` argument otherwise. All
    layers stay trainable; only the final ``fc`` layer is replaced by a freshly
    initialized linear layer.

    Returns
    -------
    torch.nn.Module
        The modified ResNet-18.
    """
    try:
        # torchvision >= 0.13 style weights
        weights = models.ResNet18_Weights.DEFAULT
    except AttributeError:
        # Older torchvision without the weights enum. Only this case triggers
        # the fallback, so download failures, interrupts and the like propagate
        # instead of being silently retried.
        net = models.resnet18(pretrained=True)
    else:
        net = models.resnet18(weights=weights)

    # Replace classifier
    net.fc = nn.Linear(net.fc.in_features, NUM_CLASSES)
    return net

@torch.no_grad()
def evaluate(model, loader, device):
    """Score ``model`` on every batch of ``loader`` without tracking gradients.

    Parameters
    ----------
    model : torch.nn.Module
        Network to evaluate; it is switched to eval mode.
    loader : iterable
        Yields ``(inputs, targets)`` batches.
    device : torch.device
        Device on which the forward pass runs.

    Returns
    -------
    tuple
        ``(accuracy, y_true, y_pred)``, where the last two are NumPy arrays of
        true and predicted class ids (argmax over the logits).
    """
    model.eval()
    logit_chunks = []
    target_chunks = []
    for inputs, targets in loader:
        inputs = inputs.to(device)
        targets = targets.to(device)
        # Results are moved back to the CPU so they can be concatenated
        # and converted to NumPy below.
        logit_chunks.append(model(inputs).cpu())
        target_chunks.append(targets.cpu())

    y_true = torch.cat(target_chunks).numpy()
    y_pred = torch.cat(logit_chunks).argmax(1).numpy()
    return accuracy_score(y_true, y_pred), y_true, y_pred

def plot_cm(y_true, y_pred, title, save_path):
    """Render the confusion matrix of ``y_pred`` against ``y_true`` and save it.

    Rows are true classes and columns are predicted classes; both axes are
    labelled with the integer class ids ``0 .. NUM_CLASSES - 1``. The figure is
    written to ``save_path`` at 160 dpi and then closed.
    """
    class_ids = list(range(NUM_CLASSES))
    counts = confusion_matrix(y_true, y_pred, labels=class_ids)

    fig, ax = plt.subplots(figsize=(6,5))
    ax.imshow(counts, interpolation='nearest')
    ax.set_title(title)
    ax.set_xlabel("Predicted")
    ax.set_ylabel("True")
    ax.set_xticks(class_ids)
    ax.set_xticklabels(class_ids)
    ax.set_yticks(class_ids)
    ax.set_yticklabels(class_ids)
    plt.tight_layout()
    fig.savefig(save_path, dpi=160)
    plt.close(fig)

def fit_model(model, train_loader, val_loader, test_loader, config):
    """Train ``model`` with optional early stopping and evaluate its best checkpoint.

    Parameters
    ----------
    model : torch.nn.Module
        Network to train; it is moved to the module-level ``device``.
    train_loader, val_loader, test_loader : iterable
        Loaders yielding ``(inputs, targets)`` batches.
    config : dict
        Needs ``optim`` ("adam" or "sgd"), ``lr``, ``weight_decay``,
        ``max_epochs``, ``early_stop`` and ``patience``.

    Returns
    -------
    tuple
        ``(train_time, val_acc, test_acc, y_true, y_pred, model)``. The timing
        covers the whole epoch loop including per-epoch validation; the
        accuracies and test predictions come from the best-validation weights.

    Raises
    ------
    ValueError
        If ``config["optim"]`` is neither "adam" nor "sgd".
    """
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()

    optim_name = config["optim"]
    if optim_name == "adam":
        opt = torch.optim.Adam(
            model.parameters(), lr=config["lr"], weight_decay=config["weight_decay"]
        )
    elif optim_name == "sgd":
        opt = torch.optim.SGD(
            model.parameters(), lr=config["lr"], momentum=0.9, weight_decay=config["weight_decay"]
        )
    else:
        raise ValueError("Unknown optim")

    best_val, best_state, stale_epochs = 0.0, None, 0
    started = time.perf_counter()
    for _ in range(config["max_epochs"]):
        # evaluate() leaves the model in eval mode, so re-enable training mode each epoch.
        model.train()
        for inputs, targets in train_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            loss = criterion(model(inputs), targets)
            opt.zero_grad()
            loss.backward()
            opt.step()

        val_acc, _, _ = evaluate(model, val_loader, device)
        if val_acc > best_val + 1e-4:
            # New best: snapshot the weights on the CPU and reset the patience counter.
            best_val = val_acc
            best_state = {name: tensor.cpu().clone() for name, tensor in model.state_dict().items()}
            stale_epochs = 0
            continue

        stale_epochs += 1
        if config["early_stop"] and stale_epochs >= config["patience"]:
            break

    train_time = time.perf_counter() - started

    # Restore the best snapshot (if any) before the final evaluation.
    if best_state is not None:
        model.load_state_dict({name: tensor.to(device) for name, tensor in best_state.items()})
    val_acc = evaluate(model, val_loader, device)[0]
    test_acc, y_true, y_pred = evaluate(model, test_loader, device)
    return train_time, val_acc, test_acc, y_true, y_pred, model

def _pt_config(name, kind, lr, max_epochs, patience):
    """Build one PyTorch experiment dict; every run uses Adam, weight decay 1e-4 and early stopping."""
    return dict(
        name=name,
        kind=kind,
        optim="adam",
        lr=lr,
        weight_decay=1e-4,
        max_epochs=max_epochs,
        early_stop=True,
        patience=patience,
    )


experiments = [
    _pt_config("pt_mlp_adam", "mlp", lr=1e-3, max_epochs=15, patience=3),
    _pt_config("pt_cnn_adam", "cnn", lr=1e-3, max_epochs=15, patience=3),
    # Lower LR and fewer epochs for fine-tuning the pretrained network.
    _pt_config("pt_resnet18_finetune", "resnet18", lr=1e-4, max_epochs=8, patience=2),
]

# Train each configured model, save its confusion matrix and classification
# report, and collect one summary row per run.
rows = []
for cfg in experiments:
    print(f"\n[RUN] {cfg['name']} ({cfg['kind']})  on {device}")
    if cfg["kind"] == "mlp":
        model = MLP(hidden=(256,128))
        tr_loader, va_loader, te_loader = train_loader, val_loader, test_loader
    elif cfg["kind"] == "cnn":
        model = SmallCNN()
        tr_loader, va_loader, te_loader = train_loader, val_loader, test_loader
    elif cfg["kind"] == "resnet18":
        model = make_resnet18_finetune()
        # The *_tl loaders apply tfm_tl (resize to 224, 3 channels, ImageNet
        # normalization), so the network itself needs no input adaptation.
        tr_loader, va_loader, te_loader = train_loader_tl, val_loader_tl, test_loader_tl
    else:
        raise ValueError(f"Unknown model kind: {cfg['kind']!r}")

    train_time, val_acc, test_acc, y_true, y_pred, model = fit_model(
        model, tr_loader, va_loader, te_loader, cfg
    )

    cm_path = CM_DIR / f"cm_{cfg['name']}.png"
    plot_cm(y_true, y_pred, f"Confusion Matrix — {cfg['name']}", cm_path)
    report = classification_report(y_true, y_pred, target_names=CLASS_NAMES, digits=4)
    # Explicit encoding keeps the report file independent of the platform
    # default, matching how the scikit-learn section writes its reports.
    with open(OUTDIR / f"cls_report_{cfg['name']}.txt", "w", encoding="utf-8") as f:
        f.write(report)

    row = dict(
        name=cfg["name"],
        kind=cfg["kind"],
        optimizer=cfg["optim"],
        lr=cfg["lr"],
        weight_decay=cfg["weight_decay"],
        max_epochs=cfg["max_epochs"],
        early_stop=cfg["early_stop"],
        patience=cfg["patience"],
        train_time_sec=round(train_time,4),
        val_accuracy=round(float(val_acc),4),
        test_accuracy=round(float(test_acc),4),
        confusion_matrix_path=str(cm_path),
    )
    rows.append(row)

# Assemble the summary table with a fixed column order and write it as CSV.
df = pd.DataFrame(rows)
df = df[["name","kind","optimizer","lr","weight_decay","max_epochs","early_stop","patience",
         "train_time_sec","val_accuracy","test_accuracy","confusion_matrix_path"]]
OUTDIR.mkdir(exist_ok=True, parents=True)
csv_path = OUTDIR / "results.csv"
df.to_csv(csv_path, index=False)
print("\n[DONE] Wrote:", csv_path)
print("[HINT] Confusion matrices in:", CM_DIR)
