# Deep Learning Applications: Laboratory #1

## Exercise 1: Warming Up

In [None]:
# Start with some standard imports.
import numpy as np
import matplotlib.pyplot as plt
from functools import reduce
import torch
import torchvision
from torchvision.datasets import MNIST
from torch.utils.data import Subset
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms

import wandb
from datetime import datetime

#### Data preparation

Here is some basic dataset loading, validation splitting code to get you started working with MNIST.

In [None]:
# Standard MNIST transform.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])

# Load MNIST train and test.
ds_train = MNIST(root='./data', train=True, download=True, transform=transform)
ds_test = MNIST(root='./data', train=False, download=True, transform=transform)

# Split train into train and validation.
val_size = 5000
I = np.random.permutation(len(ds_train))
ds_val = Subset(ds_train, I[:val_size])
ds_train = Subset(ds_train, I[val_size:])

#### Boilerplate training and evaluation code

This is some **very** rough training, evaluation, and plotting code. Again, just to get you started. I will be *very* disappointed if any of this code makes it into your final submission.

In [None]:
from tqdm import tqdm
from sklearn.metrics import accuracy_score, classification_report

# Function to train a model for a single epoch over the data loader.
def train_epoch(model, dl, opt, epoch='Unknown', device='cpu'):
    model.train()
    losses = []
    for (xs, ys) in tqdm(dl, desc=f'Training epoch {epoch}', leave=True):
        xs, ys = xs.to(device), ys.to(device)
        # Zero out the gradients.
        opt.zero_grad()
        # Forward through the model.
        logits = model(xs) 
        # Compute the cross-entropy loss.
        loss = F.cross_entropy(logits, ys)
        # Backward through the model.
        loss.backward()
        # Update the model parameters.
        opt.step()
        # Save the loss value.
        losses.append(loss.item())
    # Return the average loss for this epoch.
    return np.mean(losses)

# Function to evaluate model over all samples in the data loader.
def evaluate_model(model, dl, device='cpu'):
    model.eval()
    predictions = []
    ground_truths = []
    with torch.no_grad():
        for (xs, ys) in tqdm(dl, desc='Evaluating', leave=False):
            xs = xs.to(device)
            logits = model(xs)
            preds = torch.argmax(logits, 1)
            
            # Save the ground truth and predictions.
            ground_truths.append(ys)
            predictions.append(preds.detach().cpu().numpy())
            
    predictions = np.hstack(predictions)
    ground_truths = np.hstack(ground_truths)
    
    # Return accuracy score and classification report.
    return accuracy_score(ground_truths, predictions), classification_report(ground_truths, predictions, zero_division=0, digits=3)

# Simple function to plot the loss curve and validation accuracy.
def plot_validation_curves(training_history):
    losses, accuracies = zip(*training_history)
    plt.figure(figsize=(16, 8))

    plt.subplot(1, 2, 1)
    plt.plot(losses)
    plt.title('Average Training Loss per Epoch')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')

    plt.subplot(1, 2, 2)
    plt.plot(accuracies)
    plt.title(f'Best Accuracy = {np.max(accuracies)} @ epoch {np.argmax(accuracies)}')
    plt.xlabel('Epoch')
    plt.ylabel('Validation Accuracy')
    plt.show()

In [None]:
def train_model(model, dl_train, dl_val, opt, epochs, model_name, dataset_type, lr, batch_size, device='cpu'):
    wandb.init(
        # set the wandb project where this run will be logged
        project="DLA Assigment 1",
        name=model_name + "-" + datetime.now().strftime("%Y%m%d-%H%M%S"),
        # track hyperparameters and run metadata
        config={
            "architecture": model_name,
            "dataset": dataset_type,
            "epochs": epochs,
            "learning_rate": lr,
            "batch_size": batch_size,
            "device": device,
            "optimizer": "Adam"
        }
    )
    losses_and_accs = []
    for epoch in range(epochs):
        loss = train_epoch(model, dl_train, opt, epoch, device=device)
        (val_acc, _) = evaluate_model(model, dl_val, device=device)
        losses_and_accs.append((loss, val_acc))
        
        print(f'Epoch {epoch}: Loss - {loss:.4f}, Validation Acc - {val_acc:.4f}')
        # wandb
        wandb.log({"acc": val_acc, "loss": loss})
                
    # [optional] finish the wandb run, necessary in notebooks
    wandb.finish()    

    torch.save(model.state_dict(), f"model_states/model_{model_name}.pt")

    return losses_and_accs

