## About the data

The dataset can be found [here](http://emodb.bilderbar.info/index-1280.html).

It contains samples of emotional speech in German, labeled with one of 7 different emotions: Anger, Boredom, Disgust, Fear, Happiness, Sadness and Neutral. 

Please download the full database and refer to the documentation to understand how the samples are labeled (see "Additional information").

# Loading Libraries

In [1]:
%matplotlib inline

import os
import argparse
import urllib.request
from flask import Flask
import zipfile36 as zipfile
from joblib import dump, load

import librosa
import numpy as np
import pandas as pd
import seaborn as sns
import librosa.display
import matplotlib.pyplot as plt

import sklearn
from sklearn.preprocessing import OneHotEncoder
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import Conv1D, MaxPooling1D, BatchNormalization, Dropout, Flatten, Dense

In [2]:
# Creates parser and defines arguments
def flags():
    """Build the argument parser and parse the options it recognises.

    Parsing uses ``parse_known_args``, so arguments that are not declared
    below are collected instead of raising an error.

    Returns:
        A ``(FLAGS, unparsed)`` pair: the namespace holding the declared
        options and the list of argument strings that were not recognised.
    """
    # One row per option: (option string, type, default, help text).
    option_table = (
        ('--dataset_url', str,
         'http://emodb.bilderbar.info/download/download.zip',
         'URL to access the dataset'),
        ('--source_path', str, './', 'path to the source directory'),
        ('--directory_name', str, 'emodb.zip',
         'names the downloaded data directory'),
        ('--features_type', str, 'mfcc',
         'training features [melspec, mfcc]'),
        ('--test_split', float, 0.15,
         'split percentages fot the test set'),
        ('--val_split', float, 0.15,
         'split percentages fot the validation set'),
        ('--learning_rate', float, 1e-3, 'learning rate'),
        ('--batch_size', int, 32, 'size of the training batch'),
        ('--num_epochs', int, 200, 'number of training epochs'),
        ('--patience', int, 20, 'early stopping patience'),
        ('--port_number', int, 105, 'Flask API port'),
    )

    parser = argparse.ArgumentParser()
    for option, kind, fallback, description in option_table:
        parser.add_argument(option, type=kind, default=fallback, help=description)

    return parser.parse_known_args()

# Feature Engineering & Modeling

In [3]:
def download_and_extract(source, src_dir, dir_name):
    """Download the dataset archive and unpack it under ``src_dir``.

    The archive is stored at ``src_dir + dir_name`` and extracted into
    ``src_dir + <dir_name up to its first '.'>``. Both paths are anchored
    at ``src_dir`` so that the download target, the archive that is opened
    and the directory later read by ``dataset_reading`` all agree, even
    when ``src_dir`` is not the current working directory.

    Args:
        source: URL of the zip archive.
        src_dir: Directory prefix; it is concatenated as-is, so it is
            expected to end with a path separator (the default is './').
        dir_name: File name for the downloaded archive, e.g. 'emodb.zip'.

    Returns:
        The dataset directory name relative to ``src_dir`` (``dir_name``
        up to its first '.').
    """
    archive_path = src_dir + dir_name

    # Download the archive to the same location it is opened from below
    urllib.request.urlretrieve(source, archive_path)

    # Extract next to the archive, where the rest of the pipeline looks
    dataset_dir = dir_name.split('.')[0]
    with zipfile.ZipFile(archive_path, 'r') as zip_ref:
        zip_ref.extractall(src_dir + dataset_dir)
    return dataset_dir

In [4]:
def dataset_reading(src_dir, dataset_dir):
    """List the recordings of the dataset and derive their emotion labels.

    Only entries whose name ends in '.wav' (case-insensitive) are treated
    as recordings; anything else in the folder (hidden files, notes, ...)
    is ignored instead of breaking the label lookup.

    Args:
        src_dir: Directory prefix, concatenated as-is with ``dataset_dir``.
        dataset_dir: Name of the extracted dataset directory; recordings
            are read from its 'wav' sub-directory.

    Returns:
        A tuple ``(audio_files, audio_directory, emotion_df, emotions)``:
        the sorted file names, the directory prefix to prepend to them,
        a DataFrame indexed by file name with one 'emotions' column, and
        the plain list of emotion names in the same order.

    Raises:
        KeyError: If a recording's emotion code is not a known one.
        IndexError: If a recording's name is shorter than six characters.
    """
    audio_directory = src_dir+dataset_dir+'/wav/'
    audio_files = sorted(
        entry for entry in os.listdir(audio_directory)
        if entry.lower().endswith('.wav')
    )

    char_to_emotion = {
        'W': 'anger',
        'L': 'boredom',
        'E': 'disgust',
        'A': 'fear',
        'F': 'happiness',
        'T': 'sadness',
        'N': 'neutral',
    }
    # The emotion code is the character at index 5 of the file name stem
    emotions = [char_to_emotion[f.split('.')[0][5]] for f in audio_files]

    emotion_df = pd.DataFrame({"emotions": emotions}, index=audio_files)
    return audio_files, audio_directory, emotion_df, emotions

In [5]:
def class_histogram(emotion_df, emotions):
    """Print and plot how often each emotion occurs.

    Args:
        emotion_df: DataFrame with an 'emotions' column.
        emotions: Sequence of emotion names used for the count plot.
    """
    # Textual summary of the class distribution
    occurrences = emotion_df['emotions'].value_counts()
    print(occurrences)

    # Bar chart of the same distribution
    plt.title('Emotion occurrences', size=16)
    sns.countplot(x=emotions)
    plt.xlabel('Emotion')
    plt.ylabel('Count')
    plt.show()

In [6]:
def zero_padding_or_cut(sound, avgLen):
    """Load an audio file and bring it to ``avgLen`` samples.

    Shorter signals are zero-padded on both sides, longer ones are trimmed
    on both sides; when the length difference is odd, the extra sample is
    added to / removed from the end. Signals that already have ``avgLen``
    samples are returned untouched.

    Args:
        sound: Path of the audio file, passed to ``librosa.load``.
        avgLen: Target number of samples.

    Returns:
        ``(samples, sr)``: the adjusted signal and the sampling rate
        reported by ``librosa.load``.
    """
    samples, sr = librosa.load(sound)
    current = len(samples)
    gap = abs(current - avgLen)

    # Amount handled at the front, and at the back (one more when gap is odd)
    head = int(gap / 2)
    tail = head if gap % 2 == 0 else head + 1

    if current < avgLen:
        # Padding with 0s
        samples = np.concatenate(([0] * head, samples, [0] * tail), axis=0)
    elif current > avgLen:
        # Length cutting: drop `head` samples in front, `tail` at the end
        shortened = samples[head:]
        samples = shortened[:len(shortened) - tail]
    return samples, sr

def audio_to_melspec(samples, sr):
    """Turn a raw signal into a decibel-scaled mel spectrogram.

    As a side effect the raw waveform is plotted with axis labels; the
    figure is not shown by this function.

    Args:
        samples: Audio signal.
        sr: Sampling rate of ``samples``.

    Returns:
        The mel spectrogram converted with ``librosa.amplitude_to_db``,
        using the minimum value as reference.
    """
    # Waveform plot (time domain)
    waveform = pd.Series(samples)
    waveform.plot(figsize=(10, 5))
    plt.xlabel('Time Domain')
    plt.ylabel('Amplitude')

    # Magnitude of the short-time Fourier transform; the phase is unused
    magnitude, _ = librosa.magphase(librosa.stft(samples))

    # Frequency axis on the mel scale
    mel_spec = librosa.feature.melspectrogram(S=magnitude, sr=sr)

    # Amplitude axis on the decibel scale
    return librosa.amplitude_to_db(S=mel_spec, ref=np.min)

def avg_audio_length(audio_files, audio_directory):
    """Return the mean number of samples over all audio files.

    Args:
        audio_files: File names of the recordings.
        audio_directory: Prefix prepended to each file name.

    Returns:
        The mean length in samples, truncated to an ``int``.
    """
    lengths = [
        len(librosa.load(audio_directory + name)[0])
        for name in audio_files
    ]
    return int(np.mean(lengths))

def extract_melspec(samples, sr):
    """Return the mel spectrogram of a signal averaged over its second axis.

    Args:
        samples: Audio signal.
        sr: Sampling rate of ``samples``.

    Returns:
        A 1-D array: the mean of the transposed output of
        ``audio_to_melspec`` along axis 0.
    """
    spectrogram = audio_to_melspec(samples, sr)
    return np.mean(np.transpose(spectrogram), axis=0)

def extract_mfcc(samples, sr):
    """Return 40 MFCCs of a signal, each averaged over its second axis.

    Args:
        samples: Audio signal.
        sr: Sampling rate of ``samples``.

    Returns:
        A 1-D array: the mean of the transposed ``librosa.feature.mfcc``
        output (``n_mfcc=40``) along axis 0.
    """
    coefficients = librosa.feature.mfcc(y=samples, sr=sr, n_mfcc=40)
    return np.mean(np.transpose(coefficients), axis=0)

In [7]:
def data_generator(audio_files, audio_directory, features_type):
    """Compute one feature vector per audio file.

    Every recording is first padded or cut to the average length of all
    recordings, then converted to MFCC features when ``features_type`` is
    'mfcc' and to mel-spectrogram features otherwise.

    Args:
        audio_files: File names of the recordings.
        audio_directory: Prefix prepended to each file name.
        features_type: 'mfcc' selects MFCCs; any other value selects the
            mel spectrogram.

    Returns:
        A list with one feature vector per file, in the input order.
    """
    avgLen = avg_audio_length(audio_files, audio_directory)

    # Pick the extractor once instead of branching for every file
    extractor = extract_mfcc if features_type == 'mfcc' else extract_melspec

    return [
        extractor(*zero_padding_or_cut(audio_directory + name, avgLen))
        for name in audio_files
    ]

def labels_generator(emotion_df):
    """One-hot encode the 'emotions' column.

    Args:
        emotion_df: DataFrame with an 'emotions' column.

    Returns:
        ``(labels, encoder)``: the dense one-hot matrix and the fitted
        ``OneHotEncoder`` that produced it.
    """
    encoder = OneHotEncoder()
    # The encoder expects a 2-D input: one row per sample, one column
    column = np.array(emotion_df['emotions'].values).reshape(-1, 1)
    one_hot = encoder.fit_transform(column).toarray()
    return one_hot, encoder

In [8]:
# Prepare dataset for deep-learning
def dataset_split(audio_files, audio_directory, features_type, emotion_df, test_split, val_split):
    """Build features and labels and split them into train/validation/test.

    The test set is split off first; the validation set is then taken from
    what remains, so ``val_split`` is a fraction of the non-test data.
    Both splits are shuffled and stratified on the labels. A trailing axis
    is added to each feature array before returning.

    Args:
        audio_files: File names of the recordings.
        audio_directory: Prefix prepended to each file name.
        features_type: Feature kind forwarded to ``data_generator``.
        emotion_df: DataFrame with an 'emotions' column.
        test_split: ``test_size`` of the first split.
        val_split: ``test_size`` of the second split.

    Returns:
        ``(X_train, X_test, Y_train, Y_test, X_val, Y_val, encoder)``.
    """
    raw_features = data_generator(audio_files, audio_directory, features_type)
    features = pd.DataFrame(raw_features).values
    labels, encoder = labels_generator(emotion_df)
    print(labels.shape, features.shape)

    # First split: hold out the test set
    rest_X, X_test, rest_Y, Y_test = train_test_split(
        features, labels, test_size=test_split, stratify=labels, shuffle=True)
    # Second split: carve the validation set out of the remainder
    X_train, X_val, Y_train, Y_val = train_test_split(
        rest_X, rest_Y, test_size=val_split, stratify=rest_Y, shuffle=True)
    print(X_train.shape, Y_train.shape, X_test.shape, Y_test.shape, X_val.shape, Y_val.shape)

    # Add the trailing axis the model input expects
    X_train, X_test, X_val = (
        np.expand_dims(part, axis=2) for part in (X_train, X_test, X_val)
    )
    return X_train, X_test, Y_train, Y_test, X_val, Y_val, encoder


In [9]:
def CNN(X_train, Y_train, X_val, Y_val, batch_size, learning_rate, num_epochs, patience):
    """Build, compile and train the 1-D convolutional classifier.

    The network stacks three convolution stages (convolution, batch
    normalisation, max pooling, dropout) followed by a small dense head
    with a 7-way softmax output. Training uses Adam with categorical
    cross-entropy and stops early when the validation loss has not
    improved for ``patience`` epochs.

    Args:
        X_train, Y_train: Training features and one-hot labels.
        X_val, Y_val: Validation features and one-hot labels.
        batch_size: Mini-batch size.
        learning_rate: Adam learning rate.
        num_epochs: Maximum number of epochs.
        patience: Early-stopping patience on 'val_loss'.

    Returns:
        ``(model, history)``: the trained model and the object returned
        by ``model.fit``.
    """
    model = Sequential()

    # (filters, kernel_size) of each convolution stage, in order
    conv_stages = ((64, 5), (125, 10), (64, 5))
    for position, (n_filters, width) in enumerate(conv_stages):
        # Only the very first layer declares the input shape
        extra = {"input_shape": (X_train.shape[1], 1)} if position == 0 else {}
        model.add(Conv1D(filters=n_filters, kernel_size=width, padding="same", activation="relu", **extra))
        model.add(BatchNormalization())
        model.add(MaxPooling1D(pool_size=2))
        model.add(Dropout(0.2))

    # Classification head
    model.add(Flatten())
    model.add(Dense(7, activation="relu"))
    model.add(BatchNormalization())
    model.add(Dropout(0.3))
    model.add(Dense(7, activation="softmax"))

    # Model configuration
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])
    model.summary()

    # Model training
    stopper = EarlyStopping(monitor='val_loss', mode='min', verbose=0, patience=patience)
    history = model.fit(
        x=X_train,
        y=Y_train,
        batch_size=batch_size,
        epochs=num_epochs,
        validation_data=(X_val, Y_val),
        callbacks=[stopper],
        verbose=2,
    )
    return model, history

