# CS2420: Problem Set 2



> Harvard CS 2420: Computing at Scale (Fall 2025)
>
> Instructor: Professor HT Kung


### **Assignment Instructions**

Read the following instructions carefully before starting the assignment and again before submitting your work:

* This problem set must be completed in a team of 3-4 students.  **If you do not have a three or four person group, please email the TFs immediately.**
* **We suggest you start right away.** While you will only need to write around **30 lines or fewer of code** per code cell, we expect this assignment to take **3-7 days** of significant effort for a group of three to four (training the networks takes a long time, so don't start at the last second).
* The assignment consists of two files: **this Google Colab file** (an `.ipynb` file) and a **LaTeX answer template** (available on Canvas).
* The Google Colab contains all assignment instructions and *Code Cells* that you will use to implement the programming components of the assignment (in Python).
* We provide a significant amount of the code to make it easier to get started. In the *Code Cells*, please add comments to explain the purpose of each line of code in your implementation. **You will not receive credit for implementations that are not well documented.**
* All output from the Colab Notebook **must** be saved. Do not delete code; comment it out if needed. TFs must be able to reproduce your results. **You will not receive credit for implementations that are not reproducible.**
* <font color='red'>**Deliverables are highlighted in red**</font> in this Google Colab file. Use the LaTeX answer template to write down answers for these deliverables.
* Each group will **submit** a PDF of your answers, any logs from generative AI, and your Google Colab file (.ipynb file) containing all completed *Code cells* to "Problem Set 2" on Canvas. Only one submission per group. Check your .ipynb file using this [tool](https://htmtopdf.herokuapp.com/ipynbviewer/) before submitting to ensure that you completed all *Code Cells* (including detailed comments).
* The assignment is due on **10/6/2025 at 11:59 PM ET**.


-----
Outline of this assignment:

1. **Getting Started** [20 points]: In this section, you will train a convolutional neural network in PyTorch on the CIFAR-10 image classification dataset. This section will introduce you to the PyTorch machine learning library and training neural networks.


2. **Post-training Quantization** [60 points]: In this section, you will perform post-training quantization of weights on a small fully-connected neural network trained on the MNIST classification dataset. This section reinforces your understanding of quantization and will give you a concrete understanding of the limitations of quantization.

3. **Conv Pruning** [60 points]: In this section, you will implement a simplified version of structured filter pruning based on [Pruning Filters for Efficient ConvNets](https://openreview.net/pdf?id=rJqFGTslg) and non-structured pruning based on [Learning both Weights and Connections for Efficient Neural Networks](https://arxiv.org/abs/1506.02626).

4. **Parameter Efficient Fine-tuning** [35 points]: In this section, you will fine-tune a ResNet-18 model using three methods for updating the model: 1) updating all of the model's parameters, 2) freezing earlier layers and updating the later layers, and 3) a combination of frozen layers and low-rank updates based on [LoRA: Low-Rank Adaptation of Large Language Models](https://arxiv.org/pdf/2106.09685.pdf).

5. **Transformer Architecture** [35 points]: In this section, you will be implementing the individual components of the transformer model based on the paper [Attention is All You Need](https://arxiv.org/abs/1706.03762). This section will provide further intuition on the attention mechanism and provide you a high-level overview of how data flows through the transformer model.

6. **Depthwise Separable Convolution** [40 points]: In this section, you will implement depthwise separable convolutions and train a depthwise convolutional neural network on the CIFAR-10 image classification dataset. This section will reinforce your understanding of depthwise convolution operations and improve your PyTorch skills.

---

### **1. Getting Started**

---
First, make a copy of this Google Colab by navigating to `File->Save a copy in Drive`. Share the copy with all other group members. You will modify the *Code Cells* in your new copy to complete the assignment.


While you are reading through the material, please execute each code cell in order, by either selecting the code cell and pressing the play symbol (▶) on the left side of the code cell or by using the hotkey combination: Shift + Enter.

It's important to note that simultaneously running the same Colab notebook on different computers can introduce version conflicts. One method to overcome this is to assign a person in your group to control the notebook while others work on another (copied) notebook to test out their code.


*Code Cell 1.0* imports libraries used throughout the assignment and *Code Cell 1.1* defines model testing and training functions. We recommend reading through and understanding *Code Cell 1.1* before working through the rest of the assignment.

In [None]:
# nvidia-smi allows you to check if a GPU device has been enabled in Colab
!nvidia-smi

# If you did not see a GPU device after running the above command, go to:
# 'Runtime --> Change runtime type --> Hardware accelerator' and select 'gpu'.
# Then, click 'Runtime --> restart runtime'.

# You should use a GPU device for training models as it will *significantly*
# accelerate training compared to using a CPU device.

In [None]:
#@title Code Cell 1.0: Imports

import sys
import time
import os
import math

import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.datasets as datasets
import torchvision.transforms as transforms

from copy import deepcopy

from torch.autograd import Variable

## Tested Environment

We have tested Problem Set 2 under the following setup:

| Library     | Package Version | CUDA Version | Display Version  |
|-------------|-----------------|--------------|------------------|
| PyTorch     | 2.8.0           | 12.6         | 2.8.0+cu126      |
| Torchvision | 0.23.0          | 12.6         | 0.23.0+cu126     |

You may use the same versions of PyTorch and Torchvision for your Problem Set 2.

In [None]:
#@title Code Cell 1.1: Training/Test Functions

# tracks the highest accuracy observed so far
best_acc = 0

def moving_average(a, n=100):
    '''Helper function used for visualization'''
    ret = torch.cumsum(torch.Tensor(a), 0)
    ret[n:] = ret[n:] - ret[:-n]
    return ret[n - 1:] / n

def train(net, epoch, loader, criterion, optimizer, loss_tracker = [], acc_tracker = []):
    net.train()
    train_loss = 0
    correct = 0
    total = 0
    for batch_idx, (inputs, targets) in enumerate(loader):
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        outputs = net(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        # update model parameters using the computed gradients
        optimizer.step()
        # compute average loss
        train_loss += loss.item()
        loss_tracker.append(loss.item())
        loss = train_loss / (batch_idx + 1)
        # compute accuracy
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()
        acc = 100. * correct / total
        # Print status
        sys.stdout.write(f'\rEpoch {epoch}: Train Loss: {loss:.3f}' +
                         f' | Train Acc: {acc:.3f}')
        sys.stdout.flush()
    acc_tracker.append(acc)
    sys.stdout.flush()

def test(net, epoch, loader, criterion, loss_tracker = [], acc_tracker = []):
    global best_acc
    net.eval()
    test_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for batch_idx, (inputs, targets) in enumerate(loader):
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = net(inputs)
            loss = criterion(outputs, targets)

            test_loss += loss.item()
            loss_tracker.append(loss.item())
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()

            loss = test_loss / (batch_idx + 1)
            acc = 100.* correct / total
    sys.stdout.write(f' | Test Loss: {loss:.3f} | Test Acc: {acc:.3f}\n')
    sys.stdout.flush()

    # Save checkpoint.
    acc = 100.*correct/total
    acc_tracker.append(acc)
    if acc > best_acc:
        state = {
            'net': net.state_dict(),
            'acc': acc,
            'epoch': epoch,
        }
        if not os.path.isdir('checkpoint'):
            os.mkdir('checkpoint')
        torch.save(state, './checkpoint/ckpt.pth')
        best_acc = acc

**Loading the Train and Test Datasets**

For this assignment, we will use the CIFAR-10 dataset. It contains 10 object classes, where each sample is a color image (RGB channels) with a spatial resolution of 32 x 32 pixels. More details here: https://www.cs.toronto.edu/~kriz/cifar.html. *Code Cell 1.2* will take 1-2 minutes to execute as it downloads the training and test datasets.

In [None]:
#@title Code Cell 1.2

# Load training data
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
trainset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                        download=True,
                                        transform=transform_train)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=128,
                                          shuffle=True, num_workers=2)

# Load testing data
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
testset = torchvision.datasets.CIFAR10(root='./data', train=False,
                                       download=True,
                                       transform=transform_test)
testloader = torch.utils.data.DataLoader(testset, batch_size=128, shuffle=False,
                                         num_workers=2)
print('Finished loading datasets!')

**Constructing our Convolutional Neural Network (CNN)**

For this assignment, we will use a 10-layer CNN which we call `ConvNet` that is provided in *Code Cell 1.3*. The CNN has 9 convolutional layers (`nn.Conv2d`) followed by 1 fully connected (`nn.Linear`) layer. The Batch Normalization layers (`nn.BatchNorm2d`) help make the training process more stable and the ReLU layers (`nn.ReLU`) are the non-linear activation functions required for learning when stacking multiple convolutional layers together.

In this assignment, you will modify `ConvNet` to implement Pruning (Section 3) and Depthwise Separable Convolutions (Section 6).
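
As a rough sanity check on the size of `ConvNet`, the learnable parameters can be counted by hand from the layer widths. The sketch below does this in plain Python. The `channels` list is copied from the `conv_block` calls in *Code Cell 1.3*, and the helper name `count_params` is hypothetical. It assumes 3x3 kernels with no bias, two learnable parameters per channel for each `nn.BatchNorm2d` (its default affine scale and shift), and a `nn.Linear` classifier with bias.

```python
# (in_channels, out_channels) for the nine conv_block calls in ConvNet
channels = [(3, 32), (32, 32), (32, 64), (64, 64), (64, 64),
            (64, 128), (128, 128), (128, 256), (256, 256)]

def count_params(channels, kernel_size=3, num_classes=10):
    """Count learnable parameters by hand (hypothetical helper)."""
    # Conv2d weights: in * out * k * k each (bias=False in conv_block)
    conv = sum(c_in * c_out * kernel_size * kernel_size
               for c_in, c_out in channels)
    # BatchNorm2d: one scale and one shift per output channel
    bn = sum(2 * c_out for _, c_out in channels)
    # Linear classifier: weight matrix plus bias vector
    fc = channels[-1][1] * num_classes + num_classes
    return conv, bn, fc

conv, bn, fc = count_params(channels)
print(conv, bn, fc, conv + bn + fc)  # 1208160 2048 2570 1212778
```

With the notebook running, `sum(p.numel() for p in net.parameters())` should report the same total. Nearly all of the parameters sit in the convolutional layers, and the last two blocks alone account for most of them.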

In [None]:
#@title Code Cell 1.3

def conv_block(in_channels, out_channels, kernel_size=3, stride=1,
               padding=1):
    '''
    A nn.Sequential layer executes its arguments in sequential order. In
    this case, it performs Conv2d -> BatchNorm2d -> ReLU. This is a typical
    block of layers used in Convolutional Neural Networks (CNNs). The
    ConvNet implementation below stacks multiple instances of this three layer
    pattern in order to achieve over 90% classification accuracy on CIFAR-10.
    '''
    return nn.Sequential(
        nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding,
                  bias=False),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(inplace=True)
        )

class ConvNet(nn.Module):
    '''
    A 9 layer CNN using the conv_block function above. Again, we use a
    nn.Sequential layer to build the entire model. The Conv2d layers get
    progressively larger (more filters) as the model gets deeper. This
    corresponds to spatial resolution getting smaller (via the stride=2 blocks),
    going from 32x32 -> 16x16 -> 8x8. The nn.AdaptiveAvgPool2d layer at the end
    of the model reduces the spatial resolution from 8x8 to 1x1 using a simple
    average across all the pixels in each channel. This is then fed to the
    single fully connected (linear) layer called classifier, which is the output
    prediction of the model.
    '''
    def __init__(self):
        super(ConvNet, self).__init__()
        self.model = nn.Sequential(
            conv_block(3, 32),
            conv_block(32, 32),
            conv_block(32, 64, stride=2),
            conv_block(64, 64),
            conv_block(64, 64),
            conv_block(64, 128, stride=2),
            conv_block(128, 128),
            conv_block(128, 256),
            conv_block(256, 256),
            nn.AdaptiveAvgPool2d(1)
            )

        self.classifier = nn.Linear(256, 10)

    def forward(self, x):
        '''
        The forward function is called automatically by the model when it is
        given an input image. It first applies the 9 convolution layers, then
        finally the single classifier layer.
        '''
        h = self.model(x)
        B, C, _, _ = h.shape
        h = h.view(B, C)
        return self.classifier(h)

**Training ConvNet on the CIFAR-10 Dataset**

Now that we have loaded our training and testing datasets and created our model, it is time to train the model. Training takes around 15 seconds per epoch (an epoch is one full pass through the training dataset). Usually, we need to train models for many epochs in order to achieve good classification accuracy. For this assignment, we will train most models for 100 epochs, so training takes roughly 15 seconds * 100 = 1500 seconds, or 25 minutes. **Please do not close or refresh the Colab instance during training and make sure to export results if you plan on using them in the future.** Note that the Google Colab instance is non-persistent, so if the session is left idle for a period of time (such as 30 minutes), the state of the machine will be lost. When this happens, you must re-execute all Code Cells, in order, up to your current point of progress. To avoid this, keep your session alive by not leaving it idle.

*Code Cell 1.4* calls the `train` function (defined in *Code Cell 1.1*), which performs one epoch of training each time it is called. The `test` function evaluates the performance of the model on the held-out test set.


---
<font color='red'>**PART 1.1:**</font> [5 points]

<font color='red'>**Deliverables**</font>
1. In *Code Cell 1.4*, set the number of epochs (`epochs`) to `5`. Train the model 3 separate times, with learning rate (`lr`) set to `0.0001`, `0.1`, and `1.0`.
2. For each model run over 5 epochs, plot the training loss (`train_loss_tracker`) and test accuracy (`test_acc_tracker`). For the training loss, apply the provided `moving_average` function on `train_loss_tracker` before plotting to smooth out the loss curve.

3. Plot the training loss and test accuracy for each run with the different learning rates `0.0001`, `0.1`, and `1.0`. (x-axis is epoch)
4. Describe the difference in trends for each learning rate run. Which learning rate seemed to work best? Explain why you think it did best relative to the other learning rates. (100 words maximum)
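
The smoothing in deliverable 2 can be pictured with a plain-Python analogue of the provided `moving_average` helper. This is only an illustrative sketch: the name `moving_average_py` and the toy loss values are made up, and the notebook's version performs the same cumulative-sum computation on a `torch.Tensor`. Note that `train_loss_tracker` holds one entry per batch, so the smoothed curve has `len(losses) - n + 1` points, and an epoch axis requires dividing the batch index by the number of batches per epoch (`len(trainloader)`).

```python
def moving_average_py(values, n=3):
    """Average each window of n consecutive values using a running sum."""
    cumsum, running = [], 0.0
    for v in values:
        running += v
        cumsum.append(running)
    out = []
    for i in range(n - 1, len(values)):
        # Sum of values[i-n+1 .. i] is a difference of two cumulative sums
        window_sum = cumsum[i] - (cumsum[i - n] if i >= n else 0.0)
        out.append(window_sum / n)
    return out

toy_losses = [1, 2, 3, 4, 5]  # made-up per-batch losses
print(moving_average_py(toy_losses, n=3))  # [2.0, 3.0, 4.0]
```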
---


<font color='red'>**PART 1.2:**</font> [10 points]
* Set the learning rate (`lr`) to the best setting observed in <font color='red'>**PART 1.1**</font>.
* Change the number of epochs (`epochs`) from `5` to `100`.

Since you are training the model for 100 epochs, it will take roughly 25-30 minutes to complete. We use a deterministic random seed for weight initialization to remove variability between runs.

Adjusting the learning rate during training is a common strategy used to improve the final accuracy of the CNN. We will compare two different methods (*MultiStep* and *CosineAnnealing*) to adjust the learning rate.  
Please check the PyTorch documentation for [MultiStepLR](https://pytorch.org/docs/stable/optim.html?highlight=multistep#torch.optim.lr_scheduler.MultiStepLR) and [CosineAnnealingLR](https://pytorch.org/docs/stable/optim.html?highlight=cosineannealinglr#torch.optim.lr_scheduler.CosineAnnealingLR) to determine how to use them.
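
To build intuition for how the two schedules differ before training, the sketch below evaluates each schedule's closed-form learning rate in plain Python, without PyTorch. The helper names `multistep_lr` and `cosine_lr` are hypothetical. The formulas follow the documented behavior of `MultiStepLR` (multiply by `gamma` once for every milestone reached) and `CosineAnnealingLR` (assuming its default `eta_min=0`). The milestone and epoch values are illustrative only, not the settings required by the deliverables.

```python
import math
from bisect import bisect_right

def multistep_lr(base_lr, epoch, milestones, gamma):
    """LR at a given epoch: base_lr * gamma^(number of milestones reached)."""
    return base_lr * gamma ** bisect_right(milestones, epoch)

def cosine_lr(base_lr, epoch, t_max, eta_min=0.0):
    """Cosine annealing from base_lr down to eta_min over t_max epochs."""
    cosine = math.cos(math.pi * epoch / t_max)
    return eta_min + 0.5 * (base_lr - eta_min) * (1 + cosine)

# Illustrative settings only
base_lr, epochs = 0.1, 90
for epoch in (0, 29, 30, 45, 60, 89):
    step_lr = multistep_lr(base_lr, epoch, milestones=[30, 60], gamma=0.1)
    cos_lr = cosine_lr(base_lr, epoch, t_max=epochs)
    print(f'epoch {epoch:2d}: multistep {step_lr:.5f}  cosine {cos_lr:.5f}')
```

The step schedule holds the learning rate constant and then drops it abruptly at each milestone, while the cosine schedule decays smoothly and reaches half of the base rate at the midpoint of training.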

<font color='red'>**Deliverables**</font>


1. First try the [MultiStepLR](https://pytorch.org/docs/stable/optim.html?highlight=multistep#torch.optim.lr_scheduler.MultiStepLR) learning rate scheduler. Set `scheduler_name` (in *Code Cell 1.4*) to `'multistep'`. In addition, you also need to set the milestones (`milestones`) and decay factor (`gamma`) used in the scheduler so that the learning rate decreases by a factor of 10 every 25 epochs. Train the network for 100 epochs and plot the training loss and test accuracy.
2. Then try the [CosineAnnealingLR](https://pytorch.org/docs/stable/optim.html?highlight=cosineannealinglr#torch.optim.lr_scheduler.CosineAnnealingLR) learning rate scheduler. Set `scheduler_name` (in *Code Cell 1.4*) to `'cosine_annealing'`. Train the network for 100 epochs and plot the training loss and test accuracy (where the x-axis is the number of epochs).
3. Describe the trends of the learning curves with *MultiStep* and *CosineAnnealing* respectively. (50 words maximum)
4. Record and report the provided total running time.
---


In [None]:
#@title Code Cell 1.4

torch.manual_seed(43) # to give stable randomness

device = 'cuda'
net = ConvNet()
net = net.to(device)

# PART 1.1: set the learning rate (lr) used in the optimizer.
lr =

# PART 1.1: Modify this to train for a short 5 epochs
# PART 1.2: Modify this to train a longer 100 epochs
epochs =

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(net.parameters(), lr=lr, momentum=0.9,
                            weight_decay=5e-4)

# PART 1.2: try different learning rate scheduler
scheduler_name=    # set this to 'multistep' or 'cosine_annealing'
if scheduler_name=='multistep':
    milestones =
    gamma      =
    scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer,
                                                    milestones=milestones,
                                                    gamma=gamma)
elif scheduler_name=='cosine_annealing':
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)

else:
    raise NotImplementedError(f'Unknown scheduler_name: {scheduler_name}')

# Records the training loss and training accuracy during training
train_loss_tracker, train_acc_tracker = [], []

# Records the test loss and test accuracy during training
test_loss_tracker, test_acc_tracker = [], []

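# Report the scheduler name rather than `milestones`, which is only defined
# when the 'multistep' scheduler is selected
print('Training for {} epochs, with learning rate {} and scheduler {}'.format(
      epochs, lr, scheduler_name))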

start_time = time.time()
for epoch in range(0, epochs):
    # train(net, epoch, train_loss_tracker, train_acc_tracker, trainloader)
    # test(net, epoch, test_loss_tracker, test_acc_tracker, testloader)
    train(net=net, epoch=epoch, loader=trainloader, criterion=criterion, optimizer=optimizer, loss_tracker=train_loss_tracker, acc_tracker=train_acc_tracker)
    test(net=net, epoch=epoch, loader=testloader, criterion=criterion, loss_tracker=test_loss_tracker, acc_tracker=test_acc_tracker)
    scheduler.step()

total_time = time.time() - start_time
print('Total training time: {} seconds'.format(total_time))

---

### **2. Post-training Quantization**

---

In this section, you will perform post-training quantization of weights on a multi-layer perceptron with fully connected layers to reduce its memory requirements.
We will start by training a small neural network on the [MNIST](http://yann.lecun.com/exdb/mnist/) dataset. Then, we will quantize its weights and verify that performance does not degrade significantly.

In [None]:
#@title Code Cell 2.1

from six.moves import urllib
opener = urllib.request.build_opener()
opener.addheaders = [('User-agent', 'Mozilla/5.0')]
urllib.request.install_opener(opener)

# Run this code cell to train MNIST neural network. Do not modify!
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR

# Train an MNIST neural network. Run this code cell.
class MNISTNet(nn.Module):
    def __init__(self, hidden=128):
      super(MNISTNet, self).__init__()

      # Two fully connected layers: the flattened 28x28 image (784 inputs)
      # maps to `hidden` units, which then map to 10 class scores
      self.hidden = hidden
      self.fc1 = nn.Linear(28*28*1, self.hidden)
      self.fc2 = nn.Linear(self.hidden, 10)

    def forward(self, x):
      x = x.view(-1, 28*28)
      x = self.fc1(x)
      x = F.relu(x)
      x = self.fc2(x)
      return F.log_softmax(x, dim=1)

def train_mnist(model, device, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % 10000 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))


def test_mnist(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))

model = MNISTNet()
device = torch.device("cpu")

transform=transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
    ])
dataset1_mnist = datasets.MNIST('./data_mnist', train=True, download=True,
                    transform=transform)
dataset2_mnist = datasets.MNIST('./data_mnist', train=False,
                    transform=transform)
train_loader_mnist = torch.utils.data.DataLoader(dataset1_mnist)
test_loader_mnist = torch.utils.data.DataLoader(dataset2_mnist)

optimizer = optim.Adam(model.parameters())

scheduler = StepLR(optimizer, step_size=1, gamma=.1)
for epoch in range(1, 5):
    train_mnist(model, device, train_loader_mnist, optimizer, epoch)
    test_mnist(model, device, test_loader_mnist)
    scheduler.step()


---
<font color='red'>**PART 2.1:**</font> [20 points]

We will perform post-training quantization on the weights of a neural network. Recall from lecture that quantization maps high-precision values to low-precision values to reduce the number of bits used to store each value. In this section you will implement two functions:
* **quantize**: compresses a vector/matrix from 32-bit float to 8-bit integer
* **dequantize**: takes the output of **quantize** and recovers the original values (possibly with some error).

**Quantization Example**

Given a vector of 32-bit floats containing the values `[400, 500, 600]`, our goal is to quantize these values so that they are representable in 8 bits. With 8 bits, we can only represent the values 0 through 255 -- but we have a problem: 400, 500 and 600 are outside of this range! To solve this, we want to map the values of the vector such that each value is *approximately* representable by values between 0 and 255. To do this we:
1. Shift the vector so that the minimum value is 0
2. Scale the vector so that every value is between 0 and 255
3. Cast each value to an (8-bit) integer

For example, suppose we are quantizing `v=[400,500,600]`.
* After shifting, we would have `shift=400`, `v_shifted=[0,100,200]`.
* After scaling, we would have `scale=255/200`, `shift=400`, `v_shifted_scaled=[0*255/200, 100 *255/200, 200 *255/200] = [0, 127.5, 255]`, giving us `v_q=[0, 128, 255]` after rounding.

With this process we have successfully quantized the elements of `v` to be representable by 8 bits (with additional `scale` and `shift` parameters). This yields ~4x reduction in size for large vectors!

Remember, casting should be done at the very last step (after rounding). Please keep this in mind when implementing your functions below!
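
The three steps above can be traced in plain Python on the example vector. This is a list-based sketch for intuition only, not the NumPy implementation that *Code Cell 2.2* asks for. The name `quantize_list` is hypothetical, and the sketch assumes the input is not constant (otherwise the scale is undefined). Python's built-in `round` rounds ties to the nearest even integer, which is why 127.5 becomes 128.

```python
def quantize_list(values, levels=255):
    """Shift, scale, round, then cast each value to an int in [0, levels]."""
    shift = min(values)
    span = max(values) - shift  # assumes max(values) > min(values)
    scale = levels / span
    # Multiply before dividing so that 100 * 255 / 200 is exactly 127.5;
    # multiplying by the precomputed float `scale` can land just below it.
    q = [int(round((v - shift) * levels / span)) for v in values]
    return q, shift, scale

v_q, shift, scale = quantize_list([400.0, 500.0, 600.0])
print(v_q, shift, scale)  # [0, 128, 255] 400.0 1.275
```

The order of the floating-point operations matters at exact midpoints such as 127.5, so an implementation may legitimately produce 127 instead of 128 for the middle element; either value stays within the expected quantization error.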

**Dequantization Example**

To dequantize, we do the opposite:
* Given: `v_q=[0,128,255]`, `shift=400`, `scale=255/200`
* Reverse scale: `v_unscaled = v_q / scale = [0, 100.392, 200]`
* Reverse shift: `v_unscaled + 400 = [400, 500.392, 600]`

As you can see, our dequantized values are very close to our original values (`[400,500,600]`), but there is some quantization error due to loss of information during quantization. As you will see in the next section, neural networks may tolerate these errors and still attain high accuracy.
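
The reverse mapping can be traced the same way. The sketch below is a plain-Python illustration with a hypothetical helper name (`dequantize_list`); it takes the quantized values, `shift`, and `scale` from the example above as given.

```python
def dequantize_list(v_q, shift, scale):
    """Undo the scaling, then undo the shift."""
    return [q / scale + shift for q in v_q]

original = [400.0, 500.0, 600.0]
recovered = dequantize_list([0, 128, 255], shift=400.0, scale=255 / 200)
errors = [abs(r - o) for r, o in zip(recovered, original)]

print([round(r, 3) for r in recovered])  # [400.0, 500.392, 600.0]
print([round(e, 3) for e in errors])     # [0.0, 0.392, 0.0]
```

The minimum and maximum are recovered (up to floating-point precision) because they map to the endpoints 0 and 255; interior values can be off by up to half a quantization step, which is `0.5 / scale` in the original units.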

<font color='red'>**Deliverables**</font>

1. In *Code Cell 2.2*, implement the `quantize` function, which takes a 1D or 2D NumPy array and quantizes each element to be representable with 8 bits (i.e., with an integer value between 0 and 255). The function should return a NumPy array with `dtype=uint8`. Keep in mind that the quantized values will need to be dequantized (converted back to a non-quantized form), so you may also return any other value you may find useful for dequantization.

2. In *Code Cell 2.2*, implement the `dequantize` function which takes in a NumPy array with `dtype=uint8` (and any other parameters you think necessary), and attempts to recover the values from before quantization. Because quantizing to 8 bits loses information, you may not necessarily obtain the exact weight values from before quantization. Ideally, the quantization error is small enough to maintain good inference accuracy.

3. In *Code Cell 2.2*, verify that both your `quantize` and `dequantize` functions work by making sure they pass the test cases. Each test case checks that the quantized array is substantially smaller in memory than the `float32` original (ideally approaching a 4x reduction) and asserts that the dequantized data is within some error threshold of the original matrix. You should see "Success!" for each test case your method passes.

4. How much smaller (in bits) is the quantized data than the original? (Remember to account for the extra arguments you added!) Is it 4x? If not, why might this be? (50 words maximum)


In [None]:
#@title Code Cell 2.2

import numpy as np
import sys

def quantize(W):
    # PART 2.1: Implement!
    # Hint: you may return extra parameters
    # Hint: our solution is 3-5 lines of code
    # PART 2.2: Implement!
    pass

def dequantize(W, extra_args):
    # PART 2.1 Implement!
    # Hint: you may pass extra args through the extra_args parameter to assist dequantization
    # Hint: our solution is 3-5 lines of code
    pass

def test_cases():
    def count_bytes(x):
        return sys.getsizeof(x)
    def did_pass(x, thresh=2):
        bytes_original = count_bytes(x.astype(np.float32))
        quantized, args = quantize(x)
        bytes_quantized = count_bytes(quantized)
        dequantized = dequantize(quantized, args)

        smaller = bytes_quantized <= bytes_original//2.3
        mean_err = np.mean(np.abs(dequantized-x))
        accurate = mean_err < thresh

        if not smaller:
            print("FAIL: Size original %d vs quantized %d" % (bytes_original, bytes_quantized))
            return False
        if not accurate:
            print("FAIL: Mean error %f above threshold %f" % (mean_err, thresh))
            return False
        print("Success!")
        return True

    did_pass(np.array([i for i in range(100)]))
    did_pass(np.array([i*.5 for i in range(100)]))
    did_pass(np.random.normal(0, 3, size=(100, 100)))
    did_pass(np.random.uniform(0, 10, size=(100, 100)))
    did_pass(np.random.uniform(5, 10, size=(100, 100)))
    did_pass(np.random.uniform(-5, -10, size=(100, 100)))
    did_pass(np.random.normal(-10, 1, size=(100, 100)))
    did_pass(np.random.uniform(100, 100+255, size=(20, 40)))
test_cases()

---
<font color='red'>**PART 2.2:**</font> [20 points]

We will now evaluate the performance of the neural network with quantization.

<font color='red'>**Deliverables**</font>

1. Quantize each parameter of the model to 8 bits and dequantize (this injects simulated quantization error into the parameters). Then, create a new model from the dequantized parameters.
2. Evaluate the above model and report the test accuracy. What was the drop/increase in accuracy versus the original full-precision model? (50 words maximum)
3. Quantize to 4 bits instead of 8 and dequantize. What accuracy is achieved from these new 4-bit quantized-and-dequantized parameters? What was the drop/increase in accuracy versus the original full-precision model? (50 words maximum)
4. Quantize to 2 bits instead of 4 and dequantize. What accuracy is achieved from these new 2-bit quantized-and-dequantized parameters? What was the drop/increase in accuracy versus the original full-precision model? (50 words maximum)
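
To see why fewer bits cost more accuracy, note that `b` bits give `2**b - 1` steps between the minimum and maximum value, so the worst-case rounding error is half of one step. The sketch below illustrates this on a made-up list of weights in plain Python. The name `quantize_dequantize` and the toy values are assumptions for illustration; this is not the notebook's `quantize` function.

```python
def quantize_dequantize(values, bits):
    """Round-trip values through a `bits`-bit uniform quantizer."""
    levels = 2 ** bits - 1          # largest integer code
    lo, hi = min(values), max(values)
    span = hi - lo
    if span == 0:                   # constant input: nothing to quantize
        return list(values)
    out = []
    for x in values:
        q = round((x - lo) * levels / span)  # integer code in [0, levels]
        out.append(q * span / levels + lo)   # map the code back to a float
    return out

toy_weights = [-0.5, -0.2, 0.0, 0.1, 0.3, 0.5]  # made-up values
span = max(toy_weights) - min(toy_weights)
for bits in (8, 4, 2):
    approx = quantize_dequantize(toy_weights, bits)
    max_err = max(abs(a - w) for a, w in zip(approx, toy_weights))
    half_step = span / (2 * (2 ** bits - 1))
    print(f'{bits} bits: max error {max_err:.4f} (bound {half_step:.4f})')
```

Each bit removed roughly doubles the step size, so the error bound grows from about 0.002 at 8 bits to about 0.167 at 2 bits for a weight range of width 1.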

In [None]:
#@title Code Cell 2.3

print("Baseline Score")
test_mnist(model, device, test_loader_mnist)

# PART 2.2: Quantize parameters!
state_dict = model.state_dict()
state_dict_q = {}
pass

print("Quantized performance")
test_mnist(model, device, test_loader_mnist)


---
<font color='red'>**PART 2.3:**</font> [20 points]

We will now evaluate the performance of the neural network with Stochastic Rounding.

<font color='red'>**Deliverables**</font>

1. Modify the `quantize` function in *Code Cell 2.2* to implement **Stochastic Rounding** instead of standard Banker's Rounding. Quantize each parameter of the model to 2 bits and dequantize. Repeat the experiment 10 times in Code Cell 2.4 and answer the following questions. What accuracy is achieved from these new Stochastic Rounded parameters? What was the drop/increase in accuracy versus the original 2-bit model that uses standard Banker's Rounding? (50 words maximum)
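
As a reminder of the idea, stochastic rounding rounds a value up with probability equal to its fractional part and down otherwise, so the result equals the input in expectation. The sketch below shows this for a single scalar in plain Python. The helper name `stochastic_round` is hypothetical, and a NumPy version for *Code Cell 2.2* would apply the same rule element-wise.

```python
import math
import random

def stochastic_round(x, rng):
    """Round x down or up at random so that the expected result equals x."""
    lower = math.floor(x)
    frac = x - lower  # distance above the lower integer, in [0, 1)
    return lower + (1 if rng.random() < frac else 0)

rng = random.Random(0)  # seeded only to make the demo repeatable
samples = [stochastic_round(0.25, rng) for _ in range(20000)]
print(sorted(set(samples)))         # [0, 1]
print(sum(samples) / len(samples))  # close to 0.25 on average

# Round-half-to-even (Banker's Rounding), as used by Python's round():
print(round(0.5), round(1.5), round(2.5))  # 0 2 2
```

Deterministic rounding always maps 0.25 to 0, whereas stochastic rounding preserves the value on average at the cost of extra variance, which is why the experiment is repeated several times.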

In [None]:
#@title Code Cell 2.4

# PART 2.3: Modify Code Cell 2.2 to implement Stochastic Rounding
# TODO: Use Code Cell 2.3 as a reference to evaluate the performance of Stochastic Rounding

# PART 2.3 Implement!

print("Quantized performance")
test_mnist(model, device, test_loader_mnist)

---

### **3. Structured and Non-structured Filter Pruning**

---

In this section, you will implement a simplified version of structured filter pruning proposed in [Pruning Filters for Efficient ConvNets](https://openreview.net/pdf?id=rJqFGTslg). Instead of pruning weights, this paper describes removing whole filters from each convolutional layer in a CNN. Compared to pruning weights across the network, filter pruning is a naturally structured pruning method that does not introduce irregular sparsity. Therefore, it does not require using sparse libraries or specialized hardware.
For each convolutional layer, we measure each filter’s relative importance by its absolute weight sum $\sum|\mathcal{F}_{i,j}|$ (i.e., its $\ell_1$-norm). When pruning a layer, $m$ filters with the smallest relative importance will be pruned, where $m$ = (prune percentage $\times$ total number of filters in this layer).

In *Code Cell 3.1*, we provide a `SparseConv2d` layer, which is similar to the standard `nn.Conv2d` layer, but adds a `mask` tensor. The `mask` is used to zero out filters in this layer before the convolution operation is performed. Initially, `mask` is set to all ones, meaning the `SparseConv2d` layer's behavior will be identical to the `nn.Conv2d` layer.
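
The ranking step and the resulting per-filter mask can be sketched in plain Python as follows. This is an illustration under stated assumptions, not the assignment's reference solution: each filter is represented as a flat list of made-up weights, the helper names are hypothetical, and `m` is obtained by truncating `prune_fraction * num_filters` to an integer.

```python
def filter_l1_norms(filters):
    """Sum of absolute weights for each filter."""
    return [sum(abs(w) for w in f) for f in filters]

def filter_prune_mask(filters, prune_fraction):
    """Return one mask entry per filter: 0.0 for pruned, 1.0 for kept."""
    norms = filter_l1_norms(filters)
    m = int(prune_fraction * len(filters))  # number of filters to prune
    # Filter indices ordered from smallest to largest L1 norm
    order = sorted(range(len(filters)), key=lambda i: norms[i])
    pruned = set(order[:m])
    return [0.0 if i in pruned else 1.0 for i in range(len(filters))]

# Four toy "filters", each flattened to three weights (made-up values)
toy_filters = [[0.5, -0.5, 0.1],
               [0.01, -0.02, 0.0],
               [1.0, 1.0, -1.0],
               [0.1, 0.1, 0.1]]
print(filter_prune_mask(toy_filters, prune_fraction=0.5))  # [1.0, 0.0, 1.0, 0.0]
```

A mask of this shape has one entry per output channel, which matches the `_mask` buffer in `SparseConv2d`: multiplying it into the weight tensor zeroes every weight of a pruned filter at once.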

Besides structured pruning, we will also implement the non-structured pruning proposed in [Learning both Weights and Connections for Efficient Neural Networks](https://arxiv.org/abs/1506.02626) for comparison. Non-structured pruning is more flexible than structured pruning and allows irregular sparsity in the weight tensor.
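
As a rough illustration of non-structured (magnitude-based) pruning, the sketch below zeroes the smallest-magnitude half of a toy weight tensor with an element-wise mask. The shapes, the 50% target, and the variable names are assumptions made for the demo.

```python
import torch

torch.manual_seed(0)
weight = torch.randn(4, 2, 3, 3)   # toy conv weight with 72 elements
prune_percent = 0.5
num_prune = round(weight.numel() * prune_percent)  # 36 weights

# Rank all weights by magnitude, regardless of which filter they belong to.
flat_magnitude = weight.abs().flatten()
prune_idx = torch.argsort(flat_magnitude)[:num_prune]

# Element-wise mask with the same shape as the weight tensor.
mask = torch.ones_like(weight)
mask.view(-1)[prune_idx] = 0       # the view shares storage with mask

sparse_weight = mask * weight
print(int(mask.sum().item()), "of", mask.numel(), "weights kept")
```

Unlike the per-filter mask, the zeros here can land anywhere in the tensor, which is the irregular sparsity mentioned above.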


In [None]:
#@title Code Cell 3.1


def _make_pair(x):
    if hasattr(x, '__len__'):
        return x
    else:
        return (x, x)

class SparseConv2d(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, stride=1,
                     padding=1):
        super(SparseConv2d, self).__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size
        self.stride = _make_pair(stride)
        self.padding = _make_pair(padding)

        # initialize weights of this layer
        self._weight = nn.Parameter(torch.randn([self.out_channels, self.in_channels,
                                                        self.kernel_size, self.kernel_size]))
        stdv = 1. / math.sqrt(in_channels)
        self._weight.data.uniform_(-stdv, stdv)
        # initialize mask
        # Since we are going to zero out the whole filter, the number of
        # elements in the mask is equal to the number of filters.
        self.register_buffer('_mask', torch.ones(out_channels))


    def forward(self, x):
        return F.conv2d(x, self.weight, stride=self.stride,
                        padding=self.padding)

    @property
    def weight(self):
        # check out https://pytorch.org/docs/stable/notes/broadcasting.html
        # to better understand the following line
        return self._mask[:,None,None,None] * self._weight

---
<font color='red'>**PART 3.1:**</font> [5 points]

<font color='red'>**Deliverables**</font>

1. In *Code Cell 3.2*, implement `SparseConvNet` using the `sparse_conv_block` in a similar fashion to *Code Cell 1.3*. Replace all the `nn.Conv2d` layers in `ConvNet` with the `SparseConv2d` layers provided in *Code Cell 3.1*. Use the following table to complete the network:

|Layer #|in_channels|out_channels|stride|
|-------|-----------|------------|------|
|1|3|32|1|
|2|32|32|1|
|3|32|64|2|
|4|64|64|1|
|5|64|64|1|
|6|64|128|2|
|7|128|128|1|
|8|128|256|1|
|9|256|256|1|

---

In [None]:
#@title Code Cell 3.2

def sparse_conv_block(in_channels, out_channels, kernel_size=3, stride=1,
                      padding=1):
    '''
    Replaces 3x3 nn.Conv2d with 3x3 SparseConv2d
    '''
    return nn.Sequential(
        SparseConv2d(in_channels, out_channels, kernel_size, stride, padding),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(inplace=True)
        )

class SparseConvNet(nn.Module):
    '''
    A 9 layer CNN using the sparse_conv_block function above.
    PART 3.1: Implement!
    '''
    def __init__(self):
        super(SparseConvNet, self).__init__()
        # PART 3.1: Implement!
        # self.model = nn.Sequential(...)

        self.classifier = nn.Linear(256, 10)

    def forward(self, x):
        '''
        PART 3.1: Implement!
        '''
        h = self.model(x)
        B, C, _, _ = h.shape
        h = h.view(B, C)
        return self.classifier(h)

---
<font color='red'>**PART 3.2:**</font> [5 points]

<font color='red'>**Deliverables**</font>
1. Using *Code Cell 3.3*, train `SparseConvNet` for 5 epochs with a learning rate of $0.1$. Confirm that this performance is approximately equal to what you observed in <font color='red'>**PART 1.1**</font>. (Note that this current model is not sparse, as no pruning has yet been performed. You will implement pruning in the next part. The purpose here is to validate that the performance is the same as the standard convolution.)
2. Plot the training error and test accuracy of `SparseConvNet` trained over 5 epochs. (x-axis is epoch)
---


<font color='red'>**PART 3.3:**</font> [25 points]

<font color='red'>**Deliverables**</font>
1. Implement the `filter_l1_pruning` function in *Code Cell 3.3* and set the pruning schedule (using `prune_percentage` and `prune_epoch`) to prune an additional 10% of the filters every 10 epochs, starting at epoch 10 and ending at epoch 50. By the end, you should achieve 50% sparsity for each convolution layer in the CNN. For simplicity, you do not need to prune the `nn.Linear` layer, which is the final layer in the model.
2. Train `SparseConvNet` for 100 epochs using the same learning rate and `MultiStep` learning rate schedule as in <font color='red'>**PART 1.2**</font>.
3. Compare the test accuracy curves against the baseline `ConvNet` model from <font color='red'>**PART 1.2**</font> in a single plot.
4. Describe any observations in trends of test accuracy related to the pruning stages. (150 words maximum)
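
One possible way to express such a schedule is a mapping from epoch to the cumulative fraction of filters pruned. The sketch below is only an illustration: the name `prune_schedule` is hypothetical, and it assumes the pruning percentage is interpreted as a cumulative target per layer rather than an increment.

```python
# Hypothetical schedule: epoch -> cumulative fraction of filters pruned.
prune_schedule = {epoch: epoch / 100 for epoch in range(10, 51, 10)}

for epoch in range(100):
    if epoch in prune_schedule:
        target = prune_schedule[epoch]
        print(f"epoch {epoch}: prune to {target:.0%} sparsity")
```

The training loop in *Code Cell 3.3* compares `epoch` against a single `prune_epoch` value, so it would need a similar membership check to use a multi-step schedule like this one.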

---

In [None]:
#@title Code Cell 3.3

torch.manual_seed(43) # to give stable randomness

def get_sparse_conv2d_layers(net):
    '''
    Helper function which returns all SparseConv2d layers in the net.
    Use this below to implement layerwise pruning.
    '''
    sparse_conv_layers = []
    for layer in net.children():
        if isinstance(layer, SparseConv2d):
            sparse_conv_layers.append(layer)
        else:
            child_layers = get_sparse_conv2d_layers(layer)
            sparse_conv_layers.extend(child_layers)

    return sparse_conv_layers

def filter_l1_pruning(net, prune_percent):
    for i, layer in enumerate(get_sparse_conv2d_layers(net)):
        num_nonzero = layer._mask.sum().item()
        num_total = len(layer._mask)
        num_prune = round(num_total * prune_percent)
        sparsity = 100.0 * (1 - (num_nonzero / num_total))
        print(num_prune, num_total, prune_percent)

        # PART 3.3: Implement pruning by setting elements in layer._mask
        #           to zero corresponding to the smallest l1-norm filters
        #           in layer._weight. Note: to update variables such as
        #           layer._mask and layer._weight, do the following:
        #           layer._mask.data[idx] = 0
        pass


device = 'cuda'
net = SparseConvNet()
net = net.to(device)

# Set these parameters based on PART 1.2
lr =
milestones =

# PART 3.3: Set this to prune an additional 10% every 10 epochs, starting at
#           epoch 10, ending at epoch 50. By the end, you should achieve
#           50% sparsity for each convolution layer in the CNN. Current
#           parameters indicate 10% pruning at the end of epoch 0.
prune_percentage =
prune_epoch =

epochs = 100

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(net.parameters(), lr=lr, momentum=0.9,
                            weight_decay=5e-4)
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer,
                                                 milestones=milestones,
                                                 gamma=0.1)

train_loss_tracker, train_acc_tracker = [], []
test_loss_tracker, test_acc_tracker = [], []

print('Training for {} epochs, with learning rate {} and milestones {}'.format(
      epochs, lr, milestones))

start_time = time.time()
for epoch in range(0, epochs):
    train(net=net, epoch=epoch, loader=trainloader, criterion=criterion, optimizer=optimizer, loss_tracker=train_loss_tracker, acc_tracker=train_acc_tracker)

    if epoch == prune_epoch:
        print('Pruning at epoch {}'.format(epoch))
        filter_l1_pruning(net, prune_percentage)
        # unstructured_pruning(net, prune_percentage)

    test(net=net, epoch=epoch, loader=testloader, criterion=criterion, loss_tracker=test_loss_tracker, acc_tracker=test_acc_tracker)
    scheduler.step()


total_time = time.time() - start_time
print('Total training time: {} seconds'.format(total_time))

---
<font color='red'>**PART 3.4:**</font> [25 points]

<font color='red'>**Deliverables**</font>
1. Use the `SparseConv2d` (the layer using *unstructured pruning*) in *Code Cell 3.4* to replace the `SparseConv2d` (the layer using *structured pruning*) in *Code Cell 3.1*.
2. Implement the `unstructured_pruning` function in *Code Cell 3.4* and use it to replace the `filter_l1_pruning` function in *Code Cell 3.3* (or in a copy of *Code Cell 3.3*).
3. Re-run training with `unstructured_pruning` (i.e., *Code Cell 3.1, 3.2, 3.3*). You may make copies of those code cells if that is easier for you.
4. Compare the 3 test accuracy curves among the baseline ConvNet model from <font color='red'>**PART 1.2**</font>, the structured pruning model from <font color='red'>**PART 3.3**</font>, and the unstructured pruning model from <font color='red'>**PART 3.4**</font> in a single plot.
5. Describe any observations in the comparison of structured and unstructured pruning. (150 words maximum)

---

In [None]:
#@title Code Cell 3.4

class SparseConv2d(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, stride=1,
                     padding=1):
        super(SparseConv2d, self).__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size
        self.stride = _make_pair(stride)
        self.padding = _make_pair(padding)

        # initialize weights of this layer
        self._weight = nn.Parameter(torch.randn([self.out_channels, self.in_channels,
                                                        self.kernel_size, self.kernel_size]))
        stdv = 1. / math.sqrt(in_channels)
        self._weight.data.uniform_(-stdv, stdv)
        # initialize mask
        # Since we are going to zero out individual weights, the mask has
        # the same shape as the weight tensor (one element per weight).
        self.register_buffer('_mask', torch.ones_like(self._weight))


    def forward(self, x):
        return F.conv2d(x, self.weight, stride=self.stride,
                        padding=self.padding)

    @property
    def weight(self):
        return self._mask * self._weight


def unstructured_pruning(net, prune_percent):
    for i, layer in enumerate(get_sparse_conv2d_layers(net)):
        num_nonzero = layer._mask.sum().item()
        num_total = layer._mask.numel()
        num_prune = round(num_total * prune_percent)
        sparsity = 100.0 * (1 - (num_nonzero / num_total))
        print(num_prune, num_total, prune_percent)

        # PART 3.4: Implement pruning by setting elements in layer._mask
        #           to zero corresponding to weights with the smallest magnitude
        #           in layer._weight.
        pass

---

### **4. Parameter Efficient Fine-tuning**

---

In this section, you will be modifying a model to enable parameter efficient fine-tuning (PEFT).
We will combine two techniques: freezing portions of a model, and applying low-rank updates.


Specifically, you will be using the library from [LoRA: Low-Rank Adaptation of Large Language Models](https://arxiv.org/abs/2106.09685).
We recommend reading Section 4 as it describes the method in detail.
LoRA can decrease the computation and memory requirements during training by reducing the number of parameters updated.

Instead of simply applying a full update $\Delta W$ to a frozen weight matrix $W_0 \in \mathbb{R}^{d \times k}$, we can represent that update using a low-rank decomposition. Namely, $\Delta W = BA$, where $B \in \mathbb{R}^{d \times r}$, $A \in \mathbb{R}^{r \times k}$, and the rank $r \ll \min(d, k)$, resulting in:

$$
W_0 + \Delta W = W_0 + BA
$$

As we are using *low-rank approximation*, it would be more precise to state that $\Delta W \approx BA$, resulting in:

$$
W_0 + \Delta W \approx W_0 + BA
$$
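
To make the decomposition concrete, here is a minimal sketch with plain tensors rather than the `loralib` API. The sizes are arbitrary choices for the demo. The initialization follows the LoRA paper: $B$ starts at zero and $A$ is random, so $BA = 0$ before training.

```python
import torch

torch.manual_seed(0)
d, k, r = 512, 512, 4          # toy sizes with r much smaller than min(d, k)

W0 = torch.randn(d, k)                      # frozen pre-trained weight
B = torch.zeros(d, r, requires_grad=True)   # zero init, so BA = 0 at the start
A = torch.randn(r, k, requires_grad=True)   # random Gaussian init

W_eff = W0 + B @ A             # effective weight used in the forward pass

full_params = d * k            # parameters updated by full fine-tuning
lora_params = r * (d + k)      # parameters updated by LoRA
print(full_params, lora_params)  # 262144 4096
```

With these sizes the low-rank update trains 64 times fewer parameters than a full update of the same matrix.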


In this section you will be responsible for creating a partially frozen network that applies low-rank updates to a classifier when training on a new [dataset](https://datashare.ed.ac.uk/handle/10283/3192). Code Cells 4.0a-c download and initialize the `cinic-10` dataset, as well as install the [LoRA library](https://github.com/microsoft/LoRA/tree/main).

In summary, you will:
1. Load pre-trained ResNet-18
2. Modify the network to have frozen early layers
3. Further modify the network to have a LoRA classifier layer
4. Compare the fine-tuning performance of: unfrozen, partially frozen, and partially frozen + LoRA models

In [None]:
#@title Code Cell 4.0a
!mkdir -p data/cinic-10
!curl -L https://datashare.is.ed.ac.uk/bitstream/handle/10283/3192/CINIC-10.tar.gz | tar xz -C data/cinic-10

In [None]:
#@title Code Cell 4.0b
cinic_traindir = os.path.join("data", "cinic-10", "train")
cinic_testdir = os.path.join("data", "cinic-10", "test")

cinic_mean = [0.47889522, 0.47227842, 0.43047404]
cinic_std = [0.24205776, 0.23828046, 0.25874835]
cinic_normalize = transforms.Normalize(mean=cinic_mean, std=cinic_std)

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=cinic_mean, std=cinic_std)
])

train_transform = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=cinic_mean, std=cinic_std)
])

c_trainset = datasets.ImageFolder(root=cinic_traindir, transform=train_transform)
c_trainloader = torch.utils.data.DataLoader(c_trainset,
                                          batch_size=128,
                                          shuffle=True,
                                          num_workers=2)

c_testset = datasets.ImageFolder(root=cinic_testdir, transform=transform)
c_testloader = torch.utils.data.DataLoader(c_testset,
                                         batch_size=100,
                                         shuffle=True,
                                         num_workers=2)

classes = ('airplane', 'automobile', 'bird', 'cat',
           'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

In [None]:
#@title Code Cell 4.0c
!pip install loralib
import loralib as lora

device = 'cuda'
default_criterion = nn.CrossEntropyLoss
default_epochs = 1
default_lr = 0.01

---
<font color='red'>**PART 4.1:**</font> [25 points]

<font color='red'>**Deliverables**</font>
1. In *Code Cell 4.1*, instantiate a pre-trained ResNet-18 model (`IMAGENET1K_V1` weights) using the `torchvision` library.
2. In *Code Cell 4.2*, implement the freezing of earlier layers in the model. As long as the final classification layer is not frozen, you are free to choose whichever layers to freeze. You may want to start with leaving the last 4 conv layers unfrozen.
3. In *Code Cell 4.3*, replace the final classification layer of the ResNet-18 model with a LoRA linear layer.
4. You will train and evaluate the aforementioned models in *Code Cell 4.4* through *Code Cell 4.6*. Plot the test accuracy curves (accuracy vs epoch) on the same graph for these three models: the baseline pre-trained model (`pt_net`), the partially frozen model (`net_freeze`), and the model with both frozen layers and a LoRA classification layer (`net_freeze_lora`). Note: when making adjustments to your models, make sure to run *Code Cells 4.2 - 4.6* **in sequential order** to avoid unexpected results.

---

In [None]:
#@title Code Cell 4.1

# PART 4.1: Implement!
# Read the documentation at https://pytorch.org/vision/stable/models.html
# Create a Resnet-18 network with weights that were pretrained on IMAGENET1K_V1.
from torchvision.models import resnet18 # [ must import resnet18]
# TODO: implement!
pt_net =

# Need criterion and optimizer per model
pt_net_criterion = default_criterion()
pt_net_optimizer = torch.optim.SGD(pt_net.parameters(), lr=0.01, momentum=0.9, weight_decay=5e-4)

# Get the baseline accuracy
pt_net.to(device)
test(net=pt_net, epoch=0, loader=c_testloader, criterion=pt_net_criterion, loss_tracker=[], acc_tracker=[])

In [None]:
#@title Code Cell 4.2

# PART 4.1: Implement!
# Freeze earlier layers of the model (e.g., layer0 -> layerN). [7.5 pts]
# Modifications should be applied to net_freeze.
# Hint: you may want to use the named_parameters() method, and leave biases
# unfrozen.
net_freeze = deepcopy(pt_net)

# Code that freezes layers of network
# TODO: implement!

# Need criterion and optimizer per model
net_freeze_criterion = default_criterion()
net_freeze_optimizer = torch.optim.SGD(net_freeze.parameters(), lr=0.01, momentum=0.9, weight_decay=5e-4)

# Get the baseline accuracy
net_freeze.to(device)
test(net=net_freeze, epoch=0, loader=c_testloader, criterion=net_freeze_criterion, loss_tracker=[], acc_tracker=[])

In [None]:
#@title Code Cell 4.3

# PART 4.1: Implement!
# Replace the classifier layer with a LoRA linear layer [7.5 pts]
# Modifications should be applied to net_freeze_lora.
# We have already installed and imported this library as "lora"
# Library reference: https://github.com/microsoft/LoRA/tree/main
# Hint: when replacing the layer, you may want to keep a copy of the original
# weights. You may also want to try different ranks (but r=10 should suffice).
net_freeze_lora = deepcopy(net_freeze)

# Modify network to have a LoRA linear classifier
# TODO: implement!

# Need criterion and optimizer per model
net_freeze_lora_criterion = default_criterion()
net_freeze_lora_optimizer = torch.optim.SGD(net_freeze_lora.parameters(), lr=0.01, momentum=0.9, weight_decay=5e-4)

# Get the baseline accuracy
net_freeze_lora.to(device)
test(net=net_freeze_lora, epoch=0, loader=c_testloader, criterion=net_freeze_lora_criterion, loss_tracker=[], acc_tracker=[])

---
<font color='red'>**PART 4.2:**</font> [10 points]

<font color='red'>**Deliverables**</font>
1. You will train and evaluate the aforementioned models in *Code Cell 4.4* through *Code Cell 4.6*. Plot the test accuracy curves (accuracy vs epoch) on the same graph for these three models: the baseline pre-trained model (`pt_net`), the partially frozen model (`net_freeze`), and the model with both frozen layers and a LoRA classification layer (`net_freeze_lora`). Note: if you make any adjustments to your models, make sure to run *Code Cells 4.1 - 4.3* in **sequential order and before** *Code Cells 4.4 - 4.6* to avoid unexpected behavior.
---


In [None]:
#@title Code Cell 4.4

# Fine-tune and test vanilla pre-trained network
pt_net.to(device)
for epoch in range(default_epochs):
    train(net=pt_net, epoch=epoch, loader=c_trainloader,
          criterion=pt_net_criterion, optimizer=pt_net_optimizer,
          loss_tracker=[], acc_tracker=[])
    test(net=pt_net, epoch=epoch, loader=c_testloader,
         criterion=pt_net_criterion,
         loss_tracker=[], acc_tracker=[])


In [None]:
#@title Code Cell 4.5

# Fine-tune and test partially frozen network
net_freeze.to(device)
for epoch in range(default_epochs):
    train(net=net_freeze, epoch=epoch, loader=c_trainloader,
          criterion=net_freeze_criterion, optimizer=net_freeze_optimizer,
          loss_tracker=[], acc_tracker=[])
    test(net=net_freeze, epoch=epoch,
         loader=c_testloader, criterion=net_freeze_criterion,
         loss_tracker=[], acc_tracker=[])


In [None]:
#@title Code Cell 4.6

# Fine-tune and test partially frozen network with LoRA classifier
net_freeze_lora.to(device)
for epoch in range(default_epochs):
    train(net=net_freeze_lora, epoch=epoch, loader=c_trainloader,
          criterion=net_freeze_lora_criterion, optimizer=net_freeze_lora_optimizer,
          loss_tracker=[], acc_tracker=[])
    test(net=net_freeze_lora, epoch=epoch,
         loader=c_testloader, criterion=net_freeze_lora_criterion,
         loss_tracker=[], acc_tracker=[])


---

### **5. Transformers**

---

In [None]:
#@title Code Cell 5.1

import numpy as np
import matplotlib.pyplot as plt
import math  # used by PositionalEmbedding (math.sqrt) in Code Cell 5.4
import time
import logging

import torch
import torch.nn as nn
import torch.nn.functional as F

torch.manual_seed(0)
np.random.seed(0)

In this question, your team will be implementing the transformer model, focusing on the original transformer model introduced in the paper "Attention is All You Need" by Vaswani et al. (2017).

In this programming assignment, your team will implement all the components of the transformer model. In the next programming assignment, you will use the components you have implemented here to train a small language model. For this, your team has the option to use PyTorch or TensorFlow for the implementation (for TensorFlow, make sure to replace the class definitions).

You will find Figure 1 in the paper very helpful for this problem set. Be sure to carefully review relevant PyTorch/TensorFlow documentation. In your writeup, you will need to write and justify the hyperparameters that you will need to implement the transformer model.

<img src="https://drive.google.com/uc?export=view&id=1TX5V8DRzYlGFv0OMoE6qcW8f-tM5WDH5" width="500"/>


# I: Implementing Components of the Transformer

## 1) Positional Encoding and Embedding
---

**a)** Positional Encoding

The input to the transformer model is just the set of vectors without any notion of order. Without recurrent or convolutional layers we need to inject some information about the position of the words in the sentence. If this is not done, the set of words "I am a student" and "am I a student" would be treated as the same input.

In the original transformer model, the authors use the positional encoding of the form:

$$PE_{(pos, 2i)} = \sin(pos / 10000^{2i/d_{model}})$$
$$PE_{(pos, 2i+1)} = \cos(pos / 10000^{2i/d_{model}})$$

where $pos$ is the position and $i$ is the dimension. Implement this positional encoding function below.

In [None]:
#@title Code Cell 5.2

def positional_encoding(length, depth):
  """
  Generate positional encoding matrix using sine and cosine functions.

  Args:
    length: Maximum sequence length (corresponds to 'pos' in the formula)
    depth: Embedding dimension (corresponds to 'd_model' in the formula)

  Returns:
    Positional encoding matrix of shape (length, depth)
  """
  # Create position indices: [0, 1, 2, ..., length-1]
  positions = np.arange(length)[:, np.newaxis]  # Shape: (length, 1)

  # Create dimension indices: [0, 1, 2, ..., depth-1]
  dimensions = np.arange(depth)[np.newaxis, :]  # Shape: (1, depth)

  # Calculate the angle rates: 1 / (10000^(2i/d_model))
  # Floor division maps dimensions 2i and 2i+1 to the same index i,
  # so each sine/cosine pair shares one frequency
  angle_rates = 1 / np.power(10000, (2 * (dimensions // 2)) / np.float32(depth))

  # Calculate angles: pos * angle_rates
  angle_rads = positions * angle_rates  # Shape: (length, depth)

  # Apply sin to even indices (2i) and cos to odd indices (2i+1)
  angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])  # Apply sine to even indices
  angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])  # Apply cosine to odd indices

  # Convert to a PyTorch tensor of shape (length, depth); no batch dimension is added
  pos_encoding = torch.tensor(angle_rads, dtype=torch.float32)

  return pos_encoding

**b)** Plot the positional encoding with length 2048 and depth 256. Comment on your observations.

In [None]:
#@title Code Cell 5.3

# Generate positional encoding with length=2048 and depth=256
pos_enc = positional_encoding(2048, 256)

# Plot the positional encoding
plt.figure(figsize=(15, 5))
plt.pcolormesh(pos_enc.numpy(), cmap='RdBu')
plt.xlabel('Depth (Embedding Dimension)')
plt.ylabel('Position')
plt.colorbar()
plt.title('Positional Encoding: Sine and Cosine Patterns')
plt.show()

# Comment: The plot shows alternating sine (even dimensions) and cosine (odd dimensions) patterns.
# Lower dimensions (left) vary quickly with position (high frequency, short wavelength),
# while higher dimensions (right) vary slowly (low frequency, wavelengths up to 2*pi*10000).
# Together they give each position a distinct code covering both fine and coarse positional scales.

**c)** Now, construct a `PositionalEmbedding` class that finds the embedding vector of the token and adds the positional encoding to it. You may use the `torch.nn.Embedding`/`tf.keras.layers.Embedding` layer to find the token embedding.

In [None]:
#@title Code Cell 5.4
class PositionalEmbedding(nn.Module):
    def __init__(self, vocab_size, d_model, max_len=2048):
      """
      Combines token embeddings with positional encodings.

      Args:
        vocab_size: Size of the vocabulary
        d_model: Embedding dimension
        max_len: Maximum sequence length
      """
      super(PositionalEmbedding, self).__init__()
      self.d_model = d_model

      # Token embedding layer: converts token indices to dense vectors
      self.embedding = nn.Embedding(vocab_size, d_model)

      # Generate positional encoding and register it as a buffer (not a trainable
      # parameter) so that it moves with the module across devices
      self.register_buffer('positional_encoding', positional_encoding(max_len, d_model))

    def forward(self, x):
      """
      Forward pass: compute token embeddings and add positional encodings.

      Args:
        x: Input tensor of token indices, shape (batch_size, seq_len)

      Returns:
        Embedding with positional encoding, shape (batch_size, seq_len, d_model)
      """
      # Get sequence length from input
      seq_len = x.shape[1]

      # Get token embeddings and scale by sqrt(d_model) as in the paper
      x = self.embedding(x) * math.sqrt(self.d_model)

      # Add positional encoding (only up to sequence length)
      x = x + self.positional_encoding[:seq_len, :]

      return x

In [None]:
#@title Code Cell 5.5
vocab_size = 10
d_model = 16
max_len = 20

model = PositionalEmbedding(vocab_size, d_model, max_len=max_len)

input_tensor = torch.randint(0, vocab_size, (2, 5))

output = model(input_tensor)


print("Input Tensor Shape:")
print(input_tensor.shape)

print("\nOutput Tensor Shape:")
print(output.shape)


def test_positional_embedding_shape():
    expected_shape = (2, 5, d_model)
    assert output.shape == expected_shape, f"Expected shape {expected_shape}, but got {output.shape}"

# Running the test
test_positional_embedding_shape()
print("\nShape test passed!")

## 2) Attention Layers

The Attention mechanism is at the center of the transformer architecture. Recall that the self-attention output is defined as:

$$
\text{Attention}(Q, K, V) = \text{softmax}\left(\frac{QK^T}{\sqrt{d_k}}\right)V
\quad \text{(1)}
$$

where $Q$, $K$, and $V$ are the query, key, and value matrices, respectively. $d_k$ is the dimension of each key vector; dividing by $\sqrt{d_k}$ keeps the dot products from growing large as $d_k$ increases, which would otherwise push the softmax into regions with very small gradients. Here is a short intuition for each of them:

- **Query (Q)** represents what a position is looking for, i.e., the question it asks of the other positions.

- **Key (K)** represents what each position offers; queries are compared against keys to decide how much attention each position receives.

- **Value (V)** represents the information that is being outputted/passed along.
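
Equation (1) can be sketched directly in a few lines. The helper name `attention` and the tensor sizes below are assumptions made for illustration; this is not the implementation the assignment asks for.

```python
import math
import torch
import torch.nn.functional as F

def attention(Q, K, V):
    """Equation (1): softmax(Q K^T / sqrt(d_k)) V (hypothetical helper)."""
    d_k = K.shape[-1]
    scores = Q @ K.transpose(-2, -1) / math.sqrt(d_k)  # (..., len_q, len_k)
    weights = F.softmax(scores, dim=-1)                # each row sums to 1
    return weights @ V, weights

torch.manual_seed(0)
Q = torch.randn(2, 5, 16)   # (batch, query positions, d_k)
K = torch.randn(2, 7, 16)   # (batch, key positions, d_k)
V = torch.randn(2, 7, 32)   # (batch, key positions, d_v)
out, weights = attention(Q, K, V)
print(out.shape, weights.shape)  # torch.Size([2, 5, 32]) torch.Size([2, 5, 7])
```

Each output row is a weighted average of the value vectors, with weights given by how well that query matches each key.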


This allows us to define multi-head attention, which concatenates the outputs of several attention heads, allowing the model to focus on different representations of the input at different positions. We define multi-head attention as follows:

$$
\text{MultiHead}(Q, K, V) = \text{Concat}(\text{head}_1, \ldots, \text{head}_h)W^O
\quad \text{(2)}
$$

where each head is defined as:

$$
\text{head}_i = \text{Attention}(QW_i^Q, KW_i^K, VW_i^V)
\quad \text{(3)}
$$

where $W_i^Q$, $W_i^K$, and $W_i^V$ are the weight matrices for the query, key, and value vectors, respectively. $W^O$ is the weight matrix that is applied to the concatenated output of the heads.



The Query, Key, and Value vectors are determined by multiplying the input embedding matrix with the weight matrices $W^Q$, $W^K$, and $W^V$, respectively. These are the parameters that are learned during the training process.
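
Equations (2) and (3) can be traced with explicit per-head projections. In this sketch, random matrices stand in for the learned parameters, and the sizes and variable names are assumptions for the demo. In practice, `torch.nn.MultiheadAttention` performs the same computation with fused projections.

```python
import torch
import torch.nn.functional as F

torch.manual_seed(0)
batch, seq_len, d_model, h = 2, 5, 16, 4
d_k = d_model // h                     # per-head dimension

X = torch.randn(batch, seq_len, d_model)
# One (d_model, d_k) projection per head for Q, K, V, plus the output W^O.
W_Q = torch.randn(h, d_model, d_k)
W_K = torch.randn(h, d_model, d_k)
W_V = torch.randn(h, d_model, d_k)
W_O = torch.randn(h * d_k, d_model)

heads = []
for i in range(h):
    Q, K, V = X @ W_Q[i], X @ W_K[i], X @ W_V[i]   # each (batch, seq_len, d_k)
    weights = F.softmax(Q @ K.transpose(-2, -1) / d_k ** 0.5, dim=-1)
    heads.append(weights @ V)                      # head_i from equation (3)

# Equation (2): concatenate the heads and apply the output projection.
multi_head = torch.cat(heads, dim=-1) @ W_O        # (batch, seq_len, d_model)
print(multi_head.shape)  # torch.Size([2, 5, 16])
```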






**a)** The global self-attention layer processes the context sequence and is designed to capture the dependencies between the words in the sequence. For the multi-head attention in equation (2), the inputs to the heads are determined by the following:

\begin{equation}
Q = XW^Q, \quad K = XW^K, \quad V = XW^V
\end{equation}

where $X$ is the input embedding matrix. From this, implement the `GlobalSelfAttention` class. Ensure that, within the global self-attention layer, you add the normalization and the skip connection. **This will need to be done for all the attention layers you implement in the problem set.** Note that you are **allowed** to use the **torch.nn.MultiheadAttention** class or its TensorFlow equivalent.


<img src="https://drive.google.com/uc?export=view&id=1PoY5zupGP46K6fs0qJMDynzJecBBmT0s" width="500"/>

In [None]:
#@title Code Cell 5.6
class GlobalSelfAttention(nn.Module):
    def __init__(self, **kwargs):
        """
        Global self-attention layer with layer normalization and residual connection.
        Uses PyTorch's MultiheadAttention module.

        Args:
          d_model: Embedding dimension
          num_heads: Number of attention heads
          dropout_rate: Dropout probability
        """
        super(GlobalSelfAttention, self).__init__()
        self.d_model = kwargs['d_model']
        self.num_heads = kwargs['num_heads']

        # Multi-head attention layer
        self.mha = nn.MultiheadAttention(
            embed_dim=self.d_model,
            num_heads=self.num_heads,
            dropout=kwargs.get('dropout_rate', 0.1),
            batch_first=True  # Input shape: (batch, seq, feature)
        )

        # Layer normalization
        self.layernorm = nn.LayerNorm(self.d_model)

    def forward(self, x):
        """
        Forward pass with self-attention, residual connection, and layer norm.

        Args:
          x: Input tensor, shape (batch_size, seq_len, d_model)

        Returns:
          Output tensor with same shape as input
        """
        # Self-attention: Q, K, V all come from the same input x
        # attn_output shape: (batch_size, seq_len, d_model)
        attn_output, _ = self.mha(x, x, x, need_weights=False)

        # Residual connection: add input to attention output
        x = x + attn_output

        # Layer normalization
        x = self.layernorm(x)

        return x



**b)** The cross-attention layer connects the encoder and the decoder. Implement the `CrossAttention` class, paying attention to the fact that the query comes from the decoder, while the key and value come from the encoder. (So your `forward`/`call` function will take two inputs.)


<img src="https://drive.google.com/uc?export=view&id=1s54R7qkXE8hXgn9XnPGD7sYUFKR08RLV" width="500"/>

In [None]:
#@title Code Cell 5.7
class CrossAttention(nn.Module):
    def __init__(self, **kwargs):
        """
        Cross-attention layer connecting encoder and decoder.
        Query comes from decoder, Key and Value come from encoder.

        Args:
          d_model: Embedding dimension
          num_heads: Number of attention heads
          dropout_rate: Dropout probability
        """
        super(CrossAttention, self).__init__()
        self.d_model = kwargs['d_model']
        self.num_heads = kwargs['num_heads']

        # Multi-head attention layer
        self.mha = nn.MultiheadAttention(
            embed_dim=self.d_model,
            num_heads=self.num_heads,
            dropout=kwargs.get('dropout_rate', 0.1),
            batch_first=True
        )

        # Layer normalization
        self.layernorm = nn.LayerNorm(self.d_model)

    def forward(self, x, context):
        """
        Forward pass with cross-attention.

        Args:
          x: Query from decoder, shape (batch_size, target_seq_len, d_model)
          context: Key and Value from encoder, shape (batch_size, source_seq_len, d_model)

        Returns:
          Output tensor, shape (batch_size, target_seq_len, d_model)
        """
        # Cross-attention: Q from x (decoder), K and V from context (encoder)
        attn_output, _ = self.mha(x, context, context, need_weights=False)

        # Residual connection: add decoder input to attention output
        x = x + attn_output

        # Layer normalization
        x = self.layernorm(x)

        return x



**c)** The causal self-attention layer is similar to the global self-attention layer we discussed in (a), but it is designed for the decoder. To prevent the model from looking into the future, we apply a causal mask to the attention weights, ensuring that the model only attends to the previous tokens. Implement the `CausalSelfAttention` class below.
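
The effect of a causal mask can be seen on a tiny example. The sketch below uses uniform scores, an assumption made purely for illustration, so that the resulting attention weights are easy to read.

```python
import torch
import torch.nn.functional as F

seq_len = 4
# -inf strictly above the diagonal, 0 on and below it.
causal_mask = torch.triu(torch.full((seq_len, seq_len), float("-inf")), diagonal=1)

scores = torch.zeros(seq_len, seq_len)             # uniform scores for illustration
weights = F.softmax(scores + causal_mask, dim=-1)  # masked entries get weight 0
print(weights)
```

Row $t$ of `weights` spreads its attention evenly over positions $0$ through $t$ and assigns zero weight to every later position.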


<img src="https://drive.google.com/uc?export=view&id=1pXMWcB-lIWjRtFxjBTZ73S9Mn1PE-YLc" width="500"/>

In [None]:
#@title Code Cell 5.8
class CausalSelfAttention(nn.Module):
    def __init__(self, **kwargs):
        """
        Causal self-attention layer for decoder with masking to prevent looking ahead.

        Args:
          d_model: Embedding dimension
          num_heads: Number of attention heads
          dropout_rate: Dropout probability
        """
        super(CausalSelfAttention, self).__init__()
        self.d_model = kwargs['d_model']
        self.num_heads = kwargs['num_heads']

        # Multi-head attention layer
        self.mha = nn.MultiheadAttention(
            embed_dim=self.d_model,
            num_heads=self.num_heads,
            dropout=kwargs.get('dropout_rate', 0.1),
            batch_first=True
        )

        # Layer normalization
        self.layernorm = nn.LayerNorm(self.d_model)

    def forward(self, x):
        """
        Forward pass with causal self-attention.

        Args:
          x: Input tensor, shape (batch_size, seq_len, d_model)

        Returns:
          Output tensor with same shape as input
        """
        # Only the sequence length is needed to build the mask
        seq_len = x.size(1)

        # Create causal mask: -inf strictly above the diagonal, 0 elsewhere
        # This prevents attention to future positions
        causal_mask = torch.triu(
            torch.full((seq_len, seq_len), float('-inf'), device=x.device),
            diagonal=1  # Start from diagonal=1 to keep current position
        )

        # Apply causal self-attention with mask
        attn_output, _ = self.mha(
            x, x, x,
            attn_mask=causal_mask,
            need_weights=False
        )

        # Residual connection
        x = x + attn_output

        # Layer normalization
        x = self.layernorm(x)

        return x

# 3) Additional Layers - Feedforward and Layer Normalization

After the attention layers in the encoder and the decoder, we have a simple feedforward neural network. It consists of two linear transformations with a ReLU activation in between, and a dropout layer (with dropout rate 0.1) after the linear transformations.

**Implement this network**, and add the layer normalization and the residual connection to the feedforward network.
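
Written as equations, and assuming the post-norm ordering of the original Transformer (the symbols $W_1, b_1, W_2, b_2$ are notation introduced here for the two linear layers), the block described above computes roughly:

$$
\mathrm{FFN}(x) = W_2\,\mathrm{ReLU}(W_1 x + b_1) + b_2
$$

$$
\mathrm{out} = \mathrm{LayerNorm}\big(x + \mathrm{Dropout}(\mathrm{FFN}(x))\big)
$$

Here $W_1$ maps from `d_model` to the hidden width and $W_2$ maps back to `d_model`, so the residual sum is well defined.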


In [None]:
#@title Code Cell 5.9
class FeedForward(nn.Module):
    def __init__(self, d_model, dff, dropout_rate=0.1):
        """
        Feedforward network with two linear layers, ReLU, dropout, layer norm, and residual.

        Args:
          d_model: Input and output dimension
          dff: Hidden layer dimension (typically 4 * d_model)
          dropout_rate: Dropout probability
        """
        super(FeedForward, self).__init__()

        # First linear transformation: d_model -> dff
        self.linear1 = nn.Linear(d_model, dff)

        # ReLU activation
        self.relu = nn.ReLU()

        # Second linear transformation: dff -> d_model
        self.linear2 = nn.Linear(dff, d_model)

        # Dropout layer applied after second linear transformation
        self.dropout = nn.Dropout(dropout_rate)

        # Layer normalization
        self.layernorm = nn.LayerNorm(d_model)

    def forward(self, x):
        """
        Forward pass through feedforward network.

        Args:
          x: Input tensor, shape (batch_size, seq_len, d_model)

        Returns:
          Output tensor with same shape as input
        """
        # Store input for residual connection
        residual = x

        # First linear layer
        x = self.linear1(x)

        # ReLU activation
        x = self.relu(x)

        # Second linear layer
        x = self.linear2(x)

        # Dropout
        x = self.dropout(x)

        # Residual connection: add original input
        x = x + residual

        # Layer normalization
        x = self.layernorm(x)

        return x

# II Implementing the Transformer Model

## 1) Encoder
---

**a)** Now we have all the components we need to construct the transformer model. We will start by implementing the encoder layer, which consists of global self-attention and a feedforward network. Implement the `EncoderLayer` class below.


In [None]:
#@title Code Cell 5.10
class EncoderLayer(nn.Module):
    def __init__(self, *, d_model, num_heads, dff, dropout_rate=0.1):
        """
        Single encoder layer: global self-attention + feedforward network.

        Args:
          d_model: Embedding dimension
          num_heads: Number of attention heads
          dff: Feedforward hidden dimension
          dropout_rate: Dropout probability
        """
        super(EncoderLayer, self).__init__()

        # Global self-attention layer
        self.self_attention = GlobalSelfAttention(
            d_model=d_model,
            num_heads=num_heads,
            dropout_rate=dropout_rate
        )

        # Feedforward network
        self.ffn = FeedForward(d_model, dff, dropout_rate)

    def forward(self, x):
        """
        Forward pass through encoder layer.

        Args:
          x: Input tensor, shape (batch_size, seq_len, d_model)

        Returns:
          Output tensor with same shape as input
        """
        # Apply self-attention (includes residual + layer norm)
        x = self.self_attention(x)

        # Apply feedforward network (includes residual + layer norm)
        x = self.ffn(x)

        return x

**b)** Implement the `Encoder` class that stacks multiple encoder layers.

In [None]:
#@title Code Cell 5.11
class Encoder(nn.Module):
    def __init__(self, num_layers, d_model, num_heads,
                 dff, vocab_size, dropout_rate=0.1):
        """
        Complete encoder: stacks multiple encoder layers.

        Args:
          num_layers: Number of encoder layers to stack
          d_model: Embedding dimension
          num_heads: Number of attention heads
          dff: Feedforward hidden dimension
          vocab_size: Size of input vocabulary
          dropout_rate: Dropout probability
        """
        super().__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        # Positional embedding layer
        self.pos_embedding = PositionalEmbedding(vocab_size, d_model)

        # Stack of encoder layers using ModuleList
        self.enc_layers = nn.ModuleList([
            EncoderLayer(
                d_model=d_model,
                num_heads=num_heads,
                dff=dff,
                dropout_rate=dropout_rate
            )
            for _ in range(num_layers)
        ])

        # Dropout applied after positional embedding
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x):
        """
        Forward pass through entire encoder.

        Args:
          x: Input token indices, shape (batch_size, seq_len)

        Returns:
          Encoded representation, shape (batch_size, seq_len, d_model)
        """
        # Apply positional embedding
        x = self.pos_embedding(x)

        # Apply dropout after embedding
        x = self.dropout(x)

        # Pass through each encoder layer sequentially
        for enc_layer in self.enc_layers:
            x = enc_layer(x)

        return x

## 2) Decoder
---

**a)** Implement the `DecoderLayer` class, which consists of causal self-attention, cross-attention, and a feedforward network.

In [None]:
#@title Code Cell 5.12
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, dff, dropout_rate=0.1):
        """
        Single decoder layer: causal self-attention + cross-attention + feedforward.

        Args:
          d_model: Embedding dimension
          num_heads: Number of attention heads
          dff: Feedforward hidden dimension
          dropout_rate: Dropout probability
        """
        super(DecoderLayer, self).__init__()

        # Causal self-attention layer (decoder attends to its own previous outputs)
        self.causal_self_attention = CausalSelfAttention(
            d_model=d_model,
            num_heads=num_heads,
            dropout_rate=dropout_rate
        )

        # Cross-attention layer (decoder attends to encoder outputs)
        self.cross_attention = CrossAttention(
            d_model=d_model,
            num_heads=num_heads,
            dropout_rate=dropout_rate
        )

        # Feedforward network
        self.ffn = FeedForward(d_model, dff, dropout_rate)

    def forward(self, x, context):
        """
        Forward pass through decoder layer.

        Args:
          x: Decoder input, shape (batch_size, target_seq_len, d_model)
          context: Encoder output, shape (batch_size, source_seq_len, d_model)

        Returns:
          Decoder output, shape (batch_size, target_seq_len, d_model)
        """
        # Apply causal self-attention (includes residual + layer norm)
        x = self.causal_self_attention(x)

        # Apply cross-attention with encoder output (includes residual + layer norm)
        x = self.cross_attention(x, context)

        # Apply feedforward network (includes residual + layer norm)
        x = self.ffn(x)

        return x

**b)** Implement the `Decoder` class that stacks multiple decoder layers.

In [None]:
#@title Code Cell 5.13
class Decoder(nn.Module):
    def __init__(self, num_layers, d_model, num_heads, dff, vocab_size, dropout_rate=0.1):
        """
        Complete decoder: stacks multiple decoder layers.

        Args:
          num_layers: Number of decoder layers to stack
          d_model: Embedding dimension
          num_heads: Number of attention heads
          dff: Feedforward hidden dimension
          vocab_size: Size of target vocabulary
          dropout_rate: Dropout probability
        """
        super(Decoder, self).__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        # Positional embedding layer for target sequence
        self.pos_embedding = PositionalEmbedding(vocab_size, d_model)

        # Stack of decoder layers using ModuleList
        self.dec_layers = nn.ModuleList([
            DecoderLayer(
                d_model=d_model,
                num_heads=num_heads,
                dff=dff,
                dropout_rate=dropout_rate
            )
            for _ in range(num_layers)
        ])

        # Dropout applied after positional embedding
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x, context):
        """
        Forward pass through entire decoder.

        Args:
          x: Target token indices, shape (batch_size, target_seq_len)
          context: Encoder output, shape (batch_size, source_seq_len, d_model)

        Returns:
          Decoded representation, shape (batch_size, target_seq_len, d_model)
        """
        # Apply positional embedding to target sequence
        x = self.pos_embedding(x)

        # Apply dropout after embedding
        x = self.dropout(x)

        # Pass through each decoder layer sequentially, providing encoder context
        for dec_layer in self.dec_layers:
            x = dec_layer(x, context)

        return x

## 3) Transformer - Putting it all together
---
We now have the encoder and the decoder ready. All you have to do is put them together and add a final dense layer with a softmax activation to get the output. Implement the `Transformer` class below.
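
One design point worth sketching: a model can return raw logits and leave the softmax to the loss. PyTorch's `nn.CrossEntropyLoss` expects logits and applies the log-softmax internally; whether the later training cell uses that loss is an assumption here. The framework-free sketch below (hypothetical function names and a made-up 3-token vocabulary) shows that the cross-entropy computed directly from logits equals the negative log of the softmax probability.

```python
import math

def softmax(logits):
    """Convert raw scores into probabilities that sum to 1."""
    peak = max(logits)  # subtract the max for numerical stability
    exps = [math.exp(v - peak) for v in logits]
    total = sum(exps)
    return [e / total for e in exps]

def cross_entropy_from_logits(logits, target):
    """-log softmax(logits)[target], computed via log-sum-exp."""
    peak = max(logits)
    log_sum_exp = peak + math.log(sum(math.exp(v - peak) for v in logits))
    return log_sum_exp - logits[target]

logits = [2.0, 1.0, 0.1]  # hypothetical scores over a 3-token vocabulary
probs = softmax(logits)
loss = cross_entropy_from_logits(logits, target=0)
print(probs, loss)
```

Applying a softmax inside the model and then passing the result to a loss that also applies one would normalize twice, which is why the two steps are usually kept separate.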

In [None]:
#@title Code Cell 5.14
class Transformer(nn.Module):
    def __init__(self, num_layers, d_model, num_heads, dff,
                 input_vocab_size, target_vocab_size, dropout_rate=0.1):
        """
        Complete Transformer model: encoder + decoder + final output layer.

        Args:
          num_layers: Number of encoder/decoder layers
          d_model: Embedding dimension
          num_heads: Number of attention heads
          dff: Feedforward hidden dimension
          input_vocab_size: Size of source vocabulary
          target_vocab_size: Size of target vocabulary
          dropout_rate: Dropout probability
        """
        super(Transformer, self).__init__()

        # Encoder stack
        self.encoder = Encoder(
            num_layers=num_layers,
            d_model=d_model,
            num_heads=num_heads,
            dff=dff,
            vocab_size=input_vocab_size,
            dropout_rate=dropout_rate
        )

        # Decoder stack
        self.decoder = Decoder(
            num_layers=num_layers,
            d_model=d_model,
            num_heads=num_heads,
            dff=dff,
            vocab_size=target_vocab_size,
            dropout_rate=dropout_rate
        )

        # Final linear layer to project to target vocabulary
        self.final_layer = nn.Linear(d_model, target_vocab_size)

    def forward(self, context, x):
        """
        Forward pass through entire transformer.

        Args:
          context: Source token indices, shape (batch_size, source_seq_len)
          x: Target token indices, shape (batch_size, target_seq_len)

        Returns:
          Logits over target vocabulary, shape (batch_size, target_seq_len, target_vocab_size)
        """
        # Encode source sequence
        context = self.encoder(context)  # Shape: (batch, source_seq_len, d_model)

        # Decode target sequence with encoder context
        x = self.decoder(x, context)  # Shape: (batch, target_seq_len, d_model)

        # Project to target vocabulary to get logits
        logits = self.final_layer(x)  # Shape: (batch, target_seq_len, target_vocab_size)

        return logits

Run the test function below to make sure your Transformer can initialize properly and the dimensions work out.

In [None]:
# @title Test Cell
def run_test():
    # Transformer hyperparameters
    num_layers = 4
    d_model = 512
    num_heads = 8
    dff = 2048
    input_vocab_size = 8000
    target_vocab_size = 8001
    dropout_rate = 0.1

    transformer_model = Transformer(num_layers, d_model, num_heads, dff,
                                    input_vocab_size, target_vocab_size, dropout_rate)

    batch_size = 64
    context_len = 201
    target_len = 200

    context = torch.randint(0, input_vocab_size, (batch_size, context_len))
    x = torch.randint(0, target_vocab_size, (batch_size, target_len))

    print(context.shape)

    logits = transformer_model(context, x)

    print(f"logits shape: {logits.shape}")
    assert logits.shape == (batch_size, target_len, target_vocab_size), \
        f"Expected shape {(batch_size, target_len, target_vocab_size)}, but got {logits.shape}"

run_test()

---

### **6. Depthwise Separable Convolutions**

---

In this section, you will convert the previous convolutional neural network to use depthwise separable convolutions rather than standard convolutions. Doing so will reduce the number of parameters in the neural network and enable deployment on resource-limited devices.

A depthwise separable convolution separates each channel of the 3D input, performs convolution individually on each channel, then "aggregates" the channels with a standard 1x1 convolution on the output of the previous step.
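
In symbols (notation introduced here for illustration), let $\tilde{x}_c$ be channel $c$ of the padded input, $k_c$ the $K \times K$ filter for that channel, and $w_{o,c}$ the $1 \times 1$ convolution weight connecting input channel $c$ to output channel $o$. The two steps are then:

$$
z_c[i,j] = \sum_{u=0}^{K-1} \sum_{v=0}^{K-1} k_c[u,v]\, \tilde{x}_c[i+u,\, j+v], \qquad c = 1, \dots, C_{in}
$$

$$
y_o[i,j] = \sum_{c=1}^{C_{in}} w_{o,c}\, z_c[i,j], \qquad o = 1, \dots, C_{out}
$$

The first step never mixes channels; all cross-channel interaction happens in the second step.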

We refer you to this [blog](https://medium.com/@zurister/depth-wise-convolution-and-depth-wise-separable-convolution-37346565d4ec) and [paper](https://arxiv.org/abs/1704.04861) for more details about depthwise separable convolution.

> **Note:** Before running *Code Cell 6.1*, please run *Code Cell 1.1* → *Code Cell 1.3* to load the functions `train()`, `test()`, `trainloader`, `testloader`, and `conv_block()`.

---
<font color='red'>**PART 6.1:**</font> [25 points]

<font color='red'>**Deliverables**</font>

1. Implement the `DepthwiseSeparableConvolution` module in PyTorch.
(<font color="red">Do not use the "`groups`" parameter of [pytorch `nn.Conv2d`](https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html?highlight=conv#torch.nn.Conv2d) to implement this -- we want you to understand the details and mechanics of the operation!</font>)
2. Verify the correctness of `DepthwiseSeparableConvolution`. (*Code Cell 6.1* should print "Success...!" after you run it.)
3. Verify that `DepthwiseSeparableConvolution` reduces the number of parameters by a factor of the number of output channels by printing out the number of parameters for both a standard convolution and the depthwise separable convolution. By what factor did depthwise separable convolutions reduce the parameters? Does this match what should theoretically happen? (50 words maximum)
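
The parameter counts can be sanity-checked by hand. The sketch below assumes bias-free layers and a square $K \times K$ kernel, matching the comparison in *Code Cell 6.1*; the function names are illustrative.

```python
def standard_conv_params(c_in, c_out, k):
    """Weights in a standard k x k convolution without bias."""
    return c_in * c_out * k * k

def depthwise_separable_params(c_in, c_out, k):
    """One k x k filter per input channel, plus a 1x1 conv mixing channels (no bias)."""
    return c_in * k * k + c_in * c_out

c_in, c_out, k = 10, 20, 5
standard = standard_conv_params(c_in, c_out, k)          # 10 * 20 * 25 = 5000
separable = depthwise_separable_params(c_in, c_out, k)   # 250 + 200 = 450
ratio = separable / standard                             # equals 1/c_out + 1/k**2
print(standard, separable, round(standard / separable, 2))
```

Under these assumptions the ratio of separable to standard parameters is $1/C_{out} + 1/K^2$, so the reduction depends on both the number of output channels and the kernel size.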


In [None]:
#@title Code Cell 6.1
import math
import numpy as np
import torch.nn.functional as F

class DepthwiseSeparableConvolutionSolution(nn.Module):
  def __init__(self, C_in, C_out, W_H):
      # Initialize nn.Module before assigning any attributes
      super(DepthwiseSeparableConvolutionSolution, self).__init__()
      self.C_in = C_in
      self.C_out = C_out
      self.W_H = W_H
      # Depthwise step: groups=C_in gives one W_H x W_H filter per input channel
      self.depthwise = nn.Conv2d(C_in, C_in, kernel_size=W_H, groups=C_in, bias=False)
      # Pointwise step: 1x1 convolution that mixes channels into C_out outputs
      self.pointwise = nn.Conv2d(C_in, C_out, kernel_size=1, bias=False)

  def forward(self, x):
      # Pad so the output keeps the input's height and width ("same" padding)
      p_left = (self.W_H-1+1)//2      # equals W_H // 2
      p_right = self.W_H-1-p_left
      p_top = p_left
      p_bottom = p_right
      x = torch.nn.functional.pad(x, (p_left, p_right, p_top, p_bottom))
      out = self.depthwise(x)
      out = self.pointwise(out)
      return out

class DepthwiseSeparableConvolution(nn.Module):
  def __init__(self, C_in, C_out, W_H):
    # C_in - input channels
    # C_out - output channels
    # W_H - width = height dimension of filters
    super(DepthwiseSeparableConvolution, self).__init__()

    self.C_in = C_in
    self.C_out = C_out
    self.W_H = W_H

    # Please do not change this -- this will be overwritten during testing!
    self.kernel_weights = [nn.Parameter(torch.Tensor(self.W_H, self.W_H)) for i in range(self.C_in)]
    self.one_d_conv_weight = nn.Parameter(torch.Tensor(self.C_out, self.C_in, 1, 1))

    for i,k in enumerate(self.kernel_weights):
      self.register_parameter("k_%d" % i, k)

    self.reset_parameters()

  def reset_parameters(self) -> None:
    for k in self.kernel_weights+[self.one_d_conv_weight]:
      torch.nn.init.kaiming_uniform_(k, a=math.sqrt(5))

  def forward(self, x):
    # Input shape: (N, C_in, H_in, W_in)
    # Output shape: (N, C_out, H_in, W_in) (uses same padding for same dimension)
    # Note: N is batch size

    # Step 1: Calculate padding for "same" convolution
    # Padding formula: pad = (kernel_size - 1) / 2 for odd kernel, distributed for even
    p_left = (self.W_H - 1 + 1) // 2  # Left/top padding
    p_right = self.W_H - 1 - p_left   # Right/bottom padding
    
    # Apply padding to input: (left, right, top, bottom)
    x_padded = F.pad(x, (p_left, p_right, p_left, p_right))

    # Step 2: Depthwise convolution - apply each filter to its corresponding channel
    depthwise_outputs = []
    for i in range(self.C_in):
      # Extract single channel from input: shape (N, 1, H_padded, W_padded)
      channel_input = x_padded[:, i:i+1, :, :]
      
      # Extract corresponding kernel weight and reshape for conv2d
      # Shape: (1, 1, W_H, W_H) - one output channel, one input channel
      kernel = self.kernel_weights[i].unsqueeze(0).unsqueeze(0)
      
      # Apply convolution to this channel only (no padding needed, already padded)
      channel_output = F.conv2d(channel_input, kernel, stride=1, padding=0)
      
      depthwise_outputs.append(channel_output)
    
    # Concatenate all channel outputs along channel dimension
    # Shape: (N, C_in, H_in, W_in)
    depthwise_result = torch.cat(depthwise_outputs, dim=1)

    # Step 3: Pointwise convolution (1x1 conv) to aggregate channels
    # This combines information across channels to produce C_out output channels
    output = F.conv2d(depthwise_result, self.one_d_conv_weight, stride=1, padding=0)

    return output

# DepthwiseSeparableConvolution test case:
# You may use this to understand how depthwise conv works!
#
# Input of shape (batch=1, c_in=2, h_in=2, w_in=2):
# x:
# [1,2] [4,5]
# [3,4] [5,6]
#
# Depthwise conv (c_in=2, c_out=2, width/height=2)
# f:
# [1,1] [2,2]
# [1,1] [2,2]
#
# 1d conv weight:
# fd:
# [2,3], [4,5]
#
# Operation:
# conv(x[0], f[0]), conv(x[1], f[1]) = x' =
# [1 3 ] [8 18]
# [4 10] [18 40]
#
# conv1d(x', fd)
# [26 60 ] [44 102]
# [62 140] [106 240]

m = DepthwiseSeparableConvolution(2,2,2)

m.kernel_weights = [nn.Parameter(torch.from_numpy(x).float()) for x in [
                     np.array([[1,1], [1,1]]),
                     np.array([[2,2], [2,2]])]]
m.one_d_conv_weight = nn.Parameter(torch.from_numpy(np.array(
    [[2,3],[4,5]]).reshape((2,2,1,1))).float())

#######################################################
# Solution version -- feel free to test against this
#######################################################
# m = DepthwiseSeparableConvolutionSolution(2,2,2)
# m.depthwise.weight = nn.Parameter(torch.from_numpy(np.array([
#                       [[1,1], [1,1]],
#                       [[2,2], [2,2]],
# ]).reshape(2,1,2,2)).float())
# m.pointwise.weight = nn.Parameter(torch.from_numpy(np.array([
#                       [2,3],[4,5]
# ]).reshape(2,2,1,1)).float())
##############################################

x = torch.from_numpy(np.array([
                 [[1,2],[3,4]],
                 [[4,5],[5,6]]
]).reshape(1,2,2,2)).float()

groundtruth = np.array([[[26,60],[62,140]], [[44, 102],[106,240]]])
err = np.linalg.norm(m(x).detach().numpy()-groundtruth)
if err <= 1e-5:
  print("Success: err: %f!" % err)
else:
  print("Failure: err: %f!" % err)

# Compare number of parameters for depthwise separable conv and standard conv
standard_conv = torch.nn.Conv2d(10, 20, 5, bias=False)
dwise_conv = DepthwiseSeparableConvolution(10, 20, 5)

# Count parameters in a module
def count_params(module):
  """Count the total number of parameters in a PyTorch module."""
  return sum(p.numel() for p in module.parameters() if p.requires_grad)

# Print parameter counts
standard_params = count_params(standard_conv)
dwise_params = count_params(dwise_conv)
reduction_factor = standard_params / dwise_params

print(f"Standard Conv Parameters: {standard_params}")
print(f"Depthwise Separable Conv Parameters: {dwise_params}")
print(f"Reduction Factor: {reduction_factor:.2f}x")

---
<font color='red'>**PART 6.2:**</font> [10 points]

<font color='red'>**Deliverables**</font>

1. Implement the `dw_conv_block` function which performs in sequence: depthwise separable convolution, batch normalization, ReLU.

2. Train the neural network for several epochs with your implementation of `DepthwiseSeparableConvolution`; compare the runtime with `DepthwiseSeparableConvolutionSolution`. How much faster is the solution versus your implementation? Why might this be? (50 words maximum)

3. Train the neural network for 100 epochs with `DepthwiseSeparableConvolutionSolution` and plot the training error and test accuracy (x-axis is epoch).
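
For the runtime comparison in item 2, a small timing helper can reduce noise from one-off overheads. This is only a sketch: `best_time` is a hypothetical helper, and the workload shown is a stand-in for a forward pass of each module on the same input. On a GPU, CUDA kernels launch asynchronously, so the timed function should call `torch.cuda.synchronize()` before returning.

```python
import time

def best_time(fn, repeats=5):
    """Return the fastest wall-clock time in seconds over `repeats` calls to fn()."""
    best = float("inf")
    for _ in range(repeats):
        start = time.perf_counter()
        fn()  # on a GPU, fn should end with torch.cuda.synchronize()
        best = min(best, time.perf_counter() - start)
    return best

# Stand-in workload; replace with a forward pass of each module on the same input.
elapsed = best_time(lambda: sum(i * i for i in range(10_000)))
print(f"best of 5 runs: {elapsed:.6f} s")
```

Taking the minimum over several runs, rather than a single measurement, filters out warm-up effects such as memory allocation on the first call.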

In [None]:
#@title Code Cell 6.2

def dw_conv_block(in_channels, out_channels, kernel_size=3):
    '''
    dw_conv_block performs in sequence:
    - Depthwise Separable Convolution (in_channels -> out_channels)
    - Batch Normalization
    - ReLU activation
    
    Args:
      in_channels: Number of input channels
      out_channels: Number of output channels
      kernel_size: Size of convolutional kernel (default 3x3)
    
    Returns:
      nn.Sequential module containing the three operations
    '''
    return nn.Sequential(
        # Depthwise separable convolution
        DepthwiseSeparableConvolutionSolution(in_channels, out_channels, kernel_size),
        # Batch normalization for output channels
        nn.BatchNorm2d(out_channels),
        # ReLU activation
        nn.ReLU(inplace=True)
    )


class DWConvNet(nn.Module):
    '''
    Modified CNN with depthwise separable convs.
    Uses depthwise separable convolutions in layers 2, 4, 7, and 9.
    '''
    def __init__(self):
        super(DWConvNet, self).__init__()

        # Build model with mix of standard and depthwise separable convolutions
        self.model = nn.Sequential(
            conv_block(3, 32),                    # Layer 1: standard conv
            dw_conv_block(32, 32),                # Layer 2: depthwise separable
            conv_block(32, 64, stride=2),         # Layer 3: standard conv (downsample)
            dw_conv_block(64, 64),                # Layer 4: depthwise separable
            conv_block(64, 64),                   # Layer 5: standard conv
            conv_block(64, 128, stride=2),        # Layer 6: standard conv (downsample)
            dw_conv_block(128, 128),              # Layer 7: depthwise separable
            conv_block(128, 256),                 # Layer 8: standard conv
            dw_conv_block(256, 256),              # Layer 9: depthwise separable
            nn.AdaptiveAvgPool2d(1)               # Global average pooling to 1x1
        )

        # Final classification layer
        self.classifier = nn.Linear(256, 10)

    def forward(self, x):
        '''
        Forward pass through the network.

        Args:
          x: Input image tensor, shape (batch_size, 3, 32, 32)

        Returns:
          Class logits, shape (batch_size, 10)
        '''
        # Apply convolutional layers
        h = self.model(x)

        # Flatten spatial dimensions
        B, C, H, W = h.shape  # avoid shadowing the input tensor x
        h = h.view(B, C*H*W)  # Should be (B, 256) after AdaptiveAvgPool2d(1)
        
        # Apply classifier
        return self.classifier(h)

In [None]:
#@title Code Cell 6.3
# We will use Adam with default learning rate instead of SGD.
# You will find that the Adam optimizer converges much faster than SGD!

torch.manual_seed(43) # to give stable randomness

device = 'cuda' if torch.cuda.is_available() else 'cpu'  # fall back to CPU when no GPU is attached
net = DWConvNet()
net = net.to(device)

# Set these parameters based on PART 1.2
# From Part 1.2, the best learning rate schedule was MultiStepLR
# with milestones at every 25 epochs (25, 50, 75)
epochs = 100
milestones = [25, 50, 75]  # Decrease learning rate by factor of 10 at these epochs

criterion = nn.CrossEntropyLoss()
# Using Adam optimizer (converges faster than SGD for this architecture)
optimizer = torch.optim.Adam(net.parameters())
# MultiStepLR scheduler to decay learning rate at milestones
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer,
                                                 milestones=milestones,
                                                 gamma=0.1)

# Track training and test metrics
train_loss_tracker, train_acc_tracker = [], []
test_loss_tracker, test_acc_tracker = [], []

print('Training for {} epochs and milestones {}'.format(
      epochs, milestones))

start_time = time.time()
# Training loop over all epochs
for epoch in range(0, epochs):
    # Train for one epoch
    train(net=net, epoch=epoch, loader=trainloader, criterion=criterion, 
          optimizer=optimizer, loss_tracker=train_loss_tracker, 
          acc_tracker=train_acc_tracker)
    # Evaluate on test set
    test(net=net, epoch=epoch, loader=testloader, criterion=criterion, 
         loss_tracker=test_loss_tracker, acc_tracker=test_acc_tracker)
    # Update learning rate according to schedule
    scheduler.step()

total_time = time.time() - start_time
print('Total training time: {} seconds'.format(total_time))