In [90]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import librosa # audio processing
from IPython.display import Audio # playing audio
from matplotlib import pyplot as plt # plots
import librosa.display

!pip install noisereduce
import noisereduce as nr

from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix

import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import seaborn as sns


import os
#for dirname, _, filenames in os.walk('/kaggle/input/speech-emotion-recognition-en/Crema'):
#    for filename in filenames:
#        print(os.path.join(dirname, filename))


# Load Dataset

In [2]:
# Module-level containers filled by the loading cell. Position i in `data`,
# `label` and `meta` always refers to the same audio file.
data = [] # the audio signals, one 1-D array per file
label = [] # the sentiment of each file as an integer class index (for classification)
meta = [] # metadata: the file name without extension (actor_sentence_sentiment_pitch)
sampling_rate = 18000 # target rate passed to librosa.load, so every signal ends up with the same sampling rate

In [3]:
def play_plot(index):
    """Print the metadata of one recording and show an audio player for it.

    Reads the module-level `meta`, `data` and `sampling_rate`.

    Args:
        index: position of the recording in `data` / `meta`.
    """
    # A bare `Audio(...)` expression inside a function is evaluated and thrown
    # away: the notebook only auto-renders the last expression of a *cell*.
    # `display` renders the player explicitly.
    from IPython.display import display

    print(meta[index])
    display(Audio(data=data[index], rate=sampling_rate))
    # TODO: the waveform plot is not implemented yet; original sketch kept below.
    #fig, ax = plt.subplots(nrows=3, sharex=True)
    #librosa.display.waveshow(data[index], sr=sampling_rate, ax=ax[0])

In [4]:
def adjust_length(time_series_list, length):
    """Force every series in the list to have exactly `length` samples, in place.

    Shorter series are zero-padded at the end, longer ones are truncated.
    Each entry of `time_series_list` is replaced by a new numpy array; the
    function returns None.

    Args:
        time_series_list: mutable list of 1-D sequences (arrays or lists).
        length: number of samples every series should have afterwards.
    """
    for position, series in enumerate(time_series_list):
        series = np.asarray(series)
        missing = length - len(series)
        if missing > 0:
            # Pad with zeros of the series' own dtype. Appending a list of
            # Python ints would upcast float32 audio to float64.
            padding = np.zeros(missing, dtype=series.dtype)
            time_series_list[position] = np.concatenate([series, padding])
        else:
            # Copy so the stored array does not alias the caller's buffer.
            time_series_list[position] = series[:length].copy()

In [5]:
def check_for_nan(l):
    """Return True if the sequence contains at least one NaN value.

    Args:
        l: iterable of values, typically a 1-D numpy array of audio samples.

    Returns:
        bool: True when a NaN is present, False otherwise (also for empty input).
    """
    values = np.asarray(l)
    kind = values.dtype.kind
    if kind in "fc":
        # Float / complex data: one vectorized pass instead of converting
        # every sample to a string in a Python loop.
        return bool(np.isnan(values).any())
    if kind in "iub":
        # Integer and boolean arrays cannot represent NaN.
        return False
    # Other inputs (object arrays, strings, generators): keep the original
    # element-by-element textual check.
    return any(str(x) == 'nan' for x in l)

In [6]:
# Emotion code (as it appears in the audio file names) -> integer class index.
emotions_dict = {
    'SAD': 0,
    'ANG': 1,
    'DIS': 2,
    'FEA': 3,
    'HAP': 4,
    'NEU': 5,
}

