# Emotional Speech Recognition
This notebook tackles an audio classification problem using audio feature extraction and augmentation, machine learning, and deep learning.

https://www.kaggle.com/dejolilandry/asvpesdspeech-nonspeech-emotional-utterances

In [None]:
import numpy as np 
import pandas as pd
import os 
import math
import librosa
import torch 
import torchaudio 
import torch.nn as nn 
import torch.optim as optim
from tqdm import tqdm
import torch.nn.functional as F
import torch.optim as optim

from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt 
#%matplotlib_inline
import seaborn as sns
import librosa 
import librosa.display

import IPython.display as ipd


# custom modules 
from MyDataClasses import ASVPDataset
from torch_functions import *
import torch_functions as H

In [None]:
# Root folder of the ASVP-ESD audio files; later cells build file paths from it.
ASVP_dir = '/Users/stephen/Desktop/Speech_Recognition/Data/ASVP-ESD_UPDATE/Audio/'
# Load the metadata table, using the first CSV column as the DataFrame index.
ASVP_metadata = pd.read_csv('/Users/stephen/Desktop/Speech_Recognition/Data/ASVP-ESD_UPDATE/asvp_metadata.csv', index_col=0)
# Preview the first rows of the table.
ASVP_metadata.head()

In [None]:
# Report how many rows (one per metadata entry) the table contains.
print(f'The length of the dataset is: {len(ASVP_metadata)}')

In [None]:
# Summary statistics of the metadata columns.
ASVP_metadata.describe()

# Deciding how long all the files will be 
In order for the CNN to take in an audio dataset, all the audio files must be the same length. From the analysis we did in the EDA, we can see that the lengths vary from 3.7 seconds to over 330 seconds.

In [None]:
# How many clips are NOT 20 seconds or shorter, i.e. the rows a 20 s cut-off would drop.
within_20_sec = ASVP_metadata['Duration'] <= 20
over_20_sec = len(ASVP_metadata) - int(within_20_sec.sum())
print(f"There are {over_20_sec} rows over 20 seconds")

Looks like we only lose 17 rows if we get rid of everything over 20 seconds. Let's see how many we lose when we cut everything over 15 seconds and over 10 seconds. The shorter our audio files are, the less computation power we'll need.

In [None]:
# Select the row(s) whose duration equals the maximum duration in the table.
max_duration = ASVP_metadata['Duration'].max()
longest_file = ASVP_metadata.loc[ASVP_metadata['Duration'] == max_duration]
longest_file

In [None]:
# Path of one specific recording under the audio root.
# NOTE(review): hard-coded; presumably the file found in the previous cell — confirm.
longest_file_path = f"{ASVP_dir}actor_3/03-01-05-01-14-03-02-03-01.wav"
longest_file_path

In [None]:
# Load the recording as a waveform tensor plus its sample rate, then plot it
# with the project helper from torch_functions.
longest_wav, sr = torchaudio.load(longest_file_path)
H.plot_waveform(longest_wav, sr)

In [None]:
# Play the loaded recording via the project helper.
H.play_audio(longest_wav, sr)

In [None]:
# How many clips are NOT 15 seconds or shorter, i.e. the rows a 15 s cut-off would drop.
within_15_sec = ASVP_metadata['Duration'] <= 15
over_15_sec = len(ASVP_metadata) - int(within_15_sec.sum())
print(f"There are {over_15_sec} rows over 15 seconds")

In [None]:
# How many clips are NOT 10 seconds or shorter, i.e. the rows a 10 s cut-off would drop.
within_10_sec = ASVP_metadata['Duration'] <= 10
over_10_sec = len(ASVP_metadata) - int(within_10_sec.sum())
print(f"There are {over_10_sec} rows over 10 seconds")

# New DataFrame 
Let's decide how long our samples should be, but first let's look at how much each cut-off affects the class distribution.

In [None]:
# Build one filtered view of the metadata per candidate maximum clip length.
duration = ASVP_metadata['Duration']
df_20s = ASVP_metadata.loc[duration <= 20]
df_15s = ASVP_metadata.loc[duration <= 15]
df_10s = ASVP_metadata.loc[duration <= 10]

