## Importing libraries

For any issues running these modules, use python -m pip install -r requirements.txt

In [1]:
import librosa
import pandas as pd
import numpy as np
import sklearn

import os

import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings("ignore")

## Load data, separate into train/test

In [2]:
from fractions import Fraction
import random

path = "./data/audio_speech_actors_01-24/"
actors = os.listdir(path)

# We need to categorize the data files according to their emotion. Since the dataset is labelled by emotion (which is encoded into their filenames), we need to break that down
# Filename identifiers:
# Modality (01 = full-AV, 02 = video-only, 03 = audio-only).
#
# Vocal channel (01 = speech, 02 = song).
#
# Emotion (01 = neutral, 02 = calm, 03 = happy, 04 = sad, 05 = angry, 06 = fearful, 07 = disgust, 08 = surprised).
#
# Emotional intensity (01 = normal, 02 = strong). NOTE: There is no strong intensity for the 'neutral' emotion.
#
# Statement (01 = "Kids are talking by the door", 02 = "Dogs are sitting by the door").
#
# Repetition (01 = 1st repetition, 02 = 2nd repetition).
#
# Actor (01 to 24. Odd numbered actors are male, even numbered actors are female).

# We are only focusing on the emotion, so we categorize by the third number (01-08)
# according to the dataset's site
mapping = {1:"neutral", 2:"calm", 3:"happy", 4:"sad", 5:"angry", 6:"fearful", 7:"disgust", 8:"surprised"}

def load_data(path, return_train_test=False, test_percentage=0.20):
    # 1 slot for each of the emotions
    emot = []
    paths = []
    train_test_labels = []
    
    # Custom making our train/test split
    test_threshold = int(len(os.listdir(path)) * test_percentage + 1)    # (At least) 20% of data reserved for testing (important that we do this by actor to prevent data leakage)
    print("Test data is {:0.2f}% of the overall data".format(1 / (len(os.listdir(path))/test_threshold)))
    
    # Make a list of actors so we can shuffle order (last few actors will not always be test data each time we load data)
    actors = []
    for directory in os.listdir(path):
        actors.append(directory)
    random.shuffle(actors)
    
    count = 0
    data_label = "test"
    for directory in actors:
        count += 1
        if (count == test_threshold):
            data_label = "train"
        files = os.path.join(path, directory)
        for file in os.listdir(files):
            em_num = int(file.split("-")[2])
            emot.append(em_num)
            train_test_labels.append(data_label)
            paths.append(path + directory + "/" + file)
    
    tts = pd.DataFrame(train_test_labels, columns=["train_test"])
    ems = pd.DataFrame(emot, columns=['emotion']).replace(mapping)
    pths = pd.DataFrame(paths, columns = ["path"])
    data_file = pd.concat(
        [
            tts.reset_index(drop=True),
            ems.reset_index(drop=True),
            pths.reset_index(drop=True)
        ],
        axis=1,
    )
    
    if return_train_test:
        return get_train_test(data_file)
    return data_file

def get_train_test(data):
    grouped = data.groupby(data.train_test)
    train = grouped.get_group("train")
    test = grouped.get_group("test")
    return train, test

print("--- Building Train/Test Data ---")
data = load_data(path)
train, test = get_train_test(data)
train_size = train.train_test.value_counts().train
test_size = test.train_test.value_counts().test
print("Train has", train_size, "files.")
print("Test has", test_size, "files.")

ratio = Fraction(train_size, test_size)
print("Split is", str(ratio.numerator)+":"+str(ratio.denominator), "train:test")

--- Building Train/Test Data ---
Test data is 0.21% of the overall data
Train has 1200 files.
Test has 240 files.
Split is 5:1 train:test


## Extract data

In [4]:
import numpy as np

X = []
y = []

# for m in mapping:
#     for i in train[m]:
#         X.append(i)
#         y.append(m)

