In [None]:
# Clone the project repository; a later cell changes into this directory.
!git clone https://github.com/Mr-Patty/bimodal-emotion-recognition
# soundfile is imported as `sf` by the imports cell further down.
!pip install soundfile

In [None]:
# Load the Colab Drive helper; this import only works inside Google Colab.
from google.colab import drive

# Mount Google Drive at /content/drive. This will prompt for authorization.
drive.mount('/content/drive')

In [None]:
# Copy the dataset archive from the mounted Drive into the cloned repository.
# Both paths are relative, so this only works while the working directory is
# the one that contains `drive/` and `bimodal-emotion-recognition/`.
!cp drive/'My Drive'/EmotionRecognition/bimodal-emotion-recognition.tar.gz bimodal-emotion-recognition/

In [None]:
# Change into the cloned repository; relative paths used by later cells
# (CSV files, audio folders, checkpoints) are resolved from here.
%cd bimodal-emotion-recognition

In [None]:
# Unpack the archive into the current directory, then delete the archive.
!tar -C . -xzf bimodal-emotion-recognition.tar.gz
!rm bimodal-emotion-recognition.tar.gz
# Rename the audio folder (presumably extracted from the archive) to
# 'Audio_preprocess', which is the default directory of AudioDataset.
!mv Audio_ogg_10 Audio_preprocess

In [None]:
# !python processing.py

In [2]:
import wave
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import pandas as pd
import math
import librosa
import json
import wave
import sys
import pickle
import sklearn

import urllib.request

import librosa.display
import scipy, matplotlib.pyplot as plt, IPython.display as ipd

from scipy.io import wavfile

import librosa.display
import soundfile as sf

import torch
import torch.nn as nn
from torch import optim
from torch.autograd import Variable
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch.utils.data.sampler import SubsetRandomSampler

import os
import matplotlib.style as ms
from tqdm import tqdm
import random

from utils import *
# Matplotlib 3.6 renamed the bundled seaborn styles to 'seaborn-v0_8-*' and newer
# releases no longer ship the old names; prefer the new name when it is available
# and fall back to the legacy one on older installations.
ms.use('seaborn-v0_8-muted' if 'seaborn-v0_8-muted' in ms.available else 'seaborn-muted')
%matplotlib inline

## Audio dataset

In [3]:
class AudioDataset(torch.utils.data.Dataset):
    """Map-style dataset yielding ``(mel_spectrogram, emotion_id)`` pairs.

    The metadata CSV is read positionally: column 0 holds the audio file stem
    (without extension) and column 1 holds one of the emotion codes listed in
    ``emotion_dict``.

    Parameters
    ----------
    sampling_rate : int
        Target rate passed to ``librosa.load`` as ``sr``.
    wav_files_path : str
        Directory containing the ``.ogg`` files; a trailing separator is optional.
    meta_data : str
        Path of the CSV file, read with ``pandas.read_csv``.
    shuffle : bool
        Stored on the instance; not used by this class.
    """

    def __init__(self, sampling_rate=44100, wav_files_path='Audio_preprocess/', meta_data="df_prep.csv", shuffle=False):
        # Emotion code (as found in the CSV) -> integer class id.
        self.emotion_dict = {'ang': 0,
                             'dis': 1,
                             'hap': 2,
                             'sad': 3,
                             'sca': 4,
                             'sur': 5,
                             'neu': 6
                            }

        self.wav_files_path = wav_files_path
        self.sr = sampling_rate
        self.shuffle = shuffle
        # Keep the table as a plain array; rows are indexed by position below.
        self.meta_data = pd.read_csv(meta_data).to_numpy()

    def getAudio(self, file_path):
        """Load one audio file and return its mel spectrogram as a tensor.

        The array returned by ``librosa.feature.melspectrogram`` is transposed
        with ``permute(1, 0)`` so that the first dimension of the result is the
        second dimension of librosa's output (the time frames).

        If feature extraction fails, the offending path is printed and the
        original exception is re-raised.
        """
        wav_vector, _sr = librosa.load(file_path, sr=self.sr)
        # TODO: implement later needed processing of wav vector
        # (a torchaudio-based load / MelSpectrogram variant was sketched here before).

        try:
            feature_vector = librosa.feature.melspectrogram(y=wav_vector, sr=_sr)
            feature_vector = torch.from_numpy(feature_vector).permute(1, 0)
        except Exception:
            # Report which file failed before propagating the error unchanged.
            print(file_path)
            raise
        return feature_vector

    def __getitem__(self, index):
        """Return ``(features, emotion)`` for the metadata row at ``index``.

        Raises ``KeyError`` if the row's emotion code is not in ``emotion_dict``.
        """
        row = self.meta_data[index]
        # os.path.join keeps the result correct whether or not wav_files_path
        # ends with a separator (plain concatenation produced e.g. 'dirfile.ogg').
        file_path = os.path.join(self.wav_files_path, row[0] + '.ogg')
        emotion = torch.tensor(self.emotion_dict[row[1]])
        return self.getAudio(file_path), emotion

    def __len__(self):
        """Number of rows in the metadata table."""
        return len(self.meta_data)

