In [40]:
import sys

class Model(object):
    """
    Common base for the classifier wrappers defined in this notebook.

    The wrapped estimator lives in ``self.model`` and is expected to expose
    ``fit`` and ``predict``. Sub-classes supply ``load_model``, ``save_model``
    and ``evaluate``.
    """

    def __init__(self, save_path=None, name='Not Specified', **params):
        """
        Args:
            save_path: where the model is saved / restored from (may be None).
            name: human readable name of the model.
            **params: accepted for sub-class convenience; not used here.
        """
        self.name = name            # display name of the model
        self.save_path = save_path  # target used by save_model / restore_model
        self.model = None           # concrete estimator, set by sub-classes
        self.trained = False        # flipped by train() and restore_model()

    def train(self, x_train, y_train, x_val=None, y_val=None):
        """
        Fit the wrapped estimator on ``x_train`` / ``y_train``.

        ``x_val`` and ``y_val`` are part of the signature but are not used by
        this base implementation. When ``save_path`` is truthy the model is
        saved right after fitting.
        """
        self.model.fit(x_train, y_train)
        self.trained = True
        should_save = bool(self.save_path)
        if should_save:
            self.save_model()

    def predict(self, data):
        """
        Return ``self.model.predict(data)``.

        Writes a message to stderr and exits the process with status -1 when
        the model has been neither trained nor restored.
        """
        ready = bool(self.trained)
        if not ready:
            sys.stderr.write("Model should be trained or loaded before doing predict\n")
            sys.exit(-1)
        return self.model.predict(data)

    def restore_model(self, load_path=None):
        """
        Load a saved model from ``load_path``, falling back to ``save_path``.

        Exits the process with status -1 when neither path is available.
        On success the model is flagged as trained.
        """
        to_load = load_path if load_path else self.save_path
        if to_load is None:
            sys.stderr.write("Provide a path to load from or save_path of the model\n")
            sys.exit(-1)
        self.load_model(to_load)
        self.trained = True

    def load_model(self, to_load):
        """Model specific loading; must be overridden by child classes."""
        raise NotImplementedError()

    def save_model(self):
        """Model specific saving; must be overridden by child classes."""
        raise NotImplementedError()

    def evaluate(self, x_test, y_test):
        """Model specific evaluation; must be overridden by child classes."""
        raise NotImplementedError()

In [41]:
import numpy as np

import scipy.io.wavfile as wav
import os
import speechpy
from sklearn.model_selection import train_test_split

# Emotion class names. get_data() reads one sub-folder per name and uses the
# position in this list as the integer label.
class_labels = ['Neutral', 'Angry', 'Happy', 'Sad']

# Number of samples every signal is zero-padded or centre-cropped to before
# MFCC extraction.
mslen = 32000  # Empirically calculated for the given dataset


def read_wav(filename):
    """Read ``filename`` with ``scipy.io.wavfile.read`` and hand back its result untouched."""
    rate_and_samples = wav.read(filename)
    return rate_and_samples

import numpy as np


