# 1. Setting Up Environment




## a. Installing Dependencies


In [4]:
# PyTorch & Torchvision
!pip install --quiet torch torchvision torchaudio

# Roboflow for dataset
!pip install roboflow

# Optuna for Hyperparameter Optimization
!pip install optuna



## b. Importing Libraries


In [5]:
from roboflow import Roboflow
import os
import torch
import torch.optim as optim
import pandas as pd
import numpy as np
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import torchvision.transforms as T
from torchvision import models
import matplotlib.pyplot as plt
import cv2
from google.colab import drive
import matplotlib.pyplot as plt
from sklearn.metrics import (
    confusion_matrix,
    precision_recall_curve,
    classification_report,
    roc_curve,
    auc
)
from sklearn.preprocessing import label_binarize
import optuna
import torch.nn as nn
from torch.utils.data import DataLoader
from time import perf_counter

## c. Download Dataset and Mount Drive

In [6]:
# Download Dataset
# The Roboflow API key is a credential, so it is read from the ROBOFLOW_API_KEY
# environment variable (with an interactive prompt as fallback) instead of being
# stored in the notebook.
# NOTE(review): the key that was previously hardcoded here has been exposed and
# should be rotated in the Roboflow dashboard.
from getpass import getpass

roboflow_api_key = os.environ.get("ROBOFLOW_API_KEY") or getpass("Roboflow API key: ")
rf = Roboflow(api_key=roboflow_api_key)
project = rf.workspace("work-tqclg").project("tumor-cjxoh")
version = project.version(1)
dataset = version.download("multiclass", location="data")

# Mounting Drive
drive.mount('/content/drive')

loading Roboflow workspace...
loading Roboflow project...
Mounted at /content/drive


# 2. Model Training

## a. Global Training Configuration

In [7]:
# Dataset layout: one directory per split under DATA_DIR, each holding a
# `_classes.csv` label file (images are resolved relative to the split directory).
DATA_DIR      = 'data'
TRAIN_IMG_DIR = os.path.join(DATA_DIR, 'train')
VALID_IMG_DIR = os.path.join(DATA_DIR, 'valid')
TEST_IMG_DIR  = os.path.join(DATA_DIR, 'test')
TRAIN_CSV     = os.path.join(TRAIN_IMG_DIR, '_classes.csv')
VALID_CSV     = os.path.join(VALID_IMG_DIR, '_classes.csv')
TEST_CSV      = os.path.join(TEST_IMG_DIR, '_classes.csv')

# Loader / training-loop defaults.
BATCH_SIZE    = 32   # batch size for the Stage 1/2 loaders and the default for CPU timing
MAX_EPOCHS    = 30   # upper bound on epochs in train_model (also T_max for cosine annealing)
PATIENCE      = 5    # epochs without val-accuracy improvement before early stopping

# Optimizer / scheduler defaults used by train_model when no custom_config is given.
LR            = 1e-4
WEIGHT_DECAY  = 1e-3
STEP_SIZE     = 5    # StepLR: decay every STEP_SIZE epochs ...
GAMMA         = 0.1  # ... by this multiplicative factor

# Use the GPU when available; everything trained/evaluated in train_model runs on DEVICE.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Root output folder on the mounted Google Drive; created eagerly so later saves succeed.
base_dir = "/content/drive/MyDrive/Brandon's FYP"
os.makedirs(base_dir, exist_ok=True)
# NOTE(review): not referenced elsewhere in the visible notebook — presumably the
# spreadsheet holding the Stage 3 search results; confirm or remove.
OPTIMIZED_HYPERPARAMETERS = "/content/drive/MyDrive/Brandon's FYP/Hyperparameter Optimization.xlsx"

## b. Model Loader

In [8]:
def get_all_models(num_classes):
    """Build the ImageNet-pretrained backbones with heads resized to ``num_classes``.

    Args:
        num_classes: Number of output logits for every classification head.

    Returns:
        A list of ``(name, model)`` tuples in the order
        ``"ResNet50"``, ``"EfficientNetB2"``, ``"MobileNetV3"``.
    """
    def _resized_head(old_linear):
        # Fresh linear layer with the old layer's input width and the new class count.
        return torch.nn.Linear(old_linear.in_features, num_classes)

    # ResNet50: the single `fc` layer becomes Dropout -> Linear.
    resnet50 = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V2)
    resnet50.fc = torch.nn.Sequential(
        torch.nn.Dropout(0.5),
        _resized_head(resnet50.fc),
    )

    # EfficientNetB2: only the final linear layer of the classifier is swapped.
    efficientnet_b2 = models.efficientnet_b2(weights=models.EfficientNet_B2_Weights.IMAGENET1K_V1)
    efficientnet_b2.classifier[1] = _resized_head(efficientnet_b2.classifier[1])

    # MobileNetV3 (large): only the final linear layer of the classifier is swapped.
    mobilenet_v3 = models.mobilenet_v3_large(weights=models.MobileNet_V3_Large_Weights.IMAGENET1K_V1)
    mobilenet_v3.classifier[3] = _resized_head(mobilenet_v3.classifier[3])

    return [
        ("ResNet50", resnet50),
        ("EfficientNetB2", efficientnet_b2),
        ("MobileNetV3", mobilenet_v3),
    ]


## c. Dataset Class

