<a href="https://colab.research.google.com/github/NilofarMoradiFarisar/My_project_2022_4/blob/main/Conversational_Transformer_Network_for_Emotion_Recognition.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torchtext
import torch.nn as nn
from torch.autograd import Variable
from torch.nn import functional as F

import math
import numpy as np
import copy
import time

import pandas as pd
from sklearn.model_selection import train_test_split


import os
import numpy as np
import pandas as pd
from datetime import datetime
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable

import pickle
import random
import librosa
import librosa.display
import IPython.display
import warnings
warnings.filterwarnings(action='ignore')


# Colab-only: mount Google Drive at /content/G so files stored there can be read.
from google.colab import drive
drive.mount('/content/G')

Drive already mounted at /content/G; to attempt to forcibly remount, call drive.mount("/content/G", force_remount=True).


In [None]:
# Root folders of the train / test splits on the mounted Drive. Every file found
# (recursively) beneath them is loaded when they are passed to EMODB_Dataset.
path_to_train_dataset = "/content/G/MyDrive/VED/EMO-DB-CLASSIFIED-0.1/train/"
path_to_test_dataset = "/content/G/MyDrive/VED/EMO-DB-CLASSIFIED-0.1/test/"

In [None]:
def get_abs_subdir_files(path_to_folder):
    """Recursively list every file located below ``path_to_folder``.

    Each returned path is the directory reported by ``os.walk`` joined with the
    file name, so the paths are absolute only when ``path_to_folder`` is.
    Entries appear in the order in which ``os.walk`` yields them.
    """
    return [
        os.path.join(current_dir, file_name)
        for current_dir, _subdirs, file_names in os.walk(path_to_folder)
        for file_name in file_names
    ]



def remove_silent_space(signal, sr=22050, threshold=0.0005):
    """Drop the samples of ``signal`` whose local mean amplitude is not above ``threshold``.

    The local amplitude is a centred rolling mean of ``|signal|`` over
    ``int(sr / 10)`` samples; near the edges the mean is taken over however
    many samples are available (``min_periods=1``).

    Returns:
        A NumPy array holding the retained samples in their original order.
    """
    window = int(sr / 10)
    envelope = (
        pd.Series(signal)
        .abs()
        .rolling(window=window, min_periods=1, center=True)
        .mean()
    )
    # Element-wise comparison; NaN compares as False, so such samples are dropped.
    keep = (envelope > threshold).to_numpy()
    return np.array(signal[keep])




def get_label(file):
    """Return the integer class index encoded in an audio file name.

    The emotion code is the single character six positions from the end of
    ``file`` (for example the ``F`` in ``03a01Fa.wav``). Codes map to indices
    0-6 in the order A, T, E, W, L, N, F.

    Raises:
        KeyError: if that character is not one of the seven known codes
            (including when ``file`` is too short to contain one).
    """
    emotion_codes = "ATEWLNF"
    code_to_index = {code: index for index, code in enumerate(emotion_codes)}
    return code_to_index[file[-6:-5]]