def plot_confusion_matrix(cm,
                          target_names,
                          title='Confusion matrix',
                          cmap=None,
                          normalize=True):
    """
    given a sklearn confusion matrix (cm), make a nice plot

    Arguments
    ---------
    cm:           confusion matrix from sklearn.metrics.confusion_matrix
                  (an ndarray or anything np.asarray can convert)

    target_names: given classification classes such as [0, 1, 2]
                  the class names, for example: ['high', 'medium', 'low']
                  (pass None to leave the default tick labels)

    title:        the text to display at the top of the matrix

    cmap:         the gradient of the values displayed from matplotlib.pyplot.cm
                  see http://matplotlib.org/examples/color/colormaps_reference.html
                  plt.get_cmap('jet') or plt.cm.Blues

    normalize:    If False, plot the raw numbers
                  If True, plot the proportions (each row divided by its sum;
                  rows without any sample are shown as zeros)

    Usage
    -----
    plot_confusion_matrix(cm           = cm,                  # confusion matrix created by
                                                              # sklearn.metrics.confusion_matrix
                          normalize    = True,                # show proportions
                          target_names = y_labels_vals,       # list of names of the classes
                          title        = best_estimator_name) # title of graph

    Citation
    --------
    http://scikit-learn.org/stable/auto_examples/model_selection/plot_confusion_matrix.html

    """
    import matplotlib.pyplot as plt
    import numpy as np
    import itertools

    # Accept nested lists as well as ndarrays (.astype / .sum(axis=...) are used below).
    cm = np.asarray(cm)

    accuracy = np.trace(cm) / float(np.sum(cm))
    misclass = 1 - accuracy

    if cmap is None:
        cmap = plt.get_cmap('Blues')

    plt.figure(figsize=(6, 4))
    # The heat map is drawn from the raw counts; only the printed cell text is
    # normalised further down.
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()

    if target_names is not None:
        tick_marks = np.arange(len(target_names))
        plt.xticks(tick_marks, target_names, rotation=45)
        plt.yticks(tick_marks, target_names)

    if normalize:
        row_totals = cm.sum(axis=1)[:, np.newaxis].astype('float')
        # A class with no true samples has a zero row sum; dividing by it would
        # fill the row (and the colour threshold) with NaN. Divide by 1 instead
        # so the row simply stays at zero.
        row_totals[row_totals == 0] = 1.0
        cm = cm.astype('float') / row_totals

    # Cells above the threshold get white text so they stay readable.
    thresh = cm.max() / 1.5 if normalize else cm.max() / 2
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        if normalize:
            plt.text(j, i, "{:0.4f}".format(cm[i, j]),
                     horizontalalignment="center",
                     color="white" if cm[i, j] > thresh else "black")
        else:
            plt.text(j, i, "{:,}".format(cm[i, j]),
                     horizontalalignment="center",
                     color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label\naccuracy={:0.4f}; misclass={:0.4f}'.format(accuracy, misclass))
    # Run the layout pass after the labels exist so they are taken into account.
    plt.tight_layout()
    plt.show()

    
def _fit_signal_length(signal, target_len):
    """Zero-pad on both ends, or centre-crop, ``signal`` to exactly ``target_len`` samples."""
    s_len = len(signal)
    if s_len < target_len:
        pad_len = target_len - s_len
        pad_rem = pad_len % 2
        pad_len //= 2
        # When the deficit is odd the extra sample goes to the right side.
        return np.pad(signal, (pad_len, pad_len + pad_rem), 'constant', constant_values=0)
    start = (s_len - target_len) // 2
    return signal[start:start + target_len]


def get_data(dataset_path, flatten=True, mfcc_len=39):
    """
    Load the dataset, extract MFCC features and split it into train / test.

    For every name in ``class_labels`` the folder ``dataset_path/<name>`` is
    read; each file is loaded with ``read_wav``, padded or cropped to ``mslen``
    samples and converted with ``speechpy.feature.mfcc``. The label of a sample
    is the index of its folder name in ``class_labels``.

    Args:
        dataset_path: folder containing one sub-folder per class label.
        flatten: when True each MFCC matrix is flattened to a 1-D vector.
        mfcc_len: value passed to speechpy as ``num_cepstral``.

    Returns:
        ``(x_train, x_test, y_train, y_test)`` as numpy arrays, produced by an
        80/20 ``train_test_split`` with ``random_state=42``.
    """
    data = []
    labels = []
    for label, directory in enumerate(class_labels):
        print("started reading folder", directory)
        # Build explicit paths instead of os.chdir(): a failure while reading
        # can then no longer leave the kernel in a different working directory.
        class_dir = os.path.join(dataset_path, directory)
        # os.listdir() order is arbitrary; sorting makes the seeded split below
        # reproducible across machines and runs.
        for filename in sorted(os.listdir(class_dir)):
            fs, signal = read_wav(os.path.join(class_dir, filename))
            signal = _fit_signal_length(signal, mslen)
            mfcc = speechpy.feature.mfcc(signal, fs, num_cepstral=mfcc_len)
            if flatten:
                # Flatten the data
                mfcc = mfcc.flatten()
            data.append(mfcc)
            labels.append(label)
        print("ended reading folder", directory)
    x_train, x_test, y_train, y_test = train_test_split(data, labels, test_size=0.2, random_state=42)
    return np.array(x_train), np.array(x_test), np.array(y_train), np.array(y_test)


In [42]:
"""
This file contains classes which implement deep neural networks namely CNN and LSTM
"""
import sys
import keras
from keras import Sequential
from keras.callbacks import EarlyStopping
from keras.layers import LSTM as lstm, Dense, Dropout, Conv2D, Flatten, \
    BatchNormalization, Activation, MaxPooling2D
import numpy as np



class DNN(Model):
    """
    Base class for the Keras networks in this notebook.

    The constructor builds a ``Sequential`` model from the layers added by
    ``make_default_model`` (implemented by sub-classes), appends a softmax
    output layer with ``num_classes`` units and compiles the result.
    """

    def __init__(self, input_shape, num_classes, **params):
        """
        Args:
            input_shape: shape of one input sample; sub-classes read its first
                two entries when creating their first layer.
            num_classes: number of units of the softmax output layer.
            **params: forwarded to ``Model.__init__`` (``save_path``, ``name``).
        """
        super(DNN, self).__init__(**params)
        self.input_shape = input_shape
        self.model = Sequential()
        self.make_default_model()
        self.model.add(Dense(num_classes, activation='softmax'))
        # A softmax head over mutually exclusive classes pairs with categorical
        # cross-entropy; binary cross-entropy scores every output unit as an
        # independent yes/no problem.
        self.model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['categorical_accuracy'])
        # summary() prints by itself and returns None, so it is not wrapped in print().
        self.model.summary()
        # Only fall back to the default location when the caller gave no path.
        if not self.save_path:
            self.save_path = 'models/' + self.name + '_best_model_ravdess_test.h5'

    def load_model(self, to_load):
        """
        Load weights from ``to_load`` into the already built network.

        Writes the failure reason to stderr and exits the process with status
        -1 when the weights cannot be loaded.
        """
        try:
            self.model.load_weights(to_load)
        except Exception as err:  # narrow enough to let Ctrl-C / SystemExit through
            sys.stderr.write("Invalid saved file provided: {}\n".format(err))
            sys.exit(-1)

    def save_model(self):
        """Write the network weights to ``self.save_path``."""
        self.model.save_weights(self.save_path)

    def evaluate(self, x_test, y_test):
        """
        Print the accuracy on the given data and plot the confusion matrix.

        ``y_test`` must be 2-D because ``argmax`` over axis 1 is taken
        (presumably one-hot encoded labels - confirm with the caller).
        """
        # Imported here so the method does not depend on another notebook cell
        # having been run first.
        from sklearn.metrics import confusion_matrix

        target_names = ['Neutral', 'Angry', 'Happy', 'Sad']
        y_pred = self.predict(x_test)
        print('Accuracy =', self.model.evaluate(x_test, y_test)[1])
        # Pin the label set so the matrix stays len(target_names) square even
        # when a class is missing from y_test or from the predictions.
        cm = confusion_matrix(y_pred=y_pred.argmax(1), y_true=y_test.argmax(1),
                              labels=list(range(len(target_names))))
        plot_confusion_matrix(cm, normalize=False, target_names=target_names, title="Confusion Matrix")

    def train(self, x_train, y_train, x_val=None, y_val=None, n_rounds=50):
        """
        Train the network in ``n_rounds`` short rounds.

        Every round reshuffles the training data and fits for up to 4 epochs
        with early stopping on ``val_categorical_accuracy``. When ``x_val`` and
        ``y_val`` are given, the model is evaluated on them after each round
        and the best accuracy seen is printed at the end.

        Args:
            x_train, y_train: training data; indexed with a permutation array,
                so they must support numpy-style fancy indexing.
            x_val, y_val: optional hold-out data for the per-round evaluation.
            n_rounds: number of shuffle-and-fit rounds (default 50).
        """
        has_val = x_val is not None and y_val is not None
        best_acc = 0
        for _ in range(n_rounds):
            p = np.random.permutation(len(x_train))
            x_train = x_train[p]
            y_train = y_train[p]
            # NOTE(review): validation_split is applied after this reshuffle, so
            # the samples held out for early stopping change from round to
            # round - confirm this is intended.
            early_stopping = EarlyStopping(monitor='val_categorical_accuracy', mode='max')
            self.model.fit(x_train, y_train, batch_size=32, epochs=4, validation_split=0.2,
                           callbacks=[early_stopping])
            if has_val:
                loss, acc = self.model.evaluate(x_val, y_val)
                print("accuracy", acc)
                best_acc = max(best_acc, acc)
        self.trained = True
        if has_val:
            print("best accuracy", best_acc)

    def make_default_model(self):
        """
        Make the model with default hyper parameters
        """
        raise NotImplementedError()


