In [None]:
import os
import sys
import wave

import matplotlib.pyplot as plt
import numpy as np
import librosa as lib
import librosa.display
from IPython.display import Audio
from IPython.display import display
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report

import keras
import tensorflow as tf
from keras.callbacks import ReduceLROnPlateau
from keras.callbacks import ModelCheckpoint
from keras.models import Sequential
from keras.models import model_from_json
from keras.layers import Dense, Conv1D, MaxPooling1D, Flatten, Dropout, BatchNormalization, Conv2D, MaxPooling2D, LeakyReLU
from keras.utils import np_utils
from tensorflow.keras.utils import to_categorical

## Initialize some variables

In [None]:
# Sample rate (Hz) used for feature extraction throughout the notebook.
samplerate = 22050

# Folder holding the CREMA audio clips.
folderPath = "../input/speech-emotion-recognition-en/Crema"

# Emotion code (third "_"-separated token of each file name) -> class index.
crema_emotions = {
    code: index
    for index, code in enumerate(('NEU', 'HAP', 'SAD', 'ANG', 'FEA', 'DIS'))
}

## Load audio data and adjust its length

In [None]:
def loadAudio(folderPath):
    """Load every file in ``folderPath`` as audio and build one-hot labels.

    The emotion code is taken from the third ``_``-separated token of each
    file name and mapped to a class index through ``crema_emotions``.

    Parameters
    ----------
    folderPath : str
        Directory whose entries are all loaded with librosa.

    Returns
    -------
    audioFiles : list of str
        Sorted file names, index-aligned with ``X`` and ``labels``.
    X : list of np.ndarray
        One waveform per file, resampled to the notebook's ``samplerate``.
    labels : list of list of int
        One-hot vectors with ``len(crema_emotions)`` entries.
    xAvg : int
        Mean waveform length in samples, truncated to an integer
        (0 when the folder is empty).
    """
    X = []
    labels = []
    audioFiles = sorted(os.listdir(folderPath))
    n = len(audioFiles)
    numClasses = len(crema_emotions)
    totalSamples = 0

    for progress, audio in enumerate(audioFiles):
        audioPath = os.path.join(folderPath, audio)
        # Pass the rate explicitly so the loaded data always matches the
        # `samplerate` used later for feature extraction. The returned rate is
        # discarded on purpose: binding it to `samplerate` would shadow the
        # module-level constant inside this function.
        data, _ = lib.load(audioPath, sr=samplerate)

        label = [0] * numClasses  # one-hot: exactly one entry is set to 1
        label[crema_emotions[audio.split("_")[2]]] = 1

        labels.append(label)
        X.append(data)
        totalSamples += len(data)

        if progress % 500 == 0:
            print(int(100 * progress / n), "%")

    # Integer mean computed once from the exact total; summing per-file
    # float quotients could land just below an integer and truncate wrongly.
    xAvg = totalSamples // n if n else 0
    return audioFiles, X, labels, xAvg

In [None]:
# Load every clip in the dataset folder; xAvg is the mean clip length in samples.
audioFiles, X , labels , xAvg = loadAudio(folderPath)

In [None]:
def adjustLength(X , xAvg):
    """Bring every signal in ``X`` to exactly ``xAvg`` samples.

    Signals shorter than ``xAvg`` are zero-padded at the end, longer ones are
    cut after ``xAvg`` samples, and signals already of that length are passed
    through untouched (the same object is returned). The target is whatever
    the caller supplies; this notebook passes the mean clip length.

    Returns a new list with one entry per input signal, in the same order.
    """
    def _fit(signal):
        missing = xAvg - len(signal)
        if missing > 0:
            # too short: append `missing` zeros
            return np.pad(signal, (0, missing), 'constant')
        if missing < 0:
            # too long: keep only the leading xAvg samples
            return signal[:xAvg]
        return signal

    return [_fit(signal) for signal in X]

In [None]:
# Pad or truncate every clip to the mean length so all inputs share one shape.
xPad = adjustLength(X , xAvg)

