In [None]:
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import OxfordIIITPet
from torch.utils.data import DataLoader
from torch.utils.data import random_split
import numpy as np
import matplotlib.pyplot as plt
import time

In [None]:
# Select the compute device: the GPU when CUDA is available, otherwise the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [None]:
# Transforms
# Per-channel normalization statistics shared by both pipelines
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]


def _make_pipeline(extra_steps=()):
    """Build a preprocessing pipeline.

    The pipeline resizes, center-crops to 224, applies any `extra_steps`
    (e.g. augmentation) on the cropped image, then converts to a tensor
    and normalizes each channel.
    """
    return transforms.Compose([
        transforms.Resize(224),
        transforms.CenterCrop(224),
        *extra_steps,
        transforms.ToTensor(),
        transforms.Normalize(mean=list(_NORM_MEAN), std=list(_NORM_STD)),
    ])


# Training pipeline: adds a random horizontal flip as data augmentation
train_transform = _make_pipeline([transforms.RandomHorizontalFlip()])

# Evaluation pipeline: deterministic, no augmentation
test_transform = _make_pipeline()

In [None]:
# Load dataset
# The 'trainval' split is instantiated twice: once with the augmenting
# training transform and once with the deterministic evaluation transform.
# Both views index the same images, so the validation subset below can be
# served WITHOUT the random horizontal flip used for training.
train_dataset = OxfordIIITPet(root='./data', split='trainval', target_types='category', download=True, transform=train_transform)
val_dataset = OxfordIIITPet(root='./data', split='trainval', target_types='category', download=True, transform=test_transform)
test_dataset = OxfordIIITPet(root='./data', split='test', target_types='category', download=True, transform=test_transform)

# Number of target categories; sizes the classifier head in load_model()
num_classes = 37

# Data split (80% train / 20% validation)
split_seed = 42  # fixed so the partition is identical on every Restart & Run All
train_size = int(0.8 * len(train_dataset))
val_size = len(train_dataset) - train_size
split_generator = torch.Generator().manual_seed(split_seed)
train_subset, held_out = random_split(train_dataset, [train_size, val_size], generator=split_generator)

# random_split wraps train_dataset (augmented); re-wrap the held-out indices
# around the evaluation view so validation metrics are not affected by
# training-time augmentation.
val_subset = torch.utils.data.Subset(val_dataset, held_out.indices)

train_loader = DataLoader(train_subset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_subset, batch_size=32)
test_loader = DataLoader(test_dataset, batch_size=32)

In [None]:
def load_model():
    """Create a pretrained ResNet-18 whose only trainable part is a new head.

    Returns:
        The model, moved to `device`, with every pretrained parameter frozen
        and a freshly initialised `fc` layer producing `num_classes` outputs.
    """
    pretrained_weights = torchvision.models.ResNet18_Weights.DEFAULT
    net = torchvision.models.resnet18(weights=pretrained_weights)

    # Freeze the whole pretrained network in one call
    net.requires_grad_(False)

    # Swap in the new classifier afterwards: a newly constructed Linear layer
    # has trainable parameters by default, so it is the only unfrozen part
    net.fc = nn.Linear(net.fc.in_features, num_classes)

    return net.to(device)

In [None]:
def train(model, criterion, optimizer, n_epochs):
    """Train `model` on the global `train_loader` for `n_epochs` epochs.

    Args:
        model: network to optimise; switched to training mode each epoch.
        criterion: loss callable applied as `criterion(outputs, labels)`.
        optimizer: optimizer that updates the model's trainable parameters.
        n_epochs: number of full passes over `train_loader`.

    Prints the mean batch loss after every epoch and the total wall-clock
    time at the end. Returns nothing.
    """
    t_start = time.time()
    n_batches = len(train_loader)

    for epoch_idx in range(1, n_epochs + 1):
        model.train()
        epoch_loss = 0
        for batch_images, batch_labels in train_loader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)

            # Clear stale gradients, then forward / backward / update
            optimizer.zero_grad()
            batch_loss = criterion(model(batch_images), batch_labels)
            batch_loss.backward()
            optimizer.step()

            epoch_loss += batch_loss.item()

        print(f"Epoch [{epoch_idx}/{n_epochs}], Loss: {epoch_loss/n_batches:.4f}")

    # Record time
    elapsed = time.time() - t_start
    print(f"Time taken: {elapsed:.2f} seconds")

In [None]:
def evaluate(model, eval_loader):
    """Compute and print the top-1 accuracy of `model` on `eval_loader`.

    Args:
        model: network to evaluate; switched to evaluation mode.
        eval_loader: iterable yielding `(images, labels)` batches.

    Returns:
        Accuracy as a percentage (0-100).
    """
    model.eval()
    n_correct = 0
    n_seen = 0

    # No gradients are needed for inference
    with torch.no_grad():
        for batch_images, batch_labels in eval_loader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)

            # Predicted class = index of the largest output along dim 1
            predictions = model(batch_images).argmax(dim=1)

            n_correct += predictions.eq(batch_labels).sum().item()
            n_seen += batch_labels.size(0)

    accuracy = 100 * n_correct / n_seen
    print(f"Accuracy: {accuracy:.2f}%")

    return accuracy

