# **Plant Leaf Disease Detection**
## Phase 1: Dataset Audit, Cleaning, and Baseline Modeling

### 1. **Dataset Audit & Quality Profiling**
   
📌 Objective

Before training any model, we validate dataset quality:
* Remove corrupt images
* Detect duplicates
* Identify low-quality samples
* Check for background bias (lab bias risk)

This ensures the model learns leaf disease patterns, not noise or background shortcuts.

Cell 1 — Dataset Audit Pipeline

In [None]:
# ===========================
# Dataset Audit & Quality Analysis
# ===========================

import os
import cv2
import numpy as np
import pandas as pd
from tqdm import tqdm
from PIL import Image
import imagehash
from pathlib import Path
from collections import Counter
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt

# Paths
# Dataset root; the audit walks <split>/<class>/<image> beneath it.
# NOTE(review): later cells delete and move files under DATA_DIR, but
# /kaggle/input is normally mounted read-only -- confirm, and copy the
# dataset to a writable location first if that is the case.
DATA_DIR = (
    Path("/kaggle/input")
    / "plant-disease-detection-dataset-master-version"
    / "MasterDataset"
)

# Every audit artefact (CSV reports, figures) is written below this folder.
REPORT_DIR = Path("outputs") / "audit_report"
REPORT_DIR.mkdir(parents=True, exist_ok=True)

# ---------- Helper Functions ----------
def is_image_ok(img_path):
    """Return True if PIL can open the image and convert it to RGB.

    Args:
        img_path: Path (str or Path) of the file to check.

    Returns:
        True when the file opens and converts to RGB, False on any error
        raised while doing so (missing file, unsupported format, corrupt
        data, ...).
    """
    try:
        # The context manager closes the file handle immediately; the audit
        # calls this for every file, so leaked handles would accumulate.
        with Image.open(img_path) as img:
            img.convert("RGB")
    except Exception:
        # Best-effort check: any failure means "not usable". Unlike a bare
        # ``except``, KeyboardInterrupt/SystemExit still propagate.
        return False
    return True

def image_stats(img_path):
    """Compute brightness, contrast, and sharpness of an image.

    All three values are measured on the grayscale version of the image:
    brightness is the mean, contrast the standard deviation, and sharpness
    the variance of the Laplacian.

    Args:
        img_path: Path (str or Path) of the image.

    Returns:
        Tuple ``(brightness, contrast, sharpness)``.

    Raises:
        ValueError: If OpenCV cannot decode the file.
    """
    img = cv2.imread(str(img_path))
    if img is None:
        # cv2.imread reports failure by returning None; without this check
        # the error would surface later as an opaque cvtColor assertion.
        raise ValueError(f"OpenCV could not read image: {img_path}")

    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    brightness = gray.mean()
    contrast = gray.std()
    sharpness = cv2.Laplacian(gray, cv2.CV_64F).var()
    return brightness, contrast, sharpness

def compute_phash(img_path):
    """Return the perceptual hash of an image as a string.

    The hash is used for duplicate detection: files whose hash strings are
    equal are treated as duplicates by the audit.

    Args:
        img_path: Path (str or Path) of the image.

    Returns:
        ``str(imagehash.phash(...))`` of the RGB image, or None when the
        image cannot be opened or hashed.
    """
    try:
        # Close the file handle as soon as the hash has been computed.
        with Image.open(img_path) as img:
            return str(imagehash.phash(img.convert("RGB")))
    except Exception:
        # Best-effort: callers drop None hashes before counting duplicates.
        # ``Exception`` (not a bare except) keeps Ctrl-C working.
        return None

def background_features(img_path):
    """Return a simple 6-value colour embedding of an image.

    The image is resized to 64x64 and summarised by its per-channel mean
    followed by its per-channel standard deviation (channels in the order
    OpenCV loads them). The statistics cover the whole resized image, not
    a segmented background.

    Args:
        img_path: Path (str or Path) of the image.

    Returns:
        1-D NumPy array: 3 channel means followed by 3 channel stds.

    Raises:
        ValueError: If OpenCV cannot decode the file.
    """
    img = cv2.imread(str(img_path))
    if img is None:
        # cv2.imread returns None on failure; fail with the offending path
        # instead of an opaque cv2.resize assertion.
        raise ValueError(f"OpenCV could not read image: {img_path}")

    img = cv2.resize(img, (64, 64))
    return np.concatenate([img.mean((0, 1)), img.std((0, 1))])

# ---------- Audit Loop ----------
# Walks <DATA_DIR>/<split>/<class>/*, deletes files PIL cannot decode, and
# records quality statistics, a perceptual hash, and colour features for
# every remaining image.
records = []
removed_files = 0   # corrupt files actually deleted
skipped_files = 0   # files left on disk but excluded from the audit table

for split in ["train", "val", "test"]:
    split_dir = DATA_DIR / split
    if not split_dir.exists():
        continue

    for cls in sorted(os.listdir(split_dir)):
        cls_dir = split_dir / cls
        if not cls_dir.is_dir():
            continue

        # Sorting makes the audit order reproducible (it decides which copy
        # of a duplicate counts as the "first") and gives tqdm a total.
        for img_path in tqdm(sorted(cls_dir.glob("*")), desc=f"{split}/{cls}"):
            if not img_path.is_file():
                continue

            if not is_image_ok(img_path):
                # NOTE: this permanently deletes the file from the dataset.
                try:
                    img_path.unlink()
                except OSError as err:
                    # e.g. a read-only dataset mount: keep auditing instead
                    # of aborting the whole run.
                    print(f"Could not remove corrupt file {img_path}: {err}")
                    skipped_files += 1
                else:
                    removed_files += 1
                continue

            try:
                b, c, s = image_stats(img_path)
                bg = background_features(img_path)
            except (cv2.error, ValueError) as err:
                # PIL could read the file but OpenCV could not.
                print(f"Skipping {img_path}: {err}")
                skipped_files += 1
                continue

            ph = compute_phash(img_path)

            records.append([split, cls, img_path.name, b, c, s, ph, bg])

df = pd.DataFrame(records, columns=[
    "split", "class", "filename",
    "brightness", "contrast", "sharpness",
    "phash", "bg_feats"
])

print(f"Removed {removed_files} corrupt images")
if skipped_files:
    print(f"Skipped {skipped_files} unreadable or undeletable files")


