In [None]:
import kagglehub

# Download the latest published version of the fruit dataset via kagglehub.
# `path` receives the value returned by dataset_download and is reused by later cells.
path = kagglehub.dataset_download("sriramr/fruits-fresh-and-rotten-for-classification")

print("Path to dataset files:", path)

Using Colab cache for faster access to the 'fruits-fresh-and-rotten-for-classification' dataset.
Path to dataset files: /kaggle/input/fruits-fresh-and-rotten-for-classification


In [None]:
import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import os

# The code below expects a `dataset` folder under the download path,
# holding one `train` and one `test` sub-folder.
root_dir = os.path.join(path, "dataset")
train_dir, test_dir = (os.path.join(root_dir, split) for split in ("train", "test"))

In [None]:
import os
from PIL import Image

# Peek at the raw (width, height) of a few training images before choosing a resize.
# The folder is derived from `train_dir` rather than a hard-coded cache location,
# and a separate name is used so the global `path` holding the download location
# is not overwritten (re-running the directory cell would otherwise break).
sample_dir = os.path.join(train_dir, "freshapples")
for fname in os.listdir(sample_dir)[:5]:
    # Image.open keeps the file handle open; the context manager closes it.
    with Image.open(os.path.join(sample_dir, fname)) as img:
        print(img.size)

(390, 418)
(438, 372)
(426, 422)
(390, 360)
(336, 374)


In [None]:
# ---- Training split: resize to 128x128, convert to tensor, normalise each channel with mean 0.5 / std 0.5 ----
transform_train = transforms.Compose(
    [
        transforms.Resize((128, 128)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ]
)
train_dataset = datasets.ImageFolder(root=train_dir, transform=transform_train)
train_loader = DataLoader(
    train_dataset,
    batch_size=32,
    shuffle=True,  # reshuffle the training samples every epoch
    num_workers=2,
    pin_memory=True,
    persistent_workers=True,
)

# ---- Test split: identical preprocessing, fixed order ----
transform_test = transforms.Compose(
    [
        transforms.Resize((128, 128)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ]
)
test_dataset = datasets.ImageFolder(root=test_dir, transform=transform_test)
test_loader = DataLoader(
    test_dataset,
    batch_size=32,
    shuffle=False,
    num_workers=2,
    pin_memory=True,
    persistent_workers=True,
)

# Class names as discovered by ImageFolder for the training directory.
print("Classes:", train_dataset.classes)

Classes: ['freshapples', 'freshbanana', 'freshoranges', 'rottenapples', 'rottenbanana', 'rottenoranges']


In [None]:
# =========================
# Compact CNN + Training, Plots, Confusion, TP/FP
# =========================
import torch, math, time
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from dataclasses import dataclass
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# Prefer the GPU when one is visible to PyTorch, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
# Let cuDNN auto-tune its kernels; worthwhile because every batch is 128x128.
torch.backends.cudnn.benchmark = True

# ---- class names come straight from the training dataset ----
classes = train_dataset.classes
num_classes = len(classes)
print("Detected classes:", classes)

# ---- Model: light but accurate ----
class ConvBlock(nn.Module):
    """Conv2d (bias-free) -> BatchNorm2d -> SiLU, followed by optional channel dropout.

    Args:
        in_ch: number of input channels.
        out_ch: number of output channels.
        k: kernel size handed to ``nn.Conv2d``.
        s: stride handed to ``nn.Conv2d``.
        p: padding handed to ``nn.Conv2d``.
        drop: ``nn.Dropout2d`` probability; any value <= 0 installs ``nn.Identity`` instead.
    """

    def __init__(self, in_ch, out_ch, k=3, s=1, p=1, drop=0.0):
        super().__init__()
        self.conv = nn.Conv2d(in_ch, out_ch, kernel_size=k, stride=s, padding=p, bias=False)
        self.bn = nn.BatchNorm2d(out_ch)
        self.act = nn.SiLU(inplace=True)
        if drop > 0:
            self.drop = nn.Dropout2d(drop)
        else:
            self.drop = nn.Identity()

    def forward(self, x):
        """Apply convolution, normalisation, activation and dropout in that order."""
        features = self.conv(x)
        features = self.bn(features)
        features = self.act(features)
        return self.drop(features)

class SmallFruitCNN(nn.Module):
    """
    Compact four-stage CNN classifier built from ConvBlock units.

    A stem block is followed by three stages (2x2 max-pool, then two ConvBlocks),
    global average pooling and a two-layer MLP head. With the default six classes
    the network has roughly 1.2M trainable parameters.

    Args:
        num_classes: size of the output logit vector.
        drop: Dropout2d probability used inside every stage ConvBlock (the stem uses none).
    """

    def __init__(self, num_classes=6, drop=0.10):
        super().__init__()
        # Shape comments assume a 3x128x128 input.
        self.stem = ConvBlock(3, 32, k=3, s=1, p=1, drop=0.0)   # -> 32x128x128
        self.layer1 = self._stage(32, 64, drop)                 # -> 64x64x64
        self.layer2 = self._stage(64, 128, drop)                # -> 128x32x32
        self.layer3 = self._stage(128, 256, drop)               # -> 256x16x16
        self.head = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),        # -> 256x1x1
            nn.Flatten(),                   # -> 256
            nn.Dropout(0.20),
            nn.Linear(256, 256),
            nn.SiLU(inplace=True),
            nn.Dropout(0.20),
            nn.Linear(256, num_classes),
        )

    @staticmethod
    def _stage(in_ch, out_ch, drop):
        """Halve the spatial size with max-pooling, then apply two ConvBlocks."""
        return nn.Sequential(
            nn.MaxPool2d(2),
            ConvBlock(in_ch, out_ch, drop=drop),
            ConvBlock(out_ch, out_ch, drop=drop),
        )

    def forward(self, x):
        """Return class logits for a batch of images."""
        for stage in (self.stem, self.layer1, self.layer2, self.layer3):
            x = stage(x)
        return self.head(x)