# Report how many rows survive each cut-off.
print(f'Length of dataframe (20 seconds): {len(df_20s)} rows')
print(f'Length of dataframe (15 seconds): {len(df_15s)} rows')
print(f'Length of dataframe (10 seconds): {len(df_10s)} rows')

In [None]:
# Number of rows per emotion label among clips of at most 20 seconds.
df_20s['Emotions'].value_counts()

In [None]:
# Emotion counts for clips of at most 20 seconds.
# The figure must be created BEFORE drawing: calling plt.figure() after the
# plot opens a second, empty figure and leaves the chart at its default size.
# NOTE(review): the original requested figsize=(20, 56), but only ever applied
# it to that blank figure; (20, 6) is used here as a readable size — adjust.
plt.figure(figsize=(20, 6))
# Pass the column as x= explicitly; positional data is interpreted differently
# across seaborn versions.
sns.countplot(x=df_20s['Emotions'])
plt.title('Count of Emotions', size=20)
plt.ylabel('Count', size=12)
plt.xlabel('Emotions', size=12)
sns.despine(top=True, right=True, left=False, bottom=False)
plt.show()

In [None]:
# Emotion counts for clips of at most 15 seconds.
# The figure must be created BEFORE drawing: calling plt.figure() after the
# plot opens a second, empty figure and leaves the chart at its default size.
# NOTE(review): the original requested figsize=(20, 56), but only ever applied
# it to that blank figure; (20, 6) is used here as a readable size — adjust.
plt.figure(figsize=(20, 6))
# Pass the column as x= explicitly; positional data is interpreted differently
# across seaborn versions.
sns.countplot(x=df_15s['Emotions'])
plt.title('Count of Emotions', size=20)
plt.ylabel('Count', size=12)
plt.xlabel('Emotions', size=12)
sns.despine(top=True, right=True, left=False, bottom=False)
plt.show()

In [None]:
# Emotion counts for clips of at most 10 seconds.
# The figure must be created BEFORE drawing: calling plt.figure() after the
# plot opens a second, empty figure and leaves the chart at its default size.
# NOTE(review): the original requested figsize=(20, 56), but only ever applied
# it to that blank figure; (20, 6) is used here as a readable size — adjust.
plt.figure(figsize=(20, 6))
# Pass the column as x= explicitly; positional data is interpreted differently
# across seaborn versions.
sns.countplot(x=df_10s['Emotions'])
plt.title('Count of Emotions', size=20)
plt.ylabel('Count', size=12)
plt.xlabel('Emotions', size=12)
sns.despine(top=True, right=True, left=False, bottom=False)
plt.show()

So let's go with the 10-second dataframe. It will contain the paths for all the files that are 10 seconds or shorter.

In [None]:
# Summary statistics of the metadata restricted to clips of at most 10 seconds.
df_10s.describe()

In [None]:
# Number of rows per emotion label among clips of at most 10 seconds.
df_10s['Emotions'].value_counts()

In [None]:
# Global Variables

SAMPLE_RATE = 16000                     # samples per second
DURATION = 10                           # clip length in seconds
N_SAMPLES = SAMPLE_RATE * DURATION      # samples in one full-length clip
N_FFT = int(0.025 * SAMPLE_RATE)        # 25 ms expressed in samples
HOP_LENGTH = int(0.01 * SAMPLE_RATE)    # 10 ms expressed in samples

# Use the GPU when one is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"



In [None]:
# Wrap the <=10 s metadata in the project's ASVPDataset (from MyDataClasses).
# NOTE(review): presumably the dataset resamples to target_sample_rate and
# pads/truncates to num_samples — confirm in MyDataClasses.
audio_dataset = ASVPDataset(
    annotations_file=df_10s, 
    audio_dir=ASVP_dir,
    target_sample_rate=SAMPLE_RATE, 
    num_samples=N_SAMPLES,
    device=device  
)

In [None]:
# Number of items the dataset exposes.
len(audio_dataset)

In [None]:
# Inspect one arbitrary item to check that indexing works.
audio_dataset[133]

# Data Preprocessing

Now that we are able to locate our data properly, we'll wrap it in a custom PyTorch Dataset object. This will make it easier to work with.

