In [None]:
import os
import numpy as np
import librosa as lb
import matplotlib.pyplot as plt
from tensorflow.keras import models, layers, optimizers, callbacks # type: ignore
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.utils import class_weight

In [None]:
# ---------------------------------------------------------------------------
# Dataset-building configuration
# ---------------------------------------------------------------------------
PATH = './ravdess'                      # root directory walked for .wav files

# Audio framing
SAMPLE_RATE = 22050                     # sample rate requested from librosa
DURATION = 3                            # clip length in seconds
LENGTH = int(SAMPLE_RATE * DURATION)    # clip length in samples (pad/truncate target)

# Feature extraction
N_MELS = 256                            # number of mel bands in the spectrogram

# Waveform augmentation
NOISE_FACTOR = 0.001                    # scale applied to the added Gaussian noise
STRETCH_FACTOR = 1.2                    # time_stretch rate for the "fast" variant
SHRINK_FACTOR = 0.8                     # time_stretch rate for the "slow" variant
SHIFT_FACTOR = 0.2                      # max time shift, as a fraction of SAMPLE_RATE samples
CUT_LENGTH = 4000                       # number of consecutive samples zeroed in the "cut" variant

# Spectrogram masking (exclusive upper bounds on the random mask widths)
FREQ_MASK = 25
TIME_MASK = 30

# Emotion code found in the third dash-separated filename field -> integer label
EMOTIONS = dict(zip(('01', '02', '03', '04', '05', '06', '07', '08'), range(8)))

# Per-split accumulators: x = features, y = emotion label, g = gender, a = actor id
x_train, x_test, x_val = ([] for _ in range(3))
y_train, y_test, y_val = ([] for _ in range(3))
g_train, g_test, g_val = ([] for _ in range(3))
a_train, a_test, a_val = ([] for _ in range(3))

In [None]:
def load(path):
    """Read one audio file and return it as a waveform of exactly LENGTH samples.

    The file is decoded by librosa at SAMPLE_RATE, passed through
    ``librosa.effects.trim`` with a 30 dB threshold, and then padded or
    truncated by ``fix``.

    Args:
        path: Location of the audio file.

    Returns:
        The trimmed, length-normalised waveform.
    """
    waveform, _rate = lb.load(path, sr=SAMPLE_RATE, duration=None)
    trimmed, _interval = lb.effects.trim(waveform, top_db=30)
    return fix(trimmed)

def fix(data, length=None):
    """Pad with trailing zeros or truncate so that ``data`` has a fixed length.

    Args:
        data: Sequence/array of samples (padding is applied with ``np.pad``).
        length: Target number of samples. Defaults to the module-level
            ``LENGTH`` when omitted, which is the original behaviour.

    Returns:
        ``data`` itself when it already has the target length, a truncated
        slice when it is longer, or a zero-padded copy when it is shorter.

    Raises:
        ValueError: If ``length`` is negative.
    """
    target = LENGTH if length is None else int(length)
    if target < 0:
        raise ValueError(f'length must be non-negative, got {target}')

    current = len(data)
    if current < target:
        # Zeros are appended at the end only; the start of the signal is kept.
        return np.pad(data, (0, target - current), 'constant')
    if current > target:
        return data[:target]
    return data

def extract(data):
    """Turn a waveform into a two-channel float32 feature array.

    Channel 0 is the mel spectrogram (N_MELS bands) converted to decibels
    relative to its own maximum; channel 1 is the delta of that dB
    spectrogram. The two are stacked along a new last axis.

    Args:
        data: Waveform sampled at SAMPLE_RATE.

    Returns:
        ``np.float32`` array whose last axis has size 2.
    """
    mel_power = lb.feature.melspectrogram(y=data, sr=SAMPLE_RATE, n_mels=N_MELS)
    mel_db = lb.power_to_db(mel_power, ref=np.max)
    mel_delta = lb.feature.delta(mel_db)
    features = np.stack((mel_db, mel_delta), axis=-1)
    return features.astype(np.float32)