In [None]:
# NOTE(review): the setup cell renames 'Audio_ogg_10' to 'Audio_preprocess'
# (AudioDataset's default directory) — confirm that 'Audio_ogg_10/' still exists
# in the environment where this cell is run.
dataset = AudioDataset(wav_files_path='Audio_ogg_10/')
# One sample per batch, in file order, loaded by a single worker process.
train_loader = DataLoader(dataset, batch_size=1, shuffle=False, num_workers=1)

# for batch in tqdm(train_loader):
#     x, y = batch

## Models

In [6]:
class AttentionRNNModel(torch.nn.Module):
    """LSTM classifier over feature sequences.

    Despite its name, ``forward`` does not apply attention: it classifies from
    the final hidden state of the LSTM. ``attention_net`` is kept as a helper
    that ``forward`` does not call.
    """

    def __init__(self, batch_size, output_size, hidden_size, embedding_length, bidirectional=False, num_layers=1, dropout=0.5):
        """
        Arguments
        ---------
        batch_size : Stored on the instance; not used by any layer.
        output_size : Number of classes, i.e. size of the returned logits.
        hidden_size : Size of the hidden state of the LSTM.
        embedding_length : Number of input features per time step.
        bidirectional : Whether the LSTM processes the sequence in both directions.
        num_layers : Number of stacked LSTM layers.
        dropout : Dropout probability applied to the final hidden state.
        """
        super(AttentionRNNModel, self).__init__()

        self.batch_size = batch_size
        self.output_size = output_size
        self.hidden_size = hidden_size
        self.embedding_length = embedding_length
        self.bidirectional = bidirectional
        self.num_layers = num_layers

        self.lstm = nn.LSTM(embedding_length, hidden_size, bidirectional=self.bidirectional, num_layers=self.num_layers)
        # A bidirectional LSTM contributes one final state per direction.
        self.label = nn.Linear(hidden_size * 2 if self.bidirectional else hidden_size, output_size)
        self.dropout = nn.Dropout(dropout)

    def attention_net(self, lstm_output, final_state):
        """
        Dot-product attention between every LSTM output and the final hidden state.
        Not called by ``forward``.

        Arguments
        ---------
        lstm_output : LSTM outputs arranged as (batch_size, num_seq, hidden_size),
                      as required by the ``torch.bmm`` calls below.
        final_state : Final hidden state (h_n) with a leading dimension of size 1,
                      which is squeezed away.

        Returns
        -------
        new_hidden_state : Attention-weighted sum of the outputs.

        Tensor Size :
                hidden.size() = (batch_size, hidden_size)
                attn_weights.size() = (batch_size, num_seq)
                soft_attn_weights.size() = (batch_size, num_seq)
                new_hidden_state.size() = (batch_size, hidden_size)
        """
        hidden = final_state.squeeze(0)
        attn_weights = torch.bmm(lstm_output, hidden.unsqueeze(2)).squeeze(2)
        soft_attn_weights = F.softmax(attn_weights, 1)
        new_hidden_state = torch.bmm(lstm_output.transpose(1, 2), soft_attn_weights.unsqueeze(2)).squeeze(2)

        return new_hidden_state

    def forward(self, input, batch_size=None):
        """
        Parameters
        ----------
        input : tensor of shape (batch_size, num_seq, embedding_length). A 2-D
                tensor (num_seq, embedding_length) is treated as a batch of one.
        batch_size : Unused; kept for backward compatibility with existing callers.

        Returns
        -------
        logits of shape (batch_size, output_size), computed from the final
        hidden state of the last LSTM layer.
        """
        if input.dim() == 2:
            # Unbatched sequence: add the batch dimension expected below.
            input = input.unsqueeze(0)

        # nn.LSTM is not batch-first here: reorder to (num_seq, batch_size, features).
        input = input.permute(1, 0, 2)

        # final_hidden_state.size() = (num_layers * num_directions, batch_size, hidden_size)
        _, (final_hidden_state, _) = self.lstm(input)

        if self.bidirectional:
            # Last layer: forward state is at [-2], backward state at [-1].
            features = torch.cat((final_hidden_state[-2, :, :], final_hidden_state[-1, :, :]), dim=1)
        else:
            # Only one direction exists; indexing [-2] would either fail
            # (single layer) or pick up the state of a lower layer.
            features = final_hidden_state[-1, :, :]

        logits = self.label(self.dropout(features))

        return logits

