# preparation

## imports

In [1]:
# environment imports
import sys, os, shutil, glob, random
import warnings
warnings.filterwarnings('ignore')

In [245]:
import numpy as np
import math, timm, time
import pandas as pd

# PyTorch import block
import torch
import torch.nn as nn
import torch.nn.functional as F

from   torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
from   torch.utils.data import Dataset, DataLoader, TensorDataset
from   torch.utils.data.dataloader import default_collate
from   torch.optim.optimizer import Optimizer
from   torch.nn import TransformerEncoderLayer
from   torch.nn import Parameter

# Torchvision import block
import torchvision
import torchvision.transforms as transforms

# einops import block
import einops
from   einops import rearrange, repeat
from   einops.layers.torch import Rearrange

from   tqdm.notebook import tqdm
from   scipy import stats

from   typing import Optional, Tuple
import copy
from   IPython.display import clear_output

In [246]:
import matplotlib.pyplot as plt
import seaborn as sns

In [3]:
# Report the versions of the core libraries used by this notebook.
print('\n'.join([
    'current packages version:',
    f'* torch: {torch.__version__};',
    f'* timm: {timm.__version__};',
    f'* torchvision: {torchvision.__version__}',
]))

current packages version:
* torch: 2.2.1;
* timm: 0.6.12;
* torchvision: 0.17.1


## variables

In [4]:
# Root locations (all relative to the notebook's working directory).
DATA_DIR = './data'
LOCAL_DIR = './'
FE_MODELS_DIR = './models/feature_extractors'

# Raw video / audio files of each split.
TRAIN_MP4_DIR = f'{DATA_DIR}/train/mp4'
TRAIN_WAV_DIR = f'{DATA_DIR}/train/wav'
VALID_MP4_DIR = f'{DATA_DIR}/val/mp4'
VALID_WAV_DIR = f'{DATA_DIR}/val/wav'

# Base paths of the pre-extracted features of each split.
TRAIN_MP4_FEATURES_DIR = f'{DATA_DIR}/train/mp4_features'
TRAIN_WAV_FEATURES_DIR = f'{DATA_DIR}/train/wav_features'
VALID_MP4_FEATURES_DIR = f'{DATA_DIR}/val/mp4_features'
VALID_WAV_FEATURES_DIR = f'{DATA_DIR}/val/wav_features'

In [5]:
# Directory for model checkpoints, relative to the notebook's working directory.
# NOTE(review): presumably the value passed as `model_checkpoint_dir` to the evaluation loop - confirm.
ABAW5_MODELS_CHECKPOINTS = './models/abaw_checkpoints'

In [6]:
# Use the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Current connected device is {DEVICE}')

Current connected device is cuda


## classes

In [7]:
def get_prob(
    features:np.ndarray=None, classifier_weights:np.ndarray=None, classifier_bias:np.ndarray=None, 
    logits:bool=True
) -> np.ndarray:
    '''
    Apply a linear classifier head to extracted features.

            Parameters:
                features (np.ndarray): Extracted features, either a single vector of shape
                    (n_features,) or a batch of shape (n_samples, n_features);
                classifier_weights (np.ndarray): Classifier weights of shape (n_classes, n_features);
                classifier_bias (np.ndarray): Classifier bias of shape (n_classes,);
                logits (bool): If True return the raw scores, otherwise their softmax.
            Returns:
                Scores with the class axis last: (n_classes,) for a single vector,
                (n_samples, n_classes) for a batch.
    '''
    xs = np.dot(features, np.transpose(classifier_weights)) + classifier_bias

    if logits:
        return xs

    # Softmax over the class (last) axis, so 1-D and 2-D inputs are both supported.
    # Subtracting the maximum keeps the exponentials numerically stable.
    e_x = np.exp(xs - np.max(xs, axis=-1, keepdims=True))
    return e_x / np.sum(e_x, axis=-1, keepdims=True)

def stack_npy_files(directory: str=None):
    """
    Open all .npy files in the given directory and stack them into one big array.

    The files are concatenated along axis 0 in natural file-name order
    (``batch_2.npy`` before ``batch_10.npy``). ``os.listdir`` returns entries in
    arbitrary order, so without sorting the row order of the result could differ
    between runs and machines.

            Parameters:
                directory (str): The directory containing the .npy files.
            Returns:
                The concatenated array, or None when the directory holds no .npy file.
    """
    import re

    def _natural_key(file_name: str):
        # re.split with a capturing group alternates text / digit-run tokens,
        # so every odd position is a run of digits that is compared numerically.
        tokens = re.split(r'(\d+)', file_name)
        return [int(token) if position % 2 else token for position, token in enumerate(tokens)]

    npy_files = sorted(
        (file for file in os.listdir(directory) if file.endswith('.npy')), key=_natural_key)
    if not npy_files:
        print("No .npy files found in the directory.")
        return None

    stacked_array = np.concatenate([np.load(os.path.join(directory, file)) for file in npy_files], axis=0)
    return stacked_array

