# Deep Learning - Exercise 3
## Emanuele Fontana

In this notebook we'll try to reach 90% accuracy on CIFAR-10 with 2 approaches:
1. Using a simple Convolutional Neural Network (CNN) built with PyTorch
2. Using Transfer Learning with a pre-trained model

### Imports and Data Loading

In [1]:
import torch
import torchvision
import torchvision.transforms as transforms
import torch.optim as optim
from torchmetrics import Accuracy
from PIL import Image
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models

# Run on the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")


# Per-channel statistics shared by the pipelines below.
_HALF = (0.5, 0.5, 0.5)
_IMAGENET_MEAN = (0.485, 0.456, 0.406)
_IMAGENET_STD = (0.229, 0.224, 0.225)

# Training pipeline for the custom CNN (images keep their 32x32 size):
# tensor conversion, random horizontal flip, colour jitter, then
# normalisation with mean 0.5 / std 0.5 on every channel.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.Normalize(_HALF, _HALF),
])

# Evaluation pipeline for the custom CNN: same normalisation, no augmentation.
test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(_HALF, _HALF),
])

# Pipeline for the pre-trained EfficientNet-B0: upscale to 224x224 and
# normalise with the ImageNet mean / std. It contains no random augmentation
# and is used for both the training and the test split.
transform_effnet = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(_IMAGENET_MEAN, _IMAGENET_STD),
])



# Custom dataset with multiple augmented versions
class AugmentedCIFAR10(torchvision.datasets.CIFAR10):
    """CIFAR-10 in which every stored image is served ``num_augmentations`` times.

    For a stored image ``k`` the dataset exposes the consecutive indices
    ``k * num_augmentations ... (k + 1) * num_augmentations - 1``:

    * the first index of the group returns the image with only tensor
      conversion and mean-0.5 / std-0.5 normalisation (no augmentation);
    * every other index of the group returns ``self.transform(image)``, so a
      random ``transform`` yields a freshly augmented view on each access.

    Args:
        *args, **kwargs: forwarded unchanged to ``torchvision.datasets.CIFAR10``.
        num_augmentations: number of samples exposed per stored image
            (keyword-only, default 2 — one plain copy plus one augmented copy).

    Raises:
        ValueError: if ``num_augmentations`` is smaller than 1.
    """

    def __init__(self, *args, num_augmentations: int = 2, **kwargs):
        super().__init__(*args, **kwargs)
        if num_augmentations < 1:
            raise ValueError(
                f"num_augmentations must be >= 1, got {num_augmentations}"
            )
        self.num_augmentations = num_augmentations
        # Built once here instead of on every __getitem__ call.
        self._plain_transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
        ])

    def __len__(self) -> int:
        """Return the number of stored images times ``num_augmentations``."""
        return len(self.data) * self.num_augmentations

    def __getitem__(self, idx: int):
        """Return ``(image_tensor, label)`` for the expanded index ``idx``."""
        # Quotient selects the stored image, remainder selects the variant.
        original_idx, variant = divmod(idx, self.num_augmentations)
        label = self.targets[original_idx]

        # The stored sample is a numpy array; the transforms expect a PIL image.
        image = Image.fromarray(self.data[original_idx])

        if variant == 0 or self.transform is None:
            # Plain copy; also the fallback when no augmentation was supplied,
            # so a dataset built with transform=None does not crash.
            new_image = self._plain_transform(image)
        else:
            new_image = self.transform(image)

        # Honour the optional label transform accepted by the base class.
        if self.target_transform is not None:
            label = self.target_transform(label)

        return new_image, label

# Load dataset with augmentation for CNN
trainset = AugmentedCIFAR10(root='./data', train=True,
                            download=True, transform=transform) 
trainloader = torch.utils.data.DataLoader(trainset, batch_size=300,
                                          shuffle=True)

testset = torchvision.datasets.CIFAR10(root='./data', train=False,
                                       download=True, transform=test_transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=300,
                                         shuffle=False)


# Load dataset with augmentation for effnet18
trainset_effnet = torchvision.datasets.CIFAR10(root='./data', train=True,
                                   download=True, transform=transform_effnet) 
trainloader_effnet = torch.utils.data.DataLoader(trainset_effnet, batch_size=64,
                                                 shuffle=True)

testset_effnet = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_effnet)
testloader_effnet = torch.utils.data.DataLoader(testset_effnet, batch_size=64,
                                                shuffle=False)

classes = ['airplane', 'automobile', 'bird', 'cat', 'deer',
           'dog', 'frog', 'horse', 'ship', 'truck']


ModuleNotFoundError: No module named 'torchmetrics'

### 1 - Simple CNN

