## Task 2 - Supervised Baseline [colab version]
---
### Author - Shourav Deb

In [1]:
import os
import glob
import random
import time
import copy
import math
from pathlib import Path
from collections import defaultdict

import numpy as np
import pandas as pd
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, Subset

import torchvision
import torchvision.transforms as T
from torchvision.models import (
    efficientnet_b0, EfficientNet_B0_Weights,
    resnet50, ResNet50_Weights,
    vit_b_16, ViT_B_16_Weights
)

import matplotlib.pyplot as plt
from sklearn.metrics import (
    accuracy_score,
    precision_recall_fscore_support,
    confusion_matrix,
    roc_auc_score,
    roc_curve
)
from sklearn.preprocessing import label_binarize
from sklearn.model_selection import train_test_split

import seaborn as sns
sns.set(style="whitegrid")

# Checkpoints and result tables are written to Google Drive, so mount it first.
from google.colab import drive
drive.mount('/content/drive')

# Run on the GPU when one is visible to torch, otherwise on the CPU.
_cuda_ok = torch.cuda.is_available()
device = torch.device("cuda" if _cuda_ok else "cpu")
for _label, _value in (
    ("Device:", device),
    ("Torch version:", torch.__version__),
    ("CUDA available:", _cuda_ok),
):
    print(_label, _value)

# Seed every random number generator the notebook touches.
SEED = 42
for _seed_fn in (random.seed, np.random.seed, torch.manual_seed):
    _seed_fn(SEED)
if _cuda_ok:
    torch.cuda.manual_seed_all(SEED)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Device: cuda
Torch version: 2.8.0+cu126
CUDA available: True


# 2) Config

In [None]:

# Dataset location on the mounted Google Drive.
DATASET_ROOT = "/content/drive/MyDrive/Betel Leaf Dataset"

# Training hyper-parameters.
NUM_CLASSES   = 3
BATCH_SIZE    = 32
NUM_EPOCHS    = 50
LEARNING_RATE = 1e-3
WEIGHT_DECAY  = 1e-4
IMG_SIZE      = 224

# Backbone to train; keep exactly one line uncommented.
BACKBONE = "efficientnet_b0"
# BACKBONE = "resnet50"
# BACKBONE = "vit_b_16"

# We will run one split at a time.
TRAIN_RATIO = 0.9
TEST_RATIO  = 0.1
assert 0.0 < TRAIN_RATIO < 1.0 and 0.0 < TEST_RATIO < 1.0, "Ratios must lie strictly between 0 and 1"
assert math.isclose(TRAIN_RATIO + TEST_RATIO, 1.0, rel_tol=1e-6), "Train+Test must = 1.0"

# Toggle for freezing backbone
# True  -> freeze feature extractor, train only classifier (faster)
# False -> full fine-tuning
FREEZE_BACKBONE = True

# Output dir
OUTPUT_DIR = "/content/drive/MyDrive/Colab Notebooks/Outputs/betel_leaf_checkpoints"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# round() instead of int(): a float product such as 0.29 * 100 is 28.999..., which int() would
# truncate to 28 and mislabel the split.
print(f"Will run Train:{round(TRAIN_RATIO*100)}% / Test:{round(TEST_RATIO*100)}% with {BACKBONE}")
print("Freeze backbone:", FREEZE_BACKBONE)
print("Saving outputs to:", OUTPUT_DIR)



Will run Train:90% / Test:10% with efficientnet_b0
Freeze backbone: True
Saving outputs to: /content/drive/MyDrive/Colab Notebooks/Outputs/betel_leaf_checkpoints


# 3) Dataset prep

In [3]:
# Canonical class name -> folder names (aliases) that hold images of that class.
# Every alias must appear only once: collect_image_paths() globs each alias separately.
CLASS_MAP = {
    "healthy":   ["Healthy", "Healthy Leaf"],
    "diseased":  ["Diseased", "Diseased Leaf"],
    "dried":     ["Dried", "Dried Leaf"],
}