In [9]:
# ----------------------------
# Custom Dataset Class from CSV
# ----------------------------
class CSVClassificationDataset(torch.utils.data.Dataset):
    """Image-classification dataset backed by a one-hot label CSV.

    The CSV must contain a filename column (matched case-insensitively as
    ``filename``) and one column per class; a sample's label is the index of the
    largest value among its class columns. Header names and filename values are
    stripped of surrounding whitespace, matching how ``evaluate_model`` reads the
    same files.

    Attributes set on the instance:
        img_dir: Directory that filenames are resolved against.
        transform: Optional callable applied to the loaded RGB ``PIL.Image``.
        class_names: Class column names in CSV order.
        num_classes: ``len(class_names)``.
        samples: List of ``(filename, label_index)`` tuples.

    Raises:
        KeyError: If the CSV has no filename column.
    """

    def __init__(self, csv_file, img_dir, transform=None):
        self.img_dir   = img_dir
        self.transform = transform

        # Load CSV and infer class columns
        df = pd.read_csv(csv_file)
        df.columns = df.columns.str.strip()  # Clean column names

        # Locate the filename column with the same case-insensitive rule that is
        # used to exclude it from the class columns, so the two cannot disagree.
        filename_cols = [c for c in df.columns if c.lower() == 'filename']
        if not filename_cols:
            raise KeyError(f"no 'filename' column found in {csv_file}")
        self.class_names = [c for c in df.columns if c.lower() != 'filename']
        self.num_classes = len(self.class_names)

        filenames = df[filename_cols[0]].astype(str).str.strip().tolist()
        if filenames:
            # Vectorized argmax over the one-hot columns (one label index per row).
            labels = df[self.class_names].to_numpy().astype(int).argmax(axis=1).tolist()
        else:
            labels = []
        self.samples = list(zip(filenames, labels))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        fname, label = self.samples[idx]
        img_path = os.path.join(self.img_dir, fname)
        # convert() returns a fully loaded copy, so the file handle can be closed right away.
        with Image.open(img_path) as raw_img:
            img = raw_img.convert("RGB")
        if self.transform:
            img = self.transform(img)
        return img, torch.tensor(label, dtype=torch.long)

## d. Data Preprocessing (Transforms)

In [10]:
# ----------------------------
# Preprocessing helpers: GaussianBlur transform + get_transforms(mode)
# mode = 0 → Baseline (resize + normalize only)
# mode ≠ 0 → Enhanced (resize, flip, rotate, color jitter, occasional blur, normalize)
# ----------------------------
class GaussianBlur:
    """Callable transform that blurs a PIL image with OpenCV's Gaussian filter.

    The kernel is square; a ``kernel_size`` whose remainder modulo 2 is not 1 is
    bumped by one so the stored size is odd.
    """

    def __init__(self, kernel_size=3):
        if kernel_size % 2 == 1:
            self.kernel_size = kernel_size
        else:
            self.kernel_size = kernel_size + 1

    def __call__(self, img):
        # sigma=0 lets OpenCV derive the standard deviation from the kernel size.
        ksize = (self.kernel_size, self.kernel_size)
        pixels = np.array(img)
        return Image.fromarray(cv2.GaussianBlur(pixels, ksize, 0))

def get_transforms(mode=0):
    """Return the ``(train_transform, val_transform)`` pair for a training stage.

    Args:
        mode: ``0`` selects the baseline pipeline (resize + ImageNet normalization)
            for both splits, in which case the same ``Compose`` object is returned
            twice. Any other value adds augmentation to the training pipeline only.

    Returns:
        Tuple ``(train_transform, val_transform)`` of ``T.Compose`` objects.
    """
    # ImageNet channel statistics, shared by every pipeline.
    imagenet_norm = T.Normalize([0.485, 0.456, 0.406],
                                [0.229, 0.224, 0.225])

    # Deterministic pipeline: resize -> tensor -> normalize.
    plain_pipeline = T.Compose([
        T.Resize((224, 224)),
        T.ToTensor(),
        imagenet_norm,
    ])

    if mode == 0:
        # Baseline: train and validation share the very same pipeline object.
        return plain_pipeline, plain_pipeline

    # Augmented training pipeline (Stage 2 onwards).
    mild_jitter = T.ColorJitter(
        brightness=0.1,   # ±10%
        contrast=0.1,     # ±10%
        saturation=0.1,   # ±10%
        hue=0.05          # ±5%
    )
    occasional_blur = T.RandomApply([GaussianBlur(kernel_size=3)], p=0.3)

    augmented_pipeline = T.Compose([
        T.Resize((224, 224)),      # resize first
        T.RandomHorizontalFlip(),  # random flip
        T.RandomRotation(15),      # rotate within ±15°
        mild_jitter,               # mild photometric jitter
        occasional_blur,           # small blur on ~30% of images
        T.ToTensor(),
        imagenet_norm,
    ])

    return augmented_pipeline, plain_pipeline


## e. Main Training Function

In [11]:
# ----------------------------
# Evaluation Function
# ----------------------------
def evaluate(model, loader, device):
    """Compute top-1 accuracy of ``model`` over every batch yielded by ``loader``.

    Args:
        model: Module mapping an image batch to per-class logits.
        loader: Iterable of ``(images, labels)`` batches.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Fraction of correctly classified samples (``correct / total``).
        The model is left in eval mode.
    """
    model.eval()
    n_correct, n_seen = 0, 0
    with torch.no_grad():
        for batch_imgs, batch_labels in loader:
            batch_imgs = batch_imgs.to(device)
            batch_labels = batch_labels.to(device)
            predicted = model(batch_imgs).argmax(dim=1)
            n_correct += predicted.eq(batch_labels).sum().item()
            n_seen += batch_labels.size(0)
    return n_correct / n_seen

