# Assignment 1 - [Fazin Faizal]_[a1954220]

# Task 0: Data Visualization (0%, but encouraged for understanding the data)

In [10]:
# Setup: imports, reproducibility, device
from pathlib import Path
import random, numpy as np, pandas as pd
from PIL import Image
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

def set_seed(seed: int = 1954220):
    """Seed Python's `random`, NumPy and PyTorch with the same value.

    When CUDA is available, every GPU generator is seeded as well and cuDNN
    is switched to deterministic mode with autotuning (benchmark) disabled.

    Args:
        seed: Integer seed applied to all generators.
    """
    for seeder in (random.seed, np.random.seed, torch.manual_seed):
        seeder(seed)

    # Nothing GPU-specific to configure on a CPU-only machine.
    if not torch.cuda.is_available():
        return

    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# Seed every generator once, then pick the compute device (GPU when present).
set_seed(1954220)
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Device:", device)

# Dataset locations, all relative to the notebook's working directory
DATA_DIR = Path(".")
TRAIN_IMG_DIR, TEST_IMG_DIR = DATA_DIR / "train", DATA_DIR / "test"
TRAIN_CSV, TEST_CSV = DATA_DIR / "Train_label.csv", DATA_DIR / "Test_label.csv"


Device: cpu


Import required libraries, fix random seeds for reproducibility, and select cuda if available (else CPU). Paths are relative so the notebook runs on any machine with the same folder structure.

# Task 1: Data Preprocessing

### 1.1 Data Cleaning (2%)

In [11]:
# The label CSVs are read as header-less files, so column names are supplied here.
LABEL_COLUMNS = ["filename", "label"]
train_df_raw = pd.read_csv(TRAIN_CSV, header=None, names=LABEL_COLUMNS)
test_df_raw = pd.read_csv(TEST_CSV, header=None, names=LABEL_COLUMNS)

# Programmatic cleaning: strip, drop '?' and missing, keep only existing images
def clean_df(df: pd.DataFrame, img_dir: Path) -> pd.DataFrame:
    """Return a cleaned copy of a label table.

    Rows are removed when:
      * the filename or label is missing (NaN/None),
      * the stripped label is empty or the placeholder "?",
      * the stripped filename does not point to a regular file in `img_dir`.

    Args:
        df: Table with at least "filename" and "label" columns.
        img_dir: Directory the filenames are resolved against.

    Returns:
        A new DataFrame with whitespace-stripped "filename"/"label" values
        and a fresh 0..n-1 index. The input frame is not modified.
    """
    # Drop missing values BEFORE casting to str: astype(str) would turn NaN
    # into the literal text "nan", which a later notna() check cannot catch.
    df = df.dropna(subset=["filename", "label"]).copy()
    df["filename"] = df["filename"].astype(str).str.strip()
    df["label"] = df["label"].astype(str).str.strip()

    valid_label = ~df["label"].isin(["", "?"])
    # is_file() rather than exists(): an empty filename resolves to img_dir
    # itself, which exists but is not an image.
    has_image = df["filename"].map(lambda f: (img_dir / f).is_file()).astype(bool)

    return df[valid_label & has_image].reset_index(drop=True)

train_df = clean_df(train_df_raw, TRAIN_IMG_DIR)
test_df = clean_df(test_df_raw, TEST_IMG_DIR)

# Persist the cleaned tables next to the raw ones (reproducibility)
for cleaned_frame, cleaned_name in (
    (train_df, "Train_label_cleaned.csv"),
    (test_df, "Test_label_cleaned.csv"),
):
    cleaned_frame.to_csv(DATA_DIR / cleaned_name, index=False)

print("Cleaned:", train_df.shape, test_df.shape)


Cleaned: (117, 2) (30, 2)


Read label CSVs (no headers), strip whitespace, drop invalid labels (e.g., ?), and remove rows pointing to missing image files. Save cleaned CSVs so later cells use a consistent dataset.


### 1.2 Data Processing (2%)

