# Setup

## Check GPU

In [None]:
!nvidia-smi -L

## Install dependencies

In [None]:
!pip install --upgrade numpy pandas scikit-learn matplotlib seaborn tqdm datetime

In [None]:
!pip install --upgrade torch torchtext torchvision torchaudio

In [None]:
!pip install --upgrade pytorch-lightning

In [None]:
!pip install --upgrade fastai

## Imports

In [None]:
# DL
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
from fastai.layers import SelfAttention

# PyTorch training framework
import pytorch_lightning as pl
from pytorch_lightning.metrics.functional import accuracy
from pytorch_lightning.callbacks.early_stopping import EarlyStopping
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning import loggers as pl_loggers

# Preprocessing
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torch.utils.data import WeightedRandomSampler

# CV utilities
from PIL import Image
import cv2
import deepface
from deepface.commons import functions

# Metrics
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay

# Plotting utilities
from matplotlib import pyplot as plt
import seaborn as sns

# General utilities
import os
import sys
import warnings
from tqdm.notebook import tqdm
from datetime import datetime

In [None]:
warnings.filterwarnings("ignore")
# device = "cpu"
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

# Data acquisition

In [None]:
# # The Kaggle API client expects this file to be in ~/.kaggle,
# # so move it there.
# !mkdir -p ~/.kaggle
# !cp "/content/drive/MyDrive/Colab Notebooks/kaggle.json" ~/.kaggle/

# # This permissions change avoids a warning on Kaggle tool startup.
# !chmod 600 ~/.kaggle/kaggle.json

In [None]:
# # Let's make sure the kaggle.json file is present.
# !ls -lha ~/.kaggle/

In [None]:
# # List available datasets.
# !kaggle datasets list

In [None]:
# # Download the FER2013 data set locally.
# !kaggle datasets download -d deadskull7/fer2013

In [None]:
# !unzip fer2013.zip

In [None]:
# fer = pd.read_csv("./fer2013.csv")

In [None]:
# fer.to_csv("/content/drive/MyDrive/Colab Notebooks/FER.csv", index=False)

In [None]:
fer = pd.read_csv("./FER.csv")

In [None]:
# emotion_mapping = {0: 'Anger', 1: 'Disgust', 2: 'Fear', 
#                    3: 'Happiness', 4: 'Sadness', 5: 'Surprise', 6: 'Neutrality'}

# df["label"] = df.apply(lambda row: emotion_mapping[row["emotion"]], axis=1)
# df = df.drop(["emotion"], axis=1)

In [None]:
# df

In [None]:
# !git clone https://github.com/microsoft/FERPlus.git

In [None]:
ferplus = pd.read_csv("./FERPlus/fer2013new.csv")

In [None]:
def clean_data(fer, ferplus):
    """Join FER pixels with FER+ vote counts and keep one label per image.

    Args:
        fer: DataFrame with at least the columns "pixels" and "Usage".
        ferplus: DataFrame with a "Usage" column and one vote-count column
            per FER+ category (see `vote_columns` below).

    Returns:
        A new DataFrame with the columns "pixels", "Usage" and "label",
        re-indexed from 0, without the rows whose majority label is "NF",
        "contempt" or "unknown".
    """
    vote_columns = ["neutral", "happiness", "surprise", "sadness", "anger",
                    "disgust", "fear", "contempt", "unknown", "NF"]
    discarded_labels = ["NF", "contempt", "unknown"]

    # Only the "Usage" column of `fer` is removed; the merged frame then takes
    # its "Usage" values from `ferplus`.
    merged = pd.concat([fer.drop(columns=["Usage"]), ferplus], axis=1)

    # The label is the category with the highest vote count in each row.
    merged["label"] = merged[vote_columns].idxmax(axis=1)
    merged = merged[["pixels", "Usage", "label"]]

    # Remove ambiguous ("NF") and unknown/rare ("unknown", "contempt") rows.
    rejected = merged.index[merged["label"].isin(discarded_labels)]
    merged = merged.drop(rejected)

    return merged.reset_index(drop=True)

In [None]:
df = clean_data(fer, ferplus)

In [None]:
df.to_csv("./FER+.csv")

# Data exploration

In [None]:
n_images = df.shape[0]
height = int(np.sqrt(len(df["pixels"][0].split()))) 
width = int(height)

In [None]:
fig, axs = plt.subplots(figsize=(10, 10), nrows=3, ncols=3)
for i in range(9): 
    h = i//3
    w = i % 3
    idx = np.random.randint(0, n_images)
    img_arr = np.fromstring(df["pixels"][idx], dtype=int, sep=' ').reshape(height, width)
    
    axs[h,w].imshow(img_arr, interpolation='none', cmap='gray')
    axs[h,w].set_title(df["label"][idx])

In [None]:
df["label"].value_counts()

In [None]:
num_classes = df["label"].nunique()
num_datapoints = df["label"].count()
print(f"There are {num_classes} different emotion classes across {num_datapoints} datapoints")

In [None]:
with sns.axes_style('darkgrid'):
    fig, ax = plt.subplots(figsize=(10,6))
    sns.countplot(data = df, x = 'label', order = df['label'].value_counts().index,ax=ax)
    
    # Annotate every bar with its count, centred just above the bar.
    for p in ax.patches:
        x=p.get_bbox().get_points()[:,0]
        y=p.get_bbox().get_points()[1,1]
        # The bar height may be a float, which the 'd' format code rejects,
        # so convert it to int before formatting.
        ax.annotate('{:d}'.format(int(p.get_height())), (x.mean(), y), ha='center', va='bottom')
    
    fig.show()

# Data preparation

In [None]:
le = LabelEncoder()
le.fit(df["label"].to_numpy())

In [None]:
def prepare_data(data, label_encoder, height=48, width=48):
    """Convert a frame of pixel strings and labels into image/label arrays.

    Args:
        data: DataFrame with a "pixels" column (space-separated integer
            strings, `height * width` values each) and a "label" column.
        label_encoder: Fitted encoder whose `transform` maps the label values
            to integer class indices.
        height: Image height in pixels (default 48).
        width: Image width in pixels (default 48).

    Returns:
        Tuple `(X, y)`: `X` is a `np.uint8` array of shape
        `(len(data), height, width)` and `y` a `torch.long` tensor holding the
        encoded labels.
    """
    # Use the encoder that was passed in rather than a notebook-level global.
    encoded = label_encoder.transform(data["label"].to_numpy())
    y = torch.as_tensor(encoded, dtype=torch.long)

    X = np.zeros((len(data), height, width), dtype=np.uint8)
    for i, pixel_string in enumerate(data["pixels"]):
        pixels = np.fromstring(pixel_string, dtype=int, sep=' ')
        X[i] = pixels.reshape(height, width).astype(np.uint8)

    return X, y

