In [None]:
# This project uses the CREMA-D dataset (you can download it by following the instructions at https://github.com/CheyneyComputerScience/CREMA-D).

In [76]:
import os
import librosa
import librosa.display
import numpy as np
import matplotlib.pyplot as plt

# Locate the folder that contains all the audio files (expected inside the current working directory)
cwd = os.getcwd()
audio_path = os.path.join(cwd, "AudioWAV")

# Collect every .wav file name (case-insensitive extension match).
# os.listdir() returns entries in arbitrary, platform-dependent order, so the list is sorted:
# a stable file order is what makes the seeded train/validation split repeatable.
audio_wav = sorted(f for f in os.listdir(audio_path) if f.lower().endswith('.wav'))
# Emotions (Anger, Disgust, Fear, Happy, Neutral, and Sad)

print("Done")


Done


In [77]:
MAX_LEN = 174 # Number of spectrogram time frames kept per clip (changeable)
TOP_DB = 80.0 # Dynamic range kept by power_to_db; with ref = np.max the values lie in [-TOP_DB, 0]

X = []
y = []

for wav_files in audio_wav:
    path = os.path.join(audio_path, wav_files)

    # The third "_"-separated field of the file name is the emotion code
    emotion_code = wav_files.split("_")[2]

    # waveform is audio time series (Amplitude over time)
    # sr is sampling rate, number of samples per sec (Hz); every file is loaded at 22050 Hz
    waveform, sr = librosa.load(path, sr = 22050)

    # Converting the wav file into a log-scaled (dB) mel spectrogram of shape (128, frames)
    mel_spec = librosa.feature.melspectrogram(y = waveform, sr = sr, n_mels = 128)
    log_mel_spec = librosa.power_to_db(mel_spec, ref = np.max, top_db = TOP_DB)

    # Used to make sure everything is the same size along the time axis
    n_frames = log_mel_spec.shape[1]
    if n_frames < MAX_LEN:
        # Pad with the dB floor (silence). Zero padding would be wrong here: because
        # ref = np.max, 0 dB is the LOUDEST value of the clip, so padding with 0 would
        # append a block of maximum energy to every short recording.
        log_mel_spec = np.pad(
            log_mel_spec,
            pad_width = ((0, 0), (0, MAX_LEN - n_frames)),
            mode = 'constant',
            constant_values = -TOP_DB
        )
    else:
        log_mel_spec = log_mel_spec[:, :MAX_LEN]

    log_mel_spec = log_mel_spec[..., np.newaxis] # Adds a channel (gray scale) for training

    # Appending into X (features) and y (emotion codes)
    X.append(log_mel_spec)
    y.append(emotion_code)

print("Done")


Done


In [3]:
import tensorflow as tf
from tensorflow.keras import layers, models, Input, optimizers
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.layers import Dropout 
from sklearn.model_selection import train_test_split # Splitting 70/30 
# from tensorflow.keras.models import load_model
import numpy as np
from tensorflow.keras.callbacks import ReduceLROnPlateau, ModelCheckpoint, EarlyStopping 

In [78]:

# We have X and y, time to split it in 70/30

# Encoding the emotion codes into 0,1,2,3,4,5.
# The fitted encoder is kept: label_encoder.classes_ / label_encoder.inverse_transform()
# is how a predicted class index gets mapped back to its emotion code.
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y)

# Stratified so both splits keep the same emotion proportions; fixed seed for repeatability
X_train, X_val, y_train, y_val = train_test_split(
    X, y_encoded,
    test_size = 0.3, # 30% is held out for validation
    stratify = y_encoded,
    random_state = 1
)

X_train = np.array(X_train)
X_val = np.array(X_val)
y_train = np.array(y_train)
y_val = np.array(y_val)

# Normalizing (min-max). The statistics come from the training split only and are applied
# unchanged to the validation split; the same min_val / max_val must be reused for any
# audio that is fed to the model later.
min_val = np.min(X_train)
max_val = np.max(X_train)
value_range = max_val - min_val
if value_range == 0:
    # Without this guard the division below would silently fill the arrays with NaN
    raise ValueError("Training features are constant; min-max normalization is undefined.")