In [12]:
# Verify images open; drop unreadable to avoid loader crashes
def is_readable(p: Path) -> bool:
    """Report whether the file at `p` can be opened and converted to RGB.

    The file is opened twice: once for `verify()` and once for an RGB
    conversion. Any exception raised along the way yields False.

    Args:
        p: Path of the image file to check.

    Returns:
        True if both passes succeed, False otherwise.
    """
    try:
        with Image.open(p) as probe:
            probe.verify()
        # A fresh handle is opened for the conversion pass rather than
        # reusing the verified one (Pillow requires a reopen after verify()).
        with Image.open(p) as decoded:
            decoded.convert("RGB")
    except Exception:
        return False
    else:
        return True

def drop_unreadable(df: pd.DataFrame, img_dir: Path, name: str) -> pd.DataFrame:
    """Keep only rows whose image exists in `img_dir` and passes `is_readable`.

    Args:
        df: Table with a "filename" column.
        img_dir: Directory the filenames are resolved against.
        name: Tag used in the printed removal summary.

    Returns:
        The filtered table with a fresh 0..n-1 index. A one-line summary is
        printed only when at least one distinct filename was removed.
    """
    readable = []
    for fname in df["filename"]:
        candidate = img_dir / fname
        # Existence is checked first so is_readable only sees real paths.
        if candidate.exists() and is_readable(candidate):
            readable.append(fname)

    rejected = set(df["filename"]).difference(readable)
    if rejected:
        print(f"[{name}] removed:", len(rejected))

    keep_mask = df["filename"].isin(readable)
    return df.loc[keep_mask].reset_index(drop=True)

# Filter both tables down to images that pass the readability check
train_df = drop_unreadable(train_df, TRAIN_IMG_DIR, "train")
test_df = drop_unreadable(test_df, TEST_IMG_DIR, "test")

# Shared preprocessing: resize to 64x64, then ToTensor (scales pixels to [0,1])
IMG_SIZE = (64, 64)
base_transform = transforms.Compose([transforms.Resize(IMG_SIZE), transforms.ToTensor()])


Verify each image can be opened (drop unreadable files) to avoid DataLoader crashes. Define a base transform to resize images to 64×64 and scale pixel values to [0,1] for stable CNN training.

### 1.3 Data Split and Loader (2%)

In [13]:
# Per-class 80/20 split; a class with a single sample stays entirely in TRAIN
from numpy.random import default_rng
rng = default_rng(42)

labels = sorted(train_df["label"].unique())
class_to_idx = {class_name: position for position, class_name in enumerate(labels)}
idx_to_class = dict(enumerate(labels))
num_classes = len(class_to_idx)
print("Classes:", labels, "| num_classes:", num_classes)

# Carry the original row index along so the selection can be mapped back to train_df
df_idx = train_df.reset_index().rename(columns={"index":"orig_idx"})
train_idx, val_idx = [], []
for class_name, class_rows in df_idx.groupby("label"):
    row_ids = class_rows["orig_idx"].to_numpy()
    class_size = len(row_ids)
    if class_size >= 2:
        # At least one validation sample per class, otherwise ~20%
        n_val = max(1, int(round(0.2 * class_size)))
        held_out = rng.choice(row_ids, size=n_val, replace=False)
        val_idx.extend(held_out.tolist())
        train_idx.extend(np.setdiff1d(row_ids, held_out).tolist())
    else:
        train_idx.extend(row_ids.tolist())

train_split_df = train_df.loc[train_idx].reset_index(drop=True)
val_split_df = train_df.loc[val_idx].reset_index(drop=True)

class ButterflyDataset(Dataset):
    """Image dataset backed by a (filename, label) table.

    Each item is loaded from `img_dir / filename`, converted to RGB,
    optionally passed through `transform`, and returned together with the
    integer class index looked up in the notebook-level `class_to_idx`.
    """

    def __init__(self, df, img_dir, transform=None):
        """Store a re-indexed copy of `df`, the image directory and the transform."""
        self.df = df.reset_index(drop=True)
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        """Number of rows (images) in the table."""
        return len(self.df)

    def __getitem__(self, i):
        """Return `(image, class_index)` for row `i`."""
        row = self.df.iloc[i]
        path = self.img_dir / row["filename"]
        # Open inside a context manager so the file handle is released as
        # soon as the pixels are decoded; convert() returns an independent
        # in-memory image that stays valid after the file is closed.
        with Image.open(path) as im:
            img = im.convert("RGB")
        if self.transform:
            img = self.transform(img)
        return img, class_to_idx[row["label"]]

# Mini-batch size shared by the train/val/test DataLoaders
BATCH_SIZE = 32