In [None]:
# Each dataset item unpacks into (sample, label); keep one around for the demos below.
test_sample, test_label = audio_dataset[1000]
print(test_label, test_sample)

In [None]:

# Plot the sample using the shared SAMPLE_RATE constant rather than a
# duplicated literal, so the plot stays correct if the rate is changed.
plot_waveform(test_sample, SAMPLE_RATE)



In [None]:
# Print statistics of the sample via the project helper.
print_stats(test_sample, sample_rate=SAMPLE_RATE)

In [None]:
# STFT settings for the demo spectrogram.
n_fft = 1024
hop_length = 512
win_length = None

# Build the power-spectrogram transform from a single settings mapping.
spectrogram_settings = dict(
    n_fft=n_fft,
    win_length=win_length,
    hop_length=hop_length,
    center=True,
    pad_mode="reflect",
    power=2.0,
)
spectrogram = torchaudio.transforms.Spectrogram(**spectrogram_settings)

# Apply it to the sample, then print stats and plot the first channel.
spec = spectrogram(test_sample)
print_stats(spec)
plot_spectrogram(spec[0], title='torchaudio')


# Split data into Train, Test and Validation sets 

In [None]:
# 70 % train, 20 % validation; the test split takes whatever remains so the
# three counts always add up to the dataset size.
total_count = len(audio_dataset)
train_count = int(0.7 * total_count)
valid_count = int(0.2 * total_count)
test_count = total_count - (train_count + valid_count)

for caption, count in (
    ('Train count: ', train_count),
    ('validation count: ', valid_count),
    ('Test count: ', test_count),
):
    print(caption + str(count))

In [None]:
# Randomly partition the dataset into three non-overlapping subsets.
split_sizes = (train_count, valid_count, test_count)
train_dataset, valid_dataset, test_dataset = torch.utils.data.random_split(audio_dataset, split_sizes)

In [None]:
# Confirm the size of each subset produced by the split.
for caption, subset in (
    ('Total number of entries in training set :', train_dataset),
    ('Total number of entries in validation set :', valid_dataset),
    ('Total number of entries in test set :', test_dataset),
):
    print(caption, len(subset))

In [None]:
# Sorted list of the distinct labels found in the training subset.
# Each item is a (sample, label) pair, so this iterates the whole subset once.
labels = sorted({label for _, label in train_dataset})
labels


In [None]:
def label_to_index(word):
    """Return the position of ``word`` in the global ``labels`` list as a tensor.

    Raises ValueError (from ``list.index``) if ``word`` is not in ``labels``.
    """
    position = labels.index(word)
    return torch.tensor(position)


def index_to_label(index):
    """Return the entry of the global ``labels`` list at ``index``.

    This is the inverse of ``label_to_index``.
    """
    label = labels[index]
    return label

In [None]:
# Round-trip check: label -> index -> label should give back the same word.
word_start = "neutral"
index = label_to_index(word_start)
word_recovered = index_to_label(index)

print(word_start, "-->", index, "-->", word_recovered)

In [None]:
# label_to_index wraps the position in torch.tensor, so this shows a tensor type.
type(index)


In [None]:
# Downsample from the dataset's sample rate to 8 kHz before feeding the model.
# orig_freq uses the shared SAMPLE_RATE constant instead of a duplicated
# literal so the transform stays correct if the dataset rate is changed.
new_sample_rate = 8000
transform = torchaudio.transforms.Resample(orig_freq=SAMPLE_RATE, new_freq=new_sample_rate)
transformed = transform(test_sample)

In [None]:
# One-hot encode the label index. The class count comes from `labels` (the
# same source the model's output size uses) instead of a hard-coded 12, so it
# cannot fall out of sync with the number of distinct labels.
one_hot = torch.nn.functional.one_hot(index, num_classes=len(labels))
one_hot

In [None]:
def pad_sequence(batch):
    """Stack tensors of differing last-dimension length into one zero-padded batch.

    Each 2-D item is transposed so its last dimension comes first, padded with
    zeros up to the longest item, and the result is permuted back so the
    output has shape (batch, first_dim, longest_last_dim).
    """
    length_first = [torch.t(item) for item in batch]
    padded = torch.nn.utils.rnn.pad_sequence(length_first, batch_first=True, padding_value=0.)
    return padded.permute(0, 2, 1)

