Call 1: Imports

In [None]:
import sys, subprocess

print("Using Python:", sys.executable)

# 1) Clean out CPU-only builds
subprocess.check_call([sys.executable, "-m", "pip", "uninstall", "-y",
                       "torch", "torchvision", "torchaudio"])

# 2) Install CUDA-enabled wheels into THIS env
# If cu124 is unavailable for you, change cu124 -> cu126 (or any cu12x shown on pytorch.org)
subprocess.check_call([sys.executable, "-m", "pip", "install",
                       "--force-reinstall", "--no-cache-dir",
                       "--index-url", "https://download.pytorch.org/whl/cu124",
                       "torch", "torchvision", "torchaudio"])

Using Python: C:\Users\chunn\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\python.exe


: 

In [4]:

import json, math, os
from pathlib import Path
from collections import Counter, defaultdict

import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from PIL import Image
import torch.nn as nn
import sklearn
import matplotlib.pyplot as plt

import numpy as np
from sklearn.metrics import (
    accuracy_score, precision_recall_fscore_support, roc_auc_score,
    confusion_matrix, matthews_corrcoef
)

from sklearn.model_selection import StratifiedShuffleSplit

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)



Device: cpu


Cell 2: Configurations

In [22]:
# -------------------------
# Configurations
# -------------------------

DATASET_ROOT    = "../dataset2"       # <-- changed to dataset2
TRAIN_DIR       = f"dataset2\images"   # main folder (we’ll split this virtually)
CHECKPOINT_PATH = "augment_classifier_best.pth"
CLASS_MAP_JSON  = "class_mapping.json"

# ====Hyperparameters ====
IMAGE_SIZE     = 224
BATCH_SIZE     = 32
EPOCHS         = 15
LEARNING_RATE  = 1e-4
WEIGHT_DECAY   = 1e-3
NUM_WORKERS    = 2
USE_AMP        = True
VAL_RATIO      = 0.15     # 15% validation from train
TEST_RATIO     = 0.15     # 15% test from train
SEED           = 42
MOMENTUM       = 0.9

print("Using DATASET_ROOT:", DATASET_ROOT)
print("CWD:", Path.cwd())
print("Looking for:", Path(TRAIN_DIR).resolve())
print("Exists?", Path(TRAIN_DIR).is_dir())



Using DATASET_ROOT: ../dataset2
CWD: c:\Users\Max\Desktop\ComputerVision\INF3001_Project\notebooks
Looking for: C:\Users\Max\Desktop\ComputerVision\INF3001_Project\notebooks\dataset2\images
Exists? False


  TRAIN_DIR       = f"dataset2\images"   # main folder (we’ll split this virtually)


Cell 3: Parse Annotations

In [23]:


# === Updated for your dataset2 layout ===
DATASET_ROOT = Path("../dataset2")
ANN_PATH     = DATASET_ROOT / "annotations.json"
IMAGES_DIR   = DATASET_ROOT / "images"

# Safety check
if not ANN_PATH.exists():
    raise FileNotFoundError(f"annotations.json not found at {ANN_PATH}")
if not IMAGES_DIR.exists():
    raise FileNotFoundError(f"images folder not found at {IMAGES_DIR}")

print("Using annotations:", ANN_PATH.resolve())
print("Using images dir :", IMAGES_DIR.resolve())

# === Parse annotations ===
data = json.loads(ANN_PATH.read_text())   # format: { "file.jpg": ["Helmet","Vest", ...], ... }

# collect all class names
all_labels = set()
for lbls in data.values():
    for l in lbls:
        all_labels.add(str(l))
class_names = sorted(all_labels)
name2idx = {n:i for i,n in enumerate(class_names)}
idx2name = {i:n for n,i in name2idx.items()}

# build samples: (image_path, [class_name,...])
samples = []
missing = []
for fname, lbls in data.items():
    p = IMAGES_DIR / fname
    if p.exists():
        labs = sorted({str(l) for l in lbls})
        samples.append((p, labs))
    else:
        missing.append(str(p))

print(f"\nDetected {len(class_names)} classes: {class_names}")
print("Total images listed in annotations:", len(data))
print("Total images FOUND on disk         :", len(samples))
if missing:
    print(f"WARNING: {len(missing)} image paths from annotations not found under {IMAGES_DIR}")

# count instances per class (multi-label: each image contributes to all its labels)
counts = Counter()
for _, labs in samples:
    counts.update(labs)
print("\nInstances per class:")
for c in class_names:
    print(f"  {c:>20s}: {counts.get(c,0)}")