In [None]:
X_train, y_train    = prepare_data(df[df['Usage'] == 'Training'], le)
X_val, y_val        = prepare_data(df[df['Usage'] == 'PrivateTest'], le)
X_test, y_test      = prepare_data(df[df['Usage'] == 'PublicTest'], le)

In [None]:
print(X_train.shape, X_train.dtype)
print(y_train.shape, y_train.dtype)

print("\n\n")

print(X_val.shape, X_val.dtype)
print(y_val.shape, y_val.dtype)

print("\n\n")

print(X_test.shape, X_test.dtype)
print(y_test.shape, y_test.dtype)

## Data augmentation

### Small

In [None]:
train_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(0.2*np.pi),
    # transforms.RandomPerspective(distortion_scale=0.2),
    transforms.ToTensor(),
    # transforms.Normalize((0.5,),(0.5,))
])

In [None]:
test_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.ToTensor(),
    # transforms.Normalize((0.5,),(0.5,))
])

### N-crops

In [None]:
#  mu, st = 0, 255

In [None]:
# train_transform = transforms.Compose([
#     transforms.ToPILImage(),
#     transforms.RandomResizedCrop(48, scale=(0.8, 1.2)),
#     transforms.RandomApply([transforms.RandomAffine(0, translate=(0.2, 0.2))], p=0.5),
#     transforms.RandomHorizontalFlip(),
#     transforms.RandomApply([transforms.RandomRotation(10)], p=0.5),
#     transforms.TenCrop(40),
#     transforms.Lambda(lambda crops: torch.stack([transforms.ToTensor()(crop) for crop in crops])),
#     transforms.Lambda(lambda tensors: torch.stack([transforms.Normalize(mean=(mu,), std=(st,))(t) for t in tensors])),
#     transforms.Lambda(lambda tensors: torch.stack([transforms.RandomErasing(p=0.5)(t) for t in tensors])),
# ])

In [None]:
# test_transform = transforms.Compose([
#     transforms.ToPILImage(),
#     transforms.TenCrop(40),
#     transforms.Lambda(lambda crops: torch.stack([transforms.ToTensor()(crop) for crop in crops])),
#     transforms.Lambda(lambda tensors: torch.stack([transforms.Normalize(mean=(mu,), std=(st,))(t) for t in tensors])),
# ])

## Datasets

In [None]:
class MyDataset(Dataset):
    """Map-style dataset pairing `images[idx]` with `labels[idx]`.

    Args:
        images: Indexable collection of images.
        labels: Indexable collection of labels, aligned with `images`.
        transform: Optional callable applied to each image when it is fetched.
        augment: Flag stored on the instance; it is not consulted by this class.
    """

    def __init__(self, images, labels, transform=None, augment=False):
        self.images, self.labels = images, labels
        self.transform = transform
        self.augment = augment

    def __len__(self):
        # The dataset size is the number of images.
        return len(self.images)

    def __getitem__(self, idx):
        # Return the (possibly transformed) image together with its label.
        image = self.images[idx]
        image = self.transform(image) if self.transform else image
        target = self.labels[idx]
        return image, target

In [None]:
train_set   = MyDataset(X_train, y_train, train_transform)
val_set     = MyDataset(X_val, y_val, test_transform)
test_set    = MyDataset(X_test, y_test, test_transform)

## Dataloaders

In [None]:
# batch_size = 64
batch_size = 128

In [None]:
train_loader      = DataLoader(train_set, batch_size=batch_size, shuffle=True)
val_loader      = DataLoader(val_set, batch_size=batch_size, shuffle=True)
test_loader     = DataLoader(test_set, batch_size=batch_size, shuffle=False)

In [None]:
inputs, labels = next(iter(train_loader))
print(inputs.shape, labels.shape)

unique, counts = np.unique(labels, return_counts=True)
print(np.asarray((le.classes_[unique], counts)).T)

In [None]:
for batch in test_loader:
    inputs, labels = batch
    print(inputs.shape)
    print(labels.shape)
    break

In [None]:
class BalancedSampler(WeightedRandomSampler):
    """Weighted sampler giving every sample a weight of 1 / (size of its class).

    Args:
        dataset: Object exposing a `labels` attribute with one class label per
            sample (anything `np.asarray` can convert, e.g. a CPU tensor).

    The sampler draws `len(dataset.labels)` indices per epoch.
    """

    def __init__(self, dataset):
        labels = np.asarray(dataset.labels)

        # `inverse` maps each sample to the position of its class in the sorted
        # unique labels, so the per-class weights are looked up correctly even
        # when the label values are not the contiguous range 0..K-1.
        _, inverse, class_counts = np.unique(
            labels, return_inverse=True, return_counts=True)
        class_weight = 1. / class_counts
        samples_weight = class_weight[inverse.reshape(-1)]

        samples_weight = torch.from_numpy(samples_weight).to(torch.double)

        super().__init__(samples_weight, len(samples_weight))


In [None]:
# Rebuild the loaders: training and validation batches are drawn through
# BalancedSampler, the test loader iterates test_set in order.
# NOTE(review): because val_loader also uses BalancedSampler, 'val_acc' and
# 'val_loss' are presumably computed on a class-balanced random draw from
# val_set rather than on every validation image exactly once — confirm that
# this is intended, since early stopping and the LR scheduler monitor 'val_acc'.
train_loader    = DataLoader(train_set, batch_size=batch_size, sampler=BalancedSampler(train_set))
val_loader      = DataLoader(val_set, batch_size=batch_size, sampler=BalancedSampler(val_set))
test_loader     = DataLoader(test_set, batch_size=batch_size, shuffle=False)

In [None]:
inputs, labels = next(iter(train_loader))
print(inputs.shape, labels.shape)

unique, counts = np.unique(labels, return_counts=True)
print(np.asarray((le.classes_[unique], counts)).T)

## Sampling

In [None]:
sample = next(iter(train_loader))
X, y = sample
Xi = X[0,...]
yi = y[0]

img = torch.squeeze(Xi)
img = img.numpy()
plt.imshow(img, cmap='gray')
plt.title(le.classes_[yi])
plt.show()

In [None]:
sample = next(iter(val_loader))
X, y = sample
Xi = X[0,...]
yi = y[0]

img = torch.squeeze(Xi)
img = img.numpy()
plt.imshow(img, cmap='gray')
plt.title(le.classes_[yi])
plt.show()

# Model definition