In [7]:
# Load every recording into memory. This is slow (each file is resampled and
# denoised), so run it once per session.
#
# The containers are rebuilt from scratch here so the cell can be re-run:
# `data` is turned into a numpy array at the end (which has no .append), and
# `meta` / `label` would otherwise accumulate duplicate entries.
data, label, meta = [], [], []
length_sum = 0  # total number of samples loaded, before padding / truncation
list_a = []  # NOTE(review): not used in the visible cells - confirm before removing
list_b = []  # NOTE(review): not used in the visible cells - confirm before removing
for dirname, dirnames, filenames in os.walk('/kaggle/input/speech-emotion-recognition-en/Crema'):
    # Sort both directories and files so the sample order (and therefore the
    # seeded train/test split) does not depend on the filesystem.
    dirnames.sort()
    for filename in sorted(filenames):
        stem, extension = os.path.splitext(filename)
        if extension.lower() != '.wav':
            continue  # ignore anything that is not an audio recording
        # File names look like actor_sentence_sentiment_pitch; the third
        # field is the emotion code.
        sentiment = stem.split('_')[2]
        emotion_id = emotions_dict[sentiment]
        full_filename = os.path.join(dirname, filename)
        signal, sr = librosa.load(full_filename, sr = sampling_rate)
        reduced_noise = nr.reduce_noise(y=signal, sr=sampling_rate)
        # Keep the raw signal when the denoised version contains NaNs.
        if not check_for_nan(reduced_noise):
            signal = reduced_noise
        # Append to the three lists together, only after the file was read
        # successfully, so they can never get out of step.
        meta.append(stem)
        label.append(emotion_id)
        data.append(signal)
        length_sum += len(signal)
        if (len(data)%100 == 0):
            print(len(data), " audio loaded")
n = len(data)
if n == 0:
    raise FileNotFoundError("no .wav files found under /kaggle/input/speech-emotion-recognition-en/Crema")
# Pad / truncate every signal to 3 seconds so they stack into one 2-D array.
adjust_length(data, 3*sampling_rate)
data = np.array(data)

## Display metadata, play audio and plot waveform

In [8]:
# Position (in `data` / `meta`) of the recording inspected in the next cells.
index = 5

In [9]:
# Audio player for the selected recording; rendered because it is the cell's last expression.
Audio(data=data[index], rate=sampling_rate)

In [10]:
# Waveform of the selected recording, titled with its metadata string.
fig, ax = plt.subplots(figsize=(15, 5))
fig.suptitle(meta[index], fontsize=15)
librosa.display.waveshow(data[index], sr=sampling_rate, ax=ax)

## Feature Extraction

In [11]:
def feature_extraction_1D(data):
    """Build one row of frame-level features for a single audio signal.

    The row is the concatenation, in this order, of: zero-crossing rate, RMS
    energy, per-frame mean of 13 MFCCs, spectral centroid, spectral bandwidth,
    spectral flatness, spectral roll-off (librosa default percentage) and
    spectral roll-off at 1 %.

    Uses the module-level `sampling_rate`.

    Args:
        data: 1-D audio signal.

    Returns:
        numpy array with a single row holding the eight feature tracks side by
        side. Assumes every librosa call returns a (1, n_frames) array with the
        same n_frames - required for the axis=1 concatenation.
    """
    # MFCCs are averaged over the 13 coefficients (axis 0), leaving one value
    # per frame. reshape(1, -1) replaces a hard-coded frame count of 106, so
    # the function also works for other signal lengths or sampling rates.
    mfcc_mean = np.mean(
        librosa.feature.mfcc(y=data, sr=sampling_rate, n_mfcc=13), axis=0
    ).reshape(1, -1)

    feature_tracks = [
        # Zero Crossing rate
        librosa.feature.zero_crossing_rate(y=data),
        # Energy
        librosa.feature.rms(y=data),
        # Mel-frequency cepstral coefficients (mean over coefficients)
        mfcc_mean,
        # Spectral Centroid
        librosa.feature.spectral_centroid(y=data, sr=sampling_rate),
        # Spectral Bandwidth
        librosa.feature.spectral_bandwidth(y=data, sr=sampling_rate),
        # Spectral Flatness
        librosa.feature.spectral_flatness(y=data),
        # Spectral Rolloff maximum frequencies
        librosa.feature.spectral_rolloff(y=data, sr=sampling_rate),
        # Spectral Rolloff minimum frequencies
        librosa.feature.spectral_rolloff(y=data, sr=sampling_rate, roll_percent=0.01),
    ]
    # One concatenation instead of re-allocating the row after every feature.
    return np.concatenate(feature_tracks, axis=1)