def get_voice_features(signal, sample_rate=22050, n_fft = 2048, hop_length = 512, n_mfcc = 13):
    """Build a stacked spectral feature matrix from the first second of ``signal``.

    The following librosa features are computed and stacked along the feature
    axis, in this order: MFCC, mel-spectrogram, chroma (STFT), spectral
    contrast, tonnetz, and the average of the 64-band mel-spectrograms of the
    harmonic and percussive components (HPSS).

    Args:
        signal: 1-D audio samples (NumPy array).
        sample_rate: sampling rate of ``signal``; also the number of samples
            kept (i.e. one second of audio).
        n_fft: FFT window size used by the spectral features.
        hop_length: hop between successive analysis frames.
        n_mfcc: number of MFCC coefficients.

    Returns:
        A float32 ``torch.Tensor`` of shape ``(frames, features)`` — the stacked
        matrix transposed so that time is the first axis. Clips shorter than one
        second yield proportionally fewer frames.
    """
    # Keep at most one second of audio. The original hard-coded 22050 samples,
    # which equals one second only at the default sampling rate.
    signal = signal[:int(sample_rate)]

    spectral_kwargs = dict(sr=sample_rate, n_fft=n_fft, hop_length=hop_length)

    # librosa >= 0.10 accepts the audio only as the keyword argument ``y``;
    # the keyword form is also valid on older releases.
    voice_features = [
        librosa.feature.mfcc(y=signal, n_mfcc=n_mfcc, **spectral_kwargs),
        librosa.feature.melspectrogram(y=signal, **spectral_kwargs),
        librosa.feature.chroma_stft(y=signal, **spectral_kwargs),
        librosa.feature.spectral_contrast(y=signal, **spectral_kwargs),
        librosa.feature.tonnetz(y=signal, sr=sample_rate, hop_length=hop_length),
    ]

    # Harmonic/percussive mel-spectrograms use the same framing parameters as
    # the other features so that all blocks share one frame count.
    signal_harmonic, signal_percussive = librosa.effects.hpss(y=signal)
    melspec_harmonic = librosa.feature.melspectrogram(y=signal_harmonic, n_mels=64, **spectral_kwargs)
    melspec_percussive = librosa.feature.melspectrogram(y=signal_percussive, n_mels=64, **spectral_kwargs)
    voice_features.append(np.average([melspec_harmonic, melspec_percussive], axis=0))

    # Stack all blocks along the feature axis -> (total_features, frames).
    stacked = np.concatenate(voice_features, axis=0)

    feature = torch.from_numpy(stacked.astype(np.float32))

    return (feature.T)



class EMODB_Dataset(Dataset):
    """In-memory dataset of (feature matrix, emotion label) pairs.

    Every file found below ``path_to_audio_waves`` is loaded eagerly in
    ``__init__``: the audio is resampled to ``sample_rate``, low-amplitude
    regions are removed, and the feature matrix is extracted once and cached.

    Args:
        path_to_audio_waves: folder that is walked recursively for audio files.
        sample_rate: target sampling rate handed to ``librosa.load``.
    """

    def __init__(self, path_to_audio_waves, sample_rate=22050):

        self.audio_waves = get_abs_subdir_files(path_to_audio_waves)
        self.sample_rate = sample_rate
        self.labels = []
        self.features = []

        for audio_file in self.audio_waves:
            signal, sr = librosa.load(audio_file, sr=self.sample_rate)
            signal = remove_silent_space(signal, sr=sr)

            # Forward the actual sampling rate; previously the extractor
            # silently fell back to its 22050 Hz default.
            self.features.append(get_voice_features(signal, sample_rate=sr))

            # 0-dim int64 tensor, the dtype nn.CrossEntropyLoss expects for
            # class indices regardless of the platform's default integer size.
            self.labels.append(torch.tensor(get_label(audio_file), dtype=torch.long))

    def __len__(self):
        """Number of audio files (and therefore samples) in the dataset."""
        return len(self.audio_waves)

    def __getitem__(self, idx):
        """Return the cached ``(features, label)`` pair at position ``idx``."""
        return self.features[idx], self.labels[idx]

In [None]:
# Eagerly load both splits: each constructor reads every audio file under the
# given folder and extracts its features up front, so this cell can be slow.
training_data = EMODB_Dataset(path_to_train_dataset)
test_data = EMODB_Dataset(path_to_test_dataset)

In [None]:
# Mini-batches of 8 samples.
# NOTE(review): shuffle=True on the test loader is not needed for evaluation
# and makes the order of returned predictions vary between runs.
train_loader = DataLoader(training_data, batch_size=8, shuffle=True)
test_loader = DataLoader(test_data, batch_size=8, shuffle=True)

# Peek at one batch to check the (features, labels) tensor shapes.
batch = next(iter(train_loader))
batch[0].shape, batch[1].shape

(torch.Size([8, 44, 230]), torch.Size([8]))