X_train = (X_train - min_val) / value_range
X_val = (X_val - min_val) / value_range

print("Done")


Done


In [21]:
# Base model used for training (dropout added to reduce overfitting and help generalization)
# This is not that good and only achieved 47% validation accuracy

# The input shape is declared with an explicit Input(...) object, like the other models in
# this notebook, instead of the `input_shape=` argument on the first layer (which newer
# Keras versions warn about). Dropout is taken from `layers` everywhere for consistency.
model = models.Sequential([
    Input(shape = (128, MAX_LEN, 1)),  # (mel bins, time frames, channel)
    layers.Conv2D(32, (3, 3), activation = "relu"),
    layers.MaxPooling2D((2, 2)),
    layers.Dropout(0.25),
    layers.Conv2D(64, (3, 3), activation = "relu"),
    layers.MaxPooling2D((2, 2)),
    layers.Dropout(0.25),
    layers.Conv2D(64, (3, 3), activation = "relu"),
    layers.Flatten(),
    layers.Dense(64, activation = "relu"),
    layers.Dropout(0.5),
    layers.Dense(6, activation = "softmax"),  # one output per emotion class
])
# Labels are integer class indices, hence the sparse variant of the loss
model.compile(optimizer = "adam", loss = "sparse_categorical_crossentropy", metrics = ["accuracy"])

# Training the model
history = model.fit(
    X_train, y_train,
    epochs = 20,
    validation_data = (X_val, y_val),
    verbose = 1
)

print("Done")



Done


In [102]:
# model.summary() 

In [19]:
# Saving model once you finish training
# model.save("insertname.keras")

In [None]:
# This is used if wanting to continue training from a saved state


# Choose the model file to load.
# load_model is reached through the already-imported `models` module: the bare name
# `load_model` is not available here (its import line above is commented out), so calling
# it directly raises NameError.
# NOTE(review): a saved model that contains a custom layer presumably also needs
# `custom_objects = {...}` passed here — confirm for the file you load.
model = models.load_model('emotion_model_fix.keras')

previous_epochs = 80 # Number of epochs the loaded model has already been trained for
total_epochs = 90 # Epoch index to train up to (runs total_epochs - previous_epochs more epochs)
# More training; initial_epoch makes the epoch counter resume at previous_epochs
history = model.fit(
    X_train, y_train,
    epochs = total_epochs,
    validation_data = (X_val, y_val),
    initial_epoch = previous_epochs
)



In [None]:
# Same as above but with some Gaussian noise added to the input

# Input-noise augmentation, wrapped in its own named sub-model
data_augmentation = models.Sequential(name = "data_augmentation")
data_augmentation.add(layers.GaussianNoise(0.02))

MAX_LEN = 174

# The layer stack is assembled as a plain list first, then handed to Sequential
stack = [
    Input(shape = (128, MAX_LEN, 1)),
    data_augmentation,
]

# Two conv stages that differ only in their number of filters:
# conv -> batch norm -> ReLU -> 2x2 max pool -> dropout
for n_filters in (32, 64):
    stack.extend([
        layers.Conv2D(n_filters, (3, 3), padding = "same"),
        layers.BatchNormalization(),
        layers.Activation("relu"),
        layers.MaxPooling2D((2, 2)),
        layers.Dropout(0.25),
    ])

# Classifier head: flatten, one hidden dense layer, 6-way softmax
stack.extend([
    layers.Flatten(),
    layers.Dense(128, activation = "relu"),
    layers.Dropout(0.5),
    layers.Dense(6, activation = "softmax"),
])

model = models.Sequential(stack)

model.compile(
    optimizer = optimizers.Adam(learning_rate = 0.0001),
    loss = "sparse_categorical_crossentropy",
    metrics = ["accuracy"],
)

history = model.fit(
    X_train,
    y_train,
    validation_data = (X_val, y_val),
    epochs = 100,
)

print("Done")

In [42]:
# Model with highest validation accuracy
# Res-Net + SpecAugment