def evaluate(model, history, encoder, X_train, X_test, Y_train, Y_test, X_val, Y_val):
    """Report accuracy, learning curves and a per-class report.

    Prints the train/validation/test accuracy, shows the loss and accuracy
    curves recorded during training, and prints a classification report
    for the test set.

    Args:
        model: Trained model exposing ``evaluate`` and ``predict``.
        history: Training history whose ``history`` dict holds 'loss',
            'val_loss', 'accuracy' and 'val_accuracy'.
        encoder: Fitted encoder used to map one-hot rows back to labels.
        X_train, Y_train: Training features and labels.
        X_test, Y_test: Test features and labels.
        X_val, Y_val: Validation features and labels.
    """
    # Model evaluation, in the order the scores are reported
    scores = []
    for features, targets in ((X_train, Y_train), (X_val, Y_val), (X_test, Y_test)):
        _, accuracy = model.evaluate(features, targets)
        scores.append(accuracy)
    print('Train accuracy: %.3f, Validation accuracy: %.3f, Test accuracy: %.3f' % tuple(scores))

    # One figure per tracked metric: training curve vs. validation curve
    curves = history.history
    figures = (
        ('loss', 'val_loss', 'Model Loss'),
        ('accuracy', 'val_accuracy', 'Model Accuracy'),
    )
    for train_key, val_key, heading in figures:
        plt.plot(curves[train_key], label='train')
        plt.plot(curves[val_key], label='validation')
        plt.title(heading)
        plt.ylabel(train_key)
        plt.xlabel('epoch')
        plt.legend()
        plt.show()

    # Labels prediction
    predicted_labels = encoder.inverse_transform(model.predict(X_test))
    true_labels = encoder.inverse_transform(Y_test)
    print(classification_report(true_labels, predicted_labels))
    
