### Assignment 2: Understanding transfer learning and fine-tuning

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torch.utils.data import Dataset
import torch.nn.functional as F
import matplotlib.pyplot as plt
from PIL import Image
from torchvision import transforms
from torch.utils.data import random_split
from torchvision import datasets
import os 
import numpy as np 
from typing import Literal
from torchvision.models import resnet18, ResNet18_Weights
from sklearn.metrics import accuracy_score, confusion_matrix, precision_score, recall_score

# Log the installed PyTorch version.
print(f"[INFO] Torch infos: {torch.__version__}")

# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"[INFO] Using device: {device}")

# We need transform from resnet18 and its weights


In [None]:
# Preprocessing pipeline that ships with the default ResNet18 weights; using it
# keeps the inputs consistent with what the pretrained backbone expects.
transform = ResNet18_Weights.DEFAULT.transforms()

# Pick a dataset that you can import from `torchvision.datasets`

```python
from torchvision import datasets
```

# Easy picks:

- Food101
- Flowers102
- DTD
- FGVAircraft

# Other picks:

```python

full_train = datasets.OxfordIIITPet(root="data", split="trainval", download=True, transform=transform)
test_ds    = datasets.OxfordIIITPet(root="data", split="test", download=True, transform=transform)

```


# Split the data

In [None]:
# Using OxfordIIITPet: 37 pet breeds, good balance of classes
# trainval split = 3680 images, test split = 3669 images

full_train = datasets.OxfordIIITPet(root="data", split="trainval", download=True, transform=transform)
test_ds    = datasets.OxfordIIITPet(root="data", split="test",     download=True, transform=transform)

# Split full_train into train (80%) and validation (20%)
train_size = int(0.8 * len(full_train))
val_size   = len(full_train) - train_size

# Seed the split: without an explicit generator the train/val membership changes
# on every run, so validation curves from different runs are not comparable.
SPLIT_SEED = 42
split_generator = torch.Generator().manual_seed(SPLIT_SEED)
train_ds, val_ds = random_split(full_train, [train_size, val_size], generator=split_generator)

print(f"Train size:      {len(train_ds)}")
print(f"Validation size: {len(val_ds)}")
print(f"Test size:       {len(test_ds)}")

# load the train, validation and test split 

In [None]:
# Number of output classes, read from the dataset's own class list (37 for
# OxfordIIITPet) so the model head stays correct if the dataset is swapped.
NUM_CLASSES = len(full_train.classes)

# create dataloader handler from the dataset

In [None]:
BATCH_SIZE = 32

# Only the training split is shuffled; validation and test keep a fixed order.
train_loader = DataLoader(dataset=train_ds, batch_size=BATCH_SIZE, shuffle=True)
val_loader, test_loader = (
    DataLoader(dataset=split, batch_size=BATCH_SIZE, shuffle=False)
    for split in (val_ds, test_ds)
)

# len() of a DataLoader is its number of batches.
print(f"Train batches: {len(train_loader)}")
print(f"Val batches:   {len(val_loader)}")
print(f"Test batches:  {len(test_loader)}")

# Now create the CNN: use ResNet18 as the backbone and ResNet18_Weights as the initial weights.
- create the backbone
- create a classifier
- set the number of output classes to match your dataset
- the classifier has to be trainable.

# Question 1
- Why do we freeze the backbone? Why not the classifier?