Classes: ['ATALA', 'GOLD BANDED', 'MOURNING CLOAK', 'SLEEPY ORANGE', 'SOOTYWING'] | num_classes: 5


Create an 80/20 train–val split per class; if a class has only one sample it stays in train (can’t stratify). Implement a custom Dataset that loads an image and maps its string label to a class index, and fix the batch size; the DataLoaders themselves are built in Section 1.4, once the transforms are defined (with num_workers=0 for Windows/Jupyter stability).

### 1.4 Data Augmentation

In [14]:
# Training images get light augmentation; val/test reuse base_transform untouched
train_transform = transforms.Compose([
    transforms.Resize(IMG_SIZE),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=10),
    transforms.ToTensor(),
])

train_ds = ButterflyDataset(train_split_df, TRAIN_IMG_DIR, transform=train_transform)
val_ds = ButterflyDataset(val_split_df, TRAIN_IMG_DIR, transform=base_transform)
test_ds = ButterflyDataset(test_df, TEST_IMG_DIR, transform=base_transform)

# Windows/Jupyter-safe loaders: num_workers=0 keeps loading in the main process
loader_kwargs = {"batch_size": BATCH_SIZE, "num_workers": 0}
train_loader = DataLoader(train_ds, shuffle=True, **loader_kwargs)
val_loader = DataLoader(val_ds, shuffle=False, **loader_kwargs)
test_loader = DataLoader(test_ds, shuffle=False, **loader_kwargs)

# Pull one batch as a smoke test of the whole input pipeline
xb, yb = next(iter(train_loader))
print("Batch OK:", xb.shape, "| Labels:", yb.shape)



Batch OK: torch.Size([32, 3, 64, 64]) | Labels: torch.Size([32])


Add simple, label-preserving augmentations only to the training transform (horizontal flip and small rotation) to increase variety and reduce overfitting; validation/test use the non-augmented base transform.

# Task 2: Training Loop (7%)

### 2.1 Model Architecture (2%)

In [15]:
# Simple CNN built from base PyTorch blocks only (Conv/BN/ReLU/Pool + MLP head)
class SimpleCNN(nn.Module):
    """Three Conv-BatchNorm-ReLU-MaxPool stages followed by a two-layer MLP head.

    The first linear layer is sized for 128 channels on an 8x8 map, i.e. a
    64x64 input halved by each of the three pooling layers.
    """

    def __init__(self, num_classes: int):
        """Build the feature extractor and a classifier with `num_classes` outputs."""
        super().__init__()

        # Channel widths per stage: 3 -> 32 -> 64 -> 128 (spatial 64 -> 32 -> 16 -> 8)
        widths = (3, 32, 64, 128)
        stages = []
        for c_in, c_out in zip(widths[:-1], widths[1:]):
            stages += [
                nn.Conv2d(c_in, c_out, 3, padding=1),
                nn.BatchNorm2d(c_out),
                nn.ReLU(),
                nn.MaxPool2d(2),
            ]
        self.features = nn.Sequential(*stages)

        head = [
            nn.Flatten(),
            nn.Linear(128*8*8, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, num_classes),
        ]
        self.classifier = nn.Sequential(*head)

    def forward(self, x):
        """Map an image batch to per-class logits."""
        feature_maps = self.features(x)
        return self.classifier(feature_maps)

# Build the network, move it to the selected device, and show its layer summary
model = SimpleCNN(num_classes)
model = model.to(device)
print(model)


