In [None]:
# Importing the Libraries
import copy
import os

import kagglehub
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import torch
import torch.nn as nn
import torch.optim as optim
from PIL import Image
from sklearn.metrics import f1_score, confusion_matrix, classification_report
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.utils.class_weight import compute_class_weight
from torch.utils.data import DataLoader, Dataset, Subset
from torchvision import transforms
from tqdm import tqdm

In [None]:
# Set device
# Backend preference order: Apple MPS, then CUDA, then plain CPU.
if torch.backends.mps.is_available():
    _device_name = "mps"
elif torch.cuda.is_available():
    _device_name = "cuda"
else:
    _device_name = "cpu"

device = torch.device(_device_name)
print("Using device:", device)

In [None]:
# 1. Download & point to the right directory
# dataset_download returns the local directory of the downloaded dataset version.
# The image root is resolved relative to that directory so the notebook does not
# depend on one user's home directory or on a specific cached version number.
path = kagglehub.dataset_download("nafishamoin/new-bangladeshi-crop-disease")
dataset_path = os.path.join(path, "BangladeshiCrops", "BangladeshiCrops", "Crop___Disease")

# Fail here with a clear message instead of deep inside the dataset scan.
if not os.path.isdir(dataset_path):
    raise FileNotFoundError(f"Expected image root not found: {dataset_path}")


In [None]:
# 2. Dataset class
class CropDiseaseDataset(Dataset):
    """Image dataset read from a two-level ``<main_class>/<subclass>/<file>`` tree.

    Every second-level directory is one class. Its name is the first-level
    directory name joined by underscores with the last (up to) three
    underscore-separated pieces of the second-level directory name.

    Directory listings are sorted, so ``samples`` has the same order on every
    run and filesystem; index-based splits of the dataset are then repeatable.

    Args:
        root_dir: Directory containing the first-level class directories.
        transform: Optional callable applied to each RGB PIL image.

    Attributes set by ``__init__``:
        classes: Sorted list of unique class names.
        class_to_idx: Mapping from class name to integer label.
        samples: List of ``(file_path, label)`` tuples.
    """

    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.classes, self.class_to_idx = self._find_classes()
        self.samples = self._make_dataset()

    @staticmethod
    def _class_name(main_class, subclass):
        """Build the class name for one ``main_class/subclass`` directory pair."""
        # Slicing with [-3:] yields the same pieces as indexing [-3], [-2], [-1]
        # when there are at least three, and does not raise for shorter names.
        return "_".join([main_class, *subclass.split("_")[-3:]])

    def _iter_class_dirs(self):
        """Yield ``(class_name, directory_path)`` for each second-level directory."""
        for main_class in sorted(os.listdir(self.root_dir)):
            main_path = os.path.join(self.root_dir, main_class)
            if not os.path.isdir(main_path):
                continue
            for subclass in sorted(os.listdir(main_path)):
                subclass_path = os.path.join(main_path, subclass)
                if not os.path.isdir(subclass_path):
                    continue
                yield self._class_name(main_class, subclass), subclass_path

    def _find_classes(self):
        """Return the sorted unique class names and the name -> index mapping."""
        classes = sorted({name for name, _ in self._iter_class_dirs()})
        return classes, {c: i for i, c in enumerate(classes)}

    def _make_dataset(self):
        """Return ``(file_path, label)`` for every regular file in a class directory."""
        samples = []
        for name, class_dir in self._iter_class_dirs():
            label = self.class_to_idx[name]
            for fn in sorted(os.listdir(class_dir)):
                # Hidden entries (e.g. .DS_Store) are not images and would make
                # Image.open fail when the sample is loaded.
                if fn.startswith("."):
                    continue
                file_path = os.path.join(class_dir, fn)
                if not os.path.isfile(file_path):
                    continue
                samples.append((file_path, label))
        return samples

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        """Load sample ``idx`` as an RGB image, apply ``transform`` if set, return ``(img, label)``."""
        fn, label = self.samples[idx]
        # The context manager closes the underlying file once the pixel data
        # has been copied into the converted image.
        with Image.open(fn) as raw:
            img = raw.convert("RGB")
        if self.transform:
            img = self.transform(img)
        return img, label

In [None]:
# 3. Transforms
# Per-channel statistics passed to Normalize in both pipelines.
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]

# Training pipeline: random crop/flip/rotation/affine/perspective/colour
# augmentation on the PIL image, then tensor conversion and normalisation.
_train_steps = [
    transforms.RandomResizedCrop(size=224, scale=(0.8, 1.0), ratio=(0.9, 1.1)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomVerticalFlip(p=0.2),
    transforms.RandomRotation(degrees=15),
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1), scale=(0.9, 1.1), shear=10),
    transforms.RandomPerspective(distortion_scale=0.2, p=0.3),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.3, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
]
train_tf = transforms.Compose(_train_steps)