#### A basic, parameterized MLP

This is a very basic implementation of a Multilayer Perceptron. Don't waste too much time trying to figure out how it works -- the important detail is that it allows you to pass in a list of input, hidden layer, and output *widths*. **Your** implementation should also support this for the exercises to come.

In [None]:
class MLP(nn.Module):
    def __init__(self, layer_sizes):
        super().__init__()
        self.layers = nn.ModuleList([nn.Linear(nin, nout) for (nin, nout) in zip(layer_sizes[:-1], layer_sizes[1:])])
    
    def forward(self, x):
        return reduce(lambda f, g: lambda x: g(F.relu(f(x))), self.layers, lambda x: x.flatten(1))(x)

In [None]:
# A non-parametric CNN.
class ConvBlock(nn.Module):
    def __init__(self, num=1, channels=8, size=3):
        super().__init__()
        self.layers = nn.ModuleList([nn.Conv2d(channels, channels, kernel_size=size, padding='same') for _ in range(num)])

    def forward(self, x):
        return reduce(lambda f, g: lambda x: g(F.relu(f(x))), self.layers, lambda x: x)(x)
    
class CNN(nn.Module):
    def __init__(self, num=2, channels=8, size=3):
        super().__init__()
        self.conv1 = nn.Conv2d(1, channels, kernel_size=size)
        self.cblock1 = ConvBlock(num=num, channels=channels, size=size)
        self.cblock2 = ConvBlock(num=num, channels=channels, size=size)
        self.cblock3 = ConvBlock(num=num, channels=channels, size=size)
        self.output = nn.Linear(channels, 10)
        
    def forward(self, x):
        x = self.conv1(x)
        x = self.cblock1(x)
        x = F.max_pool2d(x, 3, 2)
        x = self.cblock2(x)
        x = F.max_pool2d(x, 3, 2)
        x = self.cblock3(x)
        x = F.avg_pool2d(x, kernel_size=x.shape[2:]).squeeze()
        return self.output(x)

#### A *very* minimal training pipeline.

Here is some basic training and evaluation code to get you started.

**Important**: I cannot stress enough that this is a **terrible** example of how to implement a training pipeline. You can do better!

In [None]:
# Training hyperparameters.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f'Using device: {device}')
epochs = 50
lr = 0.0001
batch_size = 128

# Architecture hyperparameters.
input_size = 28*28
width = 16
depth = 2

# Dataloaders.
dl_train = torch.utils.data.DataLoader(ds_train, batch_size, shuffle=True, num_workers=4)
dl_val   = torch.utils.data.DataLoader(ds_val, batch_size, num_workers=4)
dl_test  = torch.utils.data.DataLoader(ds_test, batch_size, shuffle=True, num_workers=4)

# Instantiate model and optimizer.
model_mlp = MLP([input_size] + [width]*depth + [10]).to(device)
opt = torch.optim.Adam(params=model_mlp.parameters(), lr=lr)

# Training 
losses_and_accs = train_model(model_mlp, dl_train, dl_val, opt, epochs, "MLP", "MNIST", lr, batch_size, device=device)

# And finally plot the curves.
plot_validation_curves(losses_and_accs)
print(f'Accuracy report on TEST:\n {evaluate_model(model_mlp, dl_test, device=device)[1]}')

### Exercise 1.1: A baseline MLP