# ----------------------------
# Train Function
# ----------------------------
def train_model(model, model_name, train_loader, val_loader, custom_config=None):
    """Train ``model`` with early stopping; save checkpoints and curves under ``base_dir``.

    Each call writes to a fresh ``<base_dir>/<model_name>/run_v<N>`` directory
    (the first N that does not exist yet) containing ``best.pth`` (highest
    validation accuracy), ``last.pth`` (most recent epoch), ``loss_curve.png``
    and ``val_accuracy_curve.png``.

    Args:
        model: Network to train; it is moved to ``DEVICE``.
        model_name: Used for the output sub-directory and log prefixes.
        train_loader: Iterable of ``(images, labels)`` training batches.
        val_loader: Iterable of ``(images, labels)`` validation batches.
        custom_config: Optional dict overriding the global defaults. Recognized
            keys: ``'lr'``, ``'weight_decay'``, ``'optimizer'`` (``'adam'``;
            anything else selects SGD), ``'momentum'`` (SGD only) and
            ``'scheduler'`` (``'steplr'``, ``'cosineannealing'``; anything else
            keeps the learning rate constant). Missing keys fall back to
            ``LR``, ``WEIGHT_DECAY``, ``'sgd'``, ``0.9`` and ``'steplr'``.

    Returns:
        None.
    """
    # None and {} both mean "use the global defaults". Reading through a dict also
    # keeps the default SGD path working when no config is passed at all
    # (calling .get() on None raised AttributeError before).
    cfg = custom_config or {}
    lr             = cfg.get('lr', LR)
    weight_decay   = cfg.get('weight_decay', WEIGHT_DECAY)
    optimizer_type = cfg.get('optimizer', 'sgd')
    scheduler_type = cfg.get('scheduler', 'steplr')
    # Configs written for Adam store momentum=None; SGD needs a number.
    momentum = cfg.get('momentum')
    if momentum is None:
        momentum = 0.9

    # Define model-specific directory under base_dir and pick the next free run_v<N>.
    model_root_dir = os.path.join(base_dir, model_name)
    version = 1
    save_dir = os.path.join(model_root_dir, f"run_v{version}")
    while os.path.exists(save_dir):
        version += 1
        save_dir = os.path.join(model_root_dir, f"run_v{version}")
    os.makedirs(save_dir, exist_ok=True)
    print(f"[{model_name}] Saving checkpoints & plots to: {save_dir}")

    # Move model to device
    model = model.to(DEVICE)
    criterion = torch.nn.CrossEntropyLoss()

    # Optimizer
    if optimizer_type == 'adam':
        optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
    else:
        optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=momentum, weight_decay=weight_decay)

    # Scheduler
    if scheduler_type == 'cosineannealing':
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=MAX_EPOCHS)
    elif scheduler_type == 'steplr':
        scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=STEP_SIZE, gamma=GAMMA)
    else:
        # Constant learning rate, expressed as a scheduler so the loop can always call step().
        scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=lambda epoch: 1.0)

    # Training loop
    # Start below any reachable accuracy so the first epoch always writes best.pth,
    # even if validation accuracy is 0.
    best_acc = float('-inf')
    train_losses = []
    val_accuracies = []
    epochs_no_improve = 0

    print(f"\n--- Starts Training: {model_name} ---")
    for epoch in range(1, MAX_EPOCHS + 1):
        model.train()
        running_loss = 0.0

        for imgs, labels in train_loader:
            imgs, labels = imgs.to(DEVICE), labels.to(DEVICE)
            optimizer.zero_grad()
            loss = criterion(model(imgs), labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        # Schedulers are stepped once per epoch, after the optimizer updates.
        scheduler.step()

        # Mean of per-batch losses; guard against an empty loader.
        avg_loss = running_loss / max(len(train_loader), 1)
        train_losses.append(avg_loss)

        acc = evaluate(model, val_loader, DEVICE)
        val_accuracies.append(acc)

        if acc > best_acc:
            best_acc = acc
            epochs_no_improve = 0
            torch.save(model.state_dict(), os.path.join(save_dir, 'best.pth'))
            print(f"[{model_name}] Epoch {epoch:02d}: ⬆ New best val_acc: {acc:.4f} | Train Loss: {avg_loss:.4f}")
        else:
            epochs_no_improve += 1
            print(f"[{model_name}] Epoch {epoch:02d}: — val_acc did not improve ({acc:.4f}); "
                  f"patience {epochs_no_improve}/{PATIENCE} | Train Loss: {avg_loss:.4f}")

        # Always keep the most recent weights too, so an interrupted run can be inspected.
        torch.save(model.state_dict(), os.path.join(save_dir, 'last.pth'))

        if epochs_no_improve >= PATIENCE:
            print(f"Early stopping triggered for {model_name}")
            break

    # Plot loss and accuracy over the epochs that actually ran
    epochs_range = range(1, len(train_losses) + 1)

    fig, ax = plt.subplots()
    ax.plot(epochs_range, train_losses, 'o-', label="Train Loss")
    ax.set(xlabel="Epoch", ylabel="Loss", title=f"{model_name} Loss Curve")
    ax.legend()
    ax.grid()
    fig.savefig(os.path.join(save_dir, "loss_curve.png"))
    plt.close(fig)

    fig, ax = plt.subplots()
    ax.plot(epochs_range, val_accuracies, 's-', label="Val Accuracy")
    ax.set(xlabel="Epoch", ylabel="Accuracy", title=f"{model_name} Accuracy Curve")
    ax.legend()
    ax.grid()
    fig.savefig(os.path.join(save_dir, "val_accuracy_curve.png"))
    plt.close(fig)


## f. Stage Training Functions

In [14]:
# ----------------------------
# Stage 1: Baseline Training
# ----------------------------
def stage_1_baseline_training():
    """Stage 1: train every backbone with the baseline transforms and default settings.

    Uses ``get_transforms(mode=0)`` for both splits, the global ``BATCH_SIZE``,
    and calls ``train_model`` without a custom config for each model returned by
    ``get_all_models``.
    """
    print("\n==== Stage 1: Baseline Training ====\n")

    # Baseline pipelines (mode=0)
    train_tf, eval_tf = get_transforms(mode=0)

    train_set = CSVClassificationDataset(TRAIN_CSV, TRAIN_IMG_DIR, transform=train_tf)
    valid_set = CSVClassificationDataset(VALID_CSV, VALID_IMG_DIR, transform=eval_tf)

    # Only the training split is shuffled.
    loader_kwargs = dict(batch_size=BATCH_SIZE, num_workers=2)
    train_loader = DataLoader(train_set, shuffle=True, **loader_kwargs)
    val_loader = DataLoader(valid_set, shuffle=False, **loader_kwargs)

    # Train each backbone in turn on the same loaders.
    for name, network in get_all_models(num_classes=train_set.num_classes):
        print(f"\n--- Training {name} ---")
        train_model(network, name, train_loader, val_loader)

# ----------------------------
# Stage 2: Preprocessing + Augmentation Training
# ----------------------------
def stage_2_preprocessing_training():
    """Stage 2: train every backbone with the augmented training pipeline.

    Same procedure as Stage 1, except that the transforms come from
    ``get_transforms(mode=1)``; ``train_model`` is still called with its defaults.
    """
    print("\n==== Stage 2: Training with Preprocessing & Augmentation ====\n")

    # Enhanced pipelines (mode=1): augmentation on train, plain resize/normalize on val.
    train_tf, eval_tf = get_transforms(mode=1)

    train_set = CSVClassificationDataset(TRAIN_CSV, TRAIN_IMG_DIR, transform=train_tf)
    valid_set = CSVClassificationDataset(VALID_CSV, VALID_IMG_DIR, transform=eval_tf)

    # Only the training split is shuffled.
    loader_kwargs = dict(batch_size=BATCH_SIZE, num_workers=2)
    train_loader = DataLoader(train_set, shuffle=True, **loader_kwargs)
    val_loader = DataLoader(valid_set, shuffle=False, **loader_kwargs)

    # Train each backbone in turn on the same loaders.
    for name, network in get_all_models(num_classes=train_set.num_classes):
        print(f"\n--- Stage 2 Training: {name} ---")
        train_model(network, name, train_loader, val_loader)

# ----------------------------
# Stage 3: Hyperparameter Optimization with Optuna (n_trials per model)
# ----------------------------
def stage_3_hyperparameter_optimization(n_trials=20):
    """Run one Optuna study per backbone and save the top-3 configurations of each.

    For every model name a separate maximize-accuracy study is run for
    ``n_trials`` trials. Each trial samples learning rate, batch size, weight
    decay, dropout rate, optimizer (plus momentum when SGD is chosen) and
    scheduler, trains for a fixed 10 epochs with the augmented transforms
    (``get_transforms(mode=1)``), reports validation accuracy after every epoch
    for pruning, and returns the best validation accuracy seen.

    Args:
        n_trials: Number of Optuna trials per model.

    Side effects:
        Writes ``hparam_stage3_summary.csv`` into ``base_dir`` and prints the table.

    Returns:
        None.
    """
    print(f"\n==== Stage 3: Hyperparameter Search ({n_trials} trials per model) ====\n")

    # Fixed training budget per trial; also used as the cosine schedule length.
    epochs_per_trial = 10

    # The datasets do not depend on any sampled hyperparameter, so the label CSVs
    # are read once here instead of once per trial. Loaders are still built per
    # trial because the batch size is tuned.
    train_transform, val_transform = get_transforms(mode=1)
    train_ds = CSVClassificationDataset(TRAIN_CSV, TRAIN_IMG_DIR, transform=train_transform)
    val_ds   = CSVClassificationDataset(VALID_CSV, VALID_IMG_DIR, transform=val_transform)

    def objective_for_model(fixed_model_name, trial):
        # 1. Sample hyperparameters (excluding model_name, which is fixed)
        lr             = trial.suggest_float("learning_rate", 1e-5, 1e-2, log=True)
        batch_size     = trial.suggest_categorical("batch_size", [16, 32, 64])
        weight_decay   = trial.suggest_float("weight_decay", 1e-6, 1e-3, log=True)
        dropout_rate   = trial.suggest_float("dropout_rate", 0.0, 0.5)
        optimizer_name = trial.suggest_categorical("optimizer", ["sgd", "adam"])
        scheduler_name = trial.suggest_categorical("scheduler", ["none", "steplr", "cosineannealing"])
        # Momentum only exists for SGD. It must be sampled before the summary line
        # below, which previously read `momentum` before it was assigned.
        momentum = trial.suggest_float("momentum", 0.5, 0.99) if optimizer_name == "sgd" else None
        momentum_text = f"{momentum:.2f}" if momentum is not None else "n/a"

        # Print trial info
        print(f"\n--- {fixed_model_name} Trial {trial.number + 1}/{n_trials} ---")
        print(f"LR: {lr:.2e} | Batch: {batch_size} | WD: {weight_decay:.2e} | "
              f"Dropout: {dropout_rate:.2f} | Opt: {optimizer_name} | Momentum: {momentum_text} | "
              f"Scheduler: {scheduler_name}")

        # 2. Build DataLoaders with the sampled batch size
        train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True,  num_workers=2)
        val_loader   = DataLoader(val_ds,   batch_size=batch_size, shuffle=False, num_workers=2)

        # 3. Instantiate the requested model and REPLACE its existing dropout layer.
        #    (Inserting a new Dropout at index 0 would instead drop the head's input
        #    features and leave the built-in dropout untouched.) The indices match the
        #    heads used in get_all_models / stage_3_best_config_training.
        model = dict(get_all_models(train_ds.num_classes))[fixed_model_name]
        if fixed_model_name == "ResNet50":
            model.fc[0] = torch.nn.Dropout(dropout_rate)
        elif fixed_model_name == "EfficientNetB2":
            model.classifier[0] = torch.nn.Dropout(dropout_rate)
        elif fixed_model_name == "MobileNetV3":
            model.classifier[2] = torch.nn.Dropout(dropout_rate)
        model = model.to(DEVICE)

        # 4. Choose optimizer
        if optimizer_name == "sgd":
            optim_inst = optim.SGD(model.parameters(), lr=lr, momentum=momentum, weight_decay=weight_decay)
        else:
            optim_inst = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

        # 5. Choose scheduler
        if scheduler_name == "steplr":
            scheduler_inst = optim.lr_scheduler.StepLR(optim_inst, step_size=5, gamma=0.1)
        elif scheduler_name == "cosineannealing":
            scheduler_inst = optim.lr_scheduler.CosineAnnealingLR(optim_inst, T_max=epochs_per_trial)
        else:
            scheduler_inst = None

        criterion = nn.CrossEntropyLoss()
        best_val_acc = 0.0

        # 6. Train for a fixed number of epochs per trial
        for epoch in range(epochs_per_trial):
            model.train()
            for imgs, labels in train_loader:
                imgs, labels = imgs.to(DEVICE), labels.to(DEVICE)
                optim_inst.zero_grad()
                outputs = model(imgs)
                loss = criterion(outputs, labels)
                loss.backward()
                optim_inst.step()

            if scheduler_inst:
                scheduler_inst.step()

            # 7. Validation (shared helper; switches the model to eval mode)
            val_acc = evaluate(model, val_loader, DEVICE)

            # Print epoch progress
            print(f"{fixed_model_name} Trial {trial.number + 1}, "
                  f"Epoch {epoch + 1}/{epochs_per_trial}, Val Acc: {val_acc:.4f}")

            # Report the intermediate value so the study's pruner can stop weak trials early.
            trial.report(val_acc, epoch)
            if trial.should_prune():
                print(f"{fixed_model_name} Trial {trial.number + 1} pruned at epoch {epoch + 1}")
                raise optuna.TrialPruned()

            best_val_acc = max(best_val_acc, val_acc)

        return best_val_acc

    # 8. Loop over each model, run n_trials per model
    all_best_configs = []
    for fixed_model_name in ["ResNet50", "EfficientNetB2", "MobileNetV3"]:
        print(f"\n===== Optimizing {fixed_model_name} =====")
        study = optuna.create_study(direction="maximize")
        # Bind the model name as a default argument so the callback does not depend
        # on the loop variable's later value.
        study.optimize(
            lambda trial, name=fixed_model_name: objective_for_model(name, trial),
            n_trials=n_trials,
        )

        # 9. Extract top 3 configs for this model
        df_trials = study.trials_dataframe()
        # Tag every trial with fixed_model_name, since this study only tuned that model
        df_trials["model"] = fixed_model_name
        # Sort all trials by "value" and take the top 3
        df_m_sorted = df_trials.sort_values("value", ascending=False).head(3)

        for rank, row in enumerate(df_m_sorted.itertuples(), start=1):
            all_best_configs.append({
                "model":         row.model,  # same as fixed_model_name
                "rank":          rank,
                "learning_rate": row.params_learning_rate,
                "batch_size":    row.params_batch_size,
                "weight_decay":  row.params_weight_decay,
                "dropout_rate":  row.params_dropout_rate,
                "optimizer":     row.params_optimizer,
                # The momentum column only exists if at least one trial sampled SGD.
                "momentum":      getattr(row, "params_momentum", None),
                "scheduler":     row.params_scheduler,
                "val_accuracy":  row.value
            })

    # 10. Save all results to CSV
    summary_columns = [
        "model", "rank", "learning_rate", "batch_size", "weight_decay",
        "dropout_rate", "optimizer", "momentum", "scheduler", "val_accuracy"
    ]
    # Passing `columns` keeps the column order and still works when no trial was recorded.
    df_best = pd.DataFrame(all_best_configs, columns=summary_columns)
    summary_path = os.path.join(base_dir, "hparam_stage3_summary.csv")
    df_best.to_csv(summary_path, index=False)

    print("\n=== Top 3 Configurations per Model ===")
    print(df_best)
    print(f"\nSummary saved to: {summary_path}")

    # -----------------------------------------------------------------------------
