# 6.390 Fall 2024 Homework 8
**If you haven't already, please hit:**

`File` -> `Save a Copy in Drive`

**to copy this notebook to your Google drive, and work on a copy. If you don't do this, your changes won't be saved!**

# **RUN THIS NOTEBOOK WITH GPU**

OTHERWISE YOU WILL SPEND A LONG TIME ON TRAINING

**To do this, go to `Runtime` > `Change runtime type` and under `hardware acceleration` set the dropdown to `GPU`.**

As a reminder, we have <a href="https://introml.mit.edu/fall24/info/pytorch_overview">this note on PyTorch</a> for more information about how to use it.

## Boilerplate

Run the following cells to load the data. No need to edit anything here.

In [None]:
import matplotlib.pyplot as plt
import numpy as np
from tqdm.auto import tqdm

import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torchvision import transforms

# To get the MNIST (digit images) dataset
from keras.datasets import mnist

torch.manual_seed(0)

In [None]:
# Fetch the MNIST images and labels through Keras
(mnist_train, labels_train), (mnist_test, labels_test) = mnist.load_data()

# Convert pixels to float32, divide by 255, and flatten every image into one
# row, giving Numpy arrays of size (#datapoints, 28*28=784)
mnist_train = (mnist_train.astype("float32") / 255.0).reshape(
    (mnist_train.shape[0], np.prod(mnist_train.shape[1:]))
)
mnist_test = (mnist_test.astype("float32") / 255.0).reshape(
    (mnist_test.shape[0], np.prod(mnist_test.shape[1:]))
)

# The training split is used unchanged
train_data, train_labels = mnist_train, labels_train

