In [1]:
!pip install -q transformers pydub

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.2/7.2 MB[0m [31m48.1 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m268.5/268.5 kB[0m [31m27.9 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.8/7.8 MB[0m [31m100.9 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m71.7 MB/s[0m eta [36m0:00:00[0m
[?25h

## Basic Import

In [2]:
import os
import pandas as pd
import numpy as np

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torchvision
import torchvision.transforms as T
import librosa
import librosa.display
import IPython.display as ipd
import matplotlib.pyplot as plt
import random

# 0. Device Agnostic Code

In [3]:
!nvidia-smi

Wed Jul  5 19:47:37 2023       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 525.85.12    Driver Version: 525.85.12    CUDA Version: 12.0     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla T4            Off  | 00000000:00:04.0 Off |                    0 |
| N/A   61C    P8    12W /  70W |      0MiB / 15360MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

In [4]:
# Set up device agnostic code
device = "cuda" if torch.cuda.is_available() else "cpu"
device

'cuda'

# 1. Create DataFrame for the data

In [5]:
from google.colab import files
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [6]:
TESS = '/content/drive/MyDrive/tess/TESS Toronto emotional speech set data/TESS Toronto emotional speech set data/'
RAV = '/content/drive/MyDrive/ravdess/'
SAVEE = '/content/drive/MyDrive/savee/ALL/'
CREMA = '/content/drive/MyDrive/cremad/AudioWAV/'

In [7]:
# Build the SAVEE index: one row per clip with its label, source and path.
# Every label produced here carries the 'male_' prefix.
SAVEE_CODE_TO_LABEL = {
    '_a': 'male_angry',
    '_d': 'male_disgust',
    '_f': 'male_fear',
    '_h': 'male_happy',
    '_n': 'male_neutral',
    'sa': 'male_sad',
    'su': 'male_surprise',
}

dir_list = os.listdir(SAVEE)

# The emotion code is the two characters at [-8:-6] of the file name
# (e.g. 'DC_a01.wav' -> '_a', 'KL_su01.wav' -> 'su'); any other code
# is recorded as 'male_error'.
emotion = [SAVEE_CODE_TO_LABEL.get(i[-8:-6], 'male_error') for i in dir_list]
path = [SAVEE + i for i in dir_list]

# Assemble the frame in one step, then inspect the label distribution
SAVEE_df = pd.DataFrame({'labels': emotion, 'source': 'SAVEE', 'path': path})
SAVEE_df.labels.value_counts()

male_neutral     120
male_angry        60
male_happy        60
male_disgust      60
male_fear         60
male_surprise     60
male_sad          60
Name: labels, dtype: int64

In [8]:
SAVEE_df

Unnamed: 0,labels,source,path
0,male_angry,SAVEE,/content/drive/MyDrive/savee/ALL/DC_a01.wav
1,male_angry,SAVEE,/content/drive/MyDrive/savee/ALL/DC_a08.wav
2,male_angry,SAVEE,/content/drive/MyDrive/savee/ALL/DC_a09.wav
3,male_angry,SAVEE,/content/drive/MyDrive/savee/ALL/DC_a10.wav
4,male_angry,SAVEE,/content/drive/MyDrive/savee/ALL/DC_a02.wav
...,...,...,...
475,male_neutral,SAVEE,/content/drive/MyDrive/savee/ALL/KL_n02.wav
476,male_surprise,SAVEE,/content/drive/MyDrive/savee/ALL/KL_su01.wav
477,male_neutral,SAVEE,/content/drive/MyDrive/savee/ALL/KL_n27.wav
478,male_neutral,SAVEE,/content/drive/MyDrive/savee/ALL/KL_n30.wav


In [9]:
dir_list = os.listdir(RAV)
dir_list

['Actor_01',
 'Actor_02',
 'Actor_03',
 'Actor_04',
 'Actor_14',
 'Actor_13',
 'Actor_07',
 'Actor_09',
 'Actor_11',
 'Actor_08',
 'Actor_10',
 'Actor_12',
 'Actor_06',
 'Actor_05',
 'Actor_15',
 'Actor_16',
 'Actor_23',
 'Actor_17',
 'Actor_18',
 'Actor_24',
 'Actor_20',
 'Actor_21',
 'audio_speech_actors_01-24',
 'Actor_22',
 'Actor_19']

In [10]:
# Build the RAVDESS index from the per-actor folders.
# File names are dash-separated fields; field 3 (part[2]) is the numeric
# emotion code and field 7 (part[6]) decides gender by parity
# (even -> female, odd -> male) -- presumably the actor number.
RAV_EMOTION_CODES = {1:'neutral', 2:'neutral', 3:'happy', 4:'sad', 5:'angry', 6:'fear', 7:'disgust', 8:'surprise'}

dir_list = sorted(os.listdir(RAV))

emotion, gender, path = [], [], []
for directory in dir_list:
    # Skip the nested duplicate folder that sits next to the Actor_* folders
    if directory == "audio_speech_actors_01-24":
        continue
    for f in os.listdir(RAV + directory):
        part = f.split('.')[0].split('-')
        emotion.append(int(part[2]))
        gender.append("female" if int(part[6]) % 2 == 0 else "male")
        path.append(RAV + directory + '/' + f)

RAV_df = pd.DataFrame({
    'gender': gender,
    'emotion': pd.Series(emotion).replace(RAV_EMOTION_CODES),
})
RAV_df['labels'] = RAV_df['gender'] + '_' + RAV_df['emotion']
RAV_df['source'] = 'RAVDESS'
RAV_df['path'] = path
# Only labels / source / path are kept, matching the other dataset frames
RAV_df = RAV_df.drop(columns=['gender', 'emotion'])
RAV_df.labels.value_counts()

male_neutral       144
female_neutral     144
male_sad            96
male_happy          96
male_angry          96
male_disgust        96
male_surprise       96
male_fear           96
female_happy        96
female_sad          96
female_angry        96
female_disgust      96
female_fear         96
female_surprise     96
Name: labels, dtype: int64

In [11]:
RAV_df

Unnamed: 0,labels,source,path
0,male_neutral,RAVDESS,/content/drive/MyDrive/ravdess/Actor_01/03-01-...
1,male_neutral,RAVDESS,/content/drive/MyDrive/ravdess/Actor_01/03-01-...
2,male_sad,RAVDESS,/content/drive/MyDrive/ravdess/Actor_01/03-01-...
3,male_happy,RAVDESS,/content/drive/MyDrive/ravdess/Actor_01/03-01-...
4,male_angry,RAVDESS,/content/drive/MyDrive/ravdess/Actor_01/03-01-...
...,...,...,...
1435,female_angry,RAVDESS,/content/drive/MyDrive/ravdess/Actor_24/03-01-...
1436,female_surprise,RAVDESS,/content/drive/MyDrive/ravdess/Actor_24/03-01-...
1437,female_surprise,RAVDESS,/content/drive/MyDrive/ravdess/Actor_24/03-01-...
1438,female_surprise,RAVDESS,/content/drive/MyDrive/ravdess/Actor_24/03-01-...


In [12]:
# Build the TESS index. The label is determined entirely by the folder a
# clip lives in, so it is looked up once per directory.
TESS_DIR_TO_LABEL = {
    'OAF_angry': 'female_angry',
    'YAF_angry': 'female_angry',
    'OAF_disgust': 'female_disgust',
    'YAF_disgust': 'female_disgust',
    'OAF_Fear': 'female_fear',
    'YAF_fear': 'female_fear',
    'OAF_happy': 'female_happy',
    'YAF_happy': 'female_happy',
    'OAF_neutral': 'female_neutral',
    'YAF_neutral': 'female_neutral',
    'OAF_Pleasant_surprise': 'female_surprise',
    'YAF_pleasant_surprised': 'female_surprise',
    'OAF_Sad': 'female_sad',
    'YAF_sad': 'female_sad',
}

dir_list = sorted(os.listdir(TESS))

path = []
emotion = []

for directory in dir_list:
    # Skip the nested duplicate of the dataset root
    if directory == "tess toronto emotional speech set data":
        continue
    fname = os.listdir(TESS + directory)
    # Folders that are not in the table are labelled 'Unknown'
    directory_label = TESS_DIR_TO_LABEL.get(directory, 'Unknown')

    for f in fname:
        emotion.append(directory_label)
        path.append(TESS + directory + "/" + f)

TESS_df = pd.DataFrame({'labels': emotion, 'source': 'TESS', 'path': path})
TESS_df.labels.value_counts()

female_fear        400
female_surprise    400
female_sad         400
female_angry       400
female_disgust     400
female_happy       400
female_neutral     400
Name: labels, dtype: int64

In [13]:
TESS_df

Unnamed: 0,labels,source,path
0,female_fear,TESS,/content/drive/MyDrive/tess/TESS Toronto emoti...
1,female_fear,TESS,/content/drive/MyDrive/tess/TESS Toronto emoti...
2,female_fear,TESS,/content/drive/MyDrive/tess/TESS Toronto emoti...
3,female_fear,TESS,/content/drive/MyDrive/tess/TESS Toronto emoti...
4,female_fear,TESS,/content/drive/MyDrive/tess/TESS Toronto emoti...
...,...,...,...
2795,female_sad,TESS,/content/drive/MyDrive/tess/TESS Toronto emoti...
2796,female_sad,TESS,/content/drive/MyDrive/tess/TESS Toronto emoti...
2797,female_sad,TESS,/content/drive/MyDrive/tess/TESS Toronto emoti...
2798,female_sad,TESS,/content/drive/MyDrive/tess/TESS Toronto emoti...


In [14]:
# Build the CREMA index. File names are underscore-separated: the first
# field is a numeric speaker id and the third field is the emotion code.
CREMA_EMOTION_CODES = {
    'SAD': 'sad',
    'ANG': 'angry',
    'DIS': 'disgust',
    'FEA': 'fear',
    'HAP': 'happy',
    'NEU': 'neutral',
}

dir_list = sorted(os.listdir(CREMA))

gender = []
emotion = []
path = []
# Speaker ids treated as female; every other id is labelled male
female = [1002,1003,1004,1006,1007,1008,1009,1010,1012,1013,1018,1020,1021,1024,1025,1028,1029,1030,1037,1043,1046,1047,1049,
          1052,1053,1054,1055,1056,1058,1060,1061,1063,1072,1073,1074,1075,1076,1078,1079,1082,1084,1089,1091]

for i in dir_list:
    part = i.split('_')
    temp = 'female' if int(part[0]) in female else 'male'
    gender.append(temp)
    emotion_code = part[2]
    if emotion_code in CREMA_EMOTION_CODES:
        emotion.append(temp + '_' + CREMA_EMOTION_CODES[emotion_code])
    else:
        # Codes outside the six listed above are kept but flagged
        emotion.append('Unknown')
    path.append(CREMA + i)

CREMA_df = pd.DataFrame({'labels': emotion, 'source': 'CREMA', 'path': path})
CREMA_df.labels.value_counts()

male_angry        671
male_disgust      671
male_fear         671
male_happy        671
male_sad          671
female_angry      600
female_disgust    600
female_fear       600
female_happy      600
female_sad        600
male_neutral      575
female_neutral    512
Name: labels, dtype: int64

In [15]:
CREMA_df

Unnamed: 0,labels,source,path
0,male_angry,CREMA,/content/drive/MyDrive/cremad/AudioWAV/1001_DF...
1,male_disgust,CREMA,/content/drive/MyDrive/cremad/AudioWAV/1001_DF...
2,male_fear,CREMA,/content/drive/MyDrive/cremad/AudioWAV/1001_DF...
3,male_happy,CREMA,/content/drive/MyDrive/cremad/AudioWAV/1001_DF...
4,male_neutral,CREMA,/content/drive/MyDrive/cremad/AudioWAV/1001_DF...
...,...,...,...
7437,female_disgust,CREMA,/content/drive/MyDrive/cremad/AudioWAV/1091_WS...
7438,female_fear,CREMA,/content/drive/MyDrive/cremad/AudioWAV/1091_WS...
7439,female_happy,CREMA,/content/drive/MyDrive/cremad/AudioWAV/1091_WS...
7440,female_neutral,CREMA,/content/drive/MyDrive/cremad/AudioWAV/1091_WS...


In [16]:
# Stack the four per-dataset frames into one table with a fresh 0..n-1 index
df = pd.concat([SAVEE_df, RAV_df, TESS_df, CREMA_df], axis=0, ignore_index=True)
print(df.labels.value_counts())

female_happy       1096
female_sad         1096
female_angry       1096
female_disgust     1096
female_fear        1096
female_neutral     1056
male_neutral        839
male_angry          827
male_happy          827
male_disgust        827
male_fear           827
male_sad            827
female_surprise     496
male_surprise       156
Name: labels, dtype: int64


In [17]:
df

Unnamed: 0,labels,source,path
0,male_angry,SAVEE,/content/drive/MyDrive/savee/ALL/DC_a01.wav
1,male_angry,SAVEE,/content/drive/MyDrive/savee/ALL/DC_a08.wav
2,male_angry,SAVEE,/content/drive/MyDrive/savee/ALL/DC_a09.wav
3,male_angry,SAVEE,/content/drive/MyDrive/savee/ALL/DC_a10.wav
4,male_angry,SAVEE,/content/drive/MyDrive/savee/ALL/DC_a02.wav
...,...,...,...
12157,female_disgust,CREMA,/content/drive/MyDrive/cremad/AudioWAV/1091_WS...
12158,female_fear,CREMA,/content/drive/MyDrive/cremad/AudioWAV/1091_WS...
12159,female_happy,CREMA,/content/drive/MyDrive/cremad/AudioWAV/1091_WS...
12160,female_neutral,CREMA,/content/drive/MyDrive/cremad/AudioWAV/1091_WS...


## 1.2 Convert Label to classes

In [18]:
labels = ['female_angry', 'female_disgust', 'female_fear', 'female_happy',
 'female_neutral', 'female_sad', 'female_surprise', 'male_angry',
 'male_disgust', 'male_fear', 'male_happy', 'male_neutral', 'male_sad',
 'male_surprise']

In [19]:
def label_to_class(label, labels_list):
    """Return the position of `label` in `labels_list`, or None if it is absent."""
    return labels_list.index(label) if label in labels_list else None

In [20]:
df['label_class'] = df['labels'].apply(lambda x: label_to_class(x, labels))

In [21]:
df.label_class.value_counts()

3     1096
5     1096
0     1096
1     1096
2     1096
4     1056
11     839
7      827
10     827
8      827
9      827
12     827
6      496
13     156
Name: label_class, dtype: int64

## 1.3 Calculate the duration of each clip

In [22]:
from pydub import AudioSegment

def calculate_duration(audio_file_path):
    """Return the length, in seconds, of the audio file at `audio_file_path`."""
    # pydub decodes the file; duration_seconds is its length as a float
    return AudioSegment.from_file(audio_file_path).duration_seconds

In [None]:
# Decode every clip once and store its length in seconds
df['duration'] = df['path'].apply(calculate_duration)

## 2. Data Visualization

In [None]:
# import pandas as pd
# df = pd.read_csv("/content/drive/MyDrive/Kaggle_emotional_data_path.csv", index_col=0)
df

## 2.1 Shuffle the datasets

In [None]:
from sklearn.utils import shuffle

# The train/test split further down is positional (first 80% of the rows vs.
# the rest), so the shuffle order decides which clips land in each split.
# Seeding it makes the split identical on every run.
df = shuffle(df, random_state=42)
df

In [None]:
df[df['duration']>4]

## 2.2 Drop all long clips (audio > 4 sec)

Since we have over 10k clips, we can drop the long ones (longer than 4 seconds) to save computation.

In [None]:
# Keep every row except those whose clip is longer than 4 seconds
df = df[~(df.duration > 4)]

In [None]:
df.sort_values(by="duration")

## 2.3 Create functions for visualizing data

In [None]:
def load_audio(AUDIO_PATH):
    """Load the audio file at AUDIO_PATH at 44.1 kHz.

    Returns:
        (audio, sr): the decoded samples and the sample rate (44100).
    """
    return librosa.load(AUDIO_PATH, sr=44100)

def wav2melSpec(AUDIO_PATH):
    """Load the file at AUDIO_PATH (librosa's default sample rate) and return its mel spectrogram."""
    waveform, rate = librosa.load(AUDIO_PATH)
    mel = librosa.feature.melspectrogram(y=waveform, sr=rate)
    return mel

def imgSpec(ms_feature):
    """Plot a mel spectrogram in dB (relative to its maximum) and print its shape.

    Args:
        ms_feature: mel power spectrogram, e.g. the output of wav2melSpec.
    """
    figure, axis = plt.subplots()
    # Convert power to decibels with the loudest bin as the 0 dB reference
    decibels = librosa.power_to_db(ms_feature, ref=np.max)
    print(ms_feature.shape)
    mesh = librosa.display.specshow(decibels, x_axis='time', y_axis='mel', ax=axis)
    figure.colorbar(mesh, ax=axis, format='%+2.0f dB')
    axis.set(title='Mel-frequency spectrogram')

def hear_audio(AUDIO_PATH):
    """Load the file at AUDIO_PATH and show an inline audio player for it."""
    waveform, rate = librosa.load(AUDIO_PATH)

    # Tab (no newline) printed before the player widget is displayed
    print("\t", end="")
    player = ipd.Audio(data=waveform, rate=rate)
    ipd.display(player)


def get_audio_info(path, show_melspec=False, label=None):
    """Print an optional label, optionally plot the mel spectrogram, then play the clip.

    Args:
        path: audio file to inspect.
        show_melspec: the spectrogram is plotted unless this is exactly False.
        label: printed as "Label: <label>" when not None.
    """
    if label is not None:
        print("Label:", label)
    if show_melspec is not False:
        # Only load the file and compute the spectrogram when it will be drawn;
        # the training/testing loops call this with show_melspec=False.
        imgSpec(wav2melSpec(path))
    hear_audio(path)

In [None]:
get_audio_info('/content/drive/MyDrive/savee/ALL/JK_f11.wav', show_melspec=True)

# 3. Create DataLoader

In [None]:
from torchaudio import transforms

class AudioDataset(Dataset):
    """Dataset of fixed-length waveforms read from the paths listed in a DataFrame.

    The split is positional: with ``train=True`` the first ``train_size``
    fraction of the rows is used, otherwise the remaining rows.

    Args:
        df: frame holding one row per clip.
        data_col: name of the column with the audio file paths.
        label_col: name of the column with the class index.
        max_length: number of samples every returned clip has
            (default 4 * 16000, i.e. 4 seconds at 16 kHz).
        new_sr: sample rate the audio is loaded at.
        train: select the leading (True) or trailing (False) part of the rows.
        train_size: fraction of rows assigned to the training part.
    """

    def __init__(self, df, data_col, label_col, max_length=4*16000, new_sr=16000, train=True, train_size=0.80):
        paths = df[data_col].tolist()
        labels = df[label_col].tolist()
        self.max_length = max_length
        self.new_sr = new_sr

        # Index of the first row that belongs to the held-out part
        split = int(train_size * len(paths))

        if train:
            self.file_path_list, self.label_list = paths[:split], labels[:split]
        else:
            self.file_path_list, self.label_list = paths[split:], labels[split:]

    def __len__(self):
        """Number of clips in this split."""
        return len(self.file_path_list)

    def __getitem__(self, idx):
        """Return ``(audio, label, path)`` with ``audio`` exactly ``max_length`` samples long."""
        file_path = self.file_path_list[idx]
        # Load directly at the target rate: a single resampling step instead of
        # going through librosa's default rate first and resampling again.
        audio, _ = librosa.load(file_path, sr=self.new_sr)
        label = self.label_list[idx]

        # Zero-pad short clips and trim long ones so that every item has the
        # same length and batches can be stacked.
        if len(audio) < self.max_length:
            audio = np.pad(audio, (0, self.max_length - len(audio)), 'constant')
        elif len(audio) > self.max_length:
            audio = audio[:self.max_length]

        return audio, label, file_path


In [None]:
# Number of clips per batch, shared by both loaders (also read by the
# train/test loops when picking a sample to play back).
BATCH_SIZE = 16

# train=True selects the leading part of df's rows (AudioDataset's default
# train_size is 0.80); df was shuffled earlier, so the split is random.
train_dataset = AudioDataset(df, 'path', 'label_class', train=True)
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

# train=False selects the remaining rows as the held-out split.
# NOTE(review): shuffle=True is not needed for evaluation; shuffle=False
# would make the evaluation order deterministic.
test_dataset = AudioDataset(df, 'path', 'label_class', train=False)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=True)

# Sizes of the two splits
print(len(train_dataset), len(test_dataset))

In [None]:
# # Visualize the shape of the data in dataloader
# max_length = 0
# for batch_idx, (audio_batch, label_batch, file_path) in enumerate(train_dataset):
#     print(audio_batch.shape, label_batch, file_path)
#     if audio_batch.shape[0] > max_length:
#         max_length = audio_batch.shape[0]
# print(f"max length = {max_length}")

# 4. Model Building

## 4.1 HuBERT from Transformers (huggingface)
Assume that the audio is represented as a **NumPy array** with a shape of (time_frames, )

- output_size of Wav2Vec2FeatureExtractor = (batch_size, num_channels, time_frames)
  - Remove num_channels before passing the input into the HuBERT model

- output_size of HuBERT = (batch_size, time_frames, hidden_size), where hidden_size is 768 for the classifier head used below.



***Remarks:*** The number of channels is 1 because audio typically only has one channel.

#### Model Architecture

In [None]:
import torch
import torchaudio
from transformers import HubertModel, Wav2Vec2FeatureExtractor

# Create feature extractor
model_id = "facebook/hubert-base-ls960"
feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained(model_id)

# Load the Hubert model and tokenizer
hubert_base = HubertModel.from_pretrained(model_id)


class HubertAudioModel(torch.nn.Module):
    """HuBERT encoder followed by a two-layer classification head.

    Args:
        hubert_model: encoder whose ``last_hidden_state`` feeds the head.
        num_frames: number of time frames the encoder emits per clip
            (default 199 -- presumably what HuBERT yields for the 4 s / 16 kHz
            clips produced by the dataset; TODO confirm if the clip length changes).
        hidden_size: size of each encoder frame (default 768).
        num_classes: number of output logits (default 14, one per label).

    The defaults reproduce the original fixed head, ``Linear(199*768, 256)``
    followed by ``Linear(256, 14)``.
    """

    def __init__(self, hubert_model=hubert_base, num_frames=199, hidden_size=768, num_classes=14):
        super().__init__()
        self.hubert = hubert_model
        self.fc1 = torch.nn.Linear(num_frames * hidden_size, 256)
        self.fc2 = torch.nn.Linear(256, num_classes)

    def forward(self, audio_array):
        """Return raw class logits for a batch of 16 kHz waveforms."""
        # Prepare the raw waveform with the module-level feature extractor and
        # move the result to the target device.
        features = feature_extractor(audio_array,
                                     sampling_rate=16000,
                                     padding=True,
                                     return_tensors='pt').to(device)

        # Drop the leading size-1 dimension, if present, before the encoder
        # NOTE(review): the exact shape returned by the extractor for a tensor
        # batch is not visible here -- confirm.
        input_values = features.input_values.squeeze(dim=0)

        # Pass the prepared waveform (not a spectrogram) through HuBERT
        encoded = self.hubert(input_values)

        # Flatten (frames, hidden) per sample so it fits the first dense layer
        flat = torch.flatten(encoded.last_hidden_state, start_dim=1)

        # Two dense layers; the second returns unnormalised logits
        hidden = torch.nn.functional.relu(self.fc1(flat))
        return self.fc2(hidden)


In [None]:
model = HubertAudioModel().to(device)

next(model.parameters()).device

# 5. Setup Loss function, Optimizers & Metrics: Accuracy function

In [None]:
# Loss function
loss_fn = nn.CrossEntropyLoss() # Multi-category loss

# Create an optimizer
optimizer = torch.optim.SGD(params=model.parameters(), lr=0.005)

In [None]:
# Calculate accuracy (a classification metric)
def accuracy_fn(y_true, y_pred):
    """Return the percentage (0-100) of positions where y_pred equals y_true."""
    matches = torch.eq(y_true, y_pred)  # element-wise equality mask
    n_correct = matches.sum().item()
    return (n_correct / len(y_pred)) * 100

# 6. Create Training & Testing Loop

In [None]:
def train_step(model: torch.nn.Module, data_loader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module, optimizer: torch.optim.Optimizer, accuracy_fn,
               device: torch.device = device):
    """Train `model` for one pass over `data_loader` and print the average metrics.

    Args:
        model: network returning class logits for a batch of waveforms.
        data_loader: yields ``(X, y, file_path)`` batches.
        loss_fn: loss applied to ``(logits, y)``.
        optimizer: optimizer stepped once per batch.
        accuracy_fn: called as ``accuracy_fn(y_true=..., y_pred=...)``.
        device: device the batch tensors are moved to.

    Every 50th batch the batch metrics are printed and one random clip from
    the batch is played together with the class the model predicted for it.
    """
    # Plain Python floats: adding the loss tensor itself would keep it attached
    # to the autograd graph for the whole epoch.
    train_loss, train_acc = 0.0, 0.0

    # Put model into training mode
    model.train()

    for batch, (X, y, file_path) in enumerate(data_loader):
        # Put data on target device
        X, y = X.to(device), y.to(device)

        # 1. Forward pass (argmax of the logits equals argmax of their softmax)
        y_logits = model(X).to(device)
        y_pred = y_logits.argmax(dim=1)

        # 2.1 Calculate loss (per batch)
        loss = loss_fn(y_logits, y)
        train_loss += loss.item()

        # 2.2 Calculate accuracy (per batch)
        acc = accuracy_fn(y_true=y, y_pred=y_pred)
        train_acc += acc

        # 3. Optimizer zero grad
        optimizer.zero_grad()

        # 4. Loss backward
        loss.backward()

        # 5. Optimizer step
        optimizer.step()

        if batch % 50 == 0:
            # Index within the actual batch: the final batch can be smaller
            # than BATCH_SIZE, so a fixed range could be out of bounds.
            sample = random.randrange(len(file_path))
            print(f"\tBatch {batch}: Train loss: {loss:.5f} | Train accuracy : {acc:.2f}%")
            # The label shown is the predicted class, not the ground truth
            get_audio_info(file_path[sample], label=y_pred[sample].item())
            print("----------------------------------------")

    # Average loss and accuracy per batch over the epoch
    train_loss /= len(data_loader)
    train_acc /= len(data_loader)

    print(f"Train loss: {train_loss:.5f} | Train acc: {train_acc:.2f}%\n")

In [None]:
def test_step(model: torch.nn.Module, data_loader: torch.utils.data.DataLoader,
              loss_fn: torch.nn.Module, accuracy_fn, device: torch.device = device):
    """Evaluate `model` on `data_loader` and print the average loss and accuracy.

    Args:
        model: network returning class logits for a batch of waveforms.
        data_loader: yields ``(X, y, file_path)`` batches.
        loss_fn: loss applied to ``(logits, y)``.
        accuracy_fn: called as ``accuracy_fn(y_true=..., y_pred=...)``.
        device: device the batch tensors are moved to.

    Every 50th batch the batch metrics are printed and one random clip from
    the batch is played together with the class the model predicted for it.
    """
    # Running totals kept as plain Python floats
    test_loss, test_acc = 0.0, 0.0

    # Put the model in eval mode
    model.eval()

    # No gradients are needed for evaluation
    with torch.inference_mode():
        for batch, (X, y, file_path) in enumerate(data_loader):
            # Send the data to the target device
            X, y = X.to(device), y.to(device)

            # 1. Forward pass (argmax of the logits equals argmax of their softmax)
            test_logits = model(X)
            test_pred = test_logits.argmax(dim=1)

            # 2. Accumulate loss
            loss = loss_fn(test_logits, y)
            test_loss += loss.item()

            # 3. Accumulate accuracy
            acc = accuracy_fn(y_true=y, y_pred=test_pred)
            test_acc += acc

            if batch % 50 == 0:
                # Index within the actual batch: the final batch can be smaller
                # than BATCH_SIZE, so a fixed range could be out of bounds.
                sample = random.randrange(len(file_path))
                # Report this batch's loss (not the running sum), as train_step does
                print(f"\tBatch {batch}: Test loss: {loss:.5f} | Test accuracy : {acc:.2f}%")
                # The label shown is the predicted class, not the ground truth
                get_audio_info(file_path[sample], label=test_pred[sample].item())
                print("----------------------------------------")

    # Average loss and accuracy per batch
    test_loss /= len(data_loader)
    test_acc /= len(data_loader)

    print(f"Test loss: {test_loss:.5f}, Test acc: {test_acc:.2f}%\n")


# 7. Train the model

In [None]:
from tqdm.auto import tqdm

# Set the seed
# torch.manual_seed(42)

# Set epochs
epochs = 3

# Create a optimization and evaluation loop using train_step() and test_step()
for epoch in tqdm(range(epochs)):
    print(f"Epoch: {epoch}\n-------")
    train_step(model=model, data_loader=train_dataloader, loss_fn=loss_fn,
               optimizer=optimizer, accuracy_fn=accuracy_fn, device=device)

    test_step(model=model, data_loader=test_dataloader, loss_fn=loss_fn,
              accuracy_fn=accuracy_fn, device=device)


# 8. Evaluate the model

In [None]:
torch.manual_seed(42)
def eval_model(model: torch.nn.Module,
               data_loader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               accuracy_fn, device=device):
    """Evaluate `model` on `data_loader`.

    Returns:
        dict with ``model_name`` (the model's class name), ``model_loss``
        (mean loss per batch, as a float) and ``model_acc`` (mean of the
        per-batch accuracies returned by `accuracy_fn`).
    """
    model.eval()
    total_loss, total_acc = 0, 0
    with torch.inference_mode():
        for X, y, file_path in data_loader:
            # Move the batch to the target device
            X = X.to(device)
            y = y.to(device)

            # Logits -> probabilities -> predicted class index
            y_logits = model(X)
            probabilities = torch.softmax(y_logits, dim=1)
            y_pred = probabilities.argmax(dim=1).to(device)

            # Accumulate the per-batch loss and accuracy
            total_loss += loss_fn(y_logits, y)
            total_acc += accuracy_fn(y_pred=y_pred, y_true=y)

        # Turn the totals into per-batch averages
        n_batches = len(data_loader)
        total_loss /= n_batches
        total_acc /= n_batches

    results = {"model_name": model.__class__.__name__,
               "model_loss": total_loss.item(),
               "model_acc": total_acc}
    return results

In [None]:
model_result = eval_model(model, test_dataloader, loss_fn, accuracy_fn)

In [None]:
model_result

# 9. Make and Evaluate random predictions

In [None]:
def resample(sample, sample_rate, new_sample_rate):
    """Resample `sample` from `sample_rate` to `new_sample_rate` with librosa.

    The target rate used to be hard-coded to 16000, silently ignoring
    `new_sample_rate`; the argument is now honoured.
    """
    return librosa.resample(sample, orig_sr=sample_rate, target_sr=new_sample_rate)

def pad(sample, desired_length=4*16000):
    """Return `sample` zero-padded at the end, or truncated, to `desired_length` samples.

    The default length is 4 * 16000 samples. A sample that already has the
    desired length is returned unchanged.
    """
    shortfall = desired_length - len(sample)
    if shortfall > 0:
        # Too short: append zeros
        return np.pad(sample, (0, shortfall), 'constant')
    if shortfall < 0:
        # Too long: keep only the leading part
        return sample[:desired_length]
    return sample

In [None]:
def make_predictions(model, data, device=device):
    """Return class probabilities for each sample in `data`, one row per sample.

    Args:
        model: network returning raw logits.
        data: iterable of un-batched sample tensors.
        device: device each sample is moved to before the forward pass.

    NOTE(review): a later cell defines another ``make_predictions`` that takes
    a data loader; once that cell runs it replaces this function.
    """
    model.eval()
    collected = []
    with torch.inference_mode():
        for sample in data:
            # Add a batch dimension and move the sample to the target device
            batch = sample.unsqueeze(dim=0).to(device)

            # Raw logits -> probabilities over the classes
            logits = model(batch)
            probabilities = torch.softmax(logits.squeeze(), dim=0)

            # Keep the result on the CPU for further calculations
            collected.append(probabilities.cpu())

    # One tensor with a row per sample
    return torch.stack(collected)

In [None]:
import random
random.seed(42)

# Draw the demo clips from the held-out split only. Sampling from the whole
# df would mix in clips the model was trained on and make the predictions
# look better than they are.
test_data = list(zip(test_dataset.file_path_list, test_dataset.label_list))
test_samples = []
test_labels = []

# Pick up to 9 samples randomly (the plot below uses a 3x3 grid)
for sample_path, label in random.sample(test_data, k=min(9, len(test_data))):
    sample, sample_rate = load_audio(sample_path)
    # Bring the clip to the 16 kHz rate the model works with
    if sample_rate != 16000:
        sample = resample(sample, sample_rate, 16000)
    # Fix the length so it matches what the model was trained on
    sample = pad(sample)
    test_samples.append(torch.from_numpy(sample))
    test_labels.append(label)

# View the first sample shape
test_samples[0].shape

In [None]:
pred_probs = make_predictions(model=model,
                              data=test_samples)
pred_probs

In [None]:
test_labels

In [None]:
pred_classes = pred_probs.argmax(dim=1)
pred_classes

In [None]:
# Plot each sampled waveform with its predicted and true label
plt.figure(figsize=(9,9))
nrows = 3
ncols = 3

# Index -> label name, in the same order as the `labels` list used for encoding
class_names = ['female_angry', 'female_disgust', 'female_fear', 'female_happy',
 'female_neutral', 'female_sad', 'female_surprise', 'male_angry',
 'male_disgust', 'male_fear', 'male_happy', 'male_neutral', 'male_sad',
 'male_surprise']

for i, sample in enumerate(test_samples):
    # Create subplot
    plt.subplot(nrows, ncols, i+1)

    # Plot the audio
    plt.plot(sample)

    # Prediction and ground truth in text form
    pred_label = class_names[pred_classes[i]]
    true_label = class_names[test_labels[i]]

    # Green title for a correct prediction, red for a wrong one
    title = f"Pred: {pred_label} | Truth: {true_label}"
    plt.title(title, fontsize=10, c="g" if pred_label == true_label else "r")

    plt.axis(False)

# The spacing applies to the whole figure, so it is set once after the loop
# instead of being repeated for every subplot.
plt.subplots_adjust(left=0.1,
                    bottom=0.1,
                    right=1.0,
                    top=1.0,
                    wspace=0.5,
                    hspace=0.5)

## 9.1 Confusion Matrix

In [None]:
!pip install -q torchmetrics -U mlxtend

In [None]:
def make_predictions(model, data_loader, device=device):
    """Run `model` over `data_loader` and collect predictions and labels.

    Returns:
        (y_pred_tensor, y_label_tensor): CPU tensors with the predicted class
        index and the true label of every item, in the order the loader
        yielded them.

    NOTE(review): this re-uses the name of the per-sample ``make_predictions``
    defined earlier and replaces it once this cell runs.
    """
    model.eval()
    batch_predictions, batch_labels = [], []
    with torch.inference_mode():
        for X, y, file_path in tqdm(data_loader, desc="Making predictions..."):
            # Move the batch to the target device
            X = X.to(device)
            y = y.to(device)

            # Logits -> probabilities (each row sums to 1) -> class index
            y_logits = model(X)
            probabilities = torch.softmax(y_logits, dim=1)
            y_pred = probabilities.argmax(dim=1).to(device)

            # Keep results on the CPU for further calculations
            batch_predictions.append(y_pred.cpu())
            batch_labels.append(y.cpu())

    # Concatenate the per-batch results into flat tensors
    return torch.cat(batch_predictions), torch.cat(batch_labels)

In [None]:
y_pred_tensor, y_label_tensor = make_predictions(model, test_dataloader)
y_pred_tensor[:10]

In [None]:
y_label_tensor[:10]

In [None]:
from torchmetrics import ConfusionMatrix
from mlxtend.plotting import plot_confusion_matrix
import numpy as np

# Index -> label name, in the same order as the `labels` list used for encoding
class_names = ['female_angry', 'female_disgust', 'female_fear', 'female_happy',
               'female_neutral', 'female_sad', 'female_surprise', 'male_angry',
               'male_disgust', 'male_fear', 'male_happy', 'male_neutral',
               'male_sad', 'male_surprise']

# Raw counts: rows are true classes, columns are predicted classes
conf_mat = ConfusionMatrix(task='multiclass', num_classes=14)
conf_mat_counts = conf_mat(preds=y_pred_tensor, target=y_label_tensor).numpy()

# Number of test items per true class
row_sums = np.sum(conf_mat_counts, axis=1)

# Normalize each row to fractions. A class with no items in the test split
# has a row sum of 0; its row is left at 0 instead of dividing by zero.
conf_mat_tensor = np.divide(
    conf_mat_counts,
    row_sums[:, np.newaxis],
    out=np.zeros(conf_mat_counts.shape, dtype=float),
    where=row_sums[:, np.newaxis] > 0,
)

fig, ax = plot_confusion_matrix(
    conf_mat=conf_mat_tensor,
    class_names=class_names,
    figsize=(10, 7),
    show_absolute=False,  # Show relative values instead of absolute
    show_normed=True  # Show values as percentages
)
