In [5]:
#Import Dependencies
from pylab import *
import os
import pandas as pd
import librosa
import librosa.display
import glob
import random
from IPython.display import Audio
from sklearn import tree
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler, OneHotEncoder
import matplotlib.pyplot as plt
import numpy as np
from sklearn.model_selection import train_test_split
import joblib
from sklearn.metrics import classification_report
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.linear_model import LinearRegression
from sklearn.tree import DecisionTreeClassifier 
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn import svm, metrics
import tensorflow
from sklearn.metrics import accuracy_score
from sklearn.naive_bayes import MultinomialNB
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten, Dropout,Conv1D,MaxPooling1D, BatchNormalization
from tensorflow.keras.regularizers import l2
import h5py
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.callbacks import Callback
from sklearn.metrics import f1_score
from keras.callbacks import ModelCheckpoint
import tensorflow as tf
from keras.models import load_model

In [6]:
class SpeechEmotion():
    def get_data(self, data_dir, name_dir):
        """Index every file found beneath ``data_dir``.

        Parameters
        ----------
        data_dir : str or path-like
            Directory that is walked recursively with ``os.walk``.
        name_dir : object
            Value written unchanged into the ``dir`` column of every row.

        Returns
        -------
        pandas.DataFrame
            One row per file with the columns ``file_name``, ``encoding``
            (a copy of ``file_name``), ``ID`` (the row index) and ``dir``.
        """
        # Only the bare file names are kept; the folder each file was found
        # in is not recorded.
        found = [
            entry
            for _root, _subdirs, entries in os.walk(data_dir)
            for entry in entries
        ]

        listing = pd.DataFrame(found, columns = ['file_name'])
        listing['encoding'] = listing['file_name']
        listing['ID'] = listing.index
        listing['dir'] = name_dir
        return listing
    
    def add_gender(self, encoding_df):
        """Add a ``Gender`` column derived from the actor number.

        The ``Actor`` column is cleaned (a trailing ``.wav`` is removed) and
        ``Gender`` is set to ``'1'`` when the actor number is even (female)
        and ``'0'`` when it is odd (male).

        Parameters
        ----------
        encoding_df : pandas.DataFrame
            Frame with an ``Actor`` column holding values such as
            ``'12.wav'``. The frame is modified in place.

        Returns
        -------
        pandas.DataFrame
            The same ``encoding_df`` object with ``Actor`` cleaned and
            ``Gender`` populated. Rows whose ``Actor`` is missing receive a
            missing ``Gender``.

        Raises
        ------
        ValueError
            If a non-missing actor value is not numeric once the extension
            has been removed.
        """
        # Remove a literal '.wav' suffix only. str.rstrip('.wav') strips a
        # character *set*, which is not the same as removing a suffix.
        actor = encoding_df['Actor'].map(
            lambda value: value[:-len('.wav')]
            if isinstance(value, str) and value.endswith('.wav')
            else value
        )
        encoding_df['Actor'] = actor

        # Vectorised parity test. Assigning to the rows yielded by iterrows()
        # writes to temporary copies, so a row-by-row loop does not reliably
        # update the frame and leaves the placeholder text in place.
        actor_number = pd.to_numeric(actor)
        is_even = actor_number % 2 == 0
        gender = is_even.map({True: '1', False: '0'})

        # Keep Gender missing where the actor number itself is missing.
        encoding_df['Gender'] = gender.where(actor_number.notna())
        return encoding_df
    
    def fix_ravdass_data(self, file_name_df, flag_gender = False):
        """Decode RAVDESS-style file names into class and emotion labels.

        Each ``encoding`` value is split on ``-`` into seven identifiers:

        * Modality (01 = full-AV, 02 = video-only, 03 = audio-only)
        * Vocal channel (01 = speech, 02 = song)
        * Emotion class (01 = neutral, 02 = calm, 03 = happy, 04 = sad,
          05 = angry, 06 = fearful, 07 = disgust, 08 = surprised)
        * Emotional intensity (01 = normal, 02 = strong)
        * Statement (01 = "Kids are talking by the door",
          02 = "Dogs are sitting by the door")
        * Repetition (01 = 1st repetition, 02 = 2nd repetition)
        * Actor (01 to 24; odd = male, even = female)

        Parameters
        ----------
        file_name_df : pandas.DataFrame
            Frame with at least an ``encoding`` column of file names.
        flag_gender : bool, optional
            When true, ``self.add_gender`` is applied so the result also
            carries a ``Gender`` column.

        Returns
        -------
        pandas.DataFrame
            ``file_name_df`` without ``encoding``, joined with ``Class`` and
            ``Emotion`` (plus ``Gender`` when requested). Rows containing
            any missing value are dropped, which removes files whose name
            could not be decoded.

        Raises
        ------
        ValueError
            If a file name splits into more than seven fields.
        """
        fields = ['Modality', 'Vocal_channel', 'Class', 'Intensity',
                  'Statement', 'Repetition', 'Actor']
        # Class 08 is labelled 'surprise' (not 'surprised') in this project.
        emotion_by_class = {
            '01': 'neutral',
            '02': 'calm',
            '03': 'happy',
            '04': 'sad',
            '05': 'angry',
            '06': 'fearful',
            '07': 'disgust',
            '08': 'surprise',
        }

        encoding_df = file_name_df["encoding"].str.split("-", n=-1, expand=True)
        # When no file name has all seven fields the split yields fewer
        # columns; pad with empty columns so the rename below cannot fail and
        # the undecodable rows are simply dropped at the end.
        if encoding_df.shape[1] < len(fields):
            encoding_df = encoding_df.reindex(columns=range(len(fields)))
        encoding_df.columns = fields

        if flag_gender:
            encoding_df = self.add_gender(encoding_df)

        # Keep Class (and Gender, when present); discard the other identifiers.
        final_df = encoding_df.drop(columns=[name for name in fields if name != 'Class'])
        final_df['Emotion'] = final_df['Class'].map(emotion_by_class)

        # Row-wise join on the shared index. 'ID' stays an ordinary column:
        # the earlier set_index('ID') call discarded its result.
        merged_df = file_name_df.join(final_df, how='outer').drop(columns=['encoding'])

        # Drop rows with any missing value (e.g. unknown class codes).
        return merged_df.dropna()
    
    def fix_toronto_data(self, t_file_name_df):
        """Decode Toronto-style file names (``Actor_Word_Emotion.wav``).

        Parameters
        ----------
        t_file_name_df : pandas.DataFrame
            Frame with ``file_name``, ``encoding``, ``dir`` and ``ID``
            columns; ``encoding`` holds the file names to decode.

        Returns
        -------
        pandas.DataFrame
            Columns ``file_name``, ``dir``, ``Class``, ``Emotion``, ``ID``.
            Rows containing any missing value are dropped, which removes
            files whose name could not be decoded or whose emotion label is
            unknown.

        Raises
        ------
        ValueError
            If a file name splits into more than three ``_``-separated fields.
        """
        fields = ['Actor', 'Word', 'Emotion']
        # Labels that differ from the names used for the RAVDESS data.
        aliases = {'ps': 'surprise', 'fear': 'fearful'}
        class_by_emotion = {
            'neutral': '01',
            'calm': '02',
            'happy': '03',
            'sad': '04',
            'angry': '05',
            'fearful': '06',
            'disgust': '07',
            'surprise': '08',
        }

        def clean_label(raw):
            # Missing values pass through and are dropped at the end.
            if not isinstance(raw, str):
                return raw
            # Remove the literal '.wav' suffix. str.strip('.wav') strips a
            # character set from both ends and turns 'angry' into 'ngry'.
            if raw.endswith('.wav'):
                raw = raw[:-len('.wav')]
            return aliases.get(raw, raw)

        t_encoding_df = t_file_name_df["encoding"].str.split("_", n=-1, expand=True)
        # Pad when no file name has all three fields so the rename cannot fail.
        if t_encoding_df.shape[1] < len(fields):
            t_encoding_df = t_encoding_df.reindex(columns=range(len(fields)))
        t_encoding_df.columns = fields

        toronto_df = pd.DataFrame({'Emotion': t_encoding_df['Emotion'].map(clean_label)})
        # Emotions outside the table get a missing Class and are dropped below.
        toronto_df['Class'] = toronto_df['Emotion'].map(class_by_emotion)

        # Row-wise join on the shared index; 'ID' stays an ordinary column
        # (the earlier set_index('ID') call discarded its result).
        t_merged_df = t_file_name_df.join(toronto_df, how='outer')
        t_merged_df = t_merged_df[['file_name', 'dir', 'Class', 'Emotion', 'ID']]

        return t_merged_df.dropna()
    
    def conact_dataset_and_reset_index(self, ddf, ddf2):
        """Stack two datasets and renumber the rows.

        Parameters
        ----------
        ddf, ddf2 : pandas.DataFrame
            Frames to concatenate, ``ddf`` first.

        Returns
        -------
        pandas.DataFrame
            The concatenated rows with a fresh 0..n-1 index and an ``ID``
            column equal to that index.
        """
        combined = pd.concat([ddf, ddf2]).reset_index(drop=True)
        # Overwrite (or create) ID so it matches the new running index.
        combined['ID'] = combined.index
        return combined
    
    def save_csv(self, path, df, index_file):
        """Write ``df`` to ``path`` as CSV.

        Parameters
        ----------
        path : str or path-like
            Destination file.
        df : pandas.DataFrame
            Data to write.
        index_file : bool
            Forwarded as ``index``: whether the row index is written too.
        """
        df.to_csv(path_or_buf=path, index=index_file)
     
    def read_csv(self, path):
        """Load the CSV at ``path`` and return it as a pandas DataFrame."""
        return pd.read_csv(filepath_or_buffer=path)
        
    def parser(self, row, flag_gender = False, data_dir = 'data', n_mfcc = 128):
        """Load one audio file and return its averaged MFCC feature vector.

        Parameters
        ----------
        row : object
            Record exposing ``dir``, ``file_name``, ``Emotion`` and either
            ``Class`` or ``Gender`` as attributes (e.g. a DataFrame row).
        flag_gender : bool, optional
            When true the label is ``row.Gender``; otherwise ``row.Class``.
        data_dir : str, optional
            Root folder containing the dataset folders. Defaults to
            ``'data'``, resolved against the current working directory.
        n_mfcc : int, optional
            Number of MFCC coefficients requested from librosa (default 128).

        Returns
        -------
        list or tuple
            ``[feature, label, emotion]`` on success, where ``feature`` is
            the mean of the transposed MFCC matrix along its first axis.
            ``(None, None, None)`` when the file could not be loaded or
            processed.
        """
        file_name = os.path.join(os.path.abspath(data_dir), str(row.dir), str(row.file_name))

        # Best effort: one unreadable or corrupt file must not abort the
        # whole extraction run, so failures are reported and skipped.
        try:
            X, sample_rate = librosa.load(file_name, res_type='kaiser_fast')
            mfccs = np.mean(librosa.feature.mfcc(y=X, sr=sample_rate, n_mfcc=n_mfcc).T, axis=0)
        except Exception as e:
            # Include the cause so the failure can be diagnosed.
            print("Error encountered while parsing file: ", file_name, "-", repr(e))
            return None, None, None

        label = row.Gender if flag_gender else row.Class
        return [mfccs, label, row.Emotion]