In [12]:
# Build one row per recording: the flattened feature vector followed by the
# class label. NOTE: the label therefore sits in the LAST column of every row
# and must not be fed to a model as an input feature.
# `n` is the number of recordings, set by the loading cell.
data_features_extracted_1D = []
for signal, target in zip(data[:n], label[:n]):
    row = np.append(feature_extraction_1D(signal), target)
    data_features_extracted_1D.append(np.squeeze(row))
    processed = len(data_features_extracted_1D)
    if processed % 100 == 0:
        print(processed, " entry processed")
data_features_extracted_1D = np.array(data_features_extracted_1D)
print(data_features_extracted_1D.shape)

# Split Data

In [42]:
def split_1D(x,y):
    """Split features and labels into a stratified 70 % / 30 % train / test set.

    The split is seeded (random_state=1), so it is reproducible for a given
    input order.

    Args:
        x: feature rows.
        y: class labels, aligned with `x`; also used for stratification.

    Returns:
        tuple (x_train, x_test, y_train, y_test).
    """
    parts = train_test_split(x, y, test_size=0.3, random_state=1, stratify=y)
    return tuple(parts)

In [43]:
# The last column of data_features_extracted_1D is the class label itself
# (it is appended to every row when the feature matrix is built). Passing it
# on as an input feature leaks the target into the model, so it is stripped.
# NOTE(review): with the 849-column rows built above, 848 and 849 inputs both
# shrink to the same length after the first pooling / strided layer of either
# network, so the hard-coded Linear sizes should stay valid - confirm with a run.
x_train, x_test, y_train, y_test = split_1D(data_features_extracted_1D[:, :-1], label)

In [44]:
# Shape of the training feature matrix returned by split_1D.
x_train.shape

In [None]:
# Scratch cell cleared: it referenced an undefined name `x` (NameError on
# "Restart & Run All") and only accessed an attribute without calling it.

In [81]:
class AudioDataset(Dataset):
    """In-memory dataset of feature rows and their integer class labels.

    Args:
        x: array-like of feature rows, one row per sample.
        y: array-like of integer class labels, one per row of `x`.

    Raises:
        ValueError: if `x` and `y` do not contain the same number of samples.
    """

    def __init__(self, x, y):
        # Explicit dtypes: float32 inputs for the network, int64 targets as
        # required by F.cross_entropy (labels given as int32 would fail there).
        self.x = torch.tensor(np.asarray(x), dtype=torch.float32)
        self.y = torch.tensor(np.asarray(y), dtype=torch.long)
        if len(self.x) != len(self.y):
            raise ValueError(
                f"x and y must have the same number of samples, got {len(self.x)} and {len(self.y)}"
            )

    def __len__(self):
        """Number of samples."""
        return len(self.x)

    def __getitem__(self, idx):
        """Return the (features, label) pair stored at position `idx`."""
        return self.x[idx], self.y[idx]
    
# Wrap both splits in datasets and batch them 100 samples at a time.
# Only the training loader shuffles; the test loader keeps a fixed order.
train_ds, test_ds = AudioDataset(x_train, y_train), AudioDataset(x_test, y_test)
train_dl = DataLoader(train_ds, batch_size=100, shuffle=True)
test_dl = DataLoader(test_ds, batch_size=100, shuffle=False)