class MultiModalityDataset(Dataset):
    """
    Dataset of pre-extracted visual (HSEmotion), acoustic (emotion2vec) and
    action-unit (OpenFace) features for one split of the data-info table.

    All samples are loaded into memory when the dataset is constructed. Samples
    whose feature files are missing or empty are skipped.

            Parameters:
                data_info_path (str): CSV file with at least the columns 'Split', 'ID'
                    and the seven target-label columns;
                split (str): Value of the 'Split' column to keep;
                visual_feature_extractor_head: Model whose `classifier` layer (weight / bias)
                    is used to turn visual features into class scores
                    (required when HSEmotion_scores is True);
                HSEmotion_feature_dir_path (str): Directory with '<ID>_batched_features' folders of .npy files;
                HSEmotion_scores (bool): Append classifier scores to the visual features;
                HSEmotion_seq_length (int): Fixed visual sequence length (None keeps the original length);
                emotion2vec_feature_dir_path (str): Directory with '<ID>_features' folders;
                emotion2vec_seq_length (int): Fixed acoustic sequence length (None keeps the original length);
                use_rolling_mean (bool): Average the acoustic features over consecutive,
                    non-overlapping windows of `rolling_mean_step` frames;
                rolling_mean_step (int): Window size of that averaging;
                OpenFace_feature_dir_path (str): Directory with '<ID>.csv' OpenFace files;
                OpenFace_seq_length (int): Fixed action-unit sequence length (None keeps the original length).
    """
    def __init__(
        self, data_info_path: str=None, split: str='Train', visual_feature_extractor_head: timm.models.EfficientNet=None,
        HSEmotion_feature_dir_path: str=None, HSEmotion_scores: bool=True, HSEmotion_seq_length: int=64,
        emotion2vec_feature_dir_path: str=None, emotion2vec_seq_length: int=64, use_rolling_mean: bool=True,
        rolling_mean_step: int=8, OpenFace_feature_dir_path: str=None, OpenFace_seq_length: int=64
    ):
        super(MultiModalityDataset, self).__init__()
        self.data_info, self.split = pd.read_csv(data_info_path), split
        self.data_info = self.data_info[self.data_info['Split'] == self.split]
        self.target_labels = ['Adoration', 'Amusement', 'Anxiety', 'Disgust','Empathic-Pain', 'Fear', 'Surprise']
        
        # visual-block
        self.visual_feature_extractor_head = visual_feature_extractor_head
        self.HSEmotion_feature_dir_path = HSEmotion_feature_dir_path
        self.HSEmotion_scores, self.HSEmotion_seq_length = HSEmotion_scores, HSEmotion_seq_length
        
        if self.HSEmotion_scores:
            if self.visual_feature_extractor_head is None:
                raise ValueError('visual_feature_extractor_head is required when HSEmotion_scores=True')
            classifier = self.visual_feature_extractor_head.classifier
            self.HSEmotion_weights = classifier.weight.cpu().data.numpy()
            self.HSEmotion_bias = classifier.bias.cpu().data.numpy()
            
        # acoustic-block
        self.emotion2vec_feature_dir_path = emotion2vec_feature_dir_path
        self.emotion2vec_seq_length = emotion2vec_seq_length
        self.use_rolling_mean, self.rolling_mean_step = use_rolling_mean, rolling_mean_step
        
        # AU-block
        self.OpenFace_feature_dir_path, self.OpenFace_seq_length = OpenFace_feature_dir_path, OpenFace_seq_length
        
        # File IDs on disk are left-padded with zeros to this width.
        self.file_name_padding = 5
        self.__init_inputs()

    @staticmethod
    def _fit_to_length(features: np.ndarray, seq_length: Optional[int]) -> np.ndarray:
        """Truncate or zero-pad `features` along axis 0 to `seq_length` rows (None keeps it unchanged)."""
        if seq_length is None:
            return features

        current_length = features.shape[0]
        if current_length > seq_length:
            return features[:seq_length]
        return np.pad(features, pad_width=((0, seq_length - current_length), (0, 0)))
        
    def __init_inputs(self):
        """Load the features and labels of every row of the split into `self.inputs` / `self.labels`."""
        print(f'Configure dataset from directories')
        self.inputs, self.labels = [], []
        skipped_samples = 0
        
        for _, row_values in tqdm(self.data_info.iterrows(), total=self.data_info.shape[0]):
            current_file_ID = str(row_values['ID']).rjust(self.file_name_padding, '0')
            
            HSEmotion_dir_path = f'{self.HSEmotion_feature_dir_path}/{current_file_ID}_batched_features'
            emotion2vec_dir_path = f'{self.emotion2vec_feature_dir_path}/{current_file_ID}_features'
            emotion2vec_feature_path = f'{emotion2vec_dir_path}/feature-extraction-model_features.npy'
            OpenFace_feature_path = f'{self.OpenFace_feature_dir_path}/{current_file_ID}.csv'
            
            # Skip samples with any modality missing instead of crashing on the first
            # absent directory or file.
            features_available = (
                os.path.isdir(HSEmotion_dir_path) and len(os.listdir(HSEmotion_dir_path)) >= 1
                and os.path.isfile(emotion2vec_feature_path)
                and os.path.exists(OpenFace_feature_path)
            )
            if not features_available:
                skipped_samples += 1
                continue
                
            HSEmotion_features = stack_npy_files(directory=HSEmotion_dir_path)
            if HSEmotion_features is None:
                # The directory holds files, but none of them is a .npy feature file.
                skipped_samples += 1
                continue

            emotion2vec_features = np.load(emotion2vec_feature_path)
            # Keep every 5th row and drop the first 5 + 142 columns of the OpenFace table.
            # NOTE(review): presumably frame sub-sampling plus selection of the AU columns -
            # confirm against the OpenFace CSV layout.
            OpenFace_features = pd.read_csv(OpenFace_feature_path).iloc[::5, 5:].iloc[:, 142:].values
            
            if self.HSEmotion_scores:
                # One matrix product scores all frames at once.
                scores = get_prob(HSEmotion_features, self.HSEmotion_weights, self.HSEmotion_bias)
                HSEmotion_features = np.concatenate((HSEmotion_features, scores), axis=1)
            
            # Average consecutive, non-overlapping windows of `rolling_mean_step` frames.
            if self.use_rolling_mean:
                dataframe = pd.DataFrame(emotion2vec_features)
                emotion2vec_features = dataframe.groupby(dataframe.index // self.rolling_mean_step).mean().values
                del dataframe
            
            modality_inputs = {
                'visual': self._fit_to_length(HSEmotion_features, self.HSEmotion_seq_length),
                'acoustic': self._fit_to_length(emotion2vec_features, self.emotion2vec_seq_length),
                'AUs': self._fit_to_length(OpenFace_features, self.OpenFace_seq_length),
            }
            self.inputs.append(modality_inputs)
            
            current_labels = row_values[self.target_labels].values
            self.labels.append(current_labels)

        if skipped_samples:
            print(f'Skipped {skipped_samples} samples with missing features')
            
    def __len__(self):
        return len(self.labels)
    
    def __getitem__(self, index: int=None):
        """Return the `(modality_inputs, labels)` pair stored at `index`."""
        modality_input = self.inputs[index]
        labels = self.labels[index]
        
        return modality_input, labels

## functions

In [8]:
def seed_everything(seed:int=None) -> None:
    '''
    Seed every random number generator used in the notebook and make cuDNN deterministic.

            Parameters:
                seed (int): The seed number.
    '''
    # Python, NumPy and PyTorch (CPU, current GPU, all GPUs) generators, in that order.
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    # Disable cuDNN auto-tuning and request deterministic kernels.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    
def configure_feature_extraction_model_visual(
    feature_extractor_model_path: str=None, device: torch.device=None, return_initial: bool=True
):
    '''
    Load a pickled model and turn it into a feature extractor by replacing its
    `classifier` layer with an identity.
    
            Parameters:
                feature_extractor_model_path (str): path to the pickled feature-extraction model;
                device (torch.device): device the feature extractor is moved to;
                return_initial (bool): Also return the untouched model (classifier included) or not
            Returns:
                The feature extractor in eval mode when `return_initial` is False,
                otherwise the tuple `(feature_extractor, initial_model)`.
    '''
    # The checkpoint is a whole pickled nn.Module, so `weights_only` must be False
    # (newer PyTorch releases default to True and would refuse to load it).
    # SECURITY: unpickling executes arbitrary code - only load files from a trusted source.
    feature_extractor_model = torch.load(feature_extractor_model_path, weights_only=False)
    feature_extractor_model.classifier = torch.nn.Identity()
    feature_extractor_model.to(device)
    feature_extractor_model.eval()

    if return_initial:
        # Read the file a second time to get a pristine copy that still has its classifier.
        return feature_extractor_model, torch.load(feature_extractor_model_path, weights_only=False)
    else:
        return feature_extractor_model
    
import numpy as np
from scipy.stats import pearsonr

def calc_pearsons(predictions:np.array=None, ground_truth:np.array=None):
    '''
    Function calculates Pearson's Correlation Coefficient.
    
            Parameters:
                predictions (np.array): Model's forecasts (1-D);
                ground_truth (np.array): The fact (1-D, same length).
            Returns:
                The correlation coefficient. As computed by `scipy.stats.pearsonr`,
                it is NaN when either input is constant.
    '''
    # Replace NaN values with 0. The replacement has to be passed by keyword:
    # the second positional argument of np.nan_to_num is `copy`, not the fill value.
    predictions = np.nan_to_num(predictions, nan=0.0)
    ground_truth = np.nan_to_num(ground_truth, nan=0.0)
    
    pcc = pearsonr(predictions, ground_truth)
    return pcc[0]

def mean_pearsons(predictions:np.array=None, ground_truth:np.array=None, n_classes:int=7):
    '''
    Function calculates mean PCC between predictions and fact.
    
            Parameters:
                predictions (np.array): Model's forecasts, torch tensor or array
                    indexed as [sample, class];
                ground_truth (np.array): The fact, same layout as `predictions`;
                n_classes (int): number of classes (leading columns that are evaluated).
            Returns:
                Tuple `(mean_pcc, class_wise_pcc)`: the average of the per-class
                coefficients and the array of per-class coefficients.
    '''
    def _as_numpy(values):
        # Tensors are detached and moved to the CPU; anything else is taken as array-like.
        if isinstance(values, torch.Tensor):
            return values.detach().cpu().numpy()
        return np.asarray(values)

    # NaNs become 0 (passed by keyword: the second positional argument of
    # np.nan_to_num is `copy`, not the fill value).
    predictions = np.nan_to_num(_as_numpy(predictions), nan=0.0)
    ground_truth = np.nan_to_num(_as_numpy(ground_truth), nan=0.0)
    
    class_wise_pcc = np.array([calc_pearsons(predictions[:, i], ground_truth[:, i]) for i in range(n_classes)])
    mean_pcc = np.mean(class_wise_pcc)
    
    return mean_pcc, class_wise_pcc

In [9]:
def train_one_epoch_multi_modal(
    model: nn.Module=None, train_dataloader: DataLoader=None, optimizer: torch.optim.Optimizer=None, 
    scheduler: torch.optim.lr_scheduler=None, device: torch.device=DEVICE, mode: str=None, criterion: nn.Module=None
):
    '''
    Train the model for one pass over the training dataloader.

            Parameters:
                model (nn.Module): Model called as model(visual, acoustic, AUs);
                train_dataloader (DataLoader): Yields `(features, labels)` batches where every
                    element of `features` is a dict with the keys 'visual', 'acoustic' and 'AUs';
                optimizer (torch.optim.Optimizer): Optimizer stepped once per batch;
                scheduler: Accepted for interface compatibility, not used here;
                device (torch.device): Device the batch tensors are moved to;
                mode (str): Accepted for interface compatibility, not used here;
                criterion (nn.Module): Loss, called as criterion(labels, model_outputs).
            Returns:
                Tuple `(train_loss, train_apcc)`: loss and mean Pearson correlation,
                both averaged over the batches.
    '''
    def _batch_to_tensor(arrays):
        # Stack into one float32 array first: building a tensor from a Python list
        # of arrays is very slow.
        stacked = np.asarray(np.stack(arrays, axis=0), dtype=np.float32)
        return torch.from_numpy(stacked).to(device)

    train_loss, train_apcc = 0, 0
    model.train()
    
    for current_batch in tqdm(train_dataloader, desc='train...'):
        features, labels = current_batch
        
        visual_features = _batch_to_tensor([feature['visual'] for feature in features])
        acoustic_features = _batch_to_tensor([feature['acoustic'] for feature in features])
        AUs_features = _batch_to_tensor([
            feature['AUs'] if isinstance(feature['AUs'], np.ndarray) else feature['AUs'].values
            for feature in features])
        # Labels arrive as object arrays; the cast to float32 replaces the former
        # `np.float` alias, which was removed in NumPy 1.24.
        labels = _batch_to_tensor(labels)
        
        optimizer.zero_grad()
        
        model_outputs = model(visual_features, acoustic_features, AUs_features)
        # NOTE(review): argument order is (target, prediction), the reverse of the
        # torch.nn loss convention - harmless for symmetric losses, confirm for the criterion in use.
        batch_loss = criterion(labels, model_outputs)
        batch_loss.backward()
        
        train_loss += batch_loss.item()
        
        batch_mean_pcc, _ = mean_pearsons(model_outputs, labels)
        train_apcc += batch_mean_pcc
        
        optimizer.step()
            
    train_loss = train_loss / len(train_dataloader)
    train_apcc = train_apcc / len(train_dataloader)
    
    return train_loss, train_apcc

In [10]:
def eval_one_epoch_multi_modal(
    model: nn.Module=None, valid_dataloader: DataLoader=None, criterion: nn.Module=None, 
    scheduler: torch.optim.lr_scheduler=None, device: torch.device=DEVICE, mode: str=None,
    model_checkpoint_dir: str=None, best_validation_score: float=None, model_suffix: str=None
):
    '''
    Evaluate the model on the validation dataloader and checkpoint it when it is
    at least as good as the best score so far.

            Parameters:
                model (nn.Module): Model called as model(visual, acoustic, AUs);
                valid_dataloader (DataLoader): Yields `(features, labels)` batches where every
                    element of `features` is a dict with the keys 'visual', 'acoustic' and 'AUs';
                criterion (nn.Module): Loss, called as criterion(labels, model_outputs);
                scheduler: Accepted for interface compatibility, not used here;
                device (torch.device): Device the batch tensors are moved to;
                mode (str): Accepted for interface compatibility, not used here;
                model_checkpoint_dir (str): Directory for checkpoints (None disables saving);
                best_validation_score (float): Best score so far (None means no score yet);
                model_suffix (str): Prefix of the checkpoint file name.
            Returns:
                Tuple `(valid_loss, valid_apcc)`: loss and mean Pearson correlation,
                both averaged over the batches.
    '''
    def _batch_to_tensor(arrays):
        # Stack into one float32 array first: building a tensor from a Python list
        # of arrays is very slow.
        stacked = np.asarray(np.stack(arrays, axis=0), dtype=np.float32)
        return torch.from_numpy(stacked).to(device)

    valid_loss, valid_apcc = 0, 0
    model.eval()
    
    with torch.no_grad():
        for current_batch in tqdm(valid_dataloader, desc='valid...'):
            features, labels = current_batch

            visual_features = _batch_to_tensor([feature['visual'] for feature in features])
            acoustic_features = _batch_to_tensor([feature['acoustic'] for feature in features])
            AUs_features = _batch_to_tensor([
                feature['AUs'] if isinstance(feature['AUs'], np.ndarray) else feature['AUs'].values
                for feature in features])
            # Labels arrive as object arrays; the cast to float32 replaces the former
            # `np.float` alias, which was removed in NumPy 1.24.
            labels = _batch_to_tensor(labels)
            
            model_outputs = model(visual_features, acoustic_features, AUs_features)
            # NOTE(review): argument order is (target, prediction), the reverse of the
            # torch.nn loss convention - harmless for symmetric losses, confirm for the criterion in use.
            batch_loss = criterion(labels, model_outputs)
            valid_loss += batch_loss.item()

            batch_mean_pcc, _ = mean_pearsons(model_outputs, labels)
            valid_apcc += batch_mean_pcc
            
    valid_loss = valid_loss / len(valid_dataloader)
    valid_apcc = valid_apcc / len(valid_dataloader)
    
    # A NaN score is never treated as an improvement; a missing best score means
    # "nothing to beat yet" instead of raising a TypeError on the comparison.
    is_best = not np.isnan(valid_apcc) and (
        best_validation_score is None or valid_apcc >= best_validation_score)
    if model_checkpoint_dir is not None and is_best:
        os.makedirs(model_checkpoint_dir, exist_ok=True)
        torch.save(model.state_dict(), f'{model_checkpoint_dir}/{model_suffix}_{valid_apcc:.4f}.pt')
    
    return valid_loss, valid_apcc

# Model

## dataset

In [11]:
# Fix all random seeds before anything is built.
seed_everything(2002)
# With return_initial=True the call returns (headless extractor, untouched model).
# The headless extractor is discarded here; the untouched model is kept because
# the dataset reads its `classifier` weights to compute class scores.
_, feature_extractor_model = configure_feature_extraction_model_visual(
    feature_extractor_model_path=f'{FE_MODELS_DIR}/efficientnet_affectnet.pt', device=DEVICE, return_initial=True)

In [60]:
start_time_extraction = time.time()

# Options that are identical for the training and the validation split.
shared_dataset_options = dict(
    data_info_path=f'{DATA_DIR}/preprocessed_data_info.csv',
    # visual-block
    visual_feature_extractor_head=feature_extractor_model,
    HSEmotion_scores=True, HSEmotion_seq_length=64,
    # acoustic-block
    emotion2vec_seq_length=64, use_rolling_mean=True, rolling_mean_step=10,
    # AU-block
    OpenFace_seq_length=64,
)

train_dataset = MultiModalityDataset(
    split='Train',
    HSEmotion_feature_dir_path=TRAIN_MP4_FEATURES_DIR + '-HSEmotion-aligned',
    emotion2vec_feature_dir_path=TRAIN_WAV_FEATURES_DIR + '-feature-extraction-model',
    OpenFace_feature_dir_path=f'{DATA_DIR}/train/mp4_features-OpenFace',
    **shared_dataset_options
)

valid_dataset = MultiModalityDataset(
    split='Val',
    HSEmotion_feature_dir_path=VALID_MP4_FEATURES_DIR + '-HSEmotion-aligned',
    emotion2vec_feature_dir_path=VALID_WAV_FEATURES_DIR + '-feature-extraction-model',
    OpenFace_feature_dir_path=f'{DATA_DIR}/val/mp4_features-OpenFace',
    **shared_dataset_options
)

# Report the wall-clock time spent loading both splits, rounded to whole minutes.
elapsed_time = time.time() - start_time_extraction
print(f'Dataset were configured in {np.round((elapsed_time/60), 0)} minutes')

Configure dataset from directories


  0%|          | 0/15806 [00:00<?, ?it/s]

Configure dataset from directories


  0%|          | 0/4657 [00:00<?, ?it/s]

Dataset were configured in 20.0 minutes


## dataloader

In [61]:
class MultiModalCollator(object):
    """Collate function that keeps the per-sample feature dicts and label arrays un-merged."""
    def __init__(self, sequence_based: bool=False):
        super().__init__()
        # Stored only; the flag is not used by __call__.
        self.sequence_based = sequence_based

    def __call__(self, batch: torch.Tensor=None):
        '''
        Split a list of `(features, labels)` samples into two object arrays.

        The arrays are filled element by element. Stacking the `(dict, ndarray)`
        tuples directly asks NumPy to build a ragged array, which raises a
        ValueError on NumPy >= 1.24.
        
                Parameters:
                    batch: Sequence of `(features, labels)` pairs
                Returns:
                    Tuple `(features, labels)` of 1-D object arrays with one entry per sample.
        '''
        batch_size = len(batch)
        features = np.empty(batch_size, dtype=object)
        labels = np.empty(batch_size, dtype=object)

        for position, (sample_features, sample_labels) in enumerate(batch):
            features[position] = sample_features
            labels[position] = sample_labels
        
        return features, labels

In [102]:
# Both loaders share one collator. Only the training loader shuffles.
# drop_last=True discards the final incomplete batch in BOTH loaders, so up to
# batch_size - 1 validation samples are never evaluated.
# NOTE(review): presumably intended to keep the per-batch correlation metric stable - confirm.
collate_fn = MultiModalCollator(sequence_based=False)
train_dataloader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True, collate_fn=collate_fn, drop_last=True)
valid_dataloader = DataLoader(dataset=valid_dataset, batch_size=64, shuffle=False, collate_fn=collate_fn, drop_last=True)