In [None]:
class TransferModel(nn.Module):
    """Pretrained ResNet18 feature extractor with a trainable MLP classification head.

    Args:
        num_classes: Number of output logits.
        freeze_backbone: When True, gradients are disabled for every backbone
            parameter so only the head is trained.
        dense_units: Widths of optional hidden layers placed before the final
            layer (each is Linear -> ReLU -> Dropout). ``None`` or an empty list
            maps the backbone features directly to ``num_classes``.
        dropout_prob: Dropout probability applied after each hidden layer.
        weights: torchvision weights used to initialise the backbone.
    """

    # BatchNorm variants whose running statistics must stay fixed while frozen.
    _BATCHNORM_TYPES = (nn.BatchNorm1d, nn.BatchNorm2d, nn.BatchNorm3d)

    def __init__(self,
                 num_classes: int,
                 freeze_backbone: bool = True,
                 dense_units: "list[int] | None" = None,
                 dropout_prob: float = 0.3,
                 weights=ResNet18_Weights.DEFAULT):
        super().__init__()
        self.weights = weights

        # Load pretrained ResNet18 as backbone
        backbone = resnet18(weights=weights)

        # Read the feature width from the layer being removed instead of
        # hard-coding it (512 for ResNet18).
        self.backbone_out = backbone.fc.in_features

        # Remove the final FC layer, keep everything else
        self.backbone = nn.Sequential(*list(backbone.children())[:-1])

        # Freeze backbone if requested
        if freeze_backbone:
            for param in self.backbone.parameters():
                param.requires_grad = False

        # Build MLP classifier with multiple dense layers
        mlp = []
        cur = self.backbone_out
        for h in dense_units or ():
            mlp += [
                nn.Linear(cur, h),
                nn.ReLU(inplace=True),
                nn.Dropout(p=dropout_prob)
            ]
            cur = h  # Update for next layer

        self.classifier = nn.Sequential(*mlp)

        # Final classification layer
        self.final_classifier = nn.Linear(cur, num_classes)

        # Apply the frozen-BatchNorm rule to the freshly built module.
        self.train(self.training)

    def train(self, mode: bool = True):
        """Set train/eval mode, keeping frozen BatchNorm layers in eval mode.

        ``requires_grad = False`` only stops gradient updates. A BatchNorm layer
        left in training mode still normalises with batch statistics and
        overwrites its running mean/variance on every forward pass, so a
        "frozen" backbone would drift. BatchNorm layers in the backbone whose
        parameters are all frozen are therefore kept in eval mode.
        """
        super().train(mode)
        if mode:
            for module in self.backbone.modules():
                if isinstance(module, self._BATCHNORM_TYPES) and not any(
                    p.requires_grad for p in module.parameters()
                ):
                    module.eval()
        return self

    def forward(self, x):
        x = self.backbone(x)           # (batch, 512, 1, 1)
        x = x.flatten(start_dim=1)     # (batch, 512)
        x = self.classifier(x)         # (batch, dense_units[-1] or 512)
        x = self.final_classifier(x)   # (batch, num_classes)
        return x

    def unfreeze_layer4(self):
        """Unfreeze the last ResNet block for fine-tuning"""
        # Index 7 of the truncated backbone is layer4
        # (conv1, bn1, relu, maxpool, layer1, layer2, layer3, layer4, avgpool).
        for param in self.backbone[7].parameters():  # layer4
            param.requires_grad = True
        # Re-sync modes so layer4's BatchNorm layers follow the current mode again.
        self.train(self.training)

    def get_transform(self):
        """Return the preprocessing transform associated with ``self.weights``."""
        return self.weights.transforms()


# No hidden layers in the head: backbone features map straight to the class
# logits (512 -> 37), with the backbone frozen.
model = TransferModel(
    num_classes=NUM_CLASSES,
    freeze_backbone=True,
    dense_units=[],
)
model = model.to(device)
print(model)

## Answer to Question 1

We **freeze the backbone** because it was already trained on ImageNet and has learned rich, general-purpose features (edges, textures, shapes). Freezing it means those weights are not updated during training, which:
1. Saves a lot of computation — only the small classifier is updated.
2. Prevents destroying the learned features with a noisy gradient from our small dataset.

We do **not freeze the classifier** because it is randomly initialised and knows nothing about our dataset. It needs to be trained from scratch to map the backbone's features to our 37 pet-breed classes.

# add the train evaluation, and predict functions here