def collect_image_paths(dataset_root):
    """
    Recursively search dataset_root for class folders and collect their images.

    A folder counts as a class folder when its name equals one of the aliases
    in CLASS_MAP; only files with a known image extension directly inside such
    a folder are kept.

    Returns:
        (all_samples, class_to_idx) where all_samples is a list of
        (img_path, class_idx, class_name_norm) tuples, class_name_norm being
        one of healthy / diseased / dried, and class_to_idx maps those names
        to 0 / 1 / 2.

    Each file is returned at most once, even if aliases repeat or overlap, so
    the same image cannot end up on both sides of a later train/test split.
    Matches are sorted per alias so the sample order (and therefore any seeded
    split) does not depend on filesystem listing order.
    """
    class_to_idx = {"healthy": 0, "diseased": 1, "dried": 2}
    image_suffixes = (".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff")

    # Escape the root so characters such as "[" in the path are not treated as glob syntax.
    safe_root = glob.escape(dataset_root)

    all_samples = []
    seen_paths = set()
    for class_name_norm, aliases in CLASS_MAP.items():
        for alias in aliases:
            pattern = os.path.join(safe_root, "**", alias, "*.*")
            for img_path in sorted(glob.glob(pattern, recursive=True)):
                if not img_path.lower().endswith(image_suffixes):
                    continue
                path_key = os.path.normpath(img_path)
                if path_key in seen_paths:
                    continue
                seen_paths.add(path_key)
                all_samples.append(
                    (img_path, class_to_idx[class_name_norm], class_name_norm)
                )
    return all_samples, class_to_idx

all_samples, class_to_idx = collect_image_paths(DATASET_ROOT)

# Fail fast: a wrong DATASET_ROOT (or an unmounted Drive) otherwise yields an empty list here
# and only surfaces later as an obscure error inside the train/test split.
if not all_samples:
    raise FileNotFoundError(f"No images found under DATASET_ROOT: {DATASET_ROOT}")

print(f"Total images found: {len(all_samples)}")
class_counts = defaultdict(int)
for _, _, cname in all_samples:
    class_counts[cname] += 1
print("Class counts:", dict(class_counts))

# Every class must be represented, otherwise the stratified split and per-class metrics are meaningless.
missing_classes = [name for name in class_to_idx if class_counts.get(name, 0) == 0]
if missing_classes:
    raise ValueError(f"No images found for classes: {missing_classes}")


class BetelLeafDataset(Dataset):
    """Map-style dataset that loads images from disk on demand.

    Each item is (image, class_idx); the image is opened with PIL, converted
    to RGB and, if a transform was given, passed through it.
    """

    def __init__(self, samples, transform=None):
        """
        samples: list of (img_path, class_idx, class_name_norm)
        transform: optional callable applied to the RGB PIL image
        """
        self.samples = samples
        self.transform = transform

    def __len__(self):
        """Number of samples."""
        return len(self.samples)

    def __getitem__(self, i):
        """Load sample i and return (image, class_idx)."""
        img_path, class_idx, _ = self.samples[i]
        # Image.open is lazy and keeps the file open; the context manager closes it
        # deterministically once convert() has produced an independent RGB copy.
        with Image.open(img_path) as raw_img:
            img = raw_img.convert("RGB")
        if self.transform is not None:
            img = self.transform(img)
        return img, class_idx

Total images found: 2082
Class counts: {'healthy': 669, 'diseased': 509, 'dried': 904}


# 4) Transforms

In [4]:
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD  = [0.229, 0.224, 0.225]


def _build_pipeline(*augmentations):
    """Resize -> optional augmentations -> tensor -> normalisation with IMAGENET_MEAN / IMAGENET_STD."""
    return T.Compose([
        T.Resize((IMG_SIZE, IMG_SIZE)),
        *augmentations,
        T.ToTensor(),
        T.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
    ])


# Training pipeline: random flips and a mild colour jitter on top of the shared steps.
train_transform = _build_pipeline(
    T.RandomHorizontalFlip(p=0.5),
    T.RandomVerticalFlip(p=0.2),
    T.ColorJitter(brightness=0.15, contrast=0.15, saturation=0.15, hue=0.05),
)