Duplicate & Quality Outlier Detection

In [None]:
# ---------- Duplicate Detection ----------
# Tally each perceptual hash; rows whose hash is missing are left out.
dup_counts = Counter(df["phash"].dropna())
dup_hashes = [phash for phash, n_files in dup_counts.items() if n_files > 1]

# Every row (across all splits) whose hash occurs more than once.
is_duplicate = df["phash"].isin(dup_hashes)
dup_df = df[is_duplicate]
dup_df.to_csv(REPORT_DIR / "duplicates.csv", index=False)

# ---------- Quality Outliers ----------
# NOTE(review): quantile thresholds flag a fixed share of the data (the
# darkest 5%, brightest 5%, and blurriest 5%) regardless of absolute
# quality -- confirm this is the intended removal criterion.
brightness = df["brightness"]
sharpness = df["sharpness"]

low_b = brightness.quantile(0.05)
high_b = brightness.quantile(0.95)
low_s = sharpness.quantile(0.05)

too_dark = brightness < low_b
too_bright = brightness > high_b
too_blurry = sharpness < low_s

outliers = df[too_dark | too_bright | too_blurry]
outliers.to_csv(REPORT_DIR / "quality_outliers.csv", index=False)

print(f"Duplicate groups: {len(dup_hashes)}")
print(f"Quality outliers: {len(outliers)}")


Background Bias Visualization

In [None]:
# ---------- Background Bias PCA ----------
# Stack the per-image feature vectors into one matrix and project it onto
# its first two principal components.
bg_matrix = np.stack(df["bg_feats"].tolist())
pca = PCA(n_components=2)
proj = pca.fit_transform(bg_matrix)

df["bg_x"] = proj[:, 0]
df["bg_y"] = proj[:, 1]

# One scatter series per class, drawn in order of first appearance.
plt.figure(figsize=(6, 6))
for cls in df["class"].unique():
    subset = df.loc[df["class"] == cls]
    plt.scatter(subset["bg_x"], subset["bg_y"], s=8, alpha=0.6, label=cls)

plt.title("Background Bias PCA")
plt.legend(bbox_to_anchor=(1.05, 1))
plt.tight_layout()
plt.savefig(REPORT_DIR / "background_bias_pca.png")
plt.close()

# Save audit metadata (everything except the array-valued feature column)
audit_columns = [col for col in df.columns if col != "bg_feats"]
df[audit_columns].to_csv(REPORT_DIR / "dataset_audit.csv", index=False)


### 2. **Manual Review & Cleaning**

Cell 2 — Inspect Audit Outputs

In [None]:
from PIL import Image
from IPython.display import display

# Preview the first rows of each audit report.
for report_name in ("duplicates.csv", "quality_outliers.csv"):
    display(pd.read_csv(REPORT_DIR / report_name).head())

# Kept as the cell's last expression so the notebook renders the figure.
Image.open(REPORT_DIR / "background_bias_pca.png")


Cell 3 — Remove Duplicate Images (Safe Backup)

In [None]:
import shutil

# Move every duplicate except the first of each hash group out of the
# dataset and into a backup folder.
REMOVED_DUP_DIR = Path("outputs/removed_duplicates")
REMOVED_DUP_DIR.mkdir(parents=True, exist_ok=True)

# Read the path columns as text so numeric-looking class or file names are
# not converted to numbers (which would break path construction).
dup_df = pd.read_csv(
    REPORT_DIR / "duplicates.csv",
    dtype={"split": str, "class": str, "filename": str},
)

for _, grp in dup_df.groupby("phash"):
    # The first row of each group stays in the dataset.
    for _, row in grp.iloc[1:].iterrows():
        rel_path = Path(row["split"]) / row["class"] / row["filename"]
        src = DATA_DIR / rel_path
        if not src.exists():
            continue

        # Mirror split/class in the backup: with a flat folder, two files
        # sharing a name in different classes would overwrite each other.
        dst = REMOVED_DUP_DIR / rel_path
        dst.parent.mkdir(parents=True, exist_ok=True)
        shutil.move(str(src), str(dst))


Cell 4 — Remove Quality Outliers

In [None]:
# Move every image listed in the quality-outlier report out of the dataset
# and into a backup folder.
REMOVED_OUT_DIR = Path("outputs/removed_outliers")
REMOVED_OUT_DIR.mkdir(parents=True, exist_ok=True)

# Read the path columns as text so numeric-looking class or file names are
# not converted to numbers (which would break path construction).
out_df = pd.read_csv(
    REPORT_DIR / "quality_outliers.csv",
    dtype={"split": str, "class": str, "filename": str},
)

for _, row in out_df.iterrows():
    rel_path = Path(row["split"]) / row["class"] / row["filename"]
    src = DATA_DIR / rel_path
    if not src.exists():
        # Already moved (e.g. it was also removed as a duplicate).
        continue

    # Mirror split/class in the backup: with a flat folder, two files
    # sharing a name in different classes would overwrite each other.
    dst = REMOVED_OUT_DIR / rel_path
    dst.parent.mkdir(parents=True, exist_ok=True)
    shutil.move(str(src), str(dst))


### 3. **Exploratory Data Analysis (EDA)**

Cell 5 — Class Distribution & Visual Sampling

In [None]:
import seaborn as sns
import random

# NOTE(review): dataset_audit.csv is written by the audit cell, before the
# duplicate/outlier removal cells run, so these counts presumably still
# include the removed images -- confirm that is the intended view.
audit_df = pd.read_csv(REPORT_DIR / "dataset_audit.csv")

# Bar chart of image counts per class (all splits combined).
plt.figure(figsize=(10, 4))
sns.countplot(x="class", data=audit_df)
plt.title("Class Distribution")
plt.xticks(rotation=90)
plt.show()


### 4. **Metadata Preparation**

Cell 6 — Generate Training Metadata

In [None]:
# Build one metadata row (filepath, label, split) per image that is still
# present in the dataset folders.
IMAGE_SUFFIXES = {".jpg", ".jpeg", ".png"}

metadata = []