In [None]:
def train_one_epoch(model, loader, criterion, optimizer):
    """Train ``model`` for one pass over ``loader``.

    Args:
        model: Module to optimise; it is switched to training mode.
        loader: Iterable yielding ``(images, labels)`` tensor batches.
        criterion: Loss callable; assumed to return the mean loss of a batch
            (the default reduction of ``nn.CrossEntropyLoss``).
        optimizer: Optimizer that updates the model's trainable parameters.

    Returns:
        Tuple ``(avg_loss, accuracy)`` averaged over all samples seen.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    model.train()

    # Send batches to wherever the model lives rather than relying on a
    # module-level ``device`` global.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    for images, labels in loader:
        images, labels = images.to(target_device), labels.to(target_device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        batch_size = labels.size(0)
        # Weight the batch-mean loss by batch size so a shorter final batch
        # does not skew the epoch average.
        loss_sum += loss.item() * batch_size
        n_correct += (outputs.argmax(dim=1) == labels).sum().item()
        n_seen += batch_size

    if n_seen == 0:
        raise ValueError("loader yielded no samples")

    return loss_sum / n_seen, n_correct / n_seen


def evaluate(model, loader, criterion):
    """Evaluate ``model`` on ``loader`` without updating any weights.

    Args:
        model: Module to evaluate; it is switched to eval mode.
        loader: Iterable yielding ``(images, labels)`` tensor batches.
        criterion: Loss callable; assumed to return the mean loss of a batch
            (the default reduction of ``nn.CrossEntropyLoss``).

    Returns:
        Tuple ``(avg_loss, accuracy)`` averaged over all samples seen.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    model.eval()

    # Send batches to wherever the model lives rather than relying on a
    # module-level ``device`` global.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    with torch.no_grad():
        for images, labels in loader:
            images, labels = images.to(target_device), labels.to(target_device)
            outputs = model(images)
            loss = criterion(outputs, labels)

            batch_size = labels.size(0)
            # Weight the batch-mean loss by batch size so a shorter final
            # batch does not skew the average.
            loss_sum += loss.item() * batch_size
            n_correct += (outputs.argmax(dim=1) == labels).sum().item()
            n_seen += batch_size

    if n_seen == 0:
        raise ValueError("loader yielded no samples")

    return loss_sum / n_seen, n_correct / n_seen


def train(model, train_loader, val_loader, criterion, optimizer, epochs):
    """Run ``epochs`` rounds of training followed by validation.

    Each round calls ``train_one_epoch`` on ``train_loader`` and ``evaluate``
    on ``val_loader``. A progress line is printed on every 10th epoch.

    Returns:
        Four per-epoch lists, in order: train losses, validation losses,
        train accuracies, validation accuracies.
    """
    # Order matches the return value: train loss, val loss, train acc, val acc.
    history = ([], [], [], [])

    for completed in range(epochs):
        epoch = completed + 1
        train_loss, train_acc = train_one_epoch(model, train_loader, criterion, optimizer)
        val_loss, val_acc = evaluate(model, val_loader, criterion)

        for series, value in zip(history, (train_loss, val_loss, train_acc, val_acc)):
            series.append(value)

        if epoch % 10 != 0:
            continue
        print(f"Epoch {epoch:3d}/{epochs} | "
              f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f} | "
              f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

    train_losses, val_losses, train_accs, val_accs = history
    return train_losses, val_losses, train_accs, val_accs


