In [0]:
!wget -q https://www.dropbox.com/s/wr1fjnnt254rcq7/Patrick%20Jean-Baptiste%20-%20Emotion_Images.zip?dl=1 -O Emotion_Images.zip

!wget -q https://zenodo.org/api/files/c8f9b6fe-82ac-481c-ad9c-12b5581cb4b4/Audio_Song_Actors_01-24.zip
!wget -q https://zenodo.org/api/files/c8f9b6fe-82ac-481c-ad9c-12b5581cb4b4/Audio_Speech_Actors_01-24.zip

!wget -q https://www.dropbox.com/s/qdhtexle4p0ngc3/DatasetSplitCSV.zip?dl=1 -O DatasetSplitCSV.zip


!unzip -q -d ./song Audio_Song_Actors_01-24.zip 
!unzip -q -d ./speech Audio_Speech_Actors_01-24.zip
!unzip -q Emotion_Images.zip
!unzip -q DatasetSplitCSV.zip

In [2]:
!pip install speechpy

Collecting speechpy
  Downloading https://files.pythonhosted.org/packages/8f/12/dbda397a998063d9541d9e149c4f523ed138a48824d20598e37632ba33b1/speechpy-2.4-py2.py3-none-any.whl
Installing collected packages: speechpy
Successfully installed speechpy-2.4


In [3]:

import os
import pandas as pd
import numpy as np
from tqdm import tqdm
import scipy.io.wavfile as wav
from speechpy.feature import mfcc
import librosa
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report

import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import models
from tensorflow.keras import backend as K
from tensorflow.keras.callbacks import ReduceLROnPlateau
from tensorflow import keras
from keras.utils import np_utils
from sklearn.utils.multiclass import unique_labels

Using TensorFlow backend.


In [0]:
# FILEPATH_SPEECH= 'C:\\Users\\ZhaoY\\Downloads\\DL_Project\\dataset\\Audio_Song_Actors_01-24\\'
# dir_list_speech = os.listdir(FILEPATH_SPEECH)

In [0]:
params={}
params["embeddingType"]="mfcc" # 
params["mapReduceFunc"]="None" # Avg

