In [None]:
#Imports:
import os, math
from collections import Counter
import numpy as np, pandas as pd, matplotlib.pyplot as plt
from tqdm import tqdm
from PIL import Image, ImageStat

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.models as models

# For deep feature extraction using Keras ResNet50
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.preprocessing import normalize

In [None]:
# Mount Google Drive at /content/drive so files stored on Drive are reachable
# through the local filesystem. `google.colab` is only importable inside Colab.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Location of the dataset archive on the mounted Drive; it is unzipped into the
# Colab workspace by the following cell.
zip_path = "/content/drive/MyDrive/archive.zip"

In [None]:
import os
# Destination directory in Colab workspace for unzipping
extract_dir = '/content/dataset'

# Create the destination directory if it doesn't exist
if not os.path.exists(extract_dir):
    os.makedirs(extract_dir)

# Unzip the dataset
!unzip -q "{zip_path}" -d "{extract_dir}"

In [None]:
# Root of the extracted archive and the image / label folders beneath it.
DATASET_ROOT = '/content/dataset/dataset'
train_images_dir = f'{DATASET_ROOT}/images/train'
test_images_dir = f'{DATASET_ROOT}/images/test'
train_labels_dir = f'{DATASET_ROOT}/labels/train'

In [None]:
# Hyperparameters shared by every model in this notebook.
num_classes = 3
# Class ids: 0 = anthracnose, 1 = cssvd, 2 = healthy.
global_epochs = 10
# Number of full passes over the training loader.
batch_size = 16
learning_rate = 0.001
# Minimum cosine similarity for the image-similarity based ground truth mapping.
similarity_threshold = 0.7

In [None]:
# Directory that receives the saved model weights. It is created up front;
# exist_ok=True makes the cell a no-op when the directory is already there.
CHECKPOINT_DIR = './checkpoints'
os.makedirs(CHECKPOINT_DIR, exist_ok=True)