def collate_fn(batch):
    """Collate a list of (waveform, label) pairs into batched tensors.

    Returns:
        tensors: the waveforms zero-padded to a common length by
            ``pad_sequence`` and stacked into one tensor.
        targets: a 1-D tensor with the ``labels`` index of each item's label.
    """
    # Split the pairs, encoding each label as its index in `labels`.
    waveforms = [waveform for waveform, _ in batch]
    encoded = [label_to_index(label) for _, label in batch]

    # Group the per-item tensors into batched tensors.
    tensors = pad_sequence(waveforms)
    targets = torch.stack(encoded)

    return tensors, targets

batch_size = 256
# On the GPU use one worker process and pinned host memory; on the CPU load
# in the main process without pinning.
num_workers, pin_memory = (1, True) if device == "cuda" else (0, False)

In [None]:
# Settings shared by all three loaders.
loader_kwargs = dict(
    batch_size=batch_size,
    collate_fn=collate_fn,
    num_workers=num_workers,
    pin_memory=pin_memory,
)
# Only the training loader shuffles; evaluation loaders keep a fixed order
# and keep the final partial batch.
train_loader = DataLoader(dataset=train_dataset, shuffle=True, **loader_kwargs)
test_loader = DataLoader(dataset=test_dataset, shuffle=False, drop_last=False, **loader_kwargs)
validation_loader = DataLoader(dataset=valid_dataset, shuffle=False, drop_last=False, **loader_kwargs)

In [None]:
# Display the loader object (does not iterate it).
train_loader

In [None]:
# Number of distinct classes; also used as the model's output size.
len(labels)

# Model 

In [None]:
# M5 model described in the following paper: 
# https://arxiv.org/pdf/1610.00087.pdf

class M5(nn.Module):
    """1-D convolutional classifier over raw waveforms (M5, arXiv:1610.00087).

    Four Conv1d -> BatchNorm1d -> ReLU -> MaxPool1d(4) stages, followed by
    average pooling over the whole remaining time axis and one linear layer.

    Args:
        n_input: number of input channels.
        n_output: number of classes (defaults to ``len(labels)`` as evaluated
            when the class is defined).
        stride: stride of the first convolution.
        n_channel: channel width of the first two stages; the last two use
            twice as many.
    """

    def __init__(self, n_input=1, n_output=len(labels), stride=16, n_channel=32):
        super().__init__()
        wide = 2 * n_channel

        # Stage 1: wide receptive field (kernel 80) with a strided convolution.
        self.conv1 = nn.Conv1d(n_input, n_channel, kernel_size=80, stride=stride)
        self.bn1 = nn.BatchNorm1d(n_channel)
        self.pool1 = nn.MaxPool1d(4)

        # Stage 2: same width, small kernel.
        self.conv2 = nn.Conv1d(n_channel, n_channel, kernel_size=3)
        self.bn2 = nn.BatchNorm1d(n_channel)
        self.pool2 = nn.MaxPool1d(4)

        # Stage 3: double the channel count.
        self.conv3 = nn.Conv1d(n_channel, wide, kernel_size=3)
        self.bn3 = nn.BatchNorm1d(wide)
        self.pool3 = nn.MaxPool1d(4)

        # Stage 4: keep the doubled width.
        self.conv4 = nn.Conv1d(wide, wide, kernel_size=3)
        self.bn4 = nn.BatchNorm1d(wide)
        self.pool4 = nn.MaxPool1d(4)

        # Classifier head.
        self.fc1 = nn.Linear(wide, n_output)

    def forward(self, x):
        """Map a (batch, n_input, time) tensor to (batch, 1, n_output) log-probabilities."""
        stages = (
            (self.conv1, self.bn1, self.pool1),
            (self.conv2, self.bn2, self.pool2),
            (self.conv3, self.bn3, self.pool3),
            (self.conv4, self.bn4, self.pool4),
        )
        for conv, norm, pool in stages:
            x = pool(F.relu(norm(conv(x))))

        # Average over the whole remaining time axis, leaving length 1.
        x = F.avg_pool1d(x, x.shape[-1])
        # Move channels last so the linear layer acts on them.
        x = self.fc1(x.permute(0, 2, 1))
        return F.log_softmax(x, dim=2)