def spec_augument(graph, freq_mask=None, time_mask=None, mask_value=None):
    """Return a copy of ``graph`` with one random frequency band and one random
    time span masked out (SpecAugment-style).

    Args:
        graph: 3-D array indexed as (frequency bin, time frame, channel).
            It is not modified.
        freq_mask: Exclusive upper bound on the frequency-mask width.
            Defaults to the module-level ``FREQ_MASK``.
        time_mask: Exclusive upper bound on the time-mask width.
            Defaults to the module-level ``TIME_MASK``.
        mask_value: Value written into the masked regions. When ``None``
            (default) each channel is filled with its own mean. Pass ``0.0``
            to get the previous constant-zero fill.

    Returns:
        The masked copy, same shape and dtype as ``graph``.
    """
    spec = graph.copy()
    if spec.size == 0:
        return spec

    max_f = FREQ_MASK if freq_mask is None else freq_mask
    max_t = TIME_MASK if time_mask is None else time_mask

    if mask_value is None:
        # The dB channel produced by `extract` is referenced to its peak, so 0
        # is its LOUDEST value; filling with 0 would paint full-energy bars.
        # The per-channel mean is the neutral value instead.
        fill = spec.mean(axis=(0, 1), keepdims=True)
    else:
        fill = mask_value

    def _random_span(limit, size):
        # Clamp the width bound to the axis size so small inputs never make
        # randint() receive an empty range.
        upper = min(int(limit), size)
        width = np.random.randint(0, upper) if upper > 0 else 0
        # +1 so the span is allowed to end exactly on the last bin/frame.
        start = np.random.randint(0, size - width + 1)
        return start, width

    f0, f = _random_span(max_f, spec.shape[0])
    spec[f0:f0 + f, :, :] = fill

    t0, t = _random_span(max_t, spec.shape[1])
    spec[:, t0:t0 + t, :] = fill

    return spec

def augument(data):
    """Produce the original waveform plus eight augmented variants.

    The returned list is ordered as: original, added noise, pitch -3 steps,
    pitch +3 steps, faster, slower, time-shifted, gain-scaled, partially
    silenced. Time-stretched and shifted variants are passed through ``fix``.

    Args:
        data: Waveform; the noise and cut steps index it using LENGTH, so it
            is expected to already have LENGTH samples.

    Returns:
        List of nine waveforms.
    """
    # Additive Gaussian noise scaled by NOISE_FACTOR.
    noisy = np.array(data + NOISE_FACTOR * np.random.randn(LENGTH))

    # Pitch moved down / up by three steps.
    lower_pitch = lb.effects.pitch_shift(data, sr=SAMPLE_RATE, n_steps=-3)
    higher_pitch = lb.effects.pitch_shift(data, sr=SAMPLE_RATE, n_steps=+3)

    # Tempo changes alter the sample count, so the length is restored by fix().
    faster = fix(lb.effects.time_stretch(data, rate=STRETCH_FACTOR))
    slower = fix(lb.effects.time_stretch(data, rate=SHRINK_FACTOR))

    # Delay the signal by a random offset; the samples np.roll wrapped around
    # to the front are silenced.
    offset = np.random.randint(int(SAMPLE_RATE * SHIFT_FACTOR))
    shifted = np.roll(data, offset)
    if offset > 0:
        shifted[:offset] = 0
    shifted = fix(shifted)

    # Random gain between 0.8x and 1.2x.
    rescaled = data * np.random.uniform(0.8, 1.2)

    # Zero out CUT_LENGTH consecutive samples at a random position.
    gap_start = np.random.randint(0, LENGTH - CUT_LENGTH)
    with_gap = data.copy()
    with_gap[gap_start:gap_start + CUT_LENGTH] = 0

    return [
        data,
        noisy,
        lower_pitch,
        higher_pitch,
        faster,
        slower,
        shifted,
        rescaled,
        with_gap,
    ]