class TextRNN(nn.Module):
    """Embedding + LSTM text classifier working on padded token-id batches.

    Arguments
    ---------
    vocab_size : Number of rows of the embedding table.
    embedding_dim : Size of each token embedding.
    hidden_dim : Size of the LSTM hidden state.
    output_dim : Number of classes (size of the returned logits).
    n_layers : Number of stacked LSTM layers.
    bidirectional : Whether the LSTM runs in both directions.
    dropout : Dropout probability (embeddings, between LSTM layers, final state).
    pad_idx : Token id used for padding; its embedding is not updated.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, 
                 bidirectional, dropout, pad_idx):

        super().__init__()

        self.bidirectional = bidirectional

        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx = pad_idx)

        # Inter-layer dropout has no effect with a single layer and only
        # triggers a warning there, so it is enabled for stacked LSTMs only.
        self.rnn = nn.LSTM(embedding_dim,
                           hidden_dim,
                           num_layers=n_layers,
                           bidirectional=bidirectional,
                           dropout=dropout if n_layers > 1 else 0.0)

        # One final state per direction feeds the classifier.
        self.fc = nn.Linear(hidden_dim * 2 if bidirectional else hidden_dim, output_dim)

        self.dropout = nn.Dropout(dropout)

    def forward(self, text, text_lengths):
        """Return logits of shape [batch size, output_dim].

        text : token ids, shape [sent len, batch size]
        text_lengths : unpadded length of every sequence in the batch
        """

        #embedded = [sent len, batch size, emb dim]
        embedded = self.dropout(self.embedding(text))

        # pack_padded_sequence requires the lengths on the CPU, even when the
        # batch itself lives on a GPU.
        if torch.is_tensor(text_lengths):
            text_lengths = text_lengths.cpu()

        #pack sequence so the LSTM skips the padding positions
        packed_embedded = nn.utils.rnn.pack_padded_sequence(embedded, text_lengths, enforce_sorted=False)

        #hidden = [num layers * num directions, batch size, hid dim]
        _, (hidden, _) = self.rnn(packed_embedded)

        if self.bidirectional:
            #concat the final forward (hidden[-2,:,:]) and backward (hidden[-1,:,:]) hidden layers
            hidden = torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim = 1)
        else:
            #single direction: only the last layer's final state exists at [-1]
            hidden = hidden[-1,:,:]

        #hidden = [batch size, hid dim * num directions]
        hidden = self.dropout(hidden)

        return self.fc(hidden)
    
class Hybrid_model(nn.Module):
    """Late-fusion classifier combining a text model and an audio model.

    The classification heads of the two sub-models (``model_text.fc`` and
    ``model_audio.label``) are replaced by ``nn.Identity`` so that both return
    their feature vectors; the concatenated features go through one new
    linear layer.

    Arguments
    ---------
    model_audio : Module called as ``model_audio(mel)``; must expose ``label``.
    model_text : Module called as ``model_text(text, text_lengths)``; must expose ``fc``.
    hidden_dim : The fusion layer expects ``hidden_dim * 4`` concatenated
                 features in total (e.g. ``2 * hidden_dim`` from each sub-model).
    output_dim : Number of classes (size of the returned logits).
    """

    def __init__(self, model_audio, model_text, hidden_dim=64, output_dim=7):

        super().__init__()
        self.model_audio = model_audio
        self.model_text = model_text
        # Strip the original heads in place: the sub-models now emit features.
        self.model_text.fc = nn.Identity()
        self.model_audio.label = nn.Identity()

        self.fc = nn.Linear(hidden_dim * 4, output_dim)

    def forward(self, text, text_lengths, mel):
        """Return logits of shape (batch_size, output_dim)."""

        hid_text = self.model_text(text, text_lengths)
        hid_audio = self.model_audio(mel)

        # Concatenate along the feature dimension: (batch_size, text + audio features).
        hid_context = torch.cat((hid_text, hid_audio), dim = 1)

        return self.fc(hid_context)

In [None]:
def categorical_accuracy(preds, y):
    """
    Returns accuracy per batch, i.e. if you get 8/10 right, this returns 0.8, NOT 8

    preds : scores of shape (batch size, n classes)
    y : true class indices of shape (batch size,)

    The result is a one-element float tensor on the same device as ``y``.
    """
    max_preds = preds.argmax(dim = 1, keepdim = True) # get the index of the max probability
    correct = max_preds.squeeze(1).eq(y)
    # Build the divisor on y's device: a CPU-only divisor cannot be combined
    # with a CUDA tensor when the batch lives on the GPU.
    batch_size = torch.tensor([y.shape[0]], dtype=torch.float, device=y.device)
    return correct.sum() / batch_size

def train(model, iterator, optimizer, criterion):
    """Run one training pass over ``iterator``.

    Every batch must expose ``text`` (a ``(tokens, lengths)`` pair), ``file``
    and ``label``. The mel features are loaded from
    ``'Train_data/' + batch.file + '.mel'``.

    Returns the loss and the accuracy, each averaged over the number of batches.
    """
    model.train()

    running_loss = 0
    running_acc = 0

    for batch in iterator:
        optimizer.zero_grad()

        text, text_lengths = batch.text

        # NOTE(review): batch.file is concatenated with strings, so it has to be
        # a str here — confirm this against how the 'file' field is configured.
        mel_file = 'Train_data/' + batch.file + '.mel'
        # NOTE(review): the loaded tensor is not moved to the model's device — confirm.
        mel = torch.load(mel_file).squeeze(0).permute(1, 0)

        predictions = model(text, text_lengths, mel)
        targets = batch.label

        loss = criterion(predictions, targets)
        acc = categorical_accuracy(predictions, targets)

        # Backpropagate and update the weights.
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        running_acc += acc.item()

    n_batches = len(iterator)
    return running_loss / n_batches, running_acc / n_batches

def evaluate(model, iterator, criterion):
    """Evaluate ``model`` over ``iterator`` without computing gradients.

    Every batch must expose ``text`` (a ``(tokens, lengths)`` pair), ``file``
    and ``label``. The mel features are loaded from
    ``'Train_data/' + batch.file + '.mel'``.

    Returns the loss and the accuracy, each averaged over the number of batches.
    """
    model.eval()

    running_loss = 0
    running_acc = 0

    with torch.no_grad():
        for batch in iterator:
            text, text_lengths = batch.text

            # NOTE(review): batch.file is concatenated with strings, so it has to
            # be a str here — confirm this against how the 'file' field is configured.
            mel_file = 'Train_data/' + batch.file + '.mel'
            mel = torch.load(mel_file).squeeze(0).permute(1, 0)

            # squeeze(1) only has an effect when dimension 1 has size 1.
            predictions = model(text, text_lengths, mel).squeeze(1)
            targets = batch.label

            running_loss += criterion(predictions, targets).item()
            running_acc += categorical_accuracy(predictions, targets).item()

    n_batches = len(iterator)
    return running_loss / n_batches, running_acc / n_batches

def epoch_time(start_time, end_time):
    """Split the time between two timestamps (in seconds) into whole minutes
    and the remaining whole seconds, returned as ``(minutes, seconds)``."""
    duration = end_time - start_time
    whole_minutes = int(duration / 60)
    # Whatever is left after removing the full minutes, truncated to an int.
    leftover_seconds = int(duration - (whole_minutes * 60))
    return whole_minutes, leftover_seconds

In [None]:
from spacy.lang.ru import Russian
from spacy_russian_tokenizer import RussianTokenizer, MERGE_PATTERNS
# Instantiate the Russian spaCy language object used by tokenize_ru below.
nlp = Russian()
# Tokenizer component built from the merge patterns shipped with spacy_russian_tokenizer.
russian_tokenizer = RussianTokenizer(nlp, MERGE_PATTERNS)
# NOTE(review): passing a component object together with `name=` looks like the
# spaCy 2.x form of add_pipe — confirm against the installed spaCy version.
nlp.add_pipe(russian_tokenizer, name='russian_tokenizer')
def tokenize_ru(sentence):
    """Run ``sentence`` through the module-level ``nlp`` pipeline and return
    the text of every resulting token as a list of strings."""
    doc = nlp(sentence)
    return [token.text for token in doc]

# NOTE(review): `data` and `torchtext` are not imported in any visible cell;
# presumably they are provided by `from utils import *` — confirm.

# One field per CSV column.
# NOTE(review): FILE is a default Field and gets a vocabulary below, while
# train()/evaluate() concatenate `batch.file` with strings — confirm what
# `batch.file` yields; a raw (non-numericalized) field may be what is intended.
FILE = data.Field()
# include_lengths=True makes `batch.text` a (tokens, lengths) pair, which
# train()/evaluate() unpack.
TEXT = data.Field(tokenize = tokenize_ru, include_lengths = True)
LABEL = data.LabelField(dtype = torch.long)

# Column order of the CSV files: file, text, label.
fields = [('file', FILE), ('text', TEXT), ('label', LABEL)]

# Load the train and test splits from the current directory, skipping the header row.
train_data, test_data = data.TabularDataset.splits(
                                        path = './',
                                        train = 'train_text.csv',
                                        # validation = 'valid.csv',
                                        test = 'test_text.csv',
                                        format = 'csv',
                                        fields = fields,
                                        skip_header = True)
import os
# Russian fastText wiki vectors; the file name is derived from the URL.
url = 'https://dl.fbaipublicfiles.com/fasttext/vectors-wiki/wiki.ru.vec'
name = os.path.basename(url)
vec = torchtext.vocab.Vectors(name, url=url)
# Upper bound passed to TEXT.build_vocab as max_size.
MAX_VOCAB_SIZE = 25_000

# vec = torchtext.vocab.FastText(language='en')

# Build the text vocabulary from the training split only and attach the
# pretrained vectors; unk_init is the initializer handed to build_vocab.
TEXT.build_vocab(train_data, 
                 max_size = MAX_VOCAB_SIZE, 
                 vectors = vec, 
                 unk_init = torch.Tensor.normal_)

LABEL.build_vocab(train_data)
FILE.build_vocab(train_data)

# train()/evaluate() load a single mel file per batch, hence batches of one.
BATCH_SIZE = 1

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

# Shuffled, unsorted iterators that place their batches on `device`.
train_iterator, test_iterator = data.BucketIterator.splits(
    (train_data, test_data), 
    batch_size = BATCH_SIZE,
    shuffle = True,
    sort = False,
    device = device)

In [8]:
Batch = 1

# Hyper-parameters of the text model.
INPUT_DIM = len(TEXT.vocab)
EMBEDDING_DIM = 300
HIDDEN_DIM = 64
OUTPUT_DIM = len(LABEL.vocab)
N_LAYERS = 2
BIDIRECTIONAL = True
DROPOUT = 0.5
PAD_IDX = TEXT.vocab.stoi[TEXT.pad_token]

# The fusion layer of Hybrid_model expects hidden_dim * 4 features, i.e.
# 2 * hidden_dim from each sub-model, so the audio LSTM must be bidirectional.
# NOTE(review): confirm that bidirectional/num_layers match the configuration
# 'audio_model.pt' was trained with; load_state_dict fails on a mismatch.
model_audio = AttentionRNNModel(batch_size=Batch, output_size=7, hidden_size=64, embedding_length=128,
                                bidirectional=True)
model_text = TextRNN(INPUT_DIM, 
                    EMBEDDING_DIM, 
                    HIDDEN_DIM, 
                    OUTPUT_DIM, 
                    N_LAYERS, 
                    BIDIRECTIONAL, 
                    DROPOUT, 
                    PAD_IDX)
# pretrained_embeddings = TEXT.vocab.vectors
# UNK_IDX = TEXT.vocab.stoi[TEXT.unk_token]

# model.embedding.weight.data[UNK_IDX] = torch.zeros(EMBEDDING_DIM)
# model.embedding.weight.data[PAD_IDX] = torch.zeros(EMBEDDING_DIM)


# load_state_dict expects the state dict itself, not a file name, so the
# checkpoints are read with torch.load first. map_location='cpu' keeps this
# working on machines without a GPU; the model is moved to its device later.
# Assumes both files hold state dicts saved via torch.save(model.state_dict(), ...).
# torch.load unpickles the file: only load checkpoints from a trusted source.
model_audio.load_state_dict(torch.load('audio_model.pt', map_location='cpu'))
model_text.load_state_dict(torch.load('text_model.pt', map_location='cpu'))

# Freeze both pretrained sub-models; only the fusion layer created by
# Hybrid_model below remains trainable.
for pretrained in (model_audio, model_text):
    for param in pretrained.parameters():
        param.requires_grad = False

model = Hybrid_model(model_audio, model_text, hidden_dim=64, output_dim=7)

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Move the model (and the loss) to the target device before the optimizer is
# constructed, so the optimizer is built from the parameters it will update.
model = model.to(device)
criterion = nn.CrossEntropyLoss().to(device)

optimizer = optim.Adam(model.parameters(), lr=1e-3)
# Lowers the learning rate when the monitored loss stops decreasing ('min' mode).
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min')

In [None]:
# `time` is needed for the per-epoch timing below and is not imported by any
# visible cell, so it is imported here to keep the loop runnable on a fresh kernel.
import time

N_EPOCHS = 50

best_valid_loss = float('inf')
# Per-epoch history, plotted by the cells further down.
train_losses = []
test_losses = []
train_acces = []
test_acces = []

for epoch in range(N_EPOCHS):

    start_time = time.time()

    train_loss, train_acc = train(model, train_iterator, optimizer, criterion)
    # The test split doubles as the validation set for model selection.
    valid_loss, valid_acc = evaluate(model, test_iterator, criterion)

    end_time = time.time()

    train_losses.append(train_loss)
    test_losses.append(valid_loss)
    train_acces.append(train_acc)
    test_acces.append(valid_acc)

    # The plateau scheduler monitors the training loss.
    scheduler.step(train_loss)

    epoch_mins, epoch_secs = epoch_time(start_time, end_time)

    # Keep only the weights of the epoch with the lowest evaluation loss.
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'hybrid-model.pt')

    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')

In [None]:
!cp hybrid-model.pt drive/'My Drive'/EmotionRecognition/

In [None]:
import matplotlib
import matplotlib.pyplot as plt
# Training loss per epoch, drawn on the left half of a wide figure.
fig = plt.figure(figsize=(14 ,5))
ax1 = fig.add_subplot(121)
ax1.plot(train_losses)
ax1.set_xlabel("Epoch")
ax1.set_ylabel("Loss")
ax1.set_title("Train loss");

In [None]:
# Test loss per epoch, drawn on the left half of a wide figure.
fig = plt.figure(figsize=(14 ,5))
ax1 = fig.add_subplot(121)
ax1.plot(test_losses)
ax1.set_xlabel("Epoch")
ax1.set_ylabel("Loss")
ax1.set_title("Test loss");

In [None]:
# Training accuracy per epoch. The title names the split so this figure can be
# told apart from the test-accuracy figure.
fig = plt.figure(figsize=(14 ,5))
ax1 = fig.add_subplot(121)
ax1.plot(train_acces)
ax1.set_xlabel("Epoch")
ax1.set_ylabel("Accuracy")
ax1.set_title("Train accuracy");

In [None]:
# Test accuracy per epoch. The title names the split so this figure can be
# told apart from the train-accuracy figure.
fig = plt.figure(figsize=(14 ,5))
ax1 = fig.add_subplot(121)
ax1.plot(test_acces)
ax1.set_xlabel("Epoch")
ax1.set_ylabel("Accuracy")
ax1.set_title("Test accuracy");

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)
# NOTE(review): this call passes (model, dataset, device, batch_size=...,
# n_epoch=..., validation_split=...) and unpacks three results. That does not
# match the train(model, iterator, optimizer, criterion) defined in this
# notebook, which returns two values. Presumably it targets an older helper
# with a different signature — confirm, or remove this cell.
train_loss, val_loss, best_model = train(model, dataset, device, batch_size=Batch, n_epoch=1, validation_split=.95)

In [None]:
# Rebuild the single-sample loader (the scan below does not consume it).
train_loader = DataLoader(dataset, batch_size=1, shuffle=False, num_workers=1)

# Walk the dataset directly and print every feature tensor whose first
# dimension is empty.
for features, label in dataset:
    if features.size(0) != 0:
        continue
    print(features)