# Evaluation pipeline: deterministic, no augmentation.
test_transform = _build_pipeline()


# 5) Model Factory

In [5]:
def get_model(backbone_name, num_classes=3):
    """
    Build a torchvision backbone with pretrained weights and swap its final
    linear layer for a fresh one with num_classes outputs.

    Supported backbone_name values: "efficientnet_b0", "resnet50", "vit_b_16".
    Raises ValueError for any other name.
    """
    if backbone_name == "efficientnet_b0":
        net = efficientnet_b0(weights=EfficientNet_B0_Weights.IMAGENET1K_V1)
        # classifier[1] is the layer being replaced; classifier[0] is left untouched.
        old_head = net.classifier[1]
        net.classifier[1] = nn.Linear(old_head.in_features, num_classes)
    elif backbone_name == "resnet50":
        net = resnet50(weights=ResNet50_Weights.IMAGENET1K_V2)
        net.fc = nn.Linear(net.fc.in_features, num_classes)
    elif backbone_name == "vit_b_16":
        net = vit_b_16(weights=ViT_B_16_Weights.IMAGENET1K_V1)
        net.heads.head = nn.Linear(net.heads.head.in_features, num_classes)
    else:
        raise ValueError(f"Unknown backbone: {backbone_name}")
    return net

# 6) Train / Eval Helpers

In [6]:
def train_one_epoch(model, loader, criterion, optimizer):
    """
    Run one optimisation pass over loader.

    Args:
        model: network to train; it is put into train mode.
        loader: iterable of (imgs, labels) batches.
        criterion: loss called as criterion(logits, labels).
        optimizer: optimizer stepped once per batch.

    Returns:
        (avg_loss, avg_acc) averaged over all samples seen.

    Raises:
        ValueError: if loader yields no samples.
    """
    # Move batches to wherever the model lives rather than relying on a notebook-global
    # `device`, so the helper also works when called on its own.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    model.train()
    total_loss = 0.0
    total_correct = 0
    total_count = 0

    for imgs, labels in loader:
        imgs = imgs.to(run_device)
        labels = labels.to(run_device)

        optimizer.zero_grad()
        logits = model(imgs)
        loss = criterion(logits, labels)
        loss.backward()
        optimizer.step()

        # Weight the batch loss by batch size so a smaller final batch does not skew the average.
        total_loss += loss.item() * imgs.size(0)
        preds = torch.argmax(logits, dim=1)
        total_correct += torch.sum(preds == labels).item()
        total_count += imgs.size(0)

    if total_count == 0:
        raise ValueError("train_one_epoch: loader yielded no samples")

    avg_loss = total_loss / total_count
    avg_acc  = total_correct / total_count
    return avg_loss, avg_acc


@torch.no_grad()
def eval_one_epoch(model, loader, criterion):
    """
    Evaluate model on loader without tracking gradients.

    Args:
        model: network to evaluate; it is put into eval mode.
        loader: iterable of (imgs, labels) batches.
        criterion: loss called as criterion(logits, labels).

    Returns:
        (avg_loss, avg_acc, all_labels, all_preds, all_probs) where the last
        three are NumPy arrays concatenated over all batches; all_probs holds
        the softmax of the logits along dim 1.

    Raises:
        ValueError: if loader yields no samples.
    """
    # Move batches to wherever the model lives rather than relying on a notebook-global
    # `device`, so the helper also works when called on its own.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    total_loss = 0.0
    total_correct = 0
    total_count = 0

    all_labels = []
    all_probs  = []
    all_preds  = []

    for imgs, labels in loader:
        imgs = imgs.to(run_device)
        labels = labels.to(run_device)

        logits = model(imgs)
        loss = criterion(logits, labels)

        probs = torch.softmax(logits, dim=1)
        preds = torch.argmax(probs, dim=1)

        # Weight the batch loss by batch size so a smaller final batch does not skew the average.
        total_loss   += loss.item() * imgs.size(0)
        total_correct += torch.sum(preds == labels).item()
        total_count  += imgs.size(0)

        all_labels.append(labels.cpu().numpy())
        all_probs.append(probs.cpu().numpy())
        all_preds.append(preds.cpu().numpy())

    if total_count == 0:
        raise ValueError("eval_one_epoch: loader yielded no samples")

    avg_loss = total_loss / total_count
    avg_acc  = total_correct / total_count

    all_labels = np.concatenate(all_labels)
    all_probs  = np.concatenate(all_probs)
    all_preds  = np.concatenate(all_preds)

    return avg_loss, avg_acc, all_labels, all_preds, all_probs


