Imports

In [55]:
# Install the CUDA 12.1 builds of PyTorch into the environment of the running kernel.
%pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121

Looking in indexes: https://download.pytorch.org/whl/cu121
Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip available: 22.3 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [56]:
# Install SciPy into the environment of the running kernel.
%pip install scipy

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip available: 22.3 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [57]:
# Install pandas into the environment of the running kernel.
%pip install pandas

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip available: 22.3 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [58]:
# Install the Ultralytics package into the environment of the running kernel.
%pip install ultralytics

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip available: 22.3 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [59]:
# Install seaborn into the environment of the running kernel.
%pip install seaborn

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip available: 22.3 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [60]:
import torch
import torch.nn as nn
import numpy as np
from torchvision.datasets import Flowers102
from torch.utils.data import Subset, ConcatDataset
from collections import defaultdict, Counter
import random
from collections import defaultdict
import torch
from torch.utils.data import Dataset, Subset, DataLoader
from torchvision import transforms
from typing import Tuple, List, Dict, Optional, Callable
import copy

Checking what device is available

In [61]:
# Report the installed PyTorch build, then pick the GPU when CUDA is usable (CPU otherwise).
print("Torch:", torch.__version__)
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Using device:", device)

Torch: 2.5.1+cu121
Using device: cuda


Constants

In [62]:
# --- Data location and split configuration ---
DATASET_ROOT: str = "./data"
DATA_SPLIT_SEED_LIST: list[int] = [42, 43, 44]  # one stratified split is built per seed
TRAIN_RELATIVE_SIZE: float = 0.5
VALIDATION_RELATIVE_SIZE: float = 0.25
TEST_RELATIVE_SIZE: float = 0.25

# --- Model and input preprocessing ---
YOLOV5_MODEL: str = 'yolov5s'
EXPECTED_IMAGE_SIZE: tuple[int, int] = (224, 224)
IMAGENET_MEAN: list[float] = [0.485, 0.456, 0.406]
IMAGENET_STD: list[float] = [0.229, 0.224, 0.225]

# --- Training loop and early stopping ---
MAX_EPOCHS: int = 100
PATIENCE_EPOCHS: int = 10
IMPROVEMENT_DELTA: float = 0.01

# --- Data loading ---
BATCH_SIZE: int = 32
NUM_WORKERS_FOR_DATALOADER: int = 0  # Set to 0 for Windows compatibility

Fetching the dataset

In [63]:
# Pool the official train/val/test partitions into a single dataset;
# the custom splits are carved out of this pool later.
full_dataset = ConcatDataset(
    [
        Flowers102(root=DATASET_ROOT, split=official_split, download=True)
        for official_split in ("train", "val", "test")
    ]
)

Dataset information

In [64]:
# Gather every label without decoding any image: Flowers102 keeps them in `_labels`,
# and `full_dataset` (a ConcatDataset) exposes its parts through `.datasets`.
all_labels = [
    label
    for single_dataset in full_dataset.datasets
    for label in single_dataset._labels
]
counts = Counter(all_labels)

# Summary statistics of the class distribution.
total = len(all_labels)
num_classes = len(counts)
min_c, max_c = min(counts.values()), max(counts.values())

print(f"Total samples: {total}")
print(f"Classes: {num_classes}")
print(f"Min per class: {min_c}")
print(f"Max per class: {max_c}")
print(f"Imbalance ratio (max/min): {max_c/min_c:.2f}")

Total samples: 8189
Classes: 102
Min per class: 40
Max per class: 258
Imbalance ratio (max/min): 6.45


Data Preparation

Added utility functions to properly separate data.

