### import librarires

In [4]:
import os
import argparse
from pathlib import Path
from typing import Tuple

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms, datasets
from PIL import Image
import numpy as np
from sklearn.metrics import cohen_kappa_score, confusion_matrix, accuracy_score
from tqdm import tqdm

In [6]:
# model

class ConvBlock(nn.Module):
    def __init__(self, in_ch, out_ch, kernel=3, stride=1, padding=1):
        super().__init__()
        self.conv = nn.Conv2d(in_ch, out_ch, kernel, stride, padding, bias=False)
        self.bn = nn.BatchNorm2d(out_ch)
        self.act = nn.ReLU(inplace=True)

    def forward(self, x):
        return self.act(self.bn(self.conv(x)))

In [7]:
class ProposedNet(nn.Module):
    def __init__(self, num_classes=2, in_channels=3):
        super().__init__()
        # Stem
        self.stem = nn.Sequential(
            ConvBlock(in_channels, 32),  # 1
            ConvBlock(32, 32),           # 2
            nn.MaxPool2d(2)              # /2
        )

        # Main branch stacks
        self.main = nn.Sequential(
            ConvBlock(32, 64),           # 3
            ConvBlock(64, 64),           # 4
            nn.MaxPool2d(2),             # /4

            ConvBlock(64, 128),          # 5
            ConvBlock(128, 128),         # 6
            nn.MaxPool2d(2),             # /8

            ConvBlock(128, 256),         # 7
            ConvBlock(256, 256),         # 8
            # we'll merge with parallel branch here
        )

        # Parallel branch (shallower)
        self.parallel = nn.Sequential(
            ConvBlock(32, 48),           # p1
            nn.MaxPool2d(2),             # /4
            ConvBlock(48, 96),           # p2
            nn.MaxPool2d(2),             # /8
        )

        # After concatenation
        self.post_merge = nn.Sequential(
            ConvBlock(256 + 96, 256),    # 9
            ConvBlock(256, 256),         # 10
            nn.AdaptiveAvgPool2d((1, 1))
        )

        # classifier head: keep small to be lightweight
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(256, 128, bias=True),
            nn.ReLU(inplace=True),
            nn.Dropout(0.4),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        x0 = self.stem(x)
        m = self.main(x0)
        p = self.parallel(x0)
        # resize if shapes mismatch (they should match spatial dims)
        if m.shape[2:] != p.shape[2:]:
            # adaptive pool to match
            p = F.adaptive_avg_pool2d(p, m.shape[2:])
        merged = torch.cat([m, p], dim=1)
        out = self.post_merge(merged)
        out = self.classifier(out)
        return out

In [8]:
# utilities
def count_parameters(model: nn.Module) -> int:
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

In [9]:
def get_transforms(stage: int, size: int = 227):
    common = [
        transforms.Resize((size, size)),
    ]

    # augmentation used during training
    train_aug = transforms.Compose([
        transforms.RandomResizedCrop(size, scale=(0.85, 1.15), ratio=(0.9, 1.1)),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(20),
        transforms.RandomAffine(degrees=0, translate=(0.1, 0.1), scale=(0.9, 1.1)),
        transforms.ToTensor(),
        # Paper uses zero-centered normalization. We'll apply 0-centered ~ map to [-1,1]
        transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])
    ])

    val_aug = transforms.Compose([
        transforms.Resize((size, size)),
        transforms.ToTensor(),
        transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])
    ])

    return train_aug, val_aug

In [10]:
class CSVDataset(Dataset):
    def __init__(self, csv_file, root_dir, transform=None, loader=lambda x: Image.open(x).convert('RGB')):
        import pandas as pd
        self.df = pd.read_csv(csv_file)
        self.root = Path(root_dir)
        self.transform = transform
        self.loader = loader

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        path = self.root / row['image']
        img = self.loader(path)
        label = int(row['label'])
        if self.transform:
            img = self.transform(img)
        return img, label