def plot_training_curves(history, title_prefix=""):
    """
    Draw loss and accuracy curves side by side and show the figure.

    history must contain four equally long lists under the keys
    "train_loss", "val_loss", "train_acc" and "val_acc" (one value per epoch).
    title_prefix is prepended to both subplot titles.
    """
    epochs_range = range(1, len(history["train_loss"]) + 1)

    # (y-axis / title word, train key, val key, train legend, val legend) for each panel.
    panels = (
        ("Loss",     "train_loss", "val_loss", "Train Loss", "Val Loss"),
        ("Accuracy", "train_acc",  "val_acc",  "Train Acc",  "Val Acc"),
    )

    plt.figure(figsize=(12,5))
    for position, (quantity, train_key, val_key, train_label, val_label) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(epochs_range, history[train_key], label=train_label)
        plt.plot(epochs_range, history[val_key],   label=val_label)
        plt.xlabel("Epoch")
        plt.ylabel(quantity)
        plt.title(f"{title_prefix} {quantity}")
        plt.legend()

    plt.tight_layout()
    plt.show()


def evaluate_metrics(y_true, y_pred, y_prob, class_names):
    """
    Compute classification metrics and plot a confusion matrix and a
    per-class accuracy bar chart.

    Args:
        y_true: true class indices; index i is expected to correspond to class_names[i].
        y_pred: predicted class indices.
        y_prob: per-class scores, one column per entry of class_names.
        class_names: display names, ordered by class index.

    Returns:
        dict with accuracy, weighted precision / recall / F1, macro ROC-AUC
        (NaN when scikit-learn cannot compute it) and per-class accuracy
        keyed by class name (NaN for a class with no true samples).
    """
    label_ids = list(range(len(class_names)))

    acc = accuracy_score(y_true, y_pred)
    precision, recall, f1, _ = precision_recall_fscore_support(
        y_true, y_pred, average="weighted", zero_division=0
    )

    # Pin the label set so the matrix is always len(class_names) x len(class_names),
    # even when a class is absent from this split; otherwise rows/columns would no
    # longer line up with class_names in the plots and in the returned dict.
    cm = confusion_matrix(y_true, y_pred, labels=label_ids)

    # Per-class accuracy = correct / number of true samples; NaN where a class has no true samples.
    support = cm.sum(axis=1)
    per_class_acc = np.divide(
        cm.diagonal(),
        support,
        out=np.full(len(label_ids), np.nan, dtype=float),
        where=support > 0,
    )

    # ROC-AUC (macro, one-vs-rest)
    y_true_bin = label_binarize(y_true, classes=label_ids)
    try:
        roc_auc_macro = roc_auc_score(
            y_true_bin, y_prob, average="macro", multi_class="ovr"
        )
    except ValueError:
        # Best effort: report NaN instead of aborting the whole evaluation.
        roc_auc_macro = np.nan

    # Confusion matrix plot
    plt.figure(figsize=(5,4))
    sns.heatmap(
        cm,
        annot=True,
        fmt="d",
        xticklabels=class_names,
        yticklabels=class_names,
        cmap="Blues"
    )
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.title("Confusion Matrix")
    plt.show()

    # Per-class accuracy bar
    plt.figure(figsize=(5,4))
    sns.barplot(x=class_names, y=per_class_acc)
    plt.ylabel("Per-Class Accuracy")
    plt.ylim(0,1)
    plt.title("Per-Class Accuracy")
    plt.show()

    metrics_dict = {
        "accuracy": acc,
        "precision_weighted": precision,
        "recall_weighted": recall,
        "f1_weighted": f1,
        "roc_auc_macro": roc_auc_macro,
        "per_class_accuracy": dict(zip(class_names, per_class_acc)),
    }

    return metrics_dict