In [78]:
class CNN(nn.Module):
    """1-D convolutional classifier with six output classes.

    Input is a tensor of shape (batch, 1, length). The classifier head expects
    4096 flattened values, i.e. 512 channels x 8 positions, so `length` must
    shrink to 8 positions through the six pooling layers.
    """

    def __init__(self):
        super().__init__()
        self.layer1 = self.audio()
        self.layer2 = self.linear()

    def audio(self):
        """Build the convolutional feature extractor (conv + ReLU stages with max pooling)."""

        def conv_relu(c_in, c_out):
            # Length-preserving convolution followed by its activation.
            return [nn.Conv1d(in_channels=c_in, out_channels=c_out, kernel_size=3, padding=1), nn.ReLU()]

        def pool(stride=2):
            return [nn.MaxPool1d(kernel_size=2, stride=stride)]

        # Same modules, in the same order, as a flat Sequential definition.
        # Only the first pooling layer uses stride 3.
        stages = (
            conv_relu(1, 64) + pool(stride=3)
            + conv_relu(64, 64) + pool()
            + conv_relu(64, 128) + conv_relu(128, 128) + pool()
            + conv_relu(128, 256) + conv_relu(256, 256) + pool()
            + conv_relu(256, 512) + conv_relu(512, 512) + pool()
            + conv_relu(512, 512) + pool()
        )
        return nn.Sequential(*stages)

    def linear(self):
        """Build the classifier head: flatten -> 2048 hidden units -> 6 logits."""
        return nn.Sequential(
            nn.Flatten(),
            nn.Linear(4096, 2048), nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(2048, 6),
        )

    def forward(self, x):
        """Return the class logits, shape (batch, 6), for a batch of inputs."""
        features = self.layer1(x)
        return self.layer2(features)



# CNN Model: training and evaluation helpers