In [None]:
#Helper Class Definitions:
class ResidualBlock(nn.Module):
    """Two 3x3 conv + batch-norm stages with an additive skip connection.

    The skip path is the identity (an empty ``nn.Sequential``) unless the
    stride differs from 1 or the channel count changes; in that case the input
    is projected with a strided 1x1 convolution followed by batch norm so it
    can be added to the main path.
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels,
                               kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels,
                               kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        needs_projection = stride != 1 or in_channels != out_channels
        if needs_projection:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        # Main path: conv -> BN -> ReLU -> conv -> BN.
        hidden = self.conv1(x)
        hidden = F.relu(self.bn1(hidden))
        hidden = self.bn2(self.conv2(hidden))
        # Add the (possibly projected) input, then apply the final activation.
        hidden += self.shortcut(x)
        return F.relu(hidden)

class FeatureTransformer(nn.Module):
    """Apply a Transformer encoder across the spatial positions of a feature map.

    Args:
        embed_dim: Channel count C of the incoming feature map; used as the
            encoder's ``d_model``.
        num_heads: Number of attention heads per encoder layer.
        num_layers: Number of stacked encoder layers.

    ``forward`` takes a ``(B, C, H, W)`` tensor, treats each of the ``H*W``
    positions as a token of size ``C``, and returns a tensor of the same
    ``(B, C, H, W)`` shape.
    """
    def __init__(self, embed_dim, num_heads=4, num_layers=2):
        super().__init__()
        layer = nn.TransformerEncoderLayer(
            d_model=embed_dim, nhead=num_heads, batch_first=True
        )
        self.trans = nn.TransformerEncoder(layer, num_layers=num_layers)

    def forward(self, x):
        B, C, H, W = x.size()
        # flatten/reshape (instead of view) also accept non-contiguous inputs,
        # e.g. feature maps produced by a transpose or permute; view raises on
        # those because the spatial dims cannot be merged without a copy.
        tokens = x.flatten(2).transpose(1, 2)    # (B, N, C) with N = H*W
        tokens = self.trans(tokens)
        return tokens.transpose(1, 2).reshape(B, C, H, W)

In [None]:
# Data preprocessing and EDA: inventory of the training image folder.
print("====== TRAINING SET EDA ======")
train_files = os.listdir(train_images_dir)
train_files.sort()
print("Total training images:", len(train_files))
# Tally the file extensions exactly as they appear on disk (case-sensitive).
print("File types:", Counter(map(lambda name: os.path.splitext(name)[1], train_files)))
def is_valid_image(p):
    """Return True when PIL can open ``p`` and ``verify()`` raises nothing.

    Any ordinary error while opening or verifying the file is treated as
    "not a valid image" and yields False.
    """
    try:
        with Image.open(p) as img:
            img.verify()
        return True
    except Exception:
        # Deliberately best-effort, but catch Exception rather than using a
        # bare except so KeyboardInterrupt / SystemExit still stop the scan.
        return False
# Keep only the training files that pass the PIL integrity check.
valid_train = list(filter(
    lambda name: is_valid_image(os.path.join(train_images_dir, name)),
    train_files,
))
print("Valid training images:", len(valid_train))

# Same inventory and integrity check for the test image folder.
print("\n====== TEST SET EDA ======")
test_files = os.listdir(test_images_dir)
test_files.sort()
print("Total test images:", len(test_files))
print("File types:", Counter(map(lambda name: os.path.splitext(name)[1], test_files)))
valid_test = list(filter(
    lambda name: is_valid_image(os.path.join(test_images_dir, name)),
    test_files,
))
print("Valid test images:", len(valid_test))

Total training images: 5529
File types: Counter({'.jpg': 3661, '.jpeg': 1586, '.JPG': 282})
Valid training images: 5529

Total test images: 1626
File types: Counter({'.jpg': 1079, '.jpeg': 466, '.JPG': 81})
Valid test images: 1626


In [None]:
#Build Test Ground Truth via Similarity:
# Each test image receives the label of its most similar training image
# (cosine similarity between ResNet50 average-pooled embeddings), provided the
# similarity reaches `similarity_threshold` and that training image is labelled.
LABEL_NAME_TO_ID = {"anthracnose":0,"cssvd":1,"healthy":2}

def _read_first_label(label_path):
    """Return the class id given by the first token of a label file.

    The token may be an integer id or one of the class names in
    LABEL_NAME_TO_ID. Returns -1 when the file is missing, empty, or the token
    is not recognised.
    """
    if not os.path.exists(label_path):
        return -1
    with open(label_path) as fh:          # close the handle deterministically
        tokens = fh.read().split()
    if not tokens:
        return -1
    try:
        return int(tokens[0])
    except ValueError:
        return LABEL_NAME_TO_ID.get(tokens[0], -1)

def _embed_image(image_path):
    """Return the flattened ResNet50 embedding of one image file."""
    img = load_img(image_path, target_size=(224,224))
    batch = preprocess_input(img_to_array(img)[None])
    return kt_model.predict(batch, verbose=0).ravel()

print("\nExtracting deep features on train set with Keras ResNet50…")
kt_model = ResNet50(weights='imagenet', include_top=False, pooling='avg')
train_feats, train_lbls = [], []
for fname in tqdm(valid_train, desc="Train features"):
    p = os.path.join(train_images_dir,fname)
    try:
        feat = _embed_image(p)
        base = os.path.splitext(fname)[0]
        lab = _read_first_label(os.path.join(train_labels_dir, base+".txt"))
    except Exception as e:
        print("ERR train",fname,e)
        continue
    # Append feature and label together, only after both succeeded, so the two
    # lists can never drift out of index alignment.
    train_feats.append(feat)
    train_lbls.append(lab)

train_feats = normalize(np.vstack(train_feats),axis=1)
print("Built train feature bank:", train_feats.shape[0])

print("\nMapping test→train by cosine similarity…")
test_gt={}
for fname in tqdm(valid_test, desc="Test mapping"):
    p = os.path.join(test_images_dir,fname)
    try:
        f_n = normalize(_embed_image(p)[None],axis=1)
        sim = cosine_similarity(f_n, train_feats)[0]
        idx = int(sim.argmax())
        lab = train_lbls[idx]
        # An unlabelled neighbour (-1) carries no ground truth, so it must not
        # be propagated to the test image.
        if sim[idx]>= similarity_threshold and lab != -1:
            test_gt[fname] = lab
    except Exception as e:
        print("ERR test",fname,e)
print("Assigned labels to", len(test_gt), "test images")


Extracting deep features on train set with Keras ResNet50…
Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5
[1m94765736/94765736[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 0us/step


Train features: 100%|██████████| 5529/5529 [12:02<00:00,  7.65it/s]


Built train feature bank: 5529

Mapping test→train by cosine similarity…


Test mapping: 100%|██████████| 1626/1626 [05:58<00:00,  4.54it/s]

Assigned labels to 1616 test images





In [None]:
#Dataset Classes:
class CocoaClassificationDataset(Dataset):
    """Image dataset whose labels come from per-image ``<basename>.txt`` files.

    Args:
        img_dir: Directory containing the image files.
        lbl_dir: Directory containing the label text files.
        files: Image file names (relative to ``img_dir``) to expose.
        transform: Optional callable applied to the RGB PIL image.

    Each item is ``(image, label, file_name)``. ``label`` is -1 when the label
    file is missing, empty, or its first token is neither an integer nor a
    known class name.
    """
    LABEL_NAME_TO_ID = {"anthracnose":0,"cssvd":1,"healthy":2}

    def __init__(self, img_dir, lbl_dir, files, transform=None):
        self.img_dir = img_dir; self.lbl_dir=lbl_dir
        self.files   = files;    self.tf=transform

    def __len__(self): return len(self.files)

    @classmethod
    def _read_label(cls, label_path):
        """Return the class id from the first token of ``label_path``, or -1."""
        if not os.path.exists(label_path):
            return -1
        with open(label_path) as fh:      # do not leak one handle per sample
            tokens = fh.read().split()
        if not tokens:
            # An empty label file used to raise IndexError inside the
            # DataLoader; treat it as unlabelled instead.
            return -1
        try:
            return int(tokens[0])
        except ValueError:
            return cls.LABEL_NAME_TO_ID.get(tokens[0], -1)

    def __getitem__(self, idx):
        fn = self.files[idx]
        # convert() returns a fully loaded copy, so the source file can be
        # closed as soon as the conversion is done.
        with Image.open(os.path.join(self.img_dir,fn)) as raw:
            im = raw.convert('RGB')
        if self.tf: im = self.tf(im)
        base= os.path.splitext(fn)[0]
        lab = self._read_label(os.path.join(self.lbl_dir, base+".txt"))
        return im, lab, fn

class CocoaTestDataset(Dataset):
    """Dataset over the images that appear as keys of a ``file name -> label`` map.

    Files are served in sorted name order. Each item is
    ``(image, label, file_name)`` where ``label`` is looked up in ``gt_map``
    and ``image`` is the RGB image, passed through ``transform`` when one is
    given.
    """

    def __init__(self, img_dir, gt_map, transform=None):
        self.img_dir = img_dir
        self.gt_map = gt_map
        self.tf = transform
        self.files = sorted(gt_map)

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        name = self.files[idx]
        picture = Image.open(os.path.join(self.img_dir, name)).convert('RGB')
        if self.tf:
            picture = self.tf(picture)
        return picture, self.gt_map[name], name

# Channel statistics the torchvision ImageNet-pretrained backbones were trained
# with; feeding them raw [0, 1] tensors skews the pretrained features.
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD  = [0.229, 0.224, 0.225]

# Shared preprocessing: fixed-size resize, tensor conversion, then ImageNet
# normalisation to match the pretrained weights.
common_tf = transforms.Compose([
    transforms.Resize((256,256)),
    transforms.ToTensor(),
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
])

# Labelled training images (labels read from the label files) and the test
# images that received a similarity-based label.
train_ds = CocoaClassificationDataset(
    train_images_dir, train_labels_dir, valid_train, transform=common_tf)
test_ds  = CocoaTestDataset(
    test_images_dir, test_gt, transform=common_tf)

### Transfer Learning Models:

In [None]:
class ResNet50Classifier(nn.Module):
    """torchvision ResNet-50 with its final layer replaced by an ``n``-way linear head.

    Args:
        n: Number of output classes.
    """
    def __init__(self, n=num_classes):
        super().__init__()
        # `pretrained=True` is deprecated in torchvision; the explicit enum
        # selects the same ImageNet-1k V1 checkpoint.
        m = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
        m.fc = nn.Linear(m.fc.in_features, n)
        self.net = m

    def forward(self,x):
        """Return the class logits produced by the wrapped network."""
        return self.net(x)

In [None]:
class DenseNet121Classifier(nn.Module):
    """torchvision DenseNet-121 with its classifier replaced by an ``n``-way linear head.

    Args:
        n: Number of output classes.
    """
    def __init__(self,n=num_classes):
        super().__init__()
        # `pretrained=True` is deprecated in torchvision; the explicit enum
        # selects the same ImageNet-1k V1 checkpoint.
        m = models.densenet121(weights=models.DenseNet121_Weights.IMAGENET1K_V1)
        m.classifier = nn.Linear(m.classifier.in_features, n)
        self.net = m

    def forward(self,x):
        """Return the class logits produced by the wrapped network."""
        return self.net(x)

In [None]:
class EfficientNetB0Classifier(nn.Module):
    """torchvision EfficientNet-B0 whose last classifier layer is an ``n``-way linear head.

    Args:
        n: Number of output classes.
    """
    def __init__(self,n=num_classes):
        super().__init__()
        # `pretrained=True` is deprecated in torchvision; the explicit enum
        # selects the same ImageNet-1k V1 checkpoint.
        m = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.IMAGENET1K_V1)
        # classifier[1] is the Linear layer that gets swapped for the new head.
        m.classifier[1] = nn.Linear(m.classifier[1].in_features,n)
        self.net = m

    def forward(self,x):
        """Return the class logits produced by the wrapped network."""
        return self.net(x)

In [None]:
class MobileNetV3Classifier(nn.Module):
    """torchvision MobileNetV3-Large whose last classifier layer is an ``n``-way linear head.

    Args:
        n: Number of output classes.
    """
    def __init__(self,n=num_classes):
        super().__init__()
        # `pretrained=True` is deprecated in torchvision; the explicit enum
        # selects the same ImageNet-1k V1 checkpoint.
        m = models.mobilenet_v3_large(weights=models.MobileNet_V3_Large_Weights.IMAGENET1K_V1)
        # classifier[3] is the Linear layer that gets swapped for the new head.
        m.classifier[3] = nn.Linear(m.classifier[3].in_features,n)
        self.net = m

    def forward(self,x):
        """Return the class logits produced by the wrapped network."""
        return self.net(x)

In [None]:
class ViTClassifier(nn.Module):
    """torchvision ViT-B/16 with its head replaced by an ``n``-way linear layer.

    Args:
        n: Number of output classes.

    The ViT backbone asserts that inputs match its fixed ``image_size``; other
    spatial sizes are bilinearly resized in ``forward`` so this model can share
    a data pipeline with the CNN classifiers.
    """
    def __init__(self,n=num_classes):
        super().__init__()
        # `pretrained=True` is deprecated in torchvision; the explicit enum
        # selects the same ImageNet-1k V1 checkpoint.
        m = models.vit_b_16(weights=models.ViT_B_16_Weights.IMAGENET1K_V1)
        m.heads.head = nn.Linear(m.heads.head.in_features,n)
        self.net = m

    def forward(self,x):
        """Return class logits, resizing ``x`` to the backbone's input size if needed."""
        side = self.net.image_size
        if tuple(x.shape[-2:]) != (side, side):
            # Without this the backbone raises
            # "AssertionError: Wrong image height! Expected 224 but got 256!".
            x = F.interpolate(x, size=(side, side), mode="bilinear",
                              align_corners=False)
        return self.net(x)