# 7) Run experiment for ONE split

In [None]:
!pip install -q thop
from thop import profile

def run_experiment_for_split(
    train_ratio,
    test_ratio,
    all_samples,
    backbone_name="efficientnet_b0",
    num_epochs=50,
    batch_size=32,
    lr=1e-3,
    wd=1e-4,
    verbose=True,
    freeze_backbone=False,
):
    """
    Train and evaluate one backbone on one stratified train/test split.

    Args:
        train_ratio, test_ratio: fractions of all_samples; must sum to 1.0.
        all_samples: list of (img_path, class_idx, class_name_norm).
        backbone_name: name understood by get_model().
        num_epochs, batch_size, lr, wd: training hyper-parameters (Adam).
        verbose: print one line per epoch.
        freeze_backbone: if True, only the classification head is trained.

    Returns:
        dict of metrics from evaluate_metrics() plus run metadata.

    Side effects:
        Shows training-curve / confusion-matrix / per-class plots and writes a
        checkpoint with the best-epoch weights into OUTPUT_DIR.

    NOTE(review): the held-out split is used both to pick the best epoch and
    to report the final metrics, so the reported numbers are optimistic. A
    separate validation split would be needed for an unbiased estimate.
    """
    assert math.isclose(train_ratio + test_ratio, 1.0, rel_tol=1e-6), "Train+Test must = 1.0"

    # round() instead of int(): a float product such as 0.29 * 100 is 28.999..., which int()
    # would truncate and mislabel logs, plot titles and the checkpoint file name.
    train_pct = round(train_ratio * 100)
    test_pct  = round(test_ratio * 100)

    # ----- split data -----
    idxs = list(range(len(all_samples)))
    train_idx, test_idx = train_test_split(
        idxs,
        train_size=train_ratio,
        random_state=SEED,
        shuffle=True,
        stratify=[s[1] for s in all_samples]
    )
    train_samples = [all_samples[i] for i in train_idx]
    test_samples  = [all_samples[i] for i in test_idx]

    # datasets / loaders
    ds_train = BetelLeafDataset(train_samples, transform=train_transform)
    ds_test  = BetelLeafDataset(test_samples,  transform=test_transform)
    train_loader = DataLoader(ds_train, batch_size=batch_size, shuffle=True,  num_workers=2)
    val_loader   = DataLoader(ds_test,  batch_size=batch_size, shuffle=False, num_workers=2)

    # ----- model / loss / optim -----
    model = get_model(backbone_name, num_classes=NUM_CLASSES).to(device)

    # FREEZE logic: keep only the classification head trainable.
    if freeze_backbone:
        # Parameter-name prefix of the head that get_model() replaces for each backbone.
        head_prefix_by_backbone = {
            "efficientnet_b0": "classifier",
            "resnet50": "fc",
            "vit_b_16": "heads",
        }
        head_prefix = head_prefix_by_backbone.get(backbone_name)
        if head_prefix is None:
            # Freezing everything would leave the optimizer with no parameters to train.
            raise ValueError(f"Don't know which layers to keep trainable for backbone: {backbone_name}")
        for name, param in model.named_parameters():
            if not name.startswith(head_prefix):
                param.requires_grad = False
        # NOTE(review): train_one_epoch() calls model.train(), so normalisation / dropout layers
        # in the frozen part presumably still run in training mode - confirm this is intended.

    # compute GFLOPs & params once
    # Profile a throw-away copy: depending on the thop version, profile() can leave
    # total_ops / total_params buffers on the profiled modules, which would otherwise end up
    # in state_dict() and in the saved checkpoint.
    dummy_input = torch.randn(1, 3, IMG_SIZE, IMG_SIZE).to(device)
    profiling_model = copy.deepcopy(model)
    macs, params = profile(profiling_model, inputs=(dummy_input,), verbose=False)
    del profiling_model
    # NOTE(review): thop returns multiply-accumulate counts; the value is reported under the
    # existing "GFLOPs" key unchanged.
    gflops = macs / 1e9
    total_params = params / 1e6  # in millions

    # optimizer should only see trainable params
    trainable_params = [p for p in model.parameters() if p.requires_grad]
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(trainable_params, lr=lr, weight_decay=wd)

    history = {"train_loss": [], "val_loss": [], "train_acc": [], "val_acc": []}
    best_model_wts = copy.deepcopy(model.state_dict())
    best_val_acc   = 0.0
    best_epoch     = 0  # 1-based epoch of the best weights; 0 means the initial weights

    start_time = time.time()

    # ----- training loop -----
    for epoch in range(num_epochs):
        train_loss, train_acc = train_one_epoch(model, train_loader, criterion, optimizer)
        val_loss, val_acc, _, _, _ = eval_one_epoch(model, val_loader, criterion)

        history["train_loss"].append(train_loss)
        history["val_loss"].append(val_loss)
        history["train_acc"].append(train_acc)
        history["val_acc"].append(val_acc)

        if verbose:
            print(f"[{train_pct}-{test_pct}] "
                  f"Epoch {epoch+1}/{num_epochs} "
                  f"TrainLoss={train_loss:.4f} ValLoss={val_loss:.4f} "
                  f"TrainAcc={train_acc:.4f} ValAcc={val_acc:.4f}")

        # Keep a snapshot of the weights with the highest accuracy seen so far.
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            best_epoch = epoch + 1
            best_model_wts = copy.deepcopy(model.state_dict())

    total_time = time.time() - start_time

    # evaluate best model
    model.load_state_dict(best_model_wts)
    val_loss, val_acc, y_true, y_pred, y_prob = eval_one_epoch(model, val_loader, criterion)

    # plots & metrics
    plot_training_curves(history, title_prefix=f"Train:{train_pct}% Test:{test_pct}%")
    class_names = ["Healthy", "Diseased", "Dried"]
    metrics_dict = evaluate_metrics(y_true, y_pred, y_prob, class_names)

    # add meta info
    metrics_dict.update({
        "train_ratio": train_ratio,
        "test_ratio": test_ratio,
        "final_val_loss": float(val_loss),
        "final_val_acc": float(val_acc),
        "best_epoch": best_epoch,
        "GFLOPs": float(gflops),
        "Params_M": float(total_params),
        "Train_Time_sec": float(total_time),
        "frozen_backbone": bool(freeze_backbone),
    })

    # save checkpoint
    # model_state_dict holds the best-epoch weights (see best_epoch), while
    # optimizer_state_dict is the optimizer state after the final epoch.
    ckpt_name = f"checkpoint_{backbone_name}_{train_pct}_{test_pct}.pt"
    ckpt_path = os.path.join(OUTPUT_DIR, ckpt_name)
    torch.save({
        "model_state_dict": model.state_dict(),
        "optimizer_state_dict": optimizer.state_dict(),
        "epoch": num_epochs,
        "best_epoch": best_epoch,
        "best_val_acc": float(best_val_acc),
        "history": history,
        "class_to_idx": class_to_idx,
        "train_ratio": train_ratio,
        "test_ratio": test_ratio,
        "random_seed": SEED,
        "GFLOPs": gflops,
        "Params_M": total_params,
        "Train_Time_sec": total_time,
        "frozen_backbone": bool(freeze_backbone),
    }, ckpt_path)

    print(f"Checkpoint saved to {ckpt_path}")
    return metrics_dict