## Plotting the waveplot of some of the data

In [None]:
def wavePlot(audioFiles, X, labels, index, sr=22050):
    """Draw the waveform of ``X[index]``.

    Parameters
    ----------
    audioFiles : sequence of str
        File names aligned with ``X``; ``audioFiles[index]`` is the title.
    X : sequence of np.ndarray
        Waveforms.
    labels : sequence
        Labels aligned with ``X`` (accepted for call compatibility; not drawn).
    index : int
        Which clip to plot.
    sr : int, optional
        Sample rate used to scale the time axis (default 22050, the value
        that was previously hard-coded).
    """
    librosa.display.waveshow(X[index], sr=sr)
    # Title the figure with the file name so it stands alone when skimmed.
    plt.title(str(audioFiles[index]))
    plt.xlabel("Time")
    plt.ylabel("amplitude")
    plt.show()

In [None]:
# Plot and play the first six clips. The six copy-pasted cells are folded into
# one loop; display() is needed because only a cell's last expression is
# rendered automatically, and here the audio widget is created inside a loop.
for index in range(6):
    wavePlot(audioFiles, X, labels, index)
    display(Audio(os.path.join(folderPath, audioFiles[index])))

In [None]:
# Stack the equal-length clips and their one-hot labels into arrays,
# then show both shapes as the cell output.
xPad, labels = np.array(xPad), np.array(labels)
(xPad.shape, labels.shape)

## Feature extraction

In [None]:
def noise(data):
    """Return a copy of ``data`` with random Gaussian noise added.

    The noise amplitude is a random fraction (up to 3.5 %) of the signal's
    maximum value. The input is NOT modified: the previous in-place ``+=``
    overwrote the caller's array, so the "original" signal silently became
    the noisy one.

    Parameters
    ----------
    data : array-like, 1-D
        Input signal.

    Returns
    -------
    np.ndarray
        New array of the same shape; floating-point inputs keep their dtype.
    """
    signal = np.asarray(data)
    noise_amp = 0.035*np.random.uniform()*np.amax(signal)
    noisy = signal + noise_amp*np.random.normal(size=signal.shape[0])
    if np.issubdtype(signal.dtype, np.floating):
        # float32 + float64 noise promotes to float64; cast back so the
        # output dtype matches what the in-place version produced.
        noisy = noisy.astype(signal.dtype, copy=False)
    return noisy

def pitch(data, samplerate, pitchFactor=0.7):
    """Return ``data`` pitch-shifted by ``pitchFactor`` steps.

    Parameters
    ----------
    data : np.ndarray
        Input signal.
    samplerate : int
        Sample rate of ``data``.
    pitchFactor : float, optional
        Passed to librosa as ``n_steps`` (default 0.7).
    """
    # sr / n_steps are keyword-only in current librosa releases; passing them
    # by name works across versions, whereas positional use raises there.
    return librosa.effects.pitch_shift(data, sr=samplerate, n_steps=pitchFactor)

def _signalFeatures(signal, sr):
    """Compute (zcr, rms, mean MFCC, spectral flatness, mel spectrogram) for one signal."""
    return (
        lib.feature.zero_crossing_rate(y=signal)[0],
        lib.feature.rms(y=signal)[0],
        np.mean(lib.feature.mfcc(y=signal, sr=sr)),  # one scalar per signal
        lib.feature.spectral_flatness(y=signal)[0],
        lib.feature.melspectrogram(y=signal, sr=sr),
    )