def gen_mfccs(data, NUM_MFCCs=13):
    mfccs = pd.DataFrame(columns=['mfccs'])
    
    # Get mfccs from each audio file
    count=0
    for i, j in data.iterrows():
        for item in j.items():
            if item[0] == 'path':
                # Sample rate and duration taken from the kaggle dataset description
                file, sample_rate = librosa.load(item[1], res_type='kaiser_fast',duration=2.5,sr=22050*2,offset=0.5)
                sample_rate = np.array(sample_rate)
                mfcc = np.mean(librosa.feature.mfcc(y=file, sr=sample_rate, n_mfcc=NUM_MFCCs), axis=0)
                mfccs.loc[count] = [mfcc]
                count+=1
                break
    
    # Gen list of mfccs as a dataframe to **manually** concatenate onto data
#     mfccs = pd.DataFrame(mfccs, columns = [("mfcc_" + str(num)) for num in range(len(mfccs[0]))])
#     data = pd.concat(
#         [
#             data.reset_index(drop=True),
#             mfccs.reset_index(drop=True)
#         ],
#         axis=1
#     )

    # Add on these mfccs to the data DataFrame
    return pd.concat([data.reset_index(drop=True), pd.DataFrame(mfccs["mfccs"].values.tolist())], axis=1)
        
data = gen_mfccs(data)
data = data.fillna(0)
train, test = get_train_test(data)

# Save train + test DataFrame file as a csv
data.to_csv("extracted_data.csv",index=False)

In [5]:
def get_x_y(data):
    rev_mapping = {emotion: num for num, emotion in mapping.items()}
    x, y = [], []
    for i, j in data.iterrows():
        col = (label for label in j.items() if label[0] == 'emotion')
        for item in col:
            y.append(rev_mapping[item[1]])
        count = 0
        xs = []
        for k in j.items():
            if count > 5:
                xs.append(k)
#                 print(xs)
            count += 1
#             print(k)
        x.append(xs)
    return x, y

# The numbered columns are mfccs
print(train)
train_x, trian_y = get_x_y(train)
test_x, test_y = get_x_y(test)

     train_test    emotion                                               path  \
240       train    neutral  ./data/audio_speech_actors_01-24/Actor_08/03-0...   
241       train    neutral  ./data/audio_speech_actors_01-24/Actor_08/03-0...   
242       train    neutral  ./data/audio_speech_actors_01-24/Actor_08/03-0...   
243       train    neutral  ./data/audio_speech_actors_01-24/Actor_08/03-0...   
244       train       calm  ./data/audio_speech_actors_01-24/Actor_08/03-0...   
...         ...        ...                                                ...   
1435      train  surprised  ./data/audio_speech_actors_01-24/Actor_06/03-0...   
1436      train  surprised  ./data/audio_speech_actors_01-24/Actor_06/03-0...   
1437      train  surprised  ./data/audio_speech_actors_01-24/Actor_06/03-0...   
1438      train  surprised  ./data/audio_speech_actors_01-24/Actor_06/03-0...   
1439      train  surprised  ./data/audio_speech_actors_01-24/Actor_06/03-0...   

              0          1 

## Baselines

In [9]:
from sklearn.dummy import DummyClassifier

# train a dummy classifier to make predictions based on the most_frequent class value
frequent_dummy_classifier = DummyClassifier(strategy="most_frequent")
frequent_dummy_classifier.fit(train_x, trian_y)

print("Highest frequency baseline:", frequent_dummy_classifier.score(test_x, test_y))

# train a dummy classifier to make predictions based on the class values
stratified_dummy_classifier = DummyClassifier(strategy="stratified")
stratified_dummy_classifier.fit(train_x,trian_y)

print("Random baseline", stratified_dummy_classifier.score(test_x, test_y))

# train a dummy classifier to make predictions based on uniform selection
uniform_dummy_classifier = DummyClassifier(strategy="uniform")
uniform_dummy_classifier.fit(train_x,trian_y)

print("Random uniform baseline", uniform_dummy_classifier.score(test_x, test_y))

Highest frequency baseline: 0.13333333333333333
Random baseline 0.11666666666666667
Random uniform baseline 0.13333333333333333


In [None]:
#Explore dataset
hist.head()
hist.dtypes

## Plot Data

