# useful variables

In [3]:
print('choose data type:\n0 - cropped, 1 - cropped_aligned')
choice = int(input())
if choice not in [0, 1]:
    print('error... value out of bounds...')
else:
    if not choice:
        data_root = './cropped_data/cropped'
        data_type = 'cropped'
    else:
        data_root = './cropped-aligned_data/cropped_aligned'
        data_type = 'cropped-aligned'

choose data type:
0 - cropped, 1 - cropped_aligned
1


In [4]:
import os

train_annotation_file = r'./annotations/training_set_annotations.txt'
val_annotation_file = r'./annotations/validation_set_annotations.txt'
model_path = r'./models/efficientnet_affectnet.pt'
logits = True
batch_size = 1024
weights_dir = f'./new_weights/{data_type}/batch={batch_size}, logits={logits}/'

if os.path.exists(weights_dir):
    print(f'!path "{weights_dir}" is already exists')
    pass
else:
    print(f'!path "{weights_dir}" was created')
    os.mkdir(weights_dir)

!path "./new_weights/cropped-aligned/batch=1024, logits=True/" is already exists


# imports


In [40]:
import numpy as np
import math
import random

import timm
import torch
from torch.utils.data import Dataset, DataLoader, TensorDataset
from torch.optim.optimizer import Optimizer
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
from torch.optim.optimizer import Optimizer
import torch.nn.functional as F
from timm.loss import AsymmetricLossSingleLabel
import facenet_pytorch
from facenet_pytorch import MTCNN
from torch.quantization import QuantStub, DeQuantStub
from torchmetrics import HingeLoss, F1Score

from PIL import Image
from PIL import ImageFile
import cv2

from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.metrics import f1_score
from sklearn.metrics import recall_score, f1_score, accuracy_score, confusion_matrix, precision_score
from sklearn.preprocessing import StandardScaler
from sklearn.utils.class_weight import compute_class_weight
import sklearn

import sys
import warnings
warnings.filterwarnings('ignore')

In [6]:
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f'Connected device is {device}')

Connected device is cuda