def featureExtraction(X):
    """Extract features from every signal and from two augmented variants.

    For each signal in ``X`` three versions are processed, in this order:
    the original, a noisy copy (``noise``) and a pitch-shifted copy
    (``pitch``). Every output list therefore has ``3 * len(X)`` entries.

    Returns
    -------
    zcr : list
        Zero-crossing-rate vectors.
    energy : list
        RMS vectors (loudness of the signal).
    mfcc : list
        Mean over all MFCC coefficients, one scalar per entry.
    sf : list
        Spectral-flatness vectors.
    melSpectro : list
        Mel spectrograms.
    """
    zcr, energy, mfcc, sf, melSpectro = [], [], [], [], []

    for data in X:
        # Hand noise() a private copy: the original notebook version of
        # noise() modifies its argument in place, which made the "original"
        # and "noisy" features identical and fed noisy data to pitch().
        noisy = noise(np.array(data))
        shifted = pitch(data, samplerate)

        for variant in (data, noisy, shifted):
            z, e, mf, flat, mel = _signalFeatures(variant, samplerate)
            zcr.append(z)
            energy.append(e)
            mfcc.append(mf)
            sf.append(flat)
            melSpectro.append(mel)

    return zcr , energy , mfcc,sf, melSpectro

In [None]:
# Extract features for every padded clip plus its noisy and pitch-shifted variants.
zcr,energy,mfcc,sf,melSpectro = featureExtraction(xPad)

In [None]:
# Turn the per-signal feature lists into arrays.
zcr, energy, sf = (np.array(feature) for feature in (zcr, energy, sf))
melSpectro = np.array(melSpectro)

# mfcc holds one scalar per signal; make it a column so it can be stacked
# next to the other per-signal feature rows.
mfcc = np.array(mfcc)
mfcc = mfcc.reshape(len(mfcc), 1)

# Combined 1-D feature row per signal: zcr | energy | mean mfcc | flatness.
zcrE = np.concatenate((zcr, energy, mfcc, sf), axis=1)

In [None]:
# Each clip produced three feature rows (original, noisy, pitch-shifted),
# so every label is repeated three times in a row to stay aligned.
# NOTE: this cell is not idempotent; re-running it triples `labels` again.
y = np.repeat(np.array(labels), 3, axis=0)
labels = y
labels.shape , zcr.shape , energy.shape , mfcc.shape , sf.shape , melSpectro.shape

In [None]:
# Append a trailing channel axis of size 1, as expected by the Conv1D/Conv2D inputs.
zcr = zcr.reshape(*zcr.shape[:2], 1)
energy = energy.reshape(*energy.shape[:2], 1)
zcrE = zcrE.reshape(*zcrE.shape[:2], 1)
melSpectro = melSpectro.reshape(*melSpectro.shape[:3], 1)

labels.shape , zcr.shape , sf.shape, energy.shape,  zcrE.shape , mfcc.shape , melSpectro.shape 

# Split into train / test / validation sets (70% / 25% / 5%)

In [None]:
# Split every feature set and the labels together so all of them share exactly
# the same row assignment (one call instead of four that each overwrote y_*).
#
# Step 1: 70 % train / 30 % hold-out. Same test_size and random_state as
# before, so the split behaves as it did.
(zcr_train, zcr_hold,
 energy_train, energy_hold,
 melSpectro_train, melSpectro_hold,
 zcrE_train, zcrE_hold,
 y_train, y_hold) = train_test_split(
    zcr, energy, melSpectro, zcrE, labels,
    test_size=0.3, train_size=0.7, random_state=0, shuffle=True)

# Step 2: divide the hold-out into test (25 % of all data) and validation
# (5 % of all data, i.e. 1/6 of the hold-out). Previously the validation set
# was sampled out of the training set itself, so validation metrics were
# measured on samples the model had been trained on.
(zcr_test, zcr_val,
 energy_test, energy_val,
 melSpectro_test, melSpectro_val,
 zcrE_test, zcrE_val,
 y_test, y_val) = train_test_split(
    zcr_hold, energy_hold, melSpectro_hold, zcrE_hold, y_hold,
    test_size=1/6, random_state=0, shuffle=True)

# NOTE(review): the three variants of one clip (original / noisy / pitched)
# are separate rows and can still land in different splits; a group-aware
# split on the source clip would remove that remaining leakage.
assert len(zcr_train) + len(zcr_test) + len(zcr_val) == len(zcr)
assert len(y_train) == len(zcrE_train) and len(y_val) == len(zcrE_val)