In [None]:
class PositionalEncoder(nn.Module):
    """Add a fixed sinusoidal position code to a ``(batch, seq_len, d_model)`` input.

    The table is built once in ``__init__`` and stored as a non-trainable
    buffer, so it follows the module through ``.to(device)`` and is saved in
    the ``state_dict``.

    Args:
        d_model: size of the last input dimension.
        max_seq_len: number of positions in the table; inputs may not have a
            longer sequence dimension.

    NOTE(review): the exponents ``2 * i / d_model`` and ``2 * (i + 1) / d_model``
    are evaluated with ``i`` already stepping by two, which differs from the
    formulation in "Attention Is All You Need" (there both the sine and cosine
    of a pair use ``i / d_model``). The values are kept as they were so that
    previously reported results remain reproducible.
    """

    def __init__(self, d_model, max_seq_len = 32):
        super().__init__()
        self.d_model = d_model
        pe = torch.zeros(max_seq_len, d_model)

        for pos in range(max_seq_len):
            for i in range(0, d_model, 2):
                pe[pos, i] = math.sin(pos / (10000 ** ((2 * i)/d_model)))
                # With an odd d_model the last sine column has no cosine
                # partner; writing it would index past the end of the row.
                if i + 1 < d_model:
                    pe[pos, i + 1] = math.cos(pos / (10000 ** ((2 * (i + 1))/d_model)))

        # Shape (1, max_seq_len, d_model) so it broadcasts over the batch axis.
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Scale ``x`` by ``sqrt(d_model)`` and add the position code.

        Raises:
            ValueError: if ``x.size(1)`` exceeds ``max_seq_len``.
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(1):
            raise ValueError(
                f"sequence length {seq_len} exceeds max_seq_len {self.pe.size(1)}"
            )
        # Scale the input so the position code does not dominate it.
        x = x * math.sqrt(self.d_model)
        # Follow the input's device instead of assuming CUDA; this is a no-op
        # when the module already lives on the same device as ``x``.
        return x + self.pe[:, :seq_len].to(x.device)