# This is used to mask frequency for more generalization to prevent overfitting
class FrequencyMask(layers.Layer):
    """Zero out one random band along axis 1 of the input while training.

    The input is treated as a rank-4 tensor whose axis 1 is the mel/frequency axis
    (the mask is reshaped to ``(1, num_mels, 1, 1)`` and broadcast), so a single band
    is drawn per call and shared by every example in the batch. Outside of training
    the layer returns its input unchanged.

    Args:
        max_mask_size: exclusive upper bound for the height of the masked band, i.e. the
            band covers between 0 and ``max_mask_size - 1`` bins. Must be at least 1.
        **kwargs: forwarded to ``layers.Layer``.

    Raises:
        ValueError: if ``max_mask_size`` is smaller than 1.
    """

    def __init__(self, max_mask_size = 16, **kwargs):
        super().__init__(**kwargs)
        if max_mask_size < 1:
            # tf.random.uniform needs minval < maxval; 0 would only fail later, at call time
            raise ValueError(f"max_mask_size must be >= 1, got {max_mask_size}")
        self.max_mask_size = max_mask_size

    def call(self, inputs, training=None):
        if not training:
            return inputs
        num_mels = tf.shape(inputs)[1]
        # Cap the bound at the number of bins so that `num_mels - mask_size` below is always
        # >= 1 (a valid sampling range), even if max_mask_size exceeds the frequency axis.
        size_bound = tf.minimum(self.max_mask_size, num_mels)
        mask_size = tf.random.uniform(shape=[], minval=0, maxval=size_bound, dtype=tf.int32)
        mask_start = tf.random.uniform(shape=[], minval=0, maxval=num_mels - mask_size, dtype=tf.int32)
        mask_range = tf.range(num_mels, dtype=tf.int32)
        # 1 for the bins that are kept, 0 inside [mask_start, mask_start + mask_size)
        mask_condition = (mask_range < mask_start) | (mask_range >= mask_start + mask_size)
        mask = tf.cast(mask_condition, inputs.dtype)
        mask = tf.reshape(mask, (1, num_mels, 1, 1))
        return inputs * mask

    def get_config(self):
        """Return the layer config including ``max_mask_size`` so a saved model can be rebuilt."""
        config = super().get_config()
        config.update({"max_mask_size": self.max_mask_size})
        return config

# Augmentation pipeline: Gaussian input noise followed by a random frequency mask
data_augmentation = models.Sequential(name = "data_augmentation")
data_augmentation.add(layers.GaussianNoise(0.02))
data_augmentation.add(FrequencyMask(max_mask_size=16))

# ResNet building block
def residual_block(x, filters, strides=(1, 1)):
    """Apply one two-convolution residual block to ``x`` and return the result.

    Main path: 3x3 conv (with ``strides``) -> batch norm -> ReLU -> 3x3 conv -> batch norm.
    The skip path is the input itself, or a 1x1 conv (with ``strides``) + batch norm when
    ``strides`` is not ``(1, 1)`` or the input's last dimension differs from ``filters``.
    Both paths are added and passed through a final ReLU.

    Args:
        x: input tensor; its last ``shape`` entry is compared against ``filters``.
        filters: number of filters of every convolution in the block.
        strides: strides of the first main-path convolution and of the projection.

    Returns:
        The output tensor of the block.
    """
    # Main path (only the first convolution may downsample)
    branch = layers.Conv2D(filters, (3, 3), strides = strides, padding = "same")(x)
    branch = layers.BatchNormalization()(branch)
    branch = layers.Activation("relu")(branch)
    branch = layers.Conv2D(filters, (3, 3), padding = "same")(branch)
    branch = layers.BatchNormalization()(branch)

    # Skip path: identity when nothing changes shape, otherwise a 1x1 projection
    if strides == (1, 1) and x.shape[-1] == filters:
        skip = x
    else:
        skip = layers.Conv2D(filters, (1, 1), strides = strides, padding = "same")(x)
        skip = layers.BatchNormalization()(skip)

    # Merge both paths, then the closing non-linearity
    merged = layers.Add()([branch, skip])
    return layers.Activation("relu")(merged)

# Max Length as defined above
MAX_LEN = 174

# ---- Network ----
inputs = Input(shape=(128, MAX_LEN, 1))
x = data_augmentation(inputs)