# ---- Config ----
@dataclass
class TrainConfig:
    """Hyper-parameters consumed by the training loop."""

    epochs: int = 15                # number of passes over the training loader
    lr: float = 2e-3                # initial learning rate given to the optimizer
    weight_decay: float = 1e-4      # weight decay given to the optimizer
    label_smoothing: float = 0.00   # 0.0 means no smoothing; raise it to regularise
    grad_clip: float = 1.0          # max gradient norm; 0 or None turns clipping off
    report_every: int = 1           # print a progress line every this many epochs
    save_best: bool = True          # restore the best-validation weights after training


cfg = TrainConfig()

# ---- Utilities ----
@torch.no_grad()
def evaluate(model, loader, device, criterion):
    """Score ``model`` on every batch of ``loader`` without tracking gradients.

    The model is switched to eval mode and left in that mode on return.

    Args:
        model: module mapping an input batch to per-class logits.
        loader: iterable yielding ``(inputs, targets)`` tensor pairs; may be empty.
        device: device the batches are moved to before the forward pass.
        criterion: callable ``(logits, targets) -> scalar tensor`` holding the batch-mean loss.

    Returns:
        ``(loss, acc, preds, targets)``: the sample-weighted mean loss, the fraction of
        correct argmax predictions, and two 1-D numpy arrays with the predicted and true
        labels in loader order. For an empty loader this is ``(0.0, 0.0, [], [])``.
    """
    model.eval()
    total_loss, total_correct, n = 0.0, 0, 0
    all_preds, all_targets = [], []
    for x, y in loader:
        x, y = x.to(device, non_blocking=True), y.to(device, non_blocking=True)
        logits = model(x)
        loss = criterion(logits, y)
        bs = y.size(0)
        # criterion returns a batch mean, so weight it by the batch size to
        # keep a smaller final batch from being over-represented.
        total_loss += loss.item() * bs
        preds = logits.argmax(1)
        total_correct += (preds == y).sum().item()
        n += bs
        all_preds.append(preds.detach().cpu())
        all_targets.append(y.detach().cpu())

    loss = total_loss / max(1, n)
    acc = total_correct / max(1, n)
    if all_preds:
        preds_np = torch.cat(all_preds).numpy()
        targets_np = torch.cat(all_targets).numpy()
    else:
        # torch.cat raises on an empty list; return empty label arrays instead so
        # an empty loader is handled consistently with the max(1, n) guards above.
        preds_np = torch.empty(0, dtype=torch.long).numpy()
        targets_np = torch.empty(0, dtype=torch.long).numpy()
    return loss, acc, preds_np, targets_np