# The first half of the Keras test split becomes the validation set and the
# second half becomes the test set (images and labels are cut at the same index)
val_data, test_data = (
    mnist_test[: len(mnist_test) // 2, :],
    mnist_test[len(mnist_test) // 2 :, :],
)
val_labels, test_labels = (
    labels_test[: len(mnist_test) // 2],
    labels_test[len(mnist_test) // 2 :],
)

# Report what was loaded
print("Downloaded the following data:")
print(
    f"train_data has shape {train_data.shape}, containing {train_data.shape[0]} images represented as ({train_data.shape[1]}, 1) vectors"
)
print(
    f"val_data has shape {val_data.shape}, containing {val_data.shape[0]} images represented as ({val_data.shape[1]}, 1) vectors"
)
print(
    f"test_data has shape {test_data.shape}, containing {test_data.shape[0]} images represented as ({test_data.shape[1]}, 1) vectors"
)

In [None]:
# We'll create a Dataset class to use with PyTorch's Built-In Dataloaders
class MNISTDataset(Dataset):
    """
    Wraps image rows and their labels so that PyTorch's built-in dataloaders
    can batch them.

    data: 2-D array holding one flattened 784-value image per row
    labels: labels indexed in step with the rows of data
    vectorize: if True, each image is returned with shape (784,)
               if False, each image is returned with shape (1, 28, 28)
    """

    def __init__(self, data, labels, vectorize=True):
        self.data, self.labels, self.vectorize = data, labels, vectorize

    def __getitem__(self, idx):
        # Pick the output layout once, then reshape the selected row into it
        out_shape = (784,) if self.vectorize else (1, 28, 28)
        image = self.data[idx, :].reshape(out_shape)
        return image, self.labels[idx]

    def __len__(self):
        # One item per row of the data array
        return self.data.shape[0]


# Flattened-image datasets for the train/val/test splits
train_dataset, val_dataset, test_dataset = (
    MNISTDataset(images, digits)
    for images, digits in zip(
        (train_data, val_data, test_data),
        (train_labels, val_labels, test_labels),
    )
)

# One PyTorch dataloader per split.
# A batch size of 256 is used for the rest of this assignment.
train_loader, val_loader, test_loader = (
    DataLoader(split, batch_size=256)
    for split in (train_dataset, val_dataset, test_dataset)
)

# The same splits again, but with vectorize=False so images are not flattened
cnn_train_dataset, cnn_val_dataset, cnn_test_dataset = (
    MNISTDataset(images, digits, vectorize=False)
    for images, digits in zip(
        (train_data, val_data, test_data),
        (train_labels, val_labels, test_labels),
    )
)

# Dataloaders over the non-flattened datasets
cnn_train_loader, cnn_val_loader, cnn_test_loader = (
    DataLoader(split, batch_size=256)
    for split in (cnn_train_dataset, cnn_val_dataset, cnn_test_dataset)
)

# Report how many batches each dataloader provides
print("Created the following Dataloaders:")
print(f"train_loader has {len(train_loader)} batches of training data")
print(f"val_loader has {len(val_loader)} batches of validation data")
print(f"test_loader has {len(test_loader)} batches of testing data")

In [None]:
# Defining a checker function so you can submit to the course website.
def get_modules(module_object):
    """
    Flattens the children of module_object into a list of strings.

    Children that are nn.Sequential are expanded recursively; every other
    child contributes its str() representation.
    """
    flattened = []
    for child in module_object.children():
        flattened.extend(
            get_modules(child) if isinstance(child, nn.Sequential) else [str(child)]
        )
    return flattened

# Image Classification

### Train/test scripts

We'll first implement the functions to `train` and `test` our classifiers. We've written most of the functions for you. Your job is to fill in the calculation of the loss.

As a reminder: for our loss function we will be using categorical cross-entropy loss between the output of our model and our ground-truth labels.

Note that:
* Use a built-in module instead of implementing the loss yourself!
* We are expecting a one-line solution in each case. Don't overthink this!
* The line you fill in for both `train()` and `test()` should be the same.

In [None]:
# EDIT ME!


def train(model, device, train_loader, optimizer, val_loader=None, pbar=False):
    """
    Function for training our networks. One call to train() performs a single
    epoch of training, i.e. one pass over train_loader.

    model: the classifier to train; its output is argmax-ed along dim 1 to
        obtain the predicted label of every item in the batch

    device: device the batches are moved to, e.g. "cpu" or "cuda"

    train_loader: the dataloader for the training set, yielding
        (input, target) batches

    optimizer: optimizer used to update the model's parameters

    val_loader: (optional) accepted for backward compatibility; it is not
        used inside this function

    pbar: if True, wrap train_loader in a tqdm progress bar

    Returns a tuple (average loss per batch, fraction of items classified
    correctly), both as Python numbers.
    """

    # Set the model to training mode.
    model.train()

    # Running totals, used to compute the epoch averages returned at the end.
    total_loss = 0
    total_correct = 0
    total_items = 0

    # Optionally show a progress bar while iterating over the batches.
    if pbar:
        train_loader = tqdm(train_loader)

    # batch_idx: an integer representing which batch number we're on
    # input: a pytorch tensor representing a batch of input images
    # target: the labels for that batch
    # NOTE: `output` and `target` keep their names because the line filled in
    # below is expected to use them.
    for batch_idx, (input, target) in enumerate(train_loader):
        # Move the batch to the chosen device; labels are cast to int64
        input = input.to(device)
        target = target.to(device=device, dtype=torch.long)

        # Zero out gradients from previous iteration
        optimizer.zero_grad()

        # Feed the input through the network. The module is called directly
        # (rather than model.forward) so that registered hooks run, matching
        # how test() invokes the model.
        output = model(input)

        ## TODO: YOUR CODE HERE

        loss_function = None  # FILL IN!
        loss_value = loss_function  # ( FILL IN! )

        ## END YOUR CODE

        # Perform backprop
        loss_value.backward()
        optimizer.step()

        # Accumulate a detached copy of the loss so the running total does not
        # keep every batch's autograd graph alive for the whole epoch.
        total_loss += loss_value.detach()
        total_correct += torch.sum(torch.argmax(output, dim=1) == target)
        total_items += input.shape[0]

    return total_loss.item() / len(train_loader), (total_correct / total_items).item()

In [None]:
# EDIT ME!
def test(model, device, test_loader, pbar=False):
    """
    Function for testing our models. One call to test() runs through every
    datapoint in our dataset once.

    model: an instance of our model, in this assignment, this will be your autoencoder

    device: either "cpu" or "cuda:0", depending on if you're running with GPU support

    test_loader: the dataloader for the data to run the model on
    """
    # set model to evaluation mode
    model.eval()

    # we'll keep track of total loss to calculate the average later
    test_loss = 0
    total_correct = 0
    total_items = 0

    # donâ€™t track gradients in testing, since no backprop
    with torch.no_grad():
        # iterate thorugh each test image
        if pbar:
            test_loader = tqdm(test_loader)

        for input, target in test_loader:

            # send input image to GPU if using GPU
            input = input.to(device)
            target = target.type(torch.LongTensor).to(device)

            # run input through our model
            output = model(input)

            ## TODO: YOUR CODE HERE

            loss_function = None  # FILL IN!
            loss_value = loss_function  # ( FILL IN! )

            ## END YOUR CODE

            # Accumulate for accuracy
            test_loss += loss_value
            total_correct += torch.sum(torch.argmax(output, dim=1) == target)
            total_items += input.shape[0]

    # calculate average loss/accuracy per batch
    test_loss /= len(test_loader)
    accuracy = total_correct / total_items

    return test_loss.item(), accuracy.item()

## Fully Connected Classifiers

The code box below will be modified to use various fully-connected architectures for the following questions.

You will need to design your own `layers`, for example `layers=[nn.Linear(in_features=64, out_features=4)]` defines a single layer with 64 inputs and 4 output units.

In [None]:
# EDIT ME!

# Define the layers of our model.
# train_loader yields flattened images of shape (batch, 784), so the first
# layer must accept 784 input features. train()/test() take the argmax of the
# model output along dim 1 and compare it with the labels, so the last layer
# should produce one score per class.
# NOTE: while this list is empty the model has no parameters, and the optimizer
# construction further down is expected to fail on the empty parameter list.
layers = [
    ## TODO: YOUR CODE HERE
]

# set number of epochs to train for (each call to train() is one epoch)
epochs = 1  ## Change me in problem 5.3

# check if running on CPU or GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# initialize a fully-connected classifier from the layers and move it to device
fc_model = nn.Sequential(*layers).to(device)

# initialize our optimizer. We'll use Adam with its default settings
optimizer = torch.optim.Adam(fc_model.parameters())

# train your classifier, reporting training and validation metrics every epoch
for epoch in range(1, epochs + 1):
    train_loss, train_acc = train(fc_model, device, train_loader, optimizer)
    val_loss, val_acc = test(fc_model, device, val_loader)
    print(
        "Train Epoch: {:02d} \tTraining Loss: {:.6f} \tTraining Acc: {:.6f}\n \t\t\tValidation Loss: {:.6f} \tValidation Acc: {:.6f}\n".format(
            epoch, train_loss, train_acc, val_loss, val_acc
        )
    )

# test your model on the held-out test split
test_loss, test_acc = test(fc_model, device, test_loader)
print("\n\t\t\tTest Loss: {:.6f} \t Test Accuracy: {:.6f}".format(test_loss, test_acc))

## Convolutional Classifier

The code box below will be modified to use convolutional neural network architectures. Note that the code is slightly different, using a different dataloader that doesn't flatten/vectorize the images.

In [None]:
# EDIT ME!

# check if running on CPU or GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Define the layers of our model.
# The cnn_* loaders yield non-flattened images of shape (batch, 1, 28, 28).
# train()/test() take the argmax of the model output along dim 1 and compare it
# with the labels, so the final output should be one score per class.
# NOTE: this cell rebinds `layers`, `optimizer` and `epochs` from the
# fully-connected cell. While this list is empty the model has no parameters,
# and the optimizer construction below is expected to fail on the empty
# parameter list.
layers = [
    ## TODO: YOUR CODE HERE
]

# initialize a CNN classifier from the layers and move it to device
cnn_model = nn.Sequential(*layers).to(device)

# initialize our optimizer. We'll use Adam with its default settings
optimizer = torch.optim.Adam(cnn_model.parameters())

# set number of epochs to train for (each call to train() is one epoch)
epochs = 1

# train your classifier, reporting training and validation metrics every epoch
for epoch in range(1, epochs + 1):
    train_loss, train_acc = train(cnn_model, device, cnn_train_loader, optimizer)
    val_loss, val_acc = test(cnn_model, device, cnn_val_loader)
    print(
        "Train Epoch: {:02d} \tTraining Loss: {:.6f} \tTraining Acc: {:.6f} \tValidation Loss: {:.6f} \tValidation Acc: {:.6f}".format(
            epoch, train_loss, train_acc, val_loss, val_acc
        )
    )

# test your model on the held-out test split
test_loss, test_acc = test(cnn_model, device, cnn_test_loader)
print("\nTest Loss: {:.6f} \t Test Accuracy: {:.6f}".format(test_loss, test_acc))