In [91]:
def train_one_epoch(model, optimizer, train_dl):
    """Run one optimisation pass over `train_dl`.

    Each batch gets a channel dimension added (`unsqueeze(1)`) before it is
    passed to the model. The model is left in training mode.

    Args:
        model: network to train; it must own at least one parameter.
        optimizer: optimizer bound to the model's parameters.
        train_dl: DataLoader yielding (features, labels) batches.

    Returns:
        float: cross-entropy loss averaged over all samples of the dataset.
    """
    # Send the batches to wherever the model lives. The previous check tested
    # the function object `torch.cuda.is_available` (always truthy) instead of
    # calling it, so "cuda" was selected even on CPU-only machines.
    device = next(model.parameters()).device
    model.train()
    train_loss = 0.0
    for X, y in train_dl:
        X = X.unsqueeze(1).to(device)
        y = y.to(device)
        y_pred = model(X)
        loss = F.cross_entropy(y_pred, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Weight by batch size so a smaller last batch does not skew the mean.
        train_loss += loss.item() * X.size(0)
    return train_loss / len(train_dl.dataset)


def test(model, test_dl):
    device = "cuda" if torch.cuda.is_available else "cpu"
    test_loss = 0
    accuracy = 0
    for X, y in test_dl:
        X = X.unsqueeze(1).to(device)
        y = y.to(device)
        model.eval()
        y_pred = model(X)
        loss = F.cross_entropy(y_pred, y)

        test_loss += loss.item() * X.size(0)
        accuracy += sum(y_pred.argmax(dim=1) == y)
        torch.cuda.empty_cache()
    # calculate accuracy and loss
    test_loss = test_loss / len(test_dl.dataset)
    accuracy = accuracy / len(test_dl.dataset)
    return test_loss, accuracy.item()


def train_loop(model, optimizer, train_dl, test_dl, epoch):
    """Train for `epoch` epochs, evaluating on the test set after each one.

    After every epoch a line with the training loss, test loss and test
    accuracy (rounded to 3 decimals) is printed. Nothing is returned.

    Args:
        model: network to train.
        optimizer: optimizer bound to the model's parameters.
        train_dl: DataLoader with the training batches.
        test_dl: DataLoader with the evaluation batches.
        epoch: number of epochs to run.
    """
    for _ in range(epoch):
        train_loss = train_one_epoch(model, optimizer, train_dl)
        test_loss, test_acc = test(model, test_dl)
        summary = f"""train loss:{round(train_loss, 3)}, test loss: {round(test_loss, 3)}, test acc: {round(test_acc, 3)}"""
        print(summary)

In [102]:
# Put the model on the GPU when one is available; an unconditional `.cuda()`
# raises on a machine without a GPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CNN().to(device)
optimizer=torch.optim.Adam(model.parameters(),lr=0.00005,weight_decay=5e-3) 
train_loop(model,optimizer,train_dl,test_dl,30)

## ResNet

In [132]:
class ResNetblock(nn.Module):
    """Residual block of two 1-D convolutions, each followed by batch norm.

    Args:
        input_channel: number of channels of the incoming tensor.
        out_channel: number of channels produced by the block.
        stride: stride of the first convolution (and of the shortcut projection).
        convx: when true, the shortcut goes through a 1x1 convolution so its
            shape matches the main path; otherwise the input is added unchanged.
    """

    def __init__(self, input_channel, out_channel, stride=1, convx=False):
        super().__init__()
        self.C1 = nn.Conv1d(input_channel, out_channel, kernel_size=3, padding=1, stride=stride)
        self.B1 = nn.BatchNorm1d(out_channel)
        self.relu = nn.ReLU()
        self.C2 = nn.Conv1d(out_channel, out_channel, kernel_size=3, padding=1)
        self.B2 = nn.BatchNorm1d(out_channel)
        # Optional projection for the shortcut branch.
        self.CX = nn.Conv1d(input_channel, out_channel, kernel_size=1, stride=stride) if convx else None

    def forward(self, X):
        """Return relu(main_path(X) + shortcut(X))."""
        out = self.relu(self.B1(self.C1(X)))
        out = self.B2(self.C2(out))
        shortcut = X if self.CX is None else self.CX(X)
        out += shortcut
        return F.relu(out)

class ResNet(nn.Module):
    """1-D ResNet-style classifier with six output classes.

    Input is a tensor of shape (batch, 1, length). The classifier head expects
    4608 flattened values, i.e. 512 channels x 9 positions after the stem and
    the five residual stages.
    """

    def __init__(self):
        super().__init__()
        self.b1 = self.head_(1)
        self.b2 = nn.Sequential(*self.resnetblock_(64, 64, 2, first=True))
        self.b3 = nn.Sequential(*self.resnetblock_(64, 128, 2))
        self.b4 = nn.Sequential(*self.resnetblock_(128, 256, 2))
        self.b5 = nn.Sequential(*self.resnetblock_(256, 256, 2))
        self.b6 = nn.Sequential(*self.resnetblock_(256, 512, 2))
        self.connect = nn.Sequential(
            nn.Flatten(),
            nn.Linear(4608, 512), nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, 6),
        )

    def forward(self, X):
        """Return the class logits, shape (batch, 6), for a batch of inputs."""
        for stage in (self.b1, self.b2, self.b3, self.b4, self.b5, self.b6):
            X = stage(X)
        return self.connect(X)

    def head_(self, input_channel):
        """Build the stem: strided 7-wide convolution, batch norm, max pooling."""
        return nn.Sequential(
            nn.Conv1d(input_channel, 64, kernel_size=7, stride=3, padding=3),
            nn.BatchNorm1d(64),
            nn.MaxPool1d(kernel_size=3, padding=1, stride=2),
        )

    def resnetblock_(self, input_channel, output_channel, num_res, first=False):
        """Return a list of `num_res` residual blocks forming one stage.

        Unless `first` is true, the first block of the stage halves the length
        (stride 2) and projects the shortcut to `output_channel` channels; all
        other blocks keep both length and channel count.
        """
        return [
            ResNetblock(input_channel, output_channel, stride=2, convx=True)
            if i == 0 and not first
            else ResNetblock(output_channel, output_channel)
            for i in range(num_res)
        ]




In [139]:
# Put the model on the GPU when one is available; an unconditional `.cuda()`
# raises on a machine without a GPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ResNet().to(device)
optimizer=torch.optim.Adam(model.parameters(),lr=0.00005,weight_decay=5e-3) 
train_loop(model,optimizer,train_dl,test_dl,10)