def plot_history(history):
    """Show two figures comparing training and validation curves.

    ``history`` must provide the sequences "train_loss", "val_loss", "train_acc"
    and "val_acc"; each one is plotted against its index. The first figure shows
    the losses, the second the accuracies.
    """
    panels = (
        (("train_loss", "val_loss"), "Loss", "Loss over epochs"),
        (("train_acc", "val_acc"), "Accuracy", "Accuracy over epochs"),
    )
    for keys, y_label, heading in panels:
        plt.figure()
        for key in keys:
            plt.plot(history[key], label=key)
        plt.xlabel("Epoch")
        plt.ylabel(y_label)
        plt.title(heading)
        plt.legend()
        plt.show()

def plot_confusion(y_true, y_pred, class_names, normalize=None, title="Confusion Matrix"):
    """Compute, draw and return a confusion matrix.

    Args:
        y_true: true integer labels.
        y_pred: predicted integer labels, aligned with ``y_true``.
        class_names: display names; label ``i`` is shown as ``class_names[i]`` and the
            matrix always has ``len(class_names)`` rows and columns.
        normalize: forwarded to ``sklearn.metrics.confusion_matrix``.
        title: title drawn above the matrix.

    Returns:
        The confusion matrix array that was plotted.
    """
    cm = confusion_matrix(y_true, y_pred, labels=list(range(len(class_names))), normalize=normalize)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
    # Hand our own axes to disp.plot(): without `ax` it creates a fresh figure,
    # which left the figsize unused and an extra blank figure in the output.
    fig, ax = plt.subplots(figsize=(6, 5))
    disp.plot(ax=ax, include_values=True, xticks_rotation=45, colorbar=True)
    ax.set_title(title)
    plt.show()
    return cm

def plot_tp_fp_only(cm, class_names):
    """Plot per-class true-positive and false-positive counts.

    Given a standard (unnormalized) confusion matrix ``cm`` with true labels on the
    rows and predictions on the columns, TP is the diagonal and FP is the column sum
    minus TP. The counts are shown as an annotated 2 x C grid and as a grouped bar chart.

    Args:
        cm: square confusion matrix of counts (array-like).
        class_names: one display name per class, in matrix order.

    Returns:
        ``(tp, fp)`` as 1-D numpy arrays with one entry per class.
    """
    cm = np.asarray(cm)
    tp = np.diag(cm)
    fp = cm.sum(axis=0) - tp  # column sum = everything predicted as that class
    tp_fp = np.vstack([tp, fp])
    n_classes = len(class_names)

    # Annotated grid
    fig, ax = plt.subplots(figsize=(8, 2.8))
    im = ax.imshow(tp_fp, aspect="auto")
    ax.set_yticks([0, 1]); ax.set_yticklabels(["TP", "FP"])
    ax.set_xticks(np.arange(n_classes)); ax.set_xticklabels(class_names, rotation=45, ha="right")

    # Pick each label colour from the opposite end of the active colormap so the
    # text contrasts with its cell whatever colormap is in use. The previous
    # fixed white-on-high rule is unreadable with the default colormap, whose
    # high end is bright.
    low_colour, high_colour = im.cmap(0), im.cmap(1.0)
    midpoint = (tp_fp.max() + tp_fp.min()) / 2.0
    for i in range(2):
        for j in range(n_classes):
            text_colour = high_colour if tp_fp[i, j] < midpoint else low_colour
            ax.text(j, i, str(int(tp_fp[i, j])), ha="center", va="center", color=text_colour)
    ax.set_title("TP/FP per class (counts)")
    fig.colorbar(im, ax=ax)
    plt.tight_layout()
    plt.show()

    # Grouped bar chart
    x = np.arange(n_classes)
    width = 0.35
    plt.figure(figsize=(8, 3))
    plt.bar(x - width/2, tp, width, label="TP")
    plt.bar(x + width/2, fp, width, label="FP")
    plt.xticks(x, class_names, rotation=45, ha="right")
    plt.ylabel("Count")
    plt.title("Per-class TP and FP")
    plt.legend()
    plt.tight_layout()
    plt.show()

    return tp, fp

