<a href="https://colab.research.google.com/github/dr-antimonious/GRU-Emotion-Classification/blob/main/RUSU_testiranje.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
##### IMPORTS, CONSTANTS, UTILITIES

%%capture

import os
import gc
import sys
import shutil
import math
import numpy as np
import tarfile
import librosa
import transformers
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
from scipy.io import loadmat
from sklearn.preprocessing import OneHotEncoder
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import pandas as pd
from random import sample
import seaborn

##### PyTorch

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
from torch.nn.utils.rnn import pad_sequence

import torchvision
from torchvision import transforms, utils

!pip install torchinfo
from torchinfo import summary

%matplotlib inline
%load_ext tensorboard

# Project-wide constants. Paths are plain strings that are concatenated with
# file names elsewhere, so directory constants keep their trailing slash.
DRIVE =         '/content/drive/MyDrive/LSSED/'   # Drive folder where everything for the project is stored
TEST_METADATA = DRIVE + 'test_metadata.csv'       # Testing data (metadata) file
SAMPLING_RATE = 16000                             # Sampling rate of the recordings
BATCH_SIZE =    1                                 # Batch size
DEVICE =        torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')  # Use the GPU when one is available

# Pretrained wav2vec2 checkpoint name, its feature extractor, and the model (moved to DEVICE).
WAV2VEC2_NAME =     'facebook/wav2vec2-large-xlsr-53'
FEATURE_EXTRACTOR = transformers.Wav2Vec2FeatureExtractor.from_pretrained(WAV2VEC2_NAME)
WAV2VEC2_MODEL =    transformers.Wav2Vec2Model.from_pretrained(WAV2VEC2_NAME).to(DEVICE)

In [None]:
%%capture

# Mount Google Drive and copy the Kaggle API token from it into the Colab workspace.
from google.colab import drive
drive.mount('/content/drive')
shutil.copy('/content/drive/MyDrive/kaggle.json', '/content/kaggle.json')

# Put the token into ~/.kaggle with owner-only permissions, then download and
# unpack the dataset archive into ./speech-emotion-recognition-en.
!mkdir ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
!kaggle datasets download -d dmitrybabko/speech-emotion-recognition-en
!unzip speech-emotion-recognition-en.zip -d speech-emotion-recognition-en

In [None]:
# Source folders of the four corpora inside the unpacked archive.
CREMA_DIR = '/content/speech-emotion-recognition-en/Crema/'
RAVDESS_DIR = '/content/speech-emotion-recognition-en/Ravdess/audio_speech_actors_01-24/'
SAVEE_DIR = '/content/speech-emotion-recognition-en/Savee/'
TESS_DIR = '/content/speech-emotion-recognition-en/Tess/'

# RAVDESS and TESS keep their recordings one level deeper, in per-folder subdirectories.
RAVDESS_ACT_DIRS = os.listdir(RAVDESS_DIR)
TESS_ACT_DIRS = os.listdir(TESS_DIR)

# Every kept recording is moved here and renamed to '<emotion>_<running index>.wav'.
NEW_DIR = '/content/speech-emotion-recognition-en/'

# Corpus-specific emotion codes -> common label. Codes missing from a table are skipped.
crema_labels = {'SAD': 'sadness', 'ANG': 'anger', 'HAP': 'happiness', 'NEU': 'neutral', 'DIS': 'disgust'}
ravdess_labels = {'01': 'neutral', '03': 'happiness', '04': 'sadness', '05': 'anger', '07': 'disgust'}
tess_labels = {'sad': 'sadness', 'angry': 'anger', 'disgust': 'disgust', 'happy': 'happiness', 'neutral': 'neutral'}
savee_labels = {'a': 'anger', 'd': 'disgust', 'h': 'happiness', 'n': 'neutral'}

# Running counter shared by all corpora so the new file names never collide.
file_index = 0