for split in ["train", "val", "test"]:
    split_dir = DATA_DIR / split
    if not split_dir.is_dir():
        # Tolerate a missing split instead of crashing in iterdir().
        continue

    # iterdir() order is arbitrary; sort so the CSV is reproducible.
    for cls_dir in sorted(split_dir.iterdir()):
        if not cls_dir.is_dir():
            continue
        for img in sorted(cls_dir.iterdir()):
            if img.suffix.lower() in IMAGE_SUFFIXES:
                metadata.append({
                    "filepath": str(img),
                    "label": cls_dir.name,
                    "split": split
                })

# Explicit columns keep the CSV header intact even when no image is found.
meta_df = pd.DataFrame(metadata, columns=["filepath", "label", "split"])
Path("outputs/metadata").mkdir(parents=True, exist_ok=True)
meta_df.to_csv("outputs/metadata/metadata.csv", index=False)

print("Metadata saved.")


### 5. **Baseline Model (Frozen Backbone Strategy)**

📌 Rationale

* Use pretrained ResNet50

* Freeze backbone → train only classifier

* Establish strong baseline before heavy tuning

Cell 7 — Model Training & Evaluation (Cleaned)

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import timm
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix, top_k_accuracy_score
import torch.nn.functional as F
from tqdm import tqdm

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class PlantDataset(Dataset):
    """Image classification dataset backed by a metadata DataFrame.

    Args:
        df: DataFrame with at least ``filepath`` and ``label`` columns.
        transform: Optional callable applied to the PIL RGB image.
        label2idx: Optional ``{label: index}`` mapping. Pass the training
            set's mapping when building validation/test sets so every
            split encodes labels identically. When omitted, the mapping is
            built from the sorted unique labels of ``df``.
    """

    def __init__(self, df, transform=None, label2idx=None):
        self.df = df.reset_index(drop=True)
        self.transform = transform
        if label2idx is None:
            label2idx = {l: i for i, l in enumerate(sorted(df.label.unique()))}
        self.label2idx = label2idx
        self.idx2label = {v: k for k, v in self.label2idx.items()}

    def __getitem__(self, idx):
        """Return ``(image, class_index)`` for row ``idx``."""
        row = self.df.iloc[idx]
        # Close the file handle right after decoding; convert() returns a
        # new in-memory image, so it stays valid after the file is closed.
        with Image.open(row.filepath) as fh:
            img = fh.convert("RGB")
        if self.transform:
            img = self.transform(img)
        return img, self.label2idx[row.label]

    def __len__(self):
        """Number of rows (images) in the dataset."""
        return len(self.df)

# Deterministic preprocessing: resize to 224x224, convert to a tensor, and
# normalise with the given per-channel mean and std.
transform = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225])
])

meta = pd.read_csv("outputs/metadata/metadata.csv")
train_ds = PlantDataset(meta[meta["split"] == "train"], transform)
val_ds = PlantDataset(meta[meta["split"] == "val"], transform)

# Each dataset builds its label mapping from its own rows. If the
# validation split lacks a class (or has an extra one), its indices would
# no longer match the classifier's outputs, so reuse the training mapping.
# A validation label unseen in training then fails loudly with a KeyError.
val_ds.label2idx = train_ds.label2idx
val_ds.idx2label = train_ds.idx2label

train_loader = DataLoader(train_ds, batch_size=32, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=64)

# Pretrained ResNet50 with its classification head removed and every
# backbone parameter frozen.
backbone = timm.create_model("resnet50", pretrained=True)
backbone.reset_classifier(0)
for p in backbone.parameters():
    p.requires_grad = False

# Only this linear head on top of the backbone features is trained.
model = nn.Sequential(
    backbone,
    nn.Linear(backbone.num_features, len(train_ds.label2idx))
).to(DEVICE)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model[1].parameters(), lr=1e-3)


Training Loop

In [None]:
def run_epoch(model, loader, train=True):
    """Run one pass over ``loader`` and return loss and metrics.

    Uses the module-level ``criterion``, ``optimizer`` and ``DEVICE``.

    Args:
        model: Network to train or evaluate.
        loader: DataLoader yielding ``(images, class_indices)`` batches.
        train: When True, put the model in train mode and update weights
            after every batch; otherwise evaluate without gradients.

    Returns:
        Dict with ``loss`` (mean batch loss), ``acc``, ``f1`` (weighted),
        ``top3`` (top-3 accuracy) and ``cm`` (confusion matrix covering
        every output class of the model).
    """
    # NOTE(review): train mode also applies to the frozen backbone, so any
    # BatchNorm layers in it would presumably keep updating their running
    # statistics -- confirm whether the backbone should stay in eval mode.
    model.train(train)
    losses, preds, labels, outputs = [], [], [], []

    with torch.set_grad_enabled(train):
        for x, y in tqdm(loader, leave=False):
            x, y = x.to(DEVICE), y.to(DEVICE)
            out = model(x)
            loss = criterion(out, y)

            if train:
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            losses.append(loss.item())
            preds.append(out.argmax(1).cpu())
            labels.append(y.cpu())
            # Detach: in train mode the logits carry autograd history, and
            # a tensor that requires grad cannot be converted to NumPy for
            # the sklearn metrics below.
            outputs.append(out.detach().float().cpu())

    preds = torch.cat(preds).numpy()
    labels = torch.cat(labels).numpy()
    probs = F.softmax(torch.cat(outputs), dim=1).numpy()

    # Name every class explicitly so the metrics still work (and the
    # confusion matrix keeps a fixed shape) when a class is absent from
    # this pass.
    class_ids = np.arange(probs.shape[1])

    return {
        "loss": np.mean(losses),
        "acc": accuracy_score(labels, preds),
        "f1": f1_score(labels, preds, average="weighted"),
        "top3": top_k_accuracy_score(labels, probs, k=3, labels=class_ids),
        "cm": confusion_matrix(labels, preds, labels=class_ids)
    }

# Train the classifier head for three epochs, validating after each one and
# printing a one-line summary.
for epoch in range(3):
    train_metrics = run_epoch(model, train_loader, train=True)
    val_metrics = run_epoch(model, val_loader, train=False)

    summary = [
        f"Epoch {epoch+1}",
        f"Train Loss {train_metrics['loss']:.4f}",
        f"Val Acc {val_metrics['acc']:.4f}",
        f"F1 {val_metrics['f1']:.4f}",
        f"Top3 {val_metrics['top3']:.4f}",
    ]
    print(" | ".join(summary))