class CNN(DNN):
    """
    Convolutional network for speech emotion recognition.
    """

    def __init__(self, input_shape, num_classes, **params):
        # The model name is fixed and replaces any ``name`` given by the caller.
        params.update(name='CNN')
        super(CNN, self).__init__(input_shape, num_classes, **params)

    def make_default_model(self):
        """Add the default layer stack (everything except the output layer) to ``self.model``."""
        rows, cols = self.input_shape[0], self.input_shape[1]
        layer_stack = [
            # Single-channel input built from the first two entries of input_shape.
            Conv2D(8, (13, 13), input_shape=(rows, cols, 1)),
            BatchNormalization(axis=-1),
            Activation('relu'),
            MaxPooling2D(pool_size=(2, 1)),
            Flatten(),
            Dense(64),
            BatchNormalization(),
            Activation('relu'),
            Dropout(0.2),
        ]
        for layer in layer_stack:
            self.model.add(layer)


class LSTM(DNN):
    """
    This class handles LSTM for speech emotion recognitions
    """

    def __init__(self, input_shape, num_classes, **params):
        # The model name is fixed and replaces any ``name`` given by the caller.
        params.update(name='LSTM')
        super(LSTM, self).__init__(input_shape, num_classes, **params)

    def make_default_model(self):
        """Add the default layer stack (everything except the output layer) to ``self.model``."""
        sample_shape = (self.input_shape[0], self.input_shape[1])
        layer_stack = [
            lstm(128, input_shape=sample_shape),
            Dropout(0.5),
            Dense(32, activation='relu'),
            Dense(16, activation='tanh'),
        ]
        for layer in layer_stack:
            self.model.add(layer)