def get_file_data(path=None, emotions=None):
    """Collect metadata for every usable ``.wav`` file under a directory tree.

    File names are split on ``-``; the third field is the emotion code and the
    seventh is the actor id. Files whose name does not have at least seven
    fields, whose actor id is not an integer, or whose emotion code is not in
    the label map are skipped.

    Args:
        path: Root directory to walk. Defaults to the module-level ``PATH``.
        emotions: Mapping from emotion code to integer label. Defaults to the
            module-level ``EMOTIONS``.
            NOTE: both globals are rebound later in this notebook (training
            config cell), so when re-running out of order pass them explicitly.

    Returns:
        List of dicts with keys ``'path'``, ``'label'``, ``'actor'`` and
        ``'gender'`` (0 for even actor ids, 1 for odd), in sorted traversal
        order so that repeated runs yield the same dataset order.
    """
    root_dir = PATH if path is None else path
    label_map = EMOTIONS if emotions is None else emotions

    files_data = []
    for root, dirs, files in os.walk(root_dir):
        # Sorting in place makes os.walk descend in a deterministic order.
        dirs.sort()
        for file in sorted(files):
            if not file.endswith('.wav'):
                continue
            args = os.path.splitext(file)[0].split('-')
            if len(args) < 7:
                # Not in the expected 7-field naming scheme.
                continue
            emotion, actor = args[2], args[6]
            if emotion not in label_map:
                continue
            try:
                actor_id = int(actor)
            except ValueError:
                continue
            files_data.append({
                'path': os.path.join(root, file),
                'label': label_map[emotion],
                'actor': actor_id,
                'gender': 0 if actor_id % 2 == 0 else 1
            })
    return files_data

def parse(file_data, arr_x, arr_y, arr_g, arr_a):
    """Load one file without augmentation and append it to the given lists.

    Args:
        file_data: Dict with ``'path'``, ``'label'``, ``'actor'``, ``'gender'``.
        arr_x: List receiving the extracted feature array.
        arr_y: List receiving the label.
        arr_g: List receiving the gender flag.
        arr_a: List receiving the actor id.
    """
    features = extract(load(file_data['path']))
    arr_x.append(features)
    for bucket, key in ((arr_y, 'label'), (arr_a, 'actor'), (arr_g, 'gender')):
        bucket.append(file_data[key])

def parse_and_augument(file_data, arr_x, arr_y, arr_g, arr_a):
    """Load one file, augment it, and append every variant to the given lists.

    Each waveform returned by ``augument`` contributes two samples: its
    feature array and a ``spec_augument``-masked copy of that array. The
    label, actor id and gender flag are repeated once per sample.

    Args:
        file_data: Dict with ``'path'``, ``'label'``, ``'actor'``, ``'gender'``.
        arr_x: List receiving the feature arrays.
        arr_y: List receiving the labels.
        arr_g: List receiving the gender flags.
        arr_a: List receiving the actor ids.
    """
    waveform = load(file_data['path'])
    for variant in augument(waveform):
        features = extract(variant)
        arr_x.extend((features, spec_augument(features)))
        # Two samples were just added, so the metadata is duplicated to match.
        arr_y.extend([file_data['label']] * 2)
        arr_a.extend([file_data['actor']] * 2)
        arr_g.extend([file_data['gender']] * 2)
        
def save():
    """Write the module-level split lists to ``data/dataset.npz``.

    Features (``x_*``) are stored as float32; labels (``y_*``), gender flags
    (``g_*``) and actor ids (``a_*``) are stored as int8. The ``data``
    directory is created when it does not exist.
    """
    def as_f32(values):
        return np.array(values, dtype=np.float32)

    def as_i8(values):
        return np.array(values, dtype=np.int8)

    arrays = {
        'x_train': as_f32(x_train), 'x_test': as_f32(x_test), 'x_val': as_f32(x_val),
        'y_train': as_i8(y_train), 'y_test': as_i8(y_test), 'y_val': as_i8(y_val),
        'g_train': as_i8(g_train), 'g_test': as_i8(g_test), 'g_val': as_i8(g_val),
        'a_train': as_i8(a_train), 'a_test': as_i8(a_test), 'a_val': as_i8(a_val),
    }

    os.makedirs('data', exist_ok=True)
    np.savez_compressed('data/dataset.npz', **arrays)

