<h1><center> Multi-Branch Network for Imagery Emotion Prediction </center></h1>
<center> Using various source information, including faces, bodies, and scene contexts to predict both discrete and continuous emotions in an image</center>

<h1>Project context</h1>


# I. Connect to Google Drive:

In [1]:
# Linking Google drive to use preprocessed data
from google.colab import drive

# This will prompt for authorization.
drive.mount('/content/drive')
#/content/drive/My Drive//

Mounted at /content/drive


# II. General Import:

In [2]:
import matplotlib.pyplot as plt
import numpy as np
import os
from PIL import Image
import scipy.io
from sklearn.metrics import average_precision_score, precision_recall_curve

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchsummary import summary
from torchvision import transforms
import torchvision.models as models
from torch.optim.lr_scheduler import StepLR

print ('completed cell')

completed cell


In [3]:
# Define global variables

isSwinT = False   # True when the body branch uses a SwinT backbone
isBFER = False    # True when the face branch uses the B-FER backbone
num_context_features = 0  # number of features extracted by the context branch
num_body_features = 0     # number of features extracted by the body branch
num_face_features = 0     # number of features extracted by the face branch

# III. Model:

The image below shows the architecture of our proposed **Multi-Branch Network (MBN)**. The network is divided into two main parts. The first part extracts features from the body, the face, and the context of the image, referred to as the **body image**, **face image**, and **context image**. It consists of three branches that capture emotional cues from the subjects and the background. Note that the face region is extracted from the body image. The second part is a fusion network that combines the features extracted by the three branches to predict the discrete emotions and VAD values of each person in the image.

<img src = "https://raw.githubusercontent.com/BaoNinh2808/Server-Client/main/Proposed%20Method.png" width = "90%">

Our proposed network consists of three feature extraction branches, utilizing different deep learning models trained on suitable datasets to efficiently make predictions on human and scene images.
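
To make the fusion step concrete, here is a minimal sketch of a fusion head. It is an illustration under stated assumptions, not the network used in this notebook: the class name `FusionSketch`, the hidden size of 256, and the dropout rate are hypothetical, and the example feature sizes (2048, 768, 256) are assumed values for a Resnet-50, a SwinT, and a face branch. It concatenates the three feature vectors and outputs 26 category logits and 3 VAD values.

```python
import torch
import torch.nn as nn

class FusionSketch(nn.Module):
    """Illustrative fusion head (hypothetical, not the notebook's model)."""

    def __init__(self, num_context_features, num_body_features, num_face_features, hidden=256):
        super().__init__()
        total = num_context_features + num_body_features + num_face_features
        # Shared layer applied to the concatenated branch features
        self.fuse = nn.Sequential(nn.Linear(total, hidden), nn.ReLU(), nn.Dropout(0.5))
        self.fc_cat = nn.Linear(hidden, 26)   # one logit per discrete emotion category
        self.fc_cont = nn.Linear(hidden, 3)   # valence, arousal, dominance

    def forward(self, x_context, x_body, x_face):
        x = torch.cat([x_context, x_body, x_face], dim=1)
        x = self.fuse(x)
        return self.fc_cat(x), self.fc_cont(x)

# Random tensors stand in for the features produced by the three branches.
fusion = FusionSketch(2048, 768, 256).eval()
with torch.no_grad():
    cat_logits, vad = fusion(torch.randn(4, 2048), torch.randn(4, 768), torch.randn(4, 256))
print(cat_logits.shape, vad.shape)
```

Concatenation is only one possible way to combine the branches; it is used here because it is the simplest to read.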

The figure below lists the backbones available for each branch.

<img src = "https://raw.githubusercontent.com/BaoNinh2808/Server-Client/main/model_options.png" width = "90%">



**Choose a backbone for each branch:** pick exactly one per branch.

If you run all cells in order, the following default backbones are used:  
*   Context Branch: Resnet-50 (Places365)
*   Body Branch:    SwinT (retrained on Emotic)
*   Face Branch:    B-FER

This combination of backbones also gives the best result among all combinations.
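
One way to keep the choice explicit is a small lookup table that is validated before any model is loaded. The sketch below is an illustration only: the names `BACKBONE_OPTIONS`, `DEFAULT_BACKBONES`, and `check_backbones` are hypothetical and are not used elsewhere in this notebook. The option strings simply mirror the subsection titles of this section.

```python
# Hypothetical registry of the options described in this section.
BACKBONE_OPTIONS = {
    "context": ["Resnet-18 (Places365)", "Resnet-50 (Places365)"],
    "body": ["Resnet-18 (ImageNet)", "Resnet-50 (ImageNet)", "Resnet-50 (Emotic)",
             "SwinT (ImageNet)", "SwinT (Emotic)"],
    "face": ["S-FER", "B-FER"],
}

DEFAULT_BACKBONES = {
    "context": "Resnet-50 (Places365)",
    "body": "SwinT (Emotic)",
    "face": "B-FER",
}

def check_backbones(choice):
    """Validate one backbone per branch and derive the two global flags."""
    for branch, options in BACKBONE_OPTIONS.items():
        if choice.get(branch) not in options:
            raise ValueError(f"Unknown {branch} backbone: {choice.get(branch)!r}")
    flags = {
        "isSwinT": choice["body"].startswith("SwinT"),
        "isBFER": choice["face"] == "B-FER",
    }
    return flags

flags = check_backbones(DEFAULT_BACKBONES)
print(flags)
```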

## 1 . Body Branch:


### a. Resnet-18 (ImageNet weight):

In [93]:
model_body = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
print (summary(model_body, (3,224,224), device="cpu"))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 64, 112, 112]           9,408
       BatchNorm2d-2         [-1, 64, 112, 112]             128
              ReLU-3         [-1, 64, 112, 112]               0
         MaxPool2d-4           [-1, 64, 56, 56]               0
            Conv2d-5           [-1, 64, 56, 56]          36,864
       BatchNorm2d-6           [-1, 64, 56, 56]             128
              ReLU-7           [-1, 64, 56, 56]               0
            Conv2d-8           [-1, 64, 56, 56]          36,864
       BatchNorm2d-9           [-1, 64, 56, 56]             128
             ReLU-10           [-1, 64, 56, 56]               0
       BasicBlock-11           [-1, 64, 56, 56]               0
           Conv2d-12           [-1, 64, 56, 56]          36,864
      BatchNorm2d-13           [-1, 64, 56, 56]             128
             ReLU-14           [-1, 64,

In [94]:
num_body_features = list(model_body.children())[-1].in_features

### b. Resnet-50 (ImageNet weight)

In [95]:
model_body = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
print (summary(model_body, (3,224,224), device="cpu"))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 64, 112, 112]           9,408
       BatchNorm2d-2         [-1, 64, 112, 112]             128
              ReLU-3         [-1, 64, 112, 112]               0
         MaxPool2d-4           [-1, 64, 56, 56]               0
            Conv2d-5           [-1, 64, 56, 56]           4,096
       BatchNorm2d-6           [-1, 64, 56, 56]             128
              ReLU-7           [-1, 64, 56, 56]               0
            Conv2d-8           [-1, 64, 56, 56]          36,864
       BatchNorm2d-9           [-1, 64, 56, 56]             128
             ReLU-10           [-1, 64, 56, 56]               0
           Conv2d-11          [-1, 256, 56, 56]          16,384
      BatchNorm2d-12          [-1, 256, 56, 56]             512
           Conv2d-13          [-1, 256, 56, 56]          16,384
      BatchNorm2d-14          [-1, 256,

In [96]:
num_body_features = list(model_body.children())[-1].in_features

### c. Resnet-50 (Emotic weight)

In [97]:
model_body = torch.load('/content/drive/MyDrive/VA-prediction/models/body_train_lr001_b24_crossEtropy/model1.pth')
print (summary(model_body, (3,128,128), device="cpu"))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 64, 64, 64]           9,408
       BatchNorm2d-2           [-1, 64, 64, 64]             128
              ReLU-3           [-1, 64, 64, 64]               0
         MaxPool2d-4           [-1, 64, 32, 32]               0
            Conv2d-5           [-1, 64, 32, 32]           4,096
       BatchNorm2d-6           [-1, 64, 32, 32]             128
              ReLU-7           [-1, 64, 32, 32]               0
            Conv2d-8           [-1, 64, 32, 32]          36,864
       BatchNorm2d-9           [-1, 64, 32, 32]             128
             ReLU-10           [-1, 64, 32, 32]               0
           Conv2d-11          [-1, 256, 32, 32]          16,384
      BatchNorm2d-12          [-1, 256, 32, 32]             512
           Conv2d-13          [-1, 256, 32, 32]          16,384
      BatchNorm2d-14          [-1, 256,

In [98]:
num_body_features = list(model_body.children())[-1].in_features

### d. SwinT (ImageNet weight)

In [99]:
model_body = models.swin_t(weights = 'DEFAULT')
print (summary(model_body, (3,128,128), device="cpu"))
isSwinT = True

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 96, 32, 32]           4,704
           Permute-2           [-1, 32, 32, 96]               0
         LayerNorm-3           [-1, 32, 32, 96]             192
         LayerNorm-4           [-1, 32, 32, 96]             192
ShiftedWindowAttention-5           [-1, 32, 32, 96]               0
   StochasticDepth-6           [-1, 32, 32, 96]               0
         LayerNorm-7           [-1, 32, 32, 96]             192
            Linear-8          [-1, 32, 32, 384]          37,248
              GELU-9          [-1, 32, 32, 384]               0
          Dropout-10          [-1, 32, 32, 384]               0
           Linear-11           [-1, 32, 32, 96]          36,960
          Dropout-12           [-1, 32, 32, 96]               0
  StochasticDepth-13           [-1, 32, 32, 96]               0
SwinTransformerBlock-14           [

In [100]:
num_body_features = list(model_body.children())[-1].in_features

### e. SwinT (Emotic weight)

In [101]:
model_body = torch.load("/content/drive/MyDrive/VA-prediction/models/SwinT_EMOTIC.pth", map_location=lambda storage, loc: storage)
print (summary(model_body, (3,128,128), device="cpu"))
isSwinT = True

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 96, 32, 32]           4,704
           Permute-2           [-1, 32, 32, 96]               0
         LayerNorm-3           [-1, 32, 32, 96]             192
         LayerNorm-4           [-1, 32, 32, 96]             192
ShiftedWindowAttention-5           [-1, 32, 32, 96]               0
   StochasticDepth-6           [-1, 32, 32, 96]               0
         LayerNorm-7           [-1, 32, 32, 96]             192
            Linear-8          [-1, 32, 32, 384]          37,248
              GELU-9          [-1, 32, 32, 384]               0
          Dropout-10          [-1, 32, 32, 384]               0
           Linear-11           [-1, 32, 32, 96]          36,960
          Dropout-12           [-1, 32, 32, 96]               0
  StochasticDepth-13           [-1, 32, 32, 96]               0
SwinTransformerBlock-14           [

In [102]:
num_body_features = list(model_body.children())[-1].in_features

## 2 . Context Branch:

### a. Resnet-18 (Places365 weight)

In [103]:
# Get Resnet18 model trained on places dataset.
store_path = "./places"
if not os.path.exists(store_path):
    os.mkdir(store_path)

file_path = "./places/resnet18_places365.pth.tar"
if not os.path.exists(file_path):
    !wget http://places2.csail.mit.edu/models_places365/resnet18_places365.pth.tar -O ./places/resnet18_places365.pth.tar

In [104]:
# the architecture to use
arch = 'resnet18'
model_weight = os.path.join('./places', 'resnet18_places365.pth.tar')

# create the network architecture
model = models.__dict__[arch](num_classes=365)

#model_weight = '%s_places365.pth.tar' % arch

checkpoint = torch.load(model_weight, map_location=lambda storage, loc: storage) # map_location keeps tensors on the CPU, so a checkpoint trained on a GPU also loads on a CPU-only machine
state_dict = {str.replace(k,'module.',''): v for k,v in checkpoint['state_dict'].items()} # nn.DataParallel prepends 'module.' to every parameter name, so strip it
model.load_state_dict(state_dict)
model.eval()

model.cpu()
torch.save(model.state_dict(), './places/resnet18_state_dict.pth')
print ('completed cell')

completed cell
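
The key renaming in the cell above can be shown on a plain dictionary. This is a minimal sketch, assuming the only difference between the checkpoint keys and the model keys is a leading `module.`; the helper name `strip_module_prefix` and the toy keys are hypothetical. Unlike `str.replace`, which removes the substring wherever it occurs, the helper only removes it at the start of a key.

```python
def strip_module_prefix(state_dict, prefix="module."):
    """Return a copy of state_dict with a leading prefix removed from each key."""
    return {
        (key[len(prefix):] if key.startswith(prefix) else key): value
        for key, value in state_dict.items()
    }

# Toy stand-in for a checkpoint saved from a DataParallel-wrapped model.
wrapped = {"module.conv1.weight": 1, "module.fc.bias": 2, "bn1.weight": 3}
cleaned = strip_module_prefix(wrapped)
print(cleaned)
```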


In [105]:
model_path_places = './places'

model_context = models.__dict__[arch](num_classes=365)
context_state_dict = torch.load(os.path.join(model_path_places, 'resnet18_state_dict.pth'))
model_context.load_state_dict(context_state_dict)
print (summary(model_context, (3,224,224), device="cpu"))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 64, 112, 112]           9,408
       BatchNorm2d-2         [-1, 64, 112, 112]             128
              ReLU-3         [-1, 64, 112, 112]               0
         MaxPool2d-4           [-1, 64, 56, 56]               0
            Conv2d-5           [-1, 64, 56, 56]          36,864
       BatchNorm2d-6           [-1, 64, 56, 56]             128
              ReLU-7           [-1, 64, 56, 56]               0
            Conv2d-8           [-1, 64, 56, 56]          36,864
       BatchNorm2d-9           [-1, 64, 56, 56]             128
             ReLU-10           [-1, 64, 56, 56]               0
       BasicBlock-11           [-1, 64, 56, 56]               0
           Conv2d-12           [-1, 64, 56, 56]          36,864
      BatchNorm2d-13           [-1, 64, 56, 56]             128
             ReLU-14           [-1, 64,

In [106]:
num_context_features = list(model_context.children())[-1].in_features

### b. Resnet-50 (Places365 weight)

In [107]:
# Get Resnet50 model trained on places dataset
store_path = "./places"
if not os.path.exists(store_path):
    os.mkdir(store_path)

file_path = "./places/resnet50_places365.pth.tar"
if not os.path.exists(file_path):
    !wget http://places2.csail.mit.edu/models_places365/resnet50_places365.pth.tar -O ./places/resnet50_places365.pth.tar

In [108]:
# the architecture to use
arch50 = 'resnet50'
model_weight = os.path.join('./places', 'resnet50_places365.pth.tar')

# create the network architecture
model = models.__dict__[arch50](num_classes=365)

#model_weight = '%s_places365.pth.tar' % arch

checkpoint = torch.load(model_weight, map_location=lambda storage, loc: storage) # map_location keeps tensors on the CPU, so a checkpoint trained on a GPU also loads on a CPU-only machine
state_dict = {str.replace(k,'module.',''): v for k,v in checkpoint['state_dict'].items()} # nn.DataParallel prepends 'module.' to every parameter name, so strip it
model.load_state_dict(state_dict)
model.eval()

model.cpu()
torch.save(model.state_dict(), './places/resnet50_state_dict.pth')
print ('completed cell')

completed cell


In [109]:
model_path_places = './places'

model_context = models.__dict__[arch50](num_classes=365)
context_state_dict = torch.load(os.path.join(model_path_places, 'resnet50_state_dict.pth'))
model_context.load_state_dict(context_state_dict)

print (summary(model_context, (3,224,224), device="cpu"))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 64, 112, 112]           9,408
       BatchNorm2d-2         [-1, 64, 112, 112]             128
              ReLU-3         [-1, 64, 112, 112]               0
         MaxPool2d-4           [-1, 64, 56, 56]               0
            Conv2d-5           [-1, 64, 56, 56]           4,096
       BatchNorm2d-6           [-1, 64, 56, 56]             128
              ReLU-7           [-1, 64, 56, 56]               0
            Conv2d-8           [-1, 64, 56, 56]          36,864
       BatchNorm2d-9           [-1, 64, 56, 56]             128
             ReLU-10           [-1, 64, 56, 56]               0
           Conv2d-11          [-1, 256, 56, 56]          16,384
      BatchNorm2d-12          [-1, 256, 56, 56]             512
           Conv2d-13          [-1, 256, 56, 56]          16,384
      BatchNorm2d-14          [-1, 256,

In [110]:
num_context_features = list(model_context.children())[-1].in_features

## 3 . Face Branch:
Facial expression is the key cue for predicting emotion in an image of a person. We employ two models pre-trained on the FER-2013 dataset, by [Shangeth](https://github.com/shangeth/Facial-Emotion-Recognition-PyTorch-ONNX) and
[Balmukund](https://www.kaggle.com/code/balmukund/fer-2013-pytorch-implementation?fbclid=IwAR3xaZrtY7-RDZiXHGcjf6ytJ5Nk4wMDxGhsQs0pg2R0ul7GNv7lgS3ePI8), denoted S-FER and B-FER, respectively.
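
Both face models are classifiers, so the cells in this section drop the final classification layer and keep the layer before it as the face feature. The sketch below shows the idea on a toy network; `toy_classifier` and its layer sizes are made up for illustration and are unrelated to S-FER or B-FER.

```python
import torch
import torch.nn as nn

# Toy stand-in for a pretrained classifier (hypothetical layer sizes).
toy_classifier = nn.Sequential(
    nn.Conv2d(1, 8, kernel_size=3, padding=1),
    nn.ReLU(),
    nn.AdaptiveAvgPool2d(1),
    nn.Flatten(),
    nn.Linear(8, 7),  # 7-way emotion classification head
)

# Keep every child module except the last one (the classification head).
feature_extractor = nn.Sequential(*list(toy_classifier.children())[:-1]).eval()

with torch.no_grad():
    features = feature_extractor(torch.randn(2, 1, 48, 48))
print(features.shape)
```

Slicing `children()` only works when the forward pass is exactly the child modules applied in order. When `forward` contains extra logic, such as reshaping between layers, a new class without the head is needed instead.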


### a. S-FER

In [111]:
face_model_path = "/content/drive/MyDrive/VA-prediction/models/FER_trained_model.pt"

In [112]:
import torch.nn as nn
import torch

class Face_Emotion_CNN(nn.Module):
  def __init__(self):
    super(Face_Emotion_CNN, self).__init__()
    self.cnn1 = nn.Conv2d(in_channels=1, out_channels=8, kernel_size=3)
    self.cnn2 = nn.Conv2d(in_channels=8, out_channels=16, kernel_size=3)
    self.cnn3 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3)
    self.cnn4 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3)
    self.cnn5 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3)
    self.cnn6 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3)
    self.cnn7 = nn.Conv2d(in_channels=256, out_channels=256, kernel_size=3)
    self.relu = nn.ReLU()
    self.pool1 = nn.MaxPool2d(2, 1)
    self.pool2 = nn.MaxPool2d(2, 2)
    self.cnn1_bn = nn.BatchNorm2d(8)
    self.cnn2_bn = nn.BatchNorm2d(16)
    self.cnn3_bn = nn.BatchNorm2d(32)
    self.cnn4_bn = nn.BatchNorm2d(64)
    self.cnn5_bn = nn.BatchNorm2d(128)
    self.cnn6_bn = nn.BatchNorm2d(256)
    self.cnn7_bn = nn.BatchNorm2d(256)
    self.fc1 = nn.Linear(1024, 512)
    self.fc2 = nn.Linear(512, 256)
    self.fc3 = nn.Linear(256, 7)
    self.dropout = nn.Dropout(0.3)
    self.log_softmax = nn.LogSoftmax(dim=1)

  def forward(self, x):
    x = self.relu(self.pool1(self.cnn1_bn(self.cnn1(x))))
    x = self.relu(self.pool1(self.cnn2_bn(self.dropout(self.cnn2(x)))))
    x = self.relu(self.pool1(self.cnn3_bn(self.cnn3(x))))
    x = self.relu(self.pool1(self.cnn4_bn(self.dropout(self.cnn4(x)))))
    x = self.relu(self.pool2(self.cnn5_bn(self.cnn5(x))))
    x = self.relu(self.pool2(self.cnn6_bn(self.dropout(self.cnn6(x)))))
    x = self.relu(self.pool2(self.cnn7_bn(self.dropout(self.cnn7(x)))))

    x = x.view(x.size(0), -1)

    x = self.relu(self.dropout(self.fc1(x)))
    x = self.relu(self.dropout(self.fc2(x)))
    x = self.log_softmax(self.fc3(x))
    return x

  def count_parameters(self):
    return sum(p.numel() for p in self.parameters() if p.requires_grad)

In [113]:
def load_trained_model(model_path):
    model = Face_Emotion_CNN()
    model.load_state_dict(torch.load(model_path, map_location=lambda storage, loc: storage), strict=False)
    return model

In [114]:
model_face = load_trained_model(face_model_path)

In [115]:
print(summary(model_face, (1, 48,48), device="cpu"))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1            [-1, 8, 46, 46]              80
       BatchNorm2d-2            [-1, 8, 46, 46]              16
         MaxPool2d-3            [-1, 8, 45, 45]               0
              ReLU-4            [-1, 8, 45, 45]               0
            Conv2d-5           [-1, 16, 43, 43]           1,168
           Dropout-6           [-1, 16, 43, 43]               0
       BatchNorm2d-7           [-1, 16, 43, 43]              32
         MaxPool2d-8           [-1, 16, 42, 42]               0
              ReLU-9           [-1, 16, 42, 42]               0
           Conv2d-10           [-1, 32, 40, 40]           4,640
      BatchNorm2d-11           [-1, 32, 40, 40]              64
        MaxPool2d-12           [-1, 32, 39, 39]               0
             ReLU-13           [-1, 32, 39, 39]               0
           Conv2d-14           [-1, 64,

In [116]:
class Face_Emotion_CNN_new(nn.Module):
  def __init__(self):
    super(Face_Emotion_CNN_new, self).__init__()
    self.cnn1 = nn.Conv2d(in_channels=1, out_channels=8, kernel_size=3)
    self.cnn2 = nn.Conv2d(in_channels=8, out_channels=16, kernel_size=3)
    self.cnn3 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3)
    self.cnn4 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3)
    self.cnn5 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3)
    self.cnn6 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3)
    self.cnn7 = nn.Conv2d(in_channels=256, out_channels=256, kernel_size=3)
    self.relu = nn.ReLU()
    self.pool1 = nn.MaxPool2d(2, 1)
    self.pool2 = nn.MaxPool2d(2, 2)
    self.cnn1_bn = nn.BatchNorm2d(8)
    self.cnn2_bn = nn.BatchNorm2d(16)
    self.cnn3_bn = nn.BatchNorm2d(32)
    self.cnn4_bn = nn.BatchNorm2d(64)
    self.cnn5_bn = nn.BatchNorm2d(128)
    self.cnn6_bn = nn.BatchNorm2d(256)
    self.cnn7_bn = nn.BatchNorm2d(256)
    self.fc1 = nn.Linear(1024, 512)
    self.fc2 = nn.Linear(512, 256)
    self.dropout = nn.Dropout(0.3)

  def forward(self, x):
    x = self.relu(self.pool1(self.cnn1_bn(self.cnn1(x))))
    x = self.relu(self.pool1(self.cnn2_bn(self.dropout(self.cnn2(x)))))
    x = self.relu(self.pool1(self.cnn3_bn(self.cnn3(x))))
    x = self.relu(self.pool1(self.cnn4_bn(self.dropout(self.cnn4(x)))))
    x = self.relu(self.pool2(self.cnn5_bn(self.cnn5(x))))
    x = self.relu(self.pool2(self.cnn6_bn(self.dropout(self.cnn6(x)))))
    x = self.relu(self.pool2(self.cnn7_bn(self.dropout(self.cnn7(x)))))

    x = x.view(x.size(0), -1)

    x = self.relu(self.dropout(self.fc1(x)))
    x = self.fc2(x)
    return x

  def count_parameters(self):
    return sum(p.numel() for p in self.parameters() if p.requires_grad)

In [117]:
state_dict = model_face.state_dict()
del state_dict['fc3.weight']
del state_dict['fc3.bias']

In [118]:
model_face = Face_Emotion_CNN_new()

In [119]:
model_face.load_state_dict(state_dict)

<All keys matched successfully>

In [120]:
print(summary(model_face, (1, 48,48), device="cpu"))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1            [-1, 8, 46, 46]              80
       BatchNorm2d-2            [-1, 8, 46, 46]              16
         MaxPool2d-3            [-1, 8, 45, 45]               0
              ReLU-4            [-1, 8, 45, 45]               0
            Conv2d-5           [-1, 16, 43, 43]           1,168
           Dropout-6           [-1, 16, 43, 43]               0
       BatchNorm2d-7           [-1, 16, 43, 43]              32
         MaxPool2d-8           [-1, 16, 42, 42]               0
              ReLU-9           [-1, 16, 42, 42]               0
           Conv2d-10           [-1, 32, 40, 40]           4,640
      BatchNorm2d-11           [-1, 32, 40, 40]              64
        MaxPool2d-12           [-1, 32, 39, 39]               0
             ReLU-13           [-1, 32, 39, 39]               0
           Conv2d-14           [-1, 64,

In [121]:
num_face_features = 256

### b. B-FER

In [122]:
isBFER = True

In [123]:
class Net(nn.Module):
    def __init__(self, dropout):
        super(Net, self).__init__()
        dropout_value = dropout
        # Input Block
        self.convblock1 = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=32, kernel_size=(3, 3), padding=1, bias=False),
            nn.ReLU(),
            nn.BatchNorm2d(32),
            # nn.Dropout(dropout_value)
        )

        self.convblock2 = nn.Sequential(
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=(3, 3), padding=1, bias=False),
            nn.ReLU(),
            nn.BatchNorm2d(64),
            # nn.Dropout(dropout_value)
        )

        # TRANSITION BLOCK 1
        self.pool1 = nn.MaxPool2d(2, 2) # output_size = 24 RF=7
        self.convblock3 = nn.Sequential(
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=(3, 3), padding=1, bias=False),
            nn.ReLU(),
            nn.BatchNorm2d(128),
            # nn.Dropout(dropout_value)
        )

        self.convblock4 = nn.Sequential(
            nn.Conv2d(in_channels=128, out_channels=256, kernel_size=(3, 3), padding=1, bias=False),
            nn.ReLU(),
            nn.BatchNorm2d(256),
        )

        self.convblock5 = nn.Sequential(
            nn.Conv2d(in_channels=256, out_channels=512, kernel_size=(1, 1), padding=1 , bias=False),
            nn.ReLU(),
            nn.BatchNorm2d(512),
            # nn.Dropout(dropout_value)
        )

        # TRANSITION BLOCK 2
        self.pool2 = nn.MaxPool2d(2, 2) # output_size = 12 RF=20

        # CONVOLUTION BLOCK 2
        self.convblock6 = nn.Sequential(
            nn.Conv2d(in_channels=512, out_channels=1024, kernel_size=(3, 3), padding=1, bias=False),
            nn.ReLU(),
            nn.BatchNorm2d(1024),
            # nn.Dropout(dropout_value)
        )

        self.convblock7 = nn.Sequential(
            nn.Conv2d(in_channels=1024, out_channels=1024, kernel_size=(3, 3), padding=1, bias=False),
            nn.ReLU(),
            nn.BatchNorm2d(1024),
            # nn.Dropout(dropout_value)
        )

        # TRANSITION BLOCK 3
        self.pool3 = nn.MaxPool2d(2, 2) # output_size =6 RF=32

        self.convblock8 = nn.Sequential(
             nn.Conv2d(in_channels=1024, out_channels=512, kernel_size=(3, 3), padding=1, bias=False),
             nn.ReLU(),
             nn.BatchNorm2d(512),
             # nn.Dropout(dropout_value)
         )

        self.convblock9 = nn.Sequential(
             nn.Conv2d(in_channels=512, out_channels=256, kernel_size=(3, 3), padding=0, bias=False),
             nn.ReLU(),
             nn.BatchNorm2d(256),
             # nn.Dropout(dropout_value)
         )
        # self.pool2 = nn.MaxPool2d(2, 2) # output_size = 2
        self.gap = nn.Sequential(
            nn.AvgPool2d(kernel_size=4)
        )
        self.convblock10 = nn.Sequential(
            nn.Conv2d(in_channels=256, out_channels=7, kernel_size=(1, 1), padding=0, bias=False)
        )

    def forward(self, x):
        x = self.convblock1(x)
        x = self.convblock2(x)
        x = self.pool1(x)
        x = self.convblock3(x)
        x = self.convblock4(x)
        x = self.convblock5(x)
        x = self.pool2(x)
        x = self.convblock6(x)
        x = self.convblock7(x)
        x = self.pool3(x)
        x = self.convblock8(x)
        x = self.convblock9(x)
        x = self.gap(x)
        x = self.convblock10(x)
        x = x.view(-1, 7)
        return F.log_softmax(x, dim=-1)

In [124]:
model_face = Net(1.0)

In [125]:
model_face.load_state_dict(torch.load("/content/drive/MyDrive/VA-prediction/models/FER_2013_Kaggle.pth", map_location=lambda storage, loc: storage), strict=False)

<All keys matched successfully>

In [126]:
print(summary(model_face, (3, 48,48), device="cpu"))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 32, 48, 48]             864
              ReLU-2           [-1, 32, 48, 48]               0
       BatchNorm2d-3           [-1, 32, 48, 48]              64
            Conv2d-4           [-1, 64, 48, 48]          18,432
              ReLU-5           [-1, 64, 48, 48]               0
       BatchNorm2d-6           [-1, 64, 48, 48]             128
         MaxPool2d-7           [-1, 64, 24, 24]               0
            Conv2d-8          [-1, 128, 24, 24]          73,728
              ReLU-9          [-1, 128, 24, 24]               0
      BatchNorm2d-10          [-1, 128, 24, 24]             256
           Conv2d-11          [-1, 256, 24, 24]         294,912
             ReLU-12          [-1, 256, 24, 24]               0
      BatchNorm2d-13          [-1, 256, 24, 24]             512
           Conv2d-14          [-1, 512,

In [127]:
model_face = nn.Sequential(*(list(model_face.children())[:-1]))

In [128]:
print(summary(model_face, (3, 48,48), device="cpu"))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 32, 48, 48]             864
              ReLU-2           [-1, 32, 48, 48]               0
       BatchNorm2d-3           [-1, 32, 48, 48]              64
            Conv2d-4           [-1, 64, 48, 48]          18,432
              ReLU-5           [-1, 64, 48, 48]               0
       BatchNorm2d-6           [-1, 64, 48, 48]             128
         MaxPool2d-7           [-1, 64, 24, 24]               0
            Conv2d-8          [-1, 128, 24, 24]          73,728
              ReLU-9          [-1, 128, 24, 24]               0
      BatchNorm2d-10          [-1, 128, 24, 24]             256
           Conv2d-11          [-1, 256, 24, 24]         294,912
             ReLU-12          [-1, 256, 24, 24]               0
      BatchNorm2d-13          [-1, 256, 24, 24]             512
           Conv2d-14          [-1, 512,

In [129]:
num_face_features = 256

# IV. Dataset:

In this paper, we use the EMOTIC dataset, a database of images of people in real environments annotated with their apparent emotions. The images are annotated with an extended list of 26 emotion categories combined with the three common continuous dimensions: Valence, Arousal, and Dominance.

It consists of approximately 23,500 images collected from websites, social media, and other public datasets. Each image contains one or more people, and each person is labeled with Gender (Male or Female), Age (adult, kid, or teenager), VAD (valence-arousal-dominance) values ranging from 0 to 10, and one or more of the 26 discrete emotion categories. The 26 discrete emotion categories are: *Peace, Affection, Esteem, Anticipation, Engagement, Confidence, Happiness, Pleasure, Excitement, Surprise, Sympathy, Doubt/Confusion, Disconnection, Fatigue, Embarrassment, Yearning, Disapproval, Aversion, Annoyance, Anger, Sensitivity, Sadness, Disquietment, Fear, Pain, Suffering.*
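
The two kinds of labels can be illustrated with a small sketch. It assumes that the categories are indexed in alphabetical order and that VAD values are divided by 10 to bring them into the range 0 to 1, as the dataset class later in this notebook does; the helper name `encode_labels` and the example person are hypothetical.

```python
# The 26 categories listed above, sorted alphabetically (assumed index order).
CATEGORIES = sorted([
    "Peace", "Affection", "Esteem", "Anticipation", "Engagement", "Confidence",
    "Happiness", "Pleasure", "Excitement", "Surprise", "Sympathy", "Doubt/Confusion",
    "Disconnection", "Fatigue", "Embarrassment", "Yearning", "Disapproval", "Aversion",
    "Annoyance", "Anger", "Sensitivity", "Sadness", "Disquietment", "Fear", "Pain",
    "Suffering",
])

def encode_labels(person_categories, vad):
    """Return a multi-hot category vector and VAD values scaled to [0, 1]."""
    index = {name: i for i, name in enumerate(CATEGORIES)}
    multi_hot = [0.0] * len(CATEGORIES)
    for name in person_categories:
        multi_hot[index[name]] = 1.0
    scaled_vad = [value / 10.0 for value in vad]
    return multi_hot, scaled_vad

# A made-up person annotated with two categories and VAD = (7, 5, 8).
cat_vec, vad_vec = encode_labels(["Happiness", "Engagement"], (7, 5, 8))
print(cat_vec, vad_vec)
```

Because a person can carry several categories at once, the discrete target is a multi-hot vector rather than a single class index.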

<img src="https://raw.githubusercontent.com/BaoNinh2808/Server-Client/main/Dataset_Sample.png" width="50%" height="50%"/>

## 1 . Data Normalization:

In [41]:
context_mean = [0.4690646, 0.4407227, 0.40508908]
context_std = [0.2514227, 0.24312855, 0.24266963]

body_mean = [0.43832874, 0.3964344, 0.3706214]
body_std = [0.24784276, 0.23621225, 0.2323653]

if (isSwinT):
  body_mean = [0.485, 0.456, 0.406]
  body_std = [0.229, 0.224, 0.225]

face_mean = [0.507395516207, 0.507395516207, 0.507395516207]
face_std = [0.255128989415, 0.255128989415, 0.255128989415]

context_norm = [context_mean, context_std]
body_norm = [body_mean, body_std]
face_norm = [face_mean, face_std]

train_transform = transforms.Compose([transforms.ToPILImage(),
                                      transforms.RandomHorizontalFlip(),
                                      transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4),
                                      transforms.ToTensor()])

test_transform = transforms.Compose([transforms.ToPILImage(),
                                     transforms.ToTensor()])

face_train_transform = transforms.Compose([transforms.ToPILImage(),
                                      transforms.RandomHorizontalFlip(),
                                      transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4),
                                      transforms.ToTensor()])

face_test_transform = transforms.Compose([transforms.ToPILImage(),
                                     transforms.ToTensor()])

if (isSwinT):
    train_transform = transforms.Compose([transforms.ToPILImage(),
                                      transforms.Resize(size=[232], interpolation=transforms.InterpolationMode.BICUBIC),
                                      transforms.CenterCrop(size=[224]),
                                      transforms.RandomHorizontalFlip(),
                                      transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4),
                                      transforms.ToTensor()])


    test_transform = transforms.Compose([transforms.ToPILImage(),
                                     transforms.Resize(size=[232], interpolation=transforms.InterpolationMode.BICUBIC),
                                      transforms.CenterCrop(size=[224]),
                                     transforms.ToTensor()])
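
The mean and standard deviation lists defined in the cell above are later passed to `transforms.Normalize`, which standardizes each channel as `(x - mean) / std`. Here is a minimal NumPy sketch of that arithmetic, using the context statistics from the cell above and a made-up constant image. It assumes a channels-first array already scaled to [0, 1], which is what `ToTensor` produces; the helper name `normalize_chw` is hypothetical.

```python
import numpy as np

context_mean = np.array([0.4690646, 0.4407227, 0.40508908])
context_std = np.array([0.2514227, 0.24312855, 0.24266963])

def normalize_chw(image, mean, std):
    """Standardize a (C, H, W) float image channel by channel."""
    return (image - mean[:, None, None]) / std[:, None, None]

# Made-up 3x2x2 image in which every pixel equals 0.5.
image = np.full((3, 2, 2), 0.5)
normalized = normalize_chw(image, context_mean, context_std)
print(normalized[:, 0, 0])
```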

## 2 . Create Class for Preprocessing Data:

In [42]:
class Emotic_PreDataset(Dataset):
  ''' Custom Emotic dataset class. Use preprocessed data stored in npy files. '''
  def __init__(self, x_context, x_body, x_face, y_cat, y_cont, context_transform, body_transform, face_transform, context_norm, body_norm, face_norm):
    super(Emotic_PreDataset,self).__init__()
    self.x_context = x_context
    self.x_body = x_body
    self.x_face = x_face
    self.y_cat = y_cat
    self.y_cont = y_cont
    self.context_transform = context_transform
    self.body_transform = body_transform
    self.face_transform = face_transform
    self.context_norm = transforms.Normalize(context_norm[0], context_norm[1])  # Normalizing the context image with context mean and context std
    self.body_norm = transforms.Normalize(body_norm[0], body_norm[1])           # Normalizing the body image with body mean and body std
    self.face_norm = transforms.Normalize(face_norm[0], face_norm[1])           # Normalizing the face image with face mean and face std
  def __len__(self):
    return len(self.y_cont)

  def __getitem__(self, index):
    image_context = self.x_context[index]
    image_body = self.x_body[index]
    image_face = self.x_face[index]
    cat_label = self.y_cat[index]
    cont_label = self.y_cont[index]
    context = self.context_norm(self.context_transform(image_context))
    body = self.body_norm(self.body_transform(image_body))
    face = self.face_norm(self.face_transform(image_face))
    # Continuous (VAD) labels are divided by 10 to bring them into a smaller range
    return context, body, face, torch.tensor(cat_label, dtype=torch.float32), torch.tensor(cont_label, dtype=torch.float32) / 10.0

print ('completed cell')

completed cell
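
`__getitem__` divides the continuous labels by 10, and the training and testing functions later multiply predictions and labels by 10 again before computing the loss and the error. The round trip can be sketched in plain Python. The helper names and the sample annotation below are hypothetical, and the 1 to 10 annotation range is an assumption (the notebook only shows the division):

```python
def scale_vad(raw_label, factor=10.0):
    """Map a raw (valence, arousal, dominance) triple to the range the network regresses."""
    return [v / factor for v in raw_label]


def unscale_vad(scaled, factor=10.0):
    """Invert scale_vad so that errors are reported on the original scale."""
    return [v * factor for v in scaled]


raw = [6.0, 4.0, 7.0]           # hypothetical VAD annotation
scaled = scale_vad(raw)         # what the dataset would hand to the model
restored = unscale_vad(scaled)  # what the metric code compares against
print(scaled, restored)
```

Keeping the regression targets small while reporting errors on the original scale makes the printed MAE directly comparable to the annotation units.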


## 3 . Preprocess and Load Data:

In [43]:
# Change data_src to match the location of the dataset in your own Drive
data_src = '/content/drive/MyDrive/VA-prediction/dataset'

#train
train_context = np.load(os.path.join(data_src,'pre', 'train_context_arr.npy'))
train_body = np.load(os.path.join(data_src,'pre','train_body_arr.npy'))

train_cat = np.load(os.path.join(data_src,'pre','train_cat_arr.npy'))
train_cont = np.load(os.path.join(data_src,'pre','train_cont_arr.npy'))


#val
val_context = np.load(os.path.join(data_src,'pre','val_context_arr.npy'))
val_body = np.load(os.path.join(data_src,'pre','val_body_arr.npy'))

val_cat = np.load(os.path.join(data_src,'pre','val_cat_arr.npy'))
val_cont = np.load(os.path.join(data_src,'pre','val_cont_arr.npy'))


#test
test_context = np.load(os.path.join(data_src,'pre','test_context_arr.npy'))
test_body = np.load(os.path.join(data_src,'pre','test_body_arr.npy'))

test_cat = np.load(os.path.join(data_src,'pre','test_cat_arr.npy'))
test_cont = np.load(os.path.join(data_src,'pre','test_cont_arr.npy'))

#Face data
train_face =  np.stack((np.load(os.path.join(data_src,'pre','train_face_arr.npy')),) * 3, axis=-1)
val_face = np.stack((np.load(os.path.join(data_src,'pre','val_face_arr.npy')),) * 3, axis=-1)
test_face = np.stack((np.load(os.path.join(data_src,'pre','test_face_arr.npy')),) * 3, axis=-1)

# Categorical emotion classes
cat = ['Affection', 'Anger', 'Annoyance', 'Anticipation', 'Aversion', 'Confidence', 'Disapproval', 'Disconnection',
       'Disquietment', 'Doubt/Confusion', 'Embarrassment', 'Engagement', 'Esteem', 'Excitement', 'Fatigue', 'Fear',
       'Happiness', 'Pain', 'Peace', 'Pleasure', 'Sadness', 'Sensitivity', 'Suffering', 'Surprise', 'Sympathy', 'Yearning']

cat2ind = {}
ind2cat = {}
for idx, emotion in enumerate(cat):
  cat2ind[emotion] = idx
  ind2cat[idx] = emotion

print ('train ', 'context ', train_context.shape, 'body', train_body.shape, 'cat ', train_cat.shape, 'cont', train_cont.shape)
print ('val ', 'context ', val_context.shape, 'body', val_body.shape, 'cat ', val_cat.shape, 'cont', val_cont.shape)
print ('test ', 'context ', test_context.shape, 'body', test_body.shape, 'cat ', test_cat.shape, 'cont', test_cont.shape)
print ('completed cell')

train  context  (23266, 224, 224, 3) body (23266, 128, 128, 3) cat  (23266, 26) cont (23266, 3)
val  context  (3315, 224, 224, 3) body (3315, 128, 128, 3) cat  (3315, 26) cont (3315, 3)
test  context  (7203, 224, 224, 3) body (7203, 128, 128, 3) cat  (7203, 26) cont (7203, 3)
completed cell
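
The face arrays are stored as single-channel images and replicated into three identical channels with `np.stack`. A small sketch on a hypothetical batch of two 48x48 faces (the real face array shape is not printed in this notebook) shows the resulting layout and that averaging the channels gives back the grayscale image, which is what the `torch.mean(images_face, dim=1, keepdim=True)` step relies on later:

```python
import numpy as np

# Hypothetical batch of 2 grayscale faces, 48x48 pixels (the shape is an assumption).
gray = np.arange(2 * 48 * 48, dtype=np.float32).reshape(2, 48, 48)

# Same call pattern as the notebook: repeat the array three times along a new last axis.
rgb_like = np.stack((gray,) * 3, axis=-1)
print(rgb_like.shape)  # (2, 48, 48, 3)

# The three channels are identical, so their mean reproduces the grayscale image.
recovered = rgb_like.mean(axis=-1)
print(np.allclose(recovered, gray))
```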


In [44]:
batch_size = 26

train_dataset = Emotic_PreDataset(train_context, train_body, train_face, train_cat, train_cont, \
                                  train_transform, train_transform, face_train_transform, context_norm, body_norm, face_norm)
# Validation uses the deterministic test transforms (no random flip or colour jitter)
val_dataset = Emotic_PreDataset(val_context, val_body, val_face, val_cat, val_cont, \
                                test_transform, test_transform, face_test_transform, context_norm, body_norm, face_norm)
test_dataset = Emotic_PreDataset(test_context, test_body, test_face, test_cat, test_cont, \
                                 test_transform, test_transform, face_test_transform, context_norm, body_norm, face_norm)

train_loader = DataLoader(train_dataset, batch_size, shuffle=True, drop_last=True)
val_loader = DataLoader(val_dataset, batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size, shuffle=False)

print ('train loader ', len(train_loader), 'val loader ', len(val_loader), 'test', len(test_loader))
print ('completed cell')

train loader  894 val loader  128 test 278
completed cell
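
The loader lengths printed above follow from the split sizes and the batch size of 26. A short check, using a hypothetical helper `num_batches`, reproduces them and shows how many training samples `drop_last=True` leaves out in each epoch:

```python
import math


def num_batches(num_samples, batch_size, drop_last=False):
    """Number of batches a DataLoader yields for a map-style dataset."""
    if drop_last:
        return num_samples // batch_size
    return math.ceil(num_samples / batch_size)


batch_size = 26
print(num_batches(23266, batch_size, drop_last=True))  # 894
print(num_batches(3315, batch_size))                   # 128
print(num_batches(7203, batch_size))                   # 278
print(23266 - 894 * batch_size)                        # 22
```

Because the training loader shuffles, the 22 samples left out differ from epoch to epoch.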


# IV. Loss Function & Evaluation Metric:

## 1 . Loss Function:

In [45]:
class DiscreteLoss(nn.Module):
  ''' Class to measure loss between categorical emotion predictions and labels.'''
  def __init__(self, weight_type='mean', device=torch.device('cpu')):
    super(DiscreteLoss, self).__init__()
    self.weight_type = weight_type
    self.device = device
    if self.weight_type == 'mean':
      self.weights = torch.ones((1,26))/26.0
      self.weights = self.weights.to(self.device)
    elif self.weight_type == 'static':
      self.weights = torch.FloatTensor([0.1435, 0.1870, 0.1692, 0.1165, 0.1949, 0.1204, 0.1728, 0.1372, 0.1620,
         0.1540, 0.1987, 0.1057, 0.1482, 0.1192, 0.1590, 0.1929, 0.1158, 0.1907,
         0.1345, 0.1307, 0.1665, 0.1698, 0.1797, 0.1657, 0.1520, 0.1537]).unsqueeze(0)
      self.weights = self.weights.to(self.device)

  def forward(self, pred, target):
    if self.weight_type == 'dynamic':
      self.weights = self.prepare_dynamic_weights(target)
      self.weights = self.weights.to(self.device)
    loss = (((pred - target)**2) * self.weights)
    return loss.sum()

  def prepare_dynamic_weights(self, target):
    target_stats = torch.sum(target, dim=0).float().unsqueeze(dim=0).cpu()
    weights = torch.zeros((1,26))
    weights[target_stats != 0 ] = 1.0/torch.log(target_stats[target_stats != 0].data + 1.2)
    weights[target_stats == 0] = 0.0001
    return weights


class ContinuousLoss_L2(nn.Module):
  ''' Class to measure loss between continuous emotion dimension predictions and labels. Using l2 loss as base. '''
  def __init__(self, margin=1):
    super(ContinuousLoss_L2, self).__init__()
    self.margin = margin

  def forward(self, pred, target):
    labs = torch.abs(pred - target)
    loss = labs ** 2
    loss[ (labs < self.margin) ] = 0.0
    return loss.sum()


class ContinuousLoss_SL1(nn.Module):
  ''' Class to measure loss between continuous emotion dimension predictions and labels. Using smooth l1 loss as base. '''
  def __init__(self, margin=1):
    super(ContinuousLoss_SL1, self).__init__()
    self.margin = margin

  def forward(self, pred, target):
    labs = torch.abs(pred - target)
    loss = 0.5 * (labs ** 2)
    loss[ (labs > self.margin) ] = labs[ (labs > self.margin) ] - 0.5
    return loss.sum()

print ('completed cell')

completed cell
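
With `weight_type='dynamic'`, `DiscreteLoss` recomputes the class weights for every batch as 1 / ln(count + 1.2), where count is the number of positive labels for that class in the batch, and assigns 0.0001 to classes that do not occur. A plain-Python sketch of the same rule on hypothetical counts (the function name is illustrative, not part of the notebook):

```python
import math


def dynamic_weights(class_counts, offset=1.2, empty_weight=1e-4):
    """Per-class weight 1 / ln(count + offset); classes absent from the batch get empty_weight."""
    return [1.0 / math.log(n + offset) if n > 0 else empty_weight for n in class_counts]


# Hypothetical batch: class 0 appears 20 times, class 1 twice, class 2 never.
counts = [20, 2, 0]
weights = dynamic_weights(counts)
print([round(w, 4) for w in weights])
```

Rare classes receive larger weights than frequent ones, so their squared errors contribute more to the batch loss.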


## 2 . Evaluation Metric:

In [46]:
def test_scikit_ap(cat_preds, cat_labels):
  ap = np.zeros(26, dtype=np.float32)
  for i in range(26):
    ap[i] = average_precision_score(cat_labels[i, :], cat_preds[i, :])
  print ('ap', ap, ap.shape, ap.mean())
  return ap.mean()


def test_emotic_vad(cont_preds, cont_labels):
  vad = np.zeros(3, dtype=np.float32)
  for i in range(3):
    vad[i] = np.mean(np.abs(cont_preds[i, :] - cont_labels[i, :]))
  print ('vad', vad, vad.shape, vad.mean())
  return vad.mean()


def get_thresholds(cat_preds, cat_labels):
  thresholds = np.zeros(26, dtype=np.float32)
  for i in range(26):
    p, r, t = precision_recall_curve(cat_labels[i, :], cat_preds[i, :])
    for k in range(len(p)):
      if p[k] == r[k]:
        thresholds[i] = t[k]
        break
  np.save('./thresholds.npy', thresholds)
  return thresholds

print ('completed cell')

completed cell
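
`test_scikit_ap` computes one average precision (AP) per emotion class and returns their mean. For intuition, AP for a single class can be written out by hand. This is a simplified sketch that assumes all scores are distinct; scikit-learn additionally groups tied scores, so the function below is not a drop-in replacement:

```python
def average_precision(labels, scores):
    """AP for one class: mean of precision@k over the ranks k where a positive occurs.

    Assumes distinct scores; tied scores would need to be grouped.
    """
    ranked = sorted(zip(scores, labels), key=lambda pair: pair[0], reverse=True)
    hits = 0
    precisions = []
    for rank, (_, label) in enumerate(ranked, start=1):
        if label == 1:
            hits += 1
            precisions.append(hits / rank)
    return sum(precisions) / len(precisions) if precisions else 0.0


labels = [1, 0, 1, 0]
scores = [0.9, 0.8, 0.7, 0.1]
print(round(average_precision(labels, scores), 4))  # 0.8333
```

The two positives sit at ranks 1 and 3, giving precisions 1 and 2/3, whose mean is 0.8333.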


# VI. Model + Optimizer

Next, choose the type of experiment. To predict discrete emotions, set **'isVADPrediction = False'**. To predict VAD values, set **'isVADPrediction = True'**.

In [83]:
# Select the prediction type: categorical emotions (False) or VAD values (True)
isVADPrediction = False

In [84]:
class Fusion(nn.Module):
  ''' Fusion Model'''
  def __init__(self, num_context_features, num_body_features, num_face_features):
    super(Fusion,self).__init__()
    self.num_context_features = num_context_features
    self.num_body_features = num_body_features
    self.num_face_features = num_face_features
    self.fc1 = nn.Linear((self.num_context_features + self.num_body_features + self.num_face_features), 256)
    self.bn1 = nn.BatchNorm1d(256)
    self.d1 = nn.Dropout(p=0.5)
    self.fc_cat = nn.Linear(256, 26)
    self.fc_cont = nn.Linear(256, 3)
    self.relu = nn.ReLU()


  def forward(self, x_context, x_body, x_face):
    context_features = x_context.view(-1, self.num_context_features)
    body_features = x_body.view(-1, self.num_body_features)
    face_features = x_face.view(-1, self.num_face_features)
    fuse_features = torch.cat((context_features, body_features, face_features), 1)
    fuse_out = self.fc1(fuse_features)
    fuse_out = self.bn1(fuse_out)
    fuse_out = self.relu(fuse_out)
    fuse_out = self.d1(fuse_out)
    cat_out = self.fc_cat(fuse_out)
    cont_out = self.fc_cont(fuse_out)
    if not isVADPrediction:
        return cat_out
    return cont_out

print ('completed cell')

completed cell
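
The `forward` pass of `Fusion` flattens each branch output to `(batch, n)`, concatenates the three along the feature axis, and maps the result to 256 hidden units. The shape bookkeeping can be sketched with NumPy. The feature sizes below are hypothetical placeholders (the notebook sets the real values elsewhere), the weights are random, and batch normalization and dropout are omitted:

```python
import numpy as np

rng = np.random.default_rng(0)
batch = 4
# Hypothetical per-branch feature sizes, for illustration only.
n_context, n_body, n_face = 512, 512, 128

context = rng.standard_normal((batch, n_context, 1, 1))  # pooled backbone output
body = rng.standard_normal((batch, n_body, 1, 1))
face = rng.standard_normal((batch, n_face))

# Equivalent of .view(-1, n): flatten each branch to (batch, n), then concatenate.
fused = np.concatenate(
    [context.reshape(-1, n_context), body.reshape(-1, n_body), face.reshape(-1, n_face)],
    axis=1,
)
print(fused.shape)  # (4, 1152)

# A random linear map stands in for fc1 (untrained, illustration only).
w1 = rng.standard_normal((fused.shape[1], 256))
hidden = np.maximum(fused @ w1, 0.0)  # ReLU
print(hidden.shape)  # (4, 256)
```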


In [130]:
fusion_model = Fusion(num_context_features, num_body_features, num_face_features)
model_context = nn.Sequential(*(list(model_context.children())[:-1]))
model_body = nn.Sequential(*(list(model_body.children())[:-1]))
# model_face is used as-is: no layer is removed from the face branch

for param in fusion_model.parameters():
  param.requires_grad = True
for param in model_context.parameters():
  param.requires_grad = False
for param in model_body.parameters():
  param.requires_grad = False
for param in model_face.parameters():
  param.requires_grad = False
print ('completed cell')

completed cell


Let’s first define our device as the first visible cuda device if we have CUDA available:

In [131]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [132]:
# Define relevant variables
learning_rate = 0.001
weight_decay = 5e-4
step_size = 7
gamma = 0.1

opt = optim.Adam((list(fusion_model.parameters()) + list(model_context.parameters()) + \
                  list(model_body.parameters()) + list(model_face.parameters())), lr=learning_rate, weight_decay=weight_decay)

scheduler = StepLR(opt, step_size=step_size, gamma=gamma)

# Choose the loss functions for discrete emotion training and continuous emotion training
disc_loss = DiscreteLoss('dynamic', device)
cont_loss_L2 = ContinuousLoss_L2()

print ('completed cell')

completed cell
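
With `step_size = 7` and `gamma = 0.1`, the scheduler multiplies the learning rate by 0.1 every 7 epochs, provided `scheduler.step()` is called once per epoch as in the training functions of this notebook. A closed-form sketch of the resulting schedule (the helper `step_lr` is illustrative, not a PyTorch function):

```python
def step_lr(base_lr, epoch, step_size=7, gamma=0.1):
    """Learning rate in effect during `epoch` (0-based) under a step decay schedule."""
    return base_lr * gamma ** (epoch // step_size)


for epoch in (0, 6, 7, 13, 14):
    print(epoch, step_lr(0.001, epoch))
```

Epochs 0 to 6 run at 0.001, epochs 7 to 13 at roughly 0.0001, and so on.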


# VII. Training + Testing Functions:

**Discrete Emotion Prediction:**

In [63]:
def test_disc(models, device, data_loader, num_images):
    model_context, model_body, model_face, fusion_model = models
    cat_preds = np.zeros((num_images, 26))
    cat_labels = np.zeros((num_images, 26))

    with torch.no_grad():
        model_context.to(device)
        model_body.to(device)
        model_face.to(device)
        fusion_model.to(device)
        model_context.eval()
        model_body.eval()
        model_face.eval()
        fusion_model.eval()

        indx = 0
        print ('starting testing')
        for images_context, images_body, images_face, labels_cat, labels_cont in iter(data_loader):
            images_context = images_context.to(device)
            images_body = images_body.to(device)
            images_face = images_face.to(device)
            if (isBFER == False):
                images_face = torch.mean(images_face, dim=1, keepdim=True).to(device)

            labels_cat = labels_cat.to(device)
            labels_cont = labels_cont.to(device)


            pred_context = model_context(images_context)
            pred_body = model_body(images_body)
            pred_face = model_face(images_face)
            pred_cat = fusion_model(pred_context, pred_body, pred_face)

            cat_preds[ indx : (indx + pred_cat.shape[0]), :] = pred_cat.to("cpu").data.numpy()
            cat_labels[ indx : (indx + labels_cat.shape[0]), :] = labels_cat.to("cpu").data.numpy()
            indx = indx + pred_cat.shape[0]

    cat_preds = cat_preds.transpose()
    cat_labels = cat_labels.transpose()
    print ('completed testing')
    ap_mean = test_scikit_ap(cat_preds, cat_labels)
    return ap_mean

print ('completed cell')

completed cell


In [53]:
def train_disc(epochs, model_path, opt, scheduler, models, disc_loss, cont_loss, cat_loss_param=1.0, cont_loss_param=0.0, train_length = train_dataset.__len__(), val_length = val_dataset.__len__()):
  if not os.path.exists(model_path):
    os.makedirs(model_path)

  min_loss = np.inf
  min_mae = np.inf

  train_loss = list()
  val_loss = list()
  train_mae = list()
  val_mae = list()

  model_context, model_body, model_face, fusion_model = models

  for e in range(epochs):
    running_loss = 0.0

    fusion_model.to(device)
    model_context.to(device)
    model_body.to(device)
    model_face.to(device)

    fusion_model.train()
    model_context.train()
    model_body.train()
    model_face.train()

    train_cat_preds = np.zeros((train_length, 26))
    train_cat_labels = np.zeros((train_length, 26))
    indx = 0

    for images_context, images_body, images_face, labels_cat, labels_cont in iter(train_loader):
      images_context = images_context.to(device)
      images_body = images_body.to(device)
      images_face = images_face.to(device)
      if (isBFER == False):
            images_face = torch.mean(images_face, dim=1, keepdim=True).to(device)
      labels_cat = labels_cat.to(device)

      opt.zero_grad()

      pred_context = model_context(images_context)
      pred_body = model_body(images_body)
      pred_face = model_face(images_face)

      pred_cat = fusion_model(pred_context, pred_body, pred_face)
      cat_loss_batch = disc_loss(pred_cat, labels_cat)
      loss = (cat_loss_param * cat_loss_batch)
      running_loss += loss.item()


      loss.backward()
      opt.step()

      train_cat_preds[ indx : (indx + pred_cat.shape[0]), :] = pred_cat.to("cpu").data.numpy()
      train_cat_labels[ indx : (indx + labels_cat.shape[0]), :] = labels_cat.to("cpu").data.numpy()
      indx = indx + pred_cat.shape[0]

    if e % 1 == 0:
      print ('epoch = %d training loss = %.4f' %(e, running_loss))
      train_loss.append(running_loss)
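      # drop_last=True leaves the last rows unfilled, so keep only the indx rows actually written
      train_cat_preds = train_cat_preds[:indx].transpose()
      train_cat_labels = train_cat_labels[:indx].transpose()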
      train_mae.append(test_scikit_ap(train_cat_preds, train_cat_labels))
      print ('epoch = %d training AP = %.4f' %(e, train_mae[-1]))


    running_loss = 0.0
    fusion_model.eval()
    model_context.eval()
    model_body.eval()
    model_face.eval()

    val_cat_preds = np.zeros((val_length, 26))
    val_cat_labels = np.zeros((val_length, 26))
    indx = 0
    with torch.no_grad():
      for images_context, images_body, images_face, labels_cat, labels_cont in iter(val_loader):
        images_context = images_context.to(device)
        images_body = images_body.to(device)
        images_face = images_face.to(device)
        if (isBFER == False):
            images_face = torch.mean(images_face, dim=1, keepdim=True).to(device)
        labels_cat = labels_cat.to(device)

        pred_context = model_context(images_context)
        pred_body = model_body(images_body)
        pred_face = model_face(images_face)

        pred_cat = fusion_model(pred_context, pred_body, pred_face)
        cat_loss_batch = disc_loss(pred_cat, labels_cat)
        loss =  (cat_loss_param * cat_loss_batch)
        running_loss += loss.item()

        val_cat_preds[ indx : (indx + pred_cat.shape[0]), :] = pred_cat.to("cpu").data.numpy()
        val_cat_labels[ indx : (indx + labels_cat.shape[0]), :] = labels_cat.to("cpu").data.numpy()
        indx = indx + pred_cat.shape[0]
      if e % 1 == 0:
        print ('epoch = %d validation loss = %.4f' %(e, running_loss))
        val_loss.append(running_loss)
        val_cat_preds = val_cat_preds.transpose()
        val_cat_labels = val_cat_labels.transpose()
        val_mae.append(test_scikit_ap(val_cat_preds, val_cat_labels))
        print ('epoch = %d validation AP = %.4f' %(e, val_mae[-1]))

    scheduler.step()
    print('')
    if val_loss[-1] < min_loss:
        min_loss = val_loss[-1]
        # saving models for lowest loss
        print ('saving model at epoch e = %d' %(e))
        fusion_model.to("cpu")
        model_context.to("cpu")
        model_body.to("cpu")
        model_face.to("cpu")
        torch.save(fusion_model, os.path.join(model_path, 'model_fusion.pth'))
        torch.save(model_context, os.path.join(model_path, 'model_context.pth'))
        torch.save(model_body, os.path.join(model_path, 'model_body.pth'))
        torch.save(model_face, os.path.join(model_path, 'model_face.pth'))

  print ('completed training')

  #statistic graphic
  f, [[ax1, ax2], [ax3, ax4]] = plt.subplots(2, 2, figsize = (15, 10))
  f.suptitle('Multi-Branch Network for Imagery Emotion Prediction')
  ax1.plot(range(0,len(train_loss)),train_loss, color='Blue')
  ax2.plot(range(0,len(val_loss)),val_loss, color='Red')
  ax1.legend(['train loss'])
  ax2.legend(['val loss'])

  ax3.plot(range(0,len(train_mae)),train_mae, color='Blue')
  ax4.plot(range(0,len(val_mae)),val_mae, color='Red')
  ax3.legend(['train mAP'])
  ax4.legend(['val mAP'])

print ('completed cell')

completed cell
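
`train_disc` overwrites the saved models whenever the validation loss of the current epoch is lower than every previous one. The selection rule in isolation, as a sketch on hypothetical loss values:

```python
def checkpoint_epochs(val_losses):
    """Return the epochs at which a new lowest validation loss triggers a save."""
    best = float("inf")
    saved = []
    for epoch, loss in enumerate(val_losses):
        if loss < best:
            best = loss
            saved.append(epoch)
    return saved


# Hypothetical validation losses for five epochs.
print(checkpoint_epochs([310.2, 295.7, 301.4, 288.9, 290.0]))  # [0, 1, 3]
```

Only the files from the last entry in the list remain on disk, since every save uses the same file names.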


**Continuous Emotion Prediction:**


In [54]:
def test_cont(models, device, data_loader, num_images):
    model_context, model_body, model_face, fusion_model = models
    cont_preds = np.zeros((num_images, 3))
    cont_labels = np.zeros((num_images, 3))

    with torch.no_grad():
        model_context.to(device)
        model_body.to(device)
        model_face.to(device)
        fusion_model.to(device)
        model_context.eval()
        model_body.eval()
        model_face.eval()
        fusion_model.eval()

        indx = 0
        print ('starting testing')
        for images_context, images_body, images_face, labels_cat, labels_cont in iter(data_loader):
            images_context = images_context.to(device)
            images_body = images_body.to(device)
            images_face = images_face.to(device)
            if (isBFER == False):
              images_face = torch.mean(images_face, dim=1, keepdim=True).to(device)


            pred_context = model_context(images_context)
            pred_body = model_body(images_body)
            pred_face = model_face(images_face)
            pred_cont = fusion_model(pred_context, pred_body, pred_face)

            cont_preds[ indx : (indx + pred_cont.shape[0]), :] = pred_cont.to("cpu").data.numpy() * 10
            cont_labels[ indx : (indx + labels_cont.shape[0]), :] = labels_cont.to("cpu").data.numpy() * 10
            indx = indx + pred_cont.shape[0]

    cont_preds = cont_preds.transpose()
    cont_labels = cont_labels.transpose()

    print ('completed testing')
    vad_mean = test_emotic_vad(cont_preds, cont_labels)
    return vad_mean

print ('completed cell')

completed cell


In [55]:
def train_cont(epochs, model_path, opt, scheduler, models, disc_loss, cont_loss, cat_loss_param=0, cont_loss_param=1.0, train_length = train_dataset.__len__(), val_length = val_dataset.__len__()):
  if not os.path.exists(model_path):
    os.makedirs(model_path)

  min_loss = np.inf

  train_loss = list()
  val_loss = list()
  train_mae = list()
  val_mae = list()
  model_context, model_body, model_face, fusion_model = models

  for e in range(epochs):
    running_loss = 0.0

    model_context.to(device)
    model_body.to(device)
    model_face.to(device)
    fusion_model.to(device)

    model_context.train()
    model_body.train()
    model_face.train()
    fusion_model.train()

    train_cont_preds = np.zeros((train_length, 3))
    train_cont_labels = np.zeros((train_length, 3))
    indx = 0

    for images_context, images_body, images_face, labels_cat, labels_cont in iter(train_loader):
      images_context = images_context.to(device)
      images_body = images_body.to(device)
      images_face = images_face.to(device)
      if (isBFER == False):
            images_face = torch.mean(images_face, dim=1, keepdim=True).to(device)
      labels_cat = labels_cat.to(device)
      labels_cont = labels_cont.to(device)

      opt.zero_grad()

      pred_context = model_context(images_context)
      pred_body = model_body(images_body)
      pred_face = model_face(images_face)
      pred_cont = fusion_model(pred_context, pred_body, pred_face)
      cont_loss_batch = cont_loss(pred_cont * 10, labels_cont * 10)
      loss = cont_loss_param * cont_loss_batch
      running_loss += loss.item()
      loss.backward()
      opt.step()

      train_cont_preds[ indx : (indx + pred_cont.shape[0]), :] = pred_cont.to("cpu").data.numpy() * 10
      train_cont_labels[ indx : (indx + labels_cont.shape[0]), :] = labels_cont.to("cpu").data.numpy() * 10
      indx = indx + pred_cont.shape[0]

    if e % 1 == 0:
      print ('epoch = %d training loss = %.4f' %(e, running_loss))
    train_loss.append(running_loss)
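    # drop_last=True leaves the last rows unfilled, so keep only the indx rows actually written
    train_cont_preds = train_cont_preds[:indx].transpose()
    train_cont_labels = train_cont_labels[:indx].transpose()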
    train_mae.append(test_emotic_vad(train_cont_preds, train_cont_labels))
    print ('epoch = %d training MAE = %.4f' %(e, train_mae[-1]))

    running_loss = 0.0
    model_context.eval()
    model_body.eval()
    model_face.eval()
    fusion_model.eval()

    val_cont_preds = np.zeros((val_length, 3))
    val_cont_labels = np.zeros((val_length, 3))
    indx = 0

    with torch.no_grad():
      for images_context, images_body, images_face, labels_cat, labels_cont in iter(val_loader):
        images_context = images_context.to(device)
        images_body = images_body.to(device)
        images_face = images_face.to(device)
        if (isBFER == False):
            images_face = torch.mean(images_face, dim=1, keepdim=True).to(device)
        labels_cat = labels_cat.to(device)
        labels_cont = labels_cont.to(device)

        pred_context = model_context(images_context)
        pred_body = model_body(images_body)
        pred_face = model_face(images_face)
        pred_cont = fusion_model(pred_context, pred_body, pred_face)
        cont_loss_batch = cont_loss(pred_cont * 10, labels_cont * 10)
        loss = cont_loss_param * cont_loss_batch
        running_loss += loss.item()

        val_cont_preds[ indx : (indx + pred_cont.shape[0]), :] = pred_cont.to("cpu").data.numpy() * 10
        val_cont_labels[ indx : (indx + labels_cont.shape[0]), :] = labels_cont.to("cpu").data.numpy() * 10
        indx = indx + pred_cont.shape[0]
      if e % 1 == 0:
        print ('epoch = %d validation loss = %.4f' %(e, running_loss))
    val_loss.append(running_loss)
    val_cont_preds = val_cont_preds.transpose()
    val_cont_labels = val_cont_labels.transpose()
    val_mae.append(test_emotic_vad(val_cont_preds, val_cont_labels))
    print ('epoch = %d val MAE= %.4f' %(e, val_mae[-1]))
    scheduler.step()

    if val_loss[-1] < min_loss:
        min_loss = val_loss[-1]
        # saving models for lowest loss
        print ('saving model at epoch e = %d' %(e))
        fusion_model.to("cpu")
        model_context.to("cpu")
        model_body.to("cpu")
        model_face.to("cpu")
        torch.save(fusion_model, os.path.join(model_path, 'model_fusion.pth'))
        torch.save(model_context, os.path.join(model_path, 'model_context.pth'))
        torch.save(model_body, os.path.join(model_path, 'model_body.pth'))
        torch.save(model_face, os.path.join(model_path, 'model_face.pth'))

  print ('completed training')

  #statistic graphic
  f, [[ax1, ax2], [ax3, ax4]] = plt.subplots(2, 2, figsize = (15, 10))
  f.suptitle('Multi-Branch Network for Imagery Emotion Prediction')
  ax1.plot(range(0,len(train_loss)),train_loss, color='Blue')
  ax2.plot(range(0,len(val_loss)),val_loss, color='Red')
  ax1.legend(['train loss'])
  ax2.legend(['val loss'])

  ax3.plot(range(0,len(train_mae)),train_mae, color='Blue')
  ax4.plot(range(0,len(val_mae)),val_mae, color='Red')
  ax3.legend(['train MAE'])
  ax4.legend(['val MAE'])

print ('completed cell')

completed cell
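
`train_cont` multiplies predictions and labels by 10 before calling `ContinuousLoss_L2`, whose default margin is 1. On the scaled outputs, nearly every error would fall below that margin and contribute nothing, so the loss is evaluated on the original scale. A plain-Python sketch of the margin rule, with hypothetical values:

```python
def margin_l2(preds, targets, margin=1.0):
    """Sum of squared errors, ignoring errors whose absolute value is below the margin."""
    total = 0.0
    for p, t in zip(preds, targets):
        err = abs(p - t)
        if err >= margin:
            total += err ** 2
    return total


# Hypothetical prediction and label on the original scale (network output times 10).
pred = [6.4, 3.0, 7.5]
label = [6.0, 5.0, 6.0]
print(margin_l2(pred, label))  # 6.25
```

The first error (0.4) is below the margin and is ignored; the other two contribute 4.0 and 2.25.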


# VIII. Training:


In [133]:
def trainning(epochs, model_path, opt, scheduler, models, disc_loss, cont_loss, cat_loss_param=1.0, cont_loss_param=1.0, train_length = train_dataset.__len__(), val_length = val_dataset.__len__()):
  # Forward the caller's loss weights and dataset lengths instead of hard-coded values;
  # a hard-coded cont_loss_param=0.0 would multiply the VAD loss by zero.
  if (isVADPrediction):
      train_cont(epochs, model_path, opt, scheduler, models, disc_loss, cont_loss, cat_loss_param=cat_loss_param, cont_loss_param=cont_loss_param, train_length=train_length, val_length=val_length)
  else:
      train_disc(epochs, model_path, opt, scheduler, models, disc_loss, cont_loss, cat_loss_param=cat_loss_param, cont_loss_param=cont_loss_param, train_length=train_length, val_length=val_length)

In [134]:
# Number of epochs and directory for the saved models (at least 15 epochs are recommended for regression)
epochs = 1
path_to_store_model = "./models"

In [None]:
# Run training
trainning(epochs, path_to_store_model, opt, scheduler, [model_context, model_body, model_face, fusion_model], disc_loss=disc_loss, cont_loss=cont_loss_L2, cat_loss_param=1.0, cont_loss_param=1.0)

# IX. Testing:

In [91]:
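# Load trained models for testing.
# Note: on PyTorch releases where torch.load defaults to weights_only=True,
# loading whole pickled models requires passing weights_only=False.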
model_path = path_to_store_model
model_context = torch.load(os.path.join(model_path, 'model_context.pth'))
model_body = torch.load(os.path.join(model_path, 'model_body.pth'))
model_face = torch.load(os.path.join(model_path, 'model_face.pth'))
fusion_model = torch.load(os.path.join(model_path, 'model_fusion.pth'))

model_context.eval()
model_body.eval()
model_face.eval()
fusion_model.eval()

print ('completed cell')

completed cell


In [64]:
#testing
if (isVADPrediction):
    test_mae = test_cont([model_context, model_body, model_face, fusion_model], device, test_loader, test_dataset.__len__())
    print ('testing MAE=%.4f' %(test_mae))
else:
    test_map = test_disc([model_context, model_body, model_face, fusion_model], device, test_loader, test_dataset.__len__())
    print ('testing mAP=%.4f' %(test_map))

starting testing
completed testing
ap [0.2920199  0.11064112 0.14707923 0.5522721  0.0583308  0.73740953
 0.1299824  0.24145244 0.17172313 0.1765055  0.02029791 0.8611955
 0.1432784  0.6885479  0.0925881  0.0419534  0.70323604 0.05236943
 0.22835895 0.4108984  0.15388381 0.0500508  0.17935905 0.07206114
 0.1101446  0.07119898] (26,) 0.2498784
testing mAP=0.2499
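
The per-class AP values printed above are ordered like the `cat` list, so they can be paired with the emotion names to see which classes the model handles best and worst. The values below are copied from this output; the variable names are illustrative:

```python
cat = ['Affection', 'Anger', 'Annoyance', 'Anticipation', 'Aversion', 'Confidence', 'Disapproval', 'Disconnection',
       'Disquietment', 'Doubt/Confusion', 'Embarrassment', 'Engagement', 'Esteem', 'Excitement', 'Fatigue', 'Fear',
       'Happiness', 'Pain', 'Peace', 'Pleasure', 'Sadness', 'Sensitivity', 'Suffering', 'Surprise', 'Sympathy', 'Yearning']

# AP values copied from the test output above, in the same class order.
ap = [0.2920199, 0.11064112, 0.14707923, 0.5522721, 0.0583308, 0.73740953,
      0.1299824, 0.24145244, 0.17172313, 0.1765055, 0.02029791, 0.8611955,
      0.1432784, 0.6885479, 0.0925881, 0.0419534, 0.70323604, 0.05236943,
      0.22835895, 0.4108984, 0.15388381, 0.0500508, 0.17935905, 0.07206114,
      0.1101446, 0.07119898]

ranked = sorted(zip(cat, ap), key=lambda item: item[1], reverse=True)
print(ranked[:3])                    # Engagement, Confidence, Happiness
print(ranked[-1])                    # Embarrassment
print(round(sum(ap) / len(ap), 4))   # 0.2499
```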