def predict(model, loader):
    """Run inference over ``loader`` and collect predictions with their labels.

    Args:
        model: Module producing per-class scores; it is switched to eval mode.
        loader: Iterable yielding ``(images, labels)`` tensor batches.

    Returns:
        Tuple ``(preds, labels)`` of NumPy arrays with one entry per sample.
        Both arrays are empty when ``loader`` yields nothing.
    """
    model.eval()

    # Send batches to wherever the model lives rather than relying on a
    # module-level ``device`` global.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    pred_chunks, label_chunks = [], []

    with torch.no_grad():
        for images, labels in loader:
            outputs = model(images.to(target_device))
            pred_chunks.append(outputs.argmax(dim=1).cpu().numpy())
            # .cpu() is a no-op for CPU labels and keeps this safe if a loader
            # ever yields labels that already sit on an accelerator.
            label_chunks.append(labels.cpu().numpy())

    if not pred_chunks:
        return np.array([]), np.array([])

    # One concatenation per array instead of extending a list element by element.
    return np.concatenate(pred_chunks), np.concatenate(label_chunks)

### Phase 1: Transfer learning

# freeze all layers except the classifier.
- train and evaluate the model for 50 epochs 
- remember to save val loss and train loss

In [None]:
# The backbone was frozen when the model was built (freeze_backbone=True), so
# the only parameters still requiring gradients belong to classifier and
# final_classifier.
trainable_params = [p for p in model.parameters() if p.requires_grad]

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(trainable_params, lr=1e-3)

# Phase 1: 50 epochs with the backbone frozen; keep the per-epoch history.
phase1_history = train(model, train_loader, val_loader, criterion, optimizer, epochs=50)
phase1_train_losses, phase1_val_losses, phase1_train_accs, phase1_val_accs = phase1_history

# Plot accuracy and loss

In [None]:
# Build the x-axis from the recorded history so the plot stays valid for any
# number of training epochs instead of assuming exactly 50.
epochs_range = range(1, len(phase1_train_losses) + 1)

fig, axes = plt.subplots(1, 2, figsize=(12, 4))

# Loss curve
axes[0].plot(epochs_range, phase1_train_losses, label="Train Loss")
axes[0].plot(epochs_range, phase1_val_losses,   label="Val Loss")
axes[0].set_title("Phase 1 – Loss")
axes[0].set_xlabel("Epoch")
axes[0].set_ylabel("Loss")
axes[0].legend()

# Accuracy curve
axes[1].plot(epochs_range, phase1_train_accs, label="Train Acc")
axes[1].plot(epochs_range, phase1_val_accs,   label="Val Acc")
axes[1].set_title("Phase 1 – Accuracy")
axes[1].set_xlabel("Epoch")
axes[1].set_ylabel("Accuracy")
axes[1].legend()

plt.tight_layout()
plt.show()

# plot predictions 

In [None]:
# Show a grid of 8 test images with predicted vs true class names
class_names = test_ds.classes  # list of 37 breed names

# Grab one batch from the test loader
images_batch, labels_batch = next(iter(test_loader))

# Inference only: eval mode fixes Dropout/BatchNorm behaviour regardless of
# which call last touched the model, and no_grad avoids building an autograd
# graph for a forward pass that is never back-propagated.
model.eval()
with torch.no_grad():
    outputs = model(images_batch.to(device))
preds_batch = outputs.argmax(dim=1).cpu()

# ImageNet normalisation used by ResNet – we reverse it for display
mean = torch.tensor([0.485, 0.456, 0.406])
std  = torch.tensor([0.229, 0.224, 0.225])

fig, axes = plt.subplots(2, 4, figsize=(14, 7))
for i, ax in enumerate(axes.flat):
    if i >= len(images_batch):
        # Batch holds fewer images than the grid has panels: leave it blank.
        ax.axis("off")
        continue
    # (C, H, W) -> (H, W, C) so the per-channel std/mean broadcast correctly.
    img = images_batch[i].permute(1, 2, 0) * std + mean
    img = img.clamp(0, 1).numpy()
    ax.imshow(img)
    true_idx = int(labels_batch[i])
    pred_idx = int(preds_batch[i])
    true_name = class_names[true_idx]
    pred_name = class_names[pred_idx]
    color = "green" if true_idx == pred_idx else "red"
    ax.set_title(f"True: {true_name}\nPred: {pred_name}", color=color, fontsize=8)
    ax.axis("off")