## model

In [103]:
import torch
import torch.nn as nn
from torch.nn.utils import weight_norm
from torch.nn.init import xavier_uniform_, constant_

def clones(module, N):
    """Return an nn.ModuleList holding N independent deep copies of `module`."""
    replicas = nn.ModuleList()
    for _ in range(N):
        replicas.append(copy.deepcopy(module))
    return replicas

class Chomp1d(nn.Module):
    """Drop the last `chomp_size` positions of the last axis of a 3-D tensor."""
    def __init__(self, chomp_size: int=None):
        super(Chomp1d, self).__init__()
        self.chomp_size = chomp_size

    def forward(self, x: torch.Tensor=None):
        # `x[:, :, :-0]` is an empty slice, so a chomp size of 0 (no padding to
        # remove) must return the input unchanged.
        if self.chomp_size == 0:
            return x.contiguous()
        return x[:, :, :-self.chomp_size].contiguous()


class TemporalBlock(nn.Module):
    """
    Residual block of two weight-normalised 1-D convolutions, each followed by
    Chomp1d, ReLU and dropout. A 1x1 convolution adapts the residual branch when
    the number of input and output channels differs.
    """
    def __init__(
        self, n_inputs: int=None, n_outputs: int=None, kernel_size: int=None, 
        stride: int=None, dilation: int=None, padding: int=None, dropout: float=0.2
    ):
        super().__init__()

        def make_conv(in_channels: int):
            # Both convolutions share every setting except the input width.
            return weight_norm(nn.Conv1d(
                in_channels, n_outputs, kernel_size, stride=stride, padding=padding, dilation=dilation))

        # First convolution stage.
        self.conv1 = make_conv(n_inputs)
        self.chomp1 = Chomp1d(padding)
        self.relu1 = nn.ReLU()
        self.dropout1 = nn.Dropout(dropout)

        # Second convolution stage.
        self.conv2 = make_conv(n_outputs)
        self.chomp2 = Chomp1d(padding)
        self.relu2 = nn.ReLU()
        self.dropout2 = nn.Dropout(dropout)

        stages = [
            self.conv1, self.chomp1, self.relu1, self.dropout1,
            self.conv2, self.chomp2, self.relu2, self.dropout2,
        ]
        self.net = nn.Sequential(*stages)

        if n_inputs != n_outputs:
            self.downsample = nn.Conv1d(n_inputs, n_outputs, 1)
        else:
            self.downsample = None
        self.relu = nn.ReLU()
        self.init_weights()

    def init_weights(self):
        """Draw the convolution weights from N(0, 0.01)."""
        # NOTE(review): conv1 / conv2 are wrapped in weight_norm, which appears to
        # recompute `.weight` from weight_g / weight_v before every forward pass, so
        # this initialisation is probably overwritten for them - confirm.
        for conv in (self.conv1, self.conv2):
            conv.weight.data.normal_(0, 0.01)
        if self.downsample is not None:
            self.downsample.weight.data.normal_(0, 0.01)

    def forward(self, x: torch.Tensor=None):
        out = self.net(x)
        shortcut = self.downsample(x) if self.downsample is not None else x
        return self.relu(out + shortcut)


