# Lab 02: Training a Custom Model


**Objective of this lab**: training a small custom model on the Tiny-ImageNet dataset.

In [1]:
import torch

# Pick the compute backend: Apple-GPU MPS is tried first, then CUDA,
# and the CPU is the fallback when neither accelerator is available.
device = torch.device(
    'mps' if torch.backends.mps.is_available()
    else 'cuda' if torch.cuda.is_available()
    else 'cpu'
)

# Report which backend was selected.
print({
    'mps': "✓ Using MPS (Metal Performance Shaders) - Apple GPU",
    'cuda': "✓ Using CUDA - NVIDIA GPU",
    'cpu': "✓ Using CPU",
}[device.type])

✓ Using MPS (Metal Performance Shaders) - Apple GPU


## Dataset preparation

In [2]:
import urllib.request
import zipfile
import os

# Location of the data folder, inside the parent directory
data_dir = '../data'
os.makedirs(data_dir, exist_ok=True)

# Dataset download
url = 'http://cs231n.stanford.edu/tiny-imagenet-200.zip'
zip_path = os.path.join(data_dir, 'tiny-imagenet-200.zip')
extract_dir = os.path.join(data_dir, 'tiny-imagenet')

if not os.path.exists(zip_path):
    print("Downloading Tiny ImageNet dataset...")
    # Download to a temporary name and rename only on success: an interrupted
    # transfer must not leave a truncated file at zip_path, otherwise the next
    # run would report "already downloaded" and then fail while extracting.
    partial_path = zip_path + '.part'
    try:
        urllib.request.urlretrieve(url, partial_path)
        os.replace(partial_path, zip_path)
    finally:
        # Only still present when the download (or the rename) failed.
        if os.path.exists(partial_path):
            os.remove(partial_path)
    print("Download completed!")
else:
    print("Dataset already downloaded.")

# Zip file extraction (skipped when the extracted folder is already there)
if not os.path.exists(os.path.join(extract_dir, 'tiny-imagenet-200')):
    print("Extracting dataset...")
    os.makedirs(extract_dir, exist_ok=True)
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_dir)
    print("Extraction completed!")
else:
    print("Dataset already extracted.")

Dataset already downloaded.
Dataset already extracted.


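The validation split ships as a single flat `val/images` folder plus a `val_annotations.txt` file that maps each image to its class. `ImageFolder` expects one sub-folder per class, so we reorganize the validation images accordingly.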

In [4]:
import os
import shutil

val_images_dir = '../data/tiny-imagenet/tiny-imagenet-200/val/images'

# The flat images folder is deleted at the end of the reorganization, so its
# absence is used as the marker that the work has already been done.
if not os.path.exists(val_images_dir):
    print("Validation dataset already reorganized.")
else:
    print("Reorganizing validation dataset...")
    with open('../data/tiny-imagenet/tiny-imagenet-200/val/val_annotations.txt') as annotations:
        for record in annotations:
            # Tab-separated record: the first two fields are the image file
            # name and its class id; any remaining fields are ignored.
            fn, cls = record.split('\t')[:2]
            os.makedirs(f'../data/tiny-imagenet/tiny-imagenet-200/val/{cls}', exist_ok=True)
            shutil.copyfile(
                f'{val_images_dir}/{fn}',
                f'../data/tiny-imagenet/tiny-imagenet-200/val/{cls}/{fn}',
            )

    # Every image has been copied into its class folder: drop the flat folder.
    shutil.rmtree(val_images_dir)
    print("Validation dataset reorganized successfully!")

Validation dataset already reorganized.


In [5]:
from torchvision.datasets import ImageFolder
import torchvision.transforms as T

# Preprocessing shared by both splits: resize to the 224x224 resolution the
# network is fed with, convert to a tensor, then normalize each channel.
# NOTE(review): the mean/std values are presumably the standard ImageNet
# statistics - confirm if they are meant to be dataset-specific.
transform = T.Compose(
    [
        T.Resize((224, 224)),
        T.ToTensor(),
        T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
)

# ImageFolder layout: root/{classX}/x001.jpg
tiny_imagenet_dataset_train, tiny_imagenet_dataset_val = (
    ImageFolder(root=split_root, transform=transform)
    for split_root in (
        '../data/tiny-imagenet/tiny-imagenet-200/train',
        '../data/tiny-imagenet/tiny-imagenet-200/val',
    )
)

In [7]:
# Report how many samples each split contains (one line per split).
print(
    f"Length of train dataset: {len(tiny_imagenet_dataset_train)}",
    f"Length of val dataset: {len(tiny_imagenet_dataset_val)}",
    sep="\n",
)

# The following code also checks the number of samples per class
# from collections import Counter

# class_counts = Counter([target for _, target in tiny_imagenet_dataset_val])
# for class_label, count in class_counts.items():
#     print(f"Class {class_label}: {count} entries")


Length of train dataset: 100000
Length of val dataset: 10000


In [8]:
import torch

# Both loaders use background workers: every sample is decoded, resized and
# normalized on the fly by the dataset transform, so loading the validation
# split in the main process (the DataLoader default, num_workers=0) would leave
# it single-threaded while training uses 8 workers.
# Training data is reshuffled every epoch; validation order is kept fixed.
train_loader = torch.utils.data.DataLoader(
    tiny_imagenet_dataset_train, batch_size=32, shuffle=True, num_workers=8)
val_loader = torch.utils.data.DataLoader(
    tiny_imagenet_dataset_val, batch_size=32, shuffle=False, num_workers=8)

## Custom model definition

In [9]:
import torch
from torch import nn
import torch.nn.functional as F

num_classes = 200  # 200 is the number of classes in TinyImageNet


# Define the custom neural network
class CustomNet(nn.Module):
    """Convolutional image classifier built from five conv/BN/ReLU blocks.

    The layout follows the familiar VGG-16 arrangement: blocks of 2, 2, 3, 3
    and 3 convolutions (3x3, padding 1) with 64, 128, 256, 512 and 512
    channels. Every convolution is followed by batch normalization and ReLU,
    and every block ends with a 2x2 max pooling that halves the spatial size.
    A global average pooling and two fully connected layers (with dropout in
    between) produce the class logits.

    Why stacks of 3x3 convolutions: two stacked 3x3 convolutions cover a 5x5
    input window with fewer parameters than a single 5x5 kernel, and with an
    extra non-linearity in between. In one dimension::

        input:            A B C D E
        first 3x3 conv:   X = f(A, B, C)   Y = f(B, C, D)   Z = f(C, D, E)
        second 3x3 conv:  W = g(X, Y, Z)

    so W depends on A, B, C, D, E, i.e. it is 5 pixels wide.

    Args:
        num_classes: Number of output logits. Defaults to the module-level
            ``num_classes`` as it was when this class was defined.
    """

    def __init__(self, num_classes=num_classes):
        super().__init__()

        # Convolutional blocks (shape comments assume a 224x224 input)
        # Block 1
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, padding=1, stride=1)
        self.bn1 = nn.BatchNorm2d(64)
        self.conv2 = nn.Conv2d(64, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.pool1 = nn.MaxPool2d(2, 2)  # 224x224 -> 112x112

        # Block 2
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.conv4 = nn.Conv2d(128, 128, kernel_size=3, padding=1)
        self.bn4 = nn.BatchNorm2d(128)
        self.pool2 = nn.MaxPool2d(2, 2)  # 112x112 -> 56x56

        # Block 3
        self.conv5 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.bn5 = nn.BatchNorm2d(256)
        self.conv6 = nn.Conv2d(256, 256, kernel_size=3, padding=1)
        self.bn6 = nn.BatchNorm2d(256)
        self.conv7 = nn.Conv2d(256, 256, kernel_size=3, padding=1)
        self.bn7 = nn.BatchNorm2d(256)
        self.pool3 = nn.MaxPool2d(2, 2)  # 56x56 -> 28x28

        # Block 4
        self.conv8 = nn.Conv2d(256, 512, kernel_size=3, padding=1)
        self.bn8 = nn.BatchNorm2d(512)
        self.conv9 = nn.Conv2d(512, 512, kernel_size=3, padding=1)
        self.bn9 = nn.BatchNorm2d(512)
        self.conv10 = nn.Conv2d(512, 512, kernel_size=3, padding=1)
        self.bn10 = nn.BatchNorm2d(512)
        self.pool4 = nn.MaxPool2d(2, 2)  # 28x28 -> 14x14

        # Block 5
        self.conv11 = nn.Conv2d(512, 512, kernel_size=3, padding=1)
        self.bn11 = nn.BatchNorm2d(512)
        self.conv12 = nn.Conv2d(512, 512, kernel_size=3, padding=1)
        self.bn12 = nn.BatchNorm2d(512)
        self.conv13 = nn.Conv2d(512, 512, kernel_size=3, padding=1)
        self.bn13 = nn.BatchNorm2d(512)
        self.pool5 = nn.MaxPool2d(2, 2)  # 14x14 -> 7x7

        # Global average pooling (a more modern choice than a plain flatten):
        # the classifier head does not depend on the final spatial size.
        self.global_avg_pool = nn.AdaptiveAvgPool2d((1, 1))

        # Fully connected layers
        self.dropout = nn.Dropout(0.5)
        self.fc1 = nn.Linear(512, 512)
        # Original author's note: the professor's version stops at 256, here
        # the network goes one level further.
        self.fc2 = nn.Linear(512, num_classes)

    def forward(self, x):
        """Compute class logits for a batch of images.

        Args:
            x: Tensor of shape ``B x 3 x H x W``. The five 2x2 poolings need
                ``H`` and ``W`` to be at least 32; the shape comments below
                assume the 224x224 input used in this notebook.

        Returns:
            Tensor of shape ``B x num_classes`` with unnormalized logits.
        """
        # Block 1 -> B x 64 x 112 x 112
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = self.pool1(x)

        # Block 2 -> B x 128 x 56 x 56
        x = F.relu(self.bn3(self.conv3(x)))
        x = F.relu(self.bn4(self.conv4(x)))
        x = self.pool2(x)

        # Block 3 -> B x 256 x 28 x 28
        x = F.relu(self.bn5(self.conv5(x)))
        x = F.relu(self.bn6(self.conv6(x)))
        x = F.relu(self.bn7(self.conv7(x)))
        x = self.pool3(x)

        # Block 4 -> B x 512 x 14 x 14
        x = F.relu(self.bn8(self.conv8(x)))
        x = F.relu(self.bn9(self.conv9(x)))
        x = F.relu(self.bn10(self.conv10(x)))
        x = self.pool4(x)

        # Block 5 -> B x 512 x 7 x 7
        x = F.relu(self.bn11(self.conv11(x)))
        x = F.relu(self.bn12(self.conv12(x)))
        x = F.relu(self.bn13(self.conv13(x)))
        x = self.pool5(x)

        # Global average pooling and fully connected head
        x = self.global_avg_pool(x)   # B x 512 x 1 x 1
        x = torch.flatten(x, 1)       # B x 512
        x = self.dropout(F.relu(self.fc1(x)))
        x = self.fc2(x)               # B x num_classes

        return x

In [10]:
def train(epoch, model, train_loader, criterion, optimizer, device):
    """Run one training epoch and report its loss and accuracy.

    Args:
        epoch: Epoch number, used only in the printed summary.
        model: Network to optimize; it is switched to training mode.
        train_loader: Sized iterable of ``(inputs, targets)`` batches.
        criterion: Loss callable invoked as ``criterion(outputs, targets)``.
        optimizer: Optimizer updating the parameters of ``model``.
        device: Device every batch is moved to before the forward pass.

    Returns:
        Tuple ``(train_loss, train_accuracy)``: the loss averaged over the
        batches of the epoch and the top-1 accuracy in percent. Returned (and
        not only printed) so that callers can log or plot them, like the
        metric returned by ``validate``.
    """
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)

        # Clear the gradients left over from the previous iteration
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)

        # Compute the loss
        loss = criterion(outputs, targets)

        # Backward pass (computes the gradients)
        loss.backward()

        # Update the weights
        optimizer.step()

        running_loss += loss.item()
        # The predicted class is the index of the largest logit
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()

    train_loss = running_loss / len(train_loader)
    train_accuracy = 100. * correct / total
    print(f'Train Epoch: {epoch} Loss: {train_loss:.6f} Acc: {train_accuracy:.2f}%')
    return train_loss, train_accuracy

In [11]:
# Validation loop
def validate(model, val_loader, criterion, device):
    """Evaluate ``model`` on ``val_loader`` and return its accuracy.

    The model is put in evaluation mode and gradients are disabled. The
    average per-batch loss and the top-1 accuracy are printed.

    Args:
        model: Network to evaluate.
        val_loader: Sized iterable of ``(inputs, targets)`` batches.
        criterion: Loss callable invoked as ``criterion(outputs, targets)``.
        device: Device every batch is moved to before the forward pass.

    Returns:
        Top-1 accuracy over all samples, in percent.
    """
    model.eval()
    loss_sum = 0
    n_correct = 0
    n_seen = 0

    # Inference only: no gradients are needed
    with torch.no_grad():
        for inputs, targets in val_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)

            outputs = model(inputs)
            loss_sum += criterion(outputs, targets).item()

            # The predicted class is the index of the largest logit
            predicted = outputs.max(1)[1]
            n_seen += targets.size(0)
            n_correct += predicted.eq(targets).sum().item()

    val_loss = loss_sum / len(val_loader)
    val_accuracy = 100. * n_correct / n_seen

    print(f'Validation Loss: {val_loss:.6f} Acc: {val_accuracy:.2f}%')
    return val_accuracy

## Putting everything together

In [None]:
# Number of passes over the training set
num_epochs = 10

# Model, loss and optimizer (SGD with momentum)
model = CustomNet().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# Highest validation accuracy observed so far
best_acc = 0

for epoch in range(1, num_epochs + 1):
    # One optimization pass over the training set...
    train(epoch, model, train_loader, criterion, optimizer, device)

    # ...followed by an evaluation on the validation set
    val_accuracy = validate(model, val_loader, criterion, device)

    # Keep track of the best validation accuracy
    if val_accuracy > best_acc:
        best_acc = val_accuracy


print(f'Best validation accuracy: {best_acc:.2f}%')