# 8) Execute one run

In [None]:
# round() instead of int(): a float product such as 0.29 * 100 is 28.999..., which int() would
# truncate and mislabel the banner, the table columns and the CSV file name.
print("="*60)
print(f"Running split Train:{round(TRAIN_RATIO*100)}% / Test:{round(TEST_RATIO*100)}% with {BACKBONE}")
print("="*60)

single_result = run_experiment_for_split(
    train_ratio=TRAIN_RATIO,
    test_ratio=TEST_RATIO,
    all_samples=all_samples,
    backbone_name=BACKBONE,
    num_epochs=NUM_EPOCHS,
    batch_size=BATCH_SIZE,
    lr=LEARNING_RATE,
    wd=WEIGHT_DECAY,
    verbose=True,
    freeze_backbone=FREEZE_BACKBONE,
)

# One-row summary table of the run, flattened so it can be written to CSV.
single_df = pd.DataFrame([{
    "backbone": BACKBONE,
    "train%": round(single_result["train_ratio"]*100),
    "test%": round(single_result["test_ratio"]*100),
    "val_acc": single_result["final_val_acc"],
    "val_loss": single_result["final_val_loss"],
    "precision_weighted": single_result["precision_weighted"],
    "recall_weighted": single_result["recall_weighted"],
    "f1_weighted": single_result["f1_weighted"],
    "roc_auc_macro": single_result["roc_auc_macro"],
    "per_class_acc_Healthy":  single_result["per_class_accuracy"]["Healthy"],
    "per_class_acc_Diseased": single_result["per_class_accuracy"]["Diseased"],
    "per_class_acc_Dried":    single_result["per_class_accuracy"]["Dried"],
    "GFLOPs": single_result["GFLOPs"],
    "Params_M": single_result["Params_M"],
    "Train_Time_sec": single_result["Train_Time_sec"],
    "frozen_backbone": single_result["frozen_backbone"],
}])

