In [None]:
import os

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import matplotlib.pyplot as plt
import seaborn as sns

# librosa is a Python library for analyzing audio and music.
# It can be used to load audio files and extract features from them.
import librosa 
import torch
import librosa.display
import warnings
warnings.filterwarnings("ignore") 
# to play the audio files
from IPython.display import Audio
# Matplotlib 3.6 renamed the bundled seaborn style sheets to 'seaborn-v0_8-*' and
# later releases dropped the old names, so use whichever name this install provides.
_white_style = 'seaborn-v0_8-white' if 'seaborn-v0_8-white' in plt.style.available else 'seaborn-white'
plt.style.use(_white_style)

# Feature Extractor with x-vector

In [None]:
# Reference; https://huggingface.co/speechbrain/spkrec-xvect-voxceleb
! pip install speechbrain

In [None]:
# audio file is decoded on the fly
import torchaudio
from speechbrain.pretrained import EncoderClassifier
classifier = EncoderClassifier.from_hparams(source="speechbrain/spkrec-xvect-voxceleb", savedir="pretrained_models/spkrec-xvect-voxceleb")

def extract_features(path, target_sr=16000):
    """Return the x-vector embedding of one audio file as a NumPy array.

    Parameters
    ----------
    path : str
        Audio file readable by ``torchaudio.load``.
    target_sr : int, optional
        Sample rate the audio is converted to before it is embedded. The default
        of 16 kHz is the rate the referenced x-vector model card states it was
        trained on (NOTE(review): confirm if a different checkpoint is used).

    Returns
    -------
    numpy.ndarray
        The embedding averaged over the first axis of the encoder output and
        squeezed. ``torchaudio.load`` returns the signal as (channels, samples),
        which the encoder treats as a batch, so for multi-channel files this is
        the mean of the per-channel embeddings.
    """
    signal, fs = torchaudio.load(path)

    # The original ignored the file's sample rate; resample so audio recorded at
    # another rate is not silently fed to the model at the wrong speed.
    if fs != target_sr:
        signal = torchaudio.functional.resample(signal, orig_freq=fs, new_freq=target_sr)

    # Pure inference: no autograd graph is needed.
    with torch.no_grad():
        embeddings = classifier.encode_batch(signal)

    # detach/cpu make the conversion safe even if the encoder runs on a GPU.
    return embeddings.mean(axis=0).squeeze().detach().cpu().numpy()

# CREMA-D

In [None]:
crema = "/kaggle/input/cremad/AudioWAV/"
# os.listdir returns entries in arbitrary order; sort them so the row order of the
# dataframe (and therefore the seeded split built from it) is reproducible.
crema_directory_list = sorted(os.listdir(crema))

# Emotion code found in the third '_'-separated field of a file name -> label.
CREMA_EMOTION_CODES = {
    'SAD': 'Sadness',
    'ANG': 'Anger',
    'DIS': 'Disgust',
    'FEA': 'Fear',
    'HAP': 'Happiness',
    'NEU': 'Neutral',
}


def crema_emotion(file):
    """Map a CREMA-D file name to its emotion label.

    Returns 'Unknown' when the third '_'-separated field is not a known code or
    when the name has fewer than three fields (instead of raising IndexError).
    """
    parts = file.split('_')
    if len(parts) < 3:
        return 'Unknown'
    return CREMA_EMOTION_CODES.get(parts[2], 'Unknown')


# One entry per file: name without extension, full path, emotion label.
file_name = [file.split('.')[0] for file in crema_directory_list]
file_path = [crema + file for file in crema_directory_list]
file_emotion = [crema_emotion(file) for file in crema_directory_list]

Crema_df = pd.DataFrame({'Name': file_name, 'Path': file_path, 'Emotions': file_emotion})
Crema_df['source'] = 'cremad'

# Keep only the six known emotions; rows labelled 'Unknown' are dropped here.
cemotions = ['Sadness', 'Happiness', 'Anger', 'Fear', 'Neutral', 'Disgust']
cdataset = Crema_df[Crema_df['Emotions'].isin(cemotions)].reset_index(drop = True)
cdataset['Emotions'].value_counts()

In [None]:
from sklearn import preprocessing

# Turn the emotion names into integer class ids stored in a new 'labels' column.
lec = preprocessing.LabelEncoder()
cdataset['labels'] = lec.fit_transform(cdataset['Emotions'])

# Print which integer id each emotion name was assigned.
lec_name_mapping = {
    emotion: code
    for emotion, code in zip(lec.classes_, lec.transform(lec.classes_))
}
print(lec_name_mapping)

In [None]:
cdataset.head()