def save_model(model, src_dir):
    """Save the model under ``src_dir`` with the fixed name 'my_model'.

    Args:
        model: Object exposing a ``save(path)`` method.
        src_dir: Prefix concatenated as-is with the model name.
    """
    target = src_dir + 'my_model'
    model.save(target)
    

# Flask API endpoints

In [None]:
# Parse the command-line options once; the endpoints below read these
# module-level values.
FLAGS, unparsed = flags()

# Dataset location
source = FLAGS.dataset_url
src_dir = FLAGS.source_path
dir_name = FLAGS.directory_name

# Feature extraction and dataset splitting
features_type = FLAGS.features_type
test_split = FLAGS.test_split
val_split = FLAGS.val_split

# Training hyper-parameters
learning_rate = FLAGS.learning_rate
batch_size = FLAGS.batch_size
num_epochs = FLAGS.num_epochs
patience = FLAGS.patience

# Web server
port_number = FLAGS.port_number

app = Flask(__name__)
    
# Endpoint for model training. Enter 1 in the url variable section to train the model
@app.route('/<int:train>/')
def model_training(train):
    """Run the full training pipeline when the URL value is 1.

    For any other integer a usage message is returned. For 1, the dataset
    is downloaded and read, the class histogram is shown, the data is
    split, the encoder is dumped to 'encoder.joblib', and the model is
    trained, evaluated and saved.

    Args:
        train: Integer taken from the URL.

    Returns:
        A plain-text message describing the outcome.
    """
    if train != 1:
        return "To train the model type enter 1 at the end of the url. If the model is already trained, in the url variable section, you can enter the name of the audio file for which you want to predict the emotion."

    # Data acquisition and inspection
    dataset_dir = download_and_extract(source, src_dir, dir_name)
    audio_files, audio_directory, emotion_df, emotions = dataset_reading(src_dir, dataset_dir)
    class_histogram(emotion_df, emotions)

    # Features, labels and splits; the encoder is persisted for prediction
    splits = dataset_split(audio_files, audio_directory, features_type, emotion_df, test_split, val_split)
    X_train, X_test, Y_train, Y_test, X_val, Y_val, encoder = splits
    dump(encoder, 'encoder.joblib')

    # Training, evaluation and persistence of the model
    model, history = CNN(X_train, Y_train, X_val, Y_val, batch_size, learning_rate, num_epochs, patience)
    evaluate(model, history, encoder, X_train, X_test, Y_train, Y_test, X_val, Y_val)
    save_model(model, src_dir)
    return "The newly trained model is saved in the my_model directory."

    