Using annotations: C:\Users\Max\Desktop\ComputerVision\INF3001_Project\dataset2\annotations.json
Using images dir : C:\Users\Max\Desktop\ComputerVision\INF3001_Project\dataset2\images

Detected 7 classes: ['Boots', 'Ear Protection', 'Gloves', 'Goggles', 'Helmet', 'Mask', 'Vest']
Total images listed in annotations: 1391
Total images FOUND on disk         : 1390

Instances per class:
                 Boots: 506
        Ear Protection: 43
                Gloves: 826
               Goggles: 443
                Helmet: 1113
                  Mask: 604
                  Vest: 996


Cell 4: Split Ratio

In [24]:

def primary_label(labels):
    # choose one label per image for stratification proxy
    # here: first label alphabetically; you could use majority/frequency if you track it
    return sorted(labels)[0] if labels else None

X = np.arange(len(samples))
y_proxy = np.array([name2idx[primary_label(lbls)] for _, lbls in samples])

# test split first
sss1 = StratifiedShuffleSplit(n_splits=1, test_size=TEST_RATIO, random_state=SEED)
trainval_idx, test_idx = next(sss1.split(X, y_proxy))

# then val from remaining
val_rel = VAL_RATIO / (1.0 - TEST_RATIO)
sss2 = StratifiedShuffleSplit(n_splits=1, test_size=val_rel, random_state=SEED)
train_idx, val_idx = next(sss2.split(trainval_idx, y_proxy[trainval_idx]))

train_items = [samples[i] for i in trainval_idx[train_idx]]
val_items   = [samples[i] for i in trainval_idx[val_idx]]
test_items  = [samples[i] for i in test_idx]

# --- After your train/val/test split code ---
print("\n=== Split Summary ===")
print(f"Total samples: {len(samples)}")
print(f"Train: {len(train_items)}  ({len(train_items)/len(samples)*100:.1f}%)")
print(f"Val:   {len(val_items)}    ({len(val_items)/len(samples)*100:.1f}%)")
print(f"Test:  {len(test_items)}   ({len(test_items)/len(samples)*100:.1f}%)")

# Optional — class distribution proxy in each split
def summarize_split(name, items):
    proxy = [primary_label(lbls) for _, lbls in items]
    c = Counter(proxy)
    print(f"\n{name} distribution:")
    for cls, cnt in c.items():
        print(f"  {cls:>20s}: {cnt}")

summarize_split("Train", train_items)
summarize_split("Val", val_items)
summarize_split("Test", test_items)



=== Split Summary ===
Total samples: 1390
Train: 972  (69.9%)
Val:   209    (15.0%)
Test:  209   (15.0%)

Train distribution:
                Gloves: 353
                 Boots: 354
               Goggles: 59
                Helmet: 143
                  Vest: 12
                  Mask: 27
        Ear Protection: 24

Val distribution:
                Helmet: 31
                 Boots: 76
                  Mask: 6
                Gloves: 76
        Ear Protection: 5
               Goggles: 13
                  Vest: 2

Test distribution:
                Gloves: 76
                 Boots: 76
                Helmet: 31
               Goggles: 13
                  Mask: 6
        Ear Protection: 5
                  Vest: 2


Cell 5: Dataset and Transformers

In [25]:
# CELL 5 — dataset & dataloaders

train_tfms = transforms.Compose([
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225]),
])

class ImageClsDataset(Dataset):
    def __init__(self, items, tfm, name2idx):
        self.items = items
        self.tfm = tfm
        self.name2idx = name2idx
    def __len__(self): return len(self.items)
    def __getitem__(self, i):
        path, cls_name = self.items[i]
        img = Image.open(path).convert("RGB")
        x = self.tfm(img)
        y = self.name2idx[cls_name]
        return x, y

train_ds = ImageClsDataset(train_items, train_tfms, name2idx)
val_ds   = ImageClsDataset(val_items,   train_tfms, name2idx)
test_ds  = ImageClsDataset(test_items,  train_tfms, name2idx)

train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True,  num_workers=NUM_WORKERS, pin_memory=True)
val_loader   = DataLoader(val_ds,   batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS, pin_memory=True)
test_loader  = DataLoader(test_ds,  batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS, pin_memory=True)


Cell 6: Model and Optimizer loss

In [26]:
# CELL 6 — model & optimizer (SGD + momentum like previous version)
num_classes = len(class_names)

def get_resnet(num_classes, variant="resnet18", pretrained=True):
    if variant == "resnet18":
        m = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1 if pretrained else None)
    elif variant == "resnet50":
        m = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1 if pretrained else None)
    else:
        raise ValueError("Unsupported variant")
    in_feats = m.fc.in_features
    m.fc = nn.Linear(in_feats, num_classes)
    return m