# Stage 3 – Train each model on its best (rank-1) configuration
# -----------------------------------------------------------------------------

# Best Configs from “Rank 1” table
# Per-model hyperparameters, keyed by the model names returned by get_all_models.
# 'batch_size' and 'dropout' are consumed by stage_3_best_config_training; the whole
# dict is then passed to train_model as custom_config, which reads 'lr',
# 'weight_decay', 'optimizer', 'momentum' and 'scheduler'.
# 'momentum' is None because every entry uses Adam (momentum applies to SGD only).
BEST_CONFIGS = {
    'ResNet50': {
        'lr':           9.71211e-05,
        'batch_size':  16,
        'weight_decay':7.01183e-05,
        'dropout':     0.40369264,
        'optimizer':   'adam',
        'momentum':    None,
        'scheduler':   'cosineannealing',
    },
    'EfficientNetB2': {
        'lr':           0.0000612861,
        'batch_size':  64,
        'weight_decay':5.97754e-06,
        'dropout':     0.358457194,
        'optimizer':   'adam',
        'momentum':    None,
        'scheduler':   'cosineannealing',
    },
    'MobileNetV3': {
        'lr':           3.05e-04,
        'batch_size':  16,
        'weight_decay':6.14e-05,
        'dropout':     0.227504223,
        'optimizer':   'adam',
        'momentum':    None,
        'scheduler':   'steplr',
    },
}