In [None]:
# One instance of every classifier, keyed by the name used in the training
# logs and checkpoint file names. Insertion order is the training order.
model_dict = dict(
    ResNet50=ResNet50Classifier(),
    DenseNet121=DenseNet121Classifier(),
    EfficientNetB0=EfficientNetB0Classifier(),
    MobileNetV3=MobileNetV3Classifier(),
    ViT=ViTClassifier(),
)

Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 153MB/s]
Downloading: "https://download.pytorch.org/models/densenet121-a639ec97.pth" to /root/.cache/torch/hub/checkpoints/densenet121-a639ec97.pth
100%|██████████| 30.8M/30.8M [00:00<00:00, 139MB/s]
Downloading: "https://download.pytorch.org/models/efficientnet_b0_rwightman-7f5810bc.pth" to /root/.cache/torch/hub/checkpoints/efficientnet_b0_rwightman-7f5810bc.pth
100%|██████████| 20.5M/20.5M [00:00<00:00, 117MB/s] 
Downloading: "https://download.pytorch.org/models/mobilenet_v3_large-8738ca79.pth" to /root/.cache/torch/hub/checkpoints/mobilenet_v3_large-8738ca79.pth
100%|██████████| 21.1M/21.1M [00:00<00:00, 108MB/s]
Downloading: "https://download.pytorch.org/models/vit_b_16-c867db91.pth" to /root/.cache/torch/hub/checkpoints/vit_b_16-c867db91.pth
100%|██████████| 330M/330M [00:01<00:00, 176MB/s]