model = get_resnet(num_classes, variant="resnet18", pretrained=True).to(device)

# === switch back to SGD with momentum (previous config) ===
optimizer = torch.optim.SGD(
    model.parameters(),
    lr=LEARNING_RATE,
    momentum= MOMENTUM,           # standard momentum used before
    weight_decay=WEIGHT_DECAY,
    nesterov=True           # optional but often beneficial
)

criterion = nn.CrossEntropyLoss()
scaler = torch.cuda.amp.GradScaler(enabled=(USE_AMP and device.type=="cuda"))


  scaler = torch.cuda.amp.GradScaler(enabled=(USE_AMP and device.type=="cuda"))


Cell 7: Training Loops

In [None]:
# === CELL 7 — train with live progress updates ===
from tqdm.notebook import tqdm
import numpy as np
from sklearn.metrics import precision_recall_fscore_support

THRESH = 0.5
history = {"train_loss": [], "train_acc": [], "val_acc": [], "train_f1": [], "val_f1": []}

def train_one_epoch(model, loader, optimizer, criterion, scaler, device, epoch):
    model.train()
    total_loss = 0.0
    pbar = tqdm(loader, desc=f"Epoch {epoch:02d}", leave=False)
    for xb, yb in pbar:
        xb, yb = xb.to(device), yb.to(device)
        optimizer.zero_grad(set_to_none=True)
        with torch.cuda.amp.autocast(enabled=(scaler is not None and scaler.is_enabled())):
            logits = model(xb)
            loss   = criterion(logits, yb)
        if scaler and scaler.is_enabled():
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
        else:
            loss.backward()
            optimizer.step()

        total_loss += loss.item() * xb.size(0)
        pbar.set_postfix({"batch_loss": f"{loss.item():.4f}"})
    return total_loss / len(loader.dataset)

@torch.no_grad()
def eval_match(model, loader, device, thresh=THRESH):
    model.eval()
    logits_list, ys = [], []
    for xb, yb in loader:
        xb = xb.to(device)
        logits_list.append(model(xb).cpu())
        ys.append(yb)
    logits = torch.cat(logits_list)
    probs  = logits.sigmoid().numpy()
    y_pred = (probs >= thresh).astype(int)
    y_true = torch.cat(ys).numpy().astype(int)
    subset_acc = (y_pred == y_true).all(axis=1).mean()
    prec, rec, f1, _ = precision_recall_fscore_support(
        y_true, y_pred, average="macro", zero_division=0
    )
    return float(subset_acc), float(f1)

best_val_acc = -1.0

for epoch in range(1, EPOCHS + 1):
    tr_loss = train_one_epoch(model, train_loader, optimizer, criterion, scaler, device, epoch)
    tr_acc, tr_f1   = eval_match(model, train_loader, device)
    val_acc, val_f1 = eval_match(model, val_loader, device)

    history["train_loss"].append(tr_loss)
    history["train_acc"].append(tr_acc)
    history["val_acc"].append(val_acc)
    history["train_f1"].append(tr_f1)
    history["val_f1"].append(val_f1)

    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save({"model": model.state_dict(), "classes": class_names}, CHECKPOINT_PATH)

    tqdm.write(
        f"Epoch {epoch:02d}/{EPOCHS} | "
        f"Train Loss {tr_loss:.4f} | "
        f"Train Acc {tr_acc:.4f} | Train F1 {tr_f1:.4f} | "
        f"Val Acc {val_acc:.4f} | Val F1 {val_f1:.4f}"
    )

tqdm.write(f"\n✅ Best Val Acc: {best_val_acc:.4f} (saved to {CHECKPOINT_PATH})")


Cell 8: Full Evaluation Metrics

In [None]:
# === CELL 8 — evaluation (val & test) + plots ===