In [54]:

import pickle
import sys

import matplotlib.pyplot as plt

from sklearn.metrics import accuracy_score, confusion_matrix

from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.svm import LinearSVC


class MLModel(Model):
    """
    Base class for the scikit-learn models in this notebook.

    Sub-classes only assign the estimator to ``self.model``; evaluation and
    pickle based persistence are implemented here.
    """

    def __init__(self, **params):
        """Forward ``params`` to ``Model.__init__`` and pick a default ``save_path`` if none was given."""
        super(MLModel, self).__init__(**params)
        # Only fall back to the default location when the caller gave no path.
        if not self.save_path:
            self.save_path = 'models/' + self.name + '_best_model_testing.h5'

    def evaluate(self, x_test, y_test):
        """Print the accuracy on the given data and plot the confusion matrix."""
        target_names = ['Neutral', 'Angry', 'Happy', 'Sad']
        y_pred = self.predict(x_test)
        print('Accuracy', accuracy_score(y_pred=y_pred, y_true=y_test))
        # Pin the label set so the matrix stays len(target_names) square even
        # when a class is missing from y_test or from the predictions.
        cm = confusion_matrix(y_pred=y_pred, y_true=y_test,
                              labels=list(range(len(target_names))))
        plot_confusion_matrix(cm, normalize=False, target_names=target_names, title="Confusion Matrix")

    def save_model(self):
        """Pickle the estimator to ``self.save_path``."""
        # The file keeps its '.h5' name for compatibility but holds a pickle.
        with open(self.save_path, "wb") as handle:
            pickle.dump(self.model, handle)

    def load_model(self, to_load=None):
        """
        Un-pickle an estimator into ``self.model``.

        Args:
            to_load: file to read; defaults to ``self.save_path``. Accepting the
                argument keeps this method callable from
                ``Model.restore_model``, which passes the path explicitly.

        Writes the failure reason to stderr and exits the process with status
        -1 when the file cannot be read.
        """
        path = to_load or self.save_path
        try:
            print('reading model')
            print(path)
            with open(path, "rb") as handle:
                # SECURITY: pickle.load can execute arbitrary code - only load
                # files that come from a trusted source.
                self.model = pickle.load(handle)
        except Exception as err:  # narrow enough to let Ctrl-C / SystemExit through
            sys.stderr.write("Invalid saved file provided: {}\n".format(err))
            sys.exit(-1)