# Features: strided 7x7 conv stem followed by an overlapping max pool
stem = (
    layers.Conv2D(32, (7, 7), strides=(2, 2), padding='same'),
    layers.BatchNormalization(),
    layers.Activation('relu'),
    layers.MaxPooling2D((3, 3), strides=(2, 2), padding='same'),
)
for stem_layer in stem:
    x = stem_layer(x)

# Skips: two stages of two residual blocks each; only the second stage downsamples,
# and only in its first block
for stage_filters, first_strides in ((64, (1, 1)), (128, (2, 2))):
    x = residual_block(x, filters=stage_filters, strides=first_strides)
    x = residual_block(x, filters=stage_filters)

# Classify
x = layers.GlobalAveragePooling2D()(x)
x = layers.Dense(128, activation = "relu")(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(6, activation = "softmax")(x)

model = models.Model(inputs=inputs, outputs=outputs)
model.compile(
    optimizer = optimizers.Adam(learning_rate = 0.001),
    loss = 'sparse_categorical_crossentropy',
    metrics = ['accuracy'],
)

# ---- Callbacks ----
# Checkpoint: keep only the model with the best validation accuracy seen so far
model_checkpoint = ModelCheckpoint(
    filepath = "best_model.keras",
    monitor = "val_accuracy",
    mode = "max",
    save_best_only = True,
    verbose = 1,
)

# Multiply the learning rate by 0.2 when val_loss has not improved for 5 epochs
reduce_lr = ReduceLROnPlateau(
    monitor = "val_loss",
    factor = 0.2,
    patience = 5,
    verbose = 1,
)

# Stop when val_loss has not improved for 15 epochs and roll back to the best weights
stop_early = EarlyStopping(
    monitor = "val_loss",
    patience = 15,
    verbose = 1,
    restore_best_weights = True,
)

# ---- Training ----
history = model.fit(
    X_train,
    y_train,
    validation_data = (X_val, y_val),
    epochs = 100,
    callbacks = [model_checkpoint, reduce_lr, stop_early],
)

print("Done")


Creating data augmentation pipeline...
Defining the ResNet-style model...
Compiling the model...
Setting up callbacks...
Starting training with ResNet model...
Epoch 1/100
[1m163/163[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 376ms/step - accuracy: 0.3258 - loss: 1.6535
Epoch 1: val_accuracy improved from -inf to 0.17062, saving model to resnet_model_best.keras
[1m163/163[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m69s[0m 417ms/step - accuracy: 0.3260 - loss: 1.6530 - val_accuracy: 0.1706 - val_loss: 7.3195 - learning_rate: 0.0010
Epoch 2/100
[1m163/163[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 367ms/step - accuracy: 0.3880 - loss: 1.4694
Epoch 2: val_accuracy improved from 0.17062 to 0.23735, saving model to resnet_model_best.keras
[1m163/163[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m66s[0m 403ms/step - accuracy: 0.3880 - loss: 1.4694 - val_accuracy: 0.2373 - val_loss: 4.3412 - learning_rate: 0.0010
Epoch 3/100
[1m163/163[0m [32m━━━━━━━━━━━━━━━━

In [105]:


# Class index -> emotion name:
#   0 = anger, 1 = disgust, 2 = fear, 3 = happy, 4 = neutral, 5 = sad
# Bound to a name (instead of being a throwaway expression) so other cells can turn a
# predicted class index into a readable label, e.g. EMOTION_LABELS[str(index)].
EMOTION_LABELS = {
  "0": "anger",
  "1": "disgust",
  "2": "fear",
  "3": "happy",
  "4": "neutral",
  "5": "sad"
}

# Last expression of the cell, so the mapping is still displayed
EMOTION_LABELS

{'0': 'anger',
 '1': 'disgust',
 '2': 'fear',
 '3': 'happy',
 '4': 'neutral',
 '5': 'sad'}

In [101]:
# Sanity check: show the value range of the training features.
# Deliberately NOT assigned to min_val / max_val: those names hold the normalization
# constants computed from the raw training data, and rebinding them here (once X_train
# has been rescaled) would silently replace them with the post-normalization range.
print(np.min(X_train))
print(np.max(X_train))

-80.0
0.0