In [65]:
def extract_all_labels_from_concat_dataset(concat_dataset: ConcatDataset):
    """Return the labels of every sample in ``concat_dataset`` as one integer array.

    Each underlying dataset is inspected in order. Its labels are read from
    ``_labels`` when present (Flowers102), otherwise from ``targets``
    (ImageFolder, CIFAR, etc.); only when neither attribute exists are the
    samples indexed one by one, which is slow.

    Returns:
        A 1-D ``numpy`` integer array, ordered like the datasets in
        ``concat_dataset.datasets``.
    """

    def _labels_of(dataset) -> np.ndarray:
        # Prefer a stored label attribute so that no sample has to be loaded.
        for attribute_name in ("_labels", "targets"):
            if hasattr(dataset, attribute_name):
                return np.asarray(getattr(dataset, attribute_name), dtype=int)
        # Fallback (slow): index every sample and keep its second element.
        return np.array(
            [dataset[position][1] for position in range(len(dataset))],
            dtype=int,
        )

    return np.concatenate(
        [_labels_of(dataset) for dataset in concat_dataset.datasets]
    )

In [66]:
def stratified_split_concat_dataset(
    concat_dataset: ConcatDataset,
    split_fractions=(TRAIN_RELATIVE_SIZE, VALIDATION_RELATIVE_SIZE, TEST_RELATIVE_SIZE),
    random_seed=42
):
    """Partition ``concat_dataset`` into train/val/test ``Subset`` objects, class by class.

    Every class is shuffled and divided according to ``split_fractions``
    (re-scaled to sum to one), so each split keeps roughly the same class
    proportions as the full dataset.

    Args:
        concat_dataset: Dataset to split; labels are read via
            ``extract_all_labels_from_concat_dataset``.
        split_fractions: Relative sizes of the three splits.
        random_seed: Seed of the NumPy generator driving every shuffle.

    Returns:
        ``(train_subset, val_subset, test_subset)``.
    """
    fractions = np.array(split_fractions, dtype=float)
    fractions = fractions / fractions.sum()
    num_splits = len(fractions)

    labels = extract_all_labels_from_concat_dataset(concat_dataset)
    rng = np.random.default_rng(random_seed)

    # Bucket the global sample positions by their class label.
    positions_by_class = defaultdict(list)
    for position, label in enumerate(labels):
        positions_by_class[int(label)].append(position)

    indices_per_split = [[] for _ in range(num_splits)]

    for class_positions in positions_by_class.values():
        shuffled_positions = np.array(class_positions, dtype=int)
        rng.shuffle(shuffled_positions)

        class_size = len(shuffled_positions)
        quotas = np.floor(fractions * class_size).astype(int)

        # Flooring may leave a few samples unassigned; hand each of them to a
        # distinct, randomly chosen split.
        leftover = class_size - quotas.sum()
        quotas[rng.permutation(num_splits)[:leftover]] += 1

        # Cut the shuffled positions into consecutive chunks, one per split.
        chunks = np.split(shuffled_positions, np.cumsum(quotas)[:-1])
        for split_indices, chunk in zip(indices_per_split, chunks):
            split_indices.extend(chunk.tolist())

    # Mix the classes inside every split.
    for split_indices in indices_per_split:
        rng.shuffle(split_indices)

    train_subset, val_subset, test_subset = (
        Subset(concat_dataset, split_indices) for split_indices in indices_per_split
    )
    return train_subset, val_subset, test_subset

Added transformations for both image rescaling and data augmentation

In [67]:
# Training pipeline: random augmentation, then tensor conversion and ImageNet normalisation.
yolo_train_transform = transforms.Compose([
    # Random crop covering 80-100% of the image, resized to the expected input size.
    transforms.RandomResizedCrop(size=EXPECTED_IMAGE_SIZE, scale=(0.8, 1.0)),
    # Mirror half of the images horizontally.
    transforms.RandomHorizontalFlip(p=0.5),
    # Small random rotations, translations and scaling.
    transforms.RandomAffine(degrees=10, translate=(0.03, 0.03), scale=(0.97, 1.03)),
    transforms.ToTensor(),
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
])


