# **Imports**

1-Validation Important
2-Inception Module 
4-Convert melspectrogram to Image
5-Increase melspectrogram more than 20 Important
6-Scheduler
7-Checkpoint
9-Changing Architecture 
11-Balancing

In [None]:
import pandas as pd
import os
import librosa
import librosa.display as libd
import matplotlib.pyplot as plt
from sklearn.preprocessing import normalize
import warnings
warnings.filterwarnings('ignore')
import numpy as np
import pickle
import joblib
from sklearn.model_selection import train_test_split
import IPython.display as ipd
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
from torch.optim import Adam
from tqdm import tqdm
from sklearn.metrics import f1_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay

# **Importing Data**

In [None]:
# Emotion codes mapped to integer class ids
# (SAD = sadness, ANG = angry, DIS = disgust, FEA = fear, HAP = happy,
# NEU = neutral).  A code's position in the tuple is its class id.
emotions = {
    code: class_id
    for class_id, code in enumerate(('SAD', 'ANG', 'DIS', 'FEA', 'HAP', 'NEU'))
}

In [None]:
 # Load one sample clip; `x` receives the waveform and `sr` the sampling rate
 # returned by librosa.load, then their types and sizes are printed.
 # NOTE(review): these lines carry one leading space - presumably this relies
 # on IPython stripping a cell's leading indent; confirm before running the
 # code as a plain script.
 audio_data = '../input/speech-emotion-recognition-en/Crema/1028_TSI_DIS_XX.wav'
 x , sr = librosa.load(audio_data)
 print(type(x), type(sr))
 print(x.shape, sr)
# Directory holding the .wav files.  The third '_'-separated field of each
# file name is the emotion code (e.g. 1028_TSI_DIS_XX.wav -> DIS).
crema_dir = '../input/speech-emotion-recognition-en/Crema'
# os.listdir returns names in arbitrary order; sorting makes the loaded
# subset reproducible between runs.
audio_files_names = sorted(os.listdir(crema_dir))
number_of_audio_files = len(audio_files_names)
dataset = []
labels = []
curr = 0
# Only the first 20 clips are loaded (fewer when the directory is smaller,
# instead of failing with an IndexError).
for i in range(min(20, number_of_audio_files)):
    audio_file_emotion = audio_files_names[i].split('_')[2]
    # A dedicated name keeps the sample clip stored in `x` from being
    # overwritten by the last file loaded here.
    waveform, _ = librosa.load(os.path.join(crema_dir, audio_files_names[i]))
    dataset.append(waveform)
    labels.append(emotions[audio_file_emotion])

In [None]:
# Pack the waveforms into a 1-D object array, one clip per slot.  The clips
# may differ in length, and np.array(dataset) on such ragged input raises
# ValueError on NumPy >= 1.24.  Element-wise assignment keeps every clip as
# its own float array whatever the lengths are.
df = np.empty(len(dataset), dtype=object)
for clip_index, clip in enumerate(dataset):
    df[clip_index] = clip

In [None]:
def extract_features(dataset,frame_length,hop_length):
    """Compute the zero-crossing-rate and RMS-energy track of every clip.

    Args:
        dataset: iterable of 1-D waveforms.
        frame_length: analysis window size, forwarded to librosa.
        hop_length: step between consecutive windows, forwarded to librosa.

    Returns:
        A pair ``(zcr_tracks, rms_tracks)`` of NumPy arrays with one entry per
        clip.  When every track has the same length the arrays are 2-D;
        otherwise they are 1-D object arrays whose elements are the
        individual tracks.
    """
    def stack_tracks(tracks):
        # np.array() on ragged input raises ValueError on NumPy >= 1.24, so
        # ragged tracks are stored one per slot in an object array instead.
        if len({len(track) for track in tracks}) <= 1:
            return np.array(tracks)
        ragged = np.empty(len(tracks), dtype=object)
        for position, track in enumerate(tracks):
            ragged[position] = track
        return ragged

    features_zcr = []
    features_rms = []
    for data_sample in dataset:
        zcr = librosa.feature.zero_crossing_rate(
            data_sample, frame_length=frame_length, hop_length=hop_length)
        print(zcr[0].shape)
        features_zcr.append(zcr[0])
        # The signal is passed by keyword: newer librosa releases no longer
        # accept it positionally for rms().
        rms = librosa.feature.rms(
            y=data_sample, frame_length=frame_length, hop_length=hop_length)
        print(rms[0].shape)
        features_rms.append(rms[0])
    return stack_tracks(features_zcr), stack_tracks(features_rms)

