## Import libraries

In [None]:
# basic
import os
import time
import copy
import pandas as pd
import numpy as np
import random

# plot
import matplotlib.pyplot as plt
import pylab
from tqdm import tqdm

# pytorch
import torch
import torch.nn as nn
import torch.utils.data as data
import torch.optim as optim
from torch.autograd import Variable
import torchvision
from torchvision import datasets, models, transforms

# custom
from affect_loader import AffectFolder
import tvgg
import tdense
import tresnet
from custom_models import DTAN, Zhang

## Hyper-parameters

In [None]:
# Mini-batch size handed to every DataLoader.
batch_size = 64
# Side length (in pixels) every image is resized to by the transforms.
img_size = 64
# Number of times a single image is repeated along the channel axis
# before it is passed to the model.
N_frames = 12

## Load paths into dataframes

Since the labels differ from dataset to dataset, part of the cleaning has to be done manually, and the pretraining has to be set up differently depending on the target training dataset. The current parameters use AffectNet for pretraining ahead of training on CK+.

In [None]:
# Locations of the AffectNet annotation lists.
root_path = os.path.join('/home', 'nii', 'Documents', 'AffectNet')
list_path = os.path.join(root_path, 'Manually_Annotated_file_lists')
train_name = 'training.csv'
val_name = 'validation.csv'
training_df = pd.read_csv(os.path.join(list_path, train_name))
val_df = pd.read_csv(os.path.join(list_path, val_name))

# Label conventions of the datasets involved:
# CK+ classes = {0:'anger', 1:'contempt', 2:'disgust', 3:'fear', 4:'happy', 5:'sadness', 6:'surprise'}
# Oulu-CASIA classes = {0:'anger', 1:'disgust', 2:'fear', 3:'happy', 4:'sadness', 5:'surprise'}
# AffectNet classes : 0: neutral, 1: happy, 2: sad, 3: surprise 4: fear, 5: disgust, 6: anger
#     7: contempt, 8: none, 9:uncertain, 10:non-face
# AffectNet -> CK+:        map 1->4, 2->5, 3->6, 4->3, 5->2, 6->0, 7->1, discard 0,8,9,10
# AffectNet -> Oulu-CASIA: map 1->3, 2->4, 3->5, 4->2, 5->1, 6->0, discard 0,7,8,9,10

aff_2_ck = {1:4, 2:5, 3:6, 4:3, 5:2, 6:0, 7:1}
# The stray leading '*' made this line a SyntaxError; it is a plain dict.
aff_2_ou = {1:3, 2:4, 3:5, 4:2, 5:1, 6:0}
# AffectNet labels that have no counterpart in the target dataset (CK+ here).
non_emotions = [0,8,9,10]
# Oulu-CASIA variant:
#non_emotions = [0,7,8,9,10]

# Drop the unusable labels, then rewrite the remaining ones to the CK+ indices.
# The training list additionally drops rows with any missing field.
training_df = training_df[~training_df['expression'].isin(non_emotions)].dropna()
training_df.expression = training_df.expression.map(aff_2_ck)
# .copy() makes the filtered frame independent so the column assignment below
# does not write into a slice of the original frame.
val_df = val_df[~val_df['expression'].isin(non_emotions)].copy()
val_df.expression = val_df.expression.map(aff_2_ck)

# split into val and test: shuffle the row positions and cut them in half
length = len(val_df)
samples = random.sample(range(length), length)
half = length // 2
val_samples = samples[:half]
test_samples = samples[half:]
test_df = val_df.iloc[test_samples]
val_df = val_df.iloc[val_samples]

## Make image loaders

In [None]:
image_path = os.path.join(root_path, 'Manually_Annotated_Images')

sets = ['train', 'val', 'test']
dataframes = {'train': training_df, 'val': val_df, 'test': test_df}