@torch.no_grad()
def evaluate_multilabel(model, loader, device, split_name="Val", thresh=THRESH):
    """Prints multi-label metrics and returns a dict for programmatic use."""
    model.eval()
    logits_list, ys = [], []
    for xb, yb in loader:
        xb = xb.to(device)
        logits_list.append(model(xb).cpu())
        ys.append(yb)
    logits = torch.cat(logits_list)                # (N,C)
    y_true = torch.cat(ys).numpy().astype(int)     # (N,C)
    probs  = logits.sigmoid().numpy()
    y_pred = (probs >= thresh).astype(int)

    # Subset accuracy (exact match)
    subset_acc = (y_pred == y_true).all(axis=1).mean()

    # Macro Precision/Recall/F1
    prec, rec, f1, _ = precision_recall_fscore_support(
        y_true, y_pred, average='macro', zero_division=0
    )

    # Per-class AUC (skip classes with only one label value)
    auc_per_class = []
    for c in range(len(class_names)):
        yt = y_true[:, c]
        if len(np.unique(yt)) == 2:
            auc_per_class.append(roc_auc_score(yt, probs[:, c]))
        else:
            auc_per_class.append(np.nan)
    auc_macro = float(np.nanmean(auc_per_class))

    # Specificity / FPR / G-mean per class
    spec_list, fpr_list, rec_list, gmean_list = [], [], [], []
    for c in range(len(class_names)):
        yt, yp = y_true[:, c], y_pred[:, c]
        TP = np.sum((yt==1)&(yp==1))
        TN = np.sum((yt==0)&(yp==0))
        FP = np.sum((yt==0)&(yp==1))
        FN = np.sum((yt==1)&(yp==0))
        spec = TN / (TN+FP) if (TN+FP)>0 else 0.0
        tpr  = TP / (TP+FN) if (TP+FN)>0 else 0.0
        fpr  = 1.0 - spec
        gmean= np.sqrt(max(spec,0.0)*max(tpr,0.0))
        spec_list.append(spec); fpr_list.append(fpr); rec_list.append(tpr); gmean_list.append(gmean)

    spec_macro = float(np.mean(spec_list))
    fpr_macro  = float(np.mean(fpr_list))
    gmean_macro= float(np.mean(gmean_list))

    # MCC (flattened multi-label)
    mcc = matthews_corrcoef(y_true.flatten(), y_pred.flatten())

    # Print nicely
    print(f"\n=== {split_name} (threshold={thresh}) ===")
    print(f"AUC-ROC (macro)      : {auc_macro:.4f}")
    print(f"Subset Accuracy       : {subset_acc:.4f}")
    print(f"Precision (macro)     : {prec:.4f}")
    print(f"Recall (macro)        : {rec:.4f}")
    print(f"F1 (macro)            : {f1:.4f}")
    print(f"Specificity (macro)   : {spec_macro:.4f}")
    print(f"FPR (macro)           : {fpr_macro:.4f}")
    print(f"G-mean (macro)        : {gmean_macro:.4f}")
    print(f"MCC (flattened)       : {mcc:.4f}")

    print("\nPer-class (AUC / Spec / FPR / Recall / G-mean):")
    for i, cname in enumerate(class_names):
        print(f"{cname:20s}  "
              f"AUC:{auc_per_class[i]:6.3f}  "
              f"Spec:{spec_list[i]:6.3f}  "
              f"FPR:{fpr_list[i]:6.3f}  "
              f"Rec:{rec_list[i]:6.3f}  "
              f"G:{gmean_list[i]:6.3f}")

    return {
        "auc_macro": auc_macro,
        "subset_acc": float(subset_acc),
        "precision_macro": float(prec),
        "recall_macro": float(rec),
        "f1_macro": float(f1),
        "specificity_macro": spec_macro,
        "fpr_macro": fpr_macro,
        "gmean_macro": gmean_macro,
        "mcc": float(mcc),
        "auc_per_class": auc_per_class,
    }

# --- Run evaluation on validation and test ---
val_metrics  = evaluate_multilabel(model, val_loader,  device, "Val",  THRESH)
test_metrics = evaluate_multilabel(model, test_loader, device, "Test", THRESH)

# --- Plot training curves from Cell 7 history ---
epochs = range(1, len(history["train_loss"]) + 1)

plt.figure()
plt.plot(epochs, history["train_loss"], marker='o')
plt.title("Train Loss per Epoch")
plt.xlabel("Epoch"); plt.ylabel("Loss")
plt.grid(True)
plt.show()

plt.figure()
plt.plot(epochs, history["train_acc"], marker='o', label="Train Acc (subset)")
plt.plot(epochs, history["val_acc"],   marker='o', label="Val Acc (subset)")
plt.title("Subset Accuracy per Epoch")
plt.xlabel("Epoch"); plt.ylabel("Accuracy")
plt.legend(); plt.grid(True)
plt.show()

plt.figure()
plt.plot(epochs, history["train_f1"], marker='o', label="Train F1 (macro)")
plt.plot(epochs, history["val_f1"],   marker='o', label="Val F1 (macro)")
plt.title("F1 (macro) per Epoch")
plt.xlabel("Epoch"); plt.ylabel("F1")
plt.legend(); plt.grid(True)
plt.show()
