<a href="https://colab.research.google.com/github/hiyadullu/epics/blob/main/cloud_notebooks/CREMAD_run.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Emotion Recognition using Multimodal Deep Learning Approaches**

In this experiment we propose a novel multimodal architecture that predicts 6 human emotions (anger, disgust, fear, happy, neutral and sad), using audio and video as inputs to the model. This notebook runs on the CREMA-D dataset.


In [335]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [336]:
!pip install av

import numpy as np
np.float = float  # Fix deprecated np.float usage
np.int = int      # Fix deprecated np.int usage

import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader

# Install optional packages only when the import itself fails
try:
    from torchsummary import summary
except ImportError:
    !pip install torchsummary
    from torchsummary import summary

try:
    from torcheval.metrics.functional import multiclass_f1_score
except ImportError:
    !pip install torcheval
    from torcheval.metrics.functional import multiclass_f1_score

try:
    import skvideo.io
except ImportError:
    !pip install sk-video
    import skvideo.io


import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

import os

import re



import seaborn as sn

import torchvision
import torchvision.transforms as T
from torchvision.io import read_image, read_video

import torchaudio

import librosa
import librosa.display
import IPython.display as ipd
import matplotlib.pyplot as plt
from IPython.display import HTML, Video

import random

from sklearn.model_selection import train_test_split, KFold
from sklearn.metrics import confusion_matrix, classification_report

import gc

# !pip install pytorchvideo

from torchvision.transforms import Compose, Lambda, RandomCrop, RandomHorizontalFlip, Resize, ToTensor, ToPILImage, CenterCrop, ColorJitter, RandomPerspective

# for dirname, _, filenames in os.walk('/kaggle/input'):
#     for filename in filenames:
#         print(os.path.join(dirname, filename))

import inspect



## **Setting up environment & hyperparameters**

In [337]:
# Install wandb only if it is missing, then log in once
try:
    import wandb
except ImportError:
    !pip install wandb -q
    import wandb

wandb.login()

In [338]:
# Set up device: use GPU or CPU
device = "cuda" if torch.cuda.is_available() else "cpu"
device

'cpu'

In [339]:
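# Report the GPU name when CUDA is present, otherwise fall back to the CPU message
if torch.cuda.is_available():
    print(torch.cuda.get_device_name(0))
else:
    print("No CUDA. CPU available")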

No CUDA. CPU available


In [340]:
# The path to the root directory of the dataset. Change this on your system
working_dir = "/content/drive/MyDrive/datasets/CREMA-D/"

In [341]:
# Hyperparameters. Tweak as you wish
hyperparams = {
    "lr": 0.0001, # Learning Rate
    "epochs": 30, # Number of Epochs
    "adam_betas": (0.98, 0.999), # beta1 and beta2 of Adam (exponential decay rates of the moment estimates)
    "batch": 16, # Mini-batch size
    "sdg_momentum": 0.99, # Stochastic Gradient Descent momentum
    "sdg_weight_decay": 0.45, # Stochastic Gradient Descent weight decay
    "num_features": 1024, # the no. of feature maps we will divide the image into
    "max_seq_len": 120 # Maximum number of video frames kept per clip
}

hyperparams

{'lr': 0.0001,
 'epochs': 30,
 'adam_betas': (0.98, 0.999),
 'batch': 16,
 'sdg_momentum': 0.99,
 'sdg_weight_decay': 0.45,
 'num_features': 1024,
 'max_seq_len': 120}

<br/>

As stated by CREMA-D: \

File naming convention

Each of the 7,442 clips has a unique filename made of four underscore-separated identifiers (e.g., 1001_DFA_ANG_XX.wav):

Filename identifiers:

1. Actor ID (e.g., 1001)
2. Sentence code (e.g., DFA)
3. Emotion tag: 'ANG', 'DIS', 'FEA', 'HAP', 'NEU' and 'SAD' represent the 'anger', 'disgust', 'fear', 'happy', 'neutral' and 'sad' emotion classes respectively.
4. Emotion level (e.g., XX for unspecified)


E.g., '1035_ITS_DIS_XX.wav' is the audio of actor 1035 speaking sentence ITS with disgust. \
E.g., '1035_ITS_DIS_XX.mp4' is the matching video.
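
As a rough illustration, the files this notebook actually loads (for example `1035_ITS_DIS_XX.wav`, which appears in a later output) can be taken apart by splitting on underscores and looking up the emotion tag. The helper name `parse_cremad_name` and the returned field names are hypothetical; the tag table mirrors the notebook's `tag2idx`.

```python
import os

# Emotion tags as they appear in CREMA-D filenames (mirrors tag2idx in this notebook)
TAG2IDX = {"ANG": 0, "DIS": 1, "FEA": 2, "HAP": 3, "NEU": 4, "SAD": 5}


def parse_cremad_name(path):
    """Hypothetical helper: split '<actor>_<sentence>_<emotion>_<level>.<ext>'."""
    base = os.path.splitext(os.path.basename(path))[0]
    parts = base.split("_")
    if len(parts) < 4:
        raise ValueError(f"unexpected filename: {path}")
    actor, sentence, emotion, level = parts[:4]
    return {
        "actor": actor,
        "sentence": sentence,
        "emotion": emotion.upper(),
        "level": level,
        "label": TAG2IDX.get(emotion.upper()),  # None for an unknown tag
    }


info = parse_cremad_name("AudioWAV/1035_ITS_DIS_XX.wav")
print(info["emotion"], info["label"])  # DIS 1
```

Returning `None` for an unknown tag lets the caller skip such files, which is how the dataset builder further down treats unmatched tags.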

In [342]:
# A dict that maps the class name to our assigned index (uses: track emotion index for prediction)
class2idx = {
    "anger": 0,
    "disgust": 1,
    "fear": 2,
    "happy": 3,
    "neutral": 4,
    "sad": 5,
}

# A dict that maps the index to the class name (uses: decorate prediction)
idx2class = {v:k for k,v in class2idx.items()}

# A dict that maps the type given in the file name to our index(uses: dataset preparation)
tag2idx = {
    "ANG": 0,
    "DIS": 1,
    "FEA": 2,
    "HAP": 3,
    "NEU": 4,
    "SAD": 5,
}

In [343]:
idx2class

{0: 'anger', 1: 'disgust', 2: 'fear', 3: 'happy', 4: 'neutral', 5: 'sad'}

## **Defining the Transforms (Augmentation Techniques) and helper functions**

In [344]:
# Defining the transforms:
video_frame_transform = Compose([
#     ToPILImage(),
    Resize((252,252)),
    CenterCrop((184,184)),
#     ToTensor()
])

# change frame color randomly
video_frame_augment_color = Compose([
#     ToPILImage(),
    Resize((252,252)),
    CenterCrop((184,184)),
    ColorJitter(brightness=0.4, hue=0.3, saturation=0.4),
#     ToTensor()
])

# change frame perspective randomly
video_frame_augment_persp = Compose([
#     ToPILImage(),
    Resize((252,252)),
    CenterCrop((184,184)),
    RandomPerspective(distortion_scale=0.3, p=1.0),
#     ToTensor()
])


In [345]:
"""
    Defining the helper functions for the Audio mel-spectrogram technique
"""

# Get the melspec of the audio as image/np 2d array
def wav2melSpec(AUDIO_PATH):
    audio, sr = librosa.load(AUDIO_PATH)
    return librosa.feature.melspectrogram(y=audio, sr=sr)


# Show the spectrogram as an image
def imgSpec(ms_feature):
    fig, ax = plt.subplots()
    ms_dB = librosa.power_to_db(ms_feature, ref=np.max)
    print(ms_feature.shape)
    img = librosa.display.specshow(ms_dB, x_axis='time', y_axis='mel', ax=ax)
    fig.colorbar(img, ax=ax, format='%+2.0f dB')
    ax.set(title='Mel-frequency spectrogram');

# Hear the audio
def hear_audio(AUDIO_PATH):
    audio, sr = librosa.load(AUDIO_PATH)

    print("\t", end="")
    ipd.display(ipd.Audio(data=audio, rate=sr))


def show_video(video_path):
    from base64 import b64encode

    # Stop early instead of trying to open a missing file
    if not os.path.isfile(video_path):
        print("Error: Please check the path.")
        return None

    with open(video_path, "rb") as f:
        video_encoded = f.read()
    data = "data:video/mp4;base64," + b64encode(video_encoded).decode()

    video_tag = '<video width="400" height="300" controls alt="test" src="%s"></video>' % data
    return HTML(data=video_tag)

# Show 1 example
def show_example(video_path, audio_path, prediction=None, actual=None, save_memory=False):
    if prediction is not None:
        print("Predicted Label:", idx2class[prediction])
    # `actual` defaults to None, so guard the lookup to avoid a KeyError
    if actual is not None:
        print("Actual Label:", idx2class[actual])

    if save_memory is False:
        print("Video path:", video_path)
        ipd.display(Video(video_path, embed=True, width=400, height=300))

        # display(show_video(video_path))
        print("Audio path:", audio_path)
        hear_audio(audio_path)

In [346]:
"""
    Defining the helper functions for the Audio MFCC technique
"""

# audio effects
def audio_effects(audio, sample_rate, augment=1):
    data = None
    if augment == 1:
        data = librosa.effects.percussive(y=audio)
    elif augment == 2:
        data = librosa.effects.pitch_shift(y=audio, sr=sample_rate, n_steps=3)
    return data


# normalize the audio wave
def normalize_audio(audio):
    audio = audio / np.max(np.abs(audio))
    return audio

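def feature_extractor(file, augment=0, test=False):

    # Retry the read a number of times before giving up
    max_attempts = 50
    for attempt in range(max_attempts):
        try:
            data, sample_rate = librosa.load(file)
            break
        except Exception:
            print("Audio file not read. Trying again")
    else:
        # Every attempt failed: stop here rather than hit a NameError on `data` below
        raise RuntimeError(f"failed trying to find audio file {file}")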
#     print(data.shape)

    if augment > 0:
        data = audio_effects(data, sample_rate, augment=augment)

    data = normalize_audio(data)

    # zero crossing rate
    zcr = librosa.feature.zero_crossing_rate(y=data)[0]
    zcr /= zcr.max()
    zcr = zcr[0:(0+128)]
    if len(zcr) < 128:
        zcr = librosa.util.fix_length(zcr, size=128)
#     result=np.vstack((result, zcr))


    # MFCC
    mfcc = np.mean(librosa.feature.mfcc(y=data, sr=sample_rate, n_mfcc=128).T, axis=0)
    mfcc /= mfcc.max()
#     result = np.vstack((result, mfcc))

    # Root Mean Square Value
    rms = librosa.feature.rms(y=data)[0]
    rms /= rms.max()
    rms = rms[0:(0+128)]
    if len(rms) < 128:
        rms = librosa.util.fix_length(rms, size=128)
#     result = np.vstack((result, rms))

    # Mel spectrogram
    mel = librosa.feature.melspectrogram(y=data, sr=sample_rate)
    mel = librosa.amplitude_to_db(mel, ref = np.max)
    mel = np.mean(mel.T, axis=0)
    mel /= mel.sum()
#     result = np.vstack((result, mel))

    if test:
        return_dict = {
            "raw": data,
            "sr": sample_rate,
            "zcr": zcr,
            "mfcc": mfcc,
            "rms": rms,
            "mel": mel
        }
    else:
        return_dict = {
            "zcr": zcr,
            "mfcc": mfcc,
            "rms": rms,
            "mel": mel
        }
    return return_dict
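
The frame-level features above (`zcr` and `rms`) are cut to 128 values and padded when shorter, so every clip yields vectors of the same length. A minimal pure-Python sketch of that truncate-or-pad step, assuming trailing zero padding (the default behaviour of `librosa.util.fix_length`); `fix_to_length` is a hypothetical name used only for illustration:

```python
def fix_to_length(values, size=128):
    """Hypothetical stand-in for the slice + fix_length step: truncate or zero-pad."""
    values = list(values)[:size]                   # keep at most `size` values
    return values + [0.0] * (size - len(values))   # pad the tail with zeros


short_clip = fix_to_length([0.2, 0.5, 1.0])
long_clip = fix_to_length(range(300))
print(len(short_clip), len(long_clip))  # 128 128
```

A fixed length of 128 also matches the `mfcc` and `mel` vectors, which are averaged over time and therefore already have 128 entries each.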

In [347]:
def dict_to_tensor(dictionary):
    out_dict = {}
    for key, value in dictionary.items():
        if key == "sr":
            # the sample rate is a plain number, not a NumPy array
            out_dict[key] = torch.tensor(value).to(device)
        else:
            out_dict[key] = torch.from_numpy(value).to(device)

    return out_dict

<br/>

---

<br/>

## **Dataset Preparation**

In [348]:
def make_ds_as_list(path):
    audio = []
    video = []
    labels = []

    # renamed folders
    video_dir = os.path.join(path, "VideoFlash")
    audio_dir = os.path.join(path, "AudioWAV")

    video_files = sorted([f for f in os.listdir(video_dir) if f.endswith('.mp4')])
    audio_files = sorted([f for f in os.listdir(audio_dir) if f.endswith('.wav')])

    # lookup dictionary for audio files
    audio_bases_dict = {os.path.splitext(f)[0]: f for f in audio_files}

    matched_count = 0

    for video_file in video_files:
        video_base = os.path.splitext(video_file)[0]


        # CREMA-D filenames are like: 1001_DFA_ANG_XX
        prefix = '_'.join(video_base.split('_')[:4])

        matching_audio = audio_bases_dict.get(prefix, None)

        if matching_audio is None:
            continue  # Skip if no match

        # emotion tag is always 3rd chunk (e.g., "ANG")
        label_tag = video_base.split('_')[2].upper()
        label = tag2idx.get(label_tag, None)
        if label is None:
            continue

        video.append(os.path.join(video_dir, video_file))
        audio.append(os.path.join(audio_dir, matching_audio))
        labels.append(label)
        matched_count += 1

    print(f"DEBUG: {matched_count} matching audio-video-label triplets created.")

    return audio, video, labels


def make_dataframe(path):
    audio, video, labels = make_ds_as_list(path)
    df = pd.DataFrame({
        "audio_path": audio,
        "video_path": video,
        "label": labels,
        "augment": [0] * len(labels)   # default augment flag
    })
    return df
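
The matching logic in `make_ds_as_list` can be tried without touching the disk. The sketch below pairs files that share a name once the extension is removed; the helper name `pair_by_basename` is hypothetical and the filenames are made-up examples in the CREMA-D pattern.

```python
import os


def pair_by_basename(video_files, audio_files):
    """Hypothetical helper: pair files that share the same name without extension."""
    audio_by_base = {os.path.splitext(f)[0]: f for f in audio_files}
    pairs = []
    for video_file in sorted(video_files):
        base = os.path.splitext(video_file)[0]
        audio_file = audio_by_base.get(base)
        if audio_file is not None:  # skip videos without a matching recording
            pairs.append((video_file, audio_file))
    return pairs


videos = ["1001_DFA_ANG_XX.mp4", "1001_DFA_DIS_XX.mp4", "1002_IEO_SAD_HI.mp4"]
audios = ["1001_DFA_ANG_XX.wav", "1002_IEO_SAD_HI.wav"]
pairs = pair_by_basename(videos, audios)
print(len(pairs))  # 2
```

Building the dictionary once keeps each lookup constant-time, which matters little for three files but helps with several thousand clips.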




In [349]:
# Make the dataset
df = make_dataframe(working_dir)

# Check if data is valid
print(f"Total samples collected: {len(df)}")
print(df.head())

# Proceed with train-test split only if data is available
if len(df) > 0:
    from sklearn.model_selection import train_test_split

    train_df, temp_df = train_test_split(df, test_size=0.40, random_state=42)
    cv_df, test_df = train_test_split(temp_df, test_size=0.50, random_state=42)

    train_df['augment'] = 0
    cv_df['augment'] = 0
    test_df['augment'] = 0

    print(f"Train: {len(train_df)}, Validation: {len(cv_df)}, Test: {len(test_df)}")

else:
    raise Exception("No data found. Please check folder names and file format.")





DEBUG: 7442 matching audio-video-label triplets created.
Total samples collected: 7442
                                          audio_path  \
0  /content/drive/MyDrive/datasets/CREMA-D/AudioW...   
1  /content/drive/MyDrive/datasets/CREMA-D/AudioW...   
2  /content/drive/MyDrive/datasets/CREMA-D/AudioW...   
3  /content/drive/MyDrive/datasets/CREMA-D/AudioW...   
4  /content/drive/MyDrive/datasets/CREMA-D/AudioW...   

                                          video_path  label  augment  
0  /content/drive/MyDrive/datasets/CREMA-D/VideoF...      0        0  
1  /content/drive/MyDrive/datasets/CREMA-D/VideoF...      1        0  
2  /content/drive/MyDrive/datasets/CREMA-D/VideoF...      2        0  
3  /content/drive/MyDrive/datasets/CREMA-D/VideoF...      3        0  
4  /content/drive/MyDrive/datasets/CREMA-D/VideoF...      4        0  
Train: 4465, Validation: 1488, Test: 1489


<br>
<hr>

## Data checking and testing phase

### Do the transforms work?

In [350]:
# idx = 50


# # RAW AUDIO
# d = feature_extractor(non_augment_df["audio_path"][idx], augment=0, test=True)
# data = d["raw"]
# sr = d["sr"]
# print("Audio transformed with augmentation scheme 0 (no augmentation; raw audio):")
# ipd.display(ipd.Audio(data=data, rate=sr))
# print("\n")



# # AUGMENTATION 1 - percussive component
# d = feature_extractor(augment_1_df["audio_path"][idx], augment=1, test=True)
# data = d["raw"]
# sr = d["sr"]
# print("Audio transformed with augmentation scheme 1:")
# ipd.display(ipd.Audio(data=data, rate=sr))
# print("\n")



# # AUGMENTATION 2 - pitch shift
# d = feature_extractor(augment_2_df["audio_path"][idx], augment=2, test=True)
# data = d["raw"]
# sr = d["sr"]
# print("Audio transformed with augmentation scheme 2:")
# ipd.display(ipd.Audio(data=data, rate=sr))
# print("\n")

<br/>

---

## **Dataset Splitting**

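We now split the dataset into train, cross-validation (cv) and test sets in a 60:20:20 ratio. Only the train set is augmented afterwards; the cv and test sets keep the unmodified clips.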
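
To see what a 60:20:20 split means in sample counts, the sketch below reproduces the arithmetic. It assumes `train_test_split` rounds the held-out share up and gives the remainder to the other part, which agrees with the sizes this notebook prints for 7442 samples. `split_sizes` is a hypothetical helper, not part of scikit-learn.

```python
import math


def split_sizes(n_samples, test_size):
    """Hypothetical helper: (n_first, n_second) for a fractional test_size."""
    n_second = math.ceil(n_samples * test_size)  # held-out part, rounded up
    return n_samples - n_second, n_second


n_train, n_rest = split_sizes(7442, 0.40)  # 60% train, 40% held out
n_cv, n_test = split_sizes(n_rest, 0.50)   # held-out part halved
print(n_train, n_cv, n_test)  # 4465 1488 1489
```

Because the held-out part has an odd size (2977), the cv and test sets differ by one sample.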

In [351]:
# Keep 60% for training; the remaining 40% is halved into cv and test sets further below
train_df, test_df = train_test_split(df, test_size=0.40, random_state=42)
len(train_df), len(test_df)

(4465, 2977)

In [352]:
# Create augmented copies
augment_1_df = train_df.copy().reset_index(drop=True)
augment_1_df["augment"] = 1

augment_2_df = train_df.copy().reset_index(drop=True)
augment_2_df["augment"] = 2

# Reset index on the original training set too
train_df = train_df.reset_index(drop=True)
train_df["augment"] = 0

# Check an example row by position (.iloc works regardless of the index labels)
print(
    train_df["audio_path"].iloc[0],
    train_df["video_path"].iloc[0],
    augment_1_df["audio_path"].iloc[0],
    augment_1_df["video_path"].iloc[0]
)


/content/drive/MyDrive/datasets/CREMA-D/AudioWAV/1035_ITS_DIS_XX.wav /content/drive/MyDrive/datasets/CREMA-D/VideoFlash/1035_ITS_DIS_XX.mp4 /content/drive/MyDrive/datasets/CREMA-D/AudioWAV/1035_ITS_DIS_XX.wav /content/drive/MyDrive/datasets/CREMA-D/VideoFlash/1035_ITS_DIS_XX.mp4


In [353]:
# Stack the original and both augmented copies, then renumber the rows
train_df = pd.concat([train_df, augment_1_df, augment_2_df]).reset_index(drop=True)

In [354]:
# # Check your examples
# idx = 90 # Change index to see different examples
# show_example(train_df["video_path"].iloc[idx], train_df["audio_path"].iloc[idx], actual=train_df["label"].iloc[idx])

In [355]:
cv_df, test_df = train_test_split(test_df, test_size=0.50, random_state=42)

In [356]:
cv_df['augment'] = 0
test_df['augment'] = 0

In [357]:
# View their length
len(train_df), len(cv_df), len(test_df)

(13395, 1488, 1489)

In [358]:
del df
gc.collect()

4152

In [359]:
# import torchvision.models as models

In [360]:
# videodata = skvideo.io.vread("/kaggle/input/cremad/CREMA-D/VideoFlash/1066_IOM_DIS_XX.mp4")
# print(videodata.shape)

In [361]:
# videodata = torch.Tensor(videodata)
# videodata /= 255.0

In [362]:
# for i, frame in enumerate(videodata):
#     frame = frame.numpy()
#     plt.imshow(frame)
#     plt.show()

#     if i == 10:
#         break

In [364]:
random.seed(42)

class CREMADataset(Dataset):
    def __init__(
        self,
        dataframe,
        video_frame_transform=None,
        video_strategy='optimal',
        cut_video=False,
        cut_audio=False,
        selection="quartile",
        variant="all"
    ):
        self.cut_video = cut_video
        self.cut_audio = cut_audio

        self.examples = dataframe
        self.video_frame_transform = {
            0: video_frame_transform,
            1: video_frame_augment_color,
            2: video_frame_augment_persp
        }

        self.video_strategy = video_strategy
        self.selection = selection
        self.variant = variant

        del dataframe, video_frame_transform

    def __len__(self):
        return len(self.examples)

    def __getitem__(self, idx):
        row = self.examples.iloc[idx]
        video_path = row['video_path']
        audio_path = row['audio_path']
        label = row['label']
        augment = row['augment']

        # Load video
        video_data = skvideo.io.vread(video_path)  # [frames, H, W, C]
        video_data = torch.tensor(video_data).float() / 255.0  # Normalize

        # Select video frames
        if self.video_strategy == 'optimal':
            video_frames = self.__optimal_strategy(video_data, augment=augment)
        else:
            video_frames = self.__all_strategy(video_data)

        # Extract audio features
        audio_features = feature_extractor(audio_path, augment=augment, test=False)
        audio_tensor = dict_to_tensor(audio_features)

        return video_frames, audio_tensor, label, video_path, audio_path

    def __optimal_strategy(self, video, augment=0):
        frames = []

        q1_point = video.shape[0] // 4
        q2_point = video.shape[0] // 2
        q3_point = int((video.shape[0] * (3 / 4)))

        q1_q2_mid = int((q1_point + (q2_point - 5)) // 2)
        q2_q3_mid = int((q2_point + (q3_point - 5)) // 2)

        q1_lb = q1_point - 10
        q1_up = q1_q2_mid
        q2_lb = q2_point - 10
        q2_up = q2_q3_mid
        q3_lb = q3_point - 10

        six_frame_lower_bound = (self.selection == "six" and self.variant == "lower")
        six_frame_upper_bound = (self.selection == "six" and self.variant == "upper")
        original_strategy = (self.selection == "quartile" and self.variant == "all")

        # Helper function to process a single frame
        def process_frame(frame):
            frame = torch.permute(frame, (2, 0, 1))  # [C, H, W]
            return self.video_frame_transform[augment](frame)

        if six_frame_lower_bound or original_strategy:
            frames.append(process_frame(video[q1_lb]))

        frames.append(process_frame(video[q1_point]))

        if six_frame_upper_bound or original_strategy:
            frames.append(process_frame(video[q1_up]))

        if six_frame_lower_bound or original_strategy:
            frames.append(process_frame(video[q2_lb]))

        frames.append(process_frame(video[q2_point]))

        if six_frame_upper_bound or original_strategy:
            frames.append(process_frame(video[q2_up]))

        if six_frame_lower_bound or original_strategy:
            frames.append(process_frame(video[q3_lb]))

        frames.append(process_frame(video[q3_point]))

        if six_frame_upper_bound or original_strategy:
            frames.append(process_frame(video[-1]))

        frames = torch.stack(frames)  # Shape: [num_frames, C, H, W]
        return frames

    def __all_strategy(self, video):
        length = video.shape[0]
        frames = []

        for i, f in enumerate(video):
            if i == hyperparams["max_seq_len"]:
                break
            frame = torch.permute(f, (2, 0, 1))  # [C, H, W]
            frame = self.video_frame_transform[0](frame)
            frames.append(frame)
            del frame
            gc.collect()

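        if length < hyperparams["max_seq_len"]:
            # Pad short clips by repeating frames from the midpoint onwards.
            # The index is clamped to the last frame so it cannot run out of range.
            diff = hyperparams["max_seq_len"] - length
            Q2 = length // 2
            for i in range(Q2, Q2 + diff):
                frame = torch.permute(video[min(i, length - 1)], (2, 0, 1))
                frame = self.video_frame_transform[0](frame)
                frames.append(frame)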

        frames = torch.stack(frames)
        del video
        gc.collect()
        return frames
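
Which frames the 'optimal' strategy picks is easier to see with plain integers. The sketch below mirrors the index arithmetic of `__optimal_strategy` for a clip of a given length; `quartile_frame_indices` is a hypothetical helper written for illustration, and the last index stands in for `video[-1]`.

```python
def quartile_frame_indices(num_frames, selection="quartile", variant="all"):
    """Hypothetical helper mirroring the index arithmetic of __optimal_strategy."""
    q1, q2 = num_frames // 4, num_frames // 2
    q3 = int(num_frames * (3 / 4))
    q1_up = (q1 + (q2 - 5)) // 2  # between Q1 and Q2
    q2_up = (q2 + (q3 - 5)) // 2  # between Q2 and Q3
    last = num_frames - 1         # the dataset uses video[-1]

    lower = selection == "six" and variant == "lower"
    upper = selection == "six" and variant == "upper"
    full = selection == "quartile" and variant == "all"

    indices = []
    for point, up in ((q1, q1_up), (q2, q2_up), (q3, last)):
        if lower or full:
            indices.append(point - 10)  # frame 10 steps before the quartile
        indices.append(point)
        if upper or full:
            indices.append(up)
    return indices


nine = quartile_frame_indices(100)
six = quartile_frame_indices(100, selection="six", variant="lower")
print(nine)  # [15, 25, 35, 40, 50, 60, 65, 75, 99]
print(six)   # [15, 25, 40, 50, 65, 75]
```

For clips shorter than 40 frames, `point - 10` is negative at the first quartile, and tensor indexing then counts from the end of the clip rather than raising an error.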



In [365]:
trainds = CREMADataset(train_df, video_frame_transform)
cvds = CREMADataset(cv_df, video_frame_transform)
testds = CREMADataset(test_df, video_frame_transform)

In [366]:
trainloader = DataLoader(trainds, batch_size=hyperparams["batch"], shuffle=True)
cvloader = DataLoader(cvds, batch_size=hyperparams["batch"], shuffle=False)
testloader = DataLoader(testds, batch_size=hyperparams["batch"], shuffle=False)

In [367]:
del trainds
del cvds
del testds
del train_df
del cv_df
del test_df
gc.collect()

1492

In [368]:
def view_a_loader(item, i):
    video, audio, label, video_p, audio_p = item
    show_example(video_p[i], audio_p[i], label[i].item(), label[i].item())
    print(f"Video shape: {video.shape} | Audio shape: {audio['mel'].shape}")
    print(f"{video_p[i]}")
    for f in video[i]:
        print(f[0])
        f = torch.permute(f, (1,2,0))
        plt.figure(figsize=(3, 3))
        plt.imshow(f.numpy())
        plt.show()
#     imgSpec(audio[i].squeeze())

    del item, video, audio, label, video_p, audio_p
    gc.collect()

## **The Model**

It's time to build the model

### The Video Model

In [369]:
from torchvision.models.video import r2plus1d_18

R2plus1D = r2plus1d_18(weights='KINETICS400_V1').to(device)

In [370]:
R2plus1D

VideoResNet(
  (stem): R2Plus1dStem(
    (0): Conv3d(3, 45, kernel_size=(1, 7, 7), stride=(1, 2, 2), padding=(0, 3, 3), bias=False)
    (1): BatchNorm3d(45, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): Conv3d(45, 64, kernel_size=(3, 1, 1), stride=(1, 1, 1), padding=(1, 0, 0), bias=False)
    (4): BatchNorm3d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU(inplace=True)
  )
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Sequential(
        (0): Conv2Plus1D(
          (0): Conv3d(64, 144, kernel_size=(1, 3, 3), stride=(1, 1, 1), padding=(0, 1, 1), bias=False)
          (1): BatchNorm3d(144, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (2): ReLU(inplace=True)
          (3): Conv3d(144, 64, kernel_size=(3, 1, 1), stride=(1, 1, 1), padding=(1, 0, 0), bias=False)
        )
        (1): BatchNorm3d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=Tru

In [371]:
from torchvision.models.video.resnet import Conv2Plus1D

<hr>
<br>


### The Audio Model

In [372]:
class PassThrough(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        return x

In [373]:
class Conv1D(nn.Module):
    def __init__(self):
        super().__init__()

        self.conv1 = nn.Sequential(
            nn.Conv1d(1, 128, kernel_size=3, dilation=2, bias=False),
            nn.GELU(),
            nn.Dropout(0.3),
            nn.Conv1d(128, 128, kernel_size=3, dilation=2, bias=False),
            nn.GELU(),
            nn.Dropout(0.3),
            nn.MaxPool1d(2),
            nn.GroupNorm(1, 128)
        )

        self.conv2 = nn.Sequential(
            nn.Conv1d(128, 128, kernel_size=3, dilation=2, bias=False),
            nn.GELU(),
            nn.Dropout(0.3),
            nn.Conv1d(128, 128, kernel_size=3, dilation=2, bias=False),
            nn.GELU(),
            nn.Dropout(0.3),
            nn.MaxPool1d(2),
            nn.GroupNorm(1, 128)
        )

        self.conv3 = nn.Sequential(
            nn.Conv1d(128, 128, kernel_size=3, dilation=2, bias=False),
            nn.GELU(),
            nn.Dropout(0.3),
            nn.MaxPool1d(2),
            nn.GroupNorm(1, 128)
        )



    def forward(self, x):
        out = self.conv1(x)
        out = self.conv2(out)
        out = self.conv3(out)

        return out

In [374]:
class SENet(nn.Module):
    def __init__(self, channels, reduction=16):
        super().__init__()

        self.se_net = nn.Sequential(
            nn.Linear(channels, channels // reduction),
            nn.ReLU(),
            nn.Linear(channels // reduction, channels),
            nn.Sigmoid()
        )

        self.global_pooling_bridge = nn.AdaptiveAvgPool1d(1)
        self.flatten = nn.Flatten()

    def forward(self, x):
        out = self.global_pooling_bridge(x)
#         print("shape input to se net: ", out.shape)
        out = self.flatten(out)
        out = self.se_net(out)
        return out

In [375]:
class DaggerNetV2(nn.Module):
    def __init__(self):
        super().__init__()

        self.cnn_encoder = Conv1D()

        self.se_net = SENet(channels=128)

        self.flatten = nn.Flatten()

        self.global_pooling = nn.AdaptiveAvgPool1d(1)


    def forward(self, x):

        out = self.cnn_encoder(x)
        residual = out

#         print("CNN out: ", out.shape)

        attn_out = self.se_net(out)

#         print("SE net out: ", attn_out.shape)

        attn_out = attn_out.unsqueeze(dim=-1)

        out_total = attn_out * residual

#         out_total = self.global_pooling(out_total)
        out_total = self.flatten(out_total)

#         print("Output: ", out_total.shape)

        return out_total

In [376]:
class AudioFeatureExtractor(nn.Module):
    def __init__(self, rnn_hidden_size, rnn_num_layers):
        super().__init__()

        self.rnn_hidden_size = rnn_hidden_size
        self.rnn_num_layers = rnn_num_layers


        self.zcr_net = DaggerNetV2()
        self.rms_net = DaggerNetV2()
        self.mfcc_net = DaggerNetV2()
        self.mel_net = DaggerNetV2()

        self.flatten = nn.Flatten()
        self.softmax = nn.Softmax(dim=1)


    def forward(self, x):
        x["mfcc"] = x["mfcc"].unsqueeze(dim=1).float()
        x["zcr"] = x["zcr"].unsqueeze(dim=1).float()
        x["mel"] = x["mel"].unsqueeze(dim=1).float()
        x["rms"] = x["rms"].unsqueeze(dim=1).float()

        out_mfcc = self.mfcc_net(x["mfcc"])
        out_mel = self.mel_net(x["mel"])

        out_zcr = self.zcr_net(x["zcr"])
        out_rms = self.rms_net(x["rms"])

        combined = torch.cat([out_mfcc, out_zcr, out_rms, out_mel], dim=1)

        return combined


    def init_hidden(self, batch):
        hidden = torch.zeros(self.rnn_num_layers*2, batch, self.rnn_hidden_size).float().to(device)
        cell = torch.zeros(self.rnn_num_layers*2, batch, self.rnn_hidden_size).float().to(device)
        return hidden, cell
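
The size of the combined audio feature vector follows from the layer settings above. The sketch below traces one 128-value feature through the three blocks of `Conv1D` using the standard output-length formula for a 1-D convolution; the function names are hypothetical, and the input length of 128 is taken from `feature_extractor`.

```python
def conv1d_out_len(length, kernel_size=3, dilation=2, stride=1, padding=0):
    """Output length of a 1-D convolution (standard formula, as used by nn.Conv1d)."""
    return (length + 2 * padding - dilation * (kernel_size - 1) - 1) // stride + 1


def encoder_out_len(length):
    """Follow one feature vector through the three blocks of the Conv1D encoder."""
    for n_convs in (2, 2, 1):  # conv1, conv2, conv3
        for _ in range(n_convs):
            length = conv1d_out_len(length)
        length //= 2           # MaxPool1d(2)
    return length


steps = encoder_out_len(128)   # time steps left per channel
per_feature = 128 * steps      # 128 channels, flattened by DaggerNetV2
total = 4 * per_feature        # mfcc, zcr, rms and mel concatenated
print(steps, per_feature, total)  # 11 1408 5632
```

The total of 5632 is the audio part of the `512 + 5632` input width used by the classifier in the multimodal model.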

<br>
<hr>
<br>

## The Multimodal Model

In [377]:
class MainMultimodal(nn.Module):
    def __init__(self, num_classes, trainable=False, fine_tune_limit=2):
        super().__init__()

        self.num_classes = num_classes

        # define video extractor
        self.video_extractor = R2plus1D

        # cut off the final fully connected layer so the extractor returns its 512 features
        self.video_extractor.fc = PassThrough()


        # define audio extractor
        self.audio_extractor = AudioFeatureExtractor(rnn_hidden_size=32, rnn_num_layers=1).to(device)

        self.fc = nn.Sequential(
            nn.LayerNorm(512 + 5632),
            nn.Dropout(0.5),
            nn.Linear(512 + 5632, num_classes, bias=True),
        )

        # init multi-GPU usage
        self.video_extractor = nn.DataParallel(self.video_extractor)
        self.audio_extractor = nn.DataParallel(self.audio_extractor)



        """
            Set the model to trainable false
        """
        if trainable is False:
            for param in self.video_extractor.parameters():
                param.requires_grad = False
        else:

            """
                Train all layers
            """
            if fine_tune_limit == 'all':
                for param in self.video_extractor.parameters():
                    param.requires_grad = True

            else:
                """
                    Set the fine tune limits
                """
                count = 0 # keep track for layer count
                length = sum(1 for _ in self.video_extractor.module.children()) # get the length of layers
                limit = length - fine_tune_limit # set the limit [if length is 7, then limit = 7-2(default) = 5 ---> if count is = or above this we set to trainable ]


                for child in self.video_extractor.module.children():
                    if count >= limit:
                        for param in child.parameters():
                            param.requires_grad = True
                    else:
                        for param in child.parameters():
                            param.requires_grad = False

                    count += 1


        self.flatten = nn.Flatten()
        self.softmax = nn.Softmax(dim=1)


    def forward(self, video, audio):

        video = torch.permute(video, (0,2,1,3,4))


        video_feature_values = self.video_extractor(video)

        audio_feature_values = self.audio_extractor(audio)


        del video, audio
        torch.cuda.empty_cache()
        gc.collect()


        combined = torch.cat([video_feature_values, audio_feature_values], dim=1)

        out_logits = self.fc(combined)
        out_softmax = self.softmax(out_logits)


        return out_logits, out_softmax

In [378]:
torch.manual_seed(42)

modelV1 = MainMultimodal(len(idx2class), trainable=True, fine_tune_limit=4).to(device)
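
What `fine_tune_limit=4` unfreezes can be checked with a list of names instead of real modules. The sketch assumes the top-level children of torchvision's `r2plus1d_18` are, in order, `stem`, `layer1` to `layer4`, `avgpool` and `fc` (seven in total, in line with the comment in the class); `trainable_children` is a hypothetical helper that repeats the counting logic of `MainMultimodal`.

```python
# Assumed top-level children of r2plus1d_18, in order; `fc` is the PassThrough
# module that replaces the original classifier in this notebook.
children = ["stem", "layer1", "layer2", "layer3", "layer4", "avgpool", "fc"]


def trainable_children(names, fine_tune_limit):
    """Hypothetical helper: names of the children left trainable."""
    if fine_tune_limit == "all":
        return list(names)
    limit = len(names) - fine_tune_limit  # children before this index stay frozen
    return [name for i, name in enumerate(names) if i >= limit]


unfrozen = trainable_children(children, 4)
print(unfrozen)  # ['layer3', 'layer4', 'avgpool', 'fc']
```

Since the pooling layer and `PassThrough` hold no parameters, a limit of 4 in effect fine-tunes `layer3` and `layer4` only.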

In [379]:
# next(modelV1.parameters()).is_cuda

In [380]:
optim = torch.optim.AdamW(params=modelV1.parameters(), lr=hyperparams["lr"], betas=hyperparams["adam_betas"], weight_decay=1e-2)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optim, mode='min', factor=0.1, patience=10, threshold=0.0001, threshold_mode='rel', cooldown=0, min_lr=0, eps=1e-08)
loss_fn = nn.CrossEntropyLoss()

In [381]:
def train_step(model: torch.nn.Module, dataloader, optimizer, loss_fn, accuracy_fn=None, save_memory=False):
    train_loss = 0.0
    train_acc = 0.0

    model.train()

    try:
        wandb.watch(model, criterion=loss_fn, log="all", log_freq=10)
    except:
        print("NOT LOGGING TO WANDB")

    for batch, (videos, audios, labels, video_paths, audio_paths) in enumerate(dataloader):
        labels = labels.type(torch.LongTensor)
        videos, labels = videos.to(device), labels.to(device)

        y_logits, y_softmax = model(videos, audios)
        y_logits, y_softmax = y_logits.to(device), y_softmax.to(device)

        # print(y_logits.shape)

        optimizer.zero_grad()

        preds = y_softmax.argmax(dim=1).to(device)
        videos = videos.detach().cpu()
        # audios = audios.detach().cpu()
        del videos, audios
        torch.cuda.empty_cache()
        gc.collect()


        # print(labels.shape, preds.shape)

        loss = loss_fn(y_logits, labels)
        acc = accuracy_fn(preds, labels, num_classes=len(idx2class))
        train_loss += loss.item()
        train_acc += acc

        loss.backward()

        optimizer.step()


        if batch == 0 or batch % 1000 == 0 or batch == len(dataloader) - 1:
            sample = random.randint(1, y_logits.shape[0])-1
            print(f"Batch: #{batch} | Train Loss: {loss.item()} | Train Accuracy: {acc}")
            show_example(video_paths[sample], audio_paths[sample], preds[sample].detach().cpu().item(), labels[sample].detach().cpu().item(), save_memory)

        del labels
        del video_paths
        del audio_paths
        preds = preds.detach().cpu()
        del preds
        y_logits = y_logits.detach().cpu()
        del y_logits
        torch.cuda.empty_cache()
        gc.collect()


    train_loss /= len(dataloader)
    train_acc /= len(dataloader)
    print(f"Total Train loss: {train_loss} | Total Train accuracy: {train_acc}")
    return train_loss, train_acc


def eval_step(model: torch.nn.Module, dataloader, loss_fn, accuracy_fn=None, save_memory=False, confusion_matrix=False):
    eval_loss = 0.0
    eval_acc = 0.0

    y_true = []
    y_preds = []

    model.eval()

    with torch.no_grad():
        for batch, (videos, audios, labels, video_paths, audio_paths) in enumerate(dataloader):
            labels = labels.type(torch.LongTensor)
            videos, labels = videos.to(device), labels.to(device)

            y_logits, y_softmax = model(videos, audios)
            y_logits, y_softmax = y_logits.to(device), y_softmax.to(device)

            preds = y_softmax.argmax(dim=1).to(device)

            if confusion_matrix:
                y_preds.extend(preds.detach().cpu().numpy())
                y_true.extend(labels.detach().cpu().numpy())

            # Release the input batches; no device-to-host copy is needed before deleting them
            del videos, audios
            torch.cuda.empty_cache()
            gc.collect()


            loss = loss_fn(y_logits, labels)
            acc = accuracy_fn(preds, labels, num_classes=len(idx2class))
            eval_loss += loss.item()
            eval_acc += acc


            if batch == 0 or batch % 1000 == 0 or batch == len(dataloader) - 1:
                sample = random.randint(1, y_logits.shape[0])-1
                print(f"Batch: #{batch} | Eval. Loss: {loss} | Eval. Accuracy: {acc}")
                show_example(video_paths[sample], audio_paths[sample], preds[sample].detach().cpu().item(), labels[sample].detach().cpu().item(), save_memory)

            # Drop per-batch references so their memory can be reclaimed
            del labels, video_paths, audio_paths, preds, y_logits
            torch.cuda.empty_cache()
            gc.collect()


        eval_loss /= len(dataloader)
        eval_acc /= len(dataloader)

    print(f"Total Eval. Loss: {eval_loss} | Total Eval. Accuracy: {eval_acc}")

    if confusion_matrix:
        return eval_loss, eval_acc, y_true, y_preds
    else:
        return eval_loss, eval_acc
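
Both `train_step` and `eval_step` report the mean of per-batch scores (for example `train_acc /= len(dataloader)`). When the last batch is smaller than the others, that mean can differ from the score computed over all samples at once. The dependency-free sketch below illustrates the gap using plain accuracy, which is what a micro-averaged F1 reduces to in single-label multiclass classification. The function names and toy labels are hypothetical and not part of the notebook.

```python
def accuracy(preds, labels):
    """Fraction of positions where the prediction equals the label."""
    correct = sum(p == y for p, y in zip(preds, labels))
    return correct / len(labels)


def mean_of_batch_accuracies(preds, labels, batch_size):
    """Average the per-batch accuracies, mirroring the per-batch averaging above."""
    scores = []
    for start in range(0, len(labels), batch_size):
        stop = start + batch_size
        scores.append(accuracy(preds[start:stop], labels[start:stop]))
    return sum(scores) / len(scores)


# Toy example: 5 samples with batch size 4, so the last batch holds one sample.
labels = [0, 1, 2, 3, 4]
preds = [0, 1, 2, 3, 0]  # only the last sample is wrong

global_acc = accuracy(preds, labels)                          # 4 correct out of 5
batch_mean_acc = mean_of_batch_accuracies(preds, labels, 4)   # mean of 1.0 and 0.0
print(global_acc, batch_mean_acc)
```

If an exact epoch-level score matters, one option is to accumulate predictions and labels across batches (as `eval_step` already does when `confusion_matrix=True`) and compute the metric once at the end.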

In [382]:
from tqdm.autonotebook import tqdm
import time
import datetime

In [383]:
epochs = []
train_loss_history = []
eval_loss_history = []

train_accuracy_history = []
eval_accuracy_history = []

In [384]:
best_params = {}

In [385]:
def save_checkpoint(state, filename="checkpoint.pth.tar"):
    torch.save(state, filename)

def load_checkpoint(model, optimizer, filename="checkpoint.pth.tar"):
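    # map_location lets a checkpoint saved on GPU be restored on whatever `device` is active
    checkpoint = torch.load(filename, map_location=torch.device(device))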
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    start_epoch = checkpoint['epoch']
    best_eval_loss = checkpoint['best_eval_loss']

    return model, optimizer, start_epoch, best_eval_loss

def load_checkpoint_wandb(model, optimizer, filename, run_path):
    with wandb.restore(filename, run_path=run_path) as io:
        name = io.name
    checkpoint = torch.load(name, map_location=torch.device(device))

    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    start_epoch = checkpoint['epoch']
    best_eval_loss = checkpoint['best_eval_loss']

    return model, optimizer, start_epoch, best_eval_loss
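
The checkpoint dictionaries restored above carry an `epoch` and a `best_eval_loss` field. A minimal sketch of one consistent bookkeeping convention follows, assuming the rolling checkpoint stores the next epoch to run together with the best loss seen so far, while the best checkpoint stores the epoch at which that loss was reached. `toy_training` and the loss values are hypothetical stand-ins with no model or optimizer involved.

```python
def toy_training(eval_losses, start_epoch, end_epoch, best_eval_loss=float("inf")):
    """Simulate the epoch loop and return (last_ckpt, best_ckpt) metadata dicts."""
    last_ckpt, best_ckpt = None, None
    for epoch in range(start_epoch, end_epoch):
        eval_loss = eval_losses[epoch]
        if eval_loss < best_eval_loss:
            best_eval_loss = eval_loss
            # Best checkpoint: the epoch that produced the lowest loss so far
            best_ckpt = {"epoch": epoch, "best_eval_loss": best_eval_loss}
        # Rolling checkpoint: the epoch to resume from, plus the running best loss
        last_ckpt = {"epoch": epoch + 1, "best_eval_loss": best_eval_loss}
    return last_ckpt, best_ckpt


losses = [1.0, 0.8, 0.9, 0.7]

# Run is interrupted after two epochs
last_ckpt, best_ckpt = toy_training(losses, 0, 2)

# Resume from the rolling checkpoint without repeating or skipping an epoch
resumed_last, resumed_best = toy_training(
    losses, last_ckpt["epoch"], 4, last_ckpt["best_eval_loss"]
)
print(last_ckpt, resumed_last, resumed_best)
```

Storing the running best loss in the rolling checkpoint means that a resumed run does not overwrite the best checkpoint with a worse model.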

In [386]:
start_epoch = 0

In [387]:
best_eval_loss = 1e9

In [388]:
print(start_epoch, best_eval_loss)

0 1000000000.0


In [389]:
torch.manual_seed(42)

save_memory = True

with wandb.init(project="multimodal-ser", name='cremad-ablation-C'):

    wandb.define_metric("epoch")
    wandb.define_metric("train/*", step_metric="epoch")
    wandb.define_metric("val/*", step_metric="epoch")

    if save_memory:
        print("\tSave memory mode is on. Set `save_memory=False` to see video-audio examples")

    start = time.time()
    for epoch in range(start_epoch, hyperparams["epochs"]):
        print(f"========================== Starting Epoch: # {epoch} ==========================")

        inference_start = time.time()

        train_loss, train_acc = train_step(modelV1, trainloader, optim, loss_fn, multiclass_f1_score, save_memory=save_memory)
        eval_loss, eval_acc = eval_step(modelV1, cvloader, loss_fn, multiclass_f1_score, save_memory=save_memory)
        scheduler.step(eval_loss)

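        # Log "epoch" explicitly so the step_metric declared via define_metric has values to plot against
        wandb.log({"epoch": epoch, "train/loss": train_loss, "val/loss": eval_loss, "train/f1_acc": train_acc, "val/f1_acc": eval_acc}, step=epoch)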

        inference_total = time.time() - inference_start
        convert_inf = str(datetime.timedelta(seconds=inference_total))


        print(f"Epoch: #{epoch} | Total Train Loss: {train_loss} | Total Eval. Loss: {eval_loss} | Train Acc: {train_acc * 100}% | Eval Acc: {eval_acc * 100}% in {convert_inf}")


        epochs.append(epoch+1)
        train_loss_history.append(train_loss)
        eval_loss_history.append(eval_loss)
        train_accuracy_history.append(train_acc.detach().cpu()*100)
        eval_accuracy_history.append(eval_acc.detach().cpu()*100)

        if eval_loss < best_eval_loss:
            best_eval_loss = eval_loss
            # save best checkpoint
            save_checkpoint({
                'model_state_dict': modelV1.state_dict(),
                'optimizer_state_dict': optim.state_dict(),
                'epoch': epoch,
                'best_eval_loss': best_eval_loss
            }, filename="best_checkpoint.pth.tar")
            # Upload the file that save_checkpoint just wrote to the working directory
            wandb.save('best_checkpoint.pth.tar')

        # save global checkpoint
        save_checkpoint({
            'model_state_dict': modelV1.state_dict(),
            'optimizer_state_dict': optim.state_dict(),
            'epoch': epoch + 1,
            'best_eval_loss': best_eval_loss
        }, filename="checkpoint.pth.tar")
        # Upload the file that save_checkpoint just wrote to the working directory
        wandb.save('checkpoint.pth.tar')



        del train_loss, eval_loss, train_acc, eval_acc
        torch.cuda.empty_cache()
        gc.collect()

	Save memory mode is on. Set `save_memory=False` to see video-audio examples
Batch: #0 | Train Loss: 1.7799670696258545 | Train Accuracy: 0.25
Predicted Label: happy
Actual Label: happy


Traceback (most recent call last):
  File "/tmp/ipython-input-1150801830.py", line 20, in <cell line: 0>
    train_loss, train_acc = train_step(modelV1, trainloader, optim, loss_fn, multiclass_f1_score, save_memory=save_memory)
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/ipython-input-1141561911.py", line 16, in train_step
    y_logits, y_softmax = model(videos, audios)
                          ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/torch/nn/modules/module.py", line 1773, in _wrapped_call_impl
    return self._call_impl(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/torch/nn/modules/module.py", line 1879, in _call_impl
    return inner()
           ^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/torch/nn/modules/module.py", line 1827, in inner
    result = forward_call(*args, **kwargs)
         

KeyboardInterrupt: 

In [None]:
gc.collect()

In [None]:
# epoch = hyperparams["epochs"]
# Number of epochs actually completed (the run may have been interrupted early)
epoch = len(epochs)

plt.plot(epochs, train_loss_history, color='dodgerblue', label='Train Loss')
plt.plot(epochs, eval_loss_history, color='orange', label='Eval. Loss')


plt.xlabel("Epochs")
plt.ylabel("Loss Value")
plt.title(f"Train and Eval. Loss over {epoch} epochs (CREMA-D)")

plt.legend()

plt.savefig("Loss curves.png")

plt.show()

In [None]:
plt.plot(epochs, train_accuracy_history, color='dodgerblue', label='Train Accuracy')
plt.plot(epochs, eval_accuracy_history, color='orange', label='Eval. Accuracy')

plt.xlabel("Epochs")
plt.ylabel("F1 Score Value")
plt.title(f"Train and Eval. Accuracy over {epoch} epochs (CREMA-D)")

plt.legend()


plt.savefig("F1-Score curves.png")

plt.show()

In [None]:
modelV1, _, _, _ = load_checkpoint(modelV1, optim, "./best_checkpoint.pth.tar")

test_loss, test_acc, y_true, y_preds = eval_step(modelV1, testloader, loss_fn, multiclass_f1_score, save_memory=False, confusion_matrix=True)
test_acc = test_acc.detach().cpu()

print(f"Test loss: {test_loss}\tTest Accuracy: {test_acc*100}")

In [None]:
def format_margins(ax, x=0.05, y=0.05):
    xlim = ax.get_xlim()
    ylim = ax.get_ylim()

    xmargin = (xlim[1]-xlim[0])*x
    ymargin = (ylim[1]-ylim[0])*y

    ax.set_xlim(xlim[0]-xmargin,xlim[1]+xmargin)
    ax.set_ylim(ylim[0]-ymargin,ylim[1]+ymargin)

In [None]:
classes = [v for k,v in idx2class.items()]

cf_matrix = confusion_matrix(y_true, y_preds)

df_cm = pd.DataFrame(cf_matrix / np.sum(cf_matrix, axis=1)[:, None], index = [i for i in classes], columns = [i for i in classes])

plt.figure(figsize = (12,7))

s = sn.heatmap(df_cm, annot=True, cmap='Blues', fmt=".2f")

plt.xlabel('Predicted Label', fontsize=14, labelpad=20, fontweight='bold')

plt.ylabel('True Label', fontsize=14, labelpad=20, fontweight='bold')

# format_margins(s, x=0.1)

plt.savefig('confusion_matrix_crema.png', dpi=1200)

plt.show()
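
Dividing the confusion matrix by its row sums turns each row into the distribution of predictions for one true class, so the diagonal reads as per-class recall. If a class never occurs as a true label, its row sums to zero and a plain division yields NaN. The sketch below shows a guarded row normalization using plain Python lists instead of NumPy to stay dependency-free; `normalize_rows` and the toy counts are hypothetical.

```python
def normalize_rows(matrix):
    """Divide each row by its sum; rows that sum to zero are returned as zeros."""
    normalized = []
    for row in matrix:
        total = sum(row)
        if total == 0:
            # No true samples for this class: avoid dividing by zero
            normalized.append([0.0 for _ in row])
        else:
            normalized.append([value / total for value in row])
    return normalized


counts = [
    [8, 2, 0],
    [1, 9, 0],
    [0, 0, 0],  # class that never appears as a true label
]
recall_matrix = normalize_rows(counts)
print(recall_matrix)
```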

In [None]:
report = classification_report(y_true, y_preds, target_names=[v for k,v in idx2class.items()], output_dict=True)

df = pd.DataFrame(report).transpose()


df = df.round(decimals=4)

df.to_csv('classification_report_crema.csv')
df

In [None]:
# Save stats
with open("recorded.txt", "w") as f:
    f.write("R2plus1D & CNN+SE net attempt CREMA\n")
    for i, line in enumerate(epochs):
        f.write(f"Epoch: {line}: | Train Loss: {train_loss_history[i]} | Train Accuracy: {train_accuracy_history[i]} | Eval Loss: {eval_loss_history[i]} | Eval Accuracy: {eval_accuracy_history[i]}")
        f.write("\n")

    f.write("\n==================================================\n")
    f.write(f"On best weights => Test loss: {test_loss}\tTest Accuracy: {test_acc*100}")
    f.write("\n==================================================\n\n\n")
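
The free-text log above is easy to read but awkward to reload. As a possible complement, the same per-epoch history could be serialized as CSV so that it can later be read back with `pd.read_csv`. This is a sketch using only the standard library; `history_to_csv`, the column names, and the toy values are assumptions rather than part of the notebook.

```python
import csv
import io


def history_to_csv(epochs, train_loss, train_acc, eval_loss, eval_acc):
    """Serialize per-epoch metrics as CSV text with a header row."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(["epoch", "train_loss", "train_acc", "eval_loss", "eval_acc"])
    for row in zip(epochs, train_loss, train_acc, eval_loss, eval_acc):
        # float() also unwraps 0-d tensors or NumPy scalars into plain numbers
        writer.writerow([row[0]] + [float(value) for value in row[1:]])
    return buffer.getvalue()


# Toy values standing in for the notebook's history lists
csv_text = history_to_csv([1, 2], [1.78, 1.52], [25.0, 40.0], [1.80, 1.60], [22.0, 35.0])
print(csv_text)
```

Writing `csv_text` to a file next to `recorded.txt` keeps the human-readable summary and adds a machine-readable copy.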

## **Citations**

1. Livingstone SR, Russo FA (2018) The Ryerson Audio-Visual Database of Emotional Speech and Song (RAVDESS): A dynamic, multimodal set of facial and vocal expressions in North American English. PLoS ONE 13(5): e0196391. https://doi.org/10.1371/journal.pone.0196391