def _build_transform(augment):
    """Assemble the preprocessing pipeline for one split.

    All splits resize to ``img_size`` x ``img_size``, convert to grayscale,
    convert to a tensor and normalise with mean 0.5 / std 0.225. When
    ``augment`` is true a random horizontal flip and a random rotation of up
    to 15 degrees are inserted right after the resize.
    """
    steps = [transforms.Resize((img_size, img_size))]
    if augment:
        steps.append(transforms.RandomHorizontalFlip())
        steps.append(transforms.RandomRotation(15))
    steps.append(transforms.Grayscale())
    steps.append(transforms.ToTensor())
    steps.append(transforms.Normalize((0.5, ), (0.225, )))
    return transforms.Compose(steps)


# Only the training split is augmented; 'val' and 'test' share the same steps.
data_transforms = {split: _build_transform(augment=(split == 'train'))
                   for split in sets}

# One dataset per split, each backed by its own dataframe and transform.
image_datasets = {
    split: AffectFolder(image_path, dataframes[split], transform=data_transforms[split])
    for split in sets
}

# Every split is served shuffled, in batches of `batch_size`, by 4 workers.
dataloaders = {
    split: torch.utils.data.DataLoader(
        image_datasets[split],
        batch_size=batch_size,
        shuffle=True,
        num_workers=4,
    )
    for split in sets
}

# Number of samples per split, used to average the running loss/accuracy.
dataset_sizes = {split: len(image_datasets[split]) for split in sets}

## Training and Testing functions

In [None]:
def _should_stop_early(current_val_loss, reference_val_loss, train_losses, k, alpha):
    """Return True when the generalization-loss / training-progress quotient exceeds ``alpha``."""
    # The training-progress term is only meaningful over a full window of k epochs;
    # with fewer entries the k-divisor makes it negative and can fake a stop.
    if k <= 0 or len(train_losses) < k:
        return False

    window = train_losses[-k:]
    lowest = min(window)
    # Both ratios below are undefined for a zero (or negative) denominator.
    if lowest <= 0 or reference_val_loss <= 0:
        return False

    # Relative increase (in %) of the validation loss over the reference loss.
    GL = 100 * (current_val_loss / reference_val_loss - 1)
    # How much larger the mean training loss of the window is than its minimum (per mille).
    Pk = 1000 * (sum(window) / (k * lowest) - 1)

    if Pk <= 0:
        # Training loss has stalled: the quotient is unbounded, so stop as soon
        # as the validation loss is worse than the reference.
        return GL > 0
    return GL / Pk > alpha