In [11]:
def train_epoch(model, loader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0
    all_preds = []
    all_labels = []
    pbar = tqdm(loader, leave=False)
    for x, y in pbar:
        x = x.to(device)
        y = y.to(device)
        optimizer.zero_grad()
        out = model(x)
        loss = criterion(out, y)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * x.size(0)
        preds = torch.argmax(out.detach(), dim=1).cpu().numpy()
        all_preds.extend(preds.tolist())
        all_labels.extend(y.cpu().numpy().tolist())
        pbar.set_description(f"loss: {loss.item():.4f}")

    epoch_loss = running_loss / len(loader.dataset)
    epoch_acc = accuracy_score(all_labels, all_preds)
    return epoch_loss, epoch_acc

In [12]:
def eval_epoch(model, loader, criterion, device):
    model.eval()
    running_loss = 0.0
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for x, y in loader:
            x = x.to(device)
            y = y.to(device)
            out = model(x)
            loss = criterion(out, y)
            running_loss += loss.item() * x.size(0)
            preds = torch.argmax(out, dim=1).cpu().numpy()
            all_preds.extend(preds.tolist())
            all_labels.extend(y.cpu().numpy().tolist())
    epoch_loss = running_loss / len(loader.dataset)
    epoch_acc = accuracy_score(all_labels, all_preds)
    kappa = cohen_kappa_score(all_labels, all_preds)
    cm = confusion_matrix(all_labels, all_preds)
    return epoch_loss, epoch_acc, kappa, cm

In [13]:
def set_finetune_head(model: nn.Module, num_classes: int):
    # replace classifier head while keeping backbone frozen in transfer learning
    if hasattr(model, 'classifier'):
        model.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(256, 128),
            nn.ReLU(inplace=True),
            nn.Dropout(0.4),
            nn.Linear(128, num_classes)
        )
    else:
        raise RuntimeError('Model does not have classifier attribute')

In [14]:
def freeze_backbone(model: nn.Module):
    for name, p in model.named_parameters():
        if 'classifier' not in name:
            p.requires_grad = False

In [19]:
def main(args):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    train_transform, val_transform = get_transforms(stage=args.stage, size=args.size)

    # Data loaders - this script expects folder-per-class if no CSV is provided.
    if args.csv:
        train_ds = CSVDataset(args.csv, args.data_dir, transform=train_transform)
        # For simplicity reuse same csv for val in this scaffold; in practice split
        val_ds = CSVDataset(args.csv, args.data_dir, transform=val_transform)
    else:
        train_ds = datasets.ImageFolder(os.path.join(args.data_dir, 'train'), transform=train_transform)
        val_ds = datasets.ImageFolder(os.path.join(args.data_dir, 'val'), transform=val_transform)

    train_loader = DataLoader(train_ds, batch_size=args.batch_size, shuffle=True, num_workers=4, pin_memory=True)
    val_loader = DataLoader(val_ds, batch_size=args.batch_size, shuffle=False, num_workers=4, pin_memory=True)

    num_classes = 2 if args.stage == 1 else args.num_classes

    model = ProposedNet(num_classes=num_classes).to(device)
    print(f"Model params: {count_parameters(model)/1e6:.3f}M")

    # Loss and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(filter(lambda p: p.requires_grad, model.parameters()), lr=args.lr, momentum=0.9)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=30, gamma=0.1)

    best_kappa = -1.0
    for epoch in range(args.epochs):
        print(f"Epoch {epoch+1}/{args.epochs}")
        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
        val_loss, val_acc, val_kappa, cm = eval_epoch(model, val_loader, criterion, device)
        scheduler.step()

        print(f"train loss {train_loss:.4f} acc {train_acc:.4f}")
        print(f"val   loss {val_loss:.4f} acc {val_acc:.4f} kappa {val_kappa:.4f}")
        print("confusion matrix:\n", cm)

        # checkpoint
        if val_kappa > best_kappa:
            best_kappa = val_kappa
            torch.save({'model_state': model.state_dict(), 'epoch': epoch}, args.checkpoint)
            print(f"Saved best model to {args.checkpoint}")

    print('Training finished')

In [20]:
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--data-dir', type=str, default='./data', help='path to dataset root (train/ val folders) or root for CSV loader')
    parser.add_argument('--csv', type=str, default=None, help='optional csv file mapping images to labels')
    parser.add_argument('--stage', type=int, default=1, help='1 for DR detection (binary), 2 for severity (multi-class)')
    parser.add_argument('--num-classes', type=int, default=5, help='number of classes for severity task')
    parser.add_argument('--epochs', type=int, default=100)
    parser.add_argument('--batch-size', type=int, default=128)
    parser.add_argument('--lr', type=float, default=0.001)
    parser.add_argument('--size', type=int, default=227)
    parser.add_argument('--checkpoint', type=str, default='best_model.pt')

    # For Jupyter/IPython, allow args to be ignored
    try:
        args = parser.parse_args()
    except SystemExit:
        args = parser.parse_args([])

    main(args)

usage: ipykernel_launcher.py [-h] [--data-dir DATA_DIR] [--csv CSV]
                             [--stage STAGE] [--num-classes NUM_CLASSES]
                             [--epochs EPOCHS] [--batch-size BATCH_SIZE]
                             [--lr LR] [--size SIZE] [--checkpoint CHECKPOINT]
ipykernel_launcher.py: error: unrecognized arguments: -f /home/mahabur-alam/.local/share/jupyter/runtime/kernel-420a5f88-4c40-4ebf-9552-799ca3421b40.json


FileNotFoundError: [Errno 2] No such file or directory: './data/train'