In [7]:
class Models():
    def get_confusion_matrix(self, y_test_clf, y_pred_clf, exp):
        """Plot the confusion matrix of a set of predictions as a heatmap.

        Parameters
        ----------
        y_test_clf : array-like
            True labels.
        y_pred_clf : array-like
            Predicted labels.
        exp : iterable
            Names used for both the row and column tick labels; there must
            be one per row/column of the computed matrix.
        """
        counts = confusion_matrix(y_test_clf, y_pred_clf)
        plt.figure(figsize = (12, 10))
        # Label both axes with the caller-supplied class names.
        labelled = pd.DataFrame(counts, index = list(exp), columns = list(exp))
        sns.heatmap(labelled, linecolor='white', cmap='Blues', linewidth=1, annot=True, fmt='')
        plt.title('Confusion Matrix', size=20)
        plt.xlabel('Predicted Labels', size=14)
        plt.ylabel('Actual Labels', size=14)
        plt.show()
      
    def convert_data_to_scaler(self, X_train, X_test, name_file):
        """Standardise features and persist the fitted scaler.

        The ``StandardScaler`` is fitted on ``X_train`` only, dumped to
        ``name_file`` with joblib, and then applied to both splits.

        Returns
        -------
        tuple
            ``(X_train_scaled, X_test_scaled)``.
        """
        fitted_scaler = StandardScaler().fit(X_train)
        joblib.dump(fitted_scaler, name_file)
        return fitted_scaler.transform(X_train), fitted_scaler.transform(X_test)
    
    def random_forest_classifier(self, X_train_scaled, y_train):
        """Train a random forest (200 trees, ``random_state=0``) and return it."""
        return RandomForestClassifier(n_estimators=200, random_state=0).fit(X_train_scaled, y_train)
    
    def decision_tree_classifier(self, X_train_scaled, y_train):
        """Train an entropy-criterion decision tree (``random_state=0``) and return it."""
        return DecisionTreeClassifier(criterion = 'entropy', random_state = 0).fit(X_train_scaled, y_train)
    
    def knn_classifier(self, X_train_scaled, y_train):
        """Train a k-nearest-neighbours classifier with ``k = 8`` and return it."""
        return KNeighborsClassifier(n_neighbors = 8).fit(X_train_scaled, y_train)

    def svc_classifier(self, X_train_scaled, y_train):
        """Train a support-vector classifier and return it.

        The estimator is created with ``probability = True`` and
        ``random_state=0``.
        """
        classifier = svm.SVC(probability = True, random_state=0)
        classifier.fit(X_train_scaled, y_train)
        return classifier
    
    def model_evaluation(self, clf, X_test_scaled, y_test, exp):
        """Print accuracy and a classification report, then plot the confusion matrix.

        Parameters
        ----------
        clf : object
            Fitted estimator exposing ``predict``.
        X_test_scaled : array-like
            Test features.
        y_test : array-like
            True test labels.
        exp : iterable
            Class names passed on to ``get_confusion_matrix`` for the axes.
        """
        # Predict once and reuse the result. The earlier clf.score(...) call
        # repeated the prediction and threw the value away.
        predictions = clf.predict(X_test_scaled)
        accuracy_pct = round(accuracy_score(y_test, predictions)*100,2)
        print("Accuracy Classifier: {}%".format(accuracy_pct))
        print(classification_report(y_test, predictions))
        self.get_confusion_matrix(y_test, predictions, exp)
        
    def save_classifier(self, clf, name_clf):
        """Persist ``clf`` to the file ``name_clf`` using joblib."""
        joblib.dump(value=clf, filename=name_clf)
        
        
    def one_hot(self, y_train, y_test):
        """One-hot encode the class labels of both splits.

        The encoder is fitted on the training labels only and then applied
        to the test labels, so both arrays share the same columns and the
        returned encoder matches them.

        Parameters
        ----------
        y_train, y_test : array-like
            Label vectors for the training and test splits.

        Returns
        -------
        tuple
            ``(encoder, y_train_oneHot, y_test_oneHot)`` with dense arrays.

        Raises
        ------
        ValueError
            Raised by the encoder if ``y_test`` contains a label that does
            not occur in ``y_train``.
        """
        encoder = OneHotEncoder()
        y_train_oneHot = encoder.fit_transform(np.array(y_train).reshape(-1,1)).toarray()
        # transform, not fit_transform: re-fitting on the test labels would
        # replace the categories learned from the training split.
        y_test_oneHot = encoder.transform(np.array(y_test).reshape(-1,1)).toarray()
        return encoder, y_train_oneHot, y_test_oneHot

    def get_model(self, y_train_oneHot, X_train_scaled, flag_gender = False):
        """Build and compile the dense network used for classification.

        Architecture: two 256-unit ReLU layers with L2 regularisation
        (0.05), dropout of 0.4, and an output layer with one unit per
        one-hot column.

        Parameters
        ----------
        y_train_oneHot : array-like
            One-hot training labels; ``shape[1]`` sets the output width.
        X_train_scaled : array-like
            Training features; ``shape[1]`` sets the input width.
        flag_gender : bool, optional
            When true the output activation is ``sigmoid``; otherwise
            ``softmax``.

        Returns
        -------
        Sequential
            The compiled model (its summary is printed as a side effect).
        """
        n_features = X_train_scaled.shape[1]
        n_outputs = y_train_oneHot.shape[1]

        model = Sequential()
        model.add(Dense(units=256, activation='relu', input_dim=n_features, kernel_regularizer=l2(0.05)))
        model.add(Dense(units=256, activation='relu', kernel_regularizer=l2(0.05)))
        model.add(Dropout(0.4))
        # NOTE(review): the sigmoid branch is still trained with
        # categorical_crossentropy below - confirm this pairing is intended.
        output_activation = 'sigmoid' if flag_gender else 'softmax'
        model.add(Dense(units=n_outputs, activation=output_activation))

        # `learning_rate` is the supported keyword; the `lr` alias is
        # deprecated and rejected by recent Keras releases.
        opt = tensorflow.keras.optimizers.RMSprop(learning_rate=0.0001)
        model.compile(loss='categorical_crossentropy', optimizer=opt, metrics=['accuracy'])
        model.summary()
        return model

    def fit_model(self, model, X_train_scaled, y_train_oneHot, X_test_scaled,y_test_oneHot, all, epochs=140):
        """Train ``model`` and return what ``model.fit`` returns.

        Parameters
        ----------
        model : object
            Model exposing a Keras-style ``fit`` method.
        X_train_scaled, y_train_oneHot : array-like
            Training features and labels.
        X_test_scaled, y_test_oneHot : array-like
            Passed to ``fit`` as ``validation_data``.
        all : list or None
            Callbacks forwarded to ``fit``. The name shadows the built-in
            ``all`` inside this method; it is kept for compatibility with
            existing callers.
        epochs : int, optional
            Number of training epochs (default 140).

        Returns
        -------
        object
            The value returned by ``model.fit``.
        """
        history = model.fit(
            X_train_scaled,
            y_train_oneHot,
            epochs=epochs,
            validation_data=(X_test_scaled, y_test_oneHot),
            verbose=1,
            callbacks=all,
        )
        return history

    def show_loss_and_accuracy(self,model, history, X_test_scaled, y_test_oneHot):
        """Print test accuracy and plot the loss and accuracy curves.

        Parameters
        ----------
        model : object
            Trained model; ``model.evaluate`` is called on the test data and
            the second element of its result is reported as accuracy.
        history : dict or object with a ``history`` dict
            Per-epoch metrics with the keys ``accuracy``, ``loss``,
            ``val_accuracy`` and ``val_loss``. Either the dict itself or an
            object carrying it in a ``history`` attribute is accepted.
        X_test_scaled, y_test_oneHot : array-like
            Test features and one-hot labels.
        """
        print("Accuracy of our model on test data : " , model.evaluate(X_test_scaled,y_test_oneHot)[1]*100 , "%")

        # Accept both the raw dict and an object that wraps it.
        curves = getattr(history, 'history', history)
        train_acc = curves['accuracy']
        train_loss = curves['loss']
        test_acc = curves['val_accuracy']
        test_loss = curves['val_loss']

        # Derive the x-axis from the recorded history: training may have run
        # for any number of epochs (e.g. when it was stopped early), and a
        # fixed 140-point axis would then not match the curves.
        epochs = list(range(len(train_loss)))

        fig , ax = plt.subplots(1,2)
        fig.set_size_inches(20,6)

        ax[0].plot(epochs , train_loss , label = 'Training Loss')
        ax[0].plot(epochs , test_loss , label = 'Testing Loss')
        ax[0].set_title('Training & Testing Loss')
        ax[0].legend()
        ax[0].set_xlabel("Epochs")

        ax[1].plot(epochs , train_acc , label = 'Training Accuracy')
        ax[1].plot(epochs , test_acc , label = 'Testing Accuracy')
        ax[1].set_title('Training & Testing Accuracy')
        ax[1].legend()
        ax[1].set_xlabel("Epochs")
        plt.show()

    def save_model_and_history(self, model, history, name_model, name_history):
        """Persist a trained model and its training history.

        ``model.save`` is called with ``name_model``; the ``history.history``
        mapping is written to ``name_history`` with ``numpy.save``.
        """
        model.save(name_model)
        per_epoch_metrics = history.history
        np.save(file=name_history, arr=per_epoch_metrics)
        
    def load_model_and_history(self, name_model, name_history):
        """Load a saved model and its training-history dict.

        Parameters
        ----------
        name_model : str
            Path passed to ``load_model``.
        name_history : str
            Path of the ``.npy`` file holding the pickled history mapping.

        Returns
        -------
        tuple
            ``(model, history)`` where ``history`` is the object stored in
            the ``.npy`` file.
        """
        model = load_model(name_model)
        # allow_pickle is a boolean flag; the string 'TRUE' only worked
        # because any non-empty string is truthy.
        # SECURITY: unpickling can execute arbitrary code - only load
        # history files that this project wrote itself.
        history = np.load(name_history, allow_pickle=True).item()
        return model, history
    
    def test_predict(self,model, encoder, X_test_scaled, y_test_oneHot):
        # predicting on test data.
        pred_test = model.predict(X_test_scaled)
        y_pred = encoder.inverse_transform(pred_test)
        y_test = encoder.inverse_transform(y_test_oneHot)
        df = pd.DataFrame(columns=['Predicted Labels', 'Actual Labels'])
        df['Predicted Labels'] = y_pred.flatten()
        df['Actual Labels'] = y_test.flatten()
        return df, y_pred, y_test

In [8]:
class F1(Callback):
    """Keras callback that prints the micro-averaged F1 score after each epoch.

    The score is computed on the held-out data given at construction time by
    rounding the model's predictions to 0/1 and comparing them with the
    targets.
    """

    def __init__(self,x_train, y_train,x_test, y_test):
        """Store the data splits.

        Only ``x_test`` / ``y_test`` are read by ``on_epoch_end``; the
        training split is kept on the instance as ``x`` / ``y``.
        """
        # Let the base class set up its own attributes first.
        super().__init__()
        self.x = x_train
        self.y = y_train
        self.x_val = x_test
        self.y_val = y_test

    def on_epoch_end(self, epoch, logs=None):
        """Print the micro-averaged F1 score on the held-out data.

        ``logs`` defaults to ``None`` rather than a shared mutable dict; it
        is not read here.
        """
        # NOTE(review): rounding each output independently can yield rows
        # with no predicted class when no output reaches 0.5 - confirm this
        # is the intended decision rule.
        val_predict = np.asarray(self.model.predict(self.x_val)).round()
        val_targ = self.y_val
        _val_f1 = round(f1_score(val_targ, val_predict, average= 'micro'), 4)
        print('f1_score:{}'.format(_val_f1))