def train_model(model, criterion, optimizer, num_epochs=25, k=5, alpha=3):
    """Train ``model`` with early stopping and return it with its best weights.

    Each epoch runs a training pass over ``dataloaders['train']`` followed by
    a forward-only pass over ``dataloaders['val']``. The function relies on
    the module-level ``dataloaders``, ``dataset_sizes``, ``use_gpu``,
    ``img_size`` and ``N_frames``.

    After every validation pass the quotient ``GL / Pk`` is evaluated, where
    ``GL = 100 * (val_loss / best_loss - 1)`` and
    ``Pk = 1000 * (sum(last k train losses) / (k * min(last k train losses)) - 1)``;
    training stops once the quotient exceeds ``alpha``. ``best_loss`` is the
    validation loss of the epoch with the best validation accuracy so far.
    The quotient is only evaluated once ``k`` training epochs are available.

    Args:
        model: network to train; called on inputs of shape
            ``(batch, N_frames, img_size, img_size)``.
        criterion: loss callable taking ``(outputs, labels)``.
        optimizer: optimizer stepping the model's parameters.
        num_epochs: upper bound on the number of epochs. ``None`` removes the
            bound so that only early stopping ends training.
        k: number of most recent training epochs used for the progress term.
        alpha: threshold on ``GL / Pk`` above which training stops.

    Returns:
        Tuple ``(model, best_acc, train_accuracy, train_loss, val_accuracy,
        val_loss)``: the model loaded with the weights of its best validation
        epoch, that epoch's accuracy, and the per-epoch histories as lists of
        Python numbers.
    """
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    best_loss = 1e6

    train_accuracy = []
    val_accuracy = []
    train_loss = []
    val_loss = []

    epoch = 0
    keep_training = True
    # num_epochs caps the loop so that training always terminates even if the
    # early-stopping criterion never fires.
    while keep_training and (num_epochs is None or epoch < num_epochs):
        print('Epoch {}'.format(epoch))
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            # Training mode for 'train', evaluation mode for 'val'
            model.train(phase == 'train')

            running_loss = 0.0
            running_corrects = 0.0

            # Iterate over data.
            for data in tqdm(dataloaders[phase]):
                # get the inputs
                inputs, labels = data

                # wrap them in Variable
                if use_gpu:
                    inputs = Variable(inputs.cuda())
                    labels = Variable(labels.cuda())
                else:
                    inputs, labels = Variable(inputs), Variable(labels)

                # zero the parameter gradients
                optimizer.zero_grad()

                # replicate the single grayscale image N_frames times along
                # the channel axis
                batch_size = inputs.size(0)
                inputs = inputs.view(batch_size, 1, img_size, img_size).repeat(1, N_frames, 1, 1)

                # for DTAN
                #inputs = inputs.view(batch_size, N_frames, 1, img_size, img_size)

                # forward
                outputs = model(inputs)
                _, preds = torch.max(outputs.data, 1)

                loss = criterion(outputs, labels)

                # backward + optimize only if in training phase
                if phase == 'train':
                    loss.backward()
                    optimizer.step()

                # statistics, accumulated as plain Python numbers.
                # `.item()` is used when available because indexing a 0-dim
                # loss with [0] raises on PyTorch >= 0.5; older releases fall
                # back to the original access.
                if hasattr(loss, 'item'):
                    loss_value = loss.item()
                else:
                    loss_value = loss.data[0]
                n_correct = torch.sum(preds == labels.data)
                if hasattr(n_correct, 'item'):
                    n_correct = n_correct.item()

                running_loss += loss_value * inputs.size(0)
                running_corrects += n_correct

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects / dataset_sizes[phase]

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

            if phase == 'train':
                train_accuracy.append(epoch_acc)
                train_loss.append(epoch_loss)
            else:
                val_accuracy.append(epoch_acc)
                val_loss.append(epoch_loss)
                # Evaluated against best_loss from *previous* epochs, i.e.
                # before the best-model bookkeeping below.
                if _should_stop_early(epoch_loss, best_loss, train_loss, k, alpha):
                    keep_training = False

            # deep copy the model
            if phase == 'val' and epoch_acc >= best_acc:
                best_acc = epoch_acc
                best_loss = epoch_loss
                best_model_wts = copy.deepcopy(model.state_dict())

        epoch += 1

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model, best_acc, train_accuracy, train_loss, val_accuracy, val_loss


def test_model(model, criterion):
    """Evaluate ``model`` on ``dataloaders['test']`` and report loss and accuracy.

    The model is switched to evaluation mode and left in that mode. The
    function relies on the module-level ``dataloaders``, ``dataset_sizes``,
    ``use_gpu``, ``img_size`` and ``N_frames``.

    Args:
        model: network to evaluate; called on inputs of shape
            ``(batch, N_frames, img_size, img_size)``.
        criterion: loss callable taking ``(outputs, labels)``.

    Returns:
        Tuple ``(total_loss, total_acc)``: loss and accuracy averaged over
        ``dataset_sizes['test']`` samples, as Python numbers.
    """
    model.train(False)

    running_loss = 0.0
    running_corrects = 0.0

    # Iterate over data.
    for data in dataloaders['test']:
        # get the inputs
        inputs, labels = data

        # wrap them in Variable
        if use_gpu:
            inputs = Variable(inputs.cuda())
            labels = Variable(labels.cuda())
        else:
            inputs, labels = Variable(inputs), Variable(labels)

        # replicate the single grayscale image N_frames times along the
        # channel axis
        batch_size = inputs.size(0)
        inputs = inputs.view(batch_size, 1, img_size, img_size).repeat(1, N_frames, 1, 1)

        # forward
        outputs = model(inputs)
        _, preds = torch.max(outputs.data, 1)

        loss = criterion(outputs, labels)

        # statistics, accumulated as plain Python numbers.
        # `.item()` is used when available because indexing a 0-dim loss with
        # [0] raises on PyTorch >= 0.5; older releases fall back to the
        # original access.
        if hasattr(loss, 'item'):
            loss_value = loss.item()
        else:
            loss_value = loss.data[0]
        n_correct = torch.sum(preds == labels.data)
        if hasattr(n_correct, 'item'):
            n_correct = n_correct.item()

        running_loss += loss_value * inputs.size(0)
        running_corrects += n_correct

    total_loss = running_loss / dataset_sizes['test']
    total_acc = running_corrects / dataset_sizes['test']

    print('{} Loss: {:.4f} Acc: {:.4f}'.format('test', total_loss, total_acc))

    return total_loss, total_acc