In [None]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention with learned q/k/v/output projections.

    Args:
        heads: number of attention heads.
        d_model: size of the last dimension of the inputs; must be divisible
            by ``heads``.
        dropout: dropout probability applied to the attention weights.

    Raises:
        ValueError: if ``d_model`` is not a multiple of ``heads``.
    """

    def __init__(self, heads, d_model, dropout = 0.1):
        super().__init__()

        # Without this check d_k is silently floored and the reshape in
        # ``forward`` fails later with an unrelated-looking size error.
        if d_model % heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by heads ({heads})"
            )

        self.d_model = d_model
        self.d_k = d_model // heads
        self.h = heads

        self.q_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)
        self.out = nn.Linear(d_model, d_model)

    def forward(self, q, k, v, mask=None):
        """Attend from ``q`` to ``k``/``v`` and return a ``(batch, seq, d_model)`` tensor.

        ``mask`` is passed unchanged to ``attention``.
        """
        bs = q.size(0)

        # Project, split the last dimension into (heads, d_k), then move the
        # head axis forward: (bs, seq, d_model) -> (bs, heads, seq, d_k).
        k = self.k_linear(k).view(bs, -1, self.h, self.d_k).transpose(1, 2)
        q = self.q_linear(q).view(bs, -1, self.h, self.d_k).transpose(1, 2)
        v = self.v_linear(v).view(bs, -1, self.h, self.d_k).transpose(1, 2)

        scores = attention(q, k, v, self.d_k, mask, self.dropout)

        # Merge the heads back: (bs, heads, seq, d_k) -> (bs, seq, d_model).
        concat = scores.transpose(1, 2).contiguous().view(bs, -1, self.d_model)
        return self.out(concat)

In [None]:
def attention(q, k, v, d_k, mask=None, dropout=None):
    """Scaled dot-product attention.

    Scores are ``q @ k^T / sqrt(d_k)``, normalised with a softmax over the last
    axis, optionally passed through ``dropout``, and used to weight ``v``.

    Args:
        q, k, v: query, key and value tensors whose last two axes are
            (sequence, d_k).
        d_k: scaling dimension.
        mask: optional tensor; an axis is inserted at position 1 before it is
            compared with the scores, and positions where it equals 0 receive
            a score of -1e9.
        dropout: optional callable applied to the attention weights.
    """
    weights = (q @ k.transpose(-2, -1)) / math.sqrt(d_k)

    if mask is not None:
        blocked = mask.unsqueeze(1) == 0
        weights = weights.masked_fill(blocked, -1e9)

    weights = F.softmax(weights, dim=-1)
    if dropout is not None:
        weights = dropout(weights)

    return weights @ v

In [None]:
class FeedForward(nn.Module):
    """Two-layer MLP applied to the last dimension.

    Computes ``linear_2(dropout(relu(linear_1(x))))``, expanding from
    ``d_model`` to ``d_ff`` (2048 by default) and back to ``d_model``.
    """

    def __init__(self, d_model, d_ff=2048, dropout = 0.5):
        super().__init__()
        self.linear_1 = nn.Linear(d_model, d_ff)
        self.dropout = nn.Dropout(dropout)
        self.linear_2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        hidden = F.relu(self.linear_1(x))
        hidden = self.dropout(hidden)
        return self.linear_2(hidden)

In [None]:
class Norm(nn.Module):
    """Normalise the last dimension, then apply a learnable gain and bias.

    Output is ``alpha * (x - mean) / (std + eps) + bias`` where ``mean`` and
    ``std`` are taken over the last axis (``std`` with ``torch.std`` defaults).
    """

    def __init__(self, d_model, eps = 1e-6):
        super().__init__()

        self.size = d_model
        # Learnable per-feature gain and offset, initialised to the identity.
        self.alpha = nn.Parameter(torch.ones(self.size))
        self.bias = nn.Parameter(torch.zeros(self.size))
        # Added to the standard deviation to avoid division by zero.
        self.eps = eps

    def forward(self, x):
        centred = x - x.mean(dim=-1, keepdim=True)
        spread = x.std(dim=-1, keepdim=True) + self.eps
        return self.alpha * centred / spread + self.bias

In [None]:
class EncoderLayer(nn.Module):
    """One pre-norm encoder block: self-attention then feed-forward, each with a residual.

    ``dropout`` sets only the two residual dropouts; the attention and
    feed-forward sub-modules are built with their own default rates.
    """

    def __init__(self, d_model, heads, dropout = 0.1):
        super().__init__()
        self.norm_1 = Norm(d_model)
        self.norm_2 = Norm(d_model)
        self.attn = MultiHeadAttention(heads, d_model)
        self.ff = FeedForward(d_model)
        self.dropout_1 = nn.Dropout(dropout)
        self.dropout_2 = nn.Dropout(dropout)

    def forward(self, x, mask):
        # Self-attention sub-layer: normalise, attend, add back to the input.
        normed = self.norm_1(x)
        attended = self.attn(normed, normed, normed, mask)
        x = x + self.dropout_1(attended)

        # Feed-forward sub-layer with the same normalise-then-residual shape.
        normed = self.norm_2(x)
        return x + self.dropout_2(self.ff(normed))

def get_clones(module, N):
    """Return an ``nn.ModuleList`` holding ``N`` independent deep copies of ``module``."""
    clones = nn.ModuleList()
    for _ in range(N):
        clones.append(copy.deepcopy(module))
    return clones

In [None]:
class Encoder(nn.Module):
    """Conv1d front-end + stacked encoder layers + 7-way classifier.

    The input is ``(batch, input_channel, length)``. The convolution (kernel 15,
    stride 1, no padding) produces ``(batch, output_channel, length - 14)``;
    the result is then treated as a sequence of ``output_channel`` steps with
    ``length - 14`` features each, so ``length - 14`` must equal ``d_model``.

    Args:
        input_channel: number of input channels of the convolution.
        d_model: feature size used by the encoder layers.
        N: number of stacked encoder layers.
        heads: number of attention heads per layer.
        output_channel: output channels of the convolution, i.e. the sequence
            length seen by the encoder layers.
        frame_attention: if True, pool the sequence with a learned softmax
            weighting over its steps; otherwise take the last step.
    """

    def __init__(self, input_channel, d_model, N, heads, output_channel=32, frame_attention=True):
        super().__init__()
        self.frame_attention = frame_attention
        self.N = N
        # Stored so ``forward`` does not depend on a notebook-level global.
        self.d_model = d_model
        self.conv1d = torch.nn.Conv1d(input_channel, output_channel, 15, stride=1)
        # The sequence axis after the convolution has ``output_channel`` steps,
        # so the positional table must cover exactly that many positions.
        self.pe = PositionalEncoder(d_model, max_seq_len=output_channel)
        self.layers = get_clones(EncoderLayer(d_model, heads), N)
        # NOTE(review): ``norm`` is created but never applied in ``forward``.
        # It is kept so the module's parameter set / state_dict keys stay stable.
        self.norm = Norm(d_model)

        # Frame-attention weights. Registered as parameters so they move with
        # ``.to(device)``, are returned by ``parameters()`` (and thus trained),
        # and are saved in the state_dict. As plain tensors they were none of these.
        self.Wz = nn.Parameter(torch.randn(d_model, 1))
        self.bz = nn.Parameter(torch.randn(output_channel, 1))

        self.fc = nn.Linear(d_model, 7)
        self.dropout = nn.Dropout()

    def forward(self, x, mask=None):
        """Return class logits of shape ``(batch, 7)``.

        ``mask`` is forwarded to every encoder layer (``None`` disables masking).
        """
        x = self.conv1d(x)
        x = self.pe(x)
        for layer in self.layers:
            x = layer(x, mask)

        # Frame-based attention mechanism
        if self.frame_attention:
            # (B, C, D) @ (D, 1) + (C, 1) -> one score per sequence step.
            logit = x @ self.Wz + self.bz
            alpha_fuse = F.softmax(logit, dim=1)
            # (B, C, 1) -> (B, 1, C) so it can weight the C steps of x.
            alpha_fuse = alpha_fuse.transpose(dim0=1, dim1=2)
            x = alpha_fuse @ x
            x = x.reshape(-1, self.d_model)
        else:
            # Use the representation of the last sequence step only.
            x = x[:, -1, :]

        x = self.fc(x)
        # NOTE(review): dropout is applied to the output logits (after ``fc``).
        # Left unchanged to preserve training behaviour; dropout is more
        # commonly applied before the classification layer.
        x = self.dropout(x)

        return x

In [None]:
# m = torch.nn.Conv1d(44, 44, 15, stride=1)
# input = torch.randn(20, 44, 216)
# n = m(input)
# input.shape, m(input).shape

In [None]:
# import torch
# from torch.autograd import Variable

# d_model = 216
# input_channel = 32
# batch = 8

# input = torch.randn(batch, input_channel, d_model)


# dtype = torch.FloatTensor
# N, D_in, H, D_out = 64, 1000, 100, 10

# Wz = Variable(torch.randn(d_model, 1).type(dtype), requires_grad=True)
# bz = Variable(torch.randn(input_channel, 1).type(dtype), requires_grad=True)
# logit = input @ Wz + bz
# alpha_fuse = F.softmax(logit, dim=1)
# alpha_fuse = alpha_fuse.transpose(dim0=1, dim1=2)
# best_output = alpha_fuse @ input
# best_output = best_output.reshape(-1, d_model)

In [None]:
# Hyper-parameters. With the (8, 44, 230) batches produced above, the Conv1d
# (kernel 15, no padding) turns the 230 features into 230 - 14 = 216 = d_model.
d_model = 216
heads = 8
# Number of stacked encoder layers.
N = 6
# Size of dimension 1 of each batch, consumed by the Conv1d as its input channels.
input_channel = 44

# frame_attention=False: classify from the last sequence step instead of the
# learned frame-attention pooling.
model = Encoder(input_channel, d_model, N, heads, frame_attention=False)

# Xavier-uniform initialisation for every parameter with more than one
# dimension; 1-D parameters (e.g. biases) keep their default initialisation.
for p in model.parameters():
    if p.dim() > 1:
        nn.init.xavier_uniform_(p)

optimizer = torch.optim.Adam(model.parameters(), lr=0.001, betas=(0.9, 0.98), eps=1e-9)
criterion = nn.CrossEntropyLoss()

In [None]:
# Move the model's registered parameters and buffers to the GPU (requires CUDA).
model.to("cuda")

Encoder(
  (conv1d): Conv1d(44, 32, kernel_size=(15,), stride=(1,))
  (pe): PositionalEncoder()
  (layers): ModuleList(
    (0): EncoderLayer(
      (norm_1): Norm()
      (norm_2): Norm()
      (attn): MultiHeadAttention(
        (q_linear): Linear(in_features=216, out_features=216, bias=True)
        (v_linear): Linear(in_features=216, out_features=216, bias=True)
        (k_linear): Linear(in_features=216, out_features=216, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
        (out): Linear(in_features=216, out_features=216, bias=True)
      )
      (ff): FeedForward(
        (linear_1): Linear(in_features=216, out_features=2048, bias=True)
        (dropout): Dropout(p=0.5, inplace=False)
        (linear_2): Linear(in_features=2048, out_features=216, bias=True)
      )
      (dropout_1): Dropout(p=0.1, inplace=False)
      (dropout_2): Dropout(p=0.1, inplace=False)
    )
    (1): EncoderLayer(
      (norm_1): Norm()
      (norm_2): Norm()
      (attn): MultiHeadAttenti

In [None]:
# for inputs, targets in train_loader:
#     inputs, targets = inputs.to("cuda"), targets.to("cuda")
#     outputs = model(inputs,mask=None)
#     loss = criterion(outputs, targets)
#     print (loss)
#     break

In [None]:
def get_test_dataset_accuracy(test_loader, device="cuda:0", model=None):
    """Compute classification accuracy of a model over ``test_loader``.

    Args:
        test_loader: iterable yielding ``(inputs, targets)`` batches.
        device: device the batches are moved to before the forward pass.
        model: network to evaluate. When omitted, the notebook-level variable
            named ``model`` is used, which keeps the original two-argument
            call form working.

    Returns:
        ``(accuracy, predictions, targets)`` where ``accuracy`` is the fraction
        of correct predictions (0.0 for an empty loader) and the two lists hold
        the per-sample predicted and true class indices in loader order.
    """
    net = model if model is not None else globals()["model"]

    n_correct = 0.
    n_total = 0.
    predictions_total = []
    targets_total = []

    # Evaluate deterministically (dropout off) and without building autograd
    # graphs; the previous train/eval mode is restored afterwards.
    was_training = net.training
    net.eval()
    try:
        with torch.no_grad():
            for inputs, targets in test_loader:
                inputs, targets = inputs.to(device), targets.to(device)
                outputs = net(inputs)
                _, predictions = torch.max(outputs, 1)
                n_correct += (predictions == targets).sum().item()
                n_total += targets.shape[0]
                predictions_total += list(predictions.to("cpu").numpy().astype(int))
                targets_total += list(targets.to("cpu").numpy().astype(int))
    finally:
        net.train(was_training)

    # Guard against an empty loader instead of raising ZeroDivisionError.
    accuracy = n_correct / n_total if n_total else 0.0
    return accuracy, predictions_total, targets_total

In [None]:
def _evaluate_loader(model, criterion, loader, device):
    """Return ``(mean batch loss, accuracy)`` of ``model`` over ``loader`` without tracking gradients."""
    losses = []
    n_correct = 0
    n_total = 0
    with torch.no_grad():
        for inputs, targets in loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            losses.append(criterion(outputs, targets).item())
            _, predictions = torch.max(outputs, 1)
            n_correct += (predictions == targets).sum().item()
            n_total += targets.shape[0]
    accuracy = n_correct / n_total if n_total else 0.0
    return np.mean(losses), accuracy


def batch_gd(model, criterion, optimizer, train_loader, test_loader, epochs, device="cuda"):
    """Train ``model`` with mini-batch gradient descent and report metrics per epoch.

    For every epoch the model is trained on ``train_loader``, then evaluated in
    eval mode on ``test_loader`` (loss and accuracy) and on ``train_loader``
    (accuracy). One summary line is printed per epoch.

    Args:
        model: network to train; it must already live on ``device``.
        criterion: loss taking ``(outputs, targets)``.
        optimizer: optimizer over ``model``'s parameters.
        train_loader, test_loader: iterables of ``(inputs, targets)`` batches.
        epochs: number of passes over ``train_loader``.
        device: device every batch is moved to, for training and evaluation.

    Returns:
        ``(train_losses, test_losses)``: NumPy arrays of length ``epochs`` with
        the mean batch loss per epoch.
    """
    train_losses = np.zeros(epochs)
    test_losses = np.zeros(epochs)

    for it in range(epochs):
        t0 = datetime.now()

        model.train()
        batch_losses = []
        for inputs, targets in train_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)

            loss.backward()
            optimizer.step()
            batch_losses.append(loss.item())

        # Mean over batches recorded while the weights were still changing and
        # with dropout active, so it is not directly comparable to test_loss.
        train_loss = np.mean(batch_losses)

        # Evaluate the model that was passed in, on the requested device, in a
        # single gradient-free pass per loader.
        model.eval()
        test_loss, accuracy_test = _evaluate_loader(model, criterion, test_loader, device)
        _, accuracy_train = _evaluate_loader(model, criterion, train_loader, device)

        # Save losses
        train_losses[it] = train_loss
        test_losses[it] = test_loss

        dt = datetime.now() - t0

        print(f'Epoch {it+1}/{epochs}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}, accuracy_train: {accuracy_train:.4f}, accuracy_test: {accuracy_test:.4f}, Duration: {dt}')

    return train_losses, test_losses

# Results taken from the last sequence position, without frame attention (Best 0.68)

In [None]:
# Train for 200 epochs; batch_gd prints one summary line per epoch.
train_losses, test_losses = batch_gd(model, criterion, optimizer, train_loader, test_loader, epochs=200)

Epoch 1/200, Train Loss: 912.0397, Test Loss: 452.3109, accuracy_train: 0.2971, accuracy_test: 0.2766, Duration: 0:00:01.952119
Epoch 2/200, Train Loss: 426.3029, Test Loss: 324.7593, accuracy_train: 0.4816, accuracy_test: 0.4043, Duration: 0:00:01.819651
Epoch 3/200, Train Loss: 297.5868, Test Loss: 298.1315, accuracy_train: 0.5553, accuracy_test: 0.4043, Duration: 0:00:01.786297
Epoch 4/200, Train Loss: 175.3705, Test Loss: 184.6307, accuracy_train: 0.5697, accuracy_test: 0.3191, Duration: 0:00:01.807451
Epoch 5/200, Train Loss: 132.5955, Test Loss: 182.1044, accuracy_train: 0.6230, accuracy_test: 0.4468, Duration: 0:00:01.795618
Epoch 6/200, Train Loss: 131.1731, Test Loss: 172.4223, accuracy_train: 0.5963, accuracy_test: 0.4043, Duration: 0:00:01.769391
Epoch 7/200, Train Loss: 96.3859, Test Loss: 201.6052, accuracy_train: 0.6680, accuracy_test: 0.4043, Duration: 0:00:01.776223
Epoch 8/200, Train Loss: 79.1019, Test Loss: 161.1593, accuracy_train: 0.7049, accuracy_test: 0.5745, Dur

# Results with Frame-based Attention Mechanism (Best 0.6596)

In [None]:
# NOTE(review): this call is identical to the previous training cell and reuses
# the same `model` and `optimizer` names. The heading suggests the model was
# rebuilt with frame_attention=True before this run, but no cell in the
# notebook does so; confirm and make that step explicit.
train_losses, test_losses = batch_gd(model, criterion, optimizer, train_loader, test_loader, epochs=200)

Epoch 1/200, Train Loss: 1200.6798, Test Loss: 796.1382, accuracy_train: 0.3463, accuracy_test: 0.2553, Duration: 0:00:03.239980
Epoch 2/200, Train Loss: 698.3785, Test Loss: 577.1768, accuracy_train: 0.3525, accuracy_test: 0.3617, Duration: 0:00:01.826536
Epoch 3/200, Train Loss: 580.9515, Test Loss: 533.0606, accuracy_train: 0.4037, accuracy_test: 0.2766, Duration: 0:00:01.806217
Epoch 4/200, Train Loss: 404.0277, Test Loss: 402.9656, accuracy_train: 0.5451, accuracy_test: 0.4255, Duration: 0:00:01.816111
Epoch 5/200, Train Loss: 321.5694, Test Loss: 527.5856, accuracy_train: 0.5717, accuracy_test: 0.3617, Duration: 0:00:01.820112
Epoch 6/200, Train Loss: 366.0500, Test Loss: 416.2723, accuracy_train: 0.6455, accuracy_test: 0.4468, Duration: 0:00:01.815130
Epoch 7/200, Train Loss: 306.3182, Test Loss: 419.3286, accuracy_train: 0.5738, accuracy_test: 0.4043, Duration: 0:00:01.799698
Epoch 8/200, Train Loss: 273.0719, Test Loss: 420.1214, accuracy_train: 0.5840, accuracy_test: 0.3830, 