# Validation/test pipeline: resize only, no random augmentation.
yolo_val_test_transform = transforms.Compose([
    transforms.Resize(size=EXPECTED_IMAGE_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
])

Dataset wrapper to properly handle transformations

In [68]:
class FlowersDataset(Dataset):
    """
    Thin wrapper that applies a transform to the images of another dataset.

    The wrapped dataset is expected to yield ``(image, label)`` pairs, with
      - image: PIL.Image
      - label: int (0..num_classes-1)
    Only the image goes through ``transform``; the label is returned untouched.
    """
    def __init__(self, base_dataset: Dataset, transform):
        self.transform = transform
        self.base_dataset = base_dataset

    def __len__(self):
        # The wrapper has exactly as many samples as the dataset it wraps.
        return len(self.base_dataset)

    def __getitem__(self, idx):
        sample, target = self.base_dataset[idx]
        return self.transform(sample), target

Creating dataloaders    

In [69]:
def create_dataloaders(
    train_dataset: Dataset,
    val_dataset: Dataset,
    test_dataset: Dataset,
    batch_size: int = BATCH_SIZE,
    num_workers: int = NUM_WORKERS_FOR_DATALOADER
) -> Tuple[DataLoader, DataLoader, DataLoader]:
    """Wrap the three datasets in ``DataLoader`` objects sharing the same settings.

    Only the training loader shuffles its samples; the validation and test
    loaders iterate in dataset order. All loaders use pinned memory.

    Returns:
        ``(train_dataloader, val_dataloader, test_dataloader)``.
    """
    # Options common to all three loaders.
    shared_options = dict(
        batch_size=batch_size,
        num_workers=num_workers,
        pin_memory=True,
    )

    train_dataloader = DataLoader(train_dataset, shuffle=True, **shared_options)
    val_dataloader, test_dataloader = (
        DataLoader(evaluation_dataset, shuffle=False, **shared_options)
        for evaluation_dataset in (val_dataset, test_dataset)
    )
    return train_dataloader, val_dataloader, test_dataloader

Create 3 sets of splits, as required in the assignment

In [70]:
def create_dataset_splits(
    full_dataset: ConcatDataset,
    random_seed: int
) -> Tuple[Dataset, Dataset, Dataset]:
    """Split ``full_dataset`` in a stratified way and attach the matching transforms.

    The training split receives the augmenting ``yolo_train_transform``; the
    validation and test splits receive ``yolo_val_test_transform``.

    Returns:
        ``(train_dataset, val_dataset, test_dataset)`` as ``FlowersDataset`` objects.
    """
    raw_subsets = stratified_split_concat_dataset(
        concat_dataset=full_dataset,
        random_seed=random_seed
    )
    # One transform per subset, in (train, val, test) order.
    pipelines = (yolo_train_transform, yolo_val_test_transform, yolo_val_test_transform)

    train_dataset, val_dataset, test_dataset = (
        FlowersDataset(subset, transform=pipeline)
        for subset, pipeline in zip(raw_subsets, pipelines)
    )
    return train_dataset, val_dataset, test_dataset

In [71]:
# Build one (train, val, test) dataloader triple per entry of DATA_SPLIT_SEED_LIST.
# The triples are stored in seed order, so index 0 corresponds to the first seed.
loaders_sets_per_seed = []
for seed in DATA_SPLIT_SEED_LIST:
    # Split the pooled dataset using this seed and attach the transforms.
    train_dataset, val_dataset, test_dataset = create_dataset_splits(full_dataset, random_seed=seed)
    # Wrap the three datasets in dataloaders with the default batch size / worker count.
    train_dataloader, val_dataloader, test_dataloader = create_dataloaders(train_dataset, val_dataset, test_dataset)
    loaders_sets_per_seed.append((train_dataloader, val_dataloader, test_dataloader))

Model training code

Early stopping logic

In [None]:
class EarlyStopper:
    """Track a metric that should decrease and signal when it stops improving.

    A value counts as an improvement when it is lower than the best value seen
    so far by more than ``min_delta``. On every improvement a deep copy of the
    model's ``state_dict`` is stored in ``best_state_dict``.
    """

    def __init__(self, patience_epochs: int = PATIENCE_EPOCHS, min_delta: float = IMPROVEMENT_DELTA):
        self.patience_epochs = patience_epochs
        self.min_delta = min_delta
        self.best_value: Optional[float] = None
        self.epochs_without_improvement: int = 0
        self.best_state_dict: Optional[Dict[str, torch.Tensor]] = None

    def step(self, current_value: float, model: nn.Module) -> tuple[bool, bool]:
        """
        Record ``current_value`` and report the stopper's verdict.

        Returns should_stop, improved
        """
        # The very first value is always accepted as the current best.
        is_first_call = self.best_value is None
        improved = is_first_call or current_value < (self.best_value - self.min_delta)

        if improved:
            self.best_value = current_value
            self.epochs_without_improvement = 0
            # Snapshot the weights so the best model can be restored later.
            self.best_state_dict = copy.deepcopy(model.state_dict())
            return False, True

        self.epochs_without_improvement += 1
        return self.epochs_without_improvement >= self.patience_epochs, improved

Running single training loop

In [73]:
def run_one_epoch_train(
    model: nn.Module,
    train_dataloader: DataLoader,
    loss_function: Callable[[torch.Tensor, torch.Tensor], torch.Tensor],
    optimizer: torch.optim.Optimizer,
    device: torch.device,
) -> Tuple[float, float]:
    """Run one optimisation pass over ``train_dataloader``.

    The model is put in training mode and updated once per batch. Loss and
    accuracy are computed from the logits produced before each update.

    Returns:
        ``(average_loss, average_accuracy)`` over all samples seen;
        both are ``0.0`` when the dataloader yields no batches.
    """
    model.train()

    loss_sum, hits, seen = 0.0, 0, 0

    for images, targets in train_dataloader:
        images = images.to(device, non_blocking=True)
        targets = targets.to(device, non_blocking=True).long()

        optimizer.zero_grad(set_to_none=True)

        outputs = model(images)
        # When the model returns a tuple/list, its first element is used as the logits.
        logits = outputs[0] if isinstance(outputs, (tuple, list)) else outputs

        loss = loss_function(logits, targets)
        loss.backward()
        optimizer.step()

        # Weight the batch loss by the batch size so the final mean is per sample.
        samples_in_batch = targets.size(0)
        loss_sum += loss.item() * samples_in_batch
        hits += (logits.argmax(dim=1) == targets).sum().item()
        seen += samples_in_batch

    # Guard against division by zero for an empty dataloader.
    denominator = max(1, seen)
    return loss_sum / denominator, hits / denominator

Running single validation loop

In [74]:
def run_one_epoch_validation(
    model: nn.Module,
    val_dataloader: DataLoader,
    loss_function: Callable[[torch.Tensor, torch.Tensor], torch.Tensor],
    device: torch.device,
) -> Tuple[float, float]:
    """Measure loss and accuracy over ``val_dataloader`` without updating the model.

    The model is switched to evaluation mode and every batch is processed
    with gradient tracking disabled.

    Returns:
        ``(average_loss, average_accuracy)`` over all samples seen;
        both are ``0.0`` when the dataloader yields no batches.
    """
    model.eval()

    loss_sum, hits, seen = 0.0, 0, 0

    with torch.no_grad():
        for images, targets in val_dataloader:
            images = images.to(device, non_blocking=True)
            targets = targets.to(device, non_blocking=True).long()

            outputs = model(images)
            # When the model returns a tuple/list, its first element is used as the logits.
            logits = outputs[0] if isinstance(outputs, (tuple, list)) else outputs

            loss = loss_function(logits, targets)

            # Weight the batch loss by the batch size so the final mean is per sample.
            samples_in_batch = targets.size(0)
            loss_sum += loss.item() * samples_in_batch
            hits += (logits.argmax(dim=1) == targets).sum().item()
            seen += samples_in_batch

    # Guard against division by zero for an empty dataloader.
    denominator = max(1, seen)
    return loss_sum / denominator, hits / denominator

Training loop with early stopping that returns the model that achieved the best results instead of the latest.

In [None]:
def train_with_early_stopping(
    model: nn.Module,
    train_dataloader: DataLoader,
    val_dataloader: DataLoader,
    loss_function: Callable[[torch.Tensor, torch.Tensor], torch.Tensor],
    optimizer_factory: Callable[[nn.Module], torch.optim.Optimizer],
    device: torch.device,
    max_epochs: int = MAX_EPOCHS,
    improvement_delta: float = IMPROVEMENT_DELTA,
    patience_epochs: int = PATIENCE_EPOCHS
) -> Tuple[nn.Module, List[float], List[float], List[float], List[float]]:
    """Train ``model`` until validation loss stops improving or ``max_epochs`` is reached.

    After every training epoch the model is validated and the validation loss
    is handed to an ``EarlyStopper``. When training ends, the weights of the
    best epoch (by validation loss) are loaded back into ``model``.

    Args:
        model: Network to train; it is modified in place.
        train_dataloader: Batches used for optimisation.
        val_dataloader: Batches used for the per-epoch validation.
        loss_function: Maps ``(logits, targets)`` to a scalar loss.
        optimizer_factory: Called once with ``model`` to build the optimizer.
        device: Device the batches are moved to.
        max_epochs: Upper bound on the number of epochs.
        improvement_delta: Minimum drop in validation loss that counts as an improvement.
        patience_epochs: Number of consecutive non-improving epochs tolerated.

    Returns:
        ``(model, epoch_loss, epoch_acc, epoch_val_loss, epoch_val_acc)`` where
        the four lists hold one value per completed epoch.
    """
    # Per-epoch history, returned to the caller for plotting/analysis.
    epoch_loss: List[float] = []
    epoch_acc: List[float] = []
    epoch_val_loss: List[float] = []
    epoch_val_acc: List[float] = []

    optimizer = optimizer_factory(model)
    early_stopper = EarlyStopper(patience_epochs=patience_epochs, min_delta=improvement_delta)

    for epoch_index in range(1, max_epochs + 1):
        train_loss, train_accuracy = run_one_epoch_train(
            model=model,
            train_dataloader=train_dataloader,
            loss_function=loss_function,
            optimizer=optimizer,
            device=device,
        )
        val_loss, val_accuracy = run_one_epoch_validation(
            model=model,
            val_dataloader=val_dataloader,
            loss_function=loss_function,
            device=device,
        )
        epoch_loss.append(train_loss)
        epoch_acc.append(train_accuracy)
        epoch_val_loss.append(val_loss)
        epoch_val_acc.append(val_accuracy)
        print(
            f"Epoch {epoch_index:03d} | "
            f"train_loss={train_loss:.6f}, train_acc={train_accuracy:.4f} | "
            f"val_loss={val_loss:.6f}, val_acc={val_accuracy:.4f}"
        )

        # The stopper also snapshots the weights whenever val_loss improves.
        should_stop, improved = early_stopper.step(current_value=val_loss, model=model)
        if not improved:
            print(
                f"No improvement in val_loss for {early_stopper.epochs_without_improvement} "
                f"out of {patience_epochs} allowed epochs."
            )
        if should_stop:
            print(
                f"Early stopping: no val_loss improvement >= {improvement_delta} "
                f"for {patience_epochs} epochs."
            )
            break

    # Restore the best snapshot; none exists if no epoch was run.
    if early_stopper.best_state_dict is not None:
        model.load_state_dict(early_stopper.best_state_dict)
        print("Loaded best model weights (by validation loss).")

    return model, epoch_loss, epoch_acc, epoch_val_loss, epoch_val_acc

Evaluate model's performance

In [76]:
def evaluate_model(
    model: torch.nn.Module,
    dataloader: torch.utils.data.DataLoader,
    loss_function: torch.nn.Module,
    device: torch.device
) -> Tuple[float, float]:
    """Compute the average loss and accuracy of ``model`` over ``dataloader``.

    The model is switched to evaluation mode and gradients are disabled.

    Args:
        model: Network to evaluate.
        dataloader: Iterable of ``(images, targets)`` batches.
        loss_function: Maps ``(logits, targets)`` to a scalar loss.
        device: Device the batches are moved to.

    Returns:
        ``(avg_loss, accuracy)`` over all samples seen. Both are ``0.0`` when
        the dataloader yields no batches, matching the behaviour of
        ``run_one_epoch_train`` / ``run_one_epoch_validation``.
    """
    model.eval()

    total_loss = 0.0
    total_correct = 0
    total_samples = 0

    with torch.no_grad():
        for images, targets in dataloader:
            images = images.to(device, non_blocking=True)
            targets = targets.to(device, non_blocking=True).long()

            logits = model(images)
            if isinstance(logits, (tuple, list)):  # in case model returns extra outputs
                logits = logits[0]

            loss = loss_function(logits, targets)

            # Weight the batch loss by the batch size so the final mean is per sample.
            batch_size = images.size(0)
            total_loss += loss.item() * batch_size
            preds = logits.argmax(dim=1)
            total_correct += (preds == targets).sum().item()
            total_samples += batch_size

    # An empty dataloader must not raise ZeroDivisionError.
    denominator = max(1, total_samples)
    avg_loss = total_loss / denominator
    accuracy = total_correct / denominator

    return avg_loss, accuracy

Training Yolo

Loading Yolo, classification variant.

In [77]:
# Fetch the YOLOv5 classification checkpoint through torch.hub.
yolo_model = torch.hub.load(
    repo_or_dir="ultralytics/yolov5",
    model="custom",
    path=YOLOV5_MODEL + "-cls.pt",  # classification checkpoint
    autoshape=False,                # important for training
    verbose=False,
)

[31m[1mrequirements:[0m Ultralytics requirements ['gitpython>=3.1.30', 'setuptools>=70.0.0'] not found, attempting AutoUpdate...

[31m[1mrequirements:[0m AutoUpdate success  2.0s



YOLOv5  2026-1-26 Python-3.11.0 torch-2.5.1+cu121 CUDA:0 (NVIDIA GeForce RTX 2070 SUPER, 8192MiB)



Replacing the classifier head with a new one, since the target dataset has different categories and a different number of categories


In [78]:
def replace_last_linear_layer(model: nn.Module, num_classes: int) -> nn.Module:
    """Replace the last ``nn.Linear`` in ``model`` with a new one producing ``num_classes`` outputs.

    "Last" means the final ``nn.Linear`` yielded by ``model.named_modules()``.
    The new layer keeps the replaced layer's input width, bias setting, dtype
    and device, so it can be used even if the model has already been moved or
    cast. Its weights are freshly initialised.

    Args:
        model: Network whose final linear layer is replaced in place.
        num_classes: Number of output features of the new layer.

    Returns:
        ``model`` with the layer swapped. If ``model`` itself is the
        ``nn.Linear``, the new layer is returned instead, because there is no
        parent module to attach it to.

    Raises:
        RuntimeError: If ``model`` contains no ``nn.Linear``.
    """
    last_linear_name = None
    last_linear_module = None

    # Keep overwriting so that the final match of the traversal wins.
    for module_name, module in model.named_modules():
        if isinstance(module, nn.Linear):
            last_linear_name, last_linear_module = module_name, module

    if last_linear_module is None:
        raise RuntimeError("No nn.Linear layer found to replace.")

    new_linear_layer = nn.Linear(
        last_linear_module.in_features,
        num_classes,
        bias=last_linear_module.bias is not None,
        device=last_linear_module.weight.device,
        dtype=last_linear_module.weight.dtype,
    )

    if last_linear_name == "":
        # The root module is the Linear layer itself: the replacement is the new model.
        print(f"Replaced: {last_linear_name} -> Linear({last_linear_module.in_features}, {num_classes})")
        return new_linear_layer

    # Split "a.b.c" into parent path "a.b" and attribute "c"; an empty parent path is the root.
    parent_name, _, attribute_name = last_linear_name.rpartition(".")
    setattr(model.get_submodule(parent_name), attribute_name, new_linear_layer)

    print(f"Replaced: {last_linear_name} -> Linear({last_linear_module.in_features}, {num_classes})")
    return model

# Swap in a head sized for this dataset, move the network to the chosen device,
# then switch it to training mode.
yolo_model = replace_last_linear_layer(yolo_model, num_classes).to(device)
yolo_model.train()

Replaced: model.model.9.linear -> Linear(1280, 102)


DetectMultiBackend(
  (model): ClassificationModel(
    (model): Sequential(
      (0): Conv(
        (conv): Conv2d(3, 32, kernel_size=(6, 6), stride=(2, 2), padding=(2, 2), bias=False)
        (bn): BatchNorm2d(32, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
        (act): SiLU(inplace=True)
      )
      (1): Conv(
        (conv): Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (bn): BatchNorm2d(64, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
        (act): SiLU(inplace=True)
      )
      (2): C3(
        (cv1): Conv(
          (conv): Conv2d(64, 32, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn): BatchNorm2d(32, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
          (act): SiLU(inplace=True)
        )
        (cv2): Conv(
          (conv): Conv2d(64, 32, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn): BatchNorm2d(32, eps=0.001, momentum=0.03, affine=Tru

Data regarding number of parameters in model.

In [79]:
# Count all parameter elements, and separately those that will receive gradients.
all_parameters = list(yolo_model.parameters())
total_params = sum(map(torch.numel, all_parameters))
trainable_params = sum(
    torch.numel(parameter) for parameter in all_parameters if parameter.requires_grad
)

print("Total params:", total_params)
print("Trainable params:", trainable_params)

Total params: 4303142
Trainable params: 130662


Training code

Defining loss and optimizer

A factory is used because the optimizer cannot be created before the model is fully set up.

In [80]:
def optimizer_factory(model):
    """Build an AdamW optimizer over all parameters of ``model``.

    Uses a learning rate of 3e-4 and a weight decay of 1e-2.
    """
    hyperparameters = {"lr": 3e-4, "weight_decay": 1e-2}
    return torch.optim.AdamW(model.parameters(), **hyperparameters)

In [81]:
# Classification loss applied to the logits and integer class targets.
loss_function = nn.CrossEntropyLoss()
# Alias for the optimizer factory; the optimizer itself is built inside the training loop.
yolov5_optimizer = optimizer_factory

In [82]:
# train_with_early_stopping returns the trained model AND four per-epoch histories;
# unpack them so `yolov5_classifier_model` really is the model and not a 5-tuple.
# The first entry of loaders_sets_per_seed holds the loaders for the first split seed.
(
    yolov5_classifier_model,
    yolov5_train_loss_history,
    yolov5_train_acc_history,
    yolov5_val_loss_history,
    yolov5_val_acc_history,
) = train_with_early_stopping(
    model=yolo_model,
    train_dataloader=loaders_sets_per_seed[0][0],
    val_dataloader=loaders_sets_per_seed[0][1],
    loss_function=loss_function,
    optimizer_factory=optimizer_factory,
    device=device,
    max_epochs=MAX_EPOCHS,
)

Epoch 001 | train_loss=3.549493, train_acc=0.3101 | val_loss=2.581550, val_acc=0.5465
Epoch 002 | train_loss=2.009889, train_acc=0.7052 | val_loss=1.611691, val_acc=0.7639
Epoch 003 | train_loss=1.303480, train_acc=0.8287 | val_loss=1.171780, val_acc=0.8382
Epoch 004 | train_loss=0.943822, train_acc=0.8816 | val_loss=0.914688, val_acc=0.8760
Epoch 005 | train_loss=0.737549, train_acc=0.9143 | val_loss=0.791420, val_acc=0.8756
Epoch 006 | train_loss=0.607203, train_acc=0.9262 | val_loss=0.682145, val_acc=0.8947
Epoch 007 | train_loss=0.522110, train_acc=0.9286 | val_loss=0.618420, val_acc=0.8967
Epoch 008 | train_loss=0.451350, train_acc=0.9401 | val_loss=0.551897, val_acc=0.9011
Epoch 009 | train_loss=0.388111, train_acc=0.9525 | val_loss=0.520931, val_acc=0.9065
Epoch 010 | train_loss=0.359091, train_acc=0.9518 | val_loss=0.489103, val_acc=0.9061
Epoch 011 | train_loss=0.326899, train_acc=0.9557 | val_loss=0.454155, val_acc=0.9129
Epoch 012 | train_loss=0.291673, train_acc=0.9654 | va

KeyboardInterrupt: 

Training VGG

Evaluating Models

Model Evaluation Functions