In [None]:
# Instantiate the network with as many input channels as the sample's first
# dimension and one output per label, then move it to the selected device.
model = M5(n_input=test_sample.shape[0], n_output=len(labels))
model.to(device)
print(model)

In [None]:
def count_parameters(model):
    """Return the total number of elements across the model's trainable parameters."""
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total


# Report the number of trainable parameters of the model.
n = count_parameters(model)
print("Number of parameters: %s" % n)

In [None]:
# Adam with L2 weight decay.
optimizer = optim.Adam(model.parameters(), lr=0.01, weight_decay=0.0001)
# NOTE(review): with step_size=20 the first decay only takes effect after 20
# scheduler.step() calls, i.e. after 20 epochs have completed.
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.1)  # reduce the learning rate after 20 epochs by a factor of 10

# train net

In [None]:

def train(model, epoch, log_interval):
    """Run one training epoch over the global ``train_loader``.

    Uses the notebook globals ``device``, ``transform``, ``optimizer``,
    ``pbar``/``pbar_update``, ``losses_train`` and ``accuracy_train``.

    Args:
        model: network to optimise; put into training mode here.
        epoch: epoch number, used only in the log line.
        log_interval: print a progress line every this many batches.

    Side effects:
        Appends every batch loss to ``losses_train`` and the epoch accuracy
        (in percent of the whole training set) to ``accuracy_train``.
    """
    model.train()
    right = 0
    seen = 0  # samples processed so far, for an honest running accuracy
    n_total = len(train_loader.dataset)
    for batch_index, (data, target) in enumerate(train_loader):

        data = data.to(device)
        target = target.to(device)

        # apply transform and model on whole batch directly on device
        data = transform(data)
        output = model(data)

        pred = get_probable_idx(output)
        right += nr_of_right(pred, target)
        seen += target.size(0)

        # negative log-likelihood for a tensor of size (batch x 1 x n_output).
        # Squeeze only dim 1: a bare squeeze() would also drop the batch
        # dimension when the last batch holds a single sample.
        loss = F.nll_loss(output.squeeze(1), target)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # print training stats; accuracy is over the samples seen so far
        # rather than the full dataset, which understated it mid-epoch
        if batch_index % log_interval == 0:
            print(f"Train Epoch: {epoch} [{batch_index * len(data)}/{n_total} ({100. * batch_index / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}\tAccuracy: {right}/{seen} ({100. * right / seen:.0f}%)")

        # update progress bar
        pbar.update(pbar_update)
        # record loss
        losses_train.append(loss.item())

    acc = 100. * (right / n_total)
    accuracy_train.append(acc)

In [None]:
def nr_of_right(pred, target):
    """Return how many predictions equal their target, as a Python number."""
    matches = pred.squeeze() == target
    return matches.sum().item()


def get_probable_idx(tensor):
    """Return the index of the largest value along the last dimension."""
    return torch.argmax(tensor, dim=-1)

In [None]:
def validate(model, epoch):
    """Evaluate the model on the global ``validation_loader``.

    Uses the notebook globals ``device``, ``transform``, ``pbar``/
    ``pbar_update``, ``accuracy_validation`` and ``losses_validation``.

    Args:
        model: network to evaluate; put into evaluation mode here.
        epoch: epoch number, used only in the log line.

    Side effects:
        Appends the validation accuracy (percent) and the mean per-sample
        loss over the whole validation set to the two history lists.
    """
    # Switch to evaluation mode
    model.eval()

    right = 0
    loss_sum = 0.0
    seen = 0
    # No gradients are needed for evaluation; skipping them saves memory.
    with torch.no_grad():
        for data, target in validation_loader:

            data = data.to(device)
            target = target.to(device)

            # apply transform and model on whole batch directly on device
            data = transform(data)
            output = model(data)

            pred = get_probable_idx(output)
            right += nr_of_right(pred, target)

            # negative log-likelihood for a tensor of size (batch x 1 x n_output).
            # Squeeze only dim 1 so a single-sample batch keeps its batch
            # dimension; sum (not average) so batches of different size are
            # weighted correctly when averaged below.
            loss_sum += F.nll_loss(output.squeeze(1), target, reduction="sum").item()
            seen += target.size(0)

            # update progress bar
            pbar.update(pbar_update)

    # Mean loss over every validation sample, not just the last batch.
    mean_loss = loss_sum / seen if seen else float("nan")
    n_total = len(validation_loader.dataset)
    acc = 100. * right / n_total

    print(f"\nValidation Epoch: {epoch} \tLoss: {mean_loss:.6f}\tAccuracy: {right}/{n_total} ({acc:.0f}%)\n")

    accuracy_validation.append(acc)
    losses_validation.append(mean_loss)