In [None]:
def extract_MFCC(dataset):
    """Return a list with the MFCC matrix of every waveform in `dataset`.

    The shape of each matrix is printed as it is computed.
    """
    collected = []
    for waveform in dataset:
        coefficients = librosa.feature.mfcc(y=waveform)
        print(coefficients.shape)
        # Store an independent copy of the coefficient matrix.
        collected.append(np.array(coefficients))
    return collected

In [None]:
# MFCC matrix of every loaded clip, followed by a check of the container type.
mfcc_feature_space = extract_MFCC(dataset=dataset)
print(type(mfcc_feature_space))

In [None]:
# Zero-crossing-rate and RMS tracks: 1024-sample windows, 512-sample hop.
fs_zcr, fs_rms = extract_features(df, frame_length=1024, hop_length=512)

In [None]:
# Peek at the first RMS-energy track.
print(fs_rms[0])

In [None]:
# Audio player for the sample clip stored at `audio_data`.
ipd.Audio(audio_data)

In [None]:
# Waveform of `x` over time.
# NOTE(review): confirm `x` still holds the clip loaded from `audio_data` at
# this point; any later assignment to `x` would make this plot show a
# different clip from the one played above.
libd.waveshow(x,sr=sr, x_axis='time', color='cyan')

In [None]:
# Raw sample values of `x`.
print(x)

In [None]:
# Render figures inline.  pyplot is imported again here although the import
# cell at the top of the notebook already binds `plt`.
%matplotlib inline
import matplotlib.pyplot as plt

In [None]:
# Zoom into samples n0..n1 (a 2048-sample window) of `x`.
n0 = 9000
n1 = 11048
plt.figure(figsize=(14, 5))
plt.plot(x[n0:n1])
plt.grid()

In [None]:
# Count the zero crossings inside that same window.
zero_crossings = librosa.zero_crossings(x[n0:n1], pad=False)
print(sum(zero_crossings))

In [None]:
# Frame-wise zero-crossing rate of the whole of `x`, hopping 1024 samples.
zcr = librosa.feature.zero_crossing_rate(x,hop_length = 1024)

In [None]:
# Number of values in the zero-crossing-rate result.
print(zcr.size)

# Padding mfcc_feature_space

In [None]:
# Zero-pad every MFCC matrix on the right of its second axis so that all
# matrices end up as wide as the widest one.
max_len_mfcc = max(len(coeffs[0]) for coeffs in mfcc_feature_space)

mfcc_feature_space = [
    np.pad(coeffs, ((0, 0), (0, max_len_mfcc - coeffs.shape[1])), mode='constant')
    for coeffs in mfcc_feature_space
]


# Padding Zero Crossing Rate Feature Space

In [None]:
# Zero-pad every zero-crossing-rate track to the longest length and store it
# as float32.
max_len_zcr = max(map(len, fs_zcr))
fs_zcr = [
    np.pad(track, (0, max_len_zcr - len(track)), mode='constant').astype('float32')
    for track in fs_zcr
]

# Padding Energy Feature Space

In [None]:
# Zero-pad every RMS-energy track to the length of the longest one.
max_len_rms = max(map(len, fs_rms))
fs_rms = [
    np.pad(track, (0, max_len_rms - len(track)), mode='constant')
    for track in fs_rms
]

# Splitting and Balancing the Data

In [None]:
from collections import Counter
def split_and_balance(data,labels):
    """Undersample `data` so that every class keeps the same number of samples.

    Each class present in `labels` keeps its first ``k`` samples, where ``k``
    is the size of the rarest class.  The result is grouped by class in
    ascending label order.

    Args:
        data: sequence of samples.
        labels: sequence of class labels, aligned with `data`.

    Returns:
        ``(balanced_data, balanced_labels)`` - two lists of equal length.
        Empty input yields two empty lists.
    """
    class_counts = Counter(labels)
    if not class_counts:
        return [], []
    per_class_quota = min(class_counts.values())

    # One bucket per class that actually occurs.  Emitting labels only for
    # collected samples keeps data and labels the same length even when some
    # class is missing from `labels`.
    buckets = {label: [] for label in sorted(class_counts)}
    for position, label in enumerate(labels):
        bucket = buckets[label]
        if len(bucket) < per_class_quota:
            bucket.append(data[position])

    balanced_data = []
    balanced_labels = []
    for label, bucket in buckets.items():
        balanced_data.extend(bucket)
        balanced_labels.extend([label] * len(bucket))
    return balanced_data, balanced_labels