In [0]:
def get_feature_vector_from_mfcc(signal, mean_signal_length: int, flatten: bool) -> np.ndarray:
    """
    Make feature vector from MFCC for the given wav file.

    Args:
        file_path (str): path to the .wav file that needs to be read.
        flatten (bool) : Boolean indicating whether to flatten mfcc obtained.
        mfcc_len (int): Number of cepestral co efficients to be consider.

    Returns:
        numpy.ndarray: feature vector of the wav file made from mfcc.
    """
    #fs, signal = wav.read(file_path)
    #signal, fs = librosa.load(file_path, sr=16000, mono=True)
    s_len = len(signal)

    # pad the signals to have same size if lesser than required
    # else slice them    
    
    if s_len < mean_signal_length:
        pad_len = mean_signal_length - s_len
        pad_rem = int(pad_len % 2)
        pad_len = int(pad_len// 2)
        signal = np.pad(signal, (pad_len, pad_len + pad_rem),  'constant', constant_values=0)
    else:
        pad_len = s_len - mean_signal_length
        pad_len //= 2
        signal = signal[pad_len:pad_len + mean_signal_length]
        
    # sample/frame = mean_signal_length*frame_length
    mel_coefficients = mfcc(signal, fs, frame_length=0.048, frame_stride=0.024, num_filters=30, num_cepstral=30, low_frequency=60, high_frequency=7600)
    if flatten:
        # Flatten the data
        mel_coefficients = np.ravel(mel_coefficients)
    return mel_coefficients


def mapReduce(embed,funcName):
    if funcName=="Avg":
        embed= [np.average(embed,axis=0)]
    if funcName=="Pad":
        embed=np.pad(embed, [( 0,6-embed.shape[0]), (0, 0)], mode='constant', constant_values=0)
        embed= [embed.reshape(-1)]
    if funcName=="Many2One":
        embed=[embed[i,:] for i in range(embed.shape[0])]
    if funcName=="None":
        embed=[embed]
    return embed

def uint8_to_float32(x):
    return (np.float32(x) - 128.) / 128.

In [0]:
import glob

file_list=glob.glob("./speech/**/*.wav")+glob.glob("./song/**/*.wav")


mean_signal_length = 0
signals = []
for wavFile in file_list:
    signal, fs = librosa.load(wavFile, sr=16000, mono=True)
    mean_signal_length += len(signal)
    signals.append((wavFile,signal))

mean_signal_length = mean_signal_length//(len(file_list))


In [0]:
features = []
embeddings={"mfcc":{}}
for wavFile,signal in signals:
    f=get_feature_vector_from_mfcc(signal, mean_signal_length, flatten=False)
    features.append((wavFile.split("/")[-1],f))
    embeddings["mfcc"][wavFile.split("/")[-1]]=f

    


In [0]:

train=pd.read_csv("train.csv",header=None)
valid=pd.read_csv("valid.csv",header=None)
test=pd.read_csv("test.csv",header=None)



In [0]:
# embeddings

In [0]:
# x_train, x_test, y_train, y_test = train_test_split(features, labels, test_size=0.2, random_state=42)

x_train=[]
x_val=[]
x_test=[]
for x in train[0]:
    embed=embeddings[params["embeddingType"]][x]
    x_train.extend(mapReduce(embed,params["mapReduceFunc"]))

for x in valid[0]:
    embed=embeddings[params["embeddingType"]][x]
    x_val.extend(mapReduce(embed,params["mapReduceFunc"]))

for x in test[0]:
    embed=embeddings[params["embeddingType"]][x]
    x_test.extend(mapReduce(embed,params["mapReduceFunc"]))

x_train=np.array(x_train)
x_val=np.array(x_val)
x_test=np.array(x_test)

y_train=np.array(train[1].astype('category').cat.codes)
y_val=np.array(valid[1].astype('category').cat.codes)
y_test=np.array(test[1].astype('category').cat.codes)

y_train = np_utils.to_categorical(y_train)
y_val = np_utils.to_categorical(y_val)
y_test = np_utils.to_categorical(y_test)


In [0]:

def build_model(input_shape, num_classes):
    model = models.Sequential()
    model.add(layers.LSTM(128, input_shape=(input_shape[0], input_shape[1])))
    model.add(layers.BatchNormalization())
    model.add(layers.Dropout(0.5))
    model.add(layers.Dense(32, activation='relu'))
    model.add(layers.Dense(16, activation='tanh'))
    model.add(layers.Dense(num_classes, activation='softmax'))
    model.compile(loss='categorical_crossentropy',
                  optimizer='adam',
                  metrics=['accuracy'])
    return model

In [0]:
model = build_model(input_shape=(x_train[0].shape[0], x_train[0].shape[1]), num_classes=8)

In [0]:
lr_reducer = ReduceLROnPlateau(monitor='val_loss', 
                               factor=0.2,
                               patience=5,
                               min_lr=1e-6,
                               verbose=1)

In [0]:
my_callbacks=[lr_reducer,
              tf.keras.callbacks.EarlyStopping(patience=5),
              tf.keras.callbacks.TensorBoard(log_dir='./logs')]

In [15]:
hist = model.fit(x_train, y_train, batch_size=32, epochs=34, validation_data=(x_test, y_test), callbacks=my_callbacks)


Epoch 1/34
Epoch 2/34
Epoch 3/34
Epoch 4/34
Epoch 5/34
Epoch 6/34
Epoch 7/34
Epoch 8/34
Epoch 9/34
Epoch 10/34
Epoch 11/34
Epoch 12/34
Epoch 13/34
Epoch 14/34
Epoch 15/34
Epoch 16/34
Epoch 17/34
Epoch 18/34
Epoch 19/34
Epoch 20/34
Epoch 21/34
Epoch 22/34
Epoch 23/34
Epoch 24/34
Epoch 25/34
Epoch 26/34
Epoch 27/34
Epoch 28/34
Epoch 29/34
Epoch 30/34
Epoch 31/34
Epoch 00031: ReduceLROnPlateau reducing learning rate to 0.00020000000949949026.


In [0]:

# for val in x_val:
    
results={"train":{},"test":{},"valid":{}}
for x in train[0]:
    embed=embeddings[params["embeddingType"]][x]
    pred=model.predict(embed.reshape(1,168,30))
    # x_val.extend(mapReduce(embed,params["mapReduceFunc"]))
    results["train"][x]=pred

for x in valid[0]:
    embed=embeddings[params["embeddingType"]][x]
    pred=model.predict(embed.reshape(1,168,30))
    # x_val.extend(mapReduce(embed,params["mapReduceFunc"]))
    results["valid"][x]=pred
for x in test[0]:
    embed=embeddings[params["embeddingType"]][x]
    pred=model.predict(embed.reshape(1,168,30))
    # x_val.extend(mapReduce(embed,params["mapReduceFunc"]))
    results["test"][x]=pred

In [0]:
np.save('results.npy',results)