In [None]:
class MyLightningModule(pl.LightningModule):
    """Shared train/val/test/predict steps for the classifiers in this notebook.

    Subclasses provide `forward` (returning class logits) and
    `configure_optimizers`.

    Args:
        Ncrops: When true, input batches are expected to have the shape
            `(batch, ncrops, c, h, w)`. During training every crop is treated
            as a separate sample; during evaluation and prediction the logits
            of the crops of one image are averaged.
    """

    def __init__(self, Ncrops=False):
        super().__init__()
        self.Ncrops = Ncrops

    def _eval_logits(self, inputs):
        """Return one row of logits per sample, averaging over crops if enabled."""
        if not self.Ncrops:
            return self(inputs)

        # fuse crops and batchsize
        bs, ncrops, c, h, w = inputs.shape
        logits = self(inputs.view(-1, c, h, w))
        # combine results across the crops
        logits = logits.view(bs, ncrops, -1)
        return torch.sum(logits, dim=1) / ncrops

    def _eval_step(self, batch, prefix):
        """Compute and log `<prefix>_acc` and `<prefix>_loss` for one batch."""
        inputs, labels = batch
        logits = self._eval_logits(inputs)

        loss = F.cross_entropy(logits, labels)
        # argmax over the logits selects the same class as argmax over
        # (log-)softmax, so no normalisation is needed for the prediction.
        preds = torch.argmax(logits, dim=1)
        acc = accuracy(preds, labels)

        metrics = {f'{prefix}_acc': acc, f'{prefix}_loss': loss}
        self.log_dict(metrics)
        return metrics

    def training_step(self, batch, batch_idx):
        """Return the cross-entropy loss of one training batch (logged as 'train_loss')."""
        inputs, labels = batch

        if self.Ncrops:
            # fuse crops and batchsize
            bs, ncrops, c, h, w = inputs.shape
            inputs = inputs.view(-1, c, h, w)
            # repeat labels ncrops times
            labels = torch.repeat_interleave(labels, repeats=ncrops, dim=0)

        logits = self(inputs)
        loss = F.cross_entropy(logits, labels)

        self.log('train_loss', loss)
        return loss

    def validation_step(self, batch, batch_idx):
        """Log and return {'val_acc', 'val_loss'} for one validation batch."""
        return self._eval_step(batch, 'val')

    def test_step(self, batch, batch_idx):
        """Log and return {'test_acc', 'test_loss'} for one test batch."""
        return self._eval_step(batch, 'test')

    def predict_step(self, batch, batch_idx, dataloader_idx=0):
        """Return the predicted class index for every sample of the batch.

        `dataloader_idx` is unused; it has a default so the method can be
        called with or without it.
        """
        inputs, _ = batch
        # Same crop handling as validation/test, so prediction also works
        # when Ncrops is enabled.
        logits = self._eval_logits(inputs)
        return torch.argmax(logits, dim=1)

## SimpleCNN

In [None]:
class SimpleCNN(MyLightningModule):
    """Baseline CNN: two conv-conv-pool stages followed by two linear layers.

    The flatten size (10 * 9 * 9) corresponds to single-channel 48x48 inputs.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes):
        super().__init__()

        # Stage 1
        self.conv1a = nn.Conv2d(1, 10, kernel_size=3)
        self.conv1b = nn.Conv2d(10, 10, kernel_size=3)

        # Stage 2
        self.conv2a = nn.Conv2d(10, 10, kernel_size=3)
        self.conv2b = nn.Conv2d(10, 10, kernel_size=3)

        # Classifier head
        self.lin1 = nn.Linear(10 * 9 * 9, 50)
        self.lin2 = nn.Linear(50, num_classes)

        # Parameter-free layers shared by both stages
        self.pool = nn.MaxPool2d(2, 2)
        self.drop = nn.Dropout()

    def forward(self, x):
        """Map a batch of (1, 48, 48) images to (num_classes,) logits."""
        # (1, 48, 48) -> (10, 46, 46) -> (10, 44, 44) -> (10, 22, 22)
        features = self.pool(F.relu(self.conv1b(F.relu(self.conv1a(x)))))

        # (10, 22, 22) -> (10, 20, 20) -> (10, 18, 18) -> (10, 9, 9)
        features = self.pool(F.relu(self.conv2b(F.relu(self.conv2a(features)))))
        features = self.drop(features)

        # (10, 9, 9) -> (810,) -> (50,) -> (num_classes,)
        flat = features.view(-1, 10 * 9 * 9)
        hidden = F.relu(self.lin1(flat))
        return self.lin2(hidden)

    def configure_optimizers(self):
        """Return SGD with Nesterov momentum and a plateau scheduler on 'val_acc'."""
        sgd = torch.optim.SGD(
            self.parameters(),
            lr=0.01,
            momentum=0.9,
            nesterov=True,
            weight_decay=0.0001,
        )
        # Halve the learning rate when 'val_acc' stops increasing.
        plateau = torch.optim.lr_scheduler.ReduceLROnPlateau(
            sgd, mode='max', factor=0.5, patience=2, verbose=False)

        config = {'optimizer': sgd, 'lr_scheduler': plateau}
        config['monitor'] = 'val_acc'
        return config

## Deep-Emotion

In [None]:
class DeepEmotion(MyLightningModule):
    """Small CNN classifier preceded by a spatial transformer (STN).

    Adapted from
    https://github.com/omarsayed7/Deep-Emotion/blob/master/deep_emotion.py

    The hard-coded feature sizes (640 for the localization network, 810 for
    the classifier) correspond to single-channel 48x48 inputs.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes):
        super().__init__()
        self.conv1 = nn.Conv2d(1,10,3)
        
        self.conv2 = nn.Conv2d(10,10,3)
        self.pool2 = nn.MaxPool2d(2,2)

        self.conv3 = nn.Conv2d(10,10,3)
        self.conv4 = nn.Conv2d(10,10,3)
        self.pool4 = nn.MaxPool2d(2,2)

        self.norm = nn.BatchNorm2d(10)

        self.fc1 = nn.Linear(810,50)
        self.fc2 = nn.Linear(50,num_classes)

        # Localization network: predicts the affine parameters of the STN.
        self.localization = nn.Sequential(
            nn.Conv2d(1, 8, kernel_size=7),
            nn.MaxPool2d(2, stride=2),
            nn.ReLU(True),
            nn.Conv2d(8, 10, kernel_size=5),
            nn.MaxPool2d(2, stride=2),
            nn.ReLU(True)
        )

        self.fc_loc = nn.Sequential(
            nn.Linear(640, 32),
            nn.ReLU(True),
            nn.Linear(32, 3 * 2)
        )
        # Initialise the last layer so the STN starts as the identity transform.
        with torch.no_grad():
            self.fc_loc[2].weight.zero_()
            self.fc_loc[2].bias.copy_(torch.tensor([1, 0, 0, 0, 1, 0], dtype=torch.float))

    def stn(self, x):
        """Warp `x` with the predicted affine transform; return (warped, grid)."""
        xs = self.localization(x)
        # Flatten per sample so a wrong input size fails in the linear layer
        # instead of silently changing the batch dimension.
        xs = xs.flatten(1)
        theta = self.fc_loc(xs)
        theta = theta.view(-1, 2, 3)

        # align_corners=False is what PyTorch uses when the argument is
        # omitted; passing it explicitly avoids the implicit-default warning.
        grid = F.affine_grid(theta, x.size(), align_corners=False)
        x = F.grid_sample(x, grid, align_corners=False)
        return x, grid

    def forward(self, x):
        """Map a batch of (1, 48, 48) images to (num_classes,) logits."""
        x, _ = self.stn(x)

        x = F.relu(self.conv1(x))
        x = self.conv2(x)
        x = F.relu(self.pool2(x))

        x = F.relu(self.conv3(x))
        x = self.norm(self.conv4(x))
        x = F.relu(self.pool4(x))

        x = x.flatten(1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)

        return x

    def configure_optimizers(self):
        """Return SGD with Nesterov momentum and a plateau scheduler on 'val_acc'."""
        optimizer = torch.optim.SGD(
            self.parameters(), lr=0.01, momentum=0.9, nesterov=True,
            weight_decay=0.0001)

        # Halve the learning rate when 'val_acc' stops increasing.
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            optimizer, mode='max', factor=0.5, patience=2, verbose=False)

        return {
            'optimizer': optimizer,
            'lr_scheduler': scheduler,
            'monitor': 'val_acc'
        }