def test(model):
    """Evaluate the model on the global ``test_loader``.

    Uses the notebook globals ``device`` and ``transform``.

    Args:
        model: network to evaluate; put into evaluation mode here.

    Returns:
        The test-set accuracy in percent.
    """
    # Switch to evaluation mode
    model.eval()

    right = 0
    # No gradients are needed for evaluation; skipping them saves memory.
    with torch.no_grad():
        for data, target in test_loader:

            data = data.to(device)
            target = target.to(device)

            # apply transform and model on whole batch directly on device
            data = transform(data)
            output = model(data)

            pred = get_probable_idx(output)
            right += nr_of_right(pred, target)

    n_total = len(test_loader.dataset)
    accuracy = 100. * right / n_total
    print(f"\nTest set accuracy: {right}/{n_total} ({accuracy:.0f}%)\n")

    return accuracy

In [None]:
log_interval = 20
n_epoch = 20

# Each epoch steps the bar once per training batch (in train) and once per
# validation batch (in validate), so one step is worth 1 / (their combined
# batch count) of an epoch. The validation loader is used here because that
# is what validate() iterates; the test loader is not consumed in this loop.
pbar_update = 1 / (len(train_loader) + len(validation_loader))
losses_train = []
losses_validation = []
accuracy_train = []
accuracy_validation = []

losses = []
# The transform needs to live on the same device as the model and the data.
transform = transform.to(device)
with tqdm(total=n_epoch) as pbar:
    for epoch in range(1, n_epoch + 1):
        train(model, epoch, log_interval)
        validate(model, epoch)
        # advance the learning-rate schedule once per epoch
        scheduler.step()

In [None]:
# Plot training loss
# Plot training loss (one point per training batch)
plt.plot(losses_train, 'b', label='Train loss')
plt.legend(loc="upper left")
plt.title("M5 model training losses over all iterations")
plt.show()

# Plot validation loss (one point per validation pass)
plt.plot(losses_validation, 'r', label='Valid loss')
plt.legend(loc="upper left")
plt.title("M5 model validation losses over validation epochs")
plt.show()

# Plot accuracy (one point per epoch for each curve)
plt.plot(accuracy_train, 'b', label='Train acc')
plt.plot(accuracy_validation,'r', label ='Valid acc')
plt.legend(loc="upper left")
plt.title("M5 model training & validation accuracy over epochs")
plt.show()


In [None]:
def predict(tensor):
    """Return the predicted label for a single waveform.

    Uses the notebook globals ``device``, ``transform`` and ``model``.

    Args:
        tensor: one waveform without a batch dimension; a batch dimension of
            size one is added before the model is called.

    Returns:
        The entry of ``labels`` with the highest predicted log-probability.
    """
    tensor = tensor.to(device)
    # Inference only: no gradients are needed.
    with torch.no_grad():
        tensor = transform(tensor)
        tensor = model(tensor.unsqueeze(0))
    # Use the argmax helper defined in this notebook; the previous name
    # (get_likely_index) is not defined here.
    tensor = get_probable_idx(tensor)
    tensor = index_to_label(tensor.squeeze())
    return tensor


# Try the model on the last item of the training subset and compare the
# predicted label with the true one.
waveform, label = train_dataset[-1]
#ipd.Audio(waveform.numpy(), rate=sample_rate)

print(f"Expected: {label}. Predicted: {predict(waveform)}.")

In [None]:
# NOTE(review): `losses` is initialised as an empty list before the training
# loop and nothing visible in this notebook appends to it, so this plot is
# presumably empty — losses_train / losses_validation hold the recorded values.
plt.plot(losses)