In [None]:
# Sanity check: embed the first clip and inspect the shape of its embedding.
first_clip_path = cdataset['Path'][0]
feats = np.array(extract_features(first_clip_path))
feats.shape

In [None]:
# Embed every clip in dataframe row order and stack the results into one array.
waveform_embeddings = np.array(
    [extract_features(cdataset['Path'][row]) for row in range(len(cdataset))]
)
print(waveform_embeddings.shape)

In [None]:
# Reference t-SNE: https://www.kaggle.com/code/colinmorris/visualizing-embeddings-with-t-sne
from sklearn.manifold import TSNE

# Project the embeddings to 2-D with t-SNE on cosine distances. The iteration
# count is left at the library default of 1,000 (the value that used to be passed
# explicitly): newer scikit-learn releases renamed that keyword from `n_iter` to
# `max_iter`, so omitting it keeps this cell working on both.
tsne = TSNE(random_state=1, metric="cosine")
tsne_proj = tsne.fit_transform(waveform_embeddings)

# plt.get_cmap is used because matplotlib.cm.get_cmap was deprecated and then removed.
cmap = plt.get_cmap('tab20')
fig, ax = plt.subplots(figsize=(8,8))

# Take the class names from the fitted encoder so the legend always matches the
# integer ids stored in cdataset['labels'].
labels = list(lec.classes_)
num_categories = len(labels)

# One scatter series per emotion so each gets its own colour and legend entry.
for lab in range(num_categories):
    indices = (cdataset['labels'] == lab).to_numpy()
    ax.scatter(tsne_proj[indices, 0], tsne_proj[indices, 1], color=cmap(lab), label=labels[lab], alpha=0.5)

ax.set_title("t-SNE of x-vector embeddings (CREMA-D)")
ax.set_xlabel("t-SNE dimension 1")
ax.set_ylabel("t-SNE dimension 2")
ax.legend(fontsize='large', markerscale=2)
plt.savefig("./xvector_cremad_tsne")
plt.show()

In [None]:
# Add a trailing channel axis so each embedding matches the model input declared
# as Input(shape=[512, 1]). The dead string literal that used to end this cell
# (and was echoed as the cell's output) has been removed.
waveform_embeddings1 = np.expand_dims(waveform_embeddings, -1)
print(waveform_embeddings1.shape)

# Training and Test Data Split

Reference for k-fold: https://medium.com/towards-artificial-intelligence/importance-of-k-fold-cross-validation-in-machine-learning-a0d76f49493e

The general procedure for k-fold cross-validation is as follows:

1. Shuffle the dataset randomly

2. Split the dataset into k groups

3. For each unique group:


3(i). Take the group as a holdout or test data set

3(ii). Take the remaining groups as a training data set

3(iii). Fit a model on the training set and evaluate it on the test set


3(iv). Retain the evaluation score and discard the model


3(v). Summarize the skill of the model using the sample of model evaluation scores 


N.B.: The test sets of different folds do not overlap with each other.



In [None]:
# Train and test split for Speaker Recognition Embeddings

seed = 0
# x-vector CREMA-D
# Train and test split
from sklearn.model_selection import train_test_split, cross_val_predict


def kfold_split(x, y, fold, n_splits=5):
    """Return (x_train, x_test, y_train, y_test) for one fold of a k-fold split.

    The samples are cut into ``n_splits`` contiguous chunks in their current
    order (no shuffling happens here). Fold numbering follows the hand-written
    slices this helper replaces: fold ``n_splits`` holds out the FIRST chunk and
    fold 1 holds out the LAST chunk.

    Chunk sizes come from ``np.array_split``, so they adapt to any dataset size.
    For the 7442-sample set the previous hard-coded slices were written for,
    fold 5 is identical (first 1489 samples held out); the boundaries of the
    other folds differ from the old constants by one sample.

    Raises
    ------
    ValueError
        If ``n_splits`` is smaller than 2 or ``fold`` is outside 1..n_splits.
    """
    if n_splits < 2:
        raise ValueError("n_splits must be at least 2, got %r" % (n_splits,))
    if not 1 <= fold <= n_splits:
        raise ValueError("fold must be in 1..%d, got %r" % (n_splits, fold))

    chunks = np.array_split(np.arange(len(x)), n_splits)
    held_out = n_splits - fold
    test_idx = chunks[held_out]
    train_idx = np.concatenate([chunk for pos, chunk in enumerate(chunks) if pos != held_out])
    return x[train_idx], x[test_idx], y[train_idx], y[test_idx]