In [None]:
def append_data_and_labels(data,labels):
    """Pair every sample with its label.

    Args:
        data: sequence of samples (e.g. feature arrays).
        labels: sequence of labels, aligned with `data`.

    Returns:
        An object array of shape ``(len(labels), 2)``; column 0 holds the
        sample and column 1 its label.
    """
    # An explicit object array is used because a sample and its label
    # generally differ in shape, and np.array() rejects such ragged input on
    # NumPy >= 1.24.
    data_with_labels = np.empty((len(labels), 2), dtype=object)
    for row, label in enumerate(labels):
        data_with_labels[row, 0] = data[row]
        data_with_labels[row, 1] = label
    return data_with_labels

In [None]:
# Stage 1: hold out 30% of the clips as the test set.  The three feature
# spaces and the labels go through a single call, so all of them are cut
# along the same shuffled indices (seed 70).
(x_training_validation_mfcc, x_test_mfcc,
 x_training_validation_zcr, x_test_zcr,
 x_training_validation_rms, x_test_rms,
 y_training_validation_mfcc, y_test_mfcc) = train_test_split(
    mfcc_feature_space, fs_zcr, fs_rms, labels,
    test_size=0.3, random_state=70)
# Every feature space gets its own copy of the (identical) label lists.
y_training_validation_zcr, y_test_zcr = list(y_training_validation_mfcc), list(y_test_mfcc)
y_training_validation_rms, y_test_rms = list(y_training_validation_mfcc), list(y_test_mfcc)

# Stage 2: carve 5% of the remaining clips out as the validation set.
(x_training_mfcc, x_validation_mfcc,
 x_training_zcr, x_validation_zcr,
 x_training_rms, x_validation_rms,
 y_training_mfcc, y_validation_mfcc) = train_test_split(
    x_training_validation_mfcc, x_training_validation_zcr, x_training_validation_rms,
    y_training_validation_mfcc,
    test_size=0.05, random_state=70)
y_training_zcr, y_validation_zcr = list(y_training_mfcc), list(y_validation_mfcc)
y_training_rms, y_validation_rms = list(y_training_mfcc), list(y_validation_mfcc)

# TODO: class balancing (split_and_balance / append_data_and_labels) is not
# applied to any of the splits yet.

# Neural Network Architecture

In [None]:
# Hyper-parameters shared by the training and evaluation code below.
batch_size, num_classes = 64, 6
learning_rate, num_epochs = 0.001, 10

# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [None]:
# Creating a CNN class
class ConvNeuralNet2D(nn.Module):
    """2-D CNN: three 5x5 convolutions followed by a two-layer classifier.

    The three MaxPool2d modules are constructed but are not applied in
    forward().
    """

    def __init__(self, num_classes):
        super().__init__()
        # Convolutional stack: 1 -> 512 -> 512 -> 128 channels.
        self.conv_layer1 = nn.Conv2d(1, 512, kernel_size=5, stride=1)
        self.relu1 = nn.ReLU()
        self.max_pool1 = nn.MaxPool2d(kernel_size=5, stride=2)
        self.conv_layer2 = nn.Conv2d(512, 512, kernel_size=5, stride=1)
        self.relu2 = nn.ReLU()
        self.max_pool2 = nn.MaxPool2d(kernel_size=5, stride=2)
        self.conv_layer3 = nn.Conv2d(512, 128, kernel_size=5, stride=1)
        self.relu3 = nn.ReLU()
        self.max_pool3 = nn.MaxPool2d(kernel_size=5, stride=2)
        # Classifier head; fc1 infers its input width on the first forward pass.
        self.fc1 = nn.LazyLinear(256)
        self.relu4 = nn.ReLU()
        self.fc2 = nn.Linear(256, num_classes)
        self.dropout = nn.Dropout(p=0.5, inplace=False)

    def forward(self, x):
        """Return one score per class for every item of the batch `x`."""
        feature_maps = x
        for conv, activation in (
            (self.conv_layer1, self.relu1),
            (self.conv_layer2, self.relu2),
            (self.conv_layer3, self.relu3),
        ):
            feature_maps = activation(conv(feature_maps))
        # Flatten everything but the batch dimension.
        flat = feature_maps.reshape(feature_maps.size(0), -1)
        hidden = self.relu4(self.fc1(self.dropout(flat)))
        return self.fc2(self.dropout(hidden))