In [25]:
class CNN(nn.Module):
    """Four-stage residual CNN followed by a small MLP classifier.

    Each stage computes ``Dropout2d(BatchNorm2d(GELU(Conv3x3(x)))) + Conv1x1(x)``,
    where the 1x1 convolution projects the input to the stage's channel count
    so the two branches can be summed. Stages 1-3 are followed by a 2x2
    max-pool; stage 4 is followed by global average pooling, so the head always
    receives a 512-dimensional vector regardless of the input resolution.

    Args:
        num_classes: size of the output logit vector (default 10).
        dropout_p: channel-dropout probability used in every stage
            (default 0.25).

    Input is a float tensor of shape ``(N, 3, H, W)``; output is
    ``(N, num_classes)`` raw logits (no softmax).

    Note: the head contains ``BatchNorm1d`` layers, which in training mode
    need more than one sample per batch.
    """

    def __init__(self, num_classes: int = 10, dropout_p: float = 0.25) -> None:
        super().__init__()
        # Attribute names and creation order are kept stable so existing
        # state-dict keys and seeded initialisation stay the same.

        # Block 1: 3 -> 64 channels
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, padding=1)
        self.gel1 = nn.GELU()
        self.bn1 = nn.BatchNorm2d(64)
        self.dropout1 = nn.Dropout2d(p=dropout_p)
        self.skip1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=1),
        )

        # Block 2: 64 -> 128 channels
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.gel2 = nn.GELU()
        self.bn2 = nn.BatchNorm2d(128)
        self.dropout2 = nn.Dropout2d(p=dropout_p)
        self.skip2 = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=1),
        )

        # Block 3: 128 -> 256 channels
        self.conv3 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.gel3 = nn.GELU()
        self.bn3 = nn.BatchNorm2d(256)
        self.dropout3 = nn.Dropout2d(p=dropout_p)
        self.skip3 = nn.Sequential(
            nn.Conv2d(128, 256, kernel_size=1),
        )

        # Block 4: 256 -> 512 channels
        self.conv4 = nn.Conv2d(256, 512, kernel_size=3, padding=1)
        self.gel4 = nn.GELU()
        self.bn4 = nn.BatchNorm2d(512)
        self.dropout4 = nn.Dropout2d(p=dropout_p)
        self.skip4 = nn.Sequential(
            nn.Conv2d(256, 512, kernel_size=1),
        )

        # Shared 2x2 down-sampling applied after blocks 1-3.
        self.pool = nn.MaxPool2d(kernel_size=2)

        # MLP head for classification: 512 -> 256 -> 128 -> num_classes
        self.fc1 = nn.Linear(512, 256)
        self.fcre1lu = nn.ReLU()
        self.bc1 = nn.BatchNorm1d(256)
        self.fc2 = nn.Linear(256, 128)
        self.fcre2lu = nn.ReLU()
        self.bc2 = nn.BatchNorm1d(128)
        self.fc3 = nn.Linear(128, num_classes)

    @staticmethod
    def _residual(x, conv, act, norm, drop, skip):
        """Return ``drop(norm(act(conv(x)))) + skip(x)`` for one stage."""
        return drop(norm(act(conv(x)))) + skip(x)

    def forward(self, x):
        """Map a batch of images ``(N, 3, H, W)`` to logits ``(N, num_classes)``."""
        # Stages 1-3: residual block followed by 2x2 max-pooling.
        x = self._residual(x, self.conv1, self.gel1, self.bn1, self.dropout1, self.skip1)
        x = self.pool(x)

        x = self._residual(x, self.conv2, self.gel2, self.bn2, self.dropout2, self.skip2)
        x = self.pool(x)

        x = self._residual(x, self.conv3, self.gel3, self.bn3, self.dropout3, self.skip3)
        x = self.pool(x)

        # Stage 4: residual block only; spatial reduction happens below.
        x = self._residual(x, self.conv4, self.gel4, self.bn4, self.dropout4, self.skip4)

        # Global pooling to keep the linear head lightweight.
        x = F.adaptive_avg_pool2d(x, 1)
        x = torch.flatten(x, 1)

        # Head: Linear -> ReLU -> BatchNorm twice, then the logit layer.
        x = self.bc1(self.fcre1lu(self.fc1(x)))
        x = self.bc2(self.fcre2lu(self.fc2(x)))
        x = self.fc3(x)

        return x

In [26]:
import gc

# Drop unreachable Python objects and release cached GPU memory left over from
# earlier runs of this cell before allocating a fresh model.
gc.collect()
torch.cuda.empty_cache()

model = CNN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-4)

epochs = 30
# Cosine decay of the learning rate over the planned number of epochs.
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)

# Define torchmetrics objects (accumulated over an epoch, reset before reuse)
metrics = {
    "train_acc": Accuracy(task="multiclass", num_classes=10).to(device),
    "test_acc": Accuracy(task="multiclass", num_classes=10).to(device),
}

# Early stopping: give up after `patience` consecutive epochs without a new
# best test accuracy.
# NOTE(review): the test split doubles as the validation split here, so the
# reported best test accuracy is optimistically biased — consider holding out
# part of the training data instead.
patience = 10
not_improved_epochs = 0
best_test_acc = 0.0

