In [1]:
!pip install kagglehub
!pip freeze > requirements.txt

import os
import torch
import matplotlib.pyplot as plt
import numpy as np
from tqdm import tqdm
from pathlib import Path
from collections import Counter
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, random_split
from torch import nn
from torch.optim import AdamW
from torch.nn import CrossEntropyLoss
from transformers import ViTForImageClassification
from torch.cuda.amp import GradScaler, autocast



Device and AMP setup 

In [2]:
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
scaler = GradScaler() if use_cuda else None
pin_memory = use_cuda

## Dataset Setup

In [None]:

from torchvision import transforms, datasets
from torch.utils.data import DataLoader, Subset
import kagglehub


path = kagglehub.dataset_download("iarunava/cell-images-for-detecting-malaria")
print("✓ Dataset root:", path)



nested_dir = os.path.join(path, "cell_images", "cell_images")
print("Subfolders under nested_dir:", os.listdir(nested_dir))
# Expect: ['Parasitized', 'Uninfected']


transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])



dataset = datasets.ImageFolder(root=nested_dir, transform=transform)


print("Detected classes:", dataset.classes)  # should be ['Parasitized','Uninfected']
print("Total images:", len(dataset))

# ──────────────── 4) STRATIFIED SPLIT ────────────────
def stratified_split(dataset, val_frac=0.2, test_frac=0.1, seed=42):
    from collections import defaultdict

    label_to_indices = defaultdict(list)
    for idx, (_, label) in enumerate(dataset.samples):
        label_to_indices[label].append(idx)

    train_idxs, val_idxs, test_idxs = [], [], []
    g = torch.Generator().manual_seed(seed)

    for label, indices in label_to_indices.items():
        indices = torch.tensor(indices)
        shuffled = indices[torch.randperm(len(indices), generator=g)]

        n_total = len(shuffled)
        n_test  = int(test_frac * n_total)
        n_val   = int(val_frac * n_total)
        n_train = n_total - n_val - n_test

        train_idxs.extend(shuffled[:n_train].tolist())
        val_idxs.extend(shuffled[n_train : n_train + n_val].tolist())
        test_idxs.extend(shuffled[n_train + n_val :].tolist())

    return Subset(dataset, train_idxs), Subset(dataset, val_idxs), Subset(dataset, test_idxs)

train_dataset, val_dataset, test_dataset = stratified_split(
    dataset, val_frac=0.2, test_frac=0.1, seed=42
)

print("✔️  Split sizes →",
      f"Train={len(train_dataset)}, Val={len(val_dataset)}, Test={len(test_dataset)}")

# Verify class balance in each subset (optional)
def count_labels(subset):
    counts = [0, 0]
    for _, label in subset:
        counts[label] += 1
    return counts

print("Train distribution:", count_labels(train_dataset))  # e.g. [train_parasitized, train_uninfected]
print("Val   distribution:", count_labels(val_dataset))
print("Test  distribution:", count_labels(test_dataset))

# ──────────────── 5) DATALOADERS ────────────────
batch_size = 64
num_workers = 4
pin_memory  = True

train_loader = DataLoader(train_dataset,
                          batch_size=batch_size,
                          shuffle=True,
                          num_workers=num_workers,
                          pin_memory=pin_memory)

val_loader = DataLoader(val_dataset,
                        batch_size=batch_size,
                        shuffle=False,
                        num_workers=num_workers,
                        pin_memory=pin_memory)

test_loader = DataLoader(test_dataset,
                         batch_size=batch_size,
                         shuffle=False,
                         num_workers=num_workers,
                         pin_memory=pin_memory)

print("→ Dataloaders ready!")


✓ Dataset root: /Users/eliahsand/.cache/kagglehub/datasets/iarunava/cell-images-for-detecting-malaria/versions/1
Subfolders under nested_dir: ['.DS_Store', 'Parasitized', 'Uninfected']
Detected classes: ['Parasitized', 'Uninfected']
Total images: 27558
✔️  Split sizes → Train=19294, Val=5510, Test=2754
Train distribution: [9647, 9647]
Val   distribution: [2755, 2755]
Test  distribution: [1377, 1377]
→ Dataloaders ready. 