class SVM(MLModel):
    """
    SVM wraps a ``LinearSVC`` using the 'crammer_singer' multi-class strategy
    """

    def __init__(self, **params):
        # The model name is fixed and replaces any ``name`` given by the caller.
        params.update(name='SVM')
        super(SVM, self).__init__(**params)
        classifier = LinearSVC(multi_class='crammer_singer')
        self.model = classifier


class RF(MLModel):
    """
    RF wraps a ``RandomForestClassifier`` with 100 estimators
    """

    def __init__(self, **params):
        # The model name is fixed and replaces any ``name`` given by the caller.
        params.update(name='Random Forest')
        super(RF, self).__init__(**params)
        classifier = RandomForestClassifier(n_estimators=100)
        self.model = classifier


class NN(MLModel):
    """
    NN wraps an ``MLPClassifier`` (one hidden layer of 512 logistic units)
    for speech emotion recognition
    """

    def __init__(self, **params):
        # The model name is fixed and replaces any ``name`` given by the caller.
        params.update(name='Neural Network')
        super(NN, self).__init__(**params)
        classifier = MLPClassifier(hidden_layer_sizes=(512,), batch_size=32,
                                   activation='logistic', verbose=True)
        self.model = classifier

In [56]:
"""
This example script uses the library `speechemotionrecognition` to train and evaluate the models on the dataset found at `dataset_path`.
"""
from keras.utils import np_utils

import scipy.io.wavfile as wav
import os
import speechpy
import numpy as np
# Root folder of the training data handed to get_data(), which reads one
# sub-folder per entry of class_labels.
dataset_path = 'dataset'
# Target signal length in samples for padding / cropping; re-declares the same
# value already assigned in the data-loading cell.
mslen = 32000 

def dnn_example():
    """
    Train and evaluate the LSTM and the CNN on the dataset at ``dataset_path``.

    The features are loaded un-flattened, the labels are one-hot encoded, and
    the same train / test split is used for both networks. Only the LSTM
    weights are saved afterwards.
    """
    num_classes = len(class_labels)
    x_train, x_test, y_train, y_test = get_data(dataset_path=dataset_path, flatten=False)
    print(y_train.shape)
    # Fix the one-hot width explicitly: without num_classes it is derived from
    # the largest label present, so a split that happens to miss the last class
    # would yield targets narrower than the model output.
    y_train = np_utils.to_categorical(y_train, num_classes=num_classes)
    print(y_train.shape)
    y_test = np_utils.to_categorical(y_test, num_classes=num_classes)
    print('Starting LSTM')
    model = LSTM(input_shape=x_train[0].shape, num_classes=num_classes)
    model.train(x_train, y_train, x_test, y_test)
    model.evaluate(x_test, y_test)
    model.save_model()
    print('LSTM Done\n Starting CNN')
    # The CNN takes an extra trailing channel axis of size 1.
    in_shape = x_train[0].shape
    x_train = x_train.reshape(x_train.shape[0], in_shape[0], in_shape[1], 1)
    x_test = x_test.reshape(x_test.shape[0], in_shape[0], in_shape[1], 1)
    model = CNN(input_shape=x_train[0].shape, num_classes=num_classes)
    model.train(x_train, y_train, x_test, y_test)
    model.evaluate(x_test, y_test)
    print('CNN Done')


def ml_example():
    """
    Train, evaluate and save each scikit-learn model (NN, RF, SVM) in turn,
    using flattened features from the dataset at ``dataset_path``.
    """
    x_train, x_test, y_train, y_test = get_data(dataset_path=dataset_path)
    print(x_train.shape)
    for model_cls in (NN, RF, SVM):
        model = model_cls()
        print('Starting', model.name)
        model.train(x_train, y_train)
        model.evaluate(x_test, y_test)
        # Number of test samples whose label is 0.
        zero_label_total = sum(1 for val in y_test if val == 0)
        print(zero_label_total)
        model.save_model()
        print(model.name, 'Done')