# CREMA: the emotion code is the third '_'-separated field of the file name.
for f in tqdm(os.listdir(CREMA_DIR)):
  new_emotion = crema_labels.get(f.split('_')[2])
  if new_emotion is None:
    continue

  os.rename(CREMA_DIR + f, NEW_DIR + new_emotion + '_' + str(file_index) + '.wav')
  file_index += 1

# RAVDESS: the emotion code is the third '-'-separated field of the file name.
for d in tqdm(RAVDESS_ACT_DIRS):
  for f in os.listdir(RAVDESS_DIR + d):
    new_emotion = ravdess_labels.get(f.split('-')[2])
    if new_emotion is None:
      continue

    os.rename(RAVDESS_DIR + d + '/' + f, NEW_DIR + new_emotion + '_' + str(file_index) + '.wav')
    file_index += 1

# TESS: the emotion word is the third '_'-separated field, without the extension.
for d in tqdm(TESS_ACT_DIRS):
  for f in os.listdir(TESS_DIR + d):
    new_emotion = tess_labels.get(f.split('_')[2].split('.')[0])
    if new_emotion is None:
      continue

    os.rename(TESS_DIR + d + '/' + f, NEW_DIR + new_emotion + '_' + str(file_index) + '.wav')
    file_index += 1

# SAVEE: the code is the second '_'-separated field; its leading letter(s) give the emotion.
for f in tqdm(os.listdir(SAVEE_DIR)):
  code = f.split('_')[1].split('.')[0]
  first_letter = code[0]

  if first_letter == 's':
    # Codes starting with 's' are kept only when the second letter is 'a' (sadness).
    if code[1] != 'a':
      continue
    new_emotion = 'sadness'
  elif first_letter in savee_labels:
    new_emotion = savee_labels[first_letter]
  else:
    continue

  os.rename(SAVEE_DIR + f, NEW_DIR + new_emotion + '_' + str(file_index) + '.wav')
  file_index += 1

In [None]:
# Delete the recordings that were not moved out of the source folders, then
# remove the emptied folders themselves (os.rmdir only succeeds on empty folders).

# CREMA: flat folder.
for leftover in tqdm(os.listdir(CREMA_DIR)):
  os.remove(CREMA_DIR + leftover)
os.rmdir(CREMA_DIR)

# RAVDESS: one subfolder per entry of RAVDESS_ACT_DIRS, plus the outer 'Ravdess' folder.
for actor in tqdm(RAVDESS_ACT_DIRS):
  actor_path = RAVDESS_DIR + actor
  for leftover in os.listdir(actor_path):
    os.remove(actor_path + '/' + leftover)
  os.rmdir(actor_path)
os.rmdir(RAVDESS_DIR)
os.rmdir('/content/speech-emotion-recognition-en/Ravdess')

# TESS: one subfolder per entry of TESS_ACT_DIRS.
for actor in tqdm(TESS_ACT_DIRS):
  actor_path = TESS_DIR + actor
  for leftover in os.listdir(actor_path):
    os.remove(actor_path + '/' + leftover)
  os.rmdir(actor_path)
os.rmdir(TESS_DIR)

# SAVEE: flat folder.
for leftover in tqdm(os.listdir(SAVEE_DIR)):
  os.remove(SAVEE_DIR + leftover)
os.rmdir(SAVEE_DIR)

In [None]:
# Collect the renamed recordings and group them by the emotion prefix of their file name.
f = os.listdir(NEW_DIR)

emotion_names = ['anger', 'sadness', 'disgust', 'happiness', 'neutral']
emotions = [[fi for fi in f if fi.split('_')[0] == name] for name in emotion_names]
anger, sadness, disgust, happiness, neutral = emotions

# Target folder for the test recordings; re-running the cell must not fail if it already exists.
os.makedirs(NEW_DIR + 'test/', exist_ok = True)

