<a href="https://colab.research.google.com/github/MJMortensonWarwick/Programming_and_Big_Data_Analytics_2425/blob/main/8_01_convolutional_neural_networks_part_ii.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

![](https://drive.google.com/uc?export=view&id=1vv_PsWBnUJwSCkwKDoJAC-vXjtaEA4Ts)

# 8.01 Convolutional Neural Networks (part II)
In this tutorial you will build your own CNN in PyTorch! Please refer back to Notebook 7.02 for insights into how the code works!

We will be working with a facial-expression image set from Kaggle ([here](https://www.kaggle.com/datasets/sayakbera/fer-2013-7-emotions-uniform-dataset)) where we are predicting the emotion of people in different images.

To get the data we need an API key from Kaggle. Once you have a Kaggle account and are logged in:
* Click on your avatar in the top right and "Account" from the dropdown menu.
* Select "Settings"
* Scroll down the page and you'll find a button "Create Legacy API Key".
* This downloads an API key to your PC which you can upload here.
* Make sure you create a _Legacy_ key not a new one!!!!

In [None]:
from google.colab import files
files.upload()

Next we need to move this to a folder called `.kaggle`, as this is where the Kaggle tool will look to find the API key. This has been written as Linux commands as we are doing this directly on the underlying (virtual) machine, not within Python. Linux commands start with a "!" in Colab. Linux is out of scope for the module so we won't go into detail on the code.

In [None]:
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 /root/.kaggle/kaggle.json

## Data Extraction
Now you can download the data. The following can be used for any Kaggle dataset. When you are on the dataset page click "Download" and "Download via CLI". This gives you code that looks like below (but add a "!" at the start):

In [None]:
!kaggle datasets download -d sayakbera/fer-2013-7-emotions-uniform-dataset

This has added the data as a zip folder. We can unzip it and create a new folder (Linux again):

In [None]:
!unzip fer-2013-7-emotions-uniform-dataset.zip -d facial-emotion

## Training and Test Split
This data comes in three sets, `train`, `val` (validation) and `test`. We have seen two of these before, but `val` is new.

Because deep learning effectively uses _epochs_ as a hyperparameter, and the length of time we train the model will impact under- and over-fitting, we want to measure performance every $x$ epochs. As with hyperparameter tuning in the classical space, we don't want to use `test` to check performance, as this is cheating and can cause _data leakage_ (information about the test data influencing the model). We also don't want to use the training data, as this can't measure overfitting (overfitting can only be measured on unseen data). We could do cross validation, but this would be very expensive to do over many epochs (i.e. it would take ages). So instead we split our data an extra time and create a `val` set. This will be used to measure performance over epochs, but can also be used to tune other hyperparameters.

For ease we will create some variables to store the path to the directories:

In [None]:
train_dir = "/content/facial-emotion/FER2013_7emotions_Uniform_Augmented_Dataset/train"
test_dir = "/content/facial-emotion/FER2013_7emotions_Uniform_Augmented_Dataset/test"
val_dir = "/content/facial-emotion/FER2013_7emotions_Uniform_Augmented_Dataset/validation"
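
`ImageFolder`, which we use later in this notebook, expects one sub-folder per class inside each split directory. A quick sanity check, sketched here with only the standard library, is to count the files in each class folder. The helper name `count_images_per_class` is our own (not part of any library), and the demo runs on a throwaway temporary folder with made-up class names rather than on the real dataset.

```python
import tempfile
from pathlib import Path


def count_images_per_class(split_dir):
    """Return {class_folder_name: number_of_files} for one split directory."""
    counts = {}
    for class_dir in sorted(Path(split_dir).iterdir()):
        if class_dir.is_dir():
            counts[class_dir.name] = sum(1 for f in class_dir.iterdir() if f.is_file())
    return counts


# Demo on a temporary folder that mimics the <split>/<class>/<image> layout
with tempfile.TemporaryDirectory() as tmp:
    for name, n_files in [("happy", 3), ("sad", 2)]:
        class_dir = Path(tmp) / name
        class_dir.mkdir()
        for i in range(n_files):
            (class_dir / f"img_{i}.png").touch()  # empty placeholder files
    demo_counts = count_images_per_class(tmp)

print(demo_counts)
```

On the real data you would call it as `count_images_per_class(train_dir)`, and likewise for the other two directories.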

## Data Preparation
Now we can add a generator to prepare our data. This generator will perform basic cleaning operations on the data and feed it into the model.

The three tasks we will do are:
* `ToTensor()` changes the data into tensor format as before.
* `Resize()` changes the size of the images to something smaller (to make computations faster and easier).
* `Normalize()` (sic) effectively normalises the data as we have done previously with min-max or standardisation.

For images, normalisation is slightly different as we are working with RGB pixels, meaning each data item is a red score between 0 (no red) and 255 (full red); a green score between 0 (no green) and 255 (full green); and a blue score between 0 (no blue) and 255 (full blue). This means if a pixel is stored as:
* RGB = [0,255,0] the pixel colour will be green.
* RGB = [255,0,0] the colour will be red.
* RGB = [0,0,0] the colour will be black.
* RGB = [255,255,255] the colour will be white.
* RGB = [255,0,255] the colour will be purple.
* And so on.

Because we are not scaling based on the mean or max values in our own data (we use fixed ImageNet statistics instead), all our data has the same scale, so we apply the same transformation to all the datasets.
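
To see what this does to a single pixel, here is a minimal pure-Python sketch of the arithmetic: `ToTensor()` rescales 0-255 values to 0-1, and `Normalize()` then subtracts a per-channel mean and divides by a per-channel standard deviation. The mean and standard deviation are the ImageNet values used in the code cell in this section; the function name `normalise_pixel` is our own and only for illustration.

```python
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]


def normalise_pixel(rgb, mean=IMAGENET_MEAN, std=IMAGENET_STD):
    """Mimic ToTensor() followed by Normalize() for one RGB pixel."""
    scaled = [c / 255 for c in rgb]  # ToTensor(): 0-255 -> 0.0-1.0
    # Normalize(): (value - mean) / std, channel by channel
    return [(s - m) / sd for s, m, sd in zip(scaled, mean, std)]


darkest = normalise_pixel([0, 0, 0])          # all channels off
brightest = normalise_pixel([255, 255, 255])  # all channels at maximum

print([round(v, 3) for v in darkest])
print([round(v, 3) for v in brightest])
```

Every pixel ends up in roughly the range -2 to +2.6, whichever dataset it came from.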

We do these transformations for each of our datasets (`train`, `val` and `test`) and then create a `DataLoader` to move the data into the model. First we split into batches of 128 items per batch. We _shuffle_ our training data, meaning we put the data into random order in case there are biases in how it has been saved that would then be present in our batches. `pin_memory` is used just to make things a little faster (it speeds up copying batches to the GPU) as the Colab environment is slightly underpowered.

However, we might also want to experiment with data augmentations! You will see some commented out code, and instructions on the hyperparameters. What should be added? What is appropriate for the problem space?

In [None]:
import torch
from torchvision import transforms, datasets
from torch.utils.data import DataLoader

# Define data transformations
train_transform = transforms.Compose([
    #transforms.RandomResizedCrop(224), # Randomly crop and resize
    #transforms.RandomHorizontalFlip(),  # Example augmentation
    #transforms.RandomVerticalFlip(),  # Example augmentation
    #transforms.RandomRotation(n),  # Example augmentation change n to max degrees you want to rotate
    #transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1), # Random color adjustments

    # keep these ones in! (same order as val/test so all sets are resized the same way)
    transforms.Resize((32, 32)),  # Resize images (adjust size as needed)
    transforms.ToTensor(),  # Convert to tensor
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) # ImageNet normalization
])

val_transform = transforms.Compose([
    transforms.Resize((32, 32)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

test_transform = transforms.Compose([
    transforms.Resize((32, 32)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Load datasets and apply transformations
train_dataset = datasets.ImageFolder(root=train_dir, transform=train_transform)
val_dataset = datasets.ImageFolder(root=val_dir, transform=val_transform)
test_dataset = datasets.ImageFolder(root=test_dir, transform=test_transform)

# Create data loaders
batch_size = 128  # Adjust batch size as needed (hyperparameter!)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, pin_memory=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, pin_memory=True)
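
The number of batches a `DataLoader` yields per epoch (what `len(train_loader)` returns) is the dataset size divided by the batch size, rounded up, because by default the final, smaller batch is kept (`drop_last=False`). A small sketch of that arithmetic follows; the dataset size of 1,000 is a made-up figure for illustration, not the size of this dataset, and the helper name is our own.

```python
import math


def batches_per_epoch(n_items, batch_size):
    """Number of batches when the final, smaller batch is kept."""
    return math.ceil(n_items / batch_size)


n_items = 1000  # hypothetical dataset size, for illustration only
for batch_size in (32, 64, 128):
    print(batch_size, batches_per_epoch(n_items, batch_size))
```

Larger batches mean fewer parameter updates per epoch, which is one reason batch size behaves like a hyperparameter.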

## Convolutional Neural Network
Now we can build our model.

You will have to decide what layers you think are appropriate! You can see the code involved in Notebook 7.02.

With each layer ... e.g. `self.conv1 = nn.Conv2d(x, y, kernel_size=3)` ... you need to specify _x_ as the number of channels coming out of the previous layer and _y_ as the number of output channels. If we have the above convolutional layer as the first layer, _x_ will likely be 3 (RGB colours) and _y_ will be the number of filters (feature maps). The next layer's input will then need to be the same as the previous layer's output ... e.g. `nn.BatchNorm2d(32)` after a convolution with 32 filters (normalisation has the same output size as input size so only one number). In some cases it's hard to calculate what the number should be. You can use AI to help.
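
One way to work the numbers out by hand is the standard output-size formula for a convolution or pooling layer, $\lfloor (n + 2p - k)/s \rfloor + 1$, for input width $n$, kernel size $k$, padding $p$ and stride $s$. The sketch below applies it to a 32x32 input. The layer choices (two 3x3 convolutions with padding 1, each followed by 2x2 max-pooling, ending with 64 filters) are purely an illustrative assumption, not a recommended architecture, and `out_size` is our own helper.

```python
def out_size(n, kernel_size, stride=1, padding=0):
    """Output width/height of a conv or pooling layer for a square input."""
    return (n + 2 * padding - kernel_size) // stride + 1


size = 32                                        # images are resized to 32x32
size = out_size(size, kernel_size=3, padding=1)  # conv 3x3, padding 1 -> 32
size = out_size(size, kernel_size=2, stride=2)   # max-pool 2x2        -> 16
size = out_size(size, kernel_size=3, padding=1)  # conv 3x3, padding 1 -> 16
size = out_size(size, kernel_size=2, stride=2)   # max-pool 2x2        -> 8

channels = 64                        # filters in the last conv (assumed)
flattened = channels * size * size   # input size for the first nn.Linear
print(size, flattened)
```

Under these assumptions the first fully connected layer would take 64 x 8 x 8 = 4096 inputs; change any layer and the number changes with it.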

Remember, the optimal number of filters (etc.) is very problem specific. What do you think it should be?

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# Based on the dataset "facial-emotion", there are 7 classes.
num_classes = 7

class EmotionCNN(nn.Module):
    def __init__(self, num_classes=7): # default matches the 7 emotion classes
        super(EmotionCNN, self).__init__()
        # e.g. self.conv1 = nn.Conv2d(3, [...], kernel_size=[...])
        # e.g. self.bn1 = nn.BatchNorm2d([...])
        # [...] # more layers
        # e.g. self.dropout = nn.Dropout(0.25)
        self.fc2 = nn.Linear([...], num_classes) # Specify the size of the previous layer

    def forward(self, x):
        # Should match the above
        # e.g. x = F.relu(self.conv1(x))
        # [...] # more layers
        return self.fc2(x) # Removed sigmoid for multi-class with CrossEntropyLoss


cnn = EmotionCNN()
cnn

To complete our model let's specify the loss function and optimiser. We will use a slightly different optimiser than before - [Adam](https://arxiv.org/abs/1412.6980). You can read more from the link if you want to. We will start with a learning rate of 0.001 and add a weight decay of 0.0005. Note that weight decay does not change the learning rate: it is a form of L2 regularisation that nudges the weights towards zero at every update, which helps to limit overfitting. You certainly can experiment with the learning rate (`lr`) and the weight decay (`weight_decay`).

In [None]:
# Define loss function and optimiser
import torch.optim as optim

criterion = nn.CrossEntropyLoss() # Cross-Entropy Loss for multi-class classification
optimiser = optim.Adam(cnn.parameters(), lr=0.001, weight_decay=0.0005) # Adam optimizer
# weight_decay adds an L2 penalty on the weights (regularisation); it does not change the learning rate
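
To make the difference between the two knobs concrete: in PyTorch's `Adam`, `weight_decay` adds `weight_decay * w` to each weight's gradient (an L2 penalty), whereas shrinking the learning rate over time is the job of a separate scheduler, such as those in `torch.optim.lr_scheduler`. The sketch below shows both ideas on a single number, using plain gradient descent rather than Adam to keep the arithmetic visible. The function names and the decay factor `gamma=0.9` are our own illustrative choices.

```python
def sgd_step(w, grad, lr, weight_decay=0.0):
    """One gradient-descent update with an optional L2 penalty."""
    return w - lr * (grad + weight_decay * w)


def exponential_lr(lr0, gamma, epoch):
    """Learning rate after `epoch` epochs of multiplicative decay."""
    return lr0 * gamma ** epoch


w = 1.0
# With a zero gradient, weight decay alone still pulls the weight towards 0
print(sgd_step(w, grad=0.0, lr=0.001, weight_decay=0.0005))
# Without weight decay the weight stays where it is
print(sgd_step(w, grad=0.0, lr=0.001))
# A learning-rate schedule, by contrast, changes the step size per epoch
print([exponential_lr(0.001, gamma=0.9, epoch=e) for e in range(3)])
```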

Let's see how many parameters this is:

In [None]:
# Summarise the model layer by layer
from torchsummary import summary

# Move the model to the GPU (if available) to get the summary
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
cnn = cnn.to(device)

summary(cnn, (3, 32, 32))

We are going to apply _early stopping_, which is a new idea. Because we might overfit by running too many epochs, we will set training up to automatically stop if the performance on validation doesn't improve for 5 epochs in a row (the number 5 is effectively a hyperparameter we will call our _patience_). I.e. if epoch #6 gives us our best score, we then have until epoch #11 to get a better score or the training automatically stops.
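
The counting logic can be tried out on its own before it is buried in a training loop. The sketch below replays a made-up list of validation losses (best score at epoch 6, nothing better afterwards) and reports the epoch at which training would stop. The function name and the loss values are hypothetical, chosen to mirror the example above.

```python
def early_stopping_epoch(val_losses, patience=5):
    """Return the 1-based epoch at which training stops, or None if it never does."""
    best = float("inf")
    epochs_no_improve = 0
    for epoch, loss in enumerate(val_losses, start=1):
        if loss < best:
            best = loss            # new best score: reset the counter
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1
        if epochs_no_improve >= patience:
            return epoch
    return None


# Made-up losses: improving until epoch 6, then flat
losses = [1.0, 0.9, 0.8, 0.7, 0.6, 0.5] + [0.55] * 10
stop_epoch = early_stopping_epoch(losses, patience=5)
print(stop_epoch)
```

With these values the best epoch is 6 and training stops at epoch 11.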

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# Early stopping parameters
patience = 5
best_val_loss = float('inf')
epochs_no_improve = 0

# empty lists to store losses for visualisation at the end
train_losses = []
val_losses = []

# Training loop
num_epochs = 500

# For loop for training
for epoch in range(num_epochs):
    cnn.train()
    running_loss = 0.0
    for images, labels in train_loader:
        # add images and label in batch to the GPU
        images = images.to(device)
        # Labels should be LongTensor for CrossEntropyLoss and no unsqueeze(1)
        labels = labels.long().to(device) # Explicitly cast to LongTensor

        cnn = cnn.to(device) # Pass model to GPU as well (already moved above, but doesn't hurt)

        optimiser.zero_grad() # reset the optimiser to start optimisation again

        # run the images through the model
        outputs = cnn(images)
        loss = criterion(outputs, labels) # calculate loss
        loss.backward() # backpropagate
        optimiser.step() # update parameters
        running_loss += loss.item() # keep track of loss for visualisation

    # store losses for visualisation
    train_losses.append(running_loss/len(train_loader)) # average loss

    # Validation
    cnn.eval()
    val_loss = 0.0
    with torch.no_grad(): # no parameter updates/backpropagation during validation
      current_val_loader = val_loader # the validation DataLoader created earlier

      for images, labels in current_val_loader:
          # add images and label in batch to the GPU
          images = images.to(device)
          # Labels should be LongTensor for CrossEntropyLoss and no unsqueeze(1)
          labels = labels.long().to(device) # Explicitly cast to LongTensor
          cnn = cnn.to(device) # Pass model as well

          # run the images through the model
          outputs = cnn(images)
          loss = criterion(outputs, labels) # calculate loss
          val_loss += loss.item() # keep track of loss for visualisation

    # store losses for visualisation
    val_losses.append(val_loss/len(current_val_loader)) # average loss

    # Early stopping
    if val_loss < best_val_loss: # is this better than previous record?
        best_val_loss = val_loss # if so set as new best
        epochs_no_improve = 0 # make the no improve counter zero again
    else:
        epochs_no_improve += 1 # otherwise increase the count by one

    train_loss_rnd = round(running_loss / len(train_loader), 4) # avg train loss
    val_loss_rnd = round(val_loss / len(current_val_loader), 4) # avg val loss

    print(f"Epoch [{epoch+1}/{num_epochs}], Training Loss: {train_loss_rnd}, Validation Loss: {val_loss_rnd}")


    if epochs_no_improve == patience: # if epochs without improvement is at the patience level
        print("Early stopping triggered.")
        break

## Results
Let's visualise the results:

In [None]:
import matplotlib.pyplot as plt

plt.figure(figsize=(10, 5))
plt.plot(train_losses, label='Training Loss')
plt.plot(val_losses, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Validation Loss')
plt.legend()
plt.grid(True)
plt.show()

We can see loss on the training set has steadily improved. On the validation set it's a bit more "bouncy" but then that's not a big set of images. It looks like our best performance was around 5 epochs. (Note, your results may differ when you run it, due to how we shuffle the data, initialise the weights and so on.) Let's see how we do on test:

In [None]:
# Evaluation on the test set
from sklearn.metrics import recall_score, accuracy_score

cnn.eval()
test_loss = 0.0
all_preds = []  # Store all predictions
all_true_labels = []  # Store all true labels

with torch.no_grad():
    for images, labels in test_loader: # evaluate on the held-out test set
        images = images.to(device)
        # Labels should be LongTensor and 1D for CrossEntropyLoss
        labels = labels.to(device)

        outputs = cnn(images)
        loss = criterion(outputs, labels)
        test_loss += loss.item()

        # For multi-class, predictions are the class with the highest logit
        predicted_labels = torch.argmax(outputs, dim=1)

        all_preds.extend(predicted_labels.cpu().numpy())
        all_true_labels.extend(labels.cpu().numpy())

test_loss_rnd = round(test_loss / len(test_loader), 4)

# Calculate metrics for multi-class
test_accuracy = accuracy_score(all_true_labels, all_preds)
# 'weighted' accounts for class imbalance
test_recall = recall_score(all_true_labels, all_preds, average='weighted', zero_division=0)


print(f"Test Loss: {test_loss_rnd}")
print(f"Test Accuracy: {round(test_accuracy, 4)}")
print(f"Test Recall (weighted): {round(test_recall, 4)}")
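
One thing worth knowing when reading these numbers: when every image has exactly one label, weighted recall is mathematically the same as accuracy, because each class's recall is weighted by its share of the data and the class sizes cancel out. Per-class or macro-averaged recall tells you more about which emotions the model struggles with. The pure-Python sketch below shows the arithmetic on made-up labels; the helper names are our own, not scikit-learn's.

```python
from collections import Counter


def accuracy(y_true, y_pred):
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)


def per_class_recall(y_true, y_pred):
    """Recall for each class: correct predictions / true examples of that class."""
    support = Counter(y_true)
    hits = Counter(t for t, p in zip(y_true, y_pred) if t == p)
    return {c: hits[c] / support[c] for c in sorted(support)}


def weighted_recall(y_true, y_pred):
    support = Counter(y_true)
    recalls = per_class_recall(y_true, y_pred)
    return sum(recalls[c] * support[c] for c in support) / len(y_true)


def macro_recall(y_true, y_pred):
    recalls = per_class_recall(y_true, y_pred)
    return sum(recalls.values()) / len(recalls)


# Made-up labels for three classes, for illustration only
y_true = [0, 0, 0, 0, 1, 1, 2, 2]
y_pred = [0, 0, 0, 0, 1, 2, 0, 2]

print(accuracy(y_true, y_pred), weighted_recall(y_true, y_pred))
print(per_class_recall(y_true, y_pred), macro_recall(y_true, y_pred))
```

With scikit-learn, `recall_score(..., average=None)` returns the per-class values and `average='macro'` the unweighted mean.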

How did you do? What could improve it?