# **Phase 2: Advanced Training Pipeline (Stage-6 Model)**
This phase focuses on robust generalization, domain realism, and stable optimization using modern deep learning practices.

### 1. **Environment, Reproducibility & Paths**

Clean Cell — Setup

In [None]:
# ===========================
# Environment & Reproducibility
# ===========================

import os, random, math
from pathlib import Path
import numpy as np
import pandas as pd

import cv2
import albumentations as A
from albumentations.pytorch import ToTensorV2

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.cuda.amp import autocast, GradScaler
from torch.optim.swa_utils import AveragedModel
from torch.utils.data import Dataset, DataLoader

import timm
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix, top_k_accuracy_score

# ---------------------------
# Reproducibility
# ---------------------------
SEED = 42

# Seed each random number generator in turn: Python, NumPy, PyTorch (CPU),
# and PyTorch on all CUDA devices.
for _seed_rng in (
    random.seed,
    np.random.seed,
    torch.manual_seed,
    torch.cuda.manual_seed_all,
):
    _seed_rng(SEED)

# Request deterministic cuDNN behaviour and turn off its benchmark mode.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

# Use the GPU when CUDA is available, otherwise the CPU.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Using device:", DEVICE)

# ---------------------------
# Paths
# ---------------------------
# Metadata table (filepath / label / split) produced in Phase 1.
METADATA_CSV = "outputs/metadata/metadata.csv"

# Output folders for checkpoints, figures, and augmentation experiments.
OUTPUTS_DIR = Path("outputs")
CKPT_DIR = OUTPUTS_DIR.joinpath("checkpoints")
FIG_DIR = OUTPUTS_DIR.joinpath("figures")
AUG_DIR = OUTPUTS_DIR.joinpath("augmentation_experiments")

# Create the folders up front so later cells can write without checking.
for d in (CKPT_DIR, FIG_DIR, AUG_DIR):
    d.mkdir(parents=True, exist_ok=True)


### 2. **Dataset: Domain-Specific Cleanup + Safety Checks**
📌 Why this matters

We identified that some classes contained black borders / letterboxing, which:

* Biases the model

* Acts as a shortcut feature

* Reduces real-world performance

We therefore clean images conditionally, per class — this is a very strong dataset insight.

Clean Cell — Dataset Class

In [None]:
def strip_black_bars_rgb(img, threshold=4):
    """Crop near-black borders (letterboxing) from an RGB image.

    A column or row counts as border when its mean grayscale value is not
    above ``threshold``. The image is cropped to the span between the
    first and last non-border column and row.

    Args:
        img: RGB image array as accepted by ``cv2.cvtColor``.
        threshold: Mean gray level a row/column must exceed to be kept.

    Returns:
        The cropped view of ``img``. Along an axis where no row/column
        exceeds the threshold, that axis is left uncropped.
    """
    gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)

    def bounds(profile):
        """Return the half-open [lo, hi) span of entries above threshold."""
        bright = np.flatnonzero(profile > threshold)
        if bright.size == 0:
            # Nothing exceeds the threshold: keep the full extent rather
            # than shaving off the last row/column.
            return 0, len(profile)
        return int(bright[0]), int(bright[-1]) + 1

    x0, x1 = bounds(gray.mean(axis=0))
    y0, y1 = bounds(gray.mean(axis=1))

    # Defensive: never return an empty crop.
    if x1 <= x0 or y1 <= y0:
        return img
    return img[y0:y1, x0:x1]


class PlantDataset(Dataset):
    """Dataset that loads images with OpenCV and applies a keyword-style transform.

    Args:
        df: DataFrame with ``filepath`` and ``label`` columns.
        transform: Optional callable invoked as ``transform(image=img)``
            and expected to return a mapping with an ``"image"`` entry.
        label2idx: Optional ``{label: index}`` mapping to reuse; when None
            it is derived from the sorted unique labels of ``df``.
    """

    def __init__(self, df, transform=None, label2idx=None):
        self.df = df.reset_index(drop=True)
        self.transform = transform

        if label2idx is not None:
            self.label2idx = label2idx
        else:
            class_names = sorted(self.df["label"].unique())
            self.label2idx = {name: pos for pos, name in enumerate(class_names)}

        # Reverse lookup: class index -> label name.
        self.idx2label = {pos: name for name, pos in self.label2idx.items()}

    def __len__(self):
        """Number of rows (images) in the dataset."""
        return self.df.shape[0]

    def __getitem__(self, idx):
        """Return ``(image, class_index)`` for row ``idx``.

        Raises:
            FileNotFoundError: If OpenCV cannot read the image file.
        """
        record = self.df.iloc[idx]
        path = record["filepath"]
        bgr = cv2.imread(path)
        if bgr is None:
            raise FileNotFoundError(path)

        img = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)

        # Class-conditional cleanup: black bars are only stripped for
        # labels containing one of these substrings.
        label = record["label"]
        lowered = label.lower()
        if any(key in lowered for key in ("cucumber", "corn_common_rust")):
            img = strip_black_bars_rgb(img)

        if self.transform:
            img = self.transform(image=img)["image"]

        return img, self.label2idx[label]


### 3. **Domain-Realistic Augmentations (Very Important)**
📌 Rationale behind these augmentations

We did not use random extreme transforms.

Instead, we simulated:

* Camera variation

* Lighting variation

* Compression artifacts

* Minor blur / motion

Augmentations

In [None]:
# Side length (pixels) of the square images produced by the transforms.
IM_SIZE = 320
# Per-channel mean and std passed to A.Normalize (these are the widely used
# ImageNet statistics).
IM_MEAN = [0.485, 0.456, 0.406]
IM_STD  = [0.229, 0.224, 0.225]