## MultipleSelfAttention module

In [None]:
class MultipleSelfAttention(nn.Module):
    """Weighted combination of `num_heads` self-attention outputs.

    The input is repeated `num_heads` times along the channel axis, passed
    through one `SelfAttention` layer over all `num_heads * ch` channels and
    split back into heads. A small conv -> pool -> linear branch computes one
    weight per head from the input; the heads are summed with these weights.

    Args:
        shape: `(ch, h, w)` of the feature maps this module receives.
        num_heads: Number of attention heads to combine.
    """

    def __init__(self, shape, num_heads):
        super().__init__()

        self.ch, self.h, self.w = shape
        self.n = num_heads

        # hybrid attention
        self.attention = SelfAttention(self.n * self.ch)

        # weight learning
        self.conv = nn.Conv2d(in_channels=self.ch, out_channels=self.ch, kernel_size=1, stride=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc = nn.Linear(self.ch * (self.h // 2) * (self.w // 2), num_heads)

    def forward(self, x):
        """Return a tensor shaped like `x`: the weighted sum of the heads."""
        n, ch, h, w = self.n, self.ch, self.h, self.w

        # hybrid attention
        xh = x.repeat(1, n, 1, 1)
        xh = self.attention(xh)
        xh = xh.view(-1, n, ch, h, w)  # (bs, n, ch, h, w)

        # weight learning
        # (bs, ch, h, w) -> (bs, ch, h, w)
        xs = self.conv(x)
        # (bs, ch, h, w) -> (bs, ch, h/2, w/2)
        # NOTE: pool the conv output. Pooling `x` here would discard the 1x1
        # convolution and leave its parameters unused; checkpoints trained
        # with that behaviour will produce different outputs after this fix.
        xs = self.pool(xs)
        # (bs, ch, h/2, w/2) -> (bs, ch*h/2*w/2)
        xs = xs.view(-1, ch * (h // 2) * (w // 2))
        # (bs, ch*h/2*w/2) -> (bs, n)
        xs = torch.sigmoid(self.fc(xs))
        xs = F.normalize(xs, p=1, dim=1)  # head weights sum to 1 per sample

        # weighted sum over the head axis
        return torch.sum(xh * xs[:, :, None, None, None], dim=1)

## Deep-Emotion-Attention

In [None]:
class DeepEmotionAttention(MyLightningModule):
    """Deep-Emotion classifier whose STN localization net uses self-attention.

    Adapted from
    https://github.com/omarsayed7/Deep-Emotion/blob/master/deep_emotion.py

    A `MultipleSelfAttention` block follows the first convolution of the
    localization network. The hard-coded feature sizes (640 and 810)
    correspond to single-channel 48x48 inputs.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes):
        super().__init__()
        self.conv1 = nn.Conv2d(1,10,3)
        
        self.conv2 = nn.Conv2d(10,10,3)
        self.pool2 = nn.MaxPool2d(2,2)

        self.conv3 = nn.Conv2d(10,10,3)
        self.conv4 = nn.Conv2d(10,10,3)
        self.pool4 = nn.MaxPool2d(2,2)

        self.norm = nn.BatchNorm2d(10)

        self.fc1 = nn.Linear(810,50)
        self.fc2 = nn.Linear(50,num_classes)

        # Localization network: predicts the affine parameters of the STN.
        self.localization = nn.Sequential(
            nn.Conv2d(1, 8, kernel_size=7),
            MultipleSelfAttention((8, 42, 42), 8),
            nn.MaxPool2d(2, stride=2),
            nn.ReLU(True),
            nn.Conv2d(8, 10, kernel_size=5),
            nn.MaxPool2d(2, stride=2),
            nn.ReLU(True)
        )

        self.fc_loc = nn.Sequential(
            nn.Linear(640, 32),
            nn.ReLU(True),
            nn.Linear(32, 3 * 2)
        )
        # Initialise the last layer so the STN starts as the identity transform.
        with torch.no_grad():
            self.fc_loc[2].weight.zero_()
            self.fc_loc[2].bias.copy_(torch.tensor([1, 0, 0, 0, 1, 0], dtype=torch.float))

    def stn(self, x):
        """Warp `x` with the predicted affine transform; return (warped, grid)."""
        xs = self.localization(x)
        # Flatten per sample so a wrong input size fails in the linear layer
        # instead of silently changing the batch dimension.
        xs = xs.flatten(1)
        theta = self.fc_loc(xs)
        theta = theta.view(-1, 2, 3)

        # align_corners=False is what PyTorch uses when the argument is
        # omitted; passing it explicitly avoids the implicit-default warning.
        grid = F.affine_grid(theta, x.size(), align_corners=False)
        x = F.grid_sample(x, grid, align_corners=False)
        return x, grid

    def forward(self, x):
        """Map a batch of (1, 48, 48) images to (num_classes,) logits."""
        x, _ = self.stn(x)

        x = F.relu(self.conv1(x))
        x = self.conv2(x)
        x = F.relu(self.pool2(x))

        x = F.relu(self.conv3(x))
        x = self.norm(self.conv4(x))
        x = F.relu(self.pool4(x))

        x = x.flatten(1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)

        return x

    def configure_optimizers(self):
        """Return SGD with Nesterov momentum and a plateau scheduler on 'val_acc'."""
        optimizer = torch.optim.SGD(
            self.parameters(), lr=0.01, momentum=0.9, nesterov=True,
            weight_decay=0.0001)

        # Halve the learning rate when 'val_acc' stops increasing.
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            optimizer, mode='max', factor=0.5, patience=2, verbose=False)

        return {
            'optimizer': optimizer,
            'lr_scheduler': scheduler,
            'monitor': 'val_acc'
        }

## VGGFaceAttention

In [None]:
class SpatialTransformer(nn.Module):
    """Spatial transformer for single-channel inputs.

    The 640-feature flatten of the localization output corresponds to
    48x48 inputs.

    NOTE(review): a later cell of this notebook defines another class named
    `SpatialTransformer` (for 64-channel feature maps); once that cell has
    run, this definition is shadowed.
    """

    def __init__(self):
        super().__init__()
        
        # Localization network: predicts the affine parameters.
        self.localization = nn.Sequential(
            nn.Conv2d(1, 8, kernel_size=7),
            nn.MaxPool2d(2, stride=2),
            nn.ReLU(True),
            nn.Conv2d(8, 10, kernel_size=5),
            nn.MaxPool2d(2, stride=2),
            nn.ReLU(True)
        )

        self.fc_loc = nn.Sequential(
            nn.Linear(640, 32),
            nn.ReLU(True),
            nn.Linear(32, 3 * 2)
        )
        # Initialise the last layer so the module starts as the identity transform.
        with torch.no_grad():
            self.fc_loc[2].weight.zero_()
            self.fc_loc[2].bias.copy_(torch.tensor([1, 0, 0, 0, 1, 0], dtype=torch.float))

    def forward(self, x):
        """Return `(warped, grid)`: `x` resampled on the predicted affine grid."""
        xs = self.localization(x)
        # Flatten per sample so a wrong input size fails in the linear layer
        # instead of silently changing the batch dimension.
        xs = xs.flatten(1)
        theta = self.fc_loc(xs)
        theta = theta.view(-1, 2, 3)

        # align_corners=False is what PyTorch uses when the argument is
        # omitted; passing it explicitly avoids the implicit-default warning.
        grid = F.affine_grid(theta, x.size(), align_corners=False)
        x = F.grid_sample(x, grid, align_corners=False)
        return x, grid

In [None]:
class VGGFaceAttention(MyLightningModule):
    """VGG-style CNN with a MultipleSelfAttention block after the first stage.

    Four stages of (conv -> batch norm -> ReLU) x 2 followed by max pooling,
    then three linear layers. The flatten size (512 * 3 * 3) corresponds to
    single-channel 48x48 inputs.

    Args:
        num_classes: Number of output logits.
        Ncrops: Forwarded to `MyLightningModule`.
    """

    def __init__(self, num_classes, Ncrops=False):
        super().__init__(Ncrops)

        same = dict(kernel_size=3, padding='same')

        self.conv1a = nn.Conv2d(1, 64, **same)
        self.conv1b = nn.Conv2d(64, 64, **same)

        # combine N attention heads
        self.msa = MultipleSelfAttention((64, 24, 24), 8)

        self.conv2a = nn.Conv2d(64, 128, **same)
        self.conv2b = nn.Conv2d(128, 128, **same)

        self.conv3a = nn.Conv2d(128, 256, **same)
        self.conv3b = nn.Conv2d(256, 256, **same)

        self.conv4a = nn.Conv2d(256, 512, **same)
        self.conv4b = nn.Conv2d(512, 512, **same)

        self.pool = nn.MaxPool2d(2, 2)

        self.bn1a, self.bn1b = nn.BatchNorm2d(64), nn.BatchNorm2d(64)
        self.bn2a, self.bn2b = nn.BatchNorm2d(128), nn.BatchNorm2d(128)
        self.bn3a, self.bn3b = nn.BatchNorm2d(256), nn.BatchNorm2d(256)
        self.bn4a, self.bn4b = nn.BatchNorm2d(512), nn.BatchNorm2d(512)

        self.lin1 = nn.Linear(512 * 3 * 3, 4096)
        self.lin2 = nn.Linear(4096, 4096)
        self.lin3 = nn.Linear(4096, num_classes)

        self.drop = nn.Dropout()

    def _stage(self, x, conv_a, bn_a, conv_b, bn_b):
        """Apply (conv -> batch norm -> ReLU) twice, then halve the resolution."""
        x = F.relu(bn_a(conv_a(x)))
        x = F.relu(bn_b(conv_b(x)))
        return self.pool(x)

    def forward(self, x):
        """Map a batch of (1, 48, 48) images to (num_classes,) logits."""
        # (1, 48, 48) -> (64, 24, 24)
        x = self._stage(x, self.conv1a, self.bn1a, self.conv1b, self.bn1b)

        # multiple self attention keeps the shape: (64, 24, 24)
        x = self.msa(x)

        # (64, 24, 24) -> (128, 12, 12)
        x = self._stage(x, self.conv2a, self.bn2a, self.conv2b, self.bn2b)
        # (128, 12, 12) -> (256, 6, 6)
        x = self._stage(x, self.conv3a, self.bn3a, self.conv3b, self.bn3b)
        # (256, 6, 6) -> (512, 3, 3)
        x = self._stage(x, self.conv4a, self.bn4a, self.conv4b, self.bn4b)

        x = x.view(-1, 512 * 3 * 3)
        for hidden_layer in (self.lin1, self.lin2):
            x = F.relu(self.drop(hidden_layer(x)))
        return self.lin3(x)

    def configure_optimizers(self):
        """Return SGD with Nesterov momentum and a plateau scheduler on 'val_acc'."""
        sgd = torch.optim.SGD(
            self.parameters(),
            lr=0.01,
            momentum=0.9,
            nesterov=True,
            weight_decay=0.0001,
        )
        # Halve the learning rate when 'val_acc' stops increasing.
        plateau = torch.optim.lr_scheduler.ReduceLROnPlateau(
            sgd, mode='max', factor=0.5, patience=2, verbose=False)

        config = {'optimizer': sgd, 'lr_scheduler': plateau}
        config['monitor'] = 'val_acc'
        return config

## VGGFaceAttentionSTN

In [None]:
class SpatialTransformer(nn.Module):
    """Spatial transformer for 64-channel feature maps.

    The flatten size (64 * 10 * 3 * 3 = 5760) corresponds to 24x24 inputs.
    """

    def __init__(self):
        super().__init__()
        
        # Localization network: predicts the affine parameters.
        self.localization = nn.Sequential(
            # (64, 24, 24) -> (512, 20, 20)
            nn.Conv2d(64, 8*64, kernel_size=5),
            # (512, 20, 20) -> (512, 10, 10)
            nn.MaxPool2d(2, stride=2),
            nn.ReLU(True),
            # (512, 10, 10) -> (640, 6, 6)
            nn.Conv2d(8*64, 10*64, kernel_size=5),
            # (640, 6, 6) -> (640, 3, 3)
            nn.MaxPool2d(2, stride=2),
            nn.ReLU(True)
        )

        self.fc_loc = nn.Sequential(
            nn.Linear(64*10*3*3, 64),
            nn.ReLU(True),
            nn.Linear(64, 3 * 2)
        )
        # Initialise the last layer so the module starts as the identity transform.
        with torch.no_grad():
            self.fc_loc[2].weight.zero_()
            self.fc_loc[2].bias.copy_(torch.tensor([1, 0, 0, 0, 1, 0], dtype=torch.float))

    def forward(self, x):
        """Return `(warped, grid)`: `x` resampled on the predicted affine grid."""
        xs = self.localization(x)
        # Flatten per sample so a wrong input size fails in the linear layer
        # instead of silently changing the batch dimension.
        xs = xs.flatten(1)
        theta = self.fc_loc(xs)
        theta = theta.view(-1, 2, 3)

        # align_corners=False is what PyTorch uses when the argument is
        # omitted; passing it explicitly avoids the implicit-default warning.
        grid = F.affine_grid(theta, x.size(), align_corners=False)
        x = F.grid_sample(x, grid, align_corners=False)
        return x, grid

In [None]:
class VGGFaceAttentionSTN(MyLightningModule):
    """VGG-style CNN with an STN and a MultipleSelfAttention block after stage 1.

    Four stages of (conv -> batch norm -> ReLU) x 2 followed by max pooling,
    then three linear layers. The flatten size (512 * 3 * 3) corresponds to
    single-channel 48x48 inputs.

    Args:
        num_classes: Number of output logits.
        Ncrops: Forwarded to `MyLightningModule`.
    """

    def __init__(self, num_classes, Ncrops=False):
        super().__init__(Ncrops)

        same = dict(kernel_size=3, padding='same')

        self.conv1a = nn.Conv2d(1, 64, **same)
        self.conv1b = nn.Conv2d(64, 64, **same)

        self.stn = SpatialTransformer()
        # combine N attention heads
        self.msa = MultipleSelfAttention((64, 24, 24), 8)

        self.conv2a = nn.Conv2d(64, 128, **same)
        self.conv2b = nn.Conv2d(128, 128, **same)

        self.conv3a = nn.Conv2d(128, 256, **same)
        self.conv3b = nn.Conv2d(256, 256, **same)

        self.conv4a = nn.Conv2d(256, 512, **same)
        self.conv4b = nn.Conv2d(512, 512, **same)

        self.pool = nn.MaxPool2d(2, 2)

        self.bn1a, self.bn1b = nn.BatchNorm2d(64), nn.BatchNorm2d(64)
        self.bn2a, self.bn2b = nn.BatchNorm2d(128), nn.BatchNorm2d(128)
        self.bn3a, self.bn3b = nn.BatchNorm2d(256), nn.BatchNorm2d(256)
        self.bn4a, self.bn4b = nn.BatchNorm2d(512), nn.BatchNorm2d(512)

        self.lin1 = nn.Linear(512 * 3 * 3, 4096)
        self.lin2 = nn.Linear(4096, 4096)
        self.lin3 = nn.Linear(4096, num_classes)

        self.drop = nn.Dropout()

    def _stage(self, x, conv_a, bn_a, conv_b, bn_b):
        """Apply (conv -> batch norm -> ReLU) twice, then halve the resolution."""
        x = F.relu(bn_a(conv_a(x)))
        x = F.relu(bn_b(conv_b(x)))
        return self.pool(x)

    def forward(self, x):
        """Map a batch of (1, 48, 48) images to (num_classes,) logits."""
        # (1, 48, 48) -> (64, 24, 24)
        x = self._stage(x, self.conv1a, self.bn1a, self.conv1b, self.bn1b)

        # spatial transformer, then multiple self attention; both keep the
        # shape (64, 24, 24)
        x, _ = self.stn(x)
        x = self.msa(x)

        # (64, 24, 24) -> (128, 12, 12)
        x = self._stage(x, self.conv2a, self.bn2a, self.conv2b, self.bn2b)
        # (128, 12, 12) -> (256, 6, 6)
        x = self._stage(x, self.conv3a, self.bn3a, self.conv3b, self.bn3b)
        # (256, 6, 6) -> (512, 3, 3)
        x = self._stage(x, self.conv4a, self.bn4a, self.conv4b, self.bn4b)

        x = x.view(-1, 512 * 3 * 3)
        for hidden_layer in (self.lin1, self.lin2):
            x = F.relu(self.drop(hidden_layer(x)))
        return self.lin3(x)

    def configure_optimizers(self):
        """Return SGD with Nesterov momentum and a plateau scheduler on 'val_acc'."""
        sgd = torch.optim.SGD(
            self.parameters(),
            lr=0.01,
            momentum=0.9,
            nesterov=True,
            weight_decay=0.0001,
        )
        # Halve the learning rate when 'val_acc' stops increasing.
        plateau = torch.optim.lr_scheduler.ReduceLROnPlateau(
            sgd, mode='max', factor=0.5, patience=2, verbose=False)

        config = {'optimizer': sgd, 'lr_scheduler': plateau}
        config['monitor'] = 'val_acc'
        return config


# Model training

## Model creation

In [None]:
model = SimpleCNN(num_classes)

In [None]:
# model = DeepEmotion(num_classes)

In [None]:
# model = DeepEmotionAttention(num_classes)

In [None]:
# model = VGGFaceAttention(num_classes)

In [None]:
# model = VGGFaceAttentionSTN(num_classes)

## Callbacks

In [None]:
%load_ext tensorboard
!rm -rf ./logs
%tensorboard --logdir logs

In [None]:
num_epochs = 300

tb_logger = pl_loggers.TensorBoardLogger('./logs/')

# Stop when 'val_acc' has not improved for a tenth of the epoch budget.
# Integer division keeps `patience` a whole number of epochs.
early_stop_callback = EarlyStopping(
   monitor='val_acc',
   min_delta=0.00,
   patience=num_epochs // 10,
   verbose=False,
   mode='max'
)

## Training loop (PL)

In [None]:
# trainer = pl.Trainer(max_epochs=num_epochs, gpus=-1, logger=tb_logger, callbacks=[early_stop_callback])
# Use every visible GPU when CUDA is available, otherwise train on the CPU.
trainer = pl.Trainer(
    max_epochs=num_epochs,
    gpus=-1 if torch.cuda.is_available() else None,
    callbacks=[early_stop_callback])
# trainer = pl.Trainer(max_epochs=num_epochs, gpus=-1, logger=tb_logger)
# trainer = pl.Trainer(max_epochs=num_epochs, gpus=-1)

In [None]:
# The loaders are passed positionally because the keyword for the training
# loader differs between Lightning releases.
trainer.fit(model, train_loader, val_loader)

## Training loop (Custom)

In [None]:
# num_epochs = 100
# optimizer = torch.optim.Adam(model.parameters(), lr=0.005, weight_decay=0.0001)
# criterion = nn.CrossEntropyLoss()

# model = model.to(device)
# model.train()

# for epoch in range(num_epochs):
#     try:
#         with tqdm(train_loader, unit="batch", leave=False) as tepoch:
#             for batch in tepoch:
                
#                 tepoch.set_description(f"Epoch {epoch}")

                
#                 inputs, labels = batch
#                 inputs, labels = inputs.to(device), labels.to(device)

#                 optimizer.zero_grad()
                
#                 logits = model(inputs)
#                 loss = criterion(logits, labels)
#                 loss.backward()
#                 optimizer.step()
            
#                 tepoch.set_postfix(loss=loss.item())
#     except KeyboardInterrupt:
#         break

# print('Finished Training')

## Save model

In [None]:
state_dict_dir = "/content/drive/MyDrive/Colab Notebooks/weights/"
# torch.save does not create missing directories, so make sure the target exists.
os.makedirs(state_dict_dir, exist_ok=True)

# File name: <model class>_<timestamp>.pt
state_dict_name = f"{model.__class__.__name__}_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.pt"

state_dict_path = os.path.join(state_dict_dir, state_dict_name)
print(state_dict_path)

In [None]:
torch.save(model.state_dict(), state_dict_path)

# Model evaluation

In [None]:
state_dict_dir = "./"
for f in os.listdir(state_dict_dir):
    print(f)

In [None]:
# state_dict_name = "DeepEmotion_2021-07-10_15-10-43.pt"
# state_dict_name = "DeepEmotionAttention_2021-07-11_08-25-46.pt"
# state_dict_name = "VGGFaceAttention_2021-07-09_10-25-38.pt"
state_dict_name = "VGGFaceAttentionSTN_2021-07-11_15-16-48.pt"

state_dict_path = os.path.join(state_dict_dir, state_dict_name)
print(state_dict_path)

In [None]:
# model = SimpleCNN(num_classes)
# model = DeepEmotion(num_classes)
# model = DeepEmotionAttention(num_classes)
# model = VGGFaceAttention(num_classes)
model = VGGFaceAttentionSTN(num_classes)

# Map the stored tensors to the CPU so that weights saved from a GPU also
# load on a machine without CUDA.
model.load_state_dict(torch.load(state_dict_path, map_location="cpu"))

## Test set accuracy

In [None]:
# Use every visible GPU when CUDA is available, otherwise evaluate on the CPU.
tester = pl.Trainer(gpus=-1 if torch.cuda.is_available() else None)

In [None]:
tester.test(model, test_loader)

## Classification report

In [None]:
y_preds = tester.predict(model, test_loader)

In [None]:
y_true = list()
y_pred = list()

for i,batch in enumerate(test_loader):
    inputs, labels = batch

    y_true += labels.tolist()
    y_pred += y_preds[i].tolist()

In [None]:
target_names = le.classes_
print(classification_report(y_true, y_pred, target_names=target_names))

## Confusion matrix

In [None]:
cm = confusion_matrix(y_true, y_pred, normalize='true')

# df_cm = pd.DataFrame(cm, target_names, target_names)
# plt.figure(figsize=(10,7))
# sns.set(font_scale=1.4) # for label size
# sns.heatmap(df_cm, annot=True, annot_kws={"size": 16}) # font size

fig, ax = plt.subplots(figsize=(10,10))
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=target_names)

disp.plot(ax=ax, cmap=plt.cm.Blues)

fig.show()

# from google.colab import files
# filename = "cm-deepemotion.svg"
# filename = "cm-deepemotion-attention.svg"
# filename = "cm-vggface-attention.svg"
filename = "cm-vggface-attention-stn.svg"
# plt.savefig(filename)
# files.download(filename) 

## Wrong predictions

In [None]:
X_wrong = []
y_wrong_true = []
y_wrong_pred = []
for idx, (y, y_hat) in enumerate(zip(y_true, y_pred)):
    if y != y_hat:
        X_wrong.append(test_set[idx][0])
        y_wrong_true.append(y)
        y_wrong_pred.append(y_hat)

In [None]:
# Show a 3x3 grid of randomly chosen misclassified test images.
fig, axs = plt.subplots(nrows=3, ncols=3, figsize=(10,10))

for ax in axs.flat:
    # np.random.randint excludes the upper bound, so pass len(X_wrong) itself;
    # this also makes the last sample selectable.
    rand_idx = np.random.randint(0, len(X_wrong))
    X, y, y_hat = X_wrong[rand_idx], y_wrong_true[rand_idx], y_wrong_pred[rand_idx]

    img = torch.squeeze(X)
    img = img.numpy()
    ax.imshow(img, cmap='gray')
    ax.set_title(f"true: {le.classes_[y]}, pred: {le.classes_[y_hat]}")

plt.show()

# Live demonstration

In [None]:
label_encoder = le

## Face detection and alignment

In [None]:
def detect_align_preprocess_face(img):
    """Detect a face in a frame, mark it, and prepare the crop for the model.

    The frame is passed to deepface's ``functions.detect_face`` (MTCNN backend,
    alignment enabled, ``enforce_detection=False`` so a missing face does not
    raise). When a non-empty crop comes back, a green rectangle is drawn on the
    frame around ``region`` and the crop is converted to a 48x48 grayscale
    image.

    Args:
        img: Frame to analyse; treated as a BGR image (it is converted with
            ``cv2.COLOR_BGR2GRAY``). The rectangle is drawn on this array.
        
    Returns:
        Tuple ``(img, region, face_img)``:
            img: the frame, with the face rectangle drawn when a face was found.
            region: the region returned by the detector, indexed here as
                ``(x, y, w, h)``.
            face_img: 48x48 grayscale crop, or ``None`` when the detector
                returned an empty crop.
    """
    # detect and align
    face_img, region = functions.detect_face(
        img = img,
        detector_backend = 'mtcnn',
        grayscale = False,
        enforce_detection = False,
        align = True)

    # An empty crop means nothing was detected. Both dimensions of the *crop*
    # are checked (the original tested the frame's width by mistake).
    if face_img.shape[0] == 0 or face_img.shape[1] == 0:
        return img, region, None

    # draw rectangle around the detected face
    x = region[0]; y = region[1]
    w = region[2]; h = region[3]
    img = cv2.rectangle(img, (x,y), (x+w,y+h), (0,255,0), 4)

    # preprocess: drop singleton dims, go grayscale, shrink to the 48x48
    # input size used downstream
    face_img = np.squeeze(face_img)
    face_img = cv2.cvtColor(face_img, cv2.COLOR_BGR2GRAY)
    face_img = cv2.resize(face_img, (48, 48))

    return img, region, face_img

## Emotion prediction

In [None]:
def predict_emotion(model, img, transform, device):
    """Return the model's class probabilities for a single image.

    Args:
        model: Module called as ``model(batch)``; its output is treated as
            logits. It is switched to eval mode and moved to ``device``.
        img: Input accepted by ``transform``.
        transform: Callable that turns ``img`` into a tensor without a batch
            dimension (one is added here).
        device: Device on which inference runs.

    Returns:
        NumPy array with the softmax probabilities, singleton dimensions
        removed (one value per class for a single-image batch).
    """
    model.eval()
    model = model.to(device)

    # Build a batch of one on the target device.
    batch = torch.unsqueeze(transform(img), dim=0).to(device)

    with torch.no_grad():
        logits = model(batch)
        # Normalise over the class (last) axis explicitly: calling softmax
        # without `dim` is deprecated and chooses the axis implicitly.
        probs = F.softmax(logits, dim=-1)

    return torch.squeeze(probs).cpu().numpy()

In [None]:
def display_emotion_predictions(img, region, emotion_probabilities, label_encoder):
    """
    Draw a ranked emotion panel (label plus score bar per class) next to a face.

    Adapted from
    https://github.com/serengil/deepface/blob/master/deepface/commons/realtime.py

    The panel is a semi-transparent dark box as tall as the face region. It is
    placed to the right of the face when it fits inside the image width,
    otherwise to the left when there is room, otherwise nothing is drawn.
    Classes are listed from highest to lowest score, one row every 20 px, and
    rows that would fall below the face region are skipped.

    Args:
        img: Image to draw on.
        region: Face region, indexed as ``(x, y, w, h)``.
        emotion_probabilities: One score per class, indexable and with ``.sum()``.
        label_encoder: Object whose ``classes_`` holds the class names, in the
            same order as ``emotion_probabilities``.

    Returns:
        The image with the panel drawn (unchanged when no side has room).
    """
    class_names = label_encoder.classes_
    total = emotion_probabilities.sum()

    # One [label, percentage] row per class, then ranked best-first.
    ranked_rows = [
        [class_names[i], 100 * emotion_probabilities[i] / total]
        for i in range(0, len(class_names))
    ]
    emotion_df = pd.DataFrame(ranked_rows, columns = ["emotion", "score"])
    emotion_df = emotion_df.sort_values(by = ["score"], ascending=False).reset_index(drop=True)

    frame_width = img.shape[1]
    overlay = img.copy()  # untouched copy, blended back for the transparency effect
    opacity = 0.4

    x, y = region[0], region[1]
    w, h = region[2], region[3]
    pivot_img_size = 150  # panel width in pixels
    text_size = 100       # horizontal offset of the score bars inside the panel

    # Decide where the panel starts horizontally: right of the face first,
    # left as a fallback, nowhere if neither side has room.
    if x + w + pivot_img_size < frame_width:
        panel_x, on_left = x + w, False
    elif x - pivot_img_size > 0:
        panel_x, on_left = x - pivot_img_size, True
    else:
        panel_x, on_left = None, False

    if panel_x is not None:
        # background of mood box, blended with the original pixels
        img = cv2.rectangle(img
            , (panel_x, y)
            , (panel_x + pivot_img_size, y + h)
            , (64,64,64), cv2.FILLED)
        img = cv2.addWeighted(overlay, opacity, img, 1 - opacity, 0, img)

    for rank, row in emotion_df.iterrows():
        emotion_label = "%s " % (row['emotion'])
        # 35 px is the bar length of an emotion at 100%
        bar_len = int(35 * (row['score'] / 100))

        if panel_x is None:
            continue

        row_y = y + 20 + (rank + 1) * 20
        # The left-hand panel accepts a row ending exactly on the bottom edge
        # of the face region; the right-hand panel does not.
        row_fits = row_y <= y + h if on_left else row_y < y + h
        if not row_fits:
            continue

        img = cv2.putText(img, emotion_label, (panel_x, row_y),
                          cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

        bar_top = y + 13 + (rank + 1) * 20
        img = cv2.rectangle(img
            , (panel_x + text_size, bar_top)
            , (panel_x + text_size + bar_len, bar_top + 5)
            , (255,255,255), cv2.FILLED)

    return img

## Face processing

In [None]:
def process_frame(frame):
    """Annotate one camera frame with emotion predictions and display it.

    Uses the notebook-level ``model``, ``test_transform``, ``device`` and
    ``label_encoder``. Two windows are updated: 'frame' with the annotated
    frame and 'face' with a 240x240 view of the 48x48 face crop (all black
    when no face was found).

    Args:
        frame: Frame handed to ``detect_align_preprocess_face``.
    """
    img, region, face_img = detect_align_preprocess_face(frame)

    if face_img is None:
        # No face: show a blank placeholder in the 'face' window.
        face_img = np.zeros((48,48)).astype(np.uint8)
    else:
        scores = predict_emotion(model, face_img, test_transform, device)
        # Return value is not used; the overlay is presumably drawn in place
        # on `img` by the OpenCV calls inside.
        display_emotion_predictions(img, region, scores, label_encoder)

    # Display the resulting frame and the enlarged face crop
    face_preview = cv2.resize(face_img, (240, 240), interpolation = cv2.INTER_AREA)
    cv2.imshow('frame', img)
    cv2.imshow('face', face_preview)
    

## Camera capture

In [None]:
# Open the default camera (index 0). This happens outside the `try` so that
# `cap` is always bound when the `finally` clause releases it.
cap = cv2.VideoCapture(0)
try:
    if not cap.isOpened():
        # Report and fall through to cleanup instead of calling exit(), which
        # would terminate the whole interpreter/kernel.
        print("Cannot open camera")
    else:
        while True:
            # Capture frame-by-frame; ret is True when the frame was read correctly
            ret, frame = cap.read()
            if not ret:
                print("Can't receive frame (stream end?). Exiting ...")
                break

            # Detect, predict and display for this frame
            process_frame(frame)

            # Break by pressing the 'q' key. Only the low byte of the key code
            # is compared, since some platforms set higher bits in the value
            # returned by waitKey.
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
finally:
    # When everything is done (or on error), release the capture and close
    # the preview windows.
    cap.release()
    cv2.destroyAllWindows()