# Overview
In this lab, you will learn how to train a basic neural network to recognize emotions from facial expressions. Through a comprehensive case study, you will explore the entire training process, covering data preparation and loading, constructing the network, training it with the data, and finally testing and evaluating the performance of the trained model.

# Introduction
Emotion recognition from facial expressions is a fascinating application of neural networks in artificial intelligence. By analyzing facial images, we can identify emotions such as happiness, sadness, and anger. This lab introduces the fundamentals of neural networks, data preprocessing, and model training with the PyTorch framework. You will learn to build a model for emotion recognition and explore its applications in areas such as human-computer interaction and mental health.

# Objectives

*   Learn how to use Jupyter Notebook and the PyTorch framework.
*   Learn how to create a dataloader to load and preprocess the training data.
*   Learn how to implement a neural network model and optimize it.
*   Learn how to train, evaluate and test a model.

# **Case Study**

Import the required libraries.

In [None]:
import os
import numpy as np
import torch
from time import time
from PIL import Image
from torch import nn
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
import torch.utils.model_zoo as model_zoo
import matplotlib.pyplot as plt
from tqdm.auto import tqdm

Check GPU availability. If `device` prints `cpu` rather than `cuda`, change the runtime type to T4 GPU in the menu above. (Remember to switch to GPU in subsequent labs as well.)

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

## Prepare Data

Download the data.

The Real-world Affective Faces Database (RAF-DB) is a dataset for facial expression recognition. This version contains 15000 facial images. The images vary widely in the subjects' age, gender, and ethnicity, as well as in head pose, lighting conditions, occlusions (e.g. glasses, facial hair, or self-occlusion), and post-processing operations (e.g. various filters and special effects).



In [None]:
!curl -L -o ./raf-db-dataset.zip\
  https://www.kaggle.com/api/v1/datasets/download/shuvoalok/raf-db-dataset

!unzip raf-db-dataset.zip

Create the dataloader.

Below is a sample data loader for the RAF-DB dataset. **Make sure you understand how it works, then develop a custom data loader tailored to your exercise dataset on your own.** For a more detailed tutorial, see: https://pytorch.org/tutorials/beginner/basics/data_tutorial.html#datasets-dataloaders

Key Features of the `RAFDBDataset` Class
* Initialization:
  When you create an instance of the `RAFDBDataset` class, you need to provide the root directory where the dataset is stored. You can also specify whether you want to load the training data or the testing data by setting the `train` parameter to `True` or `False`.
* Loading Images and Labels: The class is designed to automatically scan through the specified directory and gather all the images along with their corresponding labels. The images are organized into subdirectories for each emotion label (from 1 to 7). This means that when you use this class, it will find all the images in the training or testing folders and keep track of them for you.
* Dataset Length: The class includes a method to tell you how many images are available in the dataset. This is useful for setting up training loops and understanding the size of your data.
* Retrieving Data: When you want to get a specific image and its label, you can use an index. The class provides a way to access an image by its index, loading the image from disk, converting it to the right format (RGB), and applying any transformations you specified. It then returns both the image and its associated label.

In [None]:
class RAFDBDataset(Dataset):
    def __init__(self, root_dir, train=True, transform=None):
        self.root_dir = root_dir
        self.train = train
        self.transform = transform
        self.images = []
        self.labels = []

        base_dir = os.path.join(self.root_dir, 'train' if self.train else 'test')

        for label in range(1, 8):
            label_dir = os.path.join(base_dir, str(label))
            for img_name in os.listdir(label_dir):
                if img_name.lower().endswith(('.png', '.jpg', '.jpeg')):
                    self.images.append(os.path.join(label_dir, img_name))
                    self.labels.append(label-1)

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img_path = self.images[idx]
        image = Image.open(img_path).convert('RGB')
        label = self.labels[idx]

        if self.transform:
            image = self.transform(image)

        return image, label
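
For the exercise dataset, the class folders may be named by emotion rather than numbered 1 to 7. The scanning step in `__init__` can be adapted to that case. The following is a minimal sketch, not part of the original lab: `scan_image_folder` is a hypothetical helper, and it assumes a layout with one subfolder per class that directly contains the image files.

```python
import os

IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')

def scan_image_folder(root_dir):
    """Return (image_paths, labels, class_names) for a folder-per-class layout.

    Hypothetical helper: assumes root_dir/<class_name>/<image file>.
    """
    # Sort so that the class-to-index mapping is stable across runs
    class_names = sorted(
        d for d in os.listdir(root_dir)
        if os.path.isdir(os.path.join(root_dir, d))
    )
    image_paths, labels = [], []
    for label, class_name in enumerate(class_names):
        class_dir = os.path.join(root_dir, class_name)
        for file_name in sorted(os.listdir(class_dir)):
            # Skip anything that is not an image (e.g. notes or metadata files)
            if file_name.lower().endswith(IMAGE_EXTENSIONS):
                image_paths.append(os.path.join(class_dir, file_name))
                labels.append(label)
    return image_paths, labels, class_names
```

The returned lists can play the role of `self.images` and `self.labels` in a `Dataset` subclass, while `class_names` gives the label-to-name mapping needed for plotting.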

Create the transform functions used to preprocess the images.
* `transforms.Resize(IMAGE_SIZE)`: Resizes the input image to a specified size, which is (224, 224) in this case.

* `transforms.RandomHorizontalFlip()`: Randomly flips the image horizontally to increase the diversity of the data.

* `transforms.ToTensor()`: Converts a PIL image or NumPy ndarray to a FloatTensor and scales the pixel values to the range [0, 1].

* `normalize`: Normalizes the image data by subtracting the mean and dividing by the standard deviation. The mean and std values provided are for the three color channels (RGB).

* `transforms.RandomErasing(scale=(0.02, 0.25))`: Randomly erases a portion of the image. The scale parameter defines the range of the area ratio of the erased region to the image area.
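
The arithmetic behind `ToTensor` and `normalize` can be checked by hand for a single pixel value. The sketch below uses plain Python instead of torchvision. The helper names are introduced here for illustration, and the mean and standard deviation are the ones used in this lab.

```python
# Per-channel statistics used by the lab's `normalize` transform (RGB order)
MEAN = [0.5752, 0.4495, 0.4012]
STD = [0.2086, 0.1911, 0.1827]

def to_unit_range(pixel):
    """Mimic the scaling done by ToTensor for 8-bit images: 0..255 -> 0.0..1.0."""
    return pixel / 255.0

def normalize_value(value, channel):
    """Mimic Normalize for a single value: (value - mean) / std."""
    return (value - MEAN[channel]) / STD[channel]

def denormalize_value(value, channel):
    """Invert the normalization, as needed before plotting an image."""
    return value * STD[channel] + MEAN[channel]

red = to_unit_range(255)            # brightest possible red value
red_norm = normalize_value(red, 0)  # standardized value fed to the network
print(red, round(red_norm, 4), round(denormalize_value(red_norm, 0), 4))
```

Because normalized values can fall outside [0, 1], images have to be denormalized in this way before they are displayed.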

In [None]:
IMAGE_SIZE = (224, 224)
normalize = transforms.Normalize(mean=[0.5752, 0.4495, 0.4012],
                                    std=[0.2086, 0.1911, 0.1827])
train_transform=transforms.Compose([
    transforms.Resize(IMAGE_SIZE),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    normalize,
    transforms.RandomErasing(scale=(0.02, 0.25))
])
test_transform=transforms.Compose([
    transforms.Resize(IMAGE_SIZE),
    transforms.ToTensor(),
    normalize,
])

In [None]:
DATA_PATH = 'DATASET'
train_dataset = RAFDBDataset(root_dir=DATA_PATH, train=True, transform=train_transform)
test_dataset = RAFDBDataset(root_dir=DATA_PATH, train=False, transform=test_transform)

BATCH_SIZE = 32
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)
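
The number of iterations per epoch follows from the dataset size and `BATCH_SIZE`. By default (`drop_last=False`), `DataLoader` keeps the incomplete final batch, so the last batch can be smaller than the others. The sketch below reproduces that arithmetic in plain Python. The helper names and the dataset size are assumptions chosen only for illustration.

```python
import math

def batches_per_epoch(num_samples, batch_size, drop_last=False):
    """Number of batches a DataLoader yields per epoch for a map-style dataset."""
    if drop_last:
        return num_samples // batch_size        # incomplete final batch is discarded
    return math.ceil(num_samples / batch_size)  # final batch may be smaller

def last_batch_size(num_samples, batch_size):
    """Size of the final batch when drop_last=False."""
    remainder = num_samples % batch_size
    return remainder if remainder else batch_size

# Hypothetical dataset size, chosen only for illustration
n = 1000
print(batches_per_epoch(n, 32), last_batch_size(n, 32))
```

For the real datasets, `len(train_dataloader)` and `len(train_dataset)` report the actual values.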

Visualize the data to check if the load was successful.

In [None]:
import numpy as np
import matplotlib.pyplot as plt

class_names=['surprise', 'fear', 'disgust', 'happy', 'sad', 'anger', 'neutral']

mean = np.array([0.5752, 0.4495, 0.4012])
std = np.array([0.2086, 0.1911, 0.1827])

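def imshow_denormalize(axs, img, title):
    # Convert the (C, H, W) tensor to a NumPy array, then undo the normalization
    img_denorm = img.numpy() * std[:, None, None] + mean[:, None, None]
    img_denorm = np.clip(img_denorm, 0, 1)
    # Matplotlib expects the channel axis last: (H, W, C)
    axs.imshow(np.transpose(img_denorm, (1, 2, 0)))
    axs.set_title(title)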

fig, axs = plt.subplots(1, 4, figsize=(20, 5))

show_index = [0, 500, 1000, 2000]
for i, index in enumerate(show_index):
    img, label = test_dataset[index]
    imshow_denormalize(axs[i], img, class_names[label])
    axs[i].axis('off')

plt.show()

## Init Model

ResNet-18 Model Structure

For a comprehensive description of the model, you may refer to this paper: https://openaccess.thecvf.com/content_cvpr_2016/papers/He_Deep_Residual_Learning_CVPR_2016_paper.pdf

In [None]:
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_planes, planes, stride=1, downsample=None):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.downsample = downsample

    def forward(self, x):
        residual = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = torch.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample is not None:
            residual = self.downsample(x)

        out += residual
        out = torch.relu(out)
        return out

class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=1000):
        super(ResNet, self).__init__()
        self.in_planes = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        downsample = None
        if stride != 1 or self.in_planes != planes * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_planes, planes * block.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(planes * block.expansion),
            )

        layers = []
        layers.append(block(self.in_planes, planes, stride, downsample))
        self.in_planes = planes * block.expansion
        for _ in range(1, num_blocks):
            layers.append(block(self.in_planes, planes))

        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = torch.relu(x)
        x = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avg_pool(x)
        x = x.view(-1, 512 * BasicBlock.expansion)
        x = self.fc(x)
        return x

def resnet18(pretrained=False, num_classes=7, pretrained_weights=None):
    model = ResNet(BasicBlock, [2, 2, 2, 2], num_classes)
    if pretrained:
        if pretrained_weights:
            # Load the provided pretrained model weights
            if not os.path.isfile(pretrained_weights):
                raise FileNotFoundError(f"Pretrained weights not found: {pretrained_weights}")
            pretrain_dict = torch.load(pretrained_weights, map_location=torch.device('cpu'))
            if 'state_dict' in pretrain_dict:
                pretrain_dict = pretrain_dict['state_dict']
            # Strip the 'module.' prefix and keep only keys that exist in this model
            state_dict = {k.replace('module.', ''): v for k, v in pretrain_dict.items() if k.replace('module.', '') in model.state_dict()}
        else:
            # Load the ImageNet pretrained model weights
            state_dict = model_zoo.load_url('https://download.pytorch.org/models/resnet18-5c106cde.pth')
        # Drop the classifier weights: their shape depends on the number of classes
        state_dict.pop("fc.weight", None)
        state_dict.pop("fc.bias", None)
        model.load_state_dict(state_dict, strict=False)
        print('Pretrained Weights Loaded')
    return model
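
The key filtering that `resnet18` applies to a checkpoint can be illustrated without PyTorch. In the sketch below, plain dictionaries with string values stand in for state dicts of tensors. `filter_checkpoint` and the toy keys are hypothetical and exist only for this illustration. Unlike the lab code, it strips `module.` only when it appears as a prefix.

```python
def filter_checkpoint(checkpoint, model_keys, skip_prefixes=('fc.',)):
    """Keep only checkpoint entries that the model can accept.

    - strips a leading 'module.' (present when a model was saved from nn.DataParallel)
    - drops keys the model does not have
    - drops the classifier ('fc.*'), whose shape depends on the number of classes
    """
    filtered = {}
    for key, value in checkpoint.items():
        name = key[len('module.'):] if key.startswith('module.') else key
        if name in model_keys and not name.startswith(skip_prefixes):
            filtered[name] = value
    return filtered

# Toy stand-ins: strings take the place of weight tensors
checkpoint = {
    'module.conv1.weight': 'w_conv1',
    'module.fc.weight': 'w_fc_many_classes',
    'module.fc.bias': 'b_fc_many_classes',
    'module.extra_head.weight': 'unused',
}
model_keys = {'conv1.weight', 'fc.weight', 'fc.bias'}
print(filter_checkpoint(checkpoint, model_keys))
```

Only the backbone entry survives. The classifier is left at its random initialization, which is why `load_state_dict` is called with `strict=False`.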

In [None]:
model = resnet18(pretrained=True, num_classes=7).to(device)
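
To see how the 224×224 input shrinks on its way through the network defined above, the spatial size after each stage can be computed with the standard output-size formula for convolution and pooling layers, floor((n + 2p - k) / s) + 1. This is a small sketch in plain Python, independent of PyTorch. `conv_out` is a helper introduced here for illustration.

```python
def conv_out(size, kernel, stride, padding):
    """Output length of a convolution or pooling layer along one spatial axis."""
    return (size + 2 * padding - kernel) // stride + 1

size = 224                                            # input height/width after Resize
size = conv_out(size, kernel=7, stride=2, padding=3)  # conv1
sizes = {'conv1': size}
size = conv_out(size, kernel=3, stride=2, padding=1)  # max pool
sizes['maxpool'] = size
for name, stride in [('layer1', 1), ('layer2', 2), ('layer3', 2), ('layer4', 2)]:
    # Only the first 3x3 convolution of each stage uses the stage stride;
    # the remaining convolutions keep the size (kernel 3, stride 1, padding 1).
    size = conv_out(size, kernel=3, stride=stride, padding=1)
    sizes[name] = size
print(sizes)
```

The 7×7 feature map left after `layer4` is reduced to 1×1 by `AdaptiveAvgPool2d`, so the fully connected layer receives 512 features per image.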

## Start Training

* `criterion`: initializes the loss function. Cross-entropy loss is commonly used for classification tasks. It measures how well the model's predictions match the actual labels and is particularly useful when the target labels are mutually exclusive.
* `optimizer`: creates an optimizer, which is an algorithm that adjusts the model's parameters to minimize the loss function. Here, the Adam optimizer is used.
* `scheduler`: creates a learning rate scheduler, which is used to adjust the learning rate over time during training. The `ExponentialLR` scheduler reduces the learning rate by a factor of gamma every epoch, which can help the model converge more effectively.
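
As a worked example of the loss, the cross-entropy for a single sample is the negative log of the softmax probability assigned to the true class. The sketch below computes it in plain Python from raw scores (logits). The function name and the logits are hypothetical. `nn.CrossEntropyLoss` performs the same computation on tensors and, by default, averages it over the batch.

```python
import math

def cross_entropy(logits, target):
    """Cross-entropy for one sample: -log(softmax(logits)[target])."""
    # Subtract the maximum logit for numerical stability (does not change the result)
    m = max(logits)
    log_sum_exp = m + math.log(sum(math.exp(z - m) for z in logits))
    return log_sum_exp - logits[target]

# Hypothetical logits for the 7 emotion classes
confident = [0.1, 0.0, -0.3, 4.0, 0.2, -0.1, 0.5]
print(cross_entropy(confident, target=3))  # small loss: class 3 has the largest logit
print(cross_entropy(confident, target=1))  # larger loss: class 1 has a low logit
print(cross_entropy([0.0] * 7, target=0))  # uniform prediction: log(7)
```

A model that guesses uniformly over 7 classes has a loss of log(7), roughly 1.95, which is a useful reference point when reading the training loss curve.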

In [None]:
criterion = nn.CrossEntropyLoss()

optimizer = torch.optim.Adam(model.parameters(), lr=0.00005, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.9)
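
With `ExponentialLR`, the learning rate after `t` calls to `scheduler.step()` is the initial rate multiplied by `gamma` raised to the power `t`. The plain-Python sketch below tabulates that schedule for the values used in the cell above. `exponential_lr` is a helper introduced here for illustration.

```python
def exponential_lr(initial_lr, gamma, epoch):
    """Learning rate after `epoch` scheduler steps: initial_lr * gamma**epoch."""
    return initial_lr * gamma ** epoch

initial_lr, gamma = 5e-5, 0.9  # values from the cell above
for epoch in (0, 1, 5, 10, 20):
    print(f"epoch {epoch:2d}: lr = {exponential_lr(initial_lr, gamma, epoch):.3e}")
```

During training, the current value can be read from the scheduler with `scheduler.get_last_lr()`.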

The `train_epoch` function trains the model for one epoch on the training data. For each batch it runs the forward pass, computes the loss, backpropagates, and updates the parameters. It logs the batch accuracy and loss every `log_eval` iterations and returns the accuracy and loss averaged over all batches.

In [None]:
def train_epoch(model, train_dataloader, criterion, optimizer, epoch=0, log_eval=50):
    accs, losses = [], []
    model.train()  # enable training behavior (e.g. batch-norm statistics updates)
    for idx, (X, y) in enumerate(train_dataloader):
        start_time = time()
        X = X.to(device)
        y = y.to(device)
        optimizer.zero_grad()

        # Forward pass and loss
        preds = model(X)
        loss = criterion(preds, y)
        losses.append(loss.item())

        # Backward pass and parameter update
        loss.backward()
        optimizer.step()

        # Accuracy of the current batch
        total_acc = (preds.argmax(1) == y).sum().item()
        acc = total_acc / y.size(0)
        accs.append(acc)
        end_iter_time = time() - start_time
        if idx % log_eval == 0:
            print(f"Iteration: {idx} | time {end_iter_time:.3f} | train acc: {acc:.4f} | train_loss: {loss.item():.4f}")
    # Average the per-batch metrics over the epoch
    epoch_acc = sum(accs) / len(accs)
    epoch_loss = sum(losses) / len(losses)
    return epoch_acc, epoch_loss
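
Note that `train_epoch` averages the per-batch accuracies. When the final batch is smaller than the others, this average can differ slightly from the fraction of all samples classified correctly. The sketch below shows the difference with made-up numbers. Both helper functions and the batch statistics are hypothetical.

```python
def mean_of_batch_accuracies(correct_counts, batch_sizes):
    """Average the per-batch accuracies, as `train_epoch` does."""
    accs = [c / n for c, n in zip(correct_counts, batch_sizes)]
    return sum(accs) / len(accs)

def sample_weighted_accuracy(correct_counts, batch_sizes):
    """Fraction of all samples classified correctly."""
    return sum(correct_counts) / sum(batch_sizes)

# Hypothetical epoch: two full batches of 32 and a final batch of 4
batch_sizes = [32, 32, 4]
correct_counts = [24, 24, 4]
print(mean_of_batch_accuracies(correct_counts, batch_sizes))
print(sample_weighted_accuracy(correct_counts, batch_sizes))
```

With many batches per epoch the gap is small, so the batch average is a reasonable quantity to log during training.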

In [None]:
num_epoch = 20
acc_list = []
loss_list = []
for epoch in tqdm(range(1, num_epoch+1)):
  start_time = time()
  train_acc, train_loss = train_epoch(model, train_dataloader, criterion, optimizer, epoch)
  scheduler.step()
  end_epoch_time = time() - start_time
  print(f"End of epoch {epoch} | time {end_epoch_time} | train acc: {train_acc} | train_loss: {train_loss}")
  acc_list.append(train_acc)
  loss_list.append(train_loss)

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

sns.set()


fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 4))

acc_line, = ax1.plot(range(1, len(acc_list) + 1), acc_list, label='Accuracy', color='blue', marker='o')
ax1.set_title('Training Accuracy')
ax1.set_xlabel('Epochs')
ax1.set_ylabel('Accuracy')
for i, acc in enumerate(acc_list):
    ax1.text(i + 1, acc, f'{acc:.2f}', ha='center', va='bottom', fontsize=8)
ax1.set_xticks(range(1, len(acc_list) + 1))
ax1.legend()

loss_line, = ax2.plot(range(1, len(loss_list) + 1), loss_list, label='Loss', color='red', marker='o')
ax2.set_title('Training Loss')
ax2.set_xlabel('Epochs')
ax2.set_ylabel('Loss')
for i, loss in enumerate(loss_list):
    ax2.text(i + 1, loss, f'{loss:.2f}', ha='center', va='top', fontsize=8)
ax2.set_xticks(range(1, len(loss_list) + 1))
ax2.legend()

plt.tight_layout()
plt.show()

Evaluate the model on the test set.

In [None]:
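def eval_epoch(model, test_dataloader):
    correct, total = 0, 0
    model.eval()  # use running batch-norm statistics instead of batch statistics
    with torch.no_grad():  # gradients are not needed for evaluation
        for X, y in test_dataloader:
            X = X.to(device)
            y = y.to(device)

            preds = model(X)

            # Count correct predictions over the whole test set
            correct += (preds.argmax(1) == y).sum().item()
            total += y.size(0)
    # Fraction of correctly classified test samples
    return correct / total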

In [None]:
model.eval()
test_acc = eval_epoch(model, test_dataloader)
print(test_acc)

## Transfer Learning

**As you may have observed, when we instantiate the ResNet-18 architecture, we utilize weights from a pre-trained model.**

Pretrained models play a significant role in the field of deep learning. They are trained on large-scale datasets to learn general feature representations that can be transferred to other tasks, reducing the time and data required to train new models from scratch.

In this section, we demonstrate the effect of the pretrained weights with a practical experiment.

First, download the ResNet-18 model weights pretrained on the Celeb-1M dataset, which are specifically optimized for face-related tasks.

In [None]:
import gdown

gdown.download('https://drive.google.com/uc?id=1e7FmEfTIB__ATpSw5oHz61N1-bTl0Dlk', './resnet18_celeb.pth')

Then we repeat the training process with the new model.

In [None]:
model2 = resnet18(pretrained=True, num_classes=7, pretrained_weights='./resnet18_celeb.pth').to(device)

In [None]:
optimizer2 = torch.optim.Adam(model2.parameters(), lr=0.00005, weight_decay=1e-4)
scheduler2 = torch.optim.lr_scheduler.ExponentialLR(optimizer2, gamma=0.9)

In [None]:
num_epoch = 10
acc_list2 = []
loss_list2 = []
for epoch in tqdm(range(1, num_epoch+1)):
  start_time = time()
  train_acc, train_loss = train_epoch(model2, train_dataloader, criterion, optimizer2, epoch)
  scheduler2.step()
  end_epoch_time = time() - start_time
  print(f"End of epoch {epoch} | time {end_epoch_time} | train acc: {train_acc} | train_loss: {train_loss}")
  acc_list2.append(train_acc)
  loss_list2.append(train_loss)

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

sns.set()


fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 4))

acc_line, = ax1.plot(range(1, len(acc_list2) + 1), acc_list2, label='Accuracy', color='blue', marker='o')
ax1.set_title('Training Accuracy')
ax1.set_xlabel('Epochs')
ax1.set_ylabel('Accuracy')
for i, acc in enumerate(acc_list2):
    ax1.text(i + 1, acc, f'{acc:.2f}', ha='center', va='bottom')
ax1.set_xticks(range(1, len(acc_list2) + 1))
ax1.legend()

loss_line, = ax2.plot(range(1, len(loss_list2) + 1), loss_list2, label='Loss', color='red', marker='o')
ax2.set_title('Training Loss')
ax2.set_xlabel('Epochs')
ax2.set_ylabel('Loss')
for i, loss in enumerate(loss_list2):
    ax2.text(i + 1, loss, f'{loss:.2f}', ha='center', va='top')
ax2.set_xticks(range(1, len(loss_list2) + 1))
ax2.legend()

plt.tight_layout()
plt.show()

In [None]:
model2.eval()
test_acc2 = eval_epoch(model2, test_dataloader)
print(test_acc2)

As you can see, changing only the pretrained weights (and training for 10 epochs instead of 20) yields a very large performance improvement. This is because the features pretrained on Celeb-1M are better suited to the task of facial expression recognition. Choosing suitable pretrained weights is therefore very important when training a model.

## **Notice**

Save the trained model using the following code and **move the file to your Google Drive**. Note that this call saves the entire model object, not only its weights. We will use this model in the next lab.

In [None]:
torch.save(model2, 'fer_resnet18.pth')

## Visualize Test Result

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import torch

class_names = ['surprise', 'fear', 'disgust', 'happy', 'sad', 'anger', 'neutral']

# Select the samples from the test set
indices = [0, 400, 800, 1800]
test_images = [test_dataset[i][0] for i in indices]
labels = [test_dataset[i][1] for i in indices]

mean = np.array([0.5752, 0.4495, 0.4012])
std = np.array([0.2086, 0.1911, 0.1827])

fig, axs = plt.subplots(1, 4, figsize=(20, 5))

for i, (test_image, label) in enumerate(zip(test_images, labels)):
    # Convert the (C, H, W) tensor to a NumPy array, then undo the normalization
    img_denorm = test_image.numpy() * std[:, None, None] + mean[:, None, None]
    img_denorm = np.clip(img_denorm, 0, 1)

    axs[i].imshow(np.transpose(img_denorm, (1, 2, 0)))
    axs[i].set_title(class_names[label])
    axs[i].axis('off')

    image = test_image.to(device)

    with torch.no_grad():
        image = image.unsqueeze(0)
        pred = model(image)
        predicted_class = pred.argmax(1).item()

    print(f"Image {indices[i]} - True label: {class_names[label]}, Predicted label: {class_names[predicted_class]}")

plt.show()

## Custom Data Test

Replace the path below with the path to your own image to test the model.

In [None]:
class_names=['surprise', 'fear', 'disgust', 'happy', 'sad', 'anger', 'neutral']

test_img_path = 'PATH_TO_YOUR_DATA'
test_image = Image.open(test_img_path).convert('RGB')
test_image = test_transform(test_image)

mean = np.array([0.5752, 0.4495, 0.4012])
std = np.array([0.2086, 0.1911, 0.1827])
# Convert the (C, H, W) tensor to a NumPy array, then undo the normalization
img_denorm = test_image.numpy() * std[:, None, None] + mean[:, None, None]
img_denorm = np.clip(img_denorm, 0, 1)

plt.imshow(np.transpose(img_denorm, (1, 2, 0)))

image = test_image.to(device)

with torch.no_grad():
    image = image.unsqueeze(0)
    pred = model(image)
    predicted_class = pred.argmax(1).item()

print(predicted_class)
print(f"Predicted class name: {class_names[predicted_class]}")
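
The model outputs raw scores (logits), and `argmax` only reports the winning class. To see how confident the prediction is, the logits can be converted to probabilities with softmax. The sketch below does this in plain Python. The helper names and the logits are hypothetical, with the list standing in for something like `pred[0].tolist()`.

```python
import math

CLASS_NAMES = ['surprise', 'fear', 'disgust', 'happy', 'sad', 'anger', 'neutral']

def softmax(logits):
    """Convert raw scores to probabilities that sum to 1."""
    m = max(logits)  # subtract the maximum for numerical stability
    exps = [math.exp(z - m) for z in logits]
    total = sum(exps)
    return [e / total for e in exps]

def top_prediction(logits, class_names=CLASS_NAMES):
    """Return (class name, probability) for the highest-scoring class."""
    probs = softmax(logits)
    best = max(range(len(probs)), key=probs.__getitem__)
    return class_names[best], probs[best]

# Hypothetical logits for one image
logits = [0.2, -1.0, -0.5, 3.1, 0.4, -0.2, 1.0]
name, prob = top_prediction(logits)
print(f"{name}: {prob:.2%}")
```

On tensors, `torch.softmax(pred, dim=1)` gives the same probabilities directly.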

# **Exercise**

Now that you've learned from the case study, it's your turn to train a network yourself.

Using what you learned from the case study, try training with the following dataset: https://www.kaggle.com/datasets/zawarkhan69/human-facial-expression-dataset

Instructions

* Download and visualize statistics about the dataset
* Create a suitable dataloader for the dataset
* Set a proper `transform` and training hyperparameters
* Train and evaluate the model
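
As a starting point for the dataset statistics, the class distribution can be computed from the list of integer labels collected by your dataset class. This is a minimal sketch with made-up labels and class names. `class_distribution` is a hypothetical helper, and the real class names depend on the dataset you download.

```python
from collections import Counter

def class_distribution(labels, class_names):
    """Return {class name: (count, fraction of dataset)} for integer labels."""
    counts = Counter(labels)
    total = max(len(labels), 1)  # avoid division by zero for an empty dataset
    return {
        name: (counts.get(idx, 0), counts.get(idx, 0) / total)
        for idx, name in enumerate(class_names)
    }

# Hypothetical labels and class names, only to show the output format
labels = [0, 0, 1, 2, 2, 2, 2, 1]
class_names = ['angry', 'happy', 'sad']
for name, (count, frac) in class_distribution(labels, class_names).items():
    print(f"{name:>6}: {count} images ({frac:.1%})")
```

The counts can be passed to `plt.bar` to visualize the distribution. A strongly imbalanced distribution is worth keeping in mind when interpreting the test accuracy.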