### Experiment 2 – Large-Scale Dataset Generalization

#### In this stage, the study evaluates ResNet-18 and ConvNeXt-Tiny on the larger, more complex LeafSnap dataset. The experiment explores how model capacity and architecture affect performance under fine-grained, high-diversity conditions, and examines how well pretrained models generalize beyond controlled environments.

### 1. Imports
#### This cell imports all the necessary libraries for the project, including PyTorch, Torchvision, NumPy, Pandas, and Scikit-learn.

In [1]:
import os, json, math, random, time, re, glob, warnings
from pathlib import Path
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, Subset, WeightedRandomSampler

import torchvision
from torchvision import transforms, datasets
from torchvision.models import convnext_tiny, ConvNeXt_Tiny_Weights, resnet18, ResNet18_Weights

from sklearn.metrics import classification_report, confusion_matrix, f1_score, accuracy_score

### 2. Configuration
#### This cell contains all the hyperparameters and settings for the experiment in a single dictionary CFG. 



In [2]:
# -----------------------------
# Configuration
# -----------------------------
# All run settings live in this one dictionary; the functions below read it
# as a module-level global rather than taking the values as arguments.
CFG = {
    "seed": 42,
    # Candidate dataset roots, listed in order of preference.
    "data_root_candidates": [
        "/kaggle/input/leafsnap-dataset/leafsnap-dataset/dataset/images",
        "/kaggle/input/leafsnap-dataset/leafsnap-dataset/dataset",      # Fallback path
    ],
    "use_subdirs": ["field", "lab"],     # Use both domains; change to ["field"] to use only field images
    "min_per_class": 80,                 # Minimum images per class to be included
    "max_classes": 50,                   # Maximum number of classes to use (Top-K by sample count)
    "split_ratio": 0.85,                 # Training set proportion (stratified within each class)
    "balance_train": True,               # Whether to use class balancing for the training set
    "img_size": 224,
    "batch_size": 64,
    "num_workers": 4,
    "model_name": "convnext_tiny",       # Options: convnext_tiny | resnet18 | customcnn
    "pretrained": True,
    "epochs": 20,
    "lr": 2e-4,
    "weight_decay": 1e-4,
    "mixed_precision": True,             # Enables the AMP GradScaler/autocast in the training loops
    "enable_cam": True,                  # Whether Grad-CAM hooks/visualizations are produced
    "cam_max_images": 12,                # Upper bound on the number of CAM overlays saved
    "out_root": "/kaggle/working/leafsnap_runs",
    "run_name": "convnext_tiny_leafsnap_phase4",
    "save_every": 1,                     # Save confusion matrix/report every N epochs
}

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

### 3. Utility Functions & Custom Dataset
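#### This cell defines helper functions for reproducibility (set_seed), directory management (ensure_dir), and automatically finding the data path (auto_find_data_root). It also defines ListDataset, a custom dataset that loads images from a list of (path, label) samples.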

In [3]:
# -----------------------------
# Utility Functions
# -----------------------------
def set_seed(seed=42):
    """Seed the Python, NumPy and PyTorch (CPU + all CUDA devices) RNGs.

    Also switches cuDNN to deterministic mode and turns off its autotuner
    (``benchmark``) so repeated runs pick the same kernels.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for seed_fn in seeders:
        seed_fn(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

def ensure_dir(p: Path):
    """Make sure directory ``p`` exists.

    Missing parent directories are created as well; an already existing
    directory is left untouched.
    """
    p.mkdir(exist_ok=True, parents=True)

def auto_find_data_root():
    """Return the first path in ``CFG["data_root_candidates"]`` that exists.

    Candidates are tried in list order.

    Raises:
        FileNotFoundError: if none of the candidate paths exists.
    """
    existing = next(
        (Path(candidate) for candidate in CFG["data_root_candidates"] if Path(candidate).exists()),
        None,
    )
    if existing is None:
        raise FileNotFoundError(
            f"No valid dataset root found. Checked: {CFG['data_root_candidates']}.\n"
            "Mount the dataset 'leafsnap-dataset' in your notebook first (Add data -> Kaggle dataset)."
        )
    return existing

class ListDataset(Dataset):
    """Dataset backed by an explicit list of ``(path, label)`` samples.

    Args:
        samples: sequence of ``(image_path, class_index)`` pairs.
        transform: optional callable applied to the decoded PIL image.

    Each item is the tuple ``(image, label, path)``; the path is passed
    through so callers can trace a prediction back to its file.
    """

    def __init__(self, samples, transform=None):
        self.samples = samples
        self.transform = transform

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, i):
        path, y = self.samples[i]
        # Ask the decoder for 3-channel RGB directly. This covers RGBA (alpha
        # is dropped, as before) and also grayscale files, which previously
        # came out with one channel and then failed in a 3-channel Normalize.
        # The uint8 tensor is handed to ToPILImage as-is, so there is no
        # uint8 -> float/255 -> uint8 round-trip any more.
        img = torchvision.io.read_image(str(path), mode=torchvision.io.ImageReadMode.RGB)
        img = transforms.ToPILImage()(img)
        if self.transform is not None:
            img = self.transform(img)
        return img, y, path

### 4. Data Loading Pipeline
#### This section contains the core logic for preparing the data. It scans the data directories, filters classes based on sample count, performs a stratified train/validation split, defines image augmentations, and finally creates the DataLoader instances for training and validation.

In [4]:
# -----------------------------
# Data Loading Pipeline
# -----------------------------
def build_index(root: Path, use_subdirs):
    """Index image files per class across the requested sub-directories.

    For every ``root/<sub>/<class>/`` directory the ``*.jpg`` and ``*.png``
    files are collected; classes with the same name in different
    sub-directories are merged. Classes with fewer than
    ``CFG["min_per_class"]`` files are dropped, the rest are ranked by file
    count (largest first) and, unless ``CFG["max_classes"]`` is ``None``,
    truncated to that many classes.

    Returns:
        dict mapping class name -> list of image paths.

    Raises:
        RuntimeError: if no class reaches ``CFG["min_per_class"]``.
    """
    merged = {}
    for sub in use_subdirs:
        domain_dir = root / sub
        if not domain_dir.exists():
            continue
        class_dirs = [entry for entry in sorted(domain_dir.glob("*")) if entry.is_dir()]
        for class_dir in class_dirs:
            images = sorted([*class_dir.glob("*.jpg"), *class_dir.glob("*.png")])
            if images:
                merged.setdefault(class_dir.name, []).extend(images)

    # Keep only sufficiently populated classes.
    kept = [(name, paths) for name, paths in merged.items() if len(paths) >= CFG["min_per_class"]]
    if not kept:
        raise RuntimeError("No class meets min_per_class. Try lowering CFG['min_per_class'].")

    # Largest classes first; the sort is stable, so ties keep discovery order.
    ranked = sorted(kept, key=lambda pair: len(pair[1]), reverse=True)
    limit = CFG["max_classes"]
    if limit is not None:
        ranked = ranked[:limit]
    return dict(ranked)

def stratified_split_by_class(files_by_class, split_ratio, seed=None):
    """Split every class into train/validation parts with the same ratio.

    Args:
        files_by_class: mapping of class name -> list of file paths. The
            lists are not modified; a copy is shuffled.
        split_ratio: fraction of each class assigned to the training part
            (``int(n * split_ratio)`` files per class).
        seed: optional seed for a private RNG, making the split reproducible
            regardless of the global ``random`` state. With ``None`` (the
            default) the global ``random`` module is used.

    Returns:
        ``(train_list, val_list, classes)`` where both lists contain
        ``(path_str, class_idx)`` tuples and ``classes`` is the sorted list
        of class names (``class_idx`` is the position in that list).
    """
    rng = random if seed is None else random.Random(seed)
    classes = sorted(files_by_class)
    cls_to_idx = {c: i for i, c in enumerate(classes)}
    train_list, val_list = [], []
    for c in classes:
        # Shuffle a copy so the caller's index is left untouched.
        files = list(files_by_class[c])
        rng.shuffle(files)
        n_train = int(len(files) * split_ratio)
        label = cls_to_idx[c]
        train_list.extend((str(p), label) for p in files[:n_train])
        val_list.extend((str(p), label) for p in files[n_train:])
    return train_list, val_list, classes

def build_dataloaders():
    """Create the training and validation loaders described by ``CFG``.

    Locates the dataset, indexes and splits it per class, builds the
    augmentation pipelines and wraps both splits in ``DataLoader`` objects.
    When ``CFG["balance_train"]`` is set, training batches are drawn with a
    ``WeightedRandomSampler`` using inverse class-frequency weights.

    Returns:
        ``(train_loader, val_loader, class_names)``.
    """
    files_by_class = build_index(auto_find_data_root(), CFG["use_subdirs"])
    train_list, val_list, class_names = stratified_split_by_class(files_by_class, CFG["split_ratio"])

    # ImageNet channel statistics, shared by both pipelines.
    size = CFG["img_size"]
    normalize = transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])

    train_pipeline = transforms.Compose([
        transforms.RandomResizedCrop(size, scale=(0.6, 1.0)),
        transforms.RandAugment(num_ops=2, magnitude=7),
        transforms.ColorJitter(0.2, 0.2, 0.2, 0.1),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        normalize,
    ])
    val_pipeline = transforms.Compose([
        transforms.Resize(size + 32),
        transforms.CenterCrop(size),
        transforms.ToTensor(),
        normalize,
    ])

    ds_train = ListDataset(train_list, train_pipeline)
    ds_val = ListDataset(val_list, val_pipeline)

    # Settings common to every loader.
    loader_args = dict(
        batch_size=CFG["batch_size"],
        num_workers=CFG["num_workers"],
        pin_memory=True,
    )

    if CFG["balance_train"]:
        # Weight each sample by the inverse frequency of its class.
        labels = [label for _, label in train_list]
        per_class = np.bincount(labels, minlength=len(class_names))
        inv_freq = 1.0 / np.clip(per_class, 1, None)
        weights = [inv_freq[label] for label in labels]
        balanced = WeightedRandomSampler(weights, num_samples=len(weights), replacement=True)
        train_loader = DataLoader(ds_train, sampler=balanced, **loader_args)
    else:
        train_loader = DataLoader(ds_train, shuffle=True, **loader_args)

    val_loader = DataLoader(ds_val, shuffle=False, **loader_args)
    return train_loader, val_loader, class_names

### 5. Model & CAM (Class Activation Mapping) Setup
#### This cell defines the model architectures (SmallCNN, and logic to adapt ConvNeXt/ResNet). It also includes functions to set up Grad-CAM by automatically selecting a target layer and attaching the necessary forward and backward hooks to capture activations and gradients.

In [5]:
# -----------------------------
# Model & CAM Setup
# -----------------------------
class SmallCNN(nn.Module):
    """Compact three-stage CNN used as a from-scratch baseline.

    Three 3x3 conv + BatchNorm + ReLU stages (32, 64, 128 channels) with
    2x2 max-pooling after the first two, global average pooling, and a
    linear classification head.
    """

    def __init__(self, num_classes):
        super().__init__()
        # (in_channels, out_channels, followed_by_maxpool)
        stages = [(3, 32, True), (32, 64, True), (64, 128, False)]
        layers = []
        for c_in, c_out, pooled in stages:
            layers.append(nn.Conv2d(c_in, c_out, 3, 1, 1))
            layers.append(nn.BatchNorm2d(c_out))
            layers.append(nn.ReLU(inplace=True))
            if pooled:
                layers.append(nn.MaxPool2d(2))
        layers.append(nn.AdaptiveAvgPool2d(1))
        self.conv = nn.Sequential(*layers)
        self.head = nn.Linear(128, num_classes)

    def forward(self, x):
        pooled = self.conv(x)                    # B x 128 x 1 x 1
        return self.head(pooled.flatten(start_dim=1))  # B x num_classes

def build_model(num_classes):
    """Instantiate the network selected by ``CFG["model_name"]``.

    ``convnext_tiny`` and ``resnet18`` are created from torchvision (with
    ImageNet weights when ``CFG["pretrained"]`` is true) and get a fresh
    linear head with ``num_classes`` outputs. Any other name yields a
    ``SmallCNN``.
    """
    arch = CFG["model_name"].lower()

    if arch == "convnext_tiny":
        net = convnext_tiny(weights=ConvNeXt_Tiny_Weights.IMAGENET1K_V1 if CFG["pretrained"] else None)
        old_head = net.classifier[-1]
        net.classifier[-1] = nn.Linear(old_head.in_features, num_classes)
        return net

    if arch == "resnet18":
        net = resnet18(weights=ResNet18_Weights.IMAGENET1K_V1 if CFG["pretrained"] else None)
        net.fc = nn.Linear(net.fc.in_features, num_classes)
        return net

    # Everything else falls back to the custom baseline.
    return SmallCNN(num_classes)

def select_cam_layer(model):
    """Pick the convolution layer that Grad-CAM should hook.

    Preference goes to the last ``nn.Conv2d`` that is grouped
    (``groups > 1``) and keeps its channel count (depthwise-style). If the
    model has none, the last ``nn.Conv2d`` of any kind is used. Returns
    ``None`` when the model contains no ``nn.Conv2d`` at all.
    """
    last_grouped = None
    last_any = None
    # One walk over the module tree, tracking both candidates at once.
    for module in model.modules():
        if not isinstance(module, nn.Conv2d):
            continue
        last_any = module
        if module.groups > 1 and module.in_channels == module.out_channels:
            last_grouped = module
    return last_any if last_grouped is None else last_grouped

def build_cam_handle(model):
    """Attach Grad-CAM capture hooks to the layer chosen by ``select_cam_layer``.

    Returns:
        ``((acts, grads), hooks)`` where ``acts["value"]`` / ``grads["value"]``
        hold the most recent output of the hooked layer and the gradient
        w.r.t. that output, and ``hooks`` are the two handles to ``remove()``
        when done. Returns ``(None, [])`` when ``CFG["enable_cam"]`` is false
        or the model has no suitable conv layer.
    """
    if not CFG["enable_cam"]:
        return None, []
    layer = select_cam_layer(model)
    if layer is None:
        print("[cam] no suitable conv layer found; CAM disabled.")
        return None, []

    # Mutable holders the hooks write into.
    acts, grads = {"value": None}, {"value": None}

    def fwd_hook(module, inp, out):
        # Grad-CAM only needs the activation values, so always store a
        # detached tensor. Keeping the graph-attached output would keep the
        # last batch's autograd graph referenced while the hook is installed.
        acts["value"] = out.detach()

    def bwd_hook(module, grad_in, grad_out):
        grad = grad_out[0]
        grads["value"] = grad.detach() if grad is not None else None

    h1 = layer.register_forward_hook(fwd_hook)
    h2 = layer.register_full_backward_hook(bwd_hook)
    return (acts, grads), [h1, h2]

### 6. Evaluation & Logging Functions
#### This set of functions handles the evaluation loop, calculates metrics, and saves various artifacts like the training curve, confusion matrix, per-class classification report, and Grad-CAM visualizations.

In [6]:
# -----------------------------
# Evaluation & Logging Functions
# -----------------------------
def evaluate(model, loader, num_classes):
    """Run ``model`` over ``loader`` without gradients and score the result.

    Returns:
        ``(accuracy, macro_f1, confusion_matrix, y_true, y_pred)``; the
        confusion matrix covers labels ``0 .. num_classes - 1`` and the last
        two items are plain Python lists of class indices.
    """
    model.eval()
    targets, predictions = [], []
    with torch.no_grad():
        for batch, labels, _paths in loader:
            scores = model(batch.to(device))
            predictions += scores.argmax(dim=1).cpu().tolist()
            targets += labels.tolist()

    label_ids = list(range(num_classes))
    accuracy = accuracy_score(targets, predictions)
    macro_f1 = f1_score(targets, predictions, average="macro")
    matrix = confusion_matrix(targets, predictions, labels=label_ids)
    return accuracy, macro_f1, matrix, targets, predictions

def save_cm_and_report(cm, y_true, y_pred, class_names, out_dir: Path, epoch: int):
    """Write the epoch's confusion matrix and per-class report as CSV files.

    Both files go to ``out_dir`` (created if missing) and carry the
    zero-padded epoch number in their name.
    """
    ensure_dir(out_dir)

    # Confusion matrix: predicted classes as columns, true class in the first column.
    matrix_table = pd.DataFrame(cm, columns=class_names)
    matrix_table.insert(0, "true\\pred", class_names)
    matrix_table.to_csv(out_dir / f"confusion_matrix_epoch{epoch:03d}.csv", index=False)

    # Precision/recall/F1 per class plus the aggregate rows from scikit-learn.
    label_ids = list(range(len(class_names)))
    report = classification_report(
        y_true,
        y_pred,
        labels=label_ids,
        target_names=class_names,
        output_dict=True,
        zero_division=0,
    )
    report_table = pd.DataFrame(report).transpose().reset_index()
    report_table = report_table.rename(columns={"index": "class"})
    report_table.to_csv(out_dir / f"per_class_report_epoch{epoch:03d}.csv", index=False)

def save_train_curve(curves, out_csv: Path):
    """Dump the per-epoch history in ``curves`` to ``out_csv`` (no index column)."""
    pd.DataFrame(curves).to_csv(out_csv, index=False)

def save_cam_samples(model, loader, class_names, out_dir: Path, actgrad, hooks, max_images=12):
    """Generate Grad-CAM overlays for the first ``max_images`` samples of ``loader``.

    Args:
        model: network whose target layer already carries the CAM hooks.
        loader: yields ``(images, labels, paths)`` batches of normalized images.
        class_names: list used to name each file after the predicted class.
        out_dir: overlays are written to ``out_dir / "cam"``.
        actgrad: ``(acts, grads)`` holders filled by the hooks.
        hooks: hook handles; they are removed before returning, even if
            CAM generation fails part-way.
        max_images: maximum number of overlays to save.

    The model is left in eval mode, with its parameter gradients cleared.
    """
    cam_dir = out_dir / "cam"
    ensure_dir(cam_dir)
    acts, grads = actgrad
    saved = 0
    # ImageNet statistics, used to undo the input normalization for display.
    mean = torch.tensor([0.485, 0.456, 0.406], device=device).view(1, 3, 1, 1)
    std = torch.tensor([0.229, 0.224, 0.225], device=device).view(1, 3, 1, 1)

    model.eval()
    try:
        for x, y, paths in loader:
            if saved >= max_images:
                break
            # Gradients are required here even if the caller disabled them.
            with torch.enable_grad():
                x = x.to(device)
                logits = model(x)
                pred = logits.argmax(1)

                for i in range(x.size(0)):
                    if saved >= max_images:
                        break
                    model.zero_grad(set_to_none=True)
                    # Back-propagate the predicted-class score of sample i only;
                    # the graph is kept for the remaining samples of the batch.
                    score = logits[i, pred[i]]
                    score.backward(retain_graph=True)

                    A = acts["value"][i]    # Activations C x H x W
                    dA = grads["value"][i]  # Gradients C x H x W
                    if A.dim() == 4:
                        A = A.squeeze(0)
                    if dA.dim() == 4:
                        dA = dA.squeeze(0)

                    # Grad-CAM: channel weights are the spatially averaged gradients.
                    w = dA.mean(dim=(1, 2), keepdim=True)  # C x 1 x 1
                    cam = torch.relu((A * w).sum(dim=0))   # H x W
                    cam = (cam - cam.min()) / (cam.max() - cam.min() + 1e-6)
                    cam = cam.unsqueeze(0).unsqueeze(0)    # 1x1xHxW
                    cam = F.interpolate(cam, size=x[i:i+1].shape[-2:], mode="bilinear", align_corners=False)[0]

                    # Denormalize image for visualization
                    img = x[i:i+1] * std + mean
                    img = img.clamp(0, 1)[0]

                    # Simple red heatmap (CAM in the R channel) blended onto the image.
                    heat = torch.zeros_like(img)
                    heat[0] = cam[0]
                    overlay = (0.6 * img + 0.4 * heat).clamp(0, 1)

                    # Index the name list with a plain int rather than a tensor.
                    name = f"{saved:02d}_pred_{class_names[int(pred[i])]}.jpg"
                    torchvision.utils.save_image(overlay.cpu(), cam_dir / name)
                    saved += 1
    finally:
        # Always detach the hooks so a failure cannot leave them on the model,
        # and drop the parameter gradients produced by the CAM backward passes.
        for h in hooks:
            h.remove()
        model.zero_grad(set_to_none=True)

### 7. Main Training Pipeline

In [7]:
# -----------------------------
# Main Training Pipeline
# -----------------------------
def train_one_run():
    """Execute one full training + validation run configured by ``CFG``.

    Artifacts are written under ``CFG["out_root"] / CFG["run_name"]``:
    ``metrics/resource.json``, ``metrics/train_curve.csv``, periodic
    confusion matrices / per-class reports, optional Grad-CAM overlays
    (after the first epoch) and the checkpoint with the best validation
    macro-F1.

    Returns:
        ``(out_dir, best_ep, class_names, best_path)``.
    """
    set_seed(CFG["seed"])
    out_dir = Path(CFG["out_root"]) / CFG["run_name"]
    met_dir = out_dir / "metrics"
    ensure_dir(met_dir)

    train_loader, val_loader, class_names = build_dataloaders()
    num_classes = len(class_names)
    print(f"Starting training for {num_classes} classes.")

    # Model / Optimizer / Criterion
    model = build_model(num_classes).to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=CFG["lr"], weight_decay=CFG["weight_decay"])
    scaler = torch.cuda.amp.GradScaler(enabled=CFG["mixed_precision"])
    criterion = nn.CrossEntropyLoss()

    # Save resource info
    with open(met_dir / "resource.json", "w") as f:
        json.dump({
            "params": sum(p.numel() for p in model.parameters()),
            "model": CFG["model_name"], "pretrained": CFG["pretrained"],
            "img_size": CFG["img_size"], "batch_size": CFG["batch_size"],
            "num_classes": num_classes, "class_names": class_names,
        }, f, indent=2)

    best_f1, best_ep = -1.0, -1
    # Columns: loss=train_loss, acc=train_acc, f1=val_f1(macro)
    curves = {"epoch": [], "loss": [], "acc": [], "f1": []}
    best_path = out_dir / f"{CFG['run_name']}_best.pt"

    for epoch in range(1, CFG["epochs"] + 1):
        model.train()
        losses, preds, gts = [], [], []

        for x, y, _ in train_loader:
            x, y = x.to(device), y.to(device)
            optimizer.zero_grad(set_to_none=True)
            with torch.cuda.amp.autocast(enabled=CFG["mixed_precision"]):
                logits = model(x)
                loss = criterion(logits, y)
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            losses.append(loss.item())
            preds.extend(logits.argmax(1).detach().cpu().numpy().tolist())
            gts.extend(y.detach().cpu().numpy().tolist())

        tr_loss = float(np.mean(losses))
        tr_acc = accuracy_score(gts, preds)

        # Validation
        val_acc, val_f1, cm, y_true, y_pred = evaluate(model, val_loader, num_classes)
        print(f"[{CFG['model_name']}] [Epoch {epoch:03d}/{CFG['epochs']}] "
              f"train_loss={tr_loss:.4f} acc={tr_acc:.4f} | val_acc={val_acc:.4f} f1={val_f1:.4f}")

        # Log metrics for the training curve
        curves["epoch"].append(epoch)
        curves["loss"].append(tr_loss)
        curves["acc"].append(tr_acc)
        curves["f1"].append(val_f1)
        save_train_curve(curves, met_dir / "train_curve.csv")

        # Save confusion matrix & report (periodically)
        if epoch % CFG["save_every"] == 0 or epoch == CFG["epochs"]:
            save_cm_and_report(cm, y_true, y_pred, class_names, met_dir, epoch)

        # Save CAM samples after the first epoch. The hooks are attached only
        # now, right before they are needed, so the training and evaluation
        # passes above run without hook overhead; save_cam_samples removes them.
        if CFG["enable_cam"] and epoch == 1:
            actgrad, hooks = build_cam_handle(model)
            if actgrad is not None:
                save_cam_samples(model, val_loader, class_names, met_dir, actgrad, hooks, CFG["cam_max_images"])

        # Save the best model based on validation F1 score
        if val_f1 > best_f1:
            best_f1, best_ep = val_f1, epoch
            torch.save({"model_cfg": CFG, "state_dict": model.state_dict(), "class_names": class_names},
                       best_path)
            print(f"  -> New best model saved at epoch {best_ep} with F1-score: {best_f1:.4f}")

    print(f"\nTraining finished. Best model from epoch {best_ep} saved at {best_path}")
    return out_dir, best_ep, class_names, best_path

### 8. Fine-tuning at a Higher Resolution

In [8]:
# -----------------------------
# (Optional) Fine-tuning at 288px
# -----------------------------
def finetune_288(best_ckpt_path: Path, class_names):
    """(Optional) Fine-tune a stage-1 checkpoint for 5 epochs at 288px.

    Args:
        best_ckpt_path: checkpoint whose ``"state_dict"`` entry is loaded
            into a freshly built model.
        class_names: class list of the stage-1 run; its length sets the
            number of outputs and it is stored in the new checkpoint.

    The RNGs are re-seeded with ``CFG["seed"]`` immediately before the
    train/val split. Splitting with the RNG state left over from a previous
    stage would give a different partition, letting earlier validation
    images end up in the fine-tuning training set. This matches a split
    made right after ``set_seed(CFG["seed"])`` on the same file listing.

    Returns:
        ``(out_dir, best_ep)`` for the fine-tuning run.
    """
    print("\n--- Starting 288px Fine-tuning Stage ---")
    num_classes = len(class_names)

    # Load the best model from the previous stage
    model = build_model(num_classes).to(device)
    ckpt = torch.load(best_ckpt_path, map_location="cpu")
    load_result = model.load_state_dict(ckpt["state_dict"], strict=False)
    # strict=False silently skips keys that do not line up; make that visible.
    if load_result.missing_keys or load_result.unexpected_keys:
        print(f"[ft288] warning: checkpoint/model key mismatch "
              f"(missing={list(load_result.missing_keys)}, "
              f"unexpected={list(load_result.unexpected_keys)})")

    # Build new dataloaders with 288px image size and light augmentation
    data_root = auto_find_data_root()
    files_by_class = build_index(data_root, CFG["use_subdirs"])
    # Re-seed right before splitting so the partition does not depend on how
    # much randomness earlier code consumed (see docstring).
    set_seed(CFG["seed"])
    train_list, val_list, _ = stratified_split_by_class(files_by_class, CFG["split_ratio"])

    train_tf = transforms.Compose([
        transforms.RandomResizedCrop(288, scale=(0.7, 1.0)),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ])
    val_tf = transforms.Compose([
        transforms.Resize(288 + 32),
        transforms.CenterCrop(288),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ])

    ds_train = ListDataset(train_list, train_tf)
    ds_val = ListDataset(val_list, val_tf)
    train_loader = DataLoader(ds_train, batch_size=48, shuffle=True, num_workers=CFG["num_workers"], pin_memory=True)
    val_loader = DataLoader(ds_val, batch_size=64, shuffle=False, num_workers=CFG["num_workers"], pin_memory=True)

    # Fine-tune with a low learning rate
    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-5, weight_decay=1e-5)
    scaler = torch.cuda.amp.GradScaler(enabled=CFG["mixed_precision"])
    criterion = nn.CrossEntropyLoss()

    out_dir = Path(CFG["out_root"]) / (CFG["run_name"] + "_ft288")
    met_dir = out_dir / "metrics"
    ensure_dir(met_dir)
    # Defined up front so the final message never depends on a loop branch.
    best_ft_path = out_dir / f"{CFG['run_name']}_ft288_best.pt"

    best_f1, best_ep = -1.0, -1
    for epoch in range(1, 6):
        model.train()
        losses, preds, gts = [], [], []
        for x, y, _ in train_loader:
            x, y = x.to(device), y.to(device)
            optimizer.zero_grad(set_to_none=True)
            with torch.cuda.amp.autocast(enabled=CFG["mixed_precision"]):
                logits = model(x)
                loss = criterion(logits, y)
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            losses.append(loss.item())
            preds.extend(logits.argmax(1).detach().cpu().numpy().tolist())
            gts.extend(y.detach().cpu().numpy().tolist())

        tr_loss = float(np.mean(losses))
        tr_acc = accuracy_score(gts, preds)
        val_acc, val_f1, cm, y_true, y_pred = evaluate(model, val_loader, num_classes)

        print(f"[FT 288] [Epoch {epoch}/5] train_loss={tr_loss:.4f} acc={tr_acc:.4f} | "
              f"val_acc={val_acc:.4f} f1={val_f1:.4f}")

        # Keep the checkpoint with the best validation macro-F1.
        if val_f1 > best_f1:
            best_f1, best_ep = val_f1, epoch
            torch.save({"model_cfg": CFG, "state_dict": model.state_dict(), "class_names": class_names},
                       best_ft_path)
            print(f"  -> New best fine-tuned model saved at epoch {best_ep} with F1-score: {best_f1:.4f}")

        save_cm_and_report(cm, y_true, y_pred, class_names, met_dir, epoch)

    print(f"\nFine-tuning finished. Best model saved at {best_ft_path}")
    return out_dir, best_ep

In [None]:
# -----------------------------
# Execution
# -----------------------------
# Seed the RNGs and make sure the output root exists before anything is written.
set_seed(CFG["seed"])
ensure_dir(Path(CFG["out_root"]))
print("Using device:", device)

# --- Start the main training run ---
# Yields the run directory, the best epoch, the class names and the path of
# the best checkpoint (the last two feed the optional fine-tuning stage).
out_dir, best_ep, class_names, best_path = train_one_run()

# --- (Optional) Uncomment the lines below to run the 288px fine-tuning stage ---
# print("\n" + "="*50)
# ft_out_dir, ft_best_ep = finetune_288(best_path, class_names)
# print("="*50)

print(f"\nAll done. Main outputs are under: {out_dir}")

Using device: cuda
Starting training for 50 classes.


Downloading: "https://download.pytorch.org/models/convnext_tiny-983f1562.pth" to /root/.cache/torch/hub/checkpoints/convnext_tiny-983f1562.pth
100%|██████████| 109M/109M [00:00<00:00, 164MB/s]  