In [None]:
#Train and Evaluation Function:
def train_and_evaluate(model, train_loader, test_loader, device,
                       epochs=global_epochs, lr=learning_rate):
    """Train ``model`` with Adam + cross-entropy and keep the best-scoring weights.

    Args:
        model: Network returning class logits for a batch of images.
        train_loader: Iterable of ``(images, labels, names)`` batches; samples
            whose label is -1 are skipped.
        test_loader: Iterable of ``(images, labels, names)`` batches used to
            compute accuracy after every epoch.
        device: Device the model and batches are moved to.
        epochs: Number of passes over ``train_loader``.
        lr: Adam learning rate.

    Returns:
        ``(best_state, best_acc)``: an independent copy of the ``state_dict``
        taken at the epoch with the highest accuracy, and that accuracy.
    """
    import copy  # local import: the block stays self-contained

    model.to(device)
    opt = optim.Adam(model.parameters(), lr=lr)
    crit= nn.CrossEntropyLoss()
    best_acc = 0.0
    best_state = None

    for ep in range(1, epochs+1):
        model.train()
        running_loss = 0.0
        n_batches = 0
        for imgs, labs, _ in train_loader:
            valid = (labs != -1).nonzero(as_tuple=True)[0]
            if len(valid)==0: continue
            xb, yb = imgs[valid].to(device), labs[valid].to(device)
            opt.zero_grad()
            out = model(xb)
            loss= crit(out,yb)
            loss.backward()
            opt.step()
            running_loss += loss.item()
            n_batches += 1
        # Average over the batches that actually contributed a loss; skipped
        # (fully unlabelled) batches must not dilute the mean, and an empty
        # loader must not divide by zero.
        avg_loss = running_loss/n_batches if n_batches else 0.0
        print(f"Epoch {ep}/{epochs} — Train Loss: {avg_loss:.4f}")

        # validation
        model.eval()
        corr=tot=0
        with torch.no_grad():
            for imgs, labs, _ in test_loader:
                xb, yb = imgs.to(device), labs.to(device)
                out = model(xb)
                preds = out.argmax(dim=1)
                corr += (preds==yb).sum().item()
                tot  += yb.size(0)
        acc = corr/tot if tot else 0.0
        print(f"  Val Acc: {acc:.4f}")

        if best_state is None or acc>best_acc:
            best_acc   = acc
            # state_dict() returns references to the live parameter tensors,
            # so later optimizer steps would silently overwrite the "best"
            # weights. Deep-copy to take a real snapshot.
            best_state = copy.deepcopy(model.state_dict())

    if best_state is None:
        # epochs <= 0: nothing was trained, return the current weights rather
        # than None so callers can always save / load the result.
        best_state = copy.deepcopy(model.state_dict())

    return best_state, best_acc