### Baseline: Fine-tune only classification layer

In [None]:
# Fresh model: load_model() leaves only the final classifier trainable
model = load_model()

# Loss and optimizer (Adam only receives the parameters that require gradients)
criterion = nn.CrossEntropyLoss()
trainable_params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.Adam(trainable_params, lr=0.0003)

# Start training, then report validation accuracy
n_epochs = 10
print("Fine-tuning only fully connected layer...")
train(model, criterion, optimizer, n_epochs)
evaluate(model, val_loader)

### Strategy 1: Fine-tune the last n residual blocks together

In [None]:
for n in range(5):

    # Start every configuration from the same frozen pretrained model
    model = load_model()

    # The four residual stages, ordered from the input side to the output side
    blocks = [model.layer1, model.layer2, model.layer3, model.layer4]

    # Unfreeze the last n stages; for n == 0 the slice is empty, so only the
    # classifier stays trainable
    for stage in blocks[len(blocks) - n:]:
        stage.requires_grad_(True)

    # Loss and optimizer (only parameters that require gradients are optimised)
    criterion = nn.CrossEntropyLoss()
    trainable_params = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.Adam(trainable_params, lr=0.0003)

    # Train this configuration and report its validation accuracy
    n_epochs = 10
    print(f"Partially fine-tuning last {n} layers...")

    train(model, criterion, optimizer, n_epochs)
    evaluate(model, val_loader)

### Strategy 2: Gradually unfreeze layers

In [None]:
model = load_model()

# Residual stages, ordered from the input side to the output side
blocks = [getattr(model, layer_name) for layer_name in ['layer1', 'layer2', 'layer3', 'layer4']]

# Define number of epochs and steps
n_epochs = 5    # epochs trained per unfreezing step
n_steps = 2     # number of train/evaluate rounds
total_epochs = n_steps * n_epochs   # overall training budget

criterion = nn.CrossEntropyLoss()

print(f"Starting gradual unfreezing...")
start_time = time.time()

# Training loop: train with the currently trainable parameters, evaluate,
# then unfreeze one more block (working backwards from the output side).
for step in range(n_steps):

    # The optimizer is rebuilt every step so that it picks up the parameters
    # unfrozen at the end of the previous step
    optimizer = torch.optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=0.0003)

    train(model, criterion, optimizer, n_epochs)
    evaluate(model, val_loader)

    # Unfreeze the next block only when another training step follows;
    # unfreezing after the final step would never be trained and would
    # only produce a misleading log line.
    is_last_step = step == n_steps - 1
    if not is_last_step and step < len(blocks):
        block = blocks[-(step + 1)]
        for param in block.parameters():
            param.requires_grad = True
        print(f"Unfroze layers in residual block {-(step + 1)}")


# Record time
end_time = time.time()
total_time = end_time - start_time
print(f"Overall time taken: {total_time:.2f} seconds")

### Tuning: Maximize performance

In [None]:
# Load pretrained model (only the classifier is trainable at this point)
model = load_model()

# Include batch norm params: make every BatchNorm2d layer trainable as well
batch_norm_layers = [mod for mod in model.modules() if isinstance(mod, nn.BatchNorm2d)]
for bn_layer in batch_norm_layers:
    bn_layer.requires_grad_(True)

# Residual stages, ordered from the input side to the output side
blocks = [model.layer1, model.layer2, model.layer3, model.layer4]

# Define number of epochs and steps
n_epochs = 5
n_steps = 5
total_epochs = n_steps * n_epochs
lr = 3e-3    # initial learning rate, decayed after every step
lam = 1e-3   # initial weight decay, decayed after every step

criterion = nn.CrossEntropyLoss()

print("Starting gradual unfreezing...")
start_time = time.time()

# Training loop
t = 0  # currently unused
for step in range(n_steps):

    # Define optimizer over whatever is trainable at this step
    trainable_params = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.Adam(trainable_params, lr=lr, weight_decay=lam)

    # Train and evaluate
    train(model, criterion, optimizer, n_epochs)
    evaluate(model, val_loader)

    # Decay lr and lambda by a factor of 10 for the next step
    lr = lr * 0.1
    lam = lam * 0.1

    # Unfreeze the next block, working backwards from the output side
    if step >= len(blocks):
        continue
    block = blocks[-(step + 1)]
    block.requires_grad_(True)
    print(f"Unfroze layers in residual block {-(step + 1)}")

# Record time
end_time = time.time()
total_time = end_time - start_time
print(f"Time taken: {total_time:.2f} seconds")

# Measure test performance
evaluate(model, test_loader)