display(single_df)

csv_name = f"task2_results_{BACKBONE}_{round(TRAIN_RATIO*100)}_{round(TEST_RATIO*100)}.csv"
csv_path = os.path.join(OUTPUT_DIR, csv_name)
single_df.to_csv(csv_path, index=False)
print("Saved summary CSV to:", csv_path)

Running split Train:90% / Test:10% with efficientnet_b0
Downloading: "https://download.pytorch.org/models/efficientnet_b0_rwightman-7f5810bc.pth" to /root/.cache/torch/hub/checkpoints/efficientnet_b0_rwightman-7f5810bc.pth


100%|██████████| 20.5M/20.5M [00:00<00:00, 141MB/s]


[90-10] Epoch 1/50 TrainLoss=0.7335 ValLoss=0.4348 TrainAcc=0.7042 ValAcc=0.8852
[90-10] Epoch 2/50 TrainLoss=0.4944 ValLoss=0.3509 TrainAcc=0.8270 ValAcc=0.8947
[90-10] Epoch 3/50 TrainLoss=0.4103 ValLoss=0.3042 TrainAcc=0.8596 ValAcc=0.9282
[90-10] Epoch 4/50 TrainLoss=0.3745 ValLoss=0.3006 TrainAcc=0.8628 ValAcc=0.8804
[90-10] Epoch 5/50 TrainLoss=0.3677 ValLoss=0.2635 TrainAcc=0.8612 ValAcc=0.9330
[90-10] Epoch 6/50 TrainLoss=0.3196 ValLoss=0.2512 TrainAcc=0.8783 ValAcc=0.9234
[90-10] Epoch 7/50 TrainLoss=0.3487 ValLoss=0.2393 TrainAcc=0.8617 ValAcc=0.9234
[90-10] Epoch 8/50 TrainLoss=0.2969 ValLoss=0.2328 TrainAcc=0.8932 ValAcc=0.9378
[90-10] Epoch 9/50 TrainLoss=0.2851 ValLoss=0.2438 TrainAcc=0.8948 ValAcc=0.9234
[90-10] Epoch 10/50 TrainLoss=0.3041 ValLoss=0.2074 TrainAcc=0.8815 ValAcc=0.9330
[90-10] Epoch 11/50 TrainLoss=0.3123 ValLoss=0.2315 TrainAcc=0.8788 ValAcc=0.9091
[90-10] Epoch 12/50 TrainLoss=0.2753 ValLoss=0.1995 TrainAcc=0.8922 ValAcc=0.9330
[90-10] Epoch 13/50 Train