plt.suptitle("Phase 1 Predictions (green = correct, red = wrong)", fontsize=12)
plt.tight_layout()
plt.show()

# calculate TEST accuracy score 

In [None]:
# Score the Phase 1 model on the held-out test split.
phase1_preds, phase1_labels = predict(model, test_loader)
phase1_test_acc = accuracy_score(y_true=phase1_labels, y_pred=phase1_preds)
print(f"Phase 1 Test Accuracy: {phase1_test_acc:.4f}")

# Calculate confusion matrices precision and recall 

In [None]:
# Confusion matrix: rows are the true class, columns the predicted class
cm = confusion_matrix(y_true=phase1_labels, y_pred=phase1_preds)

fig, ax = plt.subplots(figsize=(14, 12))
im = ax.imshow(cm, cmap="Blues")
fig.colorbar(im, ax=ax)
ax.set(title="Phase 1 – Confusion Matrix", xlabel="Predicted", ylabel="True")
fig.tight_layout()
plt.show()

# Macro averaging gives one summary number per metric; zero_division=0 scores a
# class with no predictions (or no samples) as 0 instead of emitting a warning.
macro_kwargs = {"average": "macro", "zero_division": 0}
precision_macro = precision_score(phase1_labels, phase1_preds, **macro_kwargs)
recall_macro = recall_score(phase1_labels, phase1_preds, **macro_kwargs)

print(f"Phase 1 Macro Precision: {precision_macro:.4f}")
print(f"Phase 1 Macro Recall:    {recall_macro:.4f}")

### Phase 2: Unfreeze layer 4 - Fine tuning

# From the frozen CNN, unfreeze `layer4`.

In [None]:
# Re-enable gradients for the last residual stage through the model's helper
model.unfreeze_layer4()

# Separate learning rates: the pretrained layer4 gets a rate ten times smaller
# than the classification head.
layer4_lr, head_lr = 1e-4, 1e-3
param_groups = [
    {"params": model.backbone[7].parameters(), "lr": layer4_lr},  # layer4
    {"params": model.classifier.parameters(), "lr": head_lr},
    {"params": model.final_classifier.parameters(), "lr": head_lr},
]
optimizer = optim.Adam(param_groups)

print("layer4 and classifier are now trainable.")

# Train for 50 epochs 

In [None]:
# Phase 2: another 50 epochs through train(), now with layer4 trainable too
phase2_history = train(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    criterion=criterion,
    optimizer=optimizer,
    epochs=50,
)
phase2_train_losses, phase2_val_losses, phase2_train_accs, phase2_val_accs = phase2_history

# Plot curves

In [None]:
# Give Phase 2 its own x-axis, sized from its own history, so the plot does not
# depend on the Phase 1 cell or on both phases running the same epoch count.
phase2_epochs_range = range(1, len(phase2_train_losses) + 1)

fig, axes = plt.subplots(1, 2, figsize=(12, 4))

# Loss curve
axes[0].plot(phase2_epochs_range, phase2_train_losses, label="Train Loss")
axes[0].plot(phase2_epochs_range, phase2_val_losses,   label="Val Loss")
axes[0].set_title("Phase 2 – Loss")
axes[0].set_xlabel("Epoch")
axes[0].set_ylabel("Loss")
axes[0].legend()

# Accuracy curve
axes[1].plot(phase2_epochs_range, phase2_train_accs, label="Train Acc")
axes[1].plot(phase2_epochs_range, phase2_val_accs,   label="Val Acc")
axes[1].set_title("Phase 2 – Accuracy")
axes[1].set_xlabel("Epoch")
axes[1].set_ylabel("Accuracy")
axes[1].legend()

plt.tight_layout()
plt.show()

# visualize prediction