# ---- Train ----
def train(model, train_loader, val_loader, device, cfg):
    """Train ``model`` with AdamW, per-batch cosine LR decay and (on CUDA) mixed precision.

    Args:
        model: network to optimise; moved to ``device`` in place.
        train_loader: iterable of ``(inputs, targets)`` batches; must support ``len()``.
        val_loader: iterable of ``(inputs, targets)`` batches scored after every epoch.
            Its accuracy selects the "best" checkpoint, so it should be a split that
            is not also used for the final reported score.
        device: ``torch.device`` to run on; autocast and gradient scaling are enabled
            only when its type is "cuda".
        cfg: object exposing ``epochs``, ``lr``, ``weight_decay``, ``label_smoothing``,
            ``grad_clip``, ``report_every`` and ``save_best``.

    Returns:
        ``(model, history)``. ``history`` maps "train_loss", "val_loss", "train_acc" and
        "val_acc" to per-epoch lists and "lr" to a per-batch list. When ``cfg.save_best``
        is set and some epoch reached a validation accuracy above 0, the returned model
        carries the weights of the best such epoch.
    """
    model.to(device)
    criterion = nn.CrossEntropyLoss(label_smoothing=cfg.label_smoothing)
    optimizer = optim.AdamW(model.parameters(), lr=cfg.lr, weight_decay=cfg.weight_decay)
    # Cosine schedule stepped once per batch, spanning the whole run.
    total_steps = cfg.epochs * len(train_loader)
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=total_steps)

    use_amp = device.type == "cuda"
    scaler = torch.amp.GradScaler(enabled=use_amp)

    history = {"train_loss":[], "val_loss":[], "train_acc":[], "val_acc":[], "lr":[]}
    best_acc, best_state = 0.0, None

    for epoch in range(1, cfg.epochs+1):
        model.train()
        run_loss, run_correct, n = 0.0, 0, 0

        for xb, yb in train_loader:
            xb, yb = xb.to(device, non_blocking=True), yb.to(device, non_blocking=True)

            optimizer.zero_grad(set_to_none=True)
            with torch.amp.autocast(device_type=device.type, dtype=torch.float16, enabled=use_amp):
                logits = model(xb)
                loss = criterion(logits, yb)

            scaler.scale(loss).backward()
            if cfg.grad_clip and cfg.grad_clip > 0:
                # Gradients must be unscaled before their norm is clipped.
                scaler.unscale_(optimizer)
                nn.utils.clip_grad_norm_(model.parameters(), cfg.grad_clip)
            scaler.step(optimizer)
            scaler.update()

            scheduler.step()

            bs = yb.size(0)
            run_loss += loss.item() * bs
            run_correct += (logits.argmax(1) == yb).sum().item()
            n += bs
            history["lr"].append(optimizer.param_groups[0]["lr"])

        # These running figures are measured in train mode (dropout active,
        # batch-norm using batch statistics) while the weights are still changing.
        train_loss = run_loss / max(1, n)
        train_acc = run_correct / max(1, n)

        val_loss, val_acc, _, _ = evaluate(model, val_loader, device, criterion)

        history["train_loss"].append(train_loss)
        history["val_loss"].append(val_loss)
        history["train_acc"].append(train_acc)
        history["val_acc"].append(val_acc)

        # report_every of 0/None disables reporting instead of dividing by zero.
        if cfg.report_every and epoch % cfg.report_every == 0:
            # The eval-mode pass over the training set only feeds the log line, so
            # it is skipped on epochs that do not report. evaluate() already runs
            # under no_grad and switches the model to eval mode itself.
            train_eval_loss, train_eval_acc, _, _ = evaluate(model, train_loader, device, criterion)
            print(f"[Epoch {epoch:02d}] "
                  f"train_mode_acc {train_acc:.3f} | eval(train)_acc {train_eval_acc:.3f} | "
                  f"test_acc {val_acc:.3f} | "
                  f"train_loss {train_loss:.3f} | eval(train)_loss {train_eval_loss:.3f} | test_loss {val_loss:.3f}")

        if cfg.save_best and val_acc > best_acc:
            best_acc = val_acc
            # copy=True forces a real snapshot. A plain .cpu() returns the same tensor
            # when the model already lives on the CPU, so the "best" state would keep
            # tracking the live weights and end up equal to the final ones.
            best_state = {k: v.detach().to("cpu", copy=True) for k, v in model.state_dict().items()}

    if cfg.save_best and best_state is not None:
        # load_state_dict copies values into the existing parameters, so the
        # CPU snapshot can be loaded directly whatever device the model is on.
        model.load_state_dict(best_state)
        print(f"Loaded best checkpoint (val_acc={best_acc:.4f})")

    return model, history