# Validation/test pipeline: fixed resize only, no random operations.
_eval_steps = [
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
]
val_test_tf = transforms.Compose(_eval_steps)

In [None]:
# 4. Load dataset & compute class weights
# Scan the directory tree once; no transform is attached at this point.
full_ds = CropDiseaseDataset(root_dir=dataset_path)

# Integer label of every sample, in the same order as full_ds.samples.
labels = [sample[1] for sample in full_ds.samples]
num_classes = len(full_ds.classes)

# "balanced" class weights over all labels, converted to a float32 tensor.
cw = compute_class_weight(
    class_weight="balanced",
    classes=np.arange(num_classes),
    y=labels,
)
class_weights = torch.from_numpy(cw).to(torch.float)

In [None]:
# 5. Stratified split
def _with_transform(dataset, transform):
    """Return a shallow copy of `dataset` whose `transform` attribute is `transform`.

    The copy shares the sample list with the original, so nothing is rescanned
    or duplicated; only the transform differs.
    """
    clone = copy.copy(dataset)
    clone.transform = transform
    return clone


# First split: hold out 20% of all samples as the test set (stratified by label).
sss1 = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
train_val_idx, test_idx = next(sss1.split(np.zeros(len(labels)), labels))

# Second split: 20% of the remaining samples become the validation set.
train_val_labels = [labels[i] for i in train_val_idx]
sss2 = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
train_idx, val_idx = next(sss2.split(np.zeros(len(train_val_labels)), train_val_labels))

# sss2 returns positions within train_val_idx; map them back to indices into full_ds.
train_idx = [train_val_idx[i] for i in train_idx]
val_idx = [train_val_idx[i] for i in val_idx]

# Each Subset wraps its OWN dataset copy. Wrapping full_ds three times and then
# assigning `.dataset.transform` on each Subset mutates one shared object, so the
# last assignment (val_test_tf) would apply to the training data as well and
# silently disable augmentation. full_ds itself is left without a transform.
train_ds = Subset(_with_transform(full_ds, train_tf), train_idx)
val_ds = Subset(_with_transform(full_ds, val_test_tf), val_idx)
test_ds = Subset(_with_transform(full_ds, val_test_tf), test_idx)

train_loader = DataLoader(train_ds, batch_size=32, shuffle=True, num_workers=0)
val_loader   = DataLoader(val_ds, batch_size=32, shuffle=False, num_workers=0)
test_loader  = DataLoader(test_ds, batch_size=32, shuffle=False, num_workers=0)

In [None]:
# 6. Model
class BasicCNN(nn.Module):
    """Three conv/ReLU/max-pool stages followed by a two-layer classifier head.

    Each 3x3 convolution uses padding 1 and therefore keeps the spatial size;
    each of the three 2x2 max-pools halves it (rounding down), so a square
    input of side ``input_size`` reaches the classifier as a
    ``128 x (input_size // 8) x (input_size // 8)`` feature map.

    Args:
        num_classes: Number of output logits.
        input_size: Side length in pixels of the square input images. The
            default of 224 gives the ``128*28*28`` flattened feature size.

    Raises:
        ValueError: If ``input_size`` is below 8, which would leave no
            features after the three pooling stages.
    """

    def __init__(self, num_classes, input_size=224):
        super().__init__()
        if input_size < 8:
            raise ValueError(f"input_size must be at least 8, got {input_size}")
        self.conv = nn.Sequential(
            nn.Conv2d(3, 32, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
            nn.Conv2d(64, 128, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
        )
        # Three floor-halvings of an integer equal one floor-division by 8.
        feat_side = input_size // 8
        self.fc = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(128 * feat_side * feat_side, 512), nn.ReLU(), nn.Dropout(0.5),
            nn.Linear(512, num_classes),
        )

    def forward(self, x):
        """Map a ``(batch, 3, input_size, input_size)`` tensor to ``(batch, num_classes)`` logits."""
        x = self.conv(x)
        # Keep the batch dimension and flatten channels and spatial dims together.
        x = x.flatten(1)
        return self.fc(x)


# Build the network with one output per class, then move it to the selected device.
model = BasicCNN(num_classes)
model = model.to(device)

In [None]:
# 7. Loss, optimizer
# The per-class weights must be on the same device as the model's outputs.
_loss_weights = class_weights.to(device)
criterion = nn.CrossEntropyLoss(weight=_loss_weights)

# Adam over all model parameters with a learning rate of 1e-3.
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)