In [7]:
def seed_everything(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

seed_everything(1996)

# utils

In [8]:
class EvolvedSignMomentumOptimizer(Optimizer):
    def __init__(self, params, lr=1e-4, betas=(.9, .99), weight_decay=.0):
        if not 0.0 <= lr:
            raise ValueError('Invalid learning rate: {}'.format(lr))
        if not 0.0 <= betas[0] < 1.0:
            raise ValueError('Invalid beta parameter at index 0: {}'.format(betas[0]))
        if not 0.0 <= betas[1] < 1.0:
            raise ValueError('Invalid beta parameter at index 1: {}'.format(betas[1]))
        
        defaults = dict(lr=lr, betas=betas, weight_decay=weight_decay)
        super().__init__(params, defaults)

    @torch.no_grad()
    def step(self, closure=None):
        loss = None
        if closure is not None:
            with torch.enable_grad():
                loss = closure()

        for group in self.param_groups:
            for p in group['params']:
                if p.grad is None:
                    continue

            # Perform stepweight decay
            p.data.mul_(1 - group['lr'] * group['weight_decay'])

            grad = p.grad
            state = self.state[p]
            # State initialization
            if len(state) == 0:
              # Exponential moving average of gradient values
              state['exp_avg'] = torch.zeros_like(p)

            exp_avg = state['exp_avg']
            beta1, beta2 = group['betas']

            # Weight update
            update = exp_avg * beta1 + grad * (1 - beta1)
            p.add_(torch.sign(update), alpha=-group['lr'])
            # Decay the momentum running average coefficient
            exp_avg.mul_(beta2).add_(grad, alpha=1 - beta2)

        return loss

In [9]:
def compute_CCC(prediction, ground_truth):
    assert len(prediction) == len(ground_truth)
    
    eps = 1e-8
    
    n_objects = len(prediction)
    ground_truth = ground_truth.view(-1)
    prediction = prediction.view(-1)

    prediction_mean = torch.sum(prediction) / n_objects
    ground_truth_mean = torch.sum(ground_truth) / n_objects

    prediction_var = (prediction - prediction_mean)
    ground_truth_var = (ground_truth - ground_truth_mean)
    
    numerator = 2*torch.dot(prediction_var, ground_truth_var)
    denominator = (torch.dot(prediction_var, prediction_var) + torch.dot(ground_truth_var, ground_truth_var) + torch.pow(prediction_mean - ground_truth_mean, 2) + eps)

    ccc = numerator / denominator
    
    return ccc

In [10]:
def evaluate_model(prediction, ground_truth, task):
    if task == 'VA':
        CCC_va = compute_CCC(prediction[:, 0].to(device), ground_truth[:, 0].to(device))
        CCC_ar = compute_CCC(prediction[:, 1].to(device), ground_truth[:, 1].to(device))   
        
        competition_part = 0.5 * (CCC_va + CCC_ar)
    elif task == 'EX':
        ground_truth = ground_truth.clone().cpu().detach().numpy()
        prediction = prediction.clone().cpu().detach().numpy()
        
        competition_part = f1_score(ground_truth, prediction, average='macro')
    elif task == 'AU':
        ground_truth = ground_truth.clone().cpu().detach().numpy()
        prediction = prediction.clone().cpu().detach().numpy()
        
        all_F1 = []
        for t in range(12):
            ground_truth_ = ground_truth[:, t]
            prediction_ = prediction[:, t]
            all_F1.append(f1_score(ground_truth_, prediction_, zero_division=0))

        competition_part = np.mean(all_F1)

    return competition_part

In [11]:
class ABAWCCCLoss(nn.Module):
    def __init__(self):
        super(ABAWCCCLoss, self).__init__()
        self.eps = 1e-8

    def forward(self, prediction, ground_truth):
        assert len(prediction) == len(ground_truth)
        
        prediction_va, prediction_ar = prediction[:, 0], prediction[:, 1]
        ground_truth_va, ground_truth_ar = ground_truth[:, 0], ground_truth[:, 1]
        
        CCC_va = compute_CCC(prediction_va, ground_truth_va)
        CCC_ar = compute_CCC(prediction_ar, ground_truth_ar)
        
        loss = 1 - 0.5 * (CCC_va + CCC_ar)
        loss.requires_grad_ = True
        
        return loss

In [12]:
class ABAWMTLLoss(nn.Module):
    def __init__(self, expression_weights, action_unit_weights):
        super(ABAWMTLLoss, self).__init__()
        self.va_criterion = ABAWCCCLoss()
        self.ex_criterion = nn.CrossEntropyLoss(weight=expression_weights)
        self.au_criterion = nn.BCEWithLogitsLoss(pos_weight=action_unit_weights.reshape(-1, 12))
        
    def forward(self, prediction, ground_truth):
        assert len(prediction) == len(ground_truth)
        va_input, ex_input, au_input = ground_truth[:, :2].to(device), \
            ground_truth[:, 3].to(device), ground_truth[:, 3:].to(device)
        ex_input = ex_input.type(torch.LongTensor).to(device)
        
        va_output, ex_output, au_output = ground_truth[:, :2].to(device), \
            prediction[:, 2:10].to(device), prediction[:, 10:].to(device)
        
        
        va_loss = self.va_criterion(va_output, va_input)
        ex_loss = self.ex_criterion(ex_output, ex_input)
        au_loss = self.au_criterion(au_output, au_input)    
        
        total_loss = va_loss + ex_loss + au_loss
        total_loss.requires_grad_ = True
        
        return total_loss

In [13]:
def train_one_epoch(model, criterion, optimizer, train_dataloader, task, val_dataloader,
                        current_weights_name, best_competition_part):
    model.train()
    running_loss = 0
    number_of_objects = len(train_dataloader)
    to_train = True
    
    for i, (extracted_features, ground_truth) in enumerate(tqdm(train_dataloader, 0)):
        extracted_features, ground_truth = extracted_features.to(device), ground_truth.to(device)
        optimizer.zero_grad()
        
        if task == 'VA':
            valence_output, arousal_output = model(extracted_features)
            prediction = torch.concat((valence_output, arousal_output), dim=1)
            prediction_ = prediction
        elif task == 'EX':
            ground_truth = ground_truth.unsqueeze(1).long()
            prediction = model(extracted_features).unsqueeze(2)
            _, prediction_ = torch.max(prediction.data, 1)
        elif task == 'AU':
            prediction = model(extracted_features)
            prediction_ = ((prediction >= 0.5) * 1)
        
        iteration_loss = criterion(prediction, ground_truth)
        iteration_loss.backward()
        running_loss += iteration_loss.item()
        
        optimizer.step()
        
        if i == 0:
            epoch_prediction = prediction_
            epoch_ground_truth = ground_truth
        else:
            epoch_prediction = torch.concat((epoch_prediction, prediction_), dim=0)
            epoch_ground_truth = torch.concat((epoch_ground_truth, ground_truth), dim=0)
        
        _, iteration_competition_part = eval_one_epoch(model=model, criterion=criterion, 
            val_dataloader=val_dataloader, task=task, to_print=False)
        
        if iteration_competition_part > best_competition_part:
            if current_weights_name == '':
                pass
            else:
                os.remove(current_weights_name)
            best_competition_part = iteration_competition_part
            torch.save(model.state_dict(), weights_dir+f'{task}_{best_competition_part:3f}.pt')
            current_weights_name = weights_dir+f'{task}_{best_competition_part:3f}.pt'
        
        if i % 10 == 0:
            print(f'best {task} competition part = {best_competition_part:3f}, \
current {task} competition part = {iteration_competition_part:3f}')
            
    epoch_loss = running_loss / number_of_objects
    competition_part = evaluate_model(epoch_prediction, epoch_ground_truth, task=task)
    
    print(f'train evaluations:')
    print(f'task {task}: loss = {epoch_loss:3f}, {task} part = {competition_part:3f}')
    
    return current_weights_name, best_competition_part, to_train

In [14]:
def eval_one_epoch(model, criterion, val_dataloader, task, to_print=True):
    model.eval()
    running_loss = 0
    number_of_objects = len(val_dataloader)
    
    with torch.no_grad():
        for i, (extracted_features, ground_truth) in enumerate(val_dataloader, 0):
            extracted_features, ground_truth = extracted_features.to(device), ground_truth.to(device)

            if task == 'VA':
                valence_output, arousal_output = model(extracted_features)
                prediction = torch.concat((valence_output, arousal_output), dim=1)
                prediction_ = prediction
            elif task == 'EX':
                ground_truth = ground_truth.unsqueeze(1).long()
                prediction = model(extracted_features).unsqueeze(2)
                _, prediction_ = torch.max(prediction.data, 1)
            elif task == 'AU':
                prediction = model(extracted_features)
                prediction_ = ((prediction >= 0.5) * 1)

            iteration_loss = criterion(prediction, ground_truth)
            running_loss += iteration_loss.item()

            if i == 0:
                epoch_prediction = prediction_
                epoch_ground_truth = ground_truth
            else:
                epoch_prediction = torch.concat((epoch_prediction, prediction_), dim=0)
                epoch_ground_truth = torch.concat((epoch_ground_truth, ground_truth), dim=0)
            
    epoch_loss = running_loss / number_of_objects
    competition_part = evaluate_model(epoch_prediction, epoch_ground_truth, task=task)
    
    if to_print:
        print(f'validation evaluations:')
        print(f'task {task}: loss = {epoch_loss:3f}, {task} part = {competition_part:3f}')
    
    return epoch_loss, competition_part

In [15]:
def get_emo_weights(y_train):
    unique, counts = torch.unique(y_train, return_counts=True)
    n_classes = len(unique)
    class_weight = 1 / counts
    class_weight /= class_weight.min()
    class_weight = class_weight.to(device)
    class_weight = class_weight.to(torch.float)

    return class_weight

In [16]:
def get_action_unit_weights(targets):
    action_unit_positive_weights = [1 / torch.sum(targets[:, i] == 1).item() * (targets.shape[0] / 2) for i in range(targets.shape[1])]
    action_unit_negative_weights = [1 / torch.sum(targets[:, i] == 0).item() * (targets.shape[0] / 2) for i in range(targets.shape[1])]

    action_unit_positive_weights = torch.tensor(data=action_unit_positive_weights,
        dtype=torch.float, device=device).unsqueeze(1)
    action_unit_negative_weights = torch.tensor(data=action_unit_negative_weights,
        dtype=torch.float, device=device).unsqueeze(1)
    action_units_weights = torch.concat((action_unit_negative_weights, action_unit_positive_weights), dim=1)
    return action_unit_positive_weights.squeeze(0), action_unit_negative_weights.squeeze(0), action_units_weights

# data


In [17]:
transforms = transforms.Compose([
        transforms.ColorJitter(),
        transforms.RandomHorizontalFlip(),
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
]) 

In [18]:
model_weights = {}
load = torch.load(model_path)

weights = load.classifier.weight.cpu().data.numpy()
bias = load.classifier.bias.cpu().data.numpy()
model_weights['efficientnet_based'] = {'weights': weights, 'bias': bias}
print(f'model ({model_path}) was correctly load\n')

model (./models/efficientnet_affectnet.pt) was correctly load



$ features = Xw^{T} + b$ \
*Logit Function*:
$ L = ln\frac{p}{1 - p}$*, where* $p = \frac{1}{1 + e^{-L}}$ 

In [19]:
def get_prob(features, classifier_weights, classifier_bias, logits=True):
    xs = np.dot(features, np.transpose(classifier_weights)) + classifier_bias

    if logits:
        return xs
    else:
        e_x = np.exp(xs - np.max(xs, axis=1)[:,np.newaxis])
        return e_x / e_x.sum(axis=1)[:, None]

In [20]:
def get_global_features(model_path):
    global_features = []
    images = []
    images_names = []
    feature_extractor_model = torch.load(model_path)
    if isinstance(feature_extractor_model, dict):
        if 'inception_resnet' in correct_path:
            feature_extractor_model = facenet_pytorch.InceptionResnetV1(pretrained='vggface2')
            feature_extractor_model.logits = torch.nn.Identity()
            feature_extractor_model.last_bn = torch.nn.Identity()
            feature_extractor_model.last_linear = torch.nn.Identity()

        else:
            print("!densenet doesn't match keys")
            return global_features, images_names
    else:
        feature_extractor_model.classifier = torch.nn.Identity()
    feature_extractor_model.to(device)
    feature_extractor_model.eval()
    for dir in tqdm(os.listdir(data_root)):
        frames_dir = os.path.join(data_root, dir)
        for image_name in os.listdir(frames_dir):
            image_name = os.path.join(frames_dir, image_name)
            if image_name.lower().endswith('.jpg'):
                image = Image.open(image_name)
                image_tensor = transforms(image)
                if image.size:
                    images_names.append(image_name)
                    images.append(image_tensor)
                    if len(images) >= 64:
                        with torch.no_grad():
                            features = feature_extractor_model(torch.stack(images, dim=0).to(device))
                        features = features.data.cpu().numpy()

                        if len(global_features):
                            global_features = np.concatenate((global_features, features), axis=0)
                        else:
                            global_features = features
                        
                        # reset images
                        images.clear()
                        
    if len(images): # get all the remains
        features = feature_extractor_model(torch.stack(images, dim=0).to(device))
        features = features.data.cpu().numpy() 

    if len(global_features):
        global_features = np.concatenate((global_features, features), axis=0)
    else:
        global_features = features 

    images.clear()
    return global_features, images_names

In [21]:
global_features, images_names = get_global_features(model_path)

  0%|          | 0/307 [00:00<?, ?it/s]

In [22]:
scores = get_prob(global_features, weights, bias, logits=logits)
filename2featuresAll = {}
print(f'!saving {model_path} features, scores and images_names, with logits={logits}')
filename2featuresAll = {img_name:(global_feature,score) for img_name,global_feature,score in zip(images_names,global_features,scores)}

!saving ./models/efficientnet_affectnet.pt features, scores and images_names, with logits=True


In [23]:
def get_features_and_target(annotation_file, filename2featuresAll):
    with open(annotation_file) as f:
        mtl_lines = f.read().splitlines()
    n_missed=0
    features, y_VA, y_EX, y_AU, y_AR = [], [], [], [], []
    mask_VA, mask_EX, mask_AU, mask_AR = [], [], [], []
    for line in mtl_lines[1:]:
        target_values = line.split(',')
        image_name = os.path.join(data_root, target_values[0].replace('/', '\\'))
        image_name.replace('\\', '\\\\')
        valence_value = float(target_values[1])
        arousal_value = float(target_values[2])
        expression_value = int(target_values[3])
        au_values = [int(au_value) for au_value in target_values[4:]]
        
        VA_threshold = -5
        EX_threshold = -1

        mask_va = (valence_value > VA_threshold and arousal_value > VA_threshold)
        if not mask_va:
            valence_value = arousal_value = 0

        # mask_ar = (arousal_value > VA_threshold)
        # if not mask_ar:
        #     arousal_value = 0
        
        mask_ex = (expression_value > EX_threshold)
        if not mask_ex:
            expression_value = 0
            
        mask_au = min(au_values) >= 0
        if not mask_au:
            au_values = [0]*len(au_values)

        if mask_va or mask_ex or mask_au:
            if image_name in filename2featuresAll:
                features.append(np.concatenate((filename2featuresAll[image_name][0], 
                                                    filename2featuresAll[image_name][1])))
                y_VA.append((valence_value, arousal_value))
                mask_VA.append(mask_va)

                # y_AR.append(arousal_value)
                # mask_AR.append(mask_ar)
                
                y_EX.append(expression_value)
                mask_EX.append(mask_ex)
                
                y_AU.append(au_values)
                mask_AU.append(mask_au)
            else:
                n_missed += 1

    features = np.array(features)
    y_VA = np.array(y_VA)
    # y_AR = np.array(y_AR)
    y_EX = np.array(y_EX)
    y_AU = np.array(y_AU)

    mask_VA = np.array(mask_VA).astype(np.float32)
    # mask_AR = np.array(mask_AR).astype(np.float32)
    mask_EX = np.array(mask_EX).astype(np.float32)
    mask_AU = np.array(mask_AU).astype(np.float32)

    print(f'shapes:\n\
            features = {features.shape}\n\
            valence = {y_VA.shape}\n\
            expression = {y_EX.shape}\n\
            aus = {y_AU.shape}\n')
    
    assert features.shape[0] == y_VA.shape[0] == y_EX.shape[0] == y_AU.shape[0]
    print(f'assert passed...\nnum_missed: {n_missed}')
    
    # return features, y_VA, y_EX, y_AU, mask_VA, mask_EX, mask_AU, y_AR, mask_AR
    return features, y_VA, y_EX, y_AU, mask_VA, mask_EX, mask_AU


In [24]:
seed_everything(1996)

train_features, train_y_VA, train_y_EX, train_y_AU, \
train_mask_VA,train_mask_EX, train_mask_AU = get_features_and_target(annotation_file=train_annotation_file, 
                                                                     filename2featuresAll=filename2featuresAll)

val_features, val_y_VA, val_y_EX, val_y_AU, \
val_mask_VA, val_mask_EX, val_mask_AU = get_features_and_target(annotation_file=val_annotation_file, 
                                                                filename2featuresAll=filename2featuresAll)

shapes:
            features = (142333, 1290)
            valence = (142333, 2)
            expression = (142333,)
            aus = (142333, 12)

assert passed...
num_missed: 0
shapes:
            features = (26876, 1290)
            valence = (26876, 2)
            expression = (26876,)
            aus = (26876, 12)

assert passed...
num_missed: 0


In [25]:
seed_everything(1996)

X_VA_train, y_VA_train = torch.tensor(train_features[train_mask_VA == 1], dtype=torch.float32), torch.tensor(train_y_VA[train_mask_VA == 1], dtype=torch.float32)
X_VA_val, y_VA_val = torch.tensor(val_features[val_mask_VA == 1], dtype=torch.float32), torch.tensor(val_y_VA[val_mask_VA == 1], dtype=torch.float32)

VA_train_dataset = TensorDataset(X_VA_train, y_VA_train)
VA_val_dataset = TensorDataset(X_VA_val, y_VA_val)

VA_trainloader = DataLoader(VA_train_dataset, batch_size=batch_size, shuffle=True)
VA_valloader = DataLoader(VA_val_dataset, batch_size=len(X_VA_val), shuffle=False)

X_EX_train, y_EX_train = torch.tensor(train_features[train_mask_EX == 1], dtype=torch.float32), torch.tensor(train_y_EX[train_mask_EX == 1], dtype=torch.float32)
X_EX_val, y_EX_val = torch.tensor(val_features[val_mask_EX == 1], dtype=torch.float32), torch.tensor(val_y_EX[val_mask_EX == 1], dtype=torch.float32)

EX_train_dataset = TensorDataset(X_EX_train, y_EX_train)
EX_val_dataset = TensorDataset(X_EX_val, y_EX_val)

EX_trainloader = DataLoader(EX_train_dataset, batch_size=batch_size, shuffle=True)
EX_valloader = DataLoader(EX_val_dataset, batch_size=len(X_EX_val), shuffle=False)

X_AU_train, y_AU_train = torch.tensor(train_features[train_mask_AU == 1], dtype=torch.float32), torch.tensor(train_y_AU[train_mask_AU == 1], dtype=torch.float32)
X_AU_val, y_AU_val = torch.tensor(val_features[val_mask_AU == 1], dtype=torch.float32), torch.tensor(val_y_AU[val_mask_AU == 1], dtype=torch.float32)

AU_train_dataset = TensorDataset(X_AU_train, y_AU_train)
AU_val_dataset = TensorDataset(X_AU_val, y_AU_val)

AU_trainloader = DataLoader(AU_train_dataset, batch_size=batch_size, shuffle=True)
AU_valloader = DataLoader(AU_val_dataset, batch_size=len(X_AU_val), shuffle=False)

In [26]:
print(f'number of legit observations:\n\
train examples:\n\
valence-arousal features: {len(X_VA_train)}, valence-arousal targets: {len(y_VA_train)}\n\
expression features: {len(X_EX_train)}, expression targets: {len(y_EX_train)}\n\
action unit features: {len(X_AU_train)}, action unit targets: {len(y_AU_train)}\n\n\
validation examples:\n\
valence-arousal features: {len(X_VA_val)}, valence-arousal targets: {len(y_VA_val)}\n\
expression features: {len(X_EX_val)}, expression targets: {len(y_EX_val)}\n\
action unit features: {len(X_AU_val)}, action unit targets: {len(y_AU_val)}\n')

number of legit observations:
train examples:
valence-arousal features: 103917, valence-arousal targets: 103917
expression features: 90645, expression targets: 90645
action unit features: 103316, action unit targets: 103316

validation examples:
valence-arousal features: 26876, valence-arousal targets: 26876
expression features: 15440, expression targets: 15440
action unit features: 26876, action unit targets: 26876



# models

In [67]:
class valence_arousal(nn.Module):
    def __init__(self):
        super().__init__()
        self.in_features = 1290
        self.hidden = nn.Linear(in_features=self.in_features, out_features=self.in_features)
        self.hidden_activation = nn.LeakyReLU()
        self.hidden_batchnorm = nn.BatchNorm1d(num_features=self.in_features)
        self.hidden_dropout = nn.Dropout(p=0.55)
        
        self.valence_head = nn.Linear(in_features=self.in_features, out_features=1)
        self.arousal_head = nn.Linear(in_features=self.in_features, out_features=1)
        
        self.head_activation = nn.Tanh()
    
    def forward(self, extracted_features):
        output = self.hidden(extracted_features)
        output = self.hidden_batchnorm(output)
        output = self.hidden_activation(output)
        output = self.hidden_dropout(output)
        
        valence_output = self.head_activation(self.valence_head(output))
        arousal_output = self.head_activation(self.arousal_head(output))
        
        return valence_output, arousal_output

In [68]:
class expression(nn.Module):
    def __init__(self):
        super().__init__()
        self.in_features = 1290
        self.hidden = nn.Linear(in_features=self.in_features, out_features=self.in_features)
        self.hidden_activation = nn.LeakyReLU()
        self.hidden_batchnorm = nn.BatchNorm1d(num_features=self.in_features)
        self.hidden_dropout = nn.Dropout(p=0.4)
        
        self.expression_head = nn.Linear(in_features=self.in_features, out_features=8)
        
        self.head_activation = nn.Softmax(dim=1)
    
    def forward(self, extracted_features):
        output = self.hidden(extracted_features)
        output = self.hidden_batchnorm(output)
        output = self.hidden_activation(output)
        output = self.hidden_dropout(output)
        
        expression_output = self.head_activation(self.expression_head(output))
        
        return expression_output

In [69]:
class action_unit(nn.Module):
    def __init__(self):
        super().__init__()
        self.in_features = 1290
        self.hidden = nn.Linear(in_features=self.in_features, out_features=self.in_features)
        self.hidden_activation = nn.LeakyReLU()
        self.hidden_batchnorm = nn.BatchNorm1d(num_features=self.in_features)
        self.hidden_dropout = nn.Dropout(p=0.6)
        
        self.action_unit_head = nn.Linear(in_features=self.in_features, out_features=12)
        
        self.head_activation = nn.Sigmoid()
    
    def forward(self, extracted_features):
        output = self.hidden(extracted_features)
        output = self.hidden_batchnorm(output)
        output = self.hidden_activation(output)
        output = self.hidden_dropout(output)
        
        action_unit_output = self.head_activation(self.action_unit_head(output))
        
        return action_unit_output

In [70]:
class EXAU(nn.Module):
    def __init__(self, pretrained_ex=False):
        super().__init__()
        self.in_features = 1290
        self.expression_model = expression()
        self.pretrained_ex = pretrained_ex
        if pretrained_ex:
            best_ex_weights = './new_weights/cropped-aligned/batch=1024, logits=True/EX_0.370579.pt'
            self.expression_model.load_state_dict(torch.load(best_ex_weights))
            self.expression_model.expression_head = nn.Identity()
            self.expression_model.eval()
        self.hidden = nn.Linear(in_features=self.in_features, out_features=self.in_features)
        self.hidden_activation = nn.LeakyReLU()
        self.hidden_batchnorm = nn.BatchNorm1d(num_features=self.in_features)
        self.hidden_dropout = nn.Dropout(p=0.35)
        
        self.action_unit_head = nn.Linear(in_features=self.in_features*2, out_features=12)
        self.head_activation = nn.Sigmoid()
        
    def forward(self, extracted_features):
        output = self.hidden(extracted_features)
        output = self.hidden_batchnorm(output)
        output = self.hidden_activation(output)
        output = self.hidden_dropout(output)
        
        if self.pretrained_ex:
            ex_output = self.expression_model(extracted_features)
            output = torch.concat((output, ex_output), dim=1)
            
        action_unit_output = self.head_activation(self.action_unit_head(output))
        
        return action_unit_output

In [133]:
class VAEX(nn.Module):
    def __init__(self, pretrained_va=False):
        super().__init__()
        self.in_features = 1290
        self.valence_arousal_model = valence_arousal()
        self.pretrained_va = pretrained_va
        if pretrained_va:
            best_va_weights = './new_weights/cropped-aligned/batch=1024, logits=True/VA_0.469680.pt'
            self.valence_arousal_model.load_state_dict(torch.load(best_va_weights))
            self.valence_arousal_model.valence_head = nn.Identity()
            self.valence_arousal_model.arousal_head = nn.Identity()
            self.valence_arousal_model.head_activation = nn.Identity()
            self.valence_arousal_model.eval()
        self.hidden = nn.Linear(in_features=self.in_features, out_features=self.in_features)
        self.hidden_activation = nn.LeakyReLU()
        self.hidden_batchnorm = nn.BatchNorm1d(num_features=self.in_features)
        self.hidden_dropout = nn.Dropout(p=0.4)
        
        self.expression_head = nn.Linear(in_features=self.in_features*3, out_features=8)
        self.head_activation = nn.Softmax(dim=1)
    
    def forward(self, extracted_features):
        output = self.hidden(extracted_features)
        output = self.hidden_batchnorm(output)
        output = self.hidden_activation(output)
        output = self.hidden_dropout(output)
        
        if self.pretrained_va:
            valence_output, arousal_output = self.valence_arousal_model(extracted_features)
            va_output = torch.concat((valence_output, arousal_output), dim=1)
            output = torch.concat((output, va_output), dim=1)
        
        expression_output = self.head_activation(self.expression_head(output))
        
        return expression_output

In [175]:
class EXVA(nn.Module):
    def __init__(self, pretrained_ex=False):
        super().__init__()
        self.in_features = 1290
        self.expression_model = expression()
        self.pretrained_ex = pretrained_ex
        if pretrained_ex:
            best_ex_weights = './new_weights/cropped-aligned/batch=1024, logits=True/EX_0.370579.pt'
            self.expression_model.load_state_dict(torch.load(best_ex_weights))
            self.expression_model.expression_head = nn.Identity()
            self.expression_model.eval()
        self.hidden = nn.Linear(in_features=self.in_features, out_features=self.in_features)
        self.hidden_activation = nn.LeakyReLU()
        self.hidden_batchnorm = nn.BatchNorm1d(num_features=self.in_features)
        self.hidden_dropout = nn.Dropout(p=0.65)
        
        self.valence_head = nn.Linear(in_features=self.in_features*2, out_features=1)
        self.arousal_head = nn.Linear(in_features=self.in_features*2, out_features=1)
        self.head_activation = nn.Tanh()
        
    def forward(self, extracted_features):
        output = self.hidden(extracted_features)
        output = self.hidden_batchnorm(output)
        output = self.hidden_activation(output)
        output = self.hidden_dropout(output)
        
        if self.pretrained_ex:
            ex_output = self.expression_model(extracted_features)
            output = torch.concat((output, ex_output), dim=1)
            
        valence_output = self.head_activation(self.valence_head(output))
        arousal_output = self.head_activation(self.arousal_head(output))
        
        return valence_output, arousal_output

In [176]:
va_model = valence_arousal().to(device)
va_optimizer = torch.optim.Adam(va_model.parameters(), lr=1e-3)


ex_model = expression().to(device)
ex_optimizer = torch.optim.Adam(ex_model.parameters(), lr=1e-3)

au_model = action_unit().to(device)
au_optimizer = torch.optim.Adam(au_model.parameters(), lr=1e-3)

exau_model = EXAU(pretrained_ex=True).to(device)
exau_optimizer = torch.optim.Adam(exau_model.parameters(), lr=1e-3)

vaex_model = VAEX(pretrained_va=True).to(device)
vaex_optimizer = torch.optim.Adam(vaex_model.parameters(), lr=1e-3)

exva_model = EXVA(pretrained_ex=True).to(device)
exva_optimizer = torch.optim.Adam(exva_model.parameters(), lr=2e-3)

# train

In [177]:
train_epochs = 2

va_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(va_optimizer, mode='min', factor=0.1, 
    patience=1, threshold=0.002, threshold_mode='abs')
va_criterion = ABAWCCCLoss()

ex_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(ex_optimizer, mode='min', factor=0.1, 
    patience=1, threshold=0.002, threshold_mode='abs')
emotion_weights = get_emo_weights(y_EX_train)
ex_criterion = nn.CrossEntropyLoss(weight=emotion_weights)

au_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(au_optimizer, mode='min', factor=0.1, 
    patience=1, threshold=0.002, threshold_mode='abs')
action_unit_positive_weights, _, _ = get_action_unit_weights(y_AU_train)
action_unit_positive_weights = action_unit_positive_weights.reshape(-1, 12)
au_criterion = nn.BCEWithLogitsLoss(pos_weight=action_unit_positive_weights)

exau_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(exau_optimizer, mode='min', factor=0.1, 
    patience=1, threshold=0.002, threshold_mode='abs')
exau_criterion = nn.BCEWithLogitsLoss(pos_weight=action_unit_positive_weights)

vaex_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(vaex_optimizer, mode='min', factor=0.1, 
    patience=1, threshold=0.002, threshold_mode='abs')
vaex_criterion = nn.CrossEntropyLoss(weight=emotion_weights)

exva_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(exva_optimizer, mode='min', factor=0.1, 
    patience=1, threshold=0.002, threshold_mode='abs')
exva_criterion = ABAWCCCLoss()

In [None]:
current_va_weights_name = ''
current_ex_weights_name = ''
current_au_weights_name = ''
current_exau_weights_name = ''
current_vaex_weights_name = ''
current_exva_weights_name = ''

best_competition_part_va = 0
best_competition_part_ex = 0
best_competition_part_au = 0
best_competition_part_exau = 0
best_competition_part_vaex = 0
best_competition_part_exva = 0

train_va = False
train_ex = False
train_au = False
train_exau = False
train_vaex = False
train_exva = True

for epoch in range(train_epochs):
    if epoch in range(9):
        print(f'epoch №0{epoch+1} is currently running...')
    else:
        print(f'epoch №{epoch+1} is currently running...')
    
    # VALENCE-AROUSAL
    if train_va:
        current_va_weights_name, best_competition_part_va, train_va = train_one_epoch(model=va_model, 
            criterion=va_criterion, optimizer=va_optimizer, train_dataloader=VA_trainloader, 
            val_dataloader=VA_valloader, task='VA', current_weights_name=current_va_weights_name, 
            best_competition_part=best_competition_part_va)
        va_eval_loss, _ = eval_one_epoch(model=va_model, criterion=va_criterion, val_dataloader=VA_valloader, task='VA')

        previous_va_lr = va_optimizer.param_groups[0]['lr']
        va_scheduler.step(va_eval_loss)
    
    # EXPRESSION
    if train_ex:
        current_ex_weights_name, best_competition_part_ex, train_ex = train_one_epoch(model=ex_model, 
            criterion=ex_criterion, optimizer=ex_optimizer, train_dataloader=EX_trainloader, 
            val_dataloader=EX_valloader, task='EX', current_weights_name=current_ex_weights_name, 
            best_competition_part=best_competition_part_ex)
        ex_eval_loss, _ = eval_one_epoch(model=ex_model, criterion=ex_criterion, val_dataloader=EX_valloader, task='EX')

        previous_ex_lr = ex_optimizer.param_groups[0]['lr']
        ex_scheduler.step(ex_eval_loss)
    
    # ACTION UNIT
    if train_au:
        current_au_weights_name, best_competition_part_au, train_au = train_one_epoch(model=au_model, 
            criterion=au_criterion, optimizer=au_optimizer, train_dataloader=AU_trainloader, 
            val_dataloader=AU_valloader, task='AU', current_weights_name=current_au_weights_name, 
            best_competition_part=best_competition_part_au)
        au_eval_loss, _ = eval_one_epoch(model=au_model, criterion=au_criterion, val_dataloader=AU_valloader, task='AU')

        previous_au_lr = au_optimizer.param_groups[0]['lr']
        au_scheduler.step(au_eval_loss)
    
    if train_exau:
        current_exau_weights_name, best_competition_part_exau, _ = train_one_epoch(model=exau_model, 
                criterion=exau_criterion, optimizer=exau_optimizer, train_dataloader=AU_trainloader, 
                val_dataloader=AU_valloader, task='AU', current_weights_name=current_exau_weights_name, 
                best_competition_part=best_competition_part_exau)
        exau_eval_loss, _ = eval_one_epoch(model=exau_model, criterion=exau_criterion, val_dataloader=AU_valloader, task='AU')

        previous_exau_lr = exau_optimizer.param_groups[0]['lr']
        exau_scheduler.step(exau_eval_loss)
        
    if train_vaex:
        current_vaex_weights_name, best_competition_part_vaex, train_vaex = train_one_epoch(model=vaex_model, 
            criterion=vaex_criterion, optimizer=vaex_optimizer, train_dataloader=EX_trainloader, 
            val_dataloader=EX_valloader, task='EX', current_weights_name=current_vaex_weights_name, 
            best_competition_part=best_competition_part_vaex)
        vaex_eval_loss, _ = eval_one_epoch(model=vaex_model, criterion=vaex_criterion, val_dataloader=EX_valloader, task='EX')

        previous_vaex_lr = vaex_optimizer.param_groups[0]['lr']
        vaex_scheduler.step(vaex_eval_loss)
        
    if train_exva:
        current_exva_weights_name, best_competition_part_exva, train_exva = train_one_epoch(model=exva_model, 
            criterion=exva_criterion, optimizer=exva_optimizer, train_dataloader=VA_trainloader, 
            val_dataloader=VA_valloader, task='VA', current_weights_name=current_exva_weights_name, 
            best_competition_part=best_competition_part_exva)
        exva_eval_loss, _ = eval_one_epoch(model=exva_model, criterion=exva_criterion, val_dataloader=VA_valloader, task='VA')

        previous_exva_lr = exva_optimizer.param_groups[0]['lr']
        exva_scheduler.step(exva_eval_loss)
    
    # LEARNING RATE DECREASE
#     print(f'task VA: previous lr: {previous_va_lr}, scheduled lr: {va_optimizer.param_groups[0]["lr"]}')
#     print(f'task EX: previous lr: {previous_ex_lr}, scheduled lr: {ex_optimizer.param_groups[0]["lr"]}')
#     print(f'task AU: previous lr: {previous_au_lr}, scheduled lr: {au_optimizer.param_groups[0]["lr"]}')
#     print(f'task EXAU: previous lr: {previous_exau_lr}, scheduled lr: {exau_optimizer.param_groups[0]["lr"]}')
#     print(f'task VAEX: previous lr: {previous_vaex_lr}, scheduled lr: {vaex_optimizer.param_groups[0]["lr"]}')      
    print(f'task EXVA: previous lr: {previous_exva_lr}, scheduled lr: {exva_optimizer.param_groups[0]["lr"]}')          

# ensemble

In [31]:
class ensemble(nn.Module):
    def __init__(self, model_va, model_ex, model_au):
        super().__init__()
        self.model_va = model_va
        self.model_ex = model_ex
        self.model_au = model_au
        
        self.model_va.eval()
        self.model_ex.eval()
        self.model_au.eval()
        
    def forward(self, x):
        with torch.no_grad():
            va_output, ar_output = self.model_va(x)
            ex_output = self.model_ex(x)
            au_output = self.model_au(x)

        return va_output, ar_output, ex_output, au_output

In [47]:
def eval_ensemble(ensemble, val_features, val_targets, val_targets_mask, print_f1_ex_statistic=False,
                  print_f1_au_statistic=False):
    with torch.no_grad():
        va_output, ar_output, ex_output, au_output = ensemble(val_features)

        va_input = val_targets[0][val_targets_mask[0] == 1]
        va_output = torch.concat((va_output, ar_output), dim=1)
        
        va_competition_part = evaluate_model(va_output, va_input, task='VA')
        
        ex_input = val_targets[1][val_targets_mask[1] == 1].unsqueeze(1).long()
        ex_output = ex_output.unsqueeze(2)
        _, ex_output = torch.max(ex_output.data, 1)
        ex_output = ex_output[val_targets_mask[1] == 1]
        
        ex_competition_part = evaluate_model(ex_output, ex_input, task='EX')

        au_input = val_targets[2][val_targets_mask[2] == 1]
        au_output = ((au_output >= 0.5) * 1)
        au_output = au_output[val_targets_mask[2] == 1]
        
        au_competition_part = evaluate_model(au_output, au_input, task='AU')

        abaw_metric = va_competition_part + ex_competition_part + au_competition_part
    if print_f1_ex_statistic:
        print(sklearn.metrics.classification_report(ex_input.data.cpu().numpy(), ex_output.data.cpu().numpy()))
    if print_f1_au_statistic:
        print(sklearn.metrics.classification_report(au_input.data.cpu().numpy(), au_output.data.cpu().numpy()))
    print(f'ABAW result: {abaw_metric:3f}, valence-arousal: {va_competition_part:3f}, \
expression: {ex_competition_part:3f}, action unit: {au_competition_part:3f}')

In [37]:
seed_everything(1996)

val_features_ = torch.tensor(data=val_features, dtype=torch.float, device=device)
val_targets = [torch.tensor(data=data, dtype=torch.float, device=device) for data in [val_y_VA, val_y_EX, val_y_AU]]
val_targets_mask = [val_mask_VA, val_mask_EX, val_mask_AU]

In [None]:
print(f'current valence-arousal weights path: "{current_va_weights_name}",\n\
current expression weights path: "{current_ex_weights_name}",\n\
current action_unit weights path: "{current_au_weights_name}",\n')

In [790]:
valence_arousal_model = valence_arousal().to(device).eval()
valence_arousal_model.load_state_dict(torch.load(current_va_weights_name))

expression_model = expression().to(device).eval()
expression_model.load_state_dict(torch.load(current_ex_weights_name))

action_unit_model = action_unit().to(device).eval()
action_unit_model.load_state_dict(torch.load(current_au_weights_name))

ensemble = ensemble(model_va=valence_arousal_model, model_ex=expression_model, model_au=action_unit_model)

In [791]:
eval_ensemble(ensemble, val_features_, val_targets, val_targets_mask)

ABAW result: 1.330468, valence-arousal: 0.452668, expression: 0.370579, action unit: 0.507221


# best weights


In [138]:
all_va_weights = [weights_dir+weights for weights in os.listdir(weights_dir) if 'VA' in weights]
all_ex_weights = [weights_dir+weights for weights in os.listdir(weights_dir) if 'EX' in weights]
all_au_weights = [weights_dir+weights for weights in os.listdir(weights_dir) if 'AU' in weights]
all_exau_weights = [weights_dir+weights for weights in os.listdir(weights_dir) if 'exau' in weights]

max_va_part = 0
max_ex_part = 0
max_au_part = 0
max_exau_part = 0

best_va_weight = ''
best_ex_weight = ''
best_au_weight = ''
best_exau_weight = weights_dir+'exau_0.510846.pt'
best_vaex_weight = weights_dir+'vaex_0.383444.pt'

for va_weight, ex_weight, au_weight in zip(all_va_weights, all_ex_weights, all_au_weights):
    va_part = float(va_weight.split('_')[-1].split('.pt')[0])
    ex_part = float(ex_weight.split('_')[-1].split('.pt')[0])
    au_part = float(au_weight.split('_')[-1].split('.pt')[0])
    
    if va_part > max_va_part:
        max_va_part = va_part
        best_va_weight = va_weight
    if ex_part > max_ex_part:
        max_ex_part = ex_part
        best_ex_weight = ex_weight
    if au_part > max_au_part:
        max_au_part = au_part
        best_au_weight = au_weight

print(f'best VA weights: {best_va_weight},\n\
best EX weights: {best_ex_weight}\n\
best AU weights: {best_au_weight}\n\
best EXAU weights: {best_exau_weight}\n\
best VAEX weights: {best_vaex_weight}')

best VA weights: ./new_weights/cropped-aligned/batch=1024, logits=True/VA_0.469680.pt,
best EX weights: ./new_weights/cropped-aligned/batch=1024, logits=True/EX_0.370579.pt
best AU weights: ./new_weights/cropped-aligned/batch=1024, logits=True/AU_0.507221.pt
best EXAU weights: ./new_weights/cropped-aligned/batch=1024, logits=True/exau_0.510846.pt
best VAEX weights: ./new_weights/cropped-aligned/batch=1024, logits=True/vaex_0.383444.pt


In [139]:
valence_arousal_model = valence_arousal().to(device)
valence_arousal_model.load_state_dict(torch.load(best_va_weight))

expression_model = expression().to(device)
expression_model.load_state_dict(torch.load(best_ex_weight))

action_unit_model = action_unit().to(device)
action_unit_model.load_state_dict(torch.load(best_au_weight))

best_ensemble = ensemble(model_va=valence_arousal_model, model_ex=expression_model, model_au=action_unit_model)

In [140]:
eval_ensemble(best_ensemble, val_features_, val_targets, val_targets_mask, print_f1_au_statistic=True,
    print_f1_ex_statistic=True)

              precision    recall  f1-score   support

           0       0.28      0.44      0.34      1886
           1       0.16      0.47      0.23       487
           2       0.35      0.60      0.44       565
           3       0.31      0.38      0.34      1254
           4       0.56      0.49      0.52      3751
           5       0.70      0.41      0.52      1893
           6       0.15      0.26      0.19      1003
           7       0.56      0.28      0.38      4601

    accuracy                           0.39     15440
   macro avg       0.38      0.42      0.37     15440
weighted avg       0.48      0.39      0.41     15440

              precision    recall  f1-score   support

           0       0.62      0.54      0.58      5531
           1       0.42      0.46      0.44      2554
           2       0.59      0.53      0.56      5442
           3       0.68      0.47      0.56      8818
           4       0.77      0.63      0.69     13224
           5       0.76 

In [141]:
expression_action_unit_model = EXAU(pretrained_ex=True).to(device)
expression_action_unit_model.load_state_dict(torch.load(best_exau_weight))

exau_ensemble = ensemble(model_va=valence_arousal_model, model_ex=expression_model, model_au=expression_action_unit_model)

In [142]:
eval_ensemble(exau_ensemble, val_features_, val_targets, val_targets_mask, print_f1_au_statistic=True,
    print_f1_ex_statistic=True)

              precision    recall  f1-score   support

           0       0.28      0.44      0.34      1886
           1       0.16      0.47      0.23       487
           2       0.35      0.60      0.44       565
           3       0.31      0.38      0.34      1254
           4       0.56      0.49      0.52      3751
           5       0.70      0.41      0.52      1893
           6       0.15      0.26      0.19      1003
           7       0.56      0.28      0.38      4601

    accuracy                           0.39     15440
   macro avg       0.38      0.42      0.37     15440
weighted avg       0.48      0.39      0.41     15440

              precision    recall  f1-score   support

           0       0.64      0.52      0.57      5531
           1       0.42      0.48      0.45      2554
           2       0.59      0.55      0.57      5442
           3       0.68      0.50      0.58      8818
           4       0.75      0.70      0.72     13224
           5       0.75 

In [143]:
valence_arousal_expression_model = VAEX(pretrained_va=True).to(device)
valence_arousal_expression_model.load_state_dict(torch.load(best_vaex_weight))

vaex_exau_ensemble = ensemble(model_va=valence_arousal_model, model_ex=valence_arousal_expression_model, 
    model_au=expression_action_unit_model)

In [144]:
eval_ensemble(vaex_exau_ensemble, val_features_, val_targets, val_targets_mask, print_f1_au_statistic=True,
    print_f1_ex_statistic=True)

              precision    recall  f1-score   support

           0       0.33      0.33      0.33      1886
           1       0.17      0.37      0.23       487
           2       0.34      0.61      0.44       565
           3       0.32      0.37      0.34      1254
           4       0.55      0.54      0.54      3751
           5       0.68      0.48      0.56      1893
           6       0.17      0.32      0.22      1003
           7       0.50      0.34      0.40      4601

    accuracy                           0.41     15440
   macro avg       0.38      0.42      0.38     15440
weighted avg       0.46      0.41      0.43     15440

              precision    recall  f1-score   support

           0       0.64      0.52      0.57      5531
           1       0.42      0.48      0.45      2554
           2       0.59      0.55      0.57      5442
           3       0.68      0.50      0.58      8818
           4       0.75      0.70      0.72     13224
           5       0.75 

In [147]:
vaex_ensemble = ensemble(model_va=valence_arousal_model, model_ex=valence_arousal_expression_model, 
    model_au=action_unit_model)

In [148]:
eval_ensemble(vaex_ensemble, val_features_, val_targets, val_targets_mask, print_f1_au_statistic=True,
    print_f1_ex_statistic=True)

              precision    recall  f1-score   support

           0       0.33      0.33      0.33      1886
           1       0.17      0.37      0.23       487
           2       0.34      0.61      0.44       565
           3       0.32      0.37      0.34      1254
           4       0.55      0.54      0.54      3751
           5       0.68      0.48      0.56      1893
           6       0.17      0.32      0.22      1003
           7       0.50      0.34      0.40      4601

    accuracy                           0.41     15440
   macro avg       0.38      0.42      0.38     15440
weighted avg       0.46      0.41      0.43     15440

              precision    recall  f1-score   support

           0       0.62      0.54      0.58      5531
           1       0.42      0.46      0.44      2554
           2       0.59      0.53      0.56      5442
           3       0.68      0.47      0.56      8818
           4       0.77      0.63      0.69     13224
           5       0.76 