SimpleCNN(
  (features): Sequential(
    (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (4): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (5): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): ReLU()
    (7): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (8): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (9): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (10): ReLU()
    (11): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (classifier): Sequential(
    (0): Flatten(start_dim=1, end_dim=-1)
    (1): Linear(in_features=8192, out_features=256, bias=True)
    (2): ReLU()
    (3): Dropou

Define a compact CNN using only basic PyTorch blocks (Conv–BatchNorm–ReLU–Pool stacks) plus a small MLP head with Dropout; suitable capacity for 64×64 inputs while limiting overfitting.   

### 2.2 Training Loop (5%)

In [16]:
# Train/eval utilities
def train_one_epoch(model, loader, optimizer, criterion):
    """Run one optimisation pass over `loader`.

    Batches are moved to the device of the model's first parameter, so the
    function does not depend on any notebook-level `device` variable.

    Args:
        model: Network to train; switched to train mode.
        loader: Iterable of `(inputs, targets)` batches.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss called as `criterion(logits, targets)`.

    Returns:
        `(mean_loss, accuracy)` averaged over all samples seen;
        `(0.0, 0.0)` when the loader yields nothing.
    """
    model.train()
    first_param = next(model.parameters(), None)
    # A parameter-free model has no device of its own: leave batches in place.
    target_device = first_param.device if first_param is not None else None

    total_loss, total_correct, n = 0.0, 0, 0
    for xb, yb in loader:
        if target_device is not None:
            xb, yb = xb.to(target_device), yb.to(target_device)
        optimizer.zero_grad()  # clear stale gradients each step
        logits = model(xb)
        loss = criterion(logits, yb)
        loss.backward()
        optimizer.step()

        b = xb.size(0)
        total_loss += loss.item() * b  # weight by batch size for a per-sample mean
        total_correct += (logits.argmax(1) == yb).sum().item()
        n += b
    return total_loss / max(n, 1), total_correct / max(n, 1)

@torch.no_grad()
def evaluate(model, loader, criterion):
    """Compute mean loss and accuracy over `loader` without updating the model.

    Batches are moved to the device of the model's first parameter, so the
    function does not depend on any notebook-level `device` variable.

    Args:
        model: Network to evaluate; switched to eval mode.
        loader: Iterable of `(inputs, targets)` batches.
        criterion: Loss called as `criterion(logits, targets)`.

    Returns:
        `(mean_loss, accuracy)` averaged over all samples seen;
        `(0.0, 0.0)` when the loader yields nothing.
    """
    model.eval()
    first_param = next(model.parameters(), None)
    # A parameter-free model has no device of its own: leave batches in place.
    target_device = first_param.device if first_param is not None else None

    total_loss, total_correct, n = 0.0, 0, 0
    for xb, yb in loader:
        if target_device is not None:
            xb, yb = xb.to(target_device), yb.to(target_device)
        logits = model(xb)
        loss = criterion(logits, yb)

        b = xb.size(0)
        total_loss += loss.item() * b  # weight by batch size for a per-sample mean
        total_correct += (logits.argmax(1) == yb).sum().item()
        n += b
    return total_loss / max(n, 1), total_correct / max(n, 1)

# Three parameter settings → pick best on validation
param_sets = [
    {"name":"SGD_lr0.01_m0.9", "optimizer":"SGD",  "lr":0.01, "momentum":0.9, "weight_decay":1e-4, "epochs":15},
    {"name":"Adam_lr1e-3",     "optimizer":"Adam", "lr":1e-3, "betas":(0.9,0.999), "weight_decay":1e-4, "epochs":15},
    {"name":"Adam_lr5e-4",     "optimizer":"Adam", "lr":5e-4, "betas":(0.9,0.999), "weight_decay":5e-4, "epochs":20},
]
def make_optimizer(model, ps):
    """Build the optimizer described by one entry of `param_sets`.

    Args:
        model: Network whose parameters will be optimised.
        ps: Dict with "optimizer" ("SGD" or "Adam"), "lr" and "weight_decay",
            plus "momentum" for SGD or "betas" for Adam.

    Returns:
        A `torch.optim.SGD` or `torch.optim.Adam` instance.

    Raises:
        ValueError: If `ps["optimizer"]` is neither "SGD" nor "Adam".
    """
    kind = ps["optimizer"]
    if kind == "SGD":
        return torch.optim.SGD(model.parameters(), lr=ps["lr"], momentum=ps["momentum"], weight_decay=ps["weight_decay"])
    if kind == "Adam":
        return torch.optim.Adam(model.parameters(), lr=ps["lr"], betas=ps["betas"], weight_decay=ps["weight_decay"])
    # Fail loudly instead of silently training with Adam on a misspelt name.
    raise ValueError(f"Unsupported optimizer {kind!r}; expected 'SGD' or 'Adam'")

# Sweep state: per-run curves plus the best run seen so far
history = []
best_val_acc, best_state, best_params = -1.0, None, None
criterion = nn.CrossEntropyLoss()

for ps in param_sets:
    # Each setting trains a freshly initialised network
    model = SimpleCNN(num_classes).to(device)
    optimizer = make_optimizer(model, ps)
    train_hist, val_hist = [], []

    n_epochs = ps["epochs"]
    for epoch in range(1, n_epochs + 1):
        tr_loss, tr_acc = train_one_epoch(model, train_loader, optimizer, criterion)
        va_loss, va_acc = evaluate(model, val_loader, criterion)
        train_hist.append((tr_loss, tr_acc))
        val_hist.append((va_loss, va_acc))
        print(f"[{ps['name']}] Epoch {epoch:02d} | train {tr_loss:.4f}/{tr_acc:.3f} | val {va_loss:.4f}/{va_acc:.3f}")

    history.append({"params": ps, "train": train_hist, "val": val_hist})

    # Runs are ranked by the validation accuracy of their LAST epoch
    final_val_acc = val_hist[-1][1]
    if final_val_acc > best_val_acc:
        best_val_acc = final_val_acc
        best_state = {key: tensor.cpu() for key, tensor in model.state_dict().items()}
        best_params = ps

print("Best val acc:", round(best_val_acc, 4))
print("Best params:", best_params)

# Write the winning weights to disk so the test cell can reload them
BEST_MODEL_PATH = DATA_DIR / "best_simplecnn.pth"
torch.save(best_state, BEST_MODEL_PATH)
print("Saved best model →", BEST_MODEL_PATH)


[SGD_lr0.01_m0.9] Epoch 01 | train 1.4640/0.333 | val 1.5131/0.625
[SGD_lr0.01_m0.9] Epoch 02 | train 0.8102/0.731 | val 1.3373/0.583
[SGD_lr0.01_m0.9] Epoch 03 | train 0.7601/0.720 | val 1.2282/0.583
[SGD_lr0.01_m0.9] Epoch 04 | train 0.4589/0.839 | val 1.1972/0.625
[SGD_lr0.01_m0.9] Epoch 05 | train 0.4354/0.817 | val 1.0860/0.625
[SGD_lr0.01_m0.9] Epoch 06 | train 0.6846/0.817 | val 1.2520/0.625
[SGD_lr0.01_m0.9] Epoch 07 | train 0.3918/0.903 | val 1.3858/0.625
[SGD_lr0.01_m0.9] Epoch 08 | train 0.3682/0.882 | val 1.1127/0.708
[SGD_lr0.01_m0.9] Epoch 09 | train 0.3689/0.871 | val 1.4798/0.625
[SGD_lr0.01_m0.9] Epoch 10 | train 0.2999/0.839 | val 0.7712/0.917
[SGD_lr0.01_m0.9] Epoch 11 | train 0.3552/0.871 | val 0.7849/0.917
[SGD_lr0.01_m0.9] Epoch 12 | train 0.1915/0.946 | val 1.1249/0.750
[SGD_lr0.01_m0.9] Epoch 13 | train 0.1936/0.914 | val 0.8987/0.917
[SGD_lr0.01_m0.9] Epoch 14 | train 0.1466/0.946 | val 1.0098/0.917
[SGD_lr0.01_m0.9] Epoch 15 | train 0.1931/0.946 | val 0.9862/0

Implement train_one_epoch (forward → loss → backward → optimizer step) and evaluate (no_grad forward) that return average loss and accuracy. Use Cross-Entropy for multi-class classification and optimizer.zero_grad() to prevent gradient accumulation.
Run the same CNN with three hyperparameter sets (SGD vs Adam / learning rate / weight decay / number of epochs), track train/val metrics per epoch, and select the best configuration by its final-epoch validation accuracy. Save the best model weights for reproducibility.

### Final Test Evaluation

In [17]:
# Rebuild the architecture and restore the saved weights for the final test run
best_model = SimpleCNN(num_classes).to(device)
state = torch.load(BEST_MODEL_PATH, map_location=device)
# Strict loading (the default): any missing or unexpected key raises instead of
# silently leaving layers at their random initial values.
best_model.load_state_dict(state)
te_loss, te_acc = evaluate(best_model, test_loader, nn.CrossEntropyLoss())
print(f"Test loss {te_loss:.4f} | Test acc {te_acc:.3f}")

Test loss 0.2794 | Test acc 0.900


Reload the saved best weights and run a single evaluation on the held-out test set to report final loss and accuracy, avoiding any test-time tuning.