In [None]:
# Creating a CNN class
class ConvNeuralNet1D(nn.Module):
    """1-D CNN: three width-5 convolutions followed by a two-layer classifier.

    The three MaxPool1d modules are constructed but are not applied in
    forward().
    """

    def __init__(self, num_classes):
        super().__init__()
        # Convolutional stack: 1 -> 512 -> 512 -> 128 channels.
        self.conv_layer1 = nn.Conv1d(1, 512, kernel_size=5, stride=1)
        self.relu1 = nn.ReLU()
        self.max_pool1 = nn.MaxPool1d(kernel_size=5, stride=2)
        self.conv_layer2 = nn.Conv1d(512, 512, kernel_size=5, stride=1)
        self.relu2 = nn.ReLU()
        self.max_pool2 = nn.MaxPool1d(kernel_size=5, stride=2)
        self.conv_layer3 = nn.Conv1d(512, 128, kernel_size=5, stride=1)
        self.relu3 = nn.ReLU()
        self.max_pool3 = nn.MaxPool1d(kernel_size=5, stride=2)
        # Classifier head; fc1 infers its input width on the first forward pass.
        self.fc1 = nn.LazyLinear(256)
        self.relu4 = nn.ReLU()
        self.fc2 = nn.Linear(256, num_classes)
        self.dropout = nn.Dropout(p=0.5, inplace=False)

    def forward(self, x):
        """Return one score per class for every item of the batch `x`."""
        feature_maps = x
        for conv, activation in (
            (self.conv_layer1, self.relu1),
            (self.conv_layer2, self.relu2),
            (self.conv_layer3, self.relu3),
        ):
            feature_maps = activation(conv(feature_maps))
        # Flatten everything but the batch dimension.
        flat = feature_maps.reshape(feature_maps.size(0), -1)
        hidden = self.relu4(self.fc1(self.dropout(flat)))
        return self.fc2(self.dropout(hidden))

# Create Dataset Class

In [None]:
class Dataset(torch.utils.data.Dataset):
    """In-memory dataset of equally-shaped feature arrays and their class ids.

    Features are stored as float32 with a channel axis inserted at position 1;
    labels are stored as int64.
    """

    def __init__(self, data,labels):
        """Build the tensors.

        Args:
            data: sequence of equally-shaped feature arrays, one per sample.
            labels: sequence of integer class ids, aligned with `data`.
        """
        self.labels = torch.tensor(np.asarray(labels), dtype=torch.long)

        # Going through one NumPy array avoids the slow element-by-element
        # conversion of a list of arrays.  float32 is enforced because the
        # models' weights are float32 and float64 features would be rejected.
        self.audios = torch.tensor(np.asarray(data), dtype=torch.float32)
        self.audios = self.audios.unsqueeze(1)

    def classes(self):
        """Return the tensor holding every label."""
        return self.labels

    def __len__(self):
        """Return the number of samples."""
        return len(self.labels)

    def get_batch_labels(self, idx):
        """Return the label(s) at `idx` as a NumPy array."""
        return np.array(self.labels[idx])

    def get_batch_audios(self, idx):
        """Return the feature tensor(s) at `idx`."""
        return self.audios[idx]

    def __getitem__(self, idx):
        """Return the ``(features, label)`` pair at `idx`."""
        batch_audios = self.get_batch_audios(idx)
        batch_y = self.get_batch_labels(idx)

        return batch_audios, batch_y

In [None]:
def calculate_accuracy(y_test,y_predicted):
    """Return, as a float tensor, how many predictions equal their target.

    Despite its name, the result is a count of correct samples, not a ratio.
    """
    matches = y_predicted == y_test
    return matches.sum().float()