# The seeded split is used only to shuffle the data reproducibly ...
x_train, x_test, y_train, y_test = train_test_split(waveform_embeddings1, cdataset['labels'], test_size = 0.2, random_state = seed)
print(x_train.shape)
print(x_test.shape)
print(y_train.shape)
print(y_test.shape)

# ... the two parts are then re-joined into one shuffled array that the folds are cut from.
x = np.concatenate([x_train, x_test], axis = 0)
y = np.concatenate([y_train, y_test], axis = 0)

# Fold to run (1..5). Change this value instead of editing slice indices.
FOLD = 5
x_train, x_test, y_train, y_test = kfold_split(x, y, FOLD)
print(x_train.shape)
print(x_test.shape)
print(y_train.shape)
print(y_test.shape)


# CNN LSTM-Attention

In [None]:
#! pip install attention

In [None]:
#from attention import Attention

In [None]:
import pickle
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from time import time
import tensorflow as tf
import keras
from keras.preprocessing.sequence import TimeseriesGenerator
from scipy import stats
from IPython.display import display, HTML

from sklearn import metrics
from sklearn.metrics import classification_report
from sklearn import preprocessing
from sklearn.model_selection import train_test_split

import keras
from keras import optimizers
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten, Reshape
from keras.layers import Conv3D,Conv2D, MaxPooling2D,TimeDistributed,LSTM,ConvLSTM2D
from keras.utils import np_utils

from tensorflow.keras.layers import Input, Lambda, Dense, Flatten,Conv2D
from tensorflow.keras.applications.resnet50 import preprocess_input
from tensorflow.keras.preprocessing import image
from tensorflow.keras.preprocessing.image import ImageDataGenerator,load_img
from tensorflow.keras import datasets, layers, models
from tensorflow.keras.models import Sequential
import numpy as np
from glob import glob
import matplotlib.pyplot as plt
from tensorflow.keras.layers import MaxPooling2D
import keras
from keras.layers import Input, Conv2D, Dense, concatenate, Embedding, GlobalAveragePooling1D
from keras.models import Model

In [None]:
# LSTM + Attention

"""
ip = Input((512,1))
lstm1 = tf.keras.layers.LSTM(30, return_sequences=False, activation=tf.nn.relu)(ip)
#lstm2 = tf.keras.layers.LSTM(20, return_sequences=True, activation=tf.nn.relu)(lstm1)
#attention = Attention(5)(lstm1)
dense1 = keras.layers.Dense(30, activation='relu')(lstm1)
output = keras.layers.Dense(6, activation='softmax')(dense1)
model = Model(inputs=ip, outputs=output)
model.summary()  
"""

In [None]:
#! pip install keras_nlp

In [None]:
# CNN + Attention
from keras.layers import Input, Conv2D, Dense, concatenate, Embedding, GlobalAveragePooling1D, Attention
#import keras_nlp


def create_model(input_dim=512, num_classes=6):
    """Build the 1-D CNN classifier that runs on x-vector embeddings.

    Architecture: Conv1D(32, kernel 3, same padding) -> MaxPooling1D -> Flatten
    -> Dense(200) -> BatchNormalization -> Dense(90) -> Dropout(0.2) -> Dense(56)
    -> Dense(num_classes, softmax). No attention or recurrent layer is used.

    Everything is built from ``tf.keras`` so the graph does not mix objects from
    the standalone ``keras`` package with ``tf.keras`` layers.

    Parameters
    ----------
    input_dim : int, optional
        Length of one embedding; the model input has shape (input_dim, 1).
    num_classes : int, optional
        Number of output classes.

    Returns
    -------
    tf.keras.Model
        The uncompiled model.
    """
    # Embeddings from the x-vector extractor, with a trailing channel axis.
    input_speakrec = tf.keras.Input(shape=[input_dim, 1])

    # Convolutional front end.
    x1 = tf.keras.layers.Conv1D(32, 3, activation='relu', padding='same')(input_speakrec)
    x1 = tf.keras.layers.MaxPooling1D()(x1)
    x1 = tf.keras.layers.Flatten()(x1)

    # Fully connected classification head.
    x = tf.keras.layers.Dense(200, activation='relu')(x1)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Dense(90, activation='relu')(x)
    x = tf.keras.layers.Dropout(0.2)(x)
    x = tf.keras.layers.Dense(56, activation='relu')(x)
    output = tf.keras.layers.Dense(num_classes, activation='softmax')(x)

    model = tf.keras.Model(inputs=input_speakrec, outputs=output)
    return model

model = create_model()
model.summary()

In [None]:
#! pip install --upgrade keras-nlp

