# Introduction

In this assignment you will practice putting together an image classification pipeline based on CNNs for the [CIFAR-10 and/or CIFAR-100](https://www.cs.toronto.edu/~kriz/cifar.html) datasets. The goals of this assignment are as follows:



*   Understand the components of a CNN model and a Vision Transformer (ViT) model.
*   Understand how to modify a standard CNN model towards a specific task.
*   Implement a basic neural network training pipeline in PyTorch.
*   Implement and train an AlexNet model.
*   Implement and train a ResNet model.
*   Implement and train a ViT model.
*   Understand the differences and tradeoffs between these models.

Please fill in all the **TODO** code blocks. Once you are ready to submit:

* Export the notebook `CSCI677_assignment_3.ipynb` as a PDF `[Your USC ID]_CSCI677_assignment_3.pdf`

Please make sure that the notebook has been run before exporting the PDF, and that your code and all cell outputs are visible in your submitted PDF. Regrade requests will not be accepted if your code or output is not visible in the original submission. Thank you!

In case you haven't installed PyTorch yet, run the following command to install torch and torchvision.

In [35]:
!pip install torch torchvision



In [36]:
import math
import torch
import torch.nn as nn
import torch.nn.functional as F

# **Data Preparation**

[CIFAR-10](https://www.cs.toronto.edu/~kriz/cifar.html) is a well-known dataset composed of 60,000 color 32x32 images in 10 classes, with 6,000 images per class. The utility function `cifar10()` returns the entire CIFAR-10 dataset as a set of four Torch tensors:
* `x_train` contains all training images (real numbers in the range  [0,1] )
* `y_train` contains all training labels (integers in the range  [0,9] )
* `x_test` contains all test images
* `y_test` contains all test labels

This function automatically downloads the CIFAR-10 dataset the first time you run it.

[CIFAR-100](https://www.cs.toronto.edu/~kriz/cifar.html) is just like the CIFAR-10 dataset, except it has 100 classes containing 600 images each. Below we provide wrapper classes for the CIFAR-10 and CIFAR-100 datasets. You can choose one or both of them for training your models. If you choose one of them, use the same one to train all your models.

In [37]:
from torchvision import datasets
from torchvision import transforms
from torch.utils.data import DataLoader

class CIFAR10Dataset:
    def __init__(self, batch_size=128, root="data"):
        self.transform = transforms.Compose(
            [transforms.ToTensor(),
             transforms.Normalize((0.4914, 0.4822, 0.4465), (0.247, 0.243, 0.261))]
        )
        self.batch_size = batch_size

        self.training_data = datasets.CIFAR10(
            root=root,
            train=True,
            download=True,
            transform=self.transform
        )
        self.train_dataloader = DataLoader(self.training_data, batch_size=self.batch_size, shuffle=True)

        self.test_data = datasets.CIFAR10(
            root=root,
            train=False,
            download=False,
            transform=self.transform
        )
        self.test_dataloader = DataLoader(self.test_data, batch_size=self.batch_size, shuffle=False)

        self.classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')


class CIFAR100Dataset:
    def __init__(self, batch_size=128, root="data"):
        self.transform = transforms.Compose(
            [transforms.ToTensor(),
             transforms.Normalize((0.5071, 0.4867, 0.4408), (0.2675, 0.2565, 0.2761))]  # CIFAR-100 normalization values
        )
        self.batch_size = batch_size

        self.training_data = datasets.CIFAR100(
            root=root,
            train=True,
            download=True,
            transform=self.transform
        )
        self.train_dataloader = DataLoader(self.training_data, batch_size=self.batch_size, shuffle=True)

        self.test_data = datasets.CIFAR100(
            root=root,
            train=False,
            download=False,
            transform=self.transform
        )
        self.test_dataloader = DataLoader(self.test_data, batch_size=self.batch_size, shuffle=False)

        self.classes = self.training_data.classes


In [38]:
# Function to count the number of trainable parameters in a model
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

# Example usage
model = torch.nn.Linear(10, 2)  # Example model
print(f"Number of parameters: {count_parameters(model)}")

Number of parameters: 22


# AlexNet (20 pts)
AlexNet, introduced by Alex Krizhevsky in 2012, marked a significant breakthrough in deep learning for computer vision. This deep convolutional neural network consists of five convolutional layers, some followed by max-pooling layers, and three fully connected layers. AlexNet was designed for large-scale image classification tasks and was notably successful in the ImageNet Large Scale Visual Recognition Challenge.

## Implement AlexNet (20 pts)
Classical AlexNet architecture is as follows:


![AlexNet Architecture](https://miro.medium.com/v2/resize:fit:4800/format:webp/1*wgJ9iOjl_JzjOZ3e9jDFAw.png)


The original AlexNet was designed for high-resolution images (224x224x3) from the ImageNet dataset. However, the CIFAR-10 and CIFAR-100 datasets consist of lower-resolution images (32x32x3). To adapt AlexNet for these datasets, you need to modify it.

Requirements:
* **Input Adaptation**: Modify the network to accept 32x32x3 input dimensions, suitable for CIFAR-10 and CIFAR-100 images.
* **Architecture**: Implement a network with the following layers:

  (Convolutional Layer 1 -> ReLU -> Max Pooling 1) ->

  (Convolutional Layer 2 -> ReLU -> Max Pooling 2) ->

  (Convolutional Layer 3 -> ReLU -> Convolutional Layer 4 -> ReLU -> Convolutional Layer 5 -> ReLU -> Max Pooling 3) ->

  Flattening ->

  (Linear -> ReLU) ->

  (Linear -> ReLU) -> Linear.
* You can design your own convolution filters and max pooling layers.
* Your model must contain fewer than **40 million** parameters. We provide the `count_parameters()` function to count the number of parameters in a model.

**Hint**: you can use nn.Sequential() to simplify your implementation.
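
When choosing kernel sizes, strides, and padding for 32x32 inputs, it can help to trace the spatial size by hand before writing the model. The sketch below assumes the standard output-size formula for convolution and max pooling without dilation, floor((n + 2p - k) / s) + 1. The helper `out_size` and the layer list are hypothetical names chosen for illustration; they are not part of the assignment and do not prescribe a particular design.

```python
def out_size(n, kernel_size, stride=1, padding=0):
    """Spatial output size of a conv or max-pool layer (no dilation)."""
    return (n + 2 * padding - kernel_size) // stride + 1

# (kernel_size, stride, padding) for a hypothetical stack of layers
layers = [
    (3, 1, 1),  # conv: keeps 32x32
    (2, 2, 0),  # max pool: 32 -> 16
    (3, 1, 1),  # conv: keeps 16x16
    (2, 2, 0),  # max pool: 16 -> 8
]

n = 32  # CIFAR images are 32x32
sizes = [n]
for k, s, p in layers:
    n = out_size(n, k, s, p)
    sizes.append(n)
print(sizes)  # [32, 32, 16, 16, 8]
```

The final spatial size, multiplied by the number of output channels of the last convolution, gives the input width of the first `nn.Linear` layer after flattening.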

In [39]:
class AlexNet(nn.Module):
    def __init__(self, num_classes=10):
        super(AlexNet, self).__init__()
        self.cnn = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, stride=2, padding=3), nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=1),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=2, padding=3), nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=1),
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=2, padding=1), nn.ReLU(),
            nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, padding=1), nn.ReLU(),
            nn.Conv2d(in_channels=256, out_channels=512, kernel_size=3, stride=1, padding=1), nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Flatten()
        )
        
        self.mlp = nn.Sequential(
            nn.Linear(512, 256), nn.ReLU(),
            nn.Linear(256, 128), nn.ReLU(),
            nn.Linear(128, num_classes)
        )
    
    def forward(self, x):
        x = self.cnn(x)
        x = self.mlp(x)
        return x 


In [40]:
model = AlexNet()
x = torch.randn((1, 3, 32, 32))
print(model.cnn(x).shape)
model(x)

torch.Size([1, 512])


tensor([[ 0.0133,  0.0565, -0.0720,  0.0266, -0.0618,  0.0786, -0.0128,  0.0264,
          0.0949, -0.0431]], grad_fn=<AddmmBackward0>)

# ResNet (20 pts)
ResNet, short for Residual Network, was introduced in 2015 by Kaiming He et al. At its core, ResNet introduces the concept of residual blocks, which allows gradients to flow directly through the network's many layers. In comparison to earlier architectures like AlexNet, ResNet's approach demonstrates the transformative power of residual connections.

In this section, you will implement ResNet-18 for CIFAR-10/100.

## Implement Residual Block (10 pts)
The Residual Block is a crucial component in ResNet. It works by introducing a shortcut connection, also known as a skip connection, alongside a regular neural network layer. This shortcut connection enables the flow of information directly from one layer to another, bypassing some intermediate layers.

The key idea is to learn a residual function, which represents the difference between the desired output and the current output of the block. By doing so, the block aims to make the output closer to what it should be. This approach mitigates the vanishing gradient problem, which can occur in very deep networks, making it easier to train deep models effectively.

![Residual Block](https://miro.medium.com/v2/resize:fit:1140/format:webp/1*6WlIo8W1_Qc01hjWdZy-1Q.png)


The weight layer usually consists of a convolutional layer and a batch normalization layer. The batch normalization layer, often abbreviated as BatchNorm, normalizes the input of a neural network layer across a mini-batch of data during training. BatchNorm not only accelerates convergence but also acts as a form of regularization, reducing the risk of overfitting. In PyTorch, it is implemented by nn.BatchNorm2d().
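
As a small illustration of what "normalizes across a mini-batch" means, the sketch below feeds a shifted and scaled batch through `nn.BatchNorm2d` in training mode and checks the per-channel statistics of the output. The tensor sizes and the shift and scale values are arbitrary choices for this example.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
bn = nn.BatchNorm2d(num_features=4)
bn.train()  # training mode: normalize with statistics of the current mini-batch

# A mini-batch with a deliberately shifted and scaled distribution
x = 5.0 + 3.0 * torch.randn(16, 4, 8, 8)
y = bn(x)

# Statistics are taken per channel, over the batch and spatial dimensions
mean_per_channel = y.mean(dim=(0, 2, 3))
var_per_channel = y.var(dim=(0, 2, 3), unbiased=False)
print(mean_per_channel)  # close to 0 for every channel
print(var_per_channel)   # close to 1 for every channel
```

In evaluation mode (`bn.eval()`), the layer uses running averages collected during training instead of the statistics of the current batch, which is why switching modes matters when measuring test accuracy.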

You are asked to implement the residual block with the following requirements:
* The residual block takes an input of size n * n * `in_channels` and produces an output of size m * m * `out_channels`, with m = floor((n - 1) / `stride`) + 1.
* The residual function consists of the following components:

  Conv -> BatchNorm -> ReLU -> Conv -> BatchNorm

  where Conv means 3x3 convolutional filters with padding 1. If `stride` != 1, set stride for the first Conv.
* The shortcut should be identity if `in_channels` == `out_channels` and `stride` == 1. Otherwise, it should be a convolutional layer with kernel_size=1 and stride=`stride`.
* After adding the residual function and the shortcut, apply another ReLU activation.
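
The residual function and the shortcut can only be added if they produce tensors of the same shape. The sketch below checks, for a few illustrative sizes, that a 3x3 convolution with padding 1 and a 1x1 convolution with the same stride both yield m = floor((n - 1) / stride) + 1. The helper `expected_size` and the channel counts are assumptions made for this example.

```python
import torch
import torch.nn as nn

def expected_size(n, stride):
    # m = floor((n - 1) / stride) + 1
    return (n - 1) // stride + 1

in_channels, out_channels = 64, 256
for n, stride in [(32, 1), (32, 2), (8, 4)]:
    # First convolution of the residual function
    main = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)
    # Convolutional shortcut used when the shape changes
    shortcut = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride)
    x = torch.randn(1, in_channels, n, n)
    m = expected_size(n, stride)
    assert main(x).shape == (1, out_channels, m, m)
    assert shortcut(x).shape == (1, out_channels, m, m)
    print(n, stride, m)
```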

In [42]:
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super(ResidualBlock, self).__init__()
        self.block = nn.Sequential(
            nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=3, stride=stride, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(out_channels)
        )
        self.skip = nn.Identity() if (stride == 1 and in_channels == out_channels) else nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride)
        self.final_act = nn.ReLU()

    def forward(self, x):
        identity = self.skip(x)
        res = self.block(x)
        res = res + identity
        res = self.final_act(res) 
        return res 

## Implement ResNet (10 pts)
ResNet-18 is part of the ResNet family, known for its exceptional depth and performance in image classification tasks. It consists of 18 layers, beginning with one convolutional layer, followed by a few residual blocks, and ending with a fully-connected layer. Here is a glimpse of its architecture:


![ResNet-18](https://www.researchgate.net/profile/Sajid-Iqbal-13/publication/336642248/figure/fig1/AS:839151377203201@1577080687133/Original-ResNet-18-Architecture.png)


In this part of the assignment, you are asked to implement a modified ResNet for CIFAR-10/100. Requirements:
* The model should take inputs of 32x32x3 and output a vector of dimension equal to the number of classes (10 for CIFAR-10 and 100 for CIFAR-100).
* The model should begin with a convolutional layer with kernel_size=3 and padding=1:

  Conv -> BatchNorm -> ReLU

  The output size should be 64x32x32.
* After the first layer, append 4 residual blocks such that the output size changes as follows:
  
  (Input size after previous step) 64x32x32 -> 64x32x32 -> 256x16x16 -> 256x8x8 -> 512x2x2
* The model should end with average pooling (kernel_size=2), flattening, and a fully-connected layer.


In [33]:
class ResNet(nn.Module):
    def __init__(self, num_classes=10):
        super(ResNet, self).__init__()
        self.proj_in = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU()
        )
        self.trunk = nn.Sequential(
            ResidualBlock(in_channels=64, out_channels=64, stride=1),
            ResidualBlock(in_channels=64, out_channels=256, stride=2),
            ResidualBlock(in_channels=256, out_channels=256, stride=2),
            ResidualBlock(in_channels=256, out_channels=512, stride=4),
            nn.AvgPool2d(kernel_size=2),
            nn.Flatten(),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        x = self.proj_in(x)
        x = self.trunk(x) 
        return x 

# Vision Transformer (20 pts)
The Vision Transformer (ViT), introduced in 2020 by Dosovitskiy et al., applies the Transformer architecture, originally designed for natural language processing, to visual data.

![ViT](https://miro.medium.com/v2/resize:fit:1400/format:webp/1*q0tvs1aDxi_7Otm_Zgys1A.png)

In this section, you are tasked with implementing a Vision Transformer model for the CIFAR-10 or CIFAR-100 dataset.

## Implement Patch Embedding Block with Positional Encoding (10 pts)

In Vision Transformers (ViTs), the Patch Embedding Block converts input images into a sequence of patch embeddings, enabling the model to process image data using transformer architectures. Since transformers are not inherently aware of the spatial relationships between patches, positional encoding is added to provide this information.

Overview:
- **Input:** 3x32x32 images. **Arguments:** `patch_size` (make sure `32 % patch_size == 0`), `embed_dim`
- Divide the image into non-overlapping patches of size `3 x patch_size x patch_size`. You should end up getting `(32 // patch_size)**2` patches.
- Flatten the pixels in each patch (into a single dimension of size `3 x patch_size x patch_size`), apply Layer Normalization, project it into a higher-dimensional space (e.g., 256 dimensions) using a fully-connected layer, and then apply another Layer Normalization.
- Add positional encodings to the patch embeddings to retain spatial information.

You are asked to implement the Patch Embedding Block as follows:
- Transform "b c (h x p) (w x p) -> b (h x w) (p x p x c)" where b is batch size, c is number of channels, h x p = 32 is the input image height, w x p = 32 is the input image width, and p is the patch size (e.g., 4).
- "b (h x w) (p x p x c)" -> LayerNorm -> fully-connected layer -> LayerNorm
- add positional encodings
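
The rearrangement "b c (h x p) (w x p) -> b (h x w) (p x p x c)" can be sketched with plain `reshape` and `permute` calls. This is one possible way to write it, shown without `einops`; the function name `patchify` is hypothetical, and the order of elements inside each flattened patch follows the pattern above.

```python
import torch

def patchify(images, patch_size):
    """Split (b, c, H, W) images into (b, num_patches, patch_size * patch_size * c)."""
    b, c, height, width = images.shape
    h, w = height // patch_size, width // patch_size
    # Split each spatial axis into (number of patches, pixels per patch)
    x = images.reshape(b, c, h, patch_size, w, patch_size)
    x = x.permute(0, 2, 4, 3, 5, 1)  # b, h, w, p, p, c
    return x.reshape(b, h * w, patch_size * patch_size * c)

images = torch.arange(2 * 3 * 32 * 32, dtype=torch.float32).reshape(2, 3, 32, 32)
patches = patchify(images, patch_size=4)
print(patches.shape)  # torch.Size([2, 64, 48])
```

Because the flattened patches are passed through a learned linear projection, the exact ordering of pixels within a patch does not matter as long as it is the same for every patch.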

In [47]:
import einops

def get_positional_encoding(seq_len, embed_dim):
    # refer to this paper https://proceedings.neurips.cc/paper_files/paper/2017/file/3f5ee243547dee91fbd053c1c4a845aa-Paper.pdf
    # follow the original implementation proposed in Section 3.5:
    #   PE(pos, 2i) = sin(pos / 10000^(2i / d_model)), PE(pos, 2i+1) = cos(pos / 10000^(2i / d_model))
    # The loop variable i already steps over the even indices 2i, so the exponent is i / embed_dim.
    # Assumes embed_dim is even.
    pe = torch.zeros(seq_len, embed_dim)
    for pos in range(seq_len):
        for i in range(0, embed_dim, 2):
            angle = pos / 10_000. ** (i / embed_dim)
            pe[pos, i] = math.sin(angle)
            pe[pos, i + 1] = math.cos(angle)
    return pe

class PatchEmbedding(nn.Module):
    def __init__(self, patch_size, embed_dim):
        super(PatchEmbedding, self).__init__()
        self.patch_size = patch_size
        self.embed_dim = embed_dim 
        self.norm1 = nn.LayerNorm(3 * patch_size**2)
        self.proj = nn.Linear(3 * patch_size ** 2, embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)

    def forward(self, x):
        x = x.unfold(2, self.patch_size, self.patch_size).unfold(3, self.patch_size, self.patch_size)
        patches = einops.rearrange(x, 'b c nh nw ph pw -> b (nh nw) (c ph pw)')
        patches = self.norm1(patches)
        embeds = self.proj(patches)
        embeds = self.norm2(embeds)
        pe = get_positional_encoding(embeds.shape[1], self.embed_dim).to(x.device)
        embeds += pe 
        return embeds
        

In [11]:
import einops
img = torch.randn((1, 3, 32, 32))
img_patches = img.unfold(2, 8, 8).unfold(3, 8, 8)  # (B x C x NP x NP x PS x PS) 
img_patches = einops.rearrange(img_patches, 'b c nh nw ph pw -> b (nh nw) (c ph pw)')
img_patches.shape
pe = PatchEmbedding(patch_size=8, embed_dim=256)
print(pe(img).shape)

torch.Size([1, 16, 256])
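
Section 3.5 of the Transformer paper defines the sinusoidal encoding as PE(pos, 2i) = sin(pos / 10000^(2i/d)) and PE(pos, 2i+1) = cos(pos / 10000^(2i/d)). A vectorized version avoids Python loops, which can matter if the encoding is recomputed on every forward pass. The sketch below is one possible formulation, assuming `embed_dim` is even; the function name `sinusoidal_encoding` is hypothetical.

```python
import math
import torch

def sinusoidal_encoding(seq_len, embed_dim):
    """Vectorized sinusoidal encoding; assumes embed_dim is even."""
    position = torch.arange(seq_len, dtype=torch.float32).unsqueeze(1)  # (seq_len, 1)
    even_idx = torch.arange(0, embed_dim, 2, dtype=torch.float32)       # the values 2i
    div_term = 10_000.0 ** (even_idx / embed_dim)                       # 10000^(2i / d)
    pe = torch.zeros(seq_len, embed_dim)
    pe[:, 0::2] = torch.sin(position / div_term)  # even columns
    pe[:, 1::2] = torch.cos(position / div_term)  # odd columns
    return pe

pe = sinusoidal_encoding(seq_len=16, embed_dim=256)
print(pe.shape)  # torch.Size([16, 256])
```

Since the encoding depends only on the sequence length and embedding size, it could also be computed once and stored, for example with `register_buffer`, rather than rebuilt for every batch.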


## Implement Vision Transformer (ViT) (10 pts)

The Vision Transformer (ViT) model comprises three components (refer to figure above):

1. **Patch Embedding:** Converts input images into a sequence of patch embeddings.

2. **Transformer Encoder:** Processes the sequence of patch embeddings to capture complex patterns and relationships.

3. **MLP Head:** Maps the output from the Transformer Encoder to class predictions.

**Implementation Requirements:**

- **Input and Output Dimensions:** The model should accept inputs of size 32x32x3 and output a vector with a dimension equal to the number of classes (10 for CIFAR-10 and 100 for CIFAR-100).

- **Patch Embedding:** Begin with the PatchEmbedding module you previously implemented.

- **Transformer Encoder:** Utilize `nn.TransformerEncoder()` to process the sequence of patch embeddings, capturing high-level representations.

- **MLP Head:** Conclude with a mean pooling operation over the sequence (patch) dimension (dimension 1), followed by a Multi-Layer Perceptron (MLP) head that maps the pooled embeddings to class predictions.
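
One detail worth checking when combining these pieces is the tensor layout. By default, `nn.TransformerEncoderLayer` expects inputs shaped (sequence, batch, feature); passing `batch_first=True` makes it read (batch, sequence, feature), which matches patch embeddings of shape (batch, patches, embed_dim) and mean pooling over dimension 1. The sketch below uses small, arbitrary sizes to show the shapes and to check that one image's result does not depend on the other images in the batch.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
embed_dim, num_patches = 32, 16

# batch_first=True makes the encoder read inputs as (batch, sequence, feature)
layer = nn.TransformerEncoderLayer(d_model=embed_dim, nhead=4,
                                   dim_feedforward=64, batch_first=True)
encoder = nn.TransformerEncoder(layer, num_layers=2)
encoder.eval()  # disable dropout so the comparison below is deterministic

tokens = torch.randn(2, num_patches, embed_dim)  # (batch, patches, embed_dim)
with torch.no_grad():
    encoded = encoder(tokens)                # same shape as the input
    pooled = encoded.mean(dim=1)             # mean over the patch dimension
    alone = encoder(tokens[:1]).mean(dim=1)  # first image processed on its own

print(encoded.shape, pooled.shape)
# Attention runs over each image's own patches, so the two results should agree
print(torch.allclose(pooled[:1], alone, atol=1e-4))
```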


In [55]:
class VisionTransformer(nn.Module):
    def __init__(self, img_size=32, patch_size=4, in_channels=3, embed_dim=512, depth=6, num_heads=8, num_classes=10):
        super(VisionTransformer, self).__init__()
        self.pe = PatchEmbedding(patch_size, embed_dim)
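        # batch_first=True: PatchEmbedding returns (batch, patches, embed_dim); the default layout is (sequence, batch, feature)
        transformer_layer = nn.TransformerEncoderLayer(d_model=embed_dim, nhead=num_heads, batch_first=True)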
        self.transformer = nn.TransformerEncoder(transformer_layer, num_layers=depth)
        self.mlp = nn.Sequential(
            nn.Linear(embed_dim, embed_dim),
            nn.ReLU(),
            nn.Linear(embed_dim, num_classes)
        )

    def forward(self, x):
        embeds = self.pe(x)
        embeds = self.transformer(embeds)
        embeds = embeds.mean(dim=1)
        preds = self.mlp(embeds)
        return preds 

# Training Neural Networks (20 pts)
In this section, you will implement a `Trainer` class, use it to train the models that you defined previously, and evaluate them.

## Check CUDA and GPUs
The following code helps you check if CUDA is available and lists the available GPUs.

In [13]:
# Check if CUDA is available
if torch.cuda.is_available():
    # Get the number of available GPUs
    num_gpus = torch.cuda.device_count()
    print(f"Number of available GPUs: {num_gpus}")

    # Get the name of each GPU
    for i in range(num_gpus):
        gpu_name = torch.cuda.get_device_name(i)
        print(f"GPU {i}: {gpu_name}")

    # Set the current GPU device
    device = torch.cuda.current_device()
    print(f"Current GPU device: {device} - {torch.cuda.get_device_name(device)}")
else:
    print("CUDA is not available.")

Number of available GPUs: 1
GPU 0: NVIDIA GeForce RTX 3090
Current GPU device: 0 - NVIDIA GeForce RTX 3090


## Complete the Trainer Class (15 pts)
Fill in all the TODOs.

In [57]:
class Trainer:
    def __init__(self, dataset, net, optimizer, loss_function=nn.CrossEntropyLoss(),
                 device="cuda:0" if torch.cuda.is_available() else "cpu"):
        self.dataset = dataset
        self.net = net.to(device)
        self.lossFunction = loss_function
        self.optimizer = optimizer
        self.device = device

    def train_one_epoch(self):
        self.net.train()  # Re-enable training mode (evaluation switches it off)
        total_loss = 0.
        for batch, labels in self.dataset.train_dataloader:
            batch = batch.to(self.device)
            labels = labels.to(self.device)
            preds = self.net(batch)
            loss = self.lossFunction(preds, labels)
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
            total_loss += loss.item()  # Accumulate a Python float, not a graph-attached tensor
        return total_loss

    @torch.no_grad()  # No gradients are needed for evaluation
    def compute_test_accuracy(self, path):
        self.net.eval()  # Use running BatchNorm statistics and disable dropout
        correct, total = 0, 0
        for batch, labels in self.dataset.test_dataloader:
            batch = batch.to(self.device)
            labels = labels.to(self.device)
            pred_labels = self.net(batch).argmax(dim=1)  # argmax of the logits; softmax is not needed
            correct += (pred_labels == labels).sum().item()
            total += labels.numel()
        avg_acc = correct / total  # Exact accuracy even when the last batch is smaller
        torch.save(self.net, path)  # Overwrites any previous checkpoint at `path`
        print(f'Average Accuracy: {avg_acc * 100:.2f}%')
        return avg_acc

    def train(self, path, num_epochs=20):
        best_accuracy = 0.0
        for epoch in range(num_epochs):
            total_loss = self.train_one_epoch()
            print(f'Loss: {total_loss:.2f}')
            if epoch > 0 and epoch % 5 == 0:
                avg_acc = self.compute_test_accuracy(path)
                if avg_acc > best_accuracy:
                    best_accuracy = avg_acc
        return best_accuracy

## Training (5 pts)
Follow these steps to train and evaluate your models (AlexNet, ResNet, and ViT):
* Create the model, the dataset, and the optimizer. We suggest using SGD with a learning rate of `1e-2`, but you are welcome to explore other options.
* Configure the trainer.
* Compute and print test accuracy before training.
* Train the model.
* Compute and print test accuracy after training.


In [59]:
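import os

dataset = CIFAR10Dataset()
os.makedirs('./checkpoints', exist_ok=True)  # torch.save does not create missing directories
cp_path = './checkpoints/alexnet.pt'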
# AlexNet train and evaluation
print('############ AlexNet ###############')
# alexnet = AlexNet(num_classes=10)
# optim = torch.optim.Adam(alexnet.parameters(), lr=1e-3)
# alexnet_trainer = Trainer(dataset, alexnet, optim)
# alexnet_trainer.compute_test_accuracy(cp_path)
# alexnet_trainer.train(cp_path)
# ResNet train and evaluation
print('############ ResNet ###############')
# cp_path = './checkpoints/resnet.pt'
# resnet = ResNet(num_classes=10)
# optim = torch.optim.Adam(resnet.parameters(), lr=1e-3)
# resnet_trainer = Trainer(dataset, resnet, optim)
# resnet_trainer.compute_test_accuracy(cp_path)
# resnet_trainer.train(cp_path)

# ViT train and evaluation
print('############ ViT ###############')
cp_path = './checkpoints/vit.pt'
vit = VisionTransformer(depth=1)
optim = torch.optim.Adam(vit.parameters(), lr=1e-3)
vit_trainer = Trainer(dataset, vit, optim)
vit_trainer.compute_test_accuracy(cp_path)
vit_trainer.train(cp_path)

############ AlexNet ###############
############ ResNet ###############
############ ViT ###############
Average Accuracy: 6.10%
Loss: 672.07
Loss: 541.00
Loss: 482.46
Loss: 444.26
Loss: 420.28
Loss: 396.21
Average Accuracy: 60.67%


KeyboardInterrupt: 

## Evaluation using Confusion Matrix (5 pts)
A confusion matrix is a fundamental tool for evaluating the performance of classification models. Each row of the matrix represents the instances in an actual class while each column represents the instances in a predicted class.

You are asked to evaluate your trained model by computing and printing the confusion matrix. You can either compute it yourself or use `sklearn.metrics.confusion_matrix()`.
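
As an illustration of the row and column convention described above, the sketch below builds a confusion matrix with `torch.bincount` from toy labels and predictions. The function name `confusion_matrix` and the toy data are assumptions for this example; in practice the two tensors would come from running a trained model over the test set.

```python
import torch

def confusion_matrix(labels, preds, num_classes):
    """Rows are actual classes, columns are predicted classes."""
    # Encode each (actual, predicted) pair as a single index, then count occurrences
    flat_index = labels * num_classes + preds
    counts = torch.bincount(flat_index, minlength=num_classes * num_classes)
    return counts.reshape(num_classes, num_classes)

# Toy labels and predictions standing in for a model's test-set outputs
labels = torch.tensor([0, 0, 1, 1, 2, 2, 2])
preds = torch.tensor([0, 1, 1, 1, 2, 0, 2])
cm = confusion_matrix(labels, preds, num_classes=3)
print(cm)

# Diagonal entries are correct predictions; row sums are the per-class totals
per_class_accuracy = cm.diag().float() / cm.sum(dim=1).float()
print(per_class_accuracy)
```

Large off-diagonal entries point to pairs of classes that the model tends to confuse, which is a useful starting point for the observations section.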

In [None]:
# TODO

## Observations (15 pts)
Write down your observations regarding the results you obtained throughout this assignment. Here are some suggestions:
* **Accuracy and Loss Curves**: Plot and compare the training and validation accuracy and loss curves for each model. This helps visualize how well each model is learning over time and whether they are overfitting or underfitting.
* **Top Misclassified Images**: Examine the classes that are most frequently misclassified by each model. This can provide insights into the types of images that are challenging for each model and may suggest areas for improvement.
* **Feature Visualization**: Visualize the feature maps or activations of intermediate layers in each CNN. This can help you understand what features or patterns each model is learning and whether they differ in terms of learned representations.
* **Robustness Testing**: Assess the robustness of each model by introducing noise, transformations, or adversarial examples to the test data. This can help identify which models are more resilient to perturbations.
* **Runtime and Resource Usage**: Compare the training time and resource usage (e.g., GPU memory) of each model.
* **Hyperparameter Tuning**: Analyze the impact of hyperparameters (learning rates, batch sizes, etc.) on training speed and convergence.
* **Model Size and Efficiency**: Analyze the trade-off between model size and accuracy for each model.
* **Ablation Studies**: Conduct ablation studies by removing or modifying specific components (e.g., dropout, batch normalization, etc.) of each model to understand their contributions to performance.

You don't need to follow them. Feel free to write down any observation you have, or to use tools like Tensorboard to support your observations. You are also welcome to give comments on the design of the assignment.

## **TODO: write down your observations**