# ----------------------------
# Stage 3 Training Function with Best Config
# ----------------------------
def stage_3_best_config_training():
    """Stage 3: train every backbone with its entry from ``BEST_CONFIGS``.

    For each model returned by ``get_all_models`` the dropout layer of the
    classification head is replaced with one using the tuned rate, loaders are
    built with the tuned batch size, and ``train_model`` is called with the
    model's config as ``custom_config``.

    Raises:
        KeyError: If a model name has no entry in ``BEST_CONFIGS``.
    """
    print("\n==== Stage 3: Training on Best Configurations ====\n")

    # Baseline transforms (mode=0), as in Stage 1.
    # NOTE(review): the hyperparameter search itself used mode=1 — confirm that
    # training the tuned configs without augmentation is intended.
    train_transform, val_transform = get_transforms(mode=0)

    # Datasets are shared by all models; loaders are built per model (batch size differs).
    train_ds = CSVClassificationDataset(TRAIN_CSV, TRAIN_IMG_DIR, transform=train_transform)
    val_ds   = CSVClassificationDataset(VALID_CSV, VALID_IMG_DIR, transform=val_transform)

    # Load all models
    models_list = get_all_models(num_classes=train_ds.num_classes)

    for model_name, model in models_list:
        print(f"\n--- Training {model_name} on Best Configuration ---")

        # Get best config for this model
        config = BEST_CONFIGS[model_name]

        # Swap the head's dropout layer in place on the model that is actually trained.
        # (The MobileNetV3 branch used to build and modify a separate model with a
        # hardcoded 4-class head that was never used.) Replacing only the dropout
        # module keeps the state_dict keys identical to get_all_models, so the saved
        # checkpoints load in evaluate_model.
        tuned_dropout = torch.nn.Dropout(config['dropout'])
        if model_name == 'ResNet50':
            model.fc[0] = tuned_dropout
        elif model_name == 'EfficientNetB2':
            model.classifier[0] = tuned_dropout
        elif model_name == 'MobileNetV3':
            model.classifier[2] = tuned_dropout

        # Create data loaders using model-specific batch size
        batch_size = config['batch_size']
        train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True,  num_workers=2)
        val_loader   = DataLoader(val_ds,   batch_size=batch_size, shuffle=False, num_workers=2)

        # Train with injected config
        train_model(model, model_name, train_loader, val_loader, custom_config=config)