In [None]:
# Start from empty containers every time this cell runs; otherwise re-running
# it would append a second copy of every sample to the existing lists.
x_train, x_test, x_val = [], [], []
y_train, y_test, y_val = [], [], []
g_train, g_test, g_val = [], [], []
a_train, a_test, a_val = [], [], []

files_data = get_file_data()
if not files_data:
    # Typical cause: PATH/EMOTIONS were rebound by the training config cell.
    raise RuntimeError(f'No usable .wav files found under {PATH!r}')

# Split by actor id so no speaker appears in more than one split:
# actors <= 18 -> train (augmented), 19-21 -> test, > 21 -> validation.
for file_data in files_data:
    actor = file_data['actor']
    if actor <= 18:
        parse_and_augument(file_data, x_train, y_train, g_train, a_train)
    elif actor <= 21:
        parse(file_data, x_test, y_test, g_test, a_test)
    else:
        parse(file_data, x_val, y_val, g_val, a_val)

In [None]:
# Persist the train/test/val lists built above to data/dataset.npz.
save()

In [None]:
# Run mode selected interactively: 1 = train only, 2 = test only, 3 = both.
var = int(input("1 => Train\n2 => Test\n3 => Train + Test\n\n: "))
if var not in (1, 2, 3):
    # Any other number would make the train and test cells below do nothing
    # without any message, so reject it here.
    raise ValueError(f'Unknown mode {var!r}; expected 1, 2 or 3.')

# NOTE: PATH and EMOTIONS rebind names that the dataset-building cells above
# also use (there: audio root folder and code->label dict). After this cell
# has run, those earlier cells must not be re-run without re-running their
# own config cell first.
PATH = './data'          # directory holding dataset.npz
EPOCHS = 100             # maximum number of training epochs
BATCH_SIZE = 64
LEARNING_RATE = 0.0001   # initial Adam learning rate
# Class names indexed by integer label.
EMOTIONS = ('Neutral', 'Calm', 'Happy', 'Sad', 'Angry', 'Fearful', 'Disgust', 'Surprised')

In [None]:
def load(path=None):
    """Load the saved dataset splits from an ``.npz`` archive.

    NOTE: this definition replaces the earlier audio-file ``load(path)`` from
    the dataset-building section once this cell has run.

    Args:
        path: Path of the ``.npz`` archive. Defaults to ``dataset.npz`` inside
            the module-level ``PATH`` directory.

    Returns:
        Four tuples, each ordered ``(train, test, val)``: features, labels,
        actor ids, gender flags. Feature arrays that are 3-D get a trailing
        axis of size 1 appended; arrays of any other rank are returned as is.
    """
    archive = os.path.join(PATH, 'dataset.npz') if path is None else path
    splits = ('train', 'test', 'val')

    def with_channel_axis(arr):
        return np.expand_dims(arr, axis=-1) if arr.ndim == 3 else arr

    # Indexing the archive reads each array fully into memory, so the file can
    # be closed as soon as everything has been extracted.
    with np.load(archive) as ds:
        x = tuple(with_channel_axis(ds[f'x_{name}']) for name in splits)
        y = tuple(ds[f'y_{name}'] for name in splits)
        a = tuple(ds[f'a_{name}'] for name in splits)
        g = tuple(ds[f'g_{name}'] for name in splits)
    return x, y, a, g