Visualize the data that we have extracted to better understand trends to expect and routes to go for changing parameters.

In [None]:
## TO DO
fig = plt.figure(figsize=(14, 8))
ax1 = fig.add_subplot(211)
ax1.set_title('Visualization Title')
ax1.set_xlabel('xLabel')
ax1.set_ylabel('yLabel')

## Data preparation

If we want to transform the data in any way, we can do it here.

In [None]:
from sklearn.preprocessing import LabelEncoder
le = LabelEncoder()


labels=["01/20", "01/21", "01/22"]
# Generates an int for each label
y=le.fit_transform(labels)

# Prints out each date with its int mapping
for c in list(le.classes_):
    print(le.transform([c])[0], c)

In [None]:
## Reshape data here as necessary

In [None]:
#Create train set and test set
train_set_size = 5000
data = np.array(hist)
train_data = data[:train_set_size]
test_data = data[train_set_size:]
x_train = data[:train_set_size,:-1]
y_train = data[:train_set_size,-1]
x_test = data[train_set_size:,:-1]
y_test = data[train_set_size:,-1]
x_train = torch.from_numpy(x_train).type(torch.Tensor)
x_test = torch.from_numpy(x_test).type(torch.Tensor)
y_train = torch.from_numpy(y_train).type(torch.Tensor)
y_test = torch.from_numpy(y_test).type(torch.Tensor)

# We could try this as it might be a bit easier to use
from sklearn.model_selection import train_test_split
x_tr, x_val, y_tr, y_val = train_test_split(np.array(data_to_be_added),np.array(y),stratify=y,test_size = 0.2,random_state=777,shuffle=True)


## Initializing the model

In [None]:
# from keras.layers import Dense, Dropout, Flatten, Conv1D, Input, MaxPooling1D
# from keras.models import Model
# from keras.callbacks import EarlyStopping, ModelCheckpoint
# from keras import backend as K

In [None]:
# K.clear_session()

# inputs = Input(shape=(8000,1))

# #First Conv1D layer
# conv = Conv1D(8,13, padding='valid', activation='relu', strides=1)(inputs)
# conv = MaxPooling1D(3)(conv)
# conv = Dropout(0.3)(conv)

# #Second Conv1D layer
# conv = Conv1D(16, 11, padding='valid', activation='relu', strides=1)(conv)
# conv = MaxPooling1D(3)(conv)
# conv = Dropout(0.3)(conv)

# #Third Conv1D layer
# conv = Conv1D(32, 9, padding='valid', activation='relu', strides=1)(conv)
# conv = MaxPooling1D(3)(conv)
# conv = Dropout(0.3)(conv)

# #Flatten layer
# conv = Flatten()(conv)

# #Dense Layer 1
# conv = Dense(128, activation='relu')(conv)
# conv = Dropout(0.3)(conv)

# # Output layer
# outputs = Dense(len(labels), activation='softmax')(conv)

# model = Model(inputs, outputs)
# model.summary()

In [None]:
# model.compile(loss='categorical_crossentropy',optimizer='adam',metrics=['accuracy'])
# es = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=10, min_delta=0.0001) 
# mc = ModelCheckpoint('best_model.hdf5', monitor='val_accuracy', verbose=1, save_best_only=True, mode='max')

## Training the model

In [None]:
history=model.fit(x_tr, y_tr ,epochs=10, callbacks=[es,mc], batch_size=32, validation_data=(x_val,y_val))

print("MODEL TRAINING COMPLETE!")

## Post Visualization

Now we can take a look at how successful our model is and can easily find where overfitting takes place (if at all)

In [None]:
from matplotlib import pyplot as plt
plt.plot(history.history['loss'], label='train') 
plt.plot(history.history['val_loss'], label='test') 
plt.legend() 
plt.show()

plt.plot(history.history['accuracy'], label='train') 
plt.plot(history.history['val_accuracy'], label='test') 
plt.legend() 
plt.show()

In [None]:
# For the application part (maybe a separate file), we can use our model to predict the future performance

from keras.models import load_model
model=load_model('best_model.hdf5')