## g. Main Training Launcher

In [15]:
# ----------------------------
# Main Launcher
# ----------------------------
if __name__ == '__main__':
    # Stage selector: run one stage per execution by uncommenting exactly the call
    # that should run. Only the best-configuration training is active below.
    # stage_1_baseline_training()
    # stage_2_preprocessing_training()
    # stage_3_hyperparameter_optimization(n_trials=20) # 9 hours, 49 minutes
    stage_3_best_config_training()



==== Stage 3: Training on Best Configurations ====


--- Training MobileNetV3 on Best Configuration ---
[MobileNetV3] Saving checkpoints & plots to: /content/drive/MyDrive/Brandon's FYP/MobileNetV3/run_v4

--- Starts Training: MobileNetV3 ---
[MobileNetV3] Epoch 01: ⬆ New best val_acc: 0.9555 | Train Loss: 0.1827
[MobileNetV3] Epoch 02: ⬆ New best val_acc: 0.9682 | Train Loss: 0.0561
[MobileNetV3] Epoch 03: ⬆ New best val_acc: 0.9734 | Train Loss: 0.0369
[MobileNetV3] Epoch 04: — val_acc did not improve (0.9612); patience 1/5 | Train Loss: 0.0371
[MobileNetV3] Epoch 05: — val_acc did not improve (0.9612); patience 2/5 | Train Loss: 0.0276
[MobileNetV3] Epoch 06: ⬆ New best val_acc: 0.9859 | Train Loss: 0.0060
[MobileNetV3] Epoch 07: — val_acc did not improve (0.9859); patience 1/5 | Train Loss: 0.0019
[MobileNetV3] Epoch 08: ⬆ New best val_acc: 0.9872 | Train Loss: 0.0015
[MobileNetV3] Epoch 09: — val_acc did not improve (0.9872); patience 1/5 | Train Loss: 0.0010
[MobileNetV3] Epoch 

# 3. Model Testing

## a. Model Evaluation Function