In [None]:
device       = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Use test_ds for both train & eval
# NOTE(review): both loaders read the same test_ds, so the accuracy reported
# below is measured on the images the model was just trained on (and their
# labels are similarity-derived pseudo-labels). It is not a held-out score.
train_loader = DataLoader(test_ds, batch_size=batch_size, shuffle=True)
test_loader  = DataLoader(test_ds, batch_size=batch_size, shuffle=False)

# Per-model best accuracy plus bookkeeping for the overall winner.
model_performance       = {}
best_model_overall      = None
best_acc_overall        = 0.0
best_model_state_overall= None
best_model_name         = None

for name, model in model_dict.items():
    print(f"\n==== Training on TEST images only: {name} ====")
    state, acc = train_and_evaluate(
        model,
        train_loader,
        test_loader,
        device,
        epochs=global_epochs,
        lr=learning_rate
    )
    model_performance[name] = acc
    print(f"{name} achieved test-on-test accuracy: {acc:.4f}")
    # save each model's best weights
    save_path = os.path.join(
        CHECKPOINT_DIR,
        f"best_{name}_on_test.pth"
    )
    if state is None:
        # No best state was produced; writing None would leave a checkpoint
        # file that cannot be loaded back into a model.
        print(f" → No weights to save for {name}")
    else:
        torch.save(state, save_path)
        print(f" → Saved weights: {save_path}")

    # track overall best
    if acc > best_acc_overall:
        best_acc_overall        = acc
        best_model_overall      = model
        best_model_state_overall= state
        best_model_name         = name