# One (File, Emotion) row per recording. The per-emotion frames are concatenated
# in a single call with a fresh 0..n-1 index, instead of growing an initially
# empty frame inside a loop.
test = pd.concat(
  [
    pd.DataFrame({'File': files, 'Emotion': [name] * len(files)}, columns = ['File', 'Emotion'])
    for name, files in zip(emotion_names, emotions)
  ],
  ignore_index = True,
)

print(test)

In [None]:
# Move every recording listed in the metadata frame into the 'test/' subfolder.
test_dir = NEW_DIR + 'test/'
for file_name in tqdm(test['File'].to_list()):
  os.rename(NEW_DIR + file_name, test_dir + file_name)

In [None]:
# One-hot encode the emotion label: convert it to a categorical column, build
# one indicator column per category and append those columns to the frame.
test = test.astype({'Emotion': 'category'})

encoded_test = pd.get_dummies(test['Emotion'])

test = pd.concat((test, encoded_test), axis = 'columns')

In [None]:
# Show the metadata frame, now including the one-hot emotion columns.
print(test)

In [None]:
class Speech_Dataset(Dataset):
  """Speech recordings dataset.

  Each item is a dict with the recording's file path ('path') and its one-hot
  emotion target ('emotion'). The audio itself is not loaded here.
  """

  # Column order of the one-hot target returned by __getitem__.
  EMOTION_COLUMNS = ('anger', 'happiness', 'neutral', 'sadness', 'disgust')

  def __init__(self, metadata, directory, transform = None):
    """
    Arguments:
        metadata (DataFrame):       Pandas DataFrame with a 'File' column and one
                                    indicator column per entry of EMOTION_COLUMNS.
        directory (string):         Path prefix that is prepended to each file name.
        transform (callable | None): Optional callable applied to the sample dict
                                    before it is returned.
    """
    self.metadata = metadata
    self.directory = directory
    self.transform = transform

  def __len__(self):
    """Return the number of rows in the metadata frame."""
    return len(self.metadata)

  def __getitem__(self, idx):
    """Return {'path': ..., 'emotion': [...]} for the row at position `idx`."""
    if torch.is_tensor(idx):
      idx = idx.tolist()

    # Positional lookup: samplers hand out positions 0..len-1, which only match
    # the index labels when the frame happens to have a default RangeIndex.
    path = self.directory + self.metadata['File'].iloc[idx]
    emotion = [self.metadata[name].iloc[idx] for name in self.EMOTION_COLUMNS]

    sample = {'path': path, 'emotion': emotion}

    # The transform receives the whole sample; there is no loaded recording at
    # this point (the previous code referenced an undefined `recording` here).
    if self.transform:
      sample = self.transform(sample)

    return sample

In [None]:
# Dataset over the metadata frame; file names are resolved against NEW_DIR + 'test/'.
test_dataset = Speech_Dataset(test, NEW_DIR + 'test/')

In [None]:
def collate_fn(batch):
  """Turn a batch into a single (waveform, target) pair.

  Only the first element of `batch` is used. Its 'path' is loaded with librosa
  at SAMPLING_RATE and its 'emotion' is converted to a tensor.
  """
  first = batch[0]

  # librosa.load returns (samples, sampling rate); only the samples are kept.
  waveform, _ = librosa.load(first['path'], sr = SAMPLING_RATE)

  return (waveform, torch.as_tensor(first['emotion']))

In [None]:
# Unshuffled loader (third positional argument is `shuffle`). collate_fn reads only
# the first sample of every batch, so BATCH_SIZE is expected to stay at 1.
test_dataloader = DataLoader(test_dataset, BATCH_SIZE, False, num_workers = 12, collate_fn = collate_fn)

In [None]:
##### CLASSIFIER DEFINITION