In [None]:
class SimpleMLP(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.ModuleList([
            nn.Linear(28*28, 128),  # First narrow layer
            nn.Linear(128, 128),  # Second narrow layer
            nn.Linear(128, 64),  # Third narrow layer
            nn.Linear(64, 64), # Fourth narrow layer
            nn.Linear(64, 10)  # Output layer
        ])
    
    def forward(self, x):
        x = x.flatten(1)
        for layer in self.layers[:-1]:
            x = F.relu(layer(x))
        return x

In [None]:
# Training hyperparameters.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f'Using device {device}')
epochs = 50
lr = 0.0001
batch_size = 128

# Dataloaders.
dl_train = torch.utils.data.DataLoader(ds_train, batch_size, shuffle=True, num_workers=4)
dl_val   = torch.utils.data.DataLoader(ds_val, batch_size, num_workers=4)
dl_test  = torch.utils.data.DataLoader(ds_test, batch_size, shuffle=True, num_workers=4)

# Instantiate model and optimizer.
model_mlp = SimpleMLP().to(device)
print(model_mlp.layers)
opt = torch.optim.Adam(params=model_mlp.parameters(), lr=lr)

# Training
losses_and_accs = train_model(model_mlp, dl_train, dl_val, opt, epochs, "SimpleMLP", "MNIST", lr, batch_size, device=device)

# And finally plot the curves.
plot_validation_curves(losses_and_accs)
print(f'Accuracy report on TEST:\n {evaluate_model(model_mlp, dl_test, device=device)[1]}')

### Exercise 1.2: Rinse and Repeat

Repeat the verification you did above, but with **Convolutional** Neural Networks. If you were careful about abstracting your model and training code, this should be a simple exercise. Show that **deeper** CNNs *without* residual connections do not always work better and **even deeper** ones *with* residual connections.

**Hint**: You probably should do this exercise using CIFAR10, since MNIST is *very* easy (at least up to about 99% accuracy).

**Spoiler**: If you plan to do optional exercise 2.3, you should think *very* carefully about the architectures of your CNNs here (so you can reuse them!).

In [26]:
# Define the basic building blocks of ResNet: Residual Block
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        
        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out
    
# Define the ResNet model
class ResNet18(nn.Module):
    def __init__(self, num_blocks, num_classes):
        super(ResNet18, self).__init__()
        self.in_channels = 64    

        self.conv1 = nn.Conv2d(3, self.in_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn = nn.BatchNorm2d(self.in_channels)

        self.layer1 = self._make_layer(ResidualBlock, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(ResidualBlock, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(ResidualBlock, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(ResidualBlock, 512, num_blocks[3], stride=2)

        self.fc = nn.Linear(512, num_classes)

    def _make_layer(self, block, out_channels, num_blocks, stride):
        strides = [stride] + [1]*(num_blocks-1)    # [stride, 1, 1, ...] (num_blocks times) ensures that the first block has stride=stride
        layers = []
        for stride in strides:
            layers.append(block(self.in_channels, out_channels, stride))
            self.in_channels = out_channels
        return nn.Sequential(*layers)
    
    def forward(self, x):
        out = F.relu(self.bn(self.conv1(x)))    # [batch_size, 3, 32, 32] -> [batch_size, 64, 32, 32]
        out = self.layer1(out)                     # [batch_size, 64, 32, 32] -> [batch_size, 64, 32, 32] (stride=1)
        out = self.layer2(out)                     # [batch_size, 64, 32, 32] -> [batch_size, 128, 16, 16] (stride=2)
        out = self.layer3(out)                     # [batch_size, 128, 16, 16] -> [batch_size, 256, 8, 8] (stride=2)
        out = self.layer4(out)                     # [batch_size, 256, 8, 8] -> [batch_size, 512, 4, 4] (stride=2)
        out = F.avg_pool2d(out, 4)                   # [batch_size, 512, 4, 4] -> [batch_size, 512, 1, 1] (avg_pool2d)
        out = out.view(out.size(0), -1)            # [batch_size, 512, 1, 1] -> [batch_size, 512]
        out = self.fc(out)                         # [batch_size, 512] -> [batch_size, num_classes]
        return out

In [None]:
# Load CIFAR10 dataset
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])
trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

# Split train into train and validation.
val_size = 5000
I = np.random.permutation(len(trainset))
ds_val = Subset(trainset, I[:val_size])
ds_train = Subset(trainset, I[val_size:])

In [28]:
# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Define hyperparameters
batch_size = 128
epochs = 20
learning_rate = 0.001

dl_train = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=2)
dl_val = torch.utils.data.DataLoader(ds_val, batch_size=batch_size, num_workers=2)
dl_test = torch.utils.data.DataLoader(testset, batch_size=batch_size, shuffle=False, num_workers=2)

In [None]:
# Initialize the CNN model
CNN = CNN().to(device)

# Define loss function and optimizer
optimizer = torch.optim.Adam(CNN.parameters(), lr=learning_rate)

# Training CNN
losses_and_accs = train_model(CNN, dl_train, dl_val, optimizer, epochs, "CNN", "CIFAR10", learning_rate, batch_size, device=device)

# And finally plot the curves.
plot_validation_curves(losses_and_accs)
print(f'Accuracy report on TEST:\n {evaluate_model(CNN, dl_test, device=device)[1]}')

In [27]:
# Initialize the ResCNN model
ResNet18 = ResNet18(num_blocks=[2,2,2,2], num_classes=10).to(device)

# Define loss function and optimizer
optimizer = torch.optim.Adam(ResNet18.parameters(), lr=learning_rate)

# Training ResCNN
losses_and_accs = train_model(ResNet18, dl_train, dl_val, optimizer, epochs, "ResNet18", "CIFAR10", learning_rate, batch_size, device=device)

# And finally plot the curves.
plot_validation_curves(losses_and_accs)
print(f'Accuracy report on TEST:\n {evaluate_model(ResNet18, dl_test, device=device)[1]}')

VBox(children=(Label(value='0.001 MB of 0.015 MB uploaded\r'), FloatProgress(value=0.07799479166666666, max=1.…

VBox(children=(Label(value='Waiting for wandb.init()...\r'), FloatProgress(value=0.011288888888925108, max=1.0…

Training epoch 0:   1%|          | 4/391 [00:19<30:50,  4.78s/it]  


KeyboardInterrupt: 

-----
## Exercise 2: Choose at Least One

Below are **three** exercises that ask you to deepen your understanding of Deep Networks for visual recognition. You must choose **at least one** of the below for your final submission -- feel free to do **more**, but at least **ONE** you must submit.

### Exercise 2.1: Explain why Residual Connections are so effective
Use your two models (with and without residual connections) you developed above to study and **quantify** why the residual versions of the networks learn more effectively.

**Hint**: A good starting point might be looking at the gradient magnitudes passing through the networks during backpropagation.

In [None]:
def gradient_magnitudes(model):
    gradient_magnitudes = []
    for param in model.parameters():
        gradient_magnitudes.append(param.grad.abs().mean().item())
    return gradient_magnitudes

In [None]:
# Calculate gradient magnitudes for CNN and ResCNN
cnn_gradients = gradient_magnitudes(model_cnn)
residual_gradients = gradient_magnitudes(model_rescnn)

# Print mean gradient magnitudes
print("Mean Gradient Magnitudes - CNN:")
print(sum(cnn_gradients) / len(cnn_gradients))

print("Mean Gradient Magnitudes - Residual CNN:")
print(sum(residual_gradients) / len(residual_gradients))

### Exercise 2.2: Fully-convolutionalize a network.
Take one of your trained classifiers and **fully-convolutionalize** it. That is, turn it into a network that can predict classification outputs at *all* pixels in an input image. Can you turn this into a **detector** of handwritten digits? Give it a try.

**Hint 1**: Sometimes the process of fully-convolutionalization is called "network surgery".

**Hint 2**: To test your fully-convolutionalized networks you might want to write some functions to take random MNIST samples and embed them into a larger image (i.e. in a regular grid or at random positions).

In [None]:
# A non-parametric CNN.
class ResConvBlock(nn.Module):
    def __init__(self, num=1, channels=8, size=3):
        super().__init__()
        self.layers = nn.ModuleList([nn.Conv2d(channels, channels, kernel_size=size, padding=(size-1)//2) for _ in range(num)])

    def forward(self, x):
        for layer in self.layers:
            res = x
            x = F.relu(layer(x) + res)
        return x

class FullyResCNN(nn.Module):
    def __init__(self, num=2, channels=8, size=3):
        super().__init__()
        self.conv1 = nn.Conv2d(1, channels, kernel_size=size, padding=(size - 1) // 2)
        self.cblock1 = ConvBlock(num=num, channels=channels, size=size)
        self.cblock2 = ConvBlock(num=num, channels=channels, size=size)
        self.cblock3 = ConvBlock(num=num, channels=channels, size=size)
        
    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = self.cblock1(x)
        x = F.max_pool2d(x, 3, 2)
        x = self.cblock2(x)
        x = F.max_pool2d(x, 3, 2)
        x = self.cblock3(x)
        x = F.avg_pool2d(x, kernel_size=x.shape[2:]).squeeze()
        return x

### Exercise 2.3: *Explain* the predictions of a CNN

Use the CNN model you trained in Exercise 1.2 and implement [*Class Activation Maps*](http://cnnlocalization.csail.mit.edu/#:~:text=A%20class%20activation%20map%20for,decision%20made%20by%20the%20CNN.):

> B. Zhou, A. Khosla, A. Lapedriza, A. Oliva, and A. Torralba. Learning Deep Features for Discriminative Localization. CVPR'16 (arXiv:1512.04150, 2015).

Use your implementation to demonstrate how your trained CNN *attends* to specific image features to recognize *specific* classes.

**Note**: Feel free to implement [Grad-CAM](https://arxiv.org/abs/1610.02391) instead of CAM.

In [None]:
# Your code here.