In [None]:

# Sanity check: print the shape of every training array, one per line.
for train_split in (zcr_train, zcrE_train, energy_train, melSpectro_train, y_train):
    print(train_split.shape)


## 1D CNN Model

In [None]:
def model1D(train_data):
    """Build and compile the 1-D CNN classifier.

    Parameters
    ----------
    train_data : array with a ``shape`` attribute
        Only ``train_data.shape[1]`` is read; it sets the input length, and
        the input has a single channel.

    Returns
    -------
    keras Sequential model
        Three Conv1D/MaxPooling1D stages (512, 256, 128 filters), a 256-unit
        dense layer and a 6-way softmax output, compiled with Adam and
        categorical cross-entropy. The model summary is printed as a side effect.
    """
    conv_args = dict(kernel_size=5, strides=1, padding='same', activation='relu')
    pool_args = dict(pool_size=5, strides=2, padding='same')

    network = Sequential([
        Conv1D(512, input_shape=(train_data.shape[1], 1), **conv_args),
        MaxPooling1D(**pool_args),

        Conv1D(256, **conv_args),
        BatchNormalization(),
        MaxPooling1D(**pool_args),

        Conv1D(128, **conv_args),
        MaxPooling1D(**pool_args),
        Dropout(0.25),

        Flatten(),
        Dense(256, activation='relu'),
        Dropout(0.25),

        Dense(6, activation='softmax'),
    ])

    network.compile(
        optimizer=tf.keras.optimizers.Adam(),
        loss="categorical_crossentropy",
        metrics=['accuracy'],
    )
    network.summary()

    return network

In [59]:
# Build the 1-D CNN on the combined feature rows and train it,
# tracking accuracy and loss on the validation split after each epoch.
model = model1D(zcrE_train)

history = model.fit(
    x=zcrE_train,
    y=y_train,
    validation_data=(zcrE_val, y_val),
    epochs=200,
    batch_size=64,
)

## Big picture

In [None]:
def Plotting(history):
    """Plot training vs. validation curves for accuracy and for loss.

    Parameters
    ----------
    history : object with a ``history`` dict
        Must contain the keys ``accuracy``, ``val_accuracy``, ``loss`` and
        ``val_loss`` (as returned by ``model.fit`` with validation data).

    Two figures are shown, one per metric.
    """
    for metric in ('accuracy', 'loss'):
        plt.plot(history.history[metric])
        plt.plot(history.history['val_' + metric])
        plt.title('model ' + metric)
        plt.ylabel(metric)
        plt.xlabel('epoch')
        # The second curve is the validation metric in both figures; the loss
        # figure used to label it 'test'.
        plt.legend(['train', 'validation'], loc='upper left')
        plt.show()

In [None]:
import pandas as pd
import seaborn as sns
def analysise(history,model,data_test,y_test):
    """Report how a trained model performs on held-out data.

    Shows the training curves, prints a per-class classification report and
    draws the confusion matrix as a heatmap.

    Parameters
    ----------
    history : object returned by ``model.fit``
        Forwarded to ``Plotting``.
    model : trained model exposing ``predict``
    data_test : array
        Inputs to predict on.
    y_test : array
        One-hot encoded true labels, aligned with ``data_test``.
    """
    Plotting(history)

    # Class names ordered by their index so rows/columns line up with labels.
    class_names = sorted(crema_emotions, key=crema_emotions.get)
    class_ids = [crema_emotions[name] for name in class_names]

    y_pred = np.argmax(model.predict(data_test), 1)
    y_act = np.argmax(y_test, 1)
    print(classification_report(y_act, y_pred, labels=class_ids, target_names=class_names))

    # Passing `labels` pins the matrix to one row/column per known class even
    # when a class never occurs in y_act / y_pred; without it the matrix
    # shrinks and the DataFrame construction below fails on the shape.
    cm = confusion_matrix(y_act, y_pred, labels=class_ids)
    plt.figure(figsize = (12, 10))
    cm = pd.DataFrame(cm , index = class_names , columns = class_names)
    sns.heatmap(cm, linecolor='white', cmap='Blues', linewidth=1, annot=True, fmt='')
    plt.title('Confusion Matrix', size=20)
    plt.xlabel('Predicted Labels', size=14)
    plt.ylabel('Actual Labels', size=14)
    plt.show()