In [None]:
# Reference: Transformer Encoder (https://keras.io/api/keras_nlp/layers/transformer_encoder/)
"""
import keras_nlp
from tensorflow import keras


def transformer_model():
 # Create a single transformer encoder layer.
  encoder = keras_nlp.layers.TransformerEncoder(intermediate_dim=120, num_heads=8)
  # Create a simple model containing the encoder.
  input = keras.Input(shape=[1, 512])
  x = encoder(input)
  x = tf.keras.layers.GlobalAveragePooling1D()(x)
  x = tf.keras.layers.Dense(300, activation='relu')(x)
  x = tf.keras.layers.BatchNormalization()(x)
  tf.keras.layers.Dropout(0.2),
  tf.keras.layers.Dense(224, activation = 'relu'),
  tf.keras.layers.BatchNormalization(),
  tf.keras.layers.Dense(128, activation = 'relu'),
  x = tf.keras.layers.Dropout(0.2)(x)
  x =  tf.keras.layers.Dense(36, activation = 'relu')(x)
  output = tf.keras.layers.Dense(6, activation='softmax')(x)
  model = keras.Model(inputs=input, outputs=output)
  return model


# Call encoder on the inputs.
input_data = tf.random.uniform(shape=[10, 1, 512])
output = model(input_data)
print(output.shape)

model = transformer_model()
model.summary()
"""

In [None]:
# Initial learning rate handed to the optimizer below.
lr = 1e-3
import tensorflow_addons as tfa
# Rectified Adam from TensorFlow Addons.
optimizer = tfa.optimizers.RectifiedAdam(learning_rate= lr)
# Alternative that was tried: tf.keras.optimizers.Adam(learning_rate= lr)
# Compile the model with the RectifiedAdam optimizer created above.
model.compile(
    optimizer=optimizer,
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),  # from_logits is left at its default; the model's last layer already applies softmax
    metrics= ['accuracy']  # alternative: [tf.keras.metrics.SparseCategoricalAccuracy()]
)


# Scale the learning rate by 0.2 when val_accuracy has not improved for one epoch.
# NOTE(review): the fit call passes the test split as validation data, so every
# callback below is tuned on the same data the final report is computed on.
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor = 'val_accuracy',
                                                 factor = 0.2,
                                                 patience = 1,
                                                 verbose = 1,
                                                 min_delta = 1e-4,
                                                 min_lr = 1e-15,
                                                 mode = 'max')

# Stop when val_accuracy has not improved for `patience` epochs.
# NOTE(review): patience (70) is larger than the 50 epochs requested in the fit
# call, so this presumably never triggers — confirm whether restore_best_weights
# has any effect in the installed Keras version.
earlystopping = tf.keras.callbacks.EarlyStopping(monitor = 'val_accuracy',
                                                 min_delta = 1e-4,
                                                 patience = 70,
                                                 mode = 'max',
                                                 restore_best_weights = True,
                                                 verbose = 1)

# Save only the weights, and only when val_accuracy reaches a new maximum.
checkpointer = tf.keras.callbacks.ModelCheckpoint(filepath = './cnn_cremad.hdf5',
                                                  monitor = 'val_accuracy', 
                                                  verbose = 1, 
                                                  save_best_only = True,
                                                  save_weights_only = True,
                                                  mode = 'max')

callbacks = [earlystopping, checkpointer, reduce_lr]  # all three are passed to model.fit

In [None]:
import time

# Train the model, timing the whole fit call with wall-clock time.
start_time = time.time()
history = model.fit(
    x_train,
    y_train,
    validation_data=(x_test, y_test),
    batch_size=32,
    epochs=50,
    callbacks=callbacks,
)
elapsed_seconds = time.time() - start_time

print("--- %s seconds ---" % elapsed_seconds)

In [None]:
from sklearn.metrics import classification_report

# Class probabilities for every held-out sample.
y_pred = model.predict(x_test)
print(y_pred.shape)

# Most probable class per sample, then the per-class precision/recall/F1 report.
y_predmax = tf.math.argmax(y_pred, axis=1)
cr = classification_report(y_test, y_predmax, digits=6)
print(cr)

In [None]:
# Load the saved model
"""
def load_trained_model(weights_path):
   model = create_model()
   model.load_weights(weights_path)
   return model

new_model = load_trained_model('./cnn_cremad.hdf5')
new_model.summary()
y_predmax = tf.math.argmax(y_pred, axis = 1)
from sklearn.metrics import classification_report
cr = classification_report(y_test, y_predmax, digits = 6)
print(cr)
y_predmax = tf.math.argmax(y_pred, axis = 1)
from sklearn.metrics import classification_report
cr = classification_report(y_test, y_predmax, digits = 6)
print(cr)
"""