def get_train_transform():
    """Build the training augmentation pipeline.

    Order of operations: random flips/rotation, a random resized crop to
    ``IM_SIZE`` x ``IM_SIZE``, photometric perturbations (colour jitter,
    blur, noise, gamma, compression), then normalisation and tensor
    conversion.

    Returns:
        An ``A.Compose`` pipeline.
    """
    geometric = [
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.5),
        A.RandomRotate90(p=0.5),
        A.RandomResizedCrop(
            height=IM_SIZE, width=IM_SIZE,
            scale=(0.8, 1.0), ratio=(0.8, 1.25), p=1.0
        ),
    ]

    # At most one blur type is applied per sample.
    blur = A.OneOf([
        A.GaussianBlur(blur_limit=(3,5)),
        A.MotionBlur(blur_limit=5),
    ], p=0.25)

    photometric = [
        A.ColorJitter(0.2, 0.2, 0.2, 0.05, p=0.5),
        blur,
        A.GaussNoise(var_limit=(5,25), p=0.25),
        A.RandomGamma((80,120), p=0.3),
        A.ImageCompression(40, 90, p=0.3),
    ]

    finalize = [
        A.Normalize(IM_MEAN, IM_STD),
        ToTensorV2(),
    ]

    return A.Compose(geometric + photometric + finalize)

def get_val_transform():
    """Build the deterministic validation preprocessing pipeline.

    Resizes so the longest side is ``IM_SIZE``, pads to at least
    ``IM_SIZE`` x ``IM_SIZE`` with a constant border, centre-crops to
    exactly that size, then normalises and converts to a tensor.

    Returns:
        An ``A.Compose`` pipeline.
    """
    # NOTE(review): no fill value is given for the constant border, so the
    # padding colour is the library default (presumably black). That adds
    # dark borders of the kind the dataset cleanup strips -- confirm this
    # is intended.
    steps = [
        A.LongestMaxSize(IM_SIZE),
        A.PadIfNeeded(IM_SIZE, IM_SIZE, border_mode=cv2.BORDER_CONSTANT),
        A.CenterCrop(IM_SIZE, IM_SIZE),
        A.Normalize(IM_MEAN, IM_STD),
        ToTensorV2(),
    ]
    return A.Compose(steps)


Loss & Model

In [None]:
class LabelSmoothingCE(nn.Module):
    """Cross-entropy loss with label smoothing.

    The target distribution puts ``1 - eps`` on the true class and spreads
    ``eps`` evenly over the remaining ``n - 1`` classes.

    Args:
        eps: Smoothing mass taken away from the true class.
    """

    def __init__(self, eps=0.1):
        super().__init__()
        self.eps = eps

    def forward(self, logits, target):
        """Return the mean smoothed cross-entropy over the batch.

        Args:
            logits: Tensor indexed as (sample, class).
            target: Tensor of true class indices, one per sample.
        """
        num_classes = logits.size(1)
        log_probs = F.log_softmax(logits, dim=1)

        # The soft targets are constants; keep them out of the autograd graph.
        with torch.no_grad():
            off_value = self.eps / (num_classes - 1)
            soft_targets = torch.full_like(log_probs, off_value)
            soft_targets.scatter_(1, target[:, None], 1 - self.eps)

        per_sample = torch.sum(-soft_targets * log_probs, dim=1)
        return torch.mean(per_sample)


def create_model(num_classes):
    """Create a pretrained EfficientNet-B3 classifier via timm.

    Args:
        num_classes: Size of the classification head.

    Returns:
        The model returned by ``timm.create_model``.
    """
    model_kwargs = {"pretrained": True, "num_classes": num_classes}
    return timm.create_model("efficientnet_b3", **model_kwargs)


### 4. **Training & Validation Engine (AMP + Clipping)**

In [None]:
def train_one_epoch(model, loader, optimizer, scaler, criterion, scheduler=None):
    """Train ``model`` for one epoch with mixed precision and gradient clipping.

    Uses the module-level ``DEVICE``.

    Args:
        model: Network to train.
        loader: DataLoader yielding ``(images, targets)`` batches.
        optimizer: Optimizer updating the model parameters.
        scaler: Gradient scaler used for the backward pass and the step.
        criterion: Loss function called as ``criterion(logits, targets)``.
        scheduler: Optional LR scheduler, stepped once per batch.

    Returns:
        Sample-weighted mean training loss over ``loader.dataset``.
    """
    model.train()
    use_amp = DEVICE.type == "cuda"
    running_loss = 0

    for images, targets in tqdm(loader, leave=False):
        images = images.to(DEVICE)
        targets = targets.to(DEVICE)
        optimizer.zero_grad(set_to_none=True)

        with autocast(enabled=use_amp):
            loss = criterion(model(images), targets)

        scaler.scale(loss).backward()
        # Unscale before clipping so the norm limit applies to the true
        # gradient magnitudes rather than the scaled ones.
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)

        scaler.step(optimizer)
        scaler.update()

        if scheduler:
            scheduler.step()

        # Weight by batch size so a smaller final batch does not skew the mean.
        running_loss += loss.item() * images.size(0)

    return running_loss / len(loader.dataset)


@torch.no_grad()
def validate(model, loader, criterion):
    """Evaluate ``model`` on ``loader`` and return loss and metrics.

    Uses the module-level ``DEVICE``.

    Args:
        model: Network to evaluate (put into eval mode).
        loader: DataLoader yielding ``(images, targets)`` batches.
        criterion: Loss function called as ``criterion(logits, targets)``.

    Returns:
        Dict with ``loss`` (sample-weighted mean), ``acc``, ``f1``
        (weighted), ``top3`` (top-3 accuracy) and ``cm`` (confusion matrix
        covering every output class of the model).
    """
    model.eval()
    total_loss, preds, labels, logits_all = 0, [], [], []

    for x, y in tqdm(loader, leave=False):
        x, y = x.to(DEVICE), y.to(DEVICE)
        with autocast(enabled=DEVICE.type == "cuda"):
            logits = model(x)
            loss = criterion(logits, y)

        total_loss += loss.item() * x.size(0)
        preds.append(logits.argmax(1).cpu().numpy())
        labels.append(y.cpu().numpy())
        # Under autocast the logits may be half precision; promote them to
        # float32 before the CPU softmax below.
        logits_all.append(logits.float().cpu())

    preds = np.concatenate(preds)
    labels = np.concatenate(labels)
    logits_all = torch.cat(logits_all)

    probs = F.softmax(logits_all, dim=1).numpy()

    # Name every class explicitly so the metrics still work (and the
    # confusion matrix keeps a fixed shape) when the validation split lacks
    # some classes.
    class_ids = np.arange(probs.shape[1])

    return {
        "loss": total_loss / len(loader.dataset),
        "acc": accuracy_score(labels, preds),
        "f1": f1_score(labels, preds, average="weighted"),
        "top3": top_k_accuracy_score(labels, probs, k=3, labels=class_ids),
        "cm": confusion_matrix(labels, preds, labels=class_ids)
    }