In [60]:
# Evaluate the 1-D CNN on the test split of the combined features.
analysise(history , model , zcrE_test , y_test)

## 2D CNN Model

In [None]:
def model2D(intput_data):
    """Build and compile the 2-D CNN classifier.

    Parameters
    ----------
    intput_data : array with a ``shape`` attribute
        ``shape[1]`` and ``shape[2]`` set the input height and width; the
        input has a single channel.

    Returns
    -------
    keras Sequential model
        Four Conv2D -> LeakyReLU -> MaxPooling2D stages, a 128-unit dense
        layer and a 6-way softmax output, compiled with Adam (lr 0.001) and
        categorical cross-entropy. The model summary is printed as a side effect.
    """
    # (number of filters, pooling stride) for each convolutional stage
    stages = ((512, 2), (256, 2), (128, 2), (128, 3))

    network = Sequential()
    for position, (filters, pool_stride) in enumerate(stages):
        # Only the first layer declares the input shape.
        first_layer_args = (
            {'input_shape': (intput_data.shape[1], intput_data.shape[2], 1)}
            if position == 0 else {}
        )
        network.add(Conv2D(filters, kernel_size=5, strides=1, padding='same',
                           activation='relu', **first_layer_args))
        # NOTE(review): the convolution already applies ReLU, so this
        # LeakyReLU only ever sees non-negative values and passes them
        # through unchanged. Kept as-is to preserve the architecture.
        network.add(LeakyReLU(alpha=0.1))
        network.add(MaxPooling2D(pool_size=5, strides=pool_stride, padding='same'))

    network.add(Dropout(0.5))
    network.add(Flatten())
    network.add(Dense(128, activation='relu'))
    network.add(Dropout(0.5))

    network.add(Dense(6, activation='softmax'))

    network.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss="categorical_crossentropy",
        metrics=['accuracy'],
    )
    network.summary()
    return network


In [None]:
# Build the 2-D CNN on the mel spectrograms.
# NOTE: this rebinds `model`, so the 1-D CNN trained earlier is no longer reachable by that name.
model = model2D(melSpectro_train)

In [58]:
# Train the 2-D CNN on the mel spectrograms, validating after each epoch.
history = model.fit(
    x=melSpectro_train,
    y=y_train,
    validation_data=(melSpectro_val, y_val),
    epochs=5,
    batch_size=64,
)

In [57]:
# Evaluate the 2-D CNN on the mel-spectrogram test split.
analysise(history , model , melSpectro_test , y_test)

## Save the trained model for later use

In [None]:
# Serialize the architecture of the current `model` to a JSON string.
# NOTE(review): `model` was last assigned from model2D(...), so this is the 2-D CNN.
model_json = model.to_json()

In [None]:
# Write the architecture (JSON) and the weights (HDF5) to disk.
# NOTE(review): the files are named "model1D.*", but `model` was last assigned
# from model2D(...), so they hold the 2-D CNN; confirm which model is meant.
with open("model1D.json", "w") as json_file:
    json_file.write(model_json)
# serialize weights to HDF5
model.save_weights("model1D.h5")
print("Saved model to disk")

## Re-use the model

In [None]:
# Rebuild the saved model: read the architecture from JSON, then restore the
# weights. The file names match what the save cell writes ("model1D.json" /
# "model1D.h5"); the previous "model.json" / "model.h5" were never created by
# this notebook. The context manager closes the file even if reading fails.
with open('model1D.json', 'r') as json_file:
    loaded_model_json = json_file.read()
loaded_model = model_from_json(loaded_model_json)
# load weights into new model
loaded_model.load_weights("model1D.h5")
print("Loaded model from disk")