In [16]:
def evaluate_model(model_name, run_version, model_filename):
    """Evaluate a saved checkpoint on the test split and save plots/reports beside it.

    Loads ``<Drive root>/<model_name>/<run_version>/<model_filename>`` into the
    matching architecture from ``get_all_models``, runs it image by image over
    ``data/test`` and writes into the checkpoint's directory:

    * ``confusion_matrix.png`` and ``confusion_matrix_normalized.png``
    * ``f1_curve.png`` (one-vs-rest F1 versus decision threshold, plus the mean)
    * ``roc_curve.png`` (one-vs-rest ROC with AUC per class)
    * ``classification_report.csv`` and ``classification_report.png``

    Args:
        model_name: Key of the architecture (``"ResNet50"``, ``"EfficientNetB2"``
            or ``"MobileNetV3"``).
        run_version: Run sub-directory, e.g. ``"run_v4"``.
        model_filename: Checkpoint file name, e.g. ``"best.pth"``.

    Raises:
        KeyError: If ``model_name`` is not one of the known architectures.

    Returns:
        None.
    """
    print(f"\n--- Evaluating {model_name} ({run_version}) ---")

    # --- Config & paths ---
    DEVICE     = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    BASE_DIR   = "/content/drive/MyDrive/Brandon's FYP"
    MODEL_PATH = os.path.join(BASE_DIR, model_name, run_version, model_filename)
    SAVE_DIR   = os.path.dirname(MODEL_PATH)
    TEST_IMG_DIR = "data/test"
    TEST_CSV     = os.path.join(TEST_IMG_DIR, "_classes.csv")
    os.makedirs(SAVE_DIR, exist_ok=True)

    # --- Load CSV and prepare classes ---
    df = pd.read_csv(TEST_CSV)
    df.columns = df.columns.str.strip()
    class_cols = [c for c in df.columns if c != 'filename']
    NUM_CLASSES = len(class_cols)
    # Explicit label list so every metric covers all classes, even ones that do not
    # occur in the test labels or predictions.
    class_ids = list(range(NUM_CLASSES))

    # --- Load model architecture and weights ---
    models_dict = dict(get_all_models(NUM_CLASSES))
    model = models_dict[model_name]
    state = torch.load(MODEL_PATH, map_location=DEVICE)
    model.load_state_dict(state)
    model = model.to(DEVICE)
    model.eval()

    # --- Transforms (the validation pipeline: resize + tensor + ImageNet normalize) ---
    _, transform = get_transforms(mode=0)

    # --- Inference loop ---
    y_true, y_pred, y_prob = [], [], []
    with torch.no_grad():
        for _, row in df.iterrows():
            img_path = os.path.join(TEST_IMG_DIR, row['filename'].strip())
            with Image.open(img_path) as raw_img:
                img = raw_img.convert("RGB")
            x   = transform(img).unsqueeze(0).to(DEVICE)
            logits = model(x)
            probs  = torch.softmax(logits, dim=1).cpu().numpy()[0]
            pred   = int(np.argmax(probs))
            true   = int(row[class_cols].astype(int).values.argmax())
            y_true.append(true)
            y_pred.append(pred)
            y_prob.append(probs)
    y_prob = np.array(y_prob)
    y_true_arr = np.asarray(y_true)

    # --- Confusion Matrix ---
    # labels= pins the matrix to NUM_CLASSES x NUM_CLASSES so plot_cm can index it safely.
    cm = confusion_matrix(y_true, y_pred, labels=class_ids)
    # Row-normalize; rows of classes with no test samples stay 0 instead of becoming NaN.
    row_totals = cm.sum(axis=1, keepdims=True)
    cm_norm = np.divide(cm, row_totals,
                        out=np.zeros(cm.shape, dtype=float),
                        where=row_totals != 0)

    def plot_cm(matrix, title, fname, norm=False):
        """Render ``matrix`` as an annotated heat map and save it as ``fname`` in SAVE_DIR."""
        fig, ax = plt.subplots()
        im = ax.imshow(matrix, cmap='Blues', vmin=0 if norm else None, vmax=1 if norm else None)
        plt.colorbar(im, ax=ax)
        ax.set(xticks=np.arange(NUM_CLASSES), yticks=np.arange(NUM_CLASSES),
               xticklabels=class_cols, yticklabels=class_cols,
               xlabel='Predicted', ylabel='Actual', title=title)
        for i in range(NUM_CLASSES):
            for j in range(NUM_CLASSES):
                value = f"{matrix[i,j]:.2f}" if norm else str(matrix[i,j])
                # White text on dark cells, black on light ones.
                ax.text(j, i, value, ha='center', va='center',
                        color='white' if matrix[i,j] > matrix.max()/2. else 'black')
        fig.tight_layout()
        fig.savefig(os.path.join(SAVE_DIR, fname), dpi=150)
        plt.close(fig)

    plot_cm(cm, f'{model_name}: Confusion Matrix', 'confusion_matrix.png')
    plot_cm(cm_norm, f'{model_name}: Normalized Confusion Matrix', 'confusion_matrix_normalized.png', norm=True)

    # --- F1 Curve ---
    common_t = np.linspace(0,1,100)
    f1_curves = []
    for idx in class_ids:
        yb      = y_true_arr == idx
        scores  = y_prob[:, idx]
        prec, rec, th = precision_recall_curve(yb, scores)
        f1_vals = 2 * prec * rec / (prec + rec + 1e-8)
        # precision/recall have one more entry than the thresholds: element i belongs
        # to th[i], and the final (precision=1, recall=0) point has no threshold.
        # Pair f1_vals[:-1] with th directly; above the largest score nothing is
        # predicted positive, so F1 is 0 there.
        f1_curves.append(np.interp(common_t, th, f1_vals[:-1], right=0.0))
    mean_f1 = np.mean(f1_curves, axis=0)
    fig, ax = plt.subplots()
    for idx, cls in enumerate(class_cols):
        ax.plot(common_t, f1_curves[idx], label=cls)
    ax.plot(common_t, mean_f1, 'k--', lw=2, label='Average')
    ax.set(title=f'{model_name}: F1 Score vs. Threshold', xlabel='Threshold', ylabel='F1 Score')
    ax.legend(loc='lower left', fontsize='small')
    fig.tight_layout()
    fig.savefig(os.path.join(SAVE_DIR, 'f1_curve.png'), dpi=150)
    plt.close(fig)

    # --- ROC Curve ---
    # One-vs-rest targets are built by comparison, which also works for two classes
    # (label_binarize returns a single column in that case).
    fpr, tpr, roc_auc = {}, {}, {}
    for i in class_ids:
        fpr[i], tpr[i], _ = roc_curve(y_true_arr == i, y_prob[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])
    fig, ax = plt.subplots()
    for i, cls in enumerate(class_cols):
        ax.plot(fpr[i], tpr[i], label=f'{cls} (AUC = {roc_auc[i]:.2f})')
    ax.plot([0,1], [0,1], 'k--', linewidth=1)
    ax.set(xlabel='False Positive Rate', ylabel='True Positive Rate',
           title=f'{model_name}: ROC Curves (OvR)')
    ax.legend(loc='lower right', fontsize='small')
    fig.tight_layout()
    fig.savefig(os.path.join(SAVE_DIR, 'roc_curve.png'), dpi=150)
    plt.close(fig)

    # --- Classification Report ---
    # labels= keeps target_names aligned with the class indices even if a class is absent.
    report = classification_report(y_true, y_pred, labels=class_ids, target_names=class_cols,
                                   output_dict=True, zero_division=0)
    df_report = pd.DataFrame(report).T
    df_report.to_csv(os.path.join(SAVE_DIR, 'classification_report.csv'))

    fig, ax = plt.subplots(figsize=(8, 1 + 0.5*len(df_report)))
    ax.axis('off')
    tbl = ax.table(
        cellText=np.round(df_report.values, 3),
        rowLabels=df_report.index,
        colLabels=df_report.columns,
        cellLoc='center', loc='center'
    )
    tbl.auto_set_font_size(False)
    tbl.set_fontsize(10)
    tbl.scale(1, 1.5)
    ax.set_title(f'{model_name}: Classification Report', pad=20)
    fig.tight_layout()
    fig.savefig(os.path.join(SAVE_DIR, 'classification_report.png'), dpi=150)
    plt.close(fig)

    print(f"✅ Saved all evaluation outputs for {model_name} to:\n → {SAVE_DIR}")