## Model, optimizer and loss

In [None]:

from torchvision.models import densenet121, DenseNet121_Weights


weights = DenseNet121_Weights.IMAGENET1K_V1
model = densenet121(weights=weights)


in_features = model.classifier.in_features  # typically 1024
model.classifier = nn.Linear(in_features, 2)

model = model.to(device)


for param in model.parameters():
    param.requires_grad = True


optimizer = AdamW(model.parameters(), lr=5e-5)
criterion = nn.CrossEntropyLoss()



## Trainingfunction

In [14]:
def evaluate(model, loader, criterion):
    model.eval()
    total_loss = 0.0
    correct, total = 0, 0

    with torch.no_grad():
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs.logits, labels)
            total_loss += loss.item()
            _, predicted = torch.max(outputs.logits, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

    return total_loss / len(loader), correct / total


## Early stopping class

In [11]:
class EarlyStopping:
    def __init__(self, patience=6, delta=0.0):
        self.patience = patience
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.delta = delta

    def __call__(self, val_accuracy, model):
        score = val_accuracy

        if self.best_score is None or score > self.best_score + self.delta:
            self.best_score = score
            self.counter = 0
            torch.save(model.state_dict(), "best_model.pth")
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True


## Model Training

In [None]:
train_losses, val_losses, val_accuracies = [], [], []
early_stopper = EarlyStopping(patience=3)

for epoch in range(25):
    model.train()
    running_loss = 0.0
    with tqdm(total=len(train_loader), desc=f"Epoch {epoch + 1}", unit="batch") as pbar:
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()

            if use_cuda:
                with autocast():
                    outputs = model(images)             
                    loss = criterion(outputs, labels)   
                scaler.scale(loss).backward()
                scaler.step(optimizer)
                scaler.update()
            else:
                outputs = model(images)
                loss = criterion(outputs, labels)       
                loss.backward()
                optimizer.step()

            running_loss += loss.item()
            pbar.set_postfix(loss=(running_loss / len(train_loader)))
            pbar.update(1)

    avg_train_loss = running_loss / len(train_loader)
    val_loss, val_acc = evaluate(model, val_loader, criterion)

    train_losses.append(avg_train_loss)
    val_losses.append(val_loss)
    val_accuracies.append(val_acc)

    print(
        f"Epoch {epoch + 1} - "
        f"Train Loss: {avg_train_loss:.4f}, "
        f"Val Loss: {val_loss:.4f}, "
        f"Val Acc: {val_acc*100:.2f}%"
    )

    early_stopper(val_acc, model)
    if early_stopper.early_stop:
        print("Early stopping triggered.")
        break



Epoch 1:   1%|          | 3/302 [00:48<1:20:13, 16.10s/batch, loss=0.00655]


KeyboardInterrupt: 

## Plotting results 

In [None]:
epochs = range(1, len(train_losses) + 1)
plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
plt.plot(epochs, train_losses, label="Train Loss")
plt.plot(epochs, val_losses, label="Validation Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Loss per Epoch")
plt.grid(True)
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(epochs, [v*100 for v in val_accuracies], label="Validation Accuracy")
plt.xlabel("Epoch")
plt.ylabel("Accuracy (%)")
plt.title("Accuracy per Epoch")
plt.grid(True)
plt.legend()

plt.tight_layout()
plt.show()

## Testevaluation with best model 

In [None]:
model.load_state_dict(torch.load("best_model.pth"))
model.eval()
correct, total = 0, 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.logits, 1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

print(f"\nFinal Test Accuracy (best model): {correct / total * 100:.2f}%")

In [None]:
from sklearn.metrics import classification_report

true_labels = []
pred_labels = []

with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.logits, 1)
        true_labels.extend(labels.cpu().numpy())
        pred_labels.extend(predicted.cpu().numpy())

print("\n📊 Classification Report:")
print(classification_report(true_labels, pred_labels, target_names=["Parasitized", "Uninfected"]))