# Training
for epoch in range(epochs):
    model.train()
    running_loss = 0.0

    # Reset train metrics each epoch
    metrics["train_acc"].reset()

    for images, labels in trainloader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

        # Update training metrics
        metrics["train_acc"].update(outputs, labels)

    # Compute train metrics
    train_acc = metrics["train_acc"].compute().item()

    # Reset test metrics
    metrics["test_acc"].reset()

    # Evaluate on test set
    model.eval()
    with torch.no_grad():
        for images, labels in testloader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            metrics["test_acc"].update(outputs, labels)

    # Compute test metrics
    test_acc = metrics["test_acc"].compute().item()

    # Log results before the early-stopping check so the epoch that triggers
    # the stop is still reported.
    print(f"Epoch [{epoch+1}/{epochs}] | Loss: {running_loss/len(trainloader):.4f} | "
          f"Train Acc: {train_acc:.4f} | Test Acc: {test_acc:.4f}")

    # Early stopping logic
    if test_acc > best_test_acc:
        best_test_acc = test_acc
        not_improved_epochs = 0
    else:
        not_improved_epochs += 1

    if not_improved_epochs >= patience:
        print("Early stopping triggered")
        break

    # Step the scheduler (once per completed epoch)
    scheduler.step()

KeyboardInterrupt: 

### 2 - Transfer Learning with Pre-trained Model

In [None]:

num_classes = 10
# Load pre-trained EfficientNet-B0
effnet = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.DEFAULT)

# Replace the final classification layer for 10 classes
effnet.classifier[1] = nn.Linear(effnet.classifier[1].in_features, num_classes)
effnet = effnet.to(device)

# Define optimizer and loss function for effnet.
# lr=0.1 is far too large for AdamW on a head sitting on pretrained features;
# 1e-3 (the AdamW default) is used instead. Parameters frozen below never
# receive a gradient, so the optimizer leaves them untouched.
criterion_effnet = nn.CrossEntropyLoss()
optimizer_effnet = optim.AdamW(effnet.parameters(), lr=1e-3)

# Start fine-tuning effnet model by learning only the classification head
# Freeze all backbone parameters
for param in effnet.parameters():
    param.requires_grad = False
# Unfreeze only the classifier head
for param in effnet.classifier.parameters():
    param.requires_grad = True

# Metrics owned by this cell, so it does not depend on the CNN cell having
# been executed first.
metrics_effnet = {
    "train_acc": Accuracy(task="multiclass", num_classes=num_classes).to(device),
    "test_acc": Accuracy(task="multiclass", num_classes=num_classes).to(device),
}

# Early stopping: stop after `patience_effnet` consecutive epochs without a
# new best test accuracy.
epochs_effnet = 30
patience_effnet = 5
not_improved_epochs_effnet = 0
best_test_acc_effnet = 0.0


for epoch in range(epochs_effnet):
    # NOTE(review): train() also puts the frozen backbone's BatchNorm and
    # stochastic-depth layers in training mode, so BatchNorm running
    # statistics still change even though the weights are frozen. Call
    # `effnet.features.eval()` here if a strictly fixed feature extractor is
    # intended.
    effnet.train()
    running_loss = 0.0

    # Reset train metrics each epoch
    metrics_effnet["train_acc"].reset()

    for images, labels in trainloader_effnet:
        images, labels = images.to(device), labels.to(device)
        optimizer_effnet.zero_grad()
        outputs = effnet(images)
        loss = criterion_effnet(outputs, labels)
        loss.backward()
        optimizer_effnet.step()
        running_loss += loss.item()

        # Update training metrics
        metrics_effnet["train_acc"].update(outputs, labels)

    # Compute train metrics
    train_acc = metrics_effnet["train_acc"].compute().item()

    # Reset test metrics
    metrics_effnet["test_acc"].reset()

    # Evaluate on test set
    effnet.eval()
    with torch.no_grad():
        for images, labels in testloader_effnet:
            images, labels = images.to(device), labels.to(device)
            outputs = effnet(images)
            metrics_effnet["test_acc"].update(outputs, labels)

    # Compute test metrics
    test_acc = metrics_effnet["test_acc"].compute().item()

    # Log results before the early-stopping check so the epoch that triggers
    # the stop is still reported.
    print(f"EfficientNet-B0 Epoch [{epoch+1}/{epochs_effnet}] | Loss: {running_loss/len(trainloader_effnet):.4f} | "
            f"Train Acc: {train_acc:.4f} | Test Acc: {test_acc:.4f}")

    # Early stopping logic
    if test_acc > best_test_acc_effnet:
        best_test_acc_effnet = test_acc
        not_improved_epochs_effnet = 0
    else:
        not_improved_epochs_effnet += 1

    if not_improved_epochs_effnet >= patience_effnet:
        print("Early stopping triggered for EfficientNet-B0")
        break


/content
