# Speech Emotion Recognition

1. Gathering data
2. Quick EDA
3. Preprocess
4. Extract features
5. Train LSTM models with different parameters
6. Evaluate

Datasets:
* Crowd-sourced Emotional Multimodal Actors Dataset (Crema-D)
* Ryerson Audio-Visual Database of Emotional Speech and Song (Ravdess)
* Surrey Audio-Visual Expressed Emotion (Savee)
* Toronto Emotional Speech Set (Tess)

In [None]:
%pip install pydub kagglehub

In [None]:
import os
import pandas as pd
import numpy as np
import kagglehub

In [None]:
# from tensorflow.keras.models import load_model
import tensorflow as tf
tf.config.experimental.enable_op_determinism()
# model = load_model("../input/lstm/keras/default/1/multi.h5")

In [None]:
# RAVDESS = "/kaggle/input/ravdess-emotional-speech-audio/audio_speech_actors_01-24/"
# CREMA = "/kaggle/input/cremad/AudioWAV/"
# TESS = "/kaggle/input/toronto-emotional-speech-set-tess/tess toronto emotional speech set data/TESS Toronto emotional speech set data/"
# SAVEE = "/kaggle/input/surrey-audiovisual-expressed-emotion-savee/ALL/"

**1. Ravdess Dataframe**

There are 1440 audio files, for example, 03-01-**06**-01-02-01-12.wav.

In [None]:
# Download each corpus through kagglehub and point at the folder that holds
# the audio. The trailing "/" is required: the cells below build file paths by
# plain string concatenation (e.g. f"{RAVDESS}{directory}/{audio_file}").
CREMA = kagglehub.dataset_download('ejlok1/cremad') + "/AudioWAV/"
RAVDESS = kagglehub.dataset_download('uwrfkaggler/ravdess-emotional-speech-audio') + "/audio_speech_actors_01-24/"
SAVEE = kagglehub.dataset_download('ejlok1/surrey-audiovisual-expressed-emotion-savee') + "/ALL/"
TESS = kagglehub.dataset_download('ejlok1/toronto-emotional-speech-set-tess') + "/tess toronto emotional speech set data/TESS Toronto emotional speech set data/"

In [None]:
!ls /root/.cache/kagglehub/datasets/ejlok1/toronto-emotional-speech-set-tess/versions/1

In [None]:
# Emotion code (third dash-separated field of the file name) -> label.
# Files whose code is not listed here are skipped.
emotion_dic = {
    '01' : 'neutral',
    '03' : 'happy',
    '04' : 'sad',
    '05' : 'angry',
    '06' : 'fear',
    '07' : 'disgust',
}

path_list, gender_list, emotion_list = [], [], []

# One sub-directory per actor, each holding that actor's clips.
ravdess_dir_lis = os.listdir(RAVDESS)
for directory in ravdess_dir_lis:
    for audio_file in os.listdir(os.path.join(RAVDESS, directory)):
        fields = audio_file.split('.')[0].split('-')
        key = fields[2]
        if key not in emotion_dic:
            continue
        # Seventh field: even numbers are labelled female, odd ones male.
        gender_code = int(fields[6])
        path_list.append(f"{RAVDESS}{directory}/{audio_file}")
        gender_list.append('male' if gender_code % 2 else 'female')
        emotion_list.append(emotion_dic[key])

ravdess_df = pd.DataFrame({
    'path': path_list,
    'sex': gender_list,
    'emotion': emotion_list,
})

ravdess_df.head()

**2. Crema-D Dataframe**

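There are 7,442 audio files, for example, 1001_DFA_**ANG**_XX.wav. The last field is the intensity level; the cell below keeps only the `HI` (high intensity) clips, so files such as this `XX` example are skipped.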

In [None]:
# Emotion tag (third underscore-separated field of the file name) -> label.
emotion_dic = {
    'NEU' : 'neutral',
    'HAP' : 'happy',
    'SAD' : 'sad',
    'ANG' : 'angry',
    'FEA' : 'fear',
    'DIS' : 'disgust',
}

# Actor ids (first field of the file name) labelled female; all others are male.
female_id_list = [
    '1002', '1003', '1004', '1006', '1007', '1008', '1009', '1010', '1012', '1013', '1018',
    '1020', '1021', '1024', '1025', '1028', '1029', '1030', '1037', '1043', '1046', '1047',
    '1049', '1052', '1053', '1054', '1055', '1056', '1058', '1060', '1061', '1063', '1072',
    '1073', '1074', '1075', '1076', '1078', '1079', '1082', '1084', '1089', '1091',
]
female_ids = set(female_id_list)

path_list, gender_list, emotion_list = [], [], []

crema_dir_list = os.listdir(CREMA)
for audio_file in crema_dir_list:
    part = audio_file.split('_')
    key = part[2]
    # Keep only known emotions recorded with the 'HI' tag in the last field.
    if key not in emotion_dic or part[3] != 'HI.wav':
        continue
    path_list.append(f"{CREMA}{audio_file}")
    gender_list.append('female' if part[0] in female_ids else 'male')
    emotion_list.append(emotion_dic[key])

crema_df = pd.DataFrame({
    'path': path_list,
    'sex': gender_list,
    'emotion': emotion_list,
})

crema_df.head()

**3. Tess Dataframe**

There are 2,800 audio files, for example, OAF_base_**fear**.wav.

In [None]:
# Emotion word (third underscore-separated field of the file name) -> label.
# 'Sad' is listed separately because the lookup is case-sensitive.
emotion_dic = {
    'neutral' : 'neutral',
    'happy'   : 'happy',
    'sad'     : 'sad',
    'Sad'     : 'sad',
    'angry'   : 'angry',
    'fear'    : 'fear',
    'disgust'  : 'disgust',
}

path_list, gender_list, emotion_list = [], [], []

tess_dir_list = os.listdir(TESS)
for directory in tess_dir_list:
    for audio_file in os.listdir(os.path.join(TESS, directory)):
        stem = audio_file.split('.')[0]
        key = stem.split('_')[2]
        if key not in emotion_dic:
            continue
        path_list.append(f"{TESS}{directory}/{audio_file}")
        gender_list.append('female') # female only dataset
        emotion_list.append(emotion_dic[key])

tess_df = pd.DataFrame({
    'path': path_list,
    'sex': gender_list,
    'emotion': emotion_list,
})

tess_df.head()

**4. Savee Dataframe**

There are 480 audio files, for example, DC_**a**02.wav.

In [None]:
# Emotion prefix of the second underscore-separated field -> label.
emotion_dic = {
    'n'  : 'neutral',
    'h'  : 'happy',
    'sa' : 'sad',
    'a'  : 'angry',
    'f'  : 'fear',
    'd'  : 'disgust'
}

path_list, gender_list, emotion_list = [], [], []

savee_dir_list = os.listdir(SAVEE)
for audio_file in savee_dir_list:
    tail = audio_file.split('_')[1]
    # Drop the last six characters (e.g. "02.wav") to leave the emotion prefix.
    key = tail[:-6]
    if key not in emotion_dic:
        continue
    path_list.append(f"{SAVEE}{audio_file}")
    gender_list.append('male') # male only dataset
    emotion_list.append(emotion_dic[key])

savee_df = pd.DataFrame({
    'path': path_list,
    'sex': gender_list,
    'emotion': emotion_list,
})

savee_df.head()

In [None]:
# Stack the four per-corpus frames row-wise. Each frame keeps its own
# 0-based index, so index labels repeat in the combined frame.
corpus_frames = [ravdess_df, crema_df, tess_df, savee_df]
df = pd.concat(corpus_frames, axis=0)
df.head()

In [None]:
df.iloc[0]["path"]

# 2. Quick EDA

We check for imbalances like male to female ratio.

In [None]:
import librosa
import matplotlib.pyplot as plt

plt.style.use('ggplot')

In [None]:
def plot_distribution(df):
    """Show a bar chart of the number of clips per emotion, split by sex.

    Args:
        df: frame with 'emotion', 'sex' and 'path' columns.
    """
    # Count rows for every (emotion, sex) pair, then spread `sex` into columns
    # so each emotion gets one bar per sex.
    counts = df.groupby(['emotion', 'sex']).count()
    per_emotion = counts.pivot_table(index='emotion', columns='sex', values='path')

    per_emotion.plot(kind='bar', figsize=(6, 3), color=['pink', 'blue'])
    plt.xlabel('Emotion')
    plt.ylabel('Count')
    plt.title('Emotion and Gender Distribution')
    plt.show()

plot_distribution(df)

In [None]:
# I decided to go with a female specific model
df = df[df['sex'] == 'female']
plot_distribution(df)

In [None]:
import shutil

shutil.copy(df.iloc[8]["path"], "/kaggle/working/f5.wav") 

In [None]:
df.drop('sex', axis=1, inplace=True)
df.head()

In [None]:
df.iloc[0]["path"]

In [None]:
from IPython.display import Audio

def create_waveplot(y, sr, title):
    """Plot the waveform of an audio signal in its own figure.

    Args:
        y: audio samples, passed straight to ``librosa.display.waveshow``.
        sr: sampling rate of ``y``.
        title: title drawn above the plot.
    """
    # Import the sub-module explicitly: librosa releases without lazy
    # sub-module loading do not expose `librosa.display` after a bare
    # `import librosa`, which made the original call fail with AttributeError.
    import librosa.display

    plt.figure(figsize=(8, 2))
    plt.title(title)
    librosa.display.waveshow(y, sr=sr)
    plt.show()

In [None]:
emotion_label = 'sad'
title = f"Waveplot for {emotion_label} emotion"

# Take the second clip (position 1) among those labelled `emotion_label`.
matching_paths = df.loc[df.emotion == emotion_label, 'path']
path = matching_paths.to_numpy()[1]
y, sr = librosa.load(path)

create_waveplot(y, sr, title)
Audio(path)

# 3. Preprocess

The following steps will be followed to preprocess the audio:

1. Get an array of samples
2. Trim the silence  
3. Padding for equal length

In [None]:
from pydub import AudioSegment, effects

In [None]:
def preprocess_audio(path, target_length=180000, top_db=25):
    """Load a clip, trim leading/trailing silence and zero-pad to a fixed length.

    Args:
        path: audio file readable by ``pydub.AudioSegment.from_file``.
        target_length: number of samples in the returned array.
        top_db: threshold (dB below the peak) passed to
            ``librosa.effects.trim`` to decide what counts as silence.

    Returns:
        ``(samples, sr)``: a float32 array of exactly ``target_length``
        samples and the sampling rate of those samples.

    Raises:
        ValueError: if the trimmed clip is longer than ``target_length``.
    """
    raw_audio = AudioSegment.from_file(path)

    # The samples below come from pydub at the file's native rate, so report
    # that rate. The previous `librosa.load(path)` call decoded the file a
    # second time and returned librosa's default resampling rate instead,
    # which did not describe these samples.
    sr = raw_audio.frame_rate

    # NOTE(review): for multi-channel files pydub interleaves the channels in
    # this array; this code assumes mono input - confirm for all corpora.
    samples = np.array(raw_audio.get_array_of_samples(), dtype='float32')
    trimmed, _ = librosa.effects.trim(samples, top_db=top_db)

    missing = target_length - len(trimmed)
    if missing < 0:
        # np.pad would fail on a negative width with an unrelated-looking
        # message; keep the exception type but say what actually happened.
        raise ValueError(
            f"Trimmed audio has {len(trimmed)} samples, "
            f"more than target_length={target_length}: {path}"
        )
    padded = np.pad(trimmed, (0, missing), 'constant')
    return padded, sr

In [None]:
# Emotion label -> integer class id (position in this tuple).
emotion_dic = {
    label: class_id
    for class_id, label in enumerate(
        ('neutral', 'happy', 'sad', 'angry', 'fear', 'disgust')
    )
}

def encode(label):
    """Return the integer class id for `label`, or None if it is unknown."""
    if label in emotion_dic:
        return emotion_dic[label]
    return None

# 4. Extract features

We will only extract these features:

1. Mel-Frequency Cepstral Coefficients: captures the shape of the spectral envelope of a signal
2. Zero Crossing Rate: captures how often the signal changes sign within each frame
3. Root Mean Square Energy: captures the root mean square amplitude of the audio signal

In [None]:
# Per-clip feature matrices and labels; entries at the same position belong
# to the same clip.
zcr_list = []
rms_list = []
mfccs_list = []
emotion_list = []

FRAME_LENGTH = 2048
HOP_LENGTH = 512

for row in df.itertuples(index=False):
    try:
        y, sr = preprocess_audio(row.path)

        zcr = librosa.feature.zero_crossing_rate(y, frame_length=FRAME_LENGTH, hop_length=HOP_LENGTH)
        rms = librosa.feature.rms(y=y, frame_length=FRAME_LENGTH, hop_length=HOP_LENGTH)
        mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13, hop_length=HOP_LENGTH)
        label = encode(row.emotion)
    except Exception as exc:
        # `Exception` rather than a bare except, so Ctrl-C still interrupts
        # the loop; report the cause instead of hiding it.
        print(f"Failed for path: {row.path} ({exc!r})")
        continue

    # Append only once everything succeeded, so the four lists cannot drift
    # out of alignment when a clip fails half-way.
    zcr_list.append(zcr)
    rms_list.append(rms)
    mfccs_list.append(mfccs)
    emotion_list.append(label)

In [None]:
# Swap the last two axes of every feature stack so the frame axis comes
# before the feature axis, then join ZCR, RMS and MFCCs along the feature axis.
feature_blocks = [
    np.asarray(block).swapaxes(1, 2)
    for block in (zcr_list, rms_list, mfccs_list)
]
X = np.concatenate(feature_blocks, axis=2).astype('float32')

# Labels as an int8 column vector, one row per clip.
y = np.asarray(emotion_list)[:, np.newaxis].astype('int8')

In [None]:
X

# 5. Build an LSTM

Before building the model, we have to set up the data. LSTMs are well suited to sequence data.

In [None]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [None]:
# Hold out 12% of the clips, then split that hold-out 70/30 into validation
# and test sets.
X_train, X_to_split, y_train, y_to_split = train_test_split(
    X, y, test_size=0.12, random_state=1,
)
X_val, X_test, y_val, y_test = train_test_split(
    X_to_split, y_to_split, test_size=0.3, random_state=1,
)

# One-hot encode the train and validation targets; y_test stays as integer labels.
y_train_class, y_val_class = (
    to_categorical(labels, num_classes=6) for labels in (y_train, y_val)
)

In [None]:
from keras.models import Sequential
from keras import layers, optimizers, callbacks, Model

In [None]:
X.shape

In [None]:
import numpy as np

def accuracy(y_true, y_pred_probs):
    """Return the fraction of samples whose arg-max prediction is correct.

    Args:
        y_true: integer class labels, one per sample. A column vector of
            shape (n, 1) is accepted and flattened.
        y_pred_probs: array of shape (n, n_classes) with per-class scores.

    Returns:
        Accuracy in [0, 1]; 0.0 when there are no samples.

    Raises:
        ValueError: if the number of labels and predictions differ.
    """
    # Flatten first: comparing an (n, 1) label column against the (n,)
    # prediction vector would broadcast to an (n, n) matrix and inflate the count.
    y_true = np.asarray(y_true).ravel()
    y_pred = np.argmax(y_pred_probs, axis=1)
    if y_true.shape != y_pred.shape:
        raise ValueError(
            f"Got {y_true.size} labels but {y_pred.size} predictions"
        )
    if y_true.size == 0:
        return 0.0
    return np.sum(y_true == y_pred) / y_true.size

def precision(y_true, y_pred_probs, average='macro'):
    """Return the averaged per-class precision of the arg-max predictions.

    Only classes that occur in `y_true` are scored; a class that is never
    predicted contributes a precision of 0.

    Args:
        y_true: integer class labels, one per sample (flattened if needed).
        y_pred_probs: array of shape (n, n_classes) with per-class scores.
        average: 'macro' for the unweighted mean over classes; any other
            value gives the mean weighted by each class's share of `y_true`.

    Returns:
        The averaged precision; 0.0 when there are no samples.
    """
    y_true = np.asarray(y_true).ravel()
    y_pred = np.argmax(y_pred_probs, axis=1)
    # Take the counts together with the classes so the weights always line up
    # with the per-class scores, even when some label values never occur
    # (np.bincount would return a longer array in that case).
    classes, support = np.unique(y_true, return_counts=True)
    if classes.size == 0:
        return 0.0

    precisions = []
    for cls in classes:
        predicted_cls = y_pred == cls
        n_predicted = np.sum(predicted_cls)  # tp + fp
        tp = np.sum(predicted_cls & (y_true == cls))
        precisions.append(tp / n_predicted if n_predicted > 0 else 0)

    if average == 'macro':
        return np.mean(precisions)
    return np.sum(np.asarray(precisions) * support / y_true.size)

def recall(y_true, y_pred_probs, average='macro'):
    """Return the averaged per-class recall of the arg-max predictions.

    Only classes that occur in `y_true` are scored.

    Args:
        y_true: integer class labels, one per sample (flattened if needed).
        y_pred_probs: array of shape (n, n_classes) with per-class scores.
        average: 'macro' for the unweighted mean over classes; any other
            value gives the mean weighted by each class's share of `y_true`.

    Returns:
        The averaged recall; 0.0 when there are no samples.
    """
    y_true = np.asarray(y_true).ravel()
    y_pred = np.argmax(y_pred_probs, axis=1)
    # Counts are taken together with the classes so the weights always line
    # up with the per-class scores, even when some label values never occur
    # (np.bincount would return a longer array in that case).
    classes, support = np.unique(y_true, return_counts=True)
    if classes.size == 0:
        return 0.0

    recalls = []
    for cls, n_actual in zip(classes, support):
        tp = np.sum((y_true == cls) & (y_pred == cls))
        # n_actual == tp + fn and is at least 1 for every class found above.
        recalls.append(tp / n_actual)

    if average == 'macro':
        return np.mean(recalls)
    return np.sum(np.asarray(recalls) * support / y_true.size)

def f1_score(y_true, y_pred_probs, average='macro'):
    """Return the harmonic mean of the averaged precision and averaged recall.

    Note that this combines the already-averaged precision and recall; it is
    not the mean of per-class F1 scores.

    Args:
        y_true: integer class labels, one per sample.
        y_pred_probs: array of shape (n, n_classes) with per-class scores.
        average: forwarded unchanged to `precision` and `recall`.

    Returns:
        The F1 value, or 0 when precision + recall is not positive.
    """
    p = precision(y_true, y_pred_probs, average=average)
    r = recall(y_true, y_pred_probs, average=average)
    if (p + r) > 0:
        return 2 * (p * r) / (p + r)
    return 0


In [None]:
import keras

class BaseModel(Model):
    """Thin Keras `Model` that delegates everything to `self.seq`.

    Subclasses replace `self.seq` with their own `Sequential` stack.
    """

    def __init__(self, **kwargs):
        # Forward the keyword arguments (e.g. `name`) to Keras instead of
        # silently discarding them as before.
        super().__init__(**kwargs)
        self.seq = Sequential()

    def call(self, inputs):
        """Run `inputs` through the wrapped Sequential stack."""
        return self.seq(inputs)

    def build(self):
        """Build the wrapped Sequential stack.

        NOTE(review): this overrides `Model.build` without an input-shape
        parameter; confirm the installed Keras version calls it that way.
        """
        self.seq.build()

@keras.saving.register_keras_serializable()
class LSTM1(BaseModel):
    """Single 64-unit LSTM layer followed by a 6-way softmax classifier."""

    def __init__(self, activation):
        """Build the stack; `activation` is the LSTM layer's activation."""
        super().__init__()
        stack = [
            layers.Input(shape=(352, 15)),
            layers.LSTM(64, activation=activation),
            layers.Dense(6, activation="softmax"),
        ]
        self.seq = Sequential(stack)

@keras.saving.register_keras_serializable()
class LSTM2(BaseModel):
    """Two stacked LSTM layers (84, then 40 units) with dropout and a 6-way softmax."""

    def __init__(self, activation):
        """Build the stack; `activation` is used by both LSTM layers."""
        super().__init__()
        stack = [
            layers.Input(shape=(352, 15)),
            # The first LSTM returns the full sequence so the next one can consume it.
            layers.LSTM(84, activation=activation, return_sequences=True),
            layers.Dropout(0.2),
            layers.LSTM(40, activation=activation),
            layers.Dropout(0.2),
            layers.Dense(6, activation="softmax"),
        ]
        self.seq = Sequential(stack)

@keras.saving.register_keras_serializable()
class LSTM3(BaseModel):
    """Two bidirectional LSTM layers (128, then 64 units) with dropout and a 6-way softmax."""

    def __init__(self, activation, **kwargs):
        """Build the stack; `activation` is used by both LSTM layers."""
        super().__init__(**kwargs)
        # Kept on the instance so get_config() can report it.
        self.activation = activation
        stack = [
            layers.Input(shape=(352, 15)),
            layers.Bidirectional(
                layers.LSTM(128, activation=activation, return_sequences=True)
            ),
            layers.Dropout(0.3),
            layers.Bidirectional(layers.LSTM(64, activation=activation)),
            layers.Dropout(0.4),
            layers.Dense(6, activation="softmax"),
        ]
        self.seq = Sequential(stack)

    def get_config(self):
        """Return the constructor arguments needed to re-create this model."""
        config = {"activation": self.activation}
        return config

@keras.saving.register_keras_serializable()
class LSTM4(BaseModel):
    """Three stacked LSTM layers (128, 64, 64 units) with dropout and a 6-way softmax."""

    def __init__(self, activation):
        """Build the stack; `activation` is used by all three LSTM layers."""
        super().__init__()
        stack = [
            layers.Input(shape=(352, 15)),
            # All but the last LSTM return full sequences for the next layer.
            layers.LSTM(128, activation=activation, return_sequences=True),
            layers.Dropout(0.2),
            layers.LSTM(64, activation=activation, return_sequences=True),
            layers.Dropout(0.2),
            layers.LSTM(64, activation=activation),
            layers.Dropout(0.2),
            layers.Dense(6, activation="softmax"),
        ]
        self.seq = Sequential(stack)

In [None]:
# Smoke check: every architecture can be constructed and compiled.
# `model` ends up holding the last one (LSTM4).
for architecture in (LSTM1, LSTM2, LSTM3, LSTM4):
    model = architecture("relu")
    model.compile()
    # model.predict(X)



In [None]:
def evaluate(model, X, y):
    """Score `model` on (X, y) with accuracy, precision, recall and F1.

    Args:
        model: object whose `predict(X)` returns per-class scores.
        X: model inputs.
        y: integer class labels as an array; flattened before scoring.

    Returns:
        Dict with keys "accuracy", "precision", "recall" and "f1_score".
    """
    probabilities = model.predict(X)
    labels = y.flatten()
    scorers = (
        ("accuracy", accuracy),
        ("precision", precision),
        ("recall", recall),
        ("f1_score", f1_score),
    )
    return {metric: scorer(labels, probabilities) for metric, scorer in scorers}

In [None]:
import keras.optimizers as optim
import keras.utils

def train_all_models(X_train, y_train, X_val, y_val):
    """Train all architectures with some predefined hyperparameters.

    Each of LSTM1-LSTM4 is built with the "sigmoid" activation and trained
    for 10 epochs with RMSprop (learning rate 0.001) and batch size 8.

    Returns:
        (models, histories): dicts keyed by class name holding the trained
        model instances and their `History.history` dicts.
    """
    models = {cls.__name__: cls for cls in (LSTM1, LSTM2, LSTM3, LSTM4)}

    print(models)
    histories = {}
    # Iterate over a snapshot of the names; each class is replaced by its
    # trained instance as we go.
    for name in list(models):
        print(f"Model: {name}")
        # Re-seed before building so every architecture starts from the same seed.
        keras.utils.set_random_seed(14)
        network = models[name]("sigmoid")
        network.compile(
            loss='categorical_crossentropy',
            optimizer=optim.RMSprop(learning_rate=0.001),
            metrics=['categorical_accuracy'],
        )
        fit_log = network.fit(
            X_train, y_train,
            epochs=10, batch_size=8,
            validation_data=(X_val, y_val),
        )
        histories[name] = fit_log.history
        models[name] = network

    return models, histories

In [None]:
def train_different_optimizers(model_class, optimizers, X_train, y_train, X_val, y_val):
    """Train model with different optimizers.

    Args:
        model_class: model class, instantiated as `model_class("sigmoid")`.
        optimizers: dict mapping a display name to an optimizer class; each
            is created with `learning_rate=0.001`.
        X_train, y_train: training inputs and one-hot targets.
        X_val, y_val: validation inputs and one-hot targets.

    Returns:
        (models, histories): dicts keyed by optimizer name.
    """
    models, histories = {}, {}
    for name, optimizer_class in optimizers.items():
        print(f"Model: {model_class.__name__}, Optimizer: {name}")
        # Re-seed so every run starts from the same seed.
        keras.utils.set_random_seed(14)
        candidate = model_class("sigmoid")
        candidate.compile(
            loss='categorical_crossentropy',
            optimizer=optimizer_class(learning_rate=0.001),
            metrics=['categorical_accuracy'],
        )

        fit_log = candidate.fit(
            X_train, y_train,
            epochs=10, batch_size=8,
            validation_data=(X_val, y_val),
        )
        models[name] = candidate
        histories[name] = fit_log.history

    return models, histories

In [None]:
def train_different_lrs(model_class, optimizer, lrs, X_train, y_train, X_val, y_val):
    """Train model with different learning rates.

    Args:
        model_class: model class, instantiated as `model_class("sigmoid")`.
        optimizer: optimizer class, created once per learning rate.
        lrs: iterable of learning rates to try.
        X_train, y_train: training inputs and one-hot targets.
        X_val, y_val: validation inputs and one-hot targets.

    Returns:
        (models, histories): dicts keyed by `str(lr)`.
    """
    models, histories = {}, {}
    for lr in lrs:
        print(f"Model: {model_class.__name__}, Optimizer: {optimizer.__name__}, LR: {lr}")
        # Re-seed so every run starts from the same seed.
        keras.utils.set_random_seed(14)
        candidate = model_class("sigmoid")
        candidate.compile(
            loss='categorical_crossentropy',
            optimizer=optimizer(learning_rate=lr),
            metrics=['categorical_accuracy'],
        )

        fit_log = candidate.fit(
            X_train, y_train,
            epochs=10, batch_size=8,
            validation_data=(X_val, y_val),
        )
        run_key = str(lr)
        models[run_key] = candidate
        histories[run_key] = fit_log.history

    return models, histories

def train_different_activations(model_class, optimizer, lr, activations, X_train, y_train, X_val, y_val):
    """Train model with different activation functions.

    Args:
        model_class: model class, instantiated as `model_class(activation)`.
        optimizer: optimizer class, created with `learning_rate=lr`.
        lr: learning rate used for every run.
        activations: iterable of activation names to try.
        X_train, y_train: training inputs and one-hot targets.
        X_val, y_val: validation inputs and one-hot targets.

    Returns:
        (models, histories): dicts keyed by activation name.
    """
    models, histories = {}, {}
    for activation in activations:
        print(f"Model: {model_class.__name__}, Optimizer: {optimizer.__name__}, LR: {lr}, Activation: {activation}")
        # Re-seed so every run starts from the same seed.
        keras.utils.set_random_seed(14)
        candidate = model_class(activation)
        candidate.compile(
            loss='categorical_crossentropy',
            optimizer=optimizer(learning_rate=lr),
            metrics=['categorical_accuracy'],
        )

        fit_log = candidate.fit(
            X_train, y_train,
            epochs=10, batch_size=8,
            validation_data=(X_val, y_val),
        )
        models[activation] = candidate
        histories[activation] = fit_log.history

    return models, histories

def _best_by_accuracy(models, X, y):
    """Return the key of the model in `models` with the highest accuracy on (X, y)."""
    scores = {name: evaluate(model, X, y) for name, model in models.items()}
    return max(scores, key=lambda name: scores[name]["accuracy"])


def pipeline():
    """Best model selection through consecutive steps.

    Picks, in order, the best architecture, optimizer, learning rate and
    activation, carrying each winner into the next step. Reads the
    module-level splits `X_train`, `y_train_class`, `X_val`, `y_val`,
    `y_val_class`, `X_test` and `y_test`.

    Every choice is made on the validation split. The test split is scored
    exactly once, for the final model, so the returned metrics are not biased
    by the selection (previously each step was picked on the test set).

    Returns:
        (model, scores, history): the final trained model, its metrics on
        the test split, and its training history dict.
    """
    models, histories = train_all_models(X_train, y_train_class, X_val, y_val_class)
    best_name = _best_by_accuracy(models, X_val, y_val)
    # The later steps need the class, not the trained instance.
    best_model = type(models[best_name])
    print(f"Best accuracy: {best_name}")

    optimizers = {"RMSprop": optim.RMSprop, "Adam": optim.Adam}
    models, histories = train_different_optimizers(best_model, optimizers, X_train, y_train_class, X_val, y_val_class)
    best_name = _best_by_accuracy(models, X_val, y_val)
    best_optim = optimizers[best_name]
    print(f"Best accuracy: {best_name}")

    lrs = [0.001, 0.01, 0.1, 0.0001]
    models, histories = train_different_lrs(best_model, best_optim, lrs, X_train, y_train_class, X_val, y_val_class)
    best_name = _best_by_accuracy(models, X_val, y_val)
    # Runs are keyed by str(lr); convert the winning key back to a number.
    best_lr = float(best_name)
    print(f"Best accuracy: {best_name}")

    activations = ["relu", "sigmoid", "tanh"]
    models, histories = train_different_activations(best_model, best_optim, best_lr, activations, X_train, y_train_class, X_val, y_val_class)
    best_name = _best_by_accuracy(models, X_val, y_val)

    print(f"Best {best_name}")

    final_model = models[best_name]
    return final_model, evaluate(final_model, X_test, y_test), histories[best_name]


In [None]:
best_model, best_scores, best_history = pipeline()

In [None]:
y_train

In [None]:
best_model.save("best.keras")

In [None]:
# import keras

best_model = keras.models.load_model("best.keras")

In [None]:
import json

# Persist `best_scores` as JSON.
with open("scores_.json", "w+") as scores_file:
    json.dump(best_scores, scores_file)

In [None]:
import json

# Persist `best_history` as JSON.
with open("history_.json", "w+") as history_file:
    json.dump(best_history, history_file)

In [None]:
y_pred = best_model.predict(X_test)

In [None]:
y_pred[:3]

In [None]:
np.argmax(y_pred, axis=1)[:3]

In [None]:
y_test[:3]

In [None]:
metrics = evaluate(best_model, X_test, y_test)

In [None]:
metrics

In [None]:
import numpy as np

np.argmax(y_pred, axis=1)
y_test.flatten()

In [None]:
def confusion_matrix(y_pred, y_true):
    """Count arg-max predictions per (true class, predicted class) pair.

    Args:
        y_pred: array of shape (n, n_classes) with per-class scores.
        y_true: integer class labels, one per sample (flattened if needed).

    Returns:
        Square int32 array `cm` where `cm[t][p]` is the number of samples
        with true class `t` predicted as class `p`. Its side is the number
        of score columns, or `max(y_true) + 1` if that is larger.
    """
    scores = np.asarray(y_pred)
    y_true = np.asarray(y_true).ravel()
    predicted = np.argmax(scores, axis=1)

    # Size the matrix from the score columns as well as the labels: sizing it
    # from max(y_true) alone raised IndexError whenever a class above that was
    # predicted, and produced a smaller matrix when the evaluated split
    # happened to lack the highest class.
    n_classes = scores.shape[1]
    if y_true.size:
        n_classes = max(n_classes, int(np.max(y_true)) + 1)

    cm = np.zeros((n_classes, n_classes), dtype=np.int32)
    for pred, true in zip(predicted, y_true):
        cm[true][pred] += 1

    return cm

In [None]:
cm = confusion_matrix(y_pred, y_test.flatten())

In [None]:
cm

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# Emotion label -> class index; the keys, in this order, label both heatmap axes.
emotion_dic = dict(neutral=0, happy=1, sad=2, angry=3, fear=4, disgust=5)
class_names = emotion_dic.keys()

# Move the x tick labels to the top edge and hide the bottom ones.
plt.tick_params(axis='both', which='major', labelsize=10, labelbottom = False, bottom=False, top = False, labeltop=True)
viridis = sns.color_palette("viridis", as_cmap=True)
sns.heatmap(
    cm,
    annot=True,
    cmap=viridis,
    xticklabels=class_names,
    yticklabels=class_names,
)
plt.savefig("heatmap.png")

In [None]:
list(emotion_dic.keys())

In [None]:
best_history

In [None]:
best_model.layers[1].layers[0].backward_layer.activation

In [None]:
fig, axes = plt.subplots(1, 2, figsize=(10, 5))

# One panel per metric: (axis, training key, validation key, title, y label).
panels = (
    (axes[0], 'loss', 'val_loss',
     'Loss for Train and Validation Sets', 'Loss'),
    (axes[1], 'categorical_accuracy', 'val_categorical_accuracy',
     'Accuracy for Train and Validation Sets', 'Accuracy'),
)
for ax, train_key, val_key, heading, y_label in panels:
    ax.plot(best_history[train_key])
    ax.plot(best_history[val_key])
    ax.set_title(heading)
    ax.set_ylabel(y_label)
    ax.set_xlabel('Epochs')
    ax.legend(['Training', 'Validation'])

fig.tight_layout()

plt.savefig("training.png")

# 6. Evaluate and conclude

Let's see how good our model is.