def _conv_block(x, filters, dropout):
    """Conv2D(3x3, same padding, no bias) -> BatchNorm -> ELU -> 2x2 max-pool -> SpatialDropout2D."""
    x = layers.Conv2D(filters, (3, 3), padding='same', use_bias=False)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('elu')(x)
    x = layers.MaxPooling2D((2, 2))(x)
    return layers.SpatialDropout2D(dropout)(x)


def _dense_block(x, units, dropout):
    """Dense(no bias) -> BatchNorm -> ELU -> Dropout."""
    x = layers.Dense(units, use_bias=False)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('elu')(x)
    return layers.Dropout(dropout)(x)


def init(shape, num_classes=8):
    """Build and compile the CNN classifier.

    Architecture: Normalization over the last axis, four convolutional blocks
    (32, 64, 128, 256 filters), concatenated global average + global max
    pooling, two dense blocks (256, 128 units) and a softmax output.

    The Normalization layer is created un-adapted; the caller is expected to
    call ``adapt`` on it before training.

    Args:
        shape: Shape of one input sample (without the batch dimension).
        num_classes: Number of output classes. Defaults to 8.

    Returns:
        A compiled Keras model (Adam with LEARNING_RATE, sparse categorical
        cross-entropy loss, accuracy metric).
    """
    inputs = layers.Input(shape=shape)
    x = layers.Normalization(axis=-1)(inputs)

    for filters, dropout in ((32, 0.1), (64, 0.1), (128, 0.2), (256, 0.2)):
        x = _conv_block(x, filters, dropout)

    gap = layers.GlobalAveragePooling2D()(x)
    gmp = layers.GlobalMaxPooling2D()(x)
    # A layer instance must be *called* on the list of tensors; subscripting
    # it (`Concatenate()[gap, gmp]`) raises TypeError.
    x = layers.Concatenate()([gap, gmp])

    for units, dropout in ((256, 0.4), (128, 0.3)):
        x = _dense_block(x, units, dropout)

    outputs = layers.Dense(num_classes, activation='softmax', dtype='float32')(x)

    model = models.Model(inputs=inputs, outputs=outputs)

    opt = optimizers.Adam(learning_rate=LEARNING_RATE)
    loss = 'sparse_categorical_crossentropy'
    model.compile(optimizer=opt, loss=loss, metrics=['accuracy'])

    return model

def train(model, x_train, x_val, y_train, y_val):
    """Fit ``model`` with checkpointing, LR scheduling, early stopping and
    balanced class weights.

    Args:
        model: Compiled Keras model.
        x_train: Training features.
        x_val: Validation features.
        y_train: Integer training labels.
        y_val: Integer validation labels.

    Returns:
        The ``History`` object returned by ``model.fit``.
    """
    # Keep the best model (lowest validation loss) on disk.
    checkpoint = callbacks.ModelCheckpoint('data/weights.keras', save_best_only=True, monitor='val_loss', mode='min')
    # Halve the learning rate after 9 epochs without val_loss improvement.
    dynamic_lr = callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=9, min_lr=0.000001, verbose=1)
    # Stop after 15 stagnant epochs and roll back to the best weights.
    early_stop = callbacks.EarlyStopping(monitor='val_loss', patience=15, restore_best_weights=True, verbose=1)

    classes = np.unique(y_train)
    weights = class_weight.compute_class_weight(
        class_weight='balanced',
        classes=classes,
        y=y_train
    )
    # Key each weight by its actual class label. Enumerating the weights would
    # key them by position, which only matches the labels when every class
    # 0..k-1 is present in y_train.
    weights_dict = {int(label): float(weight) for label, weight in zip(classes, weights)}

    history = model.fit(
        x_train, y_train,
        validation_data=(x_val, y_val),
        epochs=EPOCHS,
        batch_size=BATCH_SIZE,
        callbacks=[checkpoint, dynamic_lr, early_stop],
        verbose=1,
        class_weight=weights_dict
    )
    return history