class EmotionClassifier(nn.Module):
  """LayerNorm -> 3-layer unidirectional LSTM -> linear layer with 5 outputs.

  Works on 1024-dimensional feature sequences and returns the linear output
  computed from the LSTM state at the last valid time step (many-to-one).
  """

  def __init__(self):
    super().__init__()

    self.norm = nn.LayerNorm(normalized_shape = 1024)
    self.rnn1 = nn.LSTM(input_size = 1024, hidden_size = 1024, num_layers = 3, batch_first = True, bidirectional = False)
    self.linear1 = nn.Linear(1024, 5)

  def forward(self, x, length):
    """
    Arguments:
        x (Tensor):                  Features, either (time, 1024) for a single
                                     sequence or (batch, time, 1024).
        length (int | sequence):     Number of valid time steps; one value per
                                     sequence for batched input.

    Returns:
        Tensor: Outputs of the linear layer for the step `length - 1` of each sequence.
    """
    out = self.norm(x)
    out, _ = self.rnn1(out)

    # Many-to-one RNN mode: keep only the output at the last valid time step.
    # The rank is checked explicitly instead of relying on a bare `except`,
    # which would also swallow genuine indexing errors in the batched branch.
    last_step = torch.as_tensor(length, dtype = torch.long, device = out.device) - 1
    if out.dim() == 3:
      batch_index = torch.arange(out.shape[0], device = out.device)
      out = out[batch_index, last_step, :]
    else:
      out = out[last_step, :]

    out = self.linear1(out)
    return out

In [None]:
##### FULL MODEL DEFINITION

class Emotioner(nn.Module):
  """Waveform-to-prediction pipeline: feature extractor -> wav2vec2 -> emotion classifier."""

  def __init__(self, feature_extractor, wav2vec2_model, emotion_classifier, sampling_rate = 16000):
    """
    Arguments:
        feature_extractor:   Callable that turns a list of waveforms into model inputs.
        wav2vec2_model:      Model whose output exposes 'last_hidden_state'.
        emotion_classifier:  Module called as classifier(features, length).
        sampling_rate (int): Sampling rate passed to the feature extractor.
    """
    super().__init__()
    self.feature_extractor = feature_extractor
    self.wav2vec2_model = wav2vec2_model
    self.emotion_classifier = emotion_classifier
    self.sampling_rate = sampling_rate

  def set_feature_extractor(self, feature_extractor):
    """Replace the feature extractor; returns self so calls can be chained."""
    self.feature_extractor = feature_extractor
    return self

  def set_wav2vec2_model(self, wav2vec2_model):
    """Replace the wav2vec2 model; returns self so calls can be chained."""
    self.wav2vec2_model = wav2vec2_model
    return self

  def set_emotion_classifier(self, emotion_classifier):
    """Replace the emotion classifier; returns self so calls can be chained."""
    self.emotion_classifier = emotion_classifier
    return self

  def set_sampling_rate(self, sampling_rate):
    """Replace the sampling rate; returns self so calls can be chained."""
    self.sampling_rate = sampling_rate
    return self

  def extract_features(self, wav_array, sampling_rate):
    """Return the wav2vec2 last hidden state for one waveform as a float tensor on DEVICE.

    The result has the batch axis removed and carries no autograd history.
    """
    wavs_token = self.feature_extractor([wav_array], sampling_rate = sampling_rate, padding = True, do_normalize = True, return_tensors = 'pt').to(DEVICE)

    # The features are always returned detached, so recording an autograd graph
    # for the wav2vec2 pass would only cost memory.
    with torch.no_grad():
      outputs = self.wav2vec2_model(**wavs_token, output_hidden_states = True)

    # squeeze(0) removes only the batch axis; a bare squeeze() would also drop
    # the time axis of a one-frame recording. The tensor stays on its device
    # instead of taking a detour through a NumPy array on the CPU.
    feature_array = outputs['last_hidden_state'].squeeze(0).detach().float().to(DEVICE)
    return feature_array

  def forward(self, wav_array):
    """Return (classifier output, index of its largest entry) for one waveform."""
    features = self.extract_features(wav_array, self.sampling_rate)
    # The whole sequence is valid, so its length is the number of feature frames.
    output = self.emotion_classifier(features, features.shape[0])
    _, pred_label = torch.max(output.data, dim = 0)
    return (output, pred_label)