class TemporalConvNet(nn.Module):
    """
    Stack of TemporalBlocks whose dilation doubles at every level.

            Parameters:
                num_inputs (int): Number of input channels;
                num_channels (list): Output channels of every level (at least one entry);
                kernel_size (int): Kernel size of every convolution;
                dropout (float): Dropout probability inside every block.
    """
    def __init__(self, num_inputs: int=None, num_channels: list=None, kernel_size: int=2, dropout: float=0.2):
        super(TemporalConvNet, self).__init__()
        # Without this check an empty list only fails later, inside forward(), with an obscure TypeError.
        if num_channels is None or len(num_channels) == 0:
            raise ValueError('num_channels must contain at least one output width')

        layers = []
        for level, out_channels in enumerate(num_channels):
            dilation_size = 2 ** level
            in_channels = num_inputs if level == 0 else num_channels[level - 1]
            layers.append(
                TemporalBlock(
                    in_channels, out_channels, kernel_size, stride=1, dilation=dilation_size,
                    padding=(kernel_size-1) * dilation_size, dropout=dropout
                ))

        # Width of the last level; forward() scales its output by sqrt of this value.
        self.out_channels = num_channels[-1]
        self.network = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor=None):
        # The blocks are Conv1d based, so axes 1 and 2 are swapped on the way in and back on the way out.
        return self.network(x.transpose(1, 2)).transpose(1, 2) * math.sqrt(self.out_channels)

In [104]:
def pair(t):
    """Return `t` unchanged when it is a tuple, otherwise the 2-tuple `(t, t)`."""
    if isinstance(t, tuple):
        return t
    return (t, t)