def plot_history(history):
    """Plot training vs. validation accuracy and loss side by side.

    Args:
        history: Object whose ``history`` attribute is a dict containing the
            per-epoch lists ``'accuracy'``, ``'val_accuracy'``, ``'loss'`` and
            ``'val_loss'``.
    """
    record = history.history
    # (train series, train label, val series, val label, legend location, title)
    panels = (
        (record['accuracy'], 'Training Accuracy',
         record['val_accuracy'], 'Validation Accuracy',
         'lower right', 'Training and Validation Accuracy'),
        (record['loss'], 'Training Loss',
         record['val_loss'], 'Validation Loss',
         'upper right', 'Training and Validation Loss'),
    )
    epochs = range(len(record['accuracy']))

    plt.figure(figsize=(15, 5))
    for position, (train_series, train_label, val_series, val_label, legend_loc, title) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(epochs, train_series, label=train_label)
        plt.plot(epochs, val_series, label=val_label)
        plt.legend(loc=legend_loc)
        plt.title(title)

    plt.show()

def _subset_accuracy(y_true, y_pred, mask):
    """Accuracy over the samples selected by ``mask``; ``None`` when the mask selects nothing."""
    if not np.any(mask):
        return None
    return float(np.mean(y_pred[mask] == y_true[mask]))


def _format_accuracy(value):
    """Render an accuracy as a percentage, or a placeholder when it is undefined."""
    return 'n/a (no samples)' if value is None else f'{value*100:.2f}%'


def test(x_test, y_test, g_test):
    """Evaluate the checkpointed model and print a report.

    Loads ``data/weights.keras``, predicts on ``x_test`` and prints a
    classification report, a confusion matrix and the accuracy per gender
    flag (0 reported as female, 1 as male).

    Args:
        x_test: Test features.
        y_test: Integer test labels.
        g_test: Gender flag per test sample.
    """
    print('Testing Model....')
    model = models.load_model('data/weights.keras')
    predictions = model.predict(x_test)
    y_pred = np.argmax(predictions, axis=1)
    y_true = np.asarray(y_test)
    genders = np.asarray(g_test)

    # Pin the label set so the report and matrix always cover every class;
    # without it, target_names must match the classes that happen to occur
    # and scikit-learn raises when one is missing.
    labels = list(range(len(EMOTIONS)))

    print("\n" + "-" * 50)
    print("CLASSIFICATION REPORT")
    print("-" * 50)
    print(classification_report(y_true, y_pred, labels=labels, target_names=EMOTIONS))
    cm = confusion_matrix(y_true, y_pred, labels=labels)
    print("\n" + "-" * 50)
    print("CONFUSION MATRIX (Text)")
    print("-" * 50)
    print(cm)

    female_acc = _subset_accuracy(y_true, y_pred, genders == 0)
    male_acc = _subset_accuracy(y_true, y_pred, genders == 1)
    print('\n' + '-' * 50)
    print('PITCH/GENDER BIAS ANALYSIS')
    print('-' * 50)
    print(f'Female Accuracy: {_format_accuracy(female_acc)}')
    print(f'Male Accuracy: {_format_accuracy(male_acc)}')

In [None]:
# Read the saved splits back from disk. This rebinds x_*/y_*/a_*/g_* (Python
# lists in the dataset-building cells) to the arrays stored in dataset.npz.
(x_train, x_test, x_val), (y_train, y_test, y_val), (a_train, a_test, a_val), (g_train, g_test, g_val) = load()

In [None]:
if var == 1 or var == 3:
    model = init(x_train.shape[1:])
    # model.layers[0] is the InputLayer of a functional model and has no
    # adapt(); locate the Normalization layer by type instead and fit its
    # statistics on the training data before training starts.
    norm_layer = next(layer for layer in model.layers if isinstance(layer, layers.Normalization))
    norm_layer.adapt(x_train)
    history = train(model, x_train, x_val, y_train, y_val)
    plot_history(history)

In [None]:
# Modes 2 (test only) and 3 (train + test) evaluate the saved checkpoint.
if var in (2, 3):
    test(x_test, y_test, g_test)