In [None]:
##### RNN TESTING

# Module-level result containers with 12 slots each; slot i belongs to model i.
y_pred = [[] for _ in range(12)]
y_true = []

# Correct predictions, number of predictions and accuracy per model.
true_preds, num_preds, epoch_acc = (np.zeros(12) for _ in range(3))

# Per-class counts (class indices 0..4) per model.
true_zeros, true_ones, true_twos, true_threes, true_fours = (np.zeros(12) for _ in range(5)) # TP
false_zeros, false_ones, false_twos, false_threes, false_fours = (np.zeros(12) for _ in range(5)) # FP
missed_zeros, missed_ones, missed_twos, missed_threes, missed_fours = (np.zeros(12) for _ in range(5)) # FN

# Per-class precision, recall and F1 per model.
prec_zeros, prec_ones, prec_twos, prec_threes, prec_fours = (np.zeros(12) for _ in range(5))
rec_zeros, rec_ones, rec_twos, rec_threes, rec_fours = (np.zeros(12) for _ in range(5))
f1_zeros, f1_ones, f1_twos, f1_threes, f1_fours = (np.zeros(12) for _ in range(5))

# Class-weighted recall, precision and F1 per model.
epoch_rec, epoch_prec, epoch_f1 = (np.zeros(12) for _ in range(3))

In [None]:
def test_mod(models, test_data_loader):
  """Evaluate every model on the test loader and print its metrics.

  Counts and metrics are written into the module-level containers (y_pred,
  y_true, true_preds, true_zeros, ...). They are not reset here, so calling
  the function again keeps accumulating on top of earlier results.

  Accuracy is correct predictions / number of predictions. Precision, recall
  and F1 are computed per class and combined with class weights equal to each
  class's share of the evaluated samples (TP + FN of that class / total).

  Arguments:
      models:            Sequence of callables returning (output, predicted label)
                         for one waveform; model i uses slot i of every container.
      test_data_loader:  Iterable yielding (waveform, one-hot target) pairs.
  """
  # y_true is the only name that is rebound; all other containers are updated in place.
  global y_true

  # Per-class views (class index 0..4) over the module-level accumulators.
  class_names = ("zeros", "ones", "twos", "threes", "fours")
  tp =   (true_zeros, true_ones, true_twos, true_threes, true_fours)
  fp =   (false_zeros, false_ones, false_twos, false_threes, false_fours)
  fn =   (missed_zeros, missed_ones, missed_twos, missed_threes, missed_fours)
  prec = (prec_zeros, prec_ones, prec_twos, prec_threes, prec_fours)
  rec =  (rec_zeros, rec_ones, rec_twos, rec_threes, rec_fours)
  f1 =   (f1_zeros, f1_ones, f1_twos, f1_threes, f1_fours)
  num_classes = len(class_names)

  # Pure inference: no autograd graph is needed for any of the models.
  with torch.no_grad():
    for data_inputs, labels in tqdm(test_data_loader):

      ##### One-hot target -> class index
      _, labels = torch.max(labels.data, dim = 0)
      y_true = np.append(y_true, labels.item())

      for i, model in enumerate(models):
        output, pred_labels = model(data_inputs)
        # Compare on the prediction's device.
        target = labels.to(pred_labels.device)

        true_preds[i] += (pred_labels == target).sum().item()

        for k in range(num_classes):
          is_pred = pred_labels == k
          is_true = target == k
          tp[k][i] += torch.sum(is_pred & is_true).item()
          fp[k][i] += torch.sum(is_pred & ~is_true).item()
          fn[k][i] += torch.sum(~is_pred & is_true).item()

        num_preds[i] += 1
        y_pred[i] = np.append(y_pred[i], pred_labels.cpu().numpy())

  ##### Metrics
  for i in range(len(models)):
    epoch_acc[i] = true_preds[i] / num_preds[i]

    # Class weights from the observed number of samples per class (TP + FN),
    # so they always sum to one for whatever dataset was evaluated.
    support = [tp[k][i] + fn[k][i] for k in range(num_classes)]
    total = sum(support) + 1e-10
    weights = [s / total for s in support]

    # The 1e-10 terms guard against division by zero for classes that never occur.
    for k in range(num_classes):
      prec[k][i] = tp[k][i] / (tp[k][i] + fp[k][i] + 1e-10)
      rec[k][i] =  tp[k][i] / (tp[k][i] + fn[k][i] + 1e-10)
      f1[k][i] =   (2 * prec[k][i] * rec[k][i]) / (prec[k][i] + rec[k][i] + 1e-10)

    epoch_prec[i] = sum(w * p[i] for w, p in zip(weights, prec))
    epoch_rec[i] =  sum(w * r[i] for w, r in zip(weights, rec))
    epoch_f1[i] =   sum(w * f[i] for w, f in zip(weights, f1))

    print("Model", str(i), ":")
    print(f"Testing accuracy: {100.0*epoch_acc[i]:4.5f}%")
    print(f"Testing precision: {100.0*epoch_prec[i]:4.5f}%")
    print(f"Testing recall: {100.0*epoch_rec[i]:4.5f}%")
    print(f"Testing F1-score: {100.0*epoch_f1[i]:4.5f}%")

    # Number of times each class was predicted (TP + FP).
    for k, name in enumerate(class_names):
      print("Testing " + name + ":", str(tp[k][i] + fp[k][i]))