In [None]:
def calculate_metrics(y_true,y_pred,labels):
    """Print the weighted F1 score and draw the confusion matrix.

    Args:
        y_true: ground-truth class ids.
        y_pred: predicted class ids.
        labels: display name of every class; position ``i`` names class id ``i``.
    """
    weighted_f1 = f1_score(y_true, y_pred, average="weighted")
    print("Weighted Average F-Score is ", weighted_f1)
    # The class ids follow from the display names instead of being fixed to
    # six, so both arguments always agree in length.
    class_ids = list(range(len(labels)))
    ConfusionMatrixDisplay.from_predictions(
        y_true, y_pred, labels=class_ids, display_labels=labels)
   

In [None]:
def train(model,train,val):
    """Fit `model` on `train`, reporting loss and accuracy on `train` and `val` after every epoch.

    Reads the notebook-level `batch_size`, `learning_rate`, `num_epochs` and
    `device`.

    Args:
        model: network returning raw class scores (logits), one row per sample.
        train: dataset of ``(input, label)`` pairs used for optimisation.
        val: dataset of ``(input, label)`` pairs used for validation only.

    Raises:
        ValueError: if either dataset is empty.
    """
    if len(train) == 0 or len(val) == 0:
        raise ValueError('train() needs non-empty training and validation datasets')

    # The batches are moved to `device` below, so the weights must live there too.
    model.to(device)

    # CrossEntropyLoss applies log-softmax itself and therefore has to be fed
    # the raw logits; a softmax in front of it flattens the gradients.
    criterion = nn.CrossEntropyLoss()
    # Training batches are reshuffled every epoch.
    train_dataloader = torch.utils.data.DataLoader(train, batch_size=batch_size, shuffle=True)
    val_dataloader = torch.utils.data.DataLoader(val, batch_size=batch_size)

    # Dry run: lazy layers (nn.LazyLinear) only receive real weights on their
    # first forward pass, which has to happen before the optimizer is built.
    model.eval()
    with torch.no_grad():
        warmup_audios, _ = next(iter(val_dataloader))
        model(warmup_audios.to(device))

    optimizer = Adam(model.parameters(), lr=learning_rate)

    for epoch in range(num_epochs):
        model.train()  # enable dropout
        total_acc_train = 0.0
        total_loss_train = 0.0
        for audios, labels in tqdm(train_dataloader):
            audios = audios.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            outputs = model(audios)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # `loss` is the batch mean; weighting it by the batch size makes
            # the division by len(train) below a true per-sample average.
            total_loss_train += loss.item() * labels.size(0)
            predictions = torch.argmax(outputs, dim=1)
            total_acc_train += calculate_accuracy(labels, predictions).item()

        total_acc_val = 0.0
        total_loss_val = 0.0

        model.eval()  # disable dropout while validating
        with torch.no_grad():  # no gradients are needed for validation
            for val_audios, val_labels in val_dataloader:
                val_labels = val_labels.to(device)
                val_audios = val_audios.to(device)
                outputs = model(val_audios)

                loss = criterion(outputs, val_labels)
                total_loss_val += loss.item() * val_labels.size(0)
                predictions = torch.argmax(outputs, dim=1)
                total_acc_val += calculate_accuracy(val_labels, predictions).item()

        print(
            f'Epochs: {epoch + 1} | Train Loss: {total_loss_train / len(train): .3f} \
            | Train Accuracy: {total_acc_train / len(train): .3f} \
            | Val Loss: {total_loss_val / len(val): .3f} \
            | Val Accuracy: {total_acc_val / len(val): .3f}')



# Training on MFCC Features

In [None]:
# Wrap the MFCC training / validation splits, then fit a fresh 2-D CNN on them.
train_data = Dataset(data=x_training_mfcc, labels=y_training_mfcc)
val = Dataset(data=x_validation_mfcc, labels=y_validation_mfcc)
model = ConvNeuralNet2D(num_classes)
train(model, train_data, val)

# Training Zero Crossing Rate

In [None]:
# Fit a fresh 1-D CNN on the zero-crossing-rate tracks, as this section's
# heading announces (the RMS splits were used here before).
model = ConvNeuralNet1D(num_classes)
train_data = Dataset(x_training_zcr, y_training_zcr)
val = Dataset(x_validation_zcr, y_validation_zcr)
train(model, train_data, val)

# Training Energy