==== Training on TEST images only: ResNet50 ====
Epoch 1/10 — Train Loss: 0.9298
  Val Acc: 0.4920
Epoch 2/10 — Train Loss: 0.7980
  Val Acc: 0.6293
Epoch 3/10 — Train Loss: 0.7292
  Val Acc: 0.7166
Epoch 4/10 — Train Loss: 0.6932
  Val Acc: 0.5860
Epoch 5/10 — Train Loss: 0.6364
  Val Acc: 0.4975
Epoch 6/10 — Train Loss: 0.6337
  Val Acc: 0.7327
Epoch 7/10 — Train Loss: 0.5934
  Val Acc: 0.4672
Epoch 8/10 — Train Loss: 0.5546
  Val Acc: 0.8014
Epoch 9/10 — Train Loss: 0.5650
  Val Acc: 0.7438
Epoch 10/10 — Train Loss: 0.4975
  Val Acc: 0.8119
ResNet50 achieved test-on-test accuracy: 0.8119
 → Saved weights: ./checkpoints/best_ResNet50_on_test.pth

==== Training on TEST images only: DenseNet121 ====
Epoch 1/10 — Train Loss: 0.8394
  Val Acc: 0.5328
Epoch 2/10 — Train Loss: 0.7231
  Val Acc: 0.7209
Epoch 3/10 — Train Loss: 0.6525
  Val Acc: 0.8026
Epoch 4/10 — Train Loss: 0.6324
  Val Acc: 0.8069
Epoch 5/10 — Train Loss: 0.6183
  Val Acc: 0.6621
Epoch 6/10 — Train Loss: 0.6045
  Val Ac

AssertionError: Wrong image height! Expected 224 but got 256!

In [None]:
#Performance Table:
# Print one row per trained model, highest accuracy first, then the winner.
print("\n==== Model Performance (trained & eval on test_ds) ====")
print(f"{'Model':<15}{'Val Accuracy':>15}")
print("-"*30)
ranked_results = sorted(
    model_performance.items(), key=lambda entry: entry[1], reverse=True)
for name, acc in ranked_results:
    print(f"{name:<15}{acc*100:15.2f}%")

print(f"\nBest overall: {best_model_name} @ {best_acc_overall*100:.2f}%")


==== Model Performance (trained & eval on test_ds) ====
Model             Val Accuracy
------------------------------
EfficientNetB0           99.20%
MobileNetV3              98.76%
DenseNet121              81.87%
ResNet50                 81.19%

Best overall: EfficientNetB0 @ 99.20%