In [None]:
# Load every saved classifier checkpoint (in file-name order) and wrap each one
# with the shared feature extractor and wav2vec2 model.
model_dir = DRIVE + 'LSTM_MODELS'
model_names = sorted(os.listdir(model_dir))

models = []
for name in tqdm(model_names):
  temp_mod = EmotionClassifier().to(DEVICE)
  # map_location lets checkpoints that were saved on a GPU load on a CPU-only runtime.
  temp_mod.load_state_dict(torch.load(os.path.join(model_dir, name), map_location = DEVICE))
  temp_mod.eval()
  mod = Emotioner(FEATURE_EXTRACTOR, WAV2VEC2_MODEL, temp_mod, SAMPLING_RATE).to(DEVICE)
  mod.eval()
  # A plain list keeps the modules as they are; np.append would first try to
  # convert each module into an array.
  models.append(mod)

In [None]:
# Run the evaluation; metrics are printed and stored in the module-level containers.
test_mod(models, test_dataloader)

In [None]:
# Numbers 17..28 used to name the saved figures; entry i belongs to model i.
epochs = np.arange(17, 29)

for i in range(len(models)):
  disp = ConfusionMatrixDisplay(confusion_matrix = confusion_matrix(y_true, y_pred[i]), display_labels = ['anger', 'happiness', 'neutral', 'sadness', 'disgust'])
  # Draw on an explicitly created figure. Without `ax`, disp.plot() opens its own
  # figure, so a preceding plt.figure() would only leave an empty one behind.
  fig, ax = plt.subplots()
  disp.plot(ax = ax)
  fig.savefig('confusion_matrix_rnn_' + str(epochs[i]) + '_.png')
  plt.show()
  # Release the figure so the figures of all models do not pile up in memory.
  plt.close(fig)

In [None]:
# Copy each saved confusion-matrix image into the Drive results folder.
for epoch in epochs:
  source_png = f'/content/confusion_matrix_rnn_{epoch}_.png'
  target_png = f'{DRIVE}LSTM_CONFUSION/confusion_matrix_lstm_{epoch}.png'
  shutil.copy(source_png, target_png)