Detected classes: ['freshapples', 'freshbanana', 'freshoranges', 'rottenapples', 'rottenbanana', 'rottenoranges']


In [None]:
# ---- Run training ----
# Build a fresh network sized to the detected classes and train it with the settings in `cfg`.
# NOTE(review): no RNG seed is set in the visible cells, so weight init and batch order
# differ between runs — consider calling torch.manual_seed(...) before this cell.
# NOTE(review): test_loader is passed as the validation loader, so train() picks its
# best checkpoint on test data and the test accuracy reported later is optimistic.
model = SmallFruitCNN(num_classes=num_classes, drop=0.10)
model, history = train(model, train_loader, test_loader, device, cfg)

[Epoch 01] train_mode_acc 0.752 | eval(train)_acc 0.848 | test_acc 0.850 | train_loss 0.699 | eval(train)_loss 0.409 | test_loss 0.406
[Epoch 02] train_mode_acc 0.840 | eval(train)_acc 0.855 | test_acc 0.854 | train_loss 0.455 | eval(train)_loss 0.422 | test_loss 0.423
[Epoch 03] train_mode_acc 0.864 | eval(train)_acc 0.912 | test_acc 0.914 | train_loss 0.395 | eval(train)_loss 0.261 | test_loss 0.254
[Epoch 04] train_mode_acc 0.884 | eval(train)_acc 0.906 | test_acc 0.909 | train_loss 0.339 | eval(train)_loss 0.258 | test_loss 0.256
[Epoch 05] train_mode_acc 0.894 | eval(train)_acc 0.926 | test_acc 0.924 | train_loss 0.309 | eval(train)_loss 0.213 | test_loss 0.227
[Epoch 06] train_mode_acc 0.902 | eval(train)_acc 0.944 | test_acc 0.946 | train_loss 0.279 | eval(train)_loss 0.153 | test_loss 0.151


In [None]:
# ---- Plots: loss/acc ----
# Draw the per-epoch train/validation curves stored in `history`.
# `history` is only bound once train() has returned, so the training cell must have finished.
plot_history(history)

In [None]:
# ---- Confusion matrices ----
# Score the trained model on the test loader with plain cross-entropy
# (default arguments, i.e. independent of cfg.label_smoothing).
criterion_eval = nn.CrossEntropyLoss()  # for eval only
test_loss, test_acc, preds, targets = evaluate(model, test_loader, device, criterion_eval)
print(f"Final test: loss={test_loss:.4f}, acc={test_acc:.4f}")

# 1) Raw confusion matrix (counts); kept because the TP/FP summary below needs counts
cm_counts = plot_confusion(targets, preds, classes, normalize=None, title="Confusion Matrix (counts)")

# 2) Normalized by true labels (optional, easier to read recall per class)
_ = plot_confusion(targets, preds, classes, normalize='true', title="Confusion Matrix (normalized by true)")

# 3) TP/FP-only "confusion": plot the per-class counts, then print them as a table
tp, fp = plot_tp_fp_only(cm_counts, classes)
for name, tpi, fpi in zip(classes, tp, fp):
    print(f"{name:>15}: TP={int(tpi):4d}  FP={int(fpi):4d}")