### 5. **Stage-6 Training Strategy**

**Phase A – Warm-up**

* Freeze backbone

* Train classifier head only

* Prevents destroying pretrained features

**Phase B – Fine-tuning**

* Unfreeze backbone

* Discriminative LR (head > backbone)

* OneCycleLR

* Early stopping

In [None]:
def train_stage6(train_df, val_df,
                 batch_size=32,
                 warmup_epochs=2,
                 finetune_epochs=8):
    """Two-phase training: head warm-up, then full fine-tuning.

    Phase A freezes everything except ``model.classifier`` and trains the
    head only. Phase B unfreezes the whole network and trains it with a
    higher learning rate for the head than for the backbone. The weights
    with the lowest validation loss across both phases are saved to
    ``CKPT_DIR / "effb3_stage6.pt"`` as a plain state dict.

    Args:
        train_df: Training metadata (``filepath`` / ``label`` columns).
        val_df: Validation metadata with the same columns.
        batch_size: Training batch size (validation uses twice this).
        warmup_epochs: Number of head-only epochs; 0 skips Phase A.
        finetune_epochs: Number of fine-tuning epochs; 0 skips Phase B.

    Returns:
        The ``validate`` metrics dict of the best (saved) epoch.

    Raises:
        RuntimeError: If no epoch produced a usable validation loss (no
            epochs were run, or every loss was NaN).
    """
    tf_train, tf_val = get_train_transform(), get_val_transform()

    train_ds = PlantDataset(train_df, tf_train)
    val_ds   = PlantDataset(val_df, tf_val, train_ds.label2idx)

    train_loader = DataLoader(train_ds, batch_size, shuffle=True, num_workers=2)
    val_loader   = DataLoader(val_ds, batch_size*2, shuffle=False, num_workers=2)

    model = create_model(len(train_ds.label2idx)).to(DEVICE)
    criterion = LabelSmoothingCE(0.1)
    scaler = GradScaler(enabled=DEVICE.type == "cuda")

    best = {"loss": float("inf"), "state": None, "metrics": None}

    def run_phase(optimizer, scheduler, epochs):
        """Train for ``epochs`` epochs, tracking the best validation loss."""
        for _ in range(epochs):
            train_one_epoch(model, train_loader, optimizer, scaler, criterion, scheduler)
            metrics = validate(model, val_loader, criterion)

            if metrics["loss"] < best["loss"]:
                best["loss"] = metrics["loss"]
                best["metrics"] = metrics
                # state_dict() returns references to the live tensors, so
                # copy them; otherwise later epochs would silently
                # overwrite the "best" weights.
                best["state"] = {
                    k: v.detach().cpu().clone()
                    for k, v in model.state_dict().items()
                }

    # Phase A: head only
    if warmup_epochs > 0:
        for p in model.parameters(): p.requires_grad = False
        for p in model.classifier.parameters(): p.requires_grad = True

        optimizer = optim.AdamW(model.classifier.parameters(), lr=3e-4, weight_decay=1e-2)
        scheduler = optim.lr_scheduler.OneCycleLR(
            optimizer, max_lr=3e-4,
            total_steps=len(train_loader)*warmup_epochs
        )

        print("Phase A: Head warm-up")
        run_phase(optimizer, scheduler, warmup_epochs)

    # Phase B: full fine-tuning
    if finetune_epochs > 0:
        for p in model.parameters(): p.requires_grad = True

        optimizer = optim.AdamW([
            {"params": model.classifier.parameters(), "lr": 3e-4},
            {"params": [p for n,p in model.named_parameters() if "classifier" not in n], "lr": 1e-4}
        ], weight_decay=1e-2)

        # One max_lr per parameter group (head, backbone): a single scalar
        # would give both groups the same schedule and erase the
        # discriminative learning rates set above.
        scheduler = optim.lr_scheduler.OneCycleLR(
            optimizer, max_lr=[3e-4, 1e-4],
            total_steps=len(train_loader)*finetune_epochs
        )

        print("Phase B: Fine-tuning")
        run_phase(optimizer, scheduler, finetune_epochs)

    if best["state"] is None:
        raise RuntimeError(
            "train_stage6 produced no checkpoint: no epochs were run or "
            "the validation loss was never finite."
        )

    model.load_state_dict(best["state"])
    torch.save(best["state"], CKPT_DIR / "effb3_stage6.pt")

    return best["metrics"]


# **Phase 3: Robustness & Generalization Training**
🎯 Goal of This Phase

After achieving strong validation performance with Stage-6 EfficientNet-B3, the goal of this phase is to:

* Reduce overfitting to clean/lab images

* Improve decision boundary smoothness

* Force the model to rely on leaf texture & disease patterns, not background cues

* Test whether performance remains stable under harder perturbations

This phase is not about chasing accuracy, but about trustworthy generalization.

Cell A — Stronger Domain Augmentations

To simulate:

* phone cameras

* uneven sunlight

* fog/dust

* compression artifacts

In [None]:
# Same values as the constants defined for the Phase 2 augmentations,
# repeated here so this cell can run on its own.
# Side length (pixels) of the square images produced by the transforms.
IM_SIZE = 320
# Per-channel mean and std passed to A.Normalize.
IM_MEAN = [0.485, 0.456, 0.406]
IM_STD  = [0.229, 0.224, 0.225]