def loadsvm():
    """
    Load the pickled SVM from ``models/SVM_best_model.h5`` and print its
    predictions for every file in the ``test`` folder.

    Each file is read as WAV, padded or centre-cropped to ``mslen`` samples and
    turned into a flattened MFCC vector (39 cepstral coefficients) before
    prediction.
    """
    obj = SVM()
    cur_dir = os.getcwd()
    print('curr_dir', cur_dir)
    obj.save_path = "models/SVM_best_model.h5"
    obj.load_model()
    print('SVM model loaded')

    # Read through explicit paths instead of os.chdir(): the working directory
    # stays untouched, so the relative model path above still resolves when the
    # cell is run again in the same kernel.
    test_dir = 'test'
    data = []
    for filename in sorted(os.listdir(test_dir)):
        fs, signal = wav.read(os.path.join(test_dir, filename))
        s_len = len(signal)
        # pad the signals to have same size if lesser than required
        # else slice them
        if s_len < mslen:
            pad_len = mslen - s_len
            pad_rem = pad_len % 2
            pad_len //= 2
            signal = np.pad(signal, (pad_len, pad_len + pad_rem), 'constant', constant_values=0)
        else:
            pad_len = s_len - mslen
            pad_len //= 2
            signal = signal[pad_len:pad_len + mslen]
        mfcc = speechpy.feature.mfcc(signal, fs, num_cepstral=39)
        mfcc = mfcc.flatten()
        print(mfcc.shape)
        data.append(mfcc)

    if not data:
        # Nothing to predict on; the estimator would reject an empty batch.
        print('No files found in', test_dir)
        return
    print(obj.model.predict(data))

def loadlstm():
    """
    Load LSTM weights from ``models/best_model_LSTM.h5`` and print the
    network's predictions for every file in the ``test`` folder.

    Each file is read as WAV, padded or centre-cropped to ``mslen`` samples and
    converted to an un-flattened MFCC matrix (39 cepstral coefficients).
    """
    cur_dir = os.getcwd()
    print('curr_dir', cur_dir)

    # Build and load the model before touching the test data, with the weight
    # path resolved from the current working directory (same layout loadsvm()
    # uses). Changing into 'test' first - and then a second time - made both
    # this path and the data folder point at the wrong place.
    # NOTE(review): (198, 39) presumably matches the MFCC shape produced for
    # mslen samples (198 * 39 == 7722 flattened values) - confirm.
    obj = LSTM(input_shape=(198, 39), num_classes=len(class_labels))
    obj.load_model('models/best_model_LSTM.h5')
    print('LSTM model loaded')
    print('Predicting on new data')

    test_dir = 'test'
    data = []
    for filename in sorted(os.listdir(test_dir)):
        fs, signal = wav.read(os.path.join(test_dir, filename))
        s_len = len(signal)
        # pad the signals to have same size if lesser than required
        # else slice them
        if s_len < mslen:
            pad_len = mslen - s_len
            pad_rem = pad_len % 2
            pad_len //= 2
            signal = np.pad(signal, (pad_len, pad_len + pad_rem), 'constant', constant_values=0)
        else:
            pad_len = s_len - mslen
            pad_len //= 2
            signal = signal[pad_len:pad_len + mslen]
        mfcc = speechpy.feature.mfcc(signal, fs, num_cepstral=39)
        print(mfcc.shape)
        data.append(mfcc)

    if not data:
        # Nothing to predict on; the network would reject an empty batch.
        print('No files found in', test_dir)
        return
    batch = np.array(data)
    print(batch.shape)
    print(obj.model.predict(batch))
    



# Entry point of this cell: only the saved-SVM inference demo is enabled; the
# two training examples are left commented out.
if __name__ == "__main__":
    loadsvm()
    #ml_example()
    #dnn_example()

curr_dir /home/deepak/Desktop/project
reading model
models/SVM_best_model.h5
SVM model loaded
(7722,)
(7722,)
(7722,)
[2 2 2]


In [53]:
# Print the kernel's current working directory.
cur_dir = os.getcwd()
print('curr_dir',cur_dir)

curr_dir /home/deepak/Desktop/project