In [None]:
# Fit a fresh 1-D CNN on the RMS-energy tracks, as this section's heading
# announces (the zero-crossing-rate splits were used here before).
model = ConvNeuralNet1D(num_classes)
train_data = Dataset(x_training_rms, y_training_rms)
val = Dataset(x_validation_rms, y_validation_rms)
train(model, train_data, val)

In [None]:
def evaluate(model,test_data,test_labels):
    """Score `model` on the test split and report accuracy, F1 and the confusion matrix.

    Reads the notebook-level `batch_size` and `device`.

    Args:
        model: network returning one score per class for every input.
        test_data: sequence of equally-shaped feature arrays.
        test_labels: sequence of integer class ids, aligned with `test_data`.

    Raises:
        ValueError: if the test split is empty.
    """
    test = Dataset(test_data,test_labels)
    if len(test) == 0:
        raise ValueError('evaluate() needs at least one test sample')

    test_dataloader = torch.utils.data.DataLoader(test, batch_size=batch_size)

    # The batches are moved to `device`, so the weights must live there too.
    model.to(device)
    # Dropout has to be switched off for inference; the previous mode is
    # restored afterwards so that a later training run is unaffected.
    was_training = model.training
    model.eval()

    total_acc_test = 0.0
    batch_predictions = []

    with torch.no_grad():
        for test_input, test_label in test_dataloader:
            test_label = test_label.to(device)
            test_input = test_input.to(device)

            # argmax over the raw scores equals argmax over their softmax.
            outputs = torch.argmax(model(test_input), dim=1)
            # Keep predictions as integer class ids on the CPU for scikit-learn.
            batch_predictions.append(outputs.cpu())

            total_acc_test += calculate_accuracy(test_label, outputs).item()

    model.train(was_training)

    # Reported once, after every batch has been counted.
    print(f'Test Accuracy: {total_acc_test / len(test_data): .3f}')

    y_pred = torch.cat(batch_predictions).numpy()
    class_names = ['SAD','ANG','DIS','FEA','HAP','NEU']
    calculate_metrics(test_labels,y_pred,class_names)

#         test_data['sentiment'].replace({'positive':1,'negative':0},inplace=True)
#         y_test=test_data['sentiment'].to_numpy()
#         y_pred=torch.flatten(y_pred)
#         y_pred=y_pred.detach().cpu().numpy()
#         y_pred=np.where(y_pred > 0.5, 1, 0)
#         classes=['negative','positive']
        
        
#         net_classification_report=classification_report(y_test,y_pred,target_names=classes)    
#         print(net_classification_report)
#         conf_mat=confusion_matrix(y_test,y_pred)

        
#         disp=ConfusionMatrixDisplay(confusion_matrix=conf_mat,display_labels=classes)
#         disp.plot()
#         plt.show()

#         gc.collect()
    

# #Load Best Model
# PATH='model.pt'
# checkpoint = torch.load(PATH)
# model.load_state_dict(checkpoint['model_state_dict'])
# #optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
# epoch = checkpoint['epoch']
# loss = checkpoint['loss']
# #print the best model parameters







# Testing on MFCC Features

In [None]:
# Evaluate a 2-D CNN on the MFCC test split.
# NOTE(review): a brand-new ConvNeuralNet2D is constructed here, so evaluate()
# scores freshly initialised weights rather than a model that went through
# train().  TODO: evaluate the trained instance instead.
model=ConvNeuralNet2D(num_classes)
evaluate(model,x_test_mfcc,y_test_mfcc)

# Testing On Zero Crossing Rate

In [None]:
# Evaluate a 1-D CNN on the zero-crossing-rate test split.
# NOTE(review): a brand-new ConvNeuralNet1D is constructed here, so evaluate()
# scores freshly initialised weights rather than a model that went through
# train().  TODO: evaluate the trained instance instead.
model=ConvNeuralNet1D(num_classes)
evaluate(model,x_test_zcr,y_test_zcr)

# Testing On Energy

In [None]:
# Evaluate a 1-D CNN on the RMS-energy test split.
# NOTE(review): a brand-new ConvNeuralNet1D is constructed here, so evaluate()
# scores freshly initialised weights rather than a model that went through
# train().  TODO: evaluate the trained instance instead.
model=ConvNeuralNet1D(num_classes)
evaluate(model,x_test_rms,y_test_rms)