In [None]:
# Reuse the same batch for a fair visual comparison with Phase 1
# Inference only: eval mode fixes Dropout/BatchNorm behaviour regardless of
# which call last touched the model, and no_grad avoids building an autograd
# graph for a forward pass that is never back-propagated.
model.eval()
with torch.no_grad():
    outputs = model(images_batch.to(device))
preds_batch = outputs.argmax(dim=1).cpu()

fig, axes = plt.subplots(2, 4, figsize=(14, 7))
for i, ax in enumerate(axes.flat):
    if i >= len(images_batch):
        # Batch holds fewer images than the grid has panels: leave it blank.
        ax.axis("off")
        continue
    # (C, H, W) -> (H, W, C) so the per-channel std/mean broadcast correctly.
    img = images_batch[i].permute(1, 2, 0) * std + mean
    img = img.clamp(0, 1).numpy()
    ax.imshow(img)
    true_idx = int(labels_batch[i])
    pred_idx = int(preds_batch[i])
    true_name = class_names[true_idx]
    pred_name = class_names[pred_idx]
    color = "green" if true_idx == pred_idx else "red"
    ax.set_title(f"True: {true_name}\nPred: {pred_name}", color=color, fontsize=8)
    ax.axis("off")

plt.suptitle("Phase 2 Predictions (green = correct, red = wrong)", fontsize=12)
plt.tight_layout()
plt.show()

# Calculate test accuracy score 

In [None]:
# Score the fine-tuned model on the test split and compare it with Phase 1.
phase2_preds, phase2_labels = predict(model, test_loader)
phase2_test_acc = accuracy_score(y_true=phase2_labels, y_pred=phase2_preds)
acc_gain = phase2_test_acc - phase1_test_acc  # positive means Phase 2 scored higher

print(f"Phase 1 Test Accuracy: {phase1_test_acc:.4f}")
print(f"Phase 2 Test Accuracy: {phase2_test_acc:.4f}")
print(f"Improvement:           {acc_gain:+.4f}")

# calculate confusion matrix precision and recall

In [None]:
# Confusion matrix: rows are the true class, columns the predicted class
cm2 = confusion_matrix(y_true=phase2_labels, y_pred=phase2_preds)

fig, ax = plt.subplots(figsize=(14, 12))
im = ax.imshow(cm2, cmap="Blues")
fig.colorbar(im, ax=ax)
ax.set(title="Phase 2 – Confusion Matrix", xlabel="Predicted", ylabel="True")
fig.tight_layout()
plt.show()

# Macro averaging gives one summary number per metric; zero_division=0 scores a
# class with no predictions (or no samples) as 0 instead of emitting a warning.
macro_kwargs2 = {"average": "macro", "zero_division": 0}
precision_macro2 = precision_score(phase2_labels, phase2_preds, **macro_kwargs2)
recall_macro2 = recall_score(phase2_labels, phase2_preds, **macro_kwargs2)

print(f"Phase 2 Macro Precision: {precision_macro2:.4f}")
print(f"Phase 2 Macro Recall:    {recall_macro2:.4f}")

## Answer to Question 2 – What is the difference between transfer learning and fine tuning?

**Transfer learning** (Phase 1) means taking a model that was pre-trained on a large dataset (ImageNet) and re-using its feature-extraction layers as-is, frozen. Only a new classification head on top is trained. This is fast and works well even with limited data, because the frozen layers already know how to detect general visual features.

**Fine tuning** (Phase 2) goes one step further: after the classifier has been trained, we unfreeze some of the later backbone layers (here `layer4`) and continue training with a *lower* learning rate. This allows the model to adapt the high-level features of the backbone to the specifics of our dataset, usually pushing accuracy higher. We use a small learning rate so we adjust the pretrained weights gently rather than overwriting them.

**Key takeaway:** Transfer learning gets you a good baseline quickly; fine tuning squeezes out extra performance by letting the network specialise its features to your task. The trade-off is that fine tuning takes more time and requires careful learning rate selection to avoid destabilising the pretrained weights.