# Endpoint for querying the last trained model with an audio file of our choice. Enter the name of the audio file of interest in the url variable section
@app.route('/<string:name>/')
def emotion_prediction(name):
    """Predict the emotion of one audio file of the dataset.

    The file is looked up in the 'wav' folder of the extracted dataset.
    If it does not exist, a 404 response with an explanatory message is
    returned before the model is loaded, instead of failing later with an
    unhandled error.

    Args:
        name: File name of the recording, taken from the URL.

    Returns:
        A plain-text message with the predicted emotion, or a
        ``(message, 404)`` pair when the file is not found.
    """
    sound = src_dir+dir_name.split('.')[0]+'/wav/'+name
    if not os.path.isfile(sound):
        return "Audio file " + name + " was not found in the dataset directory.", 404

    model = tf.keras.models.load_model(src_dir+'my_model')
    encoder = load('encoder.joblib')

    # NOTE(review): training pads/cuts every recording to the average
    # length before extracting features; here the raw recording is used.
    # Confirm whether the same length transformation should be applied.
    samples, sr = librosa.load(sound)

    if features_type == 'mfcc':
        values = extract_mfcc(samples, sr)
    else:
        values = extract_melspec(samples, sr)

    # Shape the single feature vector like a batch of one training sample
    x = pd.DataFrame([values]).iloc[:,:].values
    x = np.expand_dims(x, axis=2)

    # Predict emotion for one audio sample
    prediction = model.predict(x)
    predClass = encoder.inverse_transform(prediction)
    return "Predicted emotion for the audio file " + name + " is: " + predClass[0][0]
    
    
if __name__ == "__main__":
    # Start the Flask server on all network interfaces, on the port given
    # by --port_number.
    app.run(
        host='0.0.0.0',
        port=port_number,
    )
    

 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:105
 * Running on http://172.18.0.2:105
Press CTRL+C to quit