## Initialize model

In PyTorch, to reinitialize a model, one must first delete the existing one to free its RAM/GPU memory; otherwise more and more memory is consumed keeping multiple models alive.

In [None]:
# Drop the reference to the previous model (if any) so its memory can be
# reclaimed before a new one is built. On a fresh kernel `model` does not
# exist yet, in which case there is nothing to delete.
try:
    del model
except NameError:
    pass

One can pick from various models for spatial feature extraction. Training should be done on a GPU for speed; debugging, however, should be done on the CPU.

In [None]:
# Run on the GPU whenever one is visible; uncomment the override to force CPU.
use_gpu = torch.cuda.is_available()
#use_gpu = False

# Active backbone. The commented lines are the alternatives; enable exactly one.
model = tdense.densenet121(num_classes=7)
#model = tvgg.vgg11_bn(num_classes=6, n_frames=N_frames)
#model = DTAN(n_frames=N_frames, n_classes=7)
#model = Zhang(n_frames=N_frames, n_classes=7)
#model = tresnet.resnet18(num_frames=N_frames, num_classes=7)

# Make every weight trainable (nothing is frozen).
for param in model.parameters():
    param.requires_grad = True

# Parameter iterator that is handed to the optimizer below.
parameters = model.parameters()

if use_gpu:
    model = model.cuda()

# Cross-entropy loss, optimised with Adam (lr 1e-3, weight decay 5e-5).
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(parameters, lr=0.001, weight_decay=5e-5)

## Train and test model

In [None]:
# Run training. train_model returns the model loaded with its best-validation
# weights, the best validation accuracy (discarded here), and the per-epoch
# accuracy/loss histories for the training and validation phases.
model, _, train_acc, train_loss, val_acc, val_loss = train_model(model, criterion, optimizer, num_epochs=11)

In [None]:
# Evaluate the trained model on the 'test' split; test_model prints and
# returns the average loss and accuracy.
test_loss, test_accuracy = test_model(model, criterion)

In [None]:
# save model: only the weights (state_dict) are written, to model/dense.pt
save_folder = 'model'
# torch.save does not create missing directories, so make sure the target
# folder exists before writing.
os.makedirs(save_folder, exist_ok=True)
save_path = os.path.join(save_folder, 'dense.pt')
torch.save(model.state_dict(), save_path)

## Plot accuracy and loss

In [None]:
# Accuracy per epoch: training curve in red, validation curve in blue.
plt.figure(figsize=(12, 6))
plt.title('VGG training')
plt.xlabel('epochs')
plt.ylabel('accuracy')
plt.plot(train_acc, label='training', color='r')
plt.plot(val_acc, label='validation', color='b')
# The legend is drawn after both curves exist so that it lists them.
pylab.legend(loc='lower right')
plt.tight_layout()
plt.show()

In [None]:
# Loss per epoch: training curve in red, validation curve in blue.
plt.figure(figsize=(12, 6))
plt.title('VGG training')
plt.xlabel('epochs')
plt.ylabel('loss')
plt.plot(train_loss, label='training', color='r')
plt.plot(val_loss, label='validation', color='b')
# The legend is drawn after both curves exist so that it lists them.
pylab.legend(loc='lower right')
plt.tight_layout()
plt.show()

In [None]:
# Bare expression as the last line of the cell: the notebook displays the
# model's repr as the cell output.
model