class PreNorm(nn.Module):
    """Apply LayerNorm over the last `dim` features, then call the wrapped `fn`."""
    def __init__(self, dim, fn):
        super().__init__()
        self.norm = nn.LayerNorm(dim)
        self.fn = fn

    def forward(self, x, **kwargs):
        normalized = self.norm(x)
        # Extra keyword arguments are forwarded to the wrapped callable untouched.
        return self.fn(normalized, **kwargs)

class FeedForward(nn.Module):
    """Two-layer MLP (dim -> hidden_dim -> dim) with LeakyReLU and dropout after each linear layer."""
    def __init__(self, dim, hidden_dim, dropout = 0.):
        super().__init__()
        expand = nn.Linear(dim, hidden_dim)
        project = nn.Linear(hidden_dim, dim)
        # Dropout is applied before the activation in the first half.
        self.net = nn.Sequential(
            expand, nn.Dropout(dropout), nn.LeakyReLU(),
            project, nn.Dropout(dropout),
        )

    def forward(self, x):
        return self.net(x)

class Attention(nn.Module):
    """Multi-head scaled dot-product self-attention over inputs laid out as (b, n, dim)."""
    def __init__(self, dim, heads = 8, dim_head = 64, dropout = 0.):
        super().__init__()
        inner_dim = heads * dim_head
        # The output projection is only skipped for a single head whose width equals `dim`.
        needs_projection = heads != 1 or dim_head != dim

        self.heads = heads
        self.scale = dim_head ** -0.5

        self.attend = nn.Softmax(dim = -1)
        # One bias-free projection produces queries, keys and values at once.
        self.to_qkv = nn.Linear(dim, inner_dim * 3, bias = False)

        if needs_projection:
            self.to_out = nn.Sequential(nn.Linear(inner_dim, dim), nn.Dropout(dropout))
        else:
            self.to_out = nn.Identity()

    def forward(self, x):
        # Split the joint projection into three parts and move the head axis forward.
        queries, keys, values = [
            rearrange(part, 'b n (h d) -> b h n d', h = self.heads)
            for part in self.to_qkv(x).chunk(3, dim = -1)
        ]

        similarity = torch.matmul(queries, keys.transpose(-1, -2)) * self.scale
        weights = self.attend(similarity)

        # Merge the heads back into the feature axis.
        context = rearrange(torch.matmul(weights, values), 'b h n d -> b n (h d)')
        return self.to_out(context)

class Transformer(nn.Module):
    """Stack of `depth` pre-norm layers: self-attention then feed-forward, each with a residual connection."""
    def __init__(self, dim, depth=4, heads=6, dim_head=128, mlp_dim=126, dropout = 0.):
        super().__init__()
        self.layers = nn.ModuleList([])
        for _ in range(depth):
            attention_block = PreNorm(dim, Attention(dim, heads = heads, dim_head = dim_head, dropout = dropout))
            feed_forward_block = PreNorm(dim, FeedForward(dim, mlp_dim, dropout = dropout))
            self.layers.append(nn.ModuleList([attention_block, feed_forward_block]))

    def forward(self, x):
        for layer in self.layers:
            attention_block, feed_forward_block = layer
            x = attention_block(x) + x
            x = feed_forward_block(x) + x
        return x
    
class MultiModalEncoder(nn.Module):
    """
    Run the input through every layer of `layer`, then normalise each modality's
    slice of the last axis with its own LayerNorm.

    `N` is accepted but not used.
    """
    def __init__(self, layer, N, modal_num):
        super().__init__()

        self.modal_num = modal_num
        self.layers = layer
        # One LayerNorm per modality, sized from the `size` attribute of the first layer.
        self.norm = nn.ModuleList([LayerNorm(layer[0].size) for _ in range(self.modal_num)])

    def forward(self, x, mask):
        for encoder_layer in self.layers:
            x = encoder_layer(x, mask)

        # Split the last axis into one chunk per modality and normalise each separately.
        modality_chunks = torch.chunk(x, self.modal_num, dim=-1)
        normalized_chunks = [self.norm[i](modality_chunks[i]) for i in range(self.modal_num)]

        return torch.cat(normalized_chunks, dim=-1)
    
class MultiModalAttention(nn.Module):
    """
    Attention across modalities: the last axis of the inputs is split into
    `modal_num` chunks and, for every head, the modalities attend to each other.

            Parameters:
                h (int): Number of heads (must divide d_model);
                d_model (int): Feature width of one modality;
                modal_num (int): Number of modalities concatenated along the last axis;
                dropout (float): Dropout applied to the attention weights.
    """
    def __init__(self, h, d_model, modal_num, dropout=0.1):
        super(MultiModalAttention, self).__init__()
        assert d_model % h == 0

        self.d_k = d_model // h
        self.h = h
        self.modal_num = modal_num
        self.mm_linears = nn.ModuleList()

        # Four projections per modality: query, key, value and output.
        for i in range(self.modal_num):
            linears = clones(nn.Linear(d_model, d_model), 4)
            self.mm_linears.append(linears)

        self.dropout = nn.Dropout(p=dropout)

    def forward(self, query, key, value, mask=None):
        query = torch.chunk(query, self.modal_num, dim=-1)
        key = torch.chunk(key, self.modal_num, dim=-1)
        value = torch.chunk(value, self.modal_num, dim=-1)

        if mask is not None:
            mask = mask.unsqueeze(1)

        nbatches = query[0].size(0)
        _query_list = []
        _key_list = []
        _value_list = []

        # Project every modality and split its features into heads.
        for i in range(self.modal_num):
            _query_list.append(self.mm_linears[i][0](
                query[i]).view(nbatches, -1, self.h, self.d_k))

            _key_list.append(self.mm_linears[i][1](
                key[i]).view(nbatches, -1, self.h, self.d_k))

            _value_list.append(self.mm_linears[i][2](
                value[i]).view(nbatches, -1, self.h, self.d_k))

        # The modality axis is inserted second to last, so attention runs over modalities.
        mm_query = torch.stack(_query_list, dim=-2)
        mm_key = torch.stack(_key_list, dim=-2)
        mm_value = torch.stack(_value_list, dim=-2)

        x, _ = attention(mm_query, mm_key, mm_value,mask=mask, dropout=self.dropout)
        x = x.transpose(-2, -3).contiguous().view(nbatches, - 1, self.modal_num, self.h * self.d_k)
        _x = torch.chunk(x, self.modal_num, dim=-2)

        _x_list = []

        for i in range(self.modal_num):
            # Remove only the singleton modality axis. A bare squeeze() would also
            # drop the batch or sequence axis whenever its size is 1.
            _x_list.append(self.mm_linears[i][-1](_x[i].squeeze(-2)))

        x = torch.cat(_x_list, dim=-1)

        return x
    
class MultiModalEncoderLayer(nn.Module):
    """
    Encoder layer with three stages: attention across modalities on the whole
    input, then attention and feed-forward applied to each modality's chunk.

    The `mask` argument of forward() is accepted but not passed to any attention module.
    """
    def __init__(self, size, modal_num, mm_atten, mt_atten, feed_forward, dropout):
        super().__init__()

        self.modal_num = modal_num
        self.mm_atten = mm_atten
        self.mt_atten = mt_atten
        self.feed_forward = feed_forward

        # Residual wrappers: one for the cross-modal stage, one per modality for the other two stages.
        mm_sublayer = MultiModalSublayerConnection(size, modal_num, dropout)
        mt_sublayer = nn.ModuleList([SublayerConnection(size, dropout) for _ in range(modal_num)])
        ff_sublayer = nn.ModuleList([SublayerConnection(size, dropout) for _ in range(modal_num)])

        self.sublayer = nn.ModuleList([mm_sublayer, mt_sublayer, ff_sublayer])

        self.size = size

    def forward(self, x, mask):
        cross_modal, per_modality_attention, per_modality_ff = self.sublayer

        x = cross_modal(x, lambda fused: self.mm_atten(fused, fused, fused))
        modality_chunks = torch.chunk(x, self.modal_num, dim=-1)

        outputs = []
        for i in range(self.modal_num):
            attended = per_modality_attention[i](
                modality_chunks[i], lambda chunk: self.mt_atten[i](chunk, chunk, chunk, mask=None))
            outputs.append(per_modality_ff[i](attended, self.feed_forward[i]))

        return torch.cat(outputs, dim=-1)