def get_train_transform_stronger():
    """Build the stronger training augmentation pipeline.

    Order of operations: random flips/rotation, a random resized crop to
    ``IM_SIZE`` x ``IM_SIZE``, colour jitter, optional blur, noise, shadow
    and fog, gamma and brightness/contrast shifts, image compression, then
    normalisation and tensor conversion.

    Returns:
        An ``A.Compose`` pipeline.
    """
    geometric = [
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.5),
        A.RandomRotate90(p=0.5),
        A.RandomResizedCrop(
            height=IM_SIZE, width=IM_SIZE,
            scale=(0.8, 1.0), ratio=(0.8, 1.25), p=1.0
        ),
    ]

    # At most one blur type is applied per sample.
    blur = A.OneOf([
        A.GaussianBlur(blur_limit=(3,7)),
        A.MotionBlur(blur_limit=7),
    ], p=0.35)

    colour_and_optics = [
        A.ColorJitter(0.25, 0.25, 0.25, 0.07, p=0.6),
        blur,
        A.GaussNoise(var_limit=(5,35), p=0.35),
    ]

    weather = [
        A.RandomShadow(p=0.3),
        A.RandomFog(0.1, 0.3, alpha_coef=0.08, p=0.2),
    ]

    exposure_and_codec = [
        A.RandomGamma((70,130), p=0.5),
        A.RandomBrightnessContrast(0.25, 0.25, p=0.5),
        A.ImageCompression(25, 90, p=0.6),
    ]

    finalize = [
        A.Normalize(IM_MEAN, IM_STD),
        ToTensorV2(),
    ]

    return A.Compose(
        geometric + colour_and_optics + weather + exposure_and_codec + finalize
    )


Cell B — Hardened Augmentations (Bias Breaking)

In [None]:
def get_train_transform_hardened():
    """Build the hardened (bias-breaking) training augmentation pipeline.

    Order of operations: flips/rotation, one of two random resized crops
    to ``IM_SIZE`` x ``IM_SIZE``, colour jitter, optional blur, noise, an
    optional grey-padded re-framing, gamma and brightness/contrast shifts,
    image compression, coarse dropout, then normalisation and tensor
    conversion.

    Returns:
        An ``A.Compose`` pipeline.
    """
    flips = [
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.2),
        A.RandomRotate90(p=0.4),
    ]

    # Leaf-focused crops: always applied, one of two scale/ratio settings.
    leaf_crop = A.OneOf([
        A.RandomResizedCrop(IM_SIZE, IM_SIZE, scale=(0.85,1.0), ratio=(0.9,1.1)),
        A.RandomResizedCrop(IM_SIZE, IM_SIZE, scale=(0.8,1.0),  ratio=(0.8,1.25)),
    ], p=1.0)

    # At most one blur type is applied per sample.
    blur = A.OneOf([
        A.GaussianBlur((3,7)),
        A.MotionBlur(7),
    ], p=0.35)

    # Break black-border shortcut learning.
    # NOTE(review): the crop above already outputs IM_SIZE x IM_SIZE
    # images, so resize/pad/centre-crop to that same size appears to leave
    # them unchanged, i.e. this branch looks like a no-op -- confirm.
    grey_pad = A.Compose([
        A.SmallestMaxSize(IM_SIZE),
        A.PadIfNeeded(IM_SIZE, IM_SIZE, border_mode=cv2.BORDER_CONSTANT, value=(128,128,128)),
        A.CenterCrop(IM_SIZE, IM_SIZE),
    ])
    border_break = A.OneOf([grey_pad, A.NoOp()], p=0.2)

    # Explicit background reliance reduction: up to 6 zero-filled holes,
    # each at most IM_SIZE // 8 pixels per side.
    dropout = A.CoarseDropout(
        max_holes=6,
        max_height=IM_SIZE//8,
        max_width=IM_SIZE//8,
        fill_value=0,
        p=0.2
    )

    pipeline = flips + [
        leaf_crop,
        A.ColorJitter(0.25,0.25,0.25,0.07,p=0.6),
        blur,
        A.GaussNoise((5,35), p=0.3),
        border_break,
        A.RandomGamma((70,130), p=0.5),
        A.RandomBrightnessContrast(0.25,0.25,p=0.5),
        A.ImageCompression(25,90,p=0.5),
        dropout,
        A.Normalize(IM_MEAN, IM_STD),
        ToTensorV2(),
    ]
    return A.Compose(pipeline)


MixUp & CutMix (Sample-Level Regularization)

* Stage-6 already learned strong features

* MixUp/CutMix now:

    * smooth class boundaries

    * reduce memorization

    * improve minority class behavior

Cell C — MixUp / CutMix Utilities

In [None]:
def sample_lambda(alpha):
    """Draw a mixing coefficient from Beta(alpha, alpha).

    Args:
        alpha: Beta concentration; values <= 0 disable mixing.

    Returns:
        A float sampled with NumPy's global RNG, or 1.0 when ``alpha`` is
        not positive.
    """
    if alpha > 0:
        return float(np.random.beta(alpha, alpha))
    return 1.0

def mixup(x, y, lam):
    """Blend each sample with a randomly chosen partner from the same batch.

    Args:
        x: Input batch; its first dimension indexes samples.
        y: Targets aligned with ``x``.
        lam: Weight of the original sample; the partner gets ``1 - lam``.

    Returns:
        Tuple ``(mixed_x, y, y_partner, lam)``.
    """
    partner = torch.randperm(x.size(0), device=x.device)
    blended = lam * x + (1 - lam) * x[partner]
    return blended, y, y[partner], lam

