## !! The final result should be only a runnable .py file !!

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from tqdm import tqdm
from matplotlib import pyplot as plt
import pickle
import numpy as np
from torch.utils.data import random_split

# 0. Data Pre-processing

In [None]:
data_transforms = transforms.Compose([
    transforms.Grayscale(num_output_channels=1),  # turn the graph to single color channel
    transforms.Resize((227, 227)), # resize to 227 * 227 because we use AlexNet
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485], std=[0.229])  # normalize
])

train_dataset = datasets.ImageFolder(
    '../dataset/train', transform=data_transforms)
# split training set to training set and validation set
# set a random seed to ensure reproducibility of results.
torch.manual_seed(42)
train_size = int(0.8 * len(train_dataset))
val_size = len(train_dataset) - train_size
train_dataset, val_dataset = random_split(train_dataset, [train_size, val_size])

test_dataset = datasets.ImageFolder('../dataset/test', transform=data_transforms)


train_loader = DataLoader(train_dataset, batch_size=1024, shuffle=True, num_workers=8)
val_loader = DataLoader(val_dataset, batch_size=512,shuffle=False, num_workers=8)
test_loader = DataLoader(test_dataset, batch_size=512, shuffle=False, num_workers=8)

# 1. Model

In [None]:
# todo: we should compare the optimal version with the previous ones
# version optimal
# reference: AlexNet
class EmotionCNN(nn.Module):
    def __init__(self, num_classes):
        super(EmotionCNN, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=96, kernel_size=11, stride=4, padding=2), 
            # out_channels is decided by # of filters
            # batch_size doesn't show here and is different from in_channels.
            nn.BatchNorm2d(96),
            nn.ReLU(inplace=True), # inplace: override
            nn.MaxPool2d(kernel_size=3, stride=2),

            nn.Conv2d(96, 256, kernel_size=5, padding=2),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),

            nn.Conv2d(256, 384, kernel_size=3, padding=1),
            nn.BatchNorm2d(384),
            nn.ReLU(inplace=True),

            nn.Conv2d(384, 384, kernel_size=3, padding=1),
            nn.BatchNorm2d(384),
            nn.ReLU(inplace=True),

            nn.Conv2d(384, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            # output shape: (batch_size, channels = 256, height = 6, width = 6)
        )
        self.avgpool = nn.AdaptiveAvgPool2d((6, 6))
        self.flatten = nn.Flatten(1) 
        # flatten from channel, ex: [batch_size, channels(1), height, width] -> [batch_size, channels * height * width]
        self.classifier = nn.Sequential(
            nn.Dropout(),
            nn.Linear(256 * 6 * 6, 4096),
            nn.BatchNorm1d(4096),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.BatchNorm1d(4096),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.BatchNorm1d(4096),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.BatchNorm1d(4096),
            nn.ReLU(inplace=True),
            nn.Linear(4096, num_classes),
        )

    def forward(self, x):
        x = self.features(x)
        x = self.avgpool(x)
        x = self.flatten(x)
        x = self.classifier(x)
        return x # the probability of 7 emotion class
    

# initialize model, loss-function and optimizer
model = EmotionCNN(num_classes=7)  # FER-2013 has 7 emotion class
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

In [None]:
# select device
if torch.cuda.is_available():
    device = torch.device("cuda:0")
    print("using cuda")
#elif torch.backends.mps.is_available():
#    device = torch.device("mps")
#    print("using mac mps")
else:
    device = torch.device("cpu")
    print("using cpu")

In [None]:
# training model
num_epochs = 500
model.to(device)

# some data structures to record data
loss_history = []

# early stopping variables
early_stopping_patience = 10 # patience for telerating epochs witout improvment
early_stopping_counter = 0
best_val_loss = float('inf')
best_model_state = None

# progress bar
process = tqdm(range(num_epochs), bar_format='{l_bar}{bar:25}{r_bar}{bar:-25b}', colour='green', ascii='░▒█', unit='epoch')

for epoch in process:
    model.train()
    running_loss = 0.0
    
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    
    avg_loss = f"{(running_loss / len(train_loader)):5f}"
    loss_history.append(avg_loss)
    process.set_description(f"loss[-5:] = {loss_history[-5:]}")
    # print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {running_loss / len(train_loader)}")

    ## ===  Early Stopping  === ##
    # evaluate the model's performance on the validation set after each epoch
    model.eval()
    val_loss = 0.0

    with torch.no_grad():
        for inputs, labels in val_loader:  # load val_loader to evaluate training performance
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item()

    val_loss /= len(val_loader)

    # to see whether the performance of this epoch is better than the privious one or not
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        best_model_state = model.state_dict()
        early_stopping_counter = 0
    else:
        early_stopping_counter += 1

    # if there is no improvement in performance for early_stopping_patience consecutive epochs, then stop training.
    if early_stopping_counter >= early_stopping_patience:
        print("Early stopping: No improvement in validation loss for {} epochs.".format(
            early_stopping_patience))
        break


In [None]:
# save the pth file
torch.save(best_model_state, '8FC 00025LR emotion_cnn.pth')

with open('train_loss_history_8FC.pkl', 'wb') as f:
    pickle.dump(loss_history, f)

In [None]:
print(loss_history)

In [None]:
# evaluate model

model = EmotionCNN(num_classes=7)
model.load_state_dict(torch.load('emotion_cnn.pth'))
model.to(device)
model.eval()

correct = 0
total = 0

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1) # predicted is the emotion index
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = 100 * correct / total
print(f"Test Accuracy: {accuracy}%")