class ModalEncoder(nn.Module):
    """
    Concatenate the input with its attention output and its transformer output
    along axis 2, which triples the feature width.

    `hidden_dim` is accepted but not used.
    """
    def __init__(self, dim=768, heads=6, depth=2, dim_head=64, dropout=0.3, mlp_dim=512, hidden_dim=512) -> None: 
        super().__init__()
        shared_options = dict(dim=dim, heads=heads, dim_head=dim_head, dropout=dropout)
        self.att = Attention(**shared_options)
        self.trans = Transformer(depth=depth, mlp_dim=mlp_dim, **shared_options)

    def forward(self, x):
        attended = self.att(x)
        transformed = self.trans(x)
        return torch.cat((x, attended, transformed), 2)
    
from torch.autograd import Variable
class PositionalEncoding(nn.Module):
    """
    Add fixed sinusoidal position encodings to the input and apply dropout.

            Parameters:
                d_model (int): Feature width of the input (even or odd);
                dropout (float): Dropout probability applied after the addition;
                max_len (int): Longest supported sequence length.
    """
    def __init__(self, d_model: int=None, dropout: float=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1).type(torch.float)
        
        # NOTE(review): the frequency base is 1000.0 here; the usual sinusoidal
        # encoding uses 10000.0. Left unchanged so the model is not altered - confirm it is intended.
        v = torch.arange(0, d_model, 2).type(torch.float)
        v = v * -(math.log(1000.0) / d_model)
        div_term = torch.exp(v)

        pe[:, 0::2] = torch.sin(position * div_term)
        # For an odd d_model there is one cosine column fewer than sine columns,
        # so the frequency vector is trimmed to match.
        pe[:, 1::2] = torch.cos(position * div_term[:d_model // 2])
        pe = pe.unsqueeze(0).to(DEVICE)

        self.register_buffer('pe', pe)

    def forward(self, x: torch.Tensor=None):
        # Buffers never require gradients, so the deprecated Variable wrapper is not needed.
        x = x + self.pe[:, :x.size(1)]
        return self.dropout(x)

In [105]:
class MultiHeadedAttention(nn.Module):
    """Multi-head attention with `h` heads over features of width `d_model`."""
    def __init__(self, h, d_model, dropout=0.1):
        super().__init__()
        assert d_model % h == 0

        self.d_k = d_model // h
        self.h = h
        # Four projections: query, key, value and output.
        self.linears = clones(nn.Linear(d_model, d_model), 4)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, query, key, value, mask=None):
        if mask is not None:
            mask = mask.unsqueeze(1)

        nbatches = query.size(0)

        # The first three linears project the inputs, which are then split into heads.
        projected = []
        for linear, tensor in zip(self.linears, (query, key, value)):
            heads = linear(tensor).view(nbatches, -1, self.h, self.d_k)
            projected.append(heads.transpose(1, 2))
        query, key, value = projected

        x, _ = attention(query, key, value, mask=mask, dropout=self.dropout)
        # Merge the heads back into one feature axis.
        merged = x.transpose(1, 2).contiguous().view(nbatches, -1, self.h * self.d_k)

        return self.linears[-1](merged)
    
class PositionwiseFeedForward(nn.Module):
    """Two linear layers (d_model -> d_ff -> d_model) with ReLU and dropout in between."""
    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()

        self.w_1 = nn.Linear(d_model, d_ff)
        self.w_2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        hidden = F.relu(self.w_1(x))
        hidden = self.dropout(hidden)
        return self.w_2(hidden)
    
class MultiModalSublayerConnection(nn.Module):
    """
    Residual connection around a sublayer, where each modality's chunk of the
    last axis is normalised with its own LayerNorm before the sublayer is called.
    """
    def __init__(self, size, modal_num, dropout):
        super().__init__()

        self.modal_num = modal_num
        self.norm = nn.ModuleList([LayerNorm(size) for _ in range(self.modal_num)])
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        modality_chunks = torch.chunk(x, self.modal_num, -1)
        normalized = torch.cat(
            [self.norm[i](modality_chunks[i]) for i in range(self.modal_num)], dim=-1)

        # Dropout is applied to the sublayer output; the un-normalised input is the residual.
        return self.dropout(sublayer(normalized)) + x
    
class LayerNorm(nn.Module):
    """
    Layer normalisation over the last axis with a learnable gain (`a_2`) and bias (`b_2`).

    Both parameters are created directly on DEVICE. Wrapping a CPU tensor in
    nn.Parameter and then calling `.to(DEVICE)` returns a plain tensor whenever
    DEVICE is not the CPU, so the gain and bias were not registered: they were
    neither trained nor stored in the state dict.
    """
    def __init__(self, features, eps=1e-6):
        super(LayerNorm, self).__init__()

        self.a_2 = nn.Parameter(torch.ones(features, device=DEVICE))
        self.b_2 = nn.Parameter(torch.zeros(features, device=DEVICE))
        self.eps = eps

    def _load_from_state_dict(self, state_dict, prefix, local_metadata, strict,
                              missing_keys, unexpected_keys, error_msgs):
        # Checkpoints written while the gain / bias were unregistered contain no
        # entries for them. Those values never left their initial ones / zeros, so
        # fill them in to keep such checkpoints loadable with strict=True.
        legacy_defaults = {
            'a_2': torch.ones_like(self.a_2),
            'b_2': torch.zeros_like(self.b_2),
        }
        for name, default in legacy_defaults.items():
            state_dict.setdefault(prefix + name, default)

        super()._load_from_state_dict(
            state_dict, prefix, local_metadata, strict, missing_keys, unexpected_keys, error_msgs)

    def forward(self, x):
        mean = x.mean(-1, keepdim=True)
        std = x.std(-1, keepdim=True)

        # eps is added to the standard deviation (not the variance) to avoid division by zero.
        return self.a_2 * (x - mean) / (std + self.eps) + self.b_2
    
class SublayerConnection(nn.Module):
    """Residual connection around a sublayer that is applied to the layer-normalised input."""
    def __init__(self, size, dropout):
        super().__init__()

        self.norm = LayerNorm(size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        transformed = sublayer(self.norm(x))
        return x + self.dropout(transformed)
    

def attention(query, key, value, mask=None, dropout=None):
    """
    Scaled dot-product attention.

    Returns the tuple `(weighted values, attention weights)`. Positions where
    `mask == 0` are given a score of -1e9 before the softmax. When `dropout` is
    given it is applied to the weights, and the returned weights are the dropped-out ones.
    """
    scale = math.sqrt(query.size(-1))
    scores = torch.matmul(query, key.transpose(-2, -1)) / scale

    if mask is not None:
        scores = scores.masked_fill(mask == 0, -1e9)

    weights = F.softmax(scores, dim=-1)
    p_attn = weights if dropout is None else dropout(weights)

    return torch.matmul(p_attn, value), p_attn

In [133]:
class SelectItem(nn.Module):
    """Module that returns one element of an indexable input.

    Handy inside ``nn.Sequential`` after a layer that returns several values.

    Args:
        item_index: index (or key) used to subscript the forward input.
    """
    def __init__(self, item_index):
        super().__init__()
        self.item_index = item_index
        self._name = 'selectitem'

    def forward(self, inputs):
        """Return ``inputs[item_index]``."""
        selected = inputs[self.item_index]
        return selected
    
class SeqBatchNorm(nn.Module):
    """Apply ``nn.BatchNorm1d`` to a rank-3 tensor whose features sit on the last axis.

    The input is permuted with ``(0, 2, 1)`` so the feature axis lands where
    ``BatchNorm1d`` expects its channels, normalised, and permuted back.

    Args:
        num_features: number of features passed to ``nn.BatchNorm1d``.
    """
    def __init__(self, num_features: int = 1290):
        super().__init__()
        self._name = 'reshapetensor'
        # swapping axes 1 and 2 is its own inverse, so one tuple serves both directions
        self.shape_position = (0, 2, 1)
        self.batchnorm = nn.BatchNorm1d(num_features=num_features)

    def forward(self, inputs):
        """Return the batch-normalised tensor in the original axis order."""
        channels_first = inputs.permute(self.shape_position)
        return self.batchnorm(channels_first).permute(self.shape_position)

In [163]:
class Model_3906(nn.Module): # 0.3906
    """Two-stream sequence model (stream 0 and stream 1) with 7 sigmoid outputs.

    Each stream is projected to ``d_model`` features by its own input stack, the two
    projections are concatenated on the last axis, passed through dropout and an
    8-layer ``MultiModalEncoder``. The encoder output is concatenated with the
    encoder input, fed to a small regression head, squashed by a sigmoid and
    averaged over dimension 1.

    Args:
        num_features: feature sizes of the input streams; index 0 uses the
            positional-encoding stack, every other index uses the TCN stack.
            ``forward`` supplies exactly two streams (``v_input``, ``a_input``).
        d_model: per-stream embedding size.
        tcn_levels: accepted for interface compatibility; not used by this class.
        blocks_num: accepted for interface compatibility; not used by this class
            (the encoder depth is fixed at 8).
    """
    def __init__(self, num_features: list=[1290, 768], d_model: int=128, tcn_levels: int=5, blocks_num: int=4):
        super(Model_3906, self).__init__()
        self.modal_num = len(num_features)

        self.input = nn.ModuleList()
        for i in range(self.modal_num):
            if i == 0:
                self.input.append(
                    nn.Sequential(
                        nn.LayerNorm(num_features[i]),
                        PositionalEncoding(d_model=num_features[i], dropout=0.3),
                        ModalEncoder(dim=num_features[i], dropout=0.2),
                        nn.Dropout(p=0.5),
                        # the Linear input is 3x the encoder dim, mirroring the other stack
                        nn.Linear(in_features=num_features[i] * 3, out_features=d_model, bias=False)
                    )
                )
            else:
                self.input.append(
                    nn.Sequential(
                        nn.LayerNorm(num_features[i]),
                        TemporalConvNet(num_inputs=num_features[i], num_channels=[d_model]),
                        # NOTE: PositionalEncoding(d_model=d_model, dropout=0.2) is disabled on this stack
                        ModalEncoder(dim=d_model, dropout=0.4),
                        nn.Linear(in_features=d_model * 3, out_features=d_model * 2, bias=False),
                        nn.Dropout(p=0.1),
                        nn.ReLU(),
                        nn.Linear(in_features=d_model * 2, out_features=d_model, bias=False)
                    )
                )

        self.dropout_embed = nn.Dropout(p=0.4)
        multimodal_encoder_layer = nn.ModuleList()
        for i in range(8):
            mm_attention = MultiModalAttention(
                h=4, d_model=d_model, modal_num=self.modal_num, dropout=0.5)

            mt_attention, feed_forward = nn.ModuleList(), nn.ModuleList()
            for j in range(self.modal_num):
                mt_attention.append(MultiHeadedAttention(
                    h=4, d_model=d_model, dropout=0.5))
                feed_forward.append(PositionwiseFeedForward(
                    d_model=d_model, d_ff=512, dropout=0.4))

            multimodal_encoder_layer.append(MultiModalEncoderLayer(
                size=d_model, modal_num=self.modal_num, mm_atten=mm_attention, mt_atten=mt_attention,
                feed_forward=feed_forward, dropout=0.5
            ))
        self.encoder = MultiModalEncoder(layer=multimodal_encoder_layer, N=8, modal_num=self.modal_num)

        # The head consumes cat(encoder_output, encoder_input). This was hard-coded
        # to 512, which only matches d_model=128; derive it from d_model instead
        # (same expression Model_3975 uses). Equals 512 for the default d_model.
        head_in = d_model * 4
        self.regress = nn.Sequential(
            nn.Linear(in_features=head_in, out_features=head_in // 2),
            nn.ReLU(),
            nn.LayerNorm(head_in // 2),
            nn.Linear(in_features=head_in // 2, out_features=7)
        )

        self.final_activation = nn.Sigmoid()

    def forward(self, v_input: torch.Tensor=None, a_input: torch.Tensor=None, au_input: torch.Tensor=None):
        """Run both streams through the network.

        Args:
            v_input: rank-3 tensor for stream 0 (assumed (batch, seq, features) — TODO confirm).
            a_input: tensor for stream 1.
            au_input: accepted so all models share one call signature; unused here.

        Returns:
            Sigmoid outputs averaged over dimension 1.

        Raises:
            ValueError: if ``v_input`` is not rank 3.
        """
        if len(v_input.shape) != 3:
            raise ValueError(f'v_input must be a rank-3 tensor, got shape {tuple(v_input.shape)}')

        va_input = [v_input, a_input]
        _x_list = [self.input[i](va_input[i]) for i in range(self.modal_num)]

        x = torch.cat(_x_list, dim=-1)
        x = self.dropout_embed(x)

        out = self.encoder(x, mask=None)
        # skip connection: the head sees both the encoder output and its input
        outs = self.final_activation(self.regress(torch.cat((out, x), dim=-1)))
        return outs.mean(dim=1)

In [111]:
class Model_3975(nn.Module): # 0.3975
    """Visual(+AUs) / acoustic sequence model with 7 sigmoid outputs.

    The visual features are concatenated with the AU features (minus the last AU
    column) and embedded by a positional-encoding stack; the acoustic features are
    embedded by a TCN stack. Both embeddings are concatenated, run through an
    8-layer ``MultiModalEncoder``, joined with the dropped-out embedding, regressed
    to 7 values, squashed by a sigmoid and averaged over dimension 1.

    Args:
        modalities_features: feature size per modality; the keys ``'visual'``,
            ``'acoustic'`` and ``'AUs'`` are read.
        d_model: per-modality embedding size.
    """
    def __init__(
        self, modalities_features: dict={'visual': 1290, 'acoustic': 768, 'AUs': 34}, d_model: int=128
    ):
        super().__init__()
        self.modal_num = 2

        self.input = nn.ModuleDict()
        for modality_type, num_features in modalities_features.items():
            if modality_type == 'visual':
                # the visual stack also receives the AU features, hence the wider input
                visual_dim = num_features + modalities_features['AUs']
                self.input[modality_type] = nn.Sequential(
                    nn.LayerNorm(visual_dim),
                    PositionalEncoding(d_model=visual_dim, dropout=0.5),
                    ModalEncoder(dim=visual_dim, dropout=0.5),
                    nn.Linear(in_features=visual_dim * 3, out_features=d_model),
                )

            elif modality_type == 'acoustic':
                self.input[modality_type] = nn.Sequential(
                    nn.LayerNorm(num_features),
                    TemporalConvNet(num_inputs=num_features, num_channels=[d_model]),
                    # NOTE: PositionalEncoding(d_model=d_model, dropout=0.2) is disabled on this stack
                    ModalEncoder(dim=d_model, dropout=0.4),
                    nn.Linear(in_features=d_model * 3, out_features=d_model * 2),
                    nn.Dropout(p=0.1),
                    nn.ReLU(),
                    nn.Linear(in_features=d_model * 2, out_features=d_model),
                )

        self.dropout_embed = nn.Dropout(p=0.4)

        depth = 8
        encoder_layers = nn.ModuleList()
        for _ in range(depth):
            cross_modal_attention = MultiModalAttention(
                h=4, d_model=d_model, modal_num=self.modal_num, dropout=0.5)

            per_modal_attention = nn.ModuleList()
            per_modal_ffn = nn.ModuleList()
            for _ in range(self.modal_num):
                per_modal_attention.append(
                    MultiHeadedAttention(h=4, d_model=d_model, dropout=0.5))
                per_modal_ffn.append(
                    PositionwiseFeedForward(d_model=d_model, d_ff=512, dropout=0.4))

            encoder_layers.append(MultiModalEncoderLayer(
                size=d_model, modal_num=self.modal_num, mm_atten=cross_modal_attention,
                mt_atten=per_modal_attention, feed_forward=per_modal_ffn, dropout=0.5,
            ))
        self.encoder = MultiModalEncoder(layer=encoder_layers, N=depth, modal_num=self.modal_num)

        # head input = cat(encoder output, dropped-out embedding)
        head_in = d_model * 4
        self.regress = nn.Sequential(
            nn.Linear(in_features=head_in, out_features=head_in // 2),
            nn.ReLU(),
            nn.LayerNorm(head_in // 2),
            nn.Linear(in_features=head_in // 2, out_features=7),
        )

        self.final_activation = nn.Sigmoid()

    def forward(self, v_input: torch.Tensor=None, a_input: torch.Tensor=None, au_input: torch.Tensor=None):
        """Embed both modalities, encode them and return outputs averaged over dim 1.

        Args:
            v_input: rank-3 visual tensor (assumed (batch, seq, features) — TODO confirm).
            a_input: acoustic tensor.
            au_input: rank-3 AU tensor; its last feature column is dropped before
                it is concatenated to ``v_input``.
        """
        _batch, _steps, _dims = v_input.shape  # unpacking fails unless v_input is rank 3

        visual_with_aus = torch.concat((v_input, au_input[:, :, :-1]), dim=-1)
        streams = {'visual': visual_with_aus, 'acoustic': a_input}
        embedded = [self.input[name](tensor) for name, tensor in streams.items()]

        x = self.dropout_embed(torch.cat(embedded, dim=-1))

        # NOTE(review): the encoder is fed the un-dropped concatenation while the skip
        # path below uses the dropped-out `x`; Model_3906 feeds `x` to both. Kept
        # as-is here — confirm whether this is intended.
        out = self.encoder(torch.cat(embedded, dim=-1), mask=None)
        head_input = torch.cat((out, x), dim=-1)
        outs = self.final_activation(self.regress(head_input))
        return outs.mean(dim=1)

In [174]:
class ModelsEnsemble(nn.Module): # 0.3975
    """Stacking ensemble: a small MLP on top of several pre-trained models.

    Every member model is restored from its checkpoint and kept in eval mode; their
    predictions are concatenated on the last axis and mapped to ``n_targets``
    sigmoid outputs by ``ensemble_evaluator``.

    Note that the member models are only switched to eval mode — their parameters
    stay trainable and remain part of ``self.parameters()``.

    Args:
        models_checkpoint: paths of the state-dict checkpoints, one per member.
        models_class: already-instantiated member models (instances, despite the
            name), in the same order as ``models_checkpoint``.
        n_targets: number of outputs each member produces and the ensemble returns.

    Raises:
        TypeError: if ``models_checkpoint`` or ``models_class`` is ``None``.
        ValueError: if the two lists differ in length.
    """
    def __init__(self, models_checkpoint: list = None, models_class: list = None, n_targets: int = 7):
        super(ModelsEnsemble, self).__init__()
        if models_checkpoint is None or models_class is None:
            raise TypeError('models_checkpoint and models_class must both be provided')
        if len(models_checkpoint) != len(models_class):
            raise ValueError(
                f'got {len(models_checkpoint)} checkpoints for {len(models_class)} models; '
                'the two lists must have the same length'
            )

        self.pretrained_models = nn.ModuleList()
        for checkpoint_path, pretrained_model in zip(models_checkpoint, models_class):
            # Deserialise onto CPU so a checkpoint saved from a GPU run also loads on
            # a CPU-only host; load_state_dict copies the values into the model's
            # existing parameters on whatever device they live.
            state_dict = torch.load(checkpoint_path, map_location='cpu')
            pretrained_model.load_state_dict(state_dict)
            pretrained_model.eval()

            self.pretrained_models.append(pretrained_model)

        self.ensemble_evaluator = nn.Sequential(
            nn.Linear(in_features=n_targets * len(models_checkpoint), out_features=16),
            nn.LeakyReLU(),
            nn.Linear(in_features=16, out_features=n_targets)
        )
        self.final_activation = nn.Sigmoid()

    def forward(self, v_input: torch.Tensor=None, a_input: torch.Tensor=None, au_input: torch.Tensor=None):
        """Return the ensemble prediction for one batch.

        All three inputs are forwarded unchanged to every member model.
        """
        total_predictions = []
        for pretrained_model in self.pretrained_models:
            # a parent .train() call flips the members back to train mode, so
            # re-assert eval mode on every pass
            pretrained_model.eval()
            total_predictions.append(pretrained_model(v_input, a_input, au_input))

        total_predictions = torch.cat(total_predictions, dim=-1)
        outs = self.final_activation(self.ensemble_evaluator(total_predictions))
        return outs

In [175]:
# Checkpoints and matching (already instantiated) member models for the ensemble;
# the two lists are index-aligned.
models_checkpoint = [
    f'{ABAW5_MODELS_CHECKPOINTS}/multiModal_{tag}.pt'
    for tag in ('0.3975', '0.3933', '0.3906')
]
models_class = [
    member_type().to(DEVICE)
    for member_type in (Model_3975, Model_3975, Model_3906)
]

model = ModelsEnsemble(models_checkpoint=models_checkpoint, models_class=models_class).to(DEVICE)

regression_criterion = nn.MSELoss()
classification_criterion = nn.BCEWithLogitsLoss()

optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-4, weight_decay=1e-5)
# halve the learning rate every 5 scheduler steps
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

In [None]:
def _load_eval_model(model_class, checkpoint_name):
    """Build ``model_class`` on DEVICE, restore ``checkpoint_name`` and return it in eval mode.

    Args:
        model_class: model class to instantiate with its default arguments.
        checkpoint_name: file name inside ``ABAW5_MODELS_CHECKPOINTS``.
    """
    restored = model_class().to(DEVICE)
    # map_location keeps a checkpoint saved from a GPU run loadable on a CPU-only host
    state_dict = torch.load(f'{ABAW5_MODELS_CHECKPOINTS}/{checkpoint_name}', map_location=DEVICE)
    restored.load_state_dict(state_dict)
    return restored.eval()


# NOTE: `model` is rebound here to a single Model_3975 instance.
model = _load_eval_model(Model_3975, 'multiModal_0.3975.pt')
model33 = _load_eval_model(Model_3975, 'multiModal_0.3933.pt')
model06 = _load_eval_model(Model_3906, 'multiModal_0.3906.pt')