## b. CPU Timing Function

In [17]:
def measure_cpu_inference_time(
    model_name,
    run_version,
    model_filename,
    batch_size=BATCH_SIZE,
    device=torch.device("cpu")
):
    """Time one full pass of a saved checkpoint over the test split.

    The checkpoint is loaded into the matching architecture from
    ``get_all_models``, one warm-up batch is run, and then a full iteration over
    the test ``DataLoader`` (data loading included) is timed with
    ``perf_counter``. The result is printed and written to
    ``cpu_inference_time.csv`` in the checkpoint's directory.

    Args:
        model_name: Key of the architecture to instantiate.
        run_version: Run sub-directory, e.g. ``"run_v4"``.
        model_filename: Checkpoint file name, e.g. ``"best.pth"``.
        batch_size: Batch size of the test loader.
        device: Device used for the model and the batches (CPU by default).

    Returns:
        A one-row ``pandas.DataFrame`` with the columns ``model``,
        ``run_version``, ``num_images``, ``total_time_s`` and ``avg_time_s``.
    """
    print(f"\n--- CPU Inference Timing: {model_name} ({run_version}) ---")

    # Paths
    drive_root   = "/content/drive/MyDrive/Brandon's FYP"
    weights_path = os.path.join(drive_root, model_name, run_version, model_filename)
    out_dir      = os.path.dirname(weights_path)
    test_dir     = "data/test"
    labels_csv   = os.path.join(test_dir, "_classes.csv")
    os.makedirs(out_dir, exist_ok=True)

    # Test data, using the validation-side transform
    print(f"Loading test set from {labels_csv}")
    eval_transform = get_transforms(mode=1)[1]
    test_ds = CSVClassificationDataset(labels_csv, test_dir, transform=eval_transform)
    test_loader = DataLoader(test_ds, batch_size=batch_size, shuffle=False, num_workers=2)
    num_images, num_classes = len(test_ds), test_ds.num_classes
    print(f"Test set: {num_images} images, {num_classes} classes")

    # Architecture + trained weights
    print(f"Loading model weights from {weights_path}")
    model = dict(get_all_models(num_classes))[model_name]
    model.load_state_dict(torch.load(weights_path, map_location=device))
    model = model.to(device)
    model.eval()

    # One untimed batch so first-call overhead does not count towards the measurement
    print("Running warm-up pass…")
    with torch.no_grad():
        first_batch, _ = next(iter(test_loader))
        model(first_batch.to(device))

    # Timed pass over the whole loader
    print("Timing full dataset inference…")
    t_start = perf_counter()
    with torch.no_grad():
        for batch, _ in test_loader:
            model(batch.to(device))
    t_end = perf_counter()

    total_time = t_end - t_start
    avg_time = total_time / num_images
    print(f"Total time: {total_time:.2f}s for {num_images} images")
    print(f"Average per image: {avg_time:.4f}s")

    # Persist the measurement next to the checkpoint
    timing_record = {
        "model":       model_name,
        "run_version": run_version,
        "num_images":  num_images,
        "total_time_s": total_time,
        "avg_time_s":   avg_time
    }
    df_time = pd.DataFrame([timing_record])
    output_csv = os.path.join(out_dir, "cpu_inference_time.csv")
    df_time.to_csv(output_csv, index=False)
    print(f"Saved timing CSV to {output_csv}\n")

    return df_time

## c. Main Testing Launcher

In [18]:
if __name__ == '__main__':
    # Evaluate and time the best checkpoint of every architecture.
    # NOTE(review): assumes each model's run to evaluate is "run_v4" — train_model
    # picks the next free run_v<N> per model, so confirm the version for each one.
    for model_name in ["ResNet50", "EfficientNetB2", "MobileNetV3"]:
      evaluate_model(model_name, run_version="run_v4", model_filename="best.pth")
      measure_cpu_inference_time(model_name, run_version="run_v4", model_filename="best.pth")



--- Evaluating ResNet50 (run_v4) ---
✅ Saved all evaluation outputs for ResNet50 to:
 → /content/drive/MyDrive/Brandon's FYP/ResNet50/run_v4

--- CPU Inference Timing: ResNet50 (run_v4) ---
Loading test set from data/test/_classes.csv
Test set: 1824 images, 4 classes
Loading model weights from /content/drive/MyDrive/Brandon's FYP/ResNet50/run_v4/best.pth
Running warm-up pass…
Timing full dataset inference…
Total time: 595.57s for 1824 images
Average per image: 0.3265s
Saved timing CSV to /content/drive/MyDrive/Brandon's FYP/ResNet50/run_v4/cpu_inference_time.csv


--- Evaluating EfficientNetB2 (run_v4) ---
✅ Saved all evaluation outputs for EfficientNetB2 to:
 → /content/drive/MyDrive/Brandon's FYP/EfficientNetB2/run_v4

--- CPU Inference Timing: EfficientNetB2 (run_v4) ---
Loading test set from data/test/_classes.csv
Test set: 1824 images, 4 classes
Loading model weights from /content/drive/MyDrive/Brandon's FYP/EfficientNetB2/run_v4/best.pth
Running warm-up pass…
Timing full dataset

# Auto Disconnect After Training

In [None]:
import time
from google.colab import runtime

def disconnect_after(minutes=5):
    """Wait ``minutes`` minutes, then release the Colab runtime.

    Args:
        minutes: Delay before ``runtime.unassign()`` is called.
    """
    delay_seconds = minutes * 60
    print(f"Will disconnect Colab in {minutes} minutes if still running...")
    time.sleep(delay_seconds)
    print("Disconnecting now.")
    runtime.unassign()

if __name__ == '__main__':
    # Blocks this cell for five minutes, then unassigns the runtime.
    disconnect_after(minutes=5)

Will disconnect Colab in 5 minutes if still running...
Disconnecting now.