def cutmix(x, y, lam):
    """Paste a random rectangle from a partner sample into each sample.

    The same rectangle position is used for the whole batch; partners come
    from a random permutation of the batch. The input batch itself is left
    untouched: the result is a new tensor.

    Args:
        x: Input batch with four dimensions, unpacked as (B, C, H, W).
        y: Targets aligned with ``x``.
        lam: Requested share of each image to keep; the box side lengths
            are ``sqrt(1 - lam)`` times the image width and height.

    Returns:
        Tuple ``(mixed_x, y, y_partner, lam)`` where ``lam`` is recomputed
        from the area of the box after clipping to the image bounds.
    """
    B, C, H, W = x.shape
    idx = torch.randperm(B, device=x.device)

    cut_ratio = np.sqrt(1 - lam)
    rw, rh = int(W * cut_ratio), int(H * cut_ratio)
    cx, cy = np.random.randint(W), np.random.randint(H)

    # Clip the box to the image.
    x1, y1 = max(cx - rw // 2, 0), max(cy - rh // 2, 0)
    x2, y2 = min(cx + rw // 2, W), min(cy + rh // 2, H)

    # Work on a copy so the caller's tensor is not modified in place.
    mixed = x.clone()
    mixed[:, :, y1:y2, x1:x2] = x[idx, :, y1:y2, x1:x2]

    # The box may have been clipped, so derive lam from its actual area.
    lam = 1 - ((x2 - x1) * (y2 - y1)) / (W * H)
    return mixed, y, y[idx], lam


Training Loop with MixUp / CutMix

In [None]:
def train_one_epoch_mix(
    model, loader, optimizer, scaler, device,
    mixup_alpha=0.2, cutmix_alpha=0.2, ls_eps=0.1
):
    """Train for one epoch, applying MixUp or CutMix to every batch.

    For each batch MixUp is chosen with probability 0.5 when
    ``mixup_alpha > 0``; otherwise CutMix is applied.

    Args:
        model: Network to train.
        loader: DataLoader yielding ``(images, targets)`` batches.
        optimizer: Optimizer updating the model parameters.
        scaler: Gradient scaler used for the backward pass and the step.
        device: Device the batches are moved to; autocast is enabled only
            when its type is ``"cuda"``.
        mixup_alpha: Beta parameter for MixUp; values <= 0 disable MixUp.
        cutmix_alpha: Beta parameter for CutMix.
        ls_eps: Forwarded to ``_soft_target_ce`` as ``eps``.

    Returns:
        Sample-weighted mean training loss over ``loader.dataset``.
    """
    # NOTE(review): _soft_target_ce is not defined in the visible part of
    # this file -- confirm it is provided elsewhere.
    model.train()
    amp_enabled = device.type == "cuda"
    running_loss = 0.0

    for images, targets in tqdm(loader, leave=False):
        images, targets = images.to(device), targets.to(device)

        # Coin flip between the two schemes; skipped when MixUp is disabled.
        use_mix = mixup_alpha > 0 and np.random.rand() < 0.5
        if use_mix:
            lam = sample_lambda(mixup_alpha)
            images, ya, yb, lam = mixup(images, targets, lam)
        else:
            lam = sample_lambda(cutmix_alpha)
            images, ya, yb, lam = cutmix(images, targets, lam)

        optimizer.zero_grad(set_to_none=True)

        with torch.amp.autocast(device_type="cuda", enabled=amp_enabled):
            logits = model(images)
            loss = _soft_target_ce(logits, ya, yb, lam, eps=ls_eps)

        scaler.scale(loss).backward()
        # Unscale before clipping so the norm limit applies to the true
        # gradient magnitudes rather than the scaled ones.
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)

        scaler.step(optimizer)
        scaler.update()

        running_loss += loss.item() * images.size(0)

    return running_loss / len(loader.dataset)


Final Fine-Tuning with Early Stopping

In [None]:
def finetune_with_early_stopping(
    train_df, val_df,
    resume_ckpt,
    save_name,
    max_epochs=15,
    patience=3,
    batch_size=32,
    mixup_alpha=0.3,
    cutmix_alpha=0.3
):
    """Fine-tune from a checkpoint with MixUp/CutMix and early stopping.

    Trains with the stronger augmentations, validates after every epoch,
    and stops once the validation loss has failed to improve for
    ``patience`` consecutive epochs. The best weights are saved together
    with the label mapping.

    Args:
        train_df: Training metadata (``filepath`` / ``label`` columns).
        val_df: Validation metadata with the same columns.
        resume_ckpt: Checkpoint to start from; either a plain state dict
            or a dict holding the state dict under ``"model"``.
        save_name: File name of the new checkpoint inside ``CKPT_DIR``.
        max_epochs: Upper bound on the number of epochs.
        patience: Non-improving epochs tolerated before stopping.
        batch_size: Training batch size (validation uses twice this).
        mixup_alpha: Beta parameter for MixUp.
        cutmix_alpha: Beta parameter for CutMix.

    Returns:
        Path of the saved checkpoint, a dict with ``"model"`` (state dict)
        and ``"label2idx"``.

    Raises:
        RuntimeError: If no epoch produced a usable validation loss.
    """
    tf_train = get_train_transform_stronger()
    tf_val   = get_val_transform()

    # The label mapping always comes from the training labels.
    label2idx = PlantDataset(train_df, tf_val).label2idx

    train_ds = PlantDataset(train_df, tf_train, label2idx)
    val_ds   = PlantDataset(val_df,   tf_val,   label2idx)

    train_loader = DataLoader(train_ds, batch_size, shuffle=True)
    val_loader   = DataLoader(val_ds, batch_size*2)

    # NOTE(review): create_efficientnet_b3 and get_param_groups are not
    # defined in the visible part of this file -- confirm they exist.
    model = create_efficientnet_b3(len(label2idx)).to(DEVICE)

    # Accept both checkpoint layouts: the {"model": ...} dict written by
    # this function and the plain state dict written by train_stage6.
    ckpt = torch.load(resume_ckpt, map_location=DEVICE)
    state_dict = ckpt["model"] if isinstance(ckpt, dict) and "model" in ckpt else ckpt
    model.load_state_dict(state_dict)

    optimizer = torch.optim.AdamW(
        get_param_groups(model, 1.5e-4, 5e-5, 1e-2)
    )
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=max_epochs)
    criterion = LabelSmoothingCE(0.1)
    scaler = torch.amp.GradScaler("cuda", enabled=DEVICE.type == "cuda")

    best_loss, wait = float("inf"), 0
    best_state = None

    for ep in range(max_epochs):
        tr = train_one_epoch_mix(
            model, train_loader, optimizer, scaler, DEVICE,
            mixup_alpha=mixup_alpha, cutmix_alpha=cutmix_alpha
        )
        # validate() takes (model, loader, criterion) and returns a dict.
        va_loss = validate(model, val_loader, criterion)["loss"]
        scheduler.step()

        print(f"[FT {ep+1}] train {tr:.4f} | val {va_loss:.4f}")

        if va_loss < best_loss:
            best_loss, wait = va_loss, 0
            # state_dict() returns references to the live tensors, so copy
            # them; otherwise later epochs would overwrite the "best"
            # weights.
            best_state = {
                "model": {
                    k: v.detach().cpu().clone()
                    for k, v in model.state_dict().items()
                },
                "label2idx": label2idx,
            }
        else:
            wait += 1
            if wait >= patience:
                print("Early stopping.")
                break

    if best_state is None:
        raise RuntimeError(
            "finetune_with_early_stopping produced no checkpoint: no "
            "epochs were run or the validation loss was never finite."
        )

    out = CKPT_DIR / save_name
    torch.save(best_state, out)
    return out
