### **MA-CapsNet-DA: Speech Emotion Recognition based on MA-CapsNet using Data Augmentation**

**Sample dataset: EMODB** \
40125 samples, 20-dimensional MFCC features (dimensions 1-20) \
ZCR feature: 1-dimensional

For each frame, we extracted a 21-dimensional feature vector (20 MFCCs + 1 ZCR). \
Each audio clip is represented by 130 frames. \
The whole dataset (~1.38 GB) is available upon request.

Authors: Huiyun Zhang, Heming Huang, Henry Han \
Last update: Oct 7, 2023

(c) All rights reserved

In [None]:
import os
os.environ["CUDA_VISIBLE_DEVICES"]="1"

from tensorflow.keras import utils
from tensorflow.keras.datasets import mnist
from tensorflow.keras.models import Model
from tensorflow.keras.layers import *

from tensorflow.keras import backend as K
import numpy as np
import librosa
import pathlib
import random

In [None]:
# Root directory of the dataset; replace it with the location on your machine.
# The cells below read its sub-directory names as labels and the entries inside
# each sub-directory as the audio files.
path = pathlib.Path(r'C:\Users\hp\01-202110031 Experiments\EMODBM')


In [None]:
# Collect every entry one level below each sub-directory of the root.
# NOTE(review): the pattern does not filter by extension, so any non-audio file
# inside a class directory is picked up too -- confirm the folders are clean.
all_emotion_wav = list(path.glob('*/*'))

In [None]:
# Convert the Path objects to plain strings; the loop variable is given its own
# name so that it no longer shadows the dataset root `path`.
all_emotion_path = [str(wav_entry) for wav_entry in all_emotion_wav]

In [None]:
# Shuffle the file list in place.
# NOTE(review): no random seed is set, so the order differs between runs.
random.shuffle(all_emotion_path)

In [None]:
# Class names are the names of the sub-directories of the dataset root, sorted
# so that the name -> index mapping is stable.
# The explicit is_dir() filter matters: before Python 3.11 a trailing "/" in a
# glob pattern was ignored, so `glob('*/')` also returned plain files sitting in
# the root and each of them would have become a spurious class label.
label_names = sorted(item.name for item in path.glob('*/') if item.is_dir())

In [None]:
# Map every class name to its position in the sorted list of class names.
label_to_index = {name: position for position, name in enumerate(label_names)}

In [None]:
# Integer label of each file = index of the name of the directory that contains it.
# The list follows the (already shuffled) order of all_emotion_path, so labels
# and paths stay aligned.
all_emotion_label = [label_to_index[pathlib.Path(p).parent.name] for p in all_emotion_path ]

In [None]:
import librosa

In [None]:
# 1. mfcc
def get_mfcc(wav_file, max_mfcc_len):
    """Return a fixed-width MFCC matrix for one audio file.

    The file is loaded at its native sampling rate (``sr=None``) and the MFCCs
    are computed with librosa's default settings.  The frame axis (axis 1) is
    then zero-padded on the right, or truncated, to exactly ``max_mfcc_len``
    columns.

    Args:
        wav_file: Path of the audio file, handed to ``librosa.load``.
        max_mfcc_len: Number of frames (columns) in the returned matrix.

    Returns:
        2-D array with ``max_mfcc_len`` columns; one row per MFCC coefficient.
    """
    y, sr = librosa.load(wav_file, sr=None)
    # Pass the signal and sampling rate by keyword: librosa >= 0.10 made these
    # parameters keyword-only, so the positional form `mfcc(y, sr)` raises
    # TypeError there.  The keyword form works on older releases as well.
    mfcc = librosa.feature.mfcc(y=y, sr=sr)
    n_frames = mfcc.shape[1]
    if max_mfcc_len > n_frames:
        # Too short: append zero-valued frames on the right.
        return np.pad(mfcc, ((0, 0), (0, max_mfcc_len - n_frames)), 'constant')
    # Long enough: keep only the leading frames.
    return mfcc[:, :max_mfcc_len]

# 2. zcr
def get_zcr(wav_file, max_zcr_len):
    """Return the zero-crossing-rate track of one audio file at a fixed width.

    The file is loaded at its native sampling rate and the frame axis (axis 1)
    of the zero-crossing-rate array is zero-padded on the right, or truncated,
    to exactly ``max_zcr_len`` columns.

    Args:
        wav_file: Path of the audio file, handed to ``librosa.load``.
        max_zcr_len: Number of frames (columns) in the returned array.

    Returns:
        2-D array with ``max_zcr_len`` columns.
    """
    signal, _ = librosa.load(wav_file, sr = None)
    rates = librosa.feature.zero_crossing_rate(signal)
    missing = max_zcr_len - rates.shape[1]
    if missing > 0:
        return np.pad(rates, ((0, 0), (0, missing)), 'constant')
    return rates[:, :max_zcr_len]

# Build one feature matrix per audio file: rows 0-19 hold the MFCCs, row 20 the
# zero-crossing rate, each over 130 frames.  The transpose (frames first) is
# what gets stored.  Progress and the resulting shape are printed per file.
mfcc_list = []
index = 0
for wav_file in all_emotion_path:
    print(index)
    feature = np.zeros((21, 130))
    feature[:20] = get_mfcc(wav_file, 130)
    feature[20:] = get_zcr(wav_file, 130)
    print(feature.T.shape)
    mfcc_list.append(feature.T)
    index += 1

In [None]:
# Stack the per-file feature matrices collected in mfcc_list into one array
# (one leading entry per audio file).
data = np.array(mfcc_list)

In [None]:
# Integer labels as a single-column array (one row per audio file).
label = np.array(all_emotion_label).reshape(-1, 1)

In [None]:
# Stratified 80/20 train/test split followed by resampling of the training part only.
from sklearn.model_selection import train_test_split
from collections import Counter
# x_train, x_test, y_train, y_test = train_test_split(data, label, stratify = label, test_size=0.2)
label=np.array(label,dtype=int)
# NOTE(review): no random_state is passed, so the split changes between runs.
x_train, x_test, y_train, y_test = train_test_split(data, label, stratify = label, test_size=0.2)
# Class distribution of the training labels, counted BEFORE resampling.
t=Counter(y_train.T[0].tolist())
# print(t)
# x_train = x_train.reshape(-1,130*21*1)
# NOTE(review): `doResamapling` and `method` are not defined in the visible part
# of this notebook -- presumably a resampling helper and its mode selector (the
# plot titles mention SMOTETomek); confirm they are defined before this cell runs.
x_train, y_train=doResamapling(x_train, y_train, method)
# Restore the (samples, 130 frames, 21 features) layout after resampling.
x_train=x_train.reshape(-1, 130, 21)
# t=Counter(y_train)
# NOTE(review): `t` was computed before resampling, so this prints the
# pre-resampling class counts.
print(t)

In [None]:
# Shape the features as single-channel 130 x 21 maps, standardise them with
# statistics fitted on the training split only, persist the fitted scaler, and
# one-hot encode the labels.
from sklearn.preprocessing import StandardScaler,MinMaxScaler
from joblib import dump

num_classes = 4
rows, cols = 130, 21

x_train = x_train.reshape(x_train.shape[0], rows, cols, 1).astype('float32')
x_test = x_test.reshape(x_test.shape[0], rows, cols, 1).astype('float32')

# The scaler works on 2-D input: flatten each sample, scale, restore the shape.
ss = StandardScaler()
x_train = ss.fit_transform(x_train.reshape(-1, rows * cols)).reshape(-1, rows, cols, 1)
x_test = ss.transform(x_test.reshape(-1, rows * cols)).reshape(-1, rows, cols, 1)
dump(ss, "std_scaler.bin", compress=True)

y_train = utils.to_categorical(y_train, num_classes)
y_test = utils.to_categorical(y_test, num_classes)

In [None]:
# Display the shapes of the training and test feature arrays as a sanity check.
x_train.shape,x_test.shape

In [None]:
# Randomly permuted list of test-set indices (shuffled in place by NumPy).
idx = list(range(len(x_test)))
np.random.shuffle(idx)

In [None]:
# Pair every test sample with a randomly chosen partner: the two feature maps
# are joined along axis 1 and the two class indices are kept side by side.
# Pairs whose classes coincide are dropped, and each remaining label pair is
# sorted in ascending order.
X_test = np.concatenate((x_test, x_test[idx]), axis=1)
Y_test = np.stack((y_test.argmax(axis=1), y_test[idx].argmax(axis=1)), axis=1)
X_test = X_test[Y_test[:, 0] != Y_test[:, 1]]
Y_test = Y_test[Y_test[:, 0] != Y_test[:, 1]]
Y_test.sort(axis=1)

In [None]:
from tensorflow.keras import activations
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Layer

def squash(x, axis = -1):
    """Rescale vectors along ``axis`` by ``norm / (1 + norm**2)``.

    The direction of each vector is kept; only its length changes.  A small
    epsilon is added to the squared norm before the square root is taken.
    """
    sq_norm = K.sum(K.square(x), axis, keepdims = True) + K.epsilon()
    return x * (K.sqrt(sq_norm) / (1 + sq_norm))

#define our own softmax function instead of K.softmax
def softmax(x, axis = -1):
    """Softmax along ``axis``; the per-axis maximum is subtracted before exponentiating."""
    shifted = x - K.max(x, axis = axis, keepdims = True)
    exps = K.exp(shifted)
    return exps / K.sum(exps, axis = axis, keepdims = True)

#A Capsule Implement with Pure Keras
class Capsule(Layer):
    """Capsule layer with iterative routing, written with Keras backend ops.

    Maps an input of shape ``[batch, input_num_capsule, input_dim_capsule]`` to
    an output of shape ``[batch, num_capsule, dim_capsule]``.

    Args:
        num_capsule: Number of output capsules.
        dim_capsule: Length of each output capsule vector.
        routings: Number of routing iterations; must be at least 1.
        share_weights: If true, one projection kernel is shared by all input
            capsules; otherwise every input capsule gets its own kernel.
        activation: ``'squash'`` selects the module-level ``squash`` function;
            any other value is resolved with ``activations.get``.
        **kwargs: Forwarded to ``Layer.__init__``.

    Raises:
        ValueError: If ``routings`` is smaller than 1.
    """

    def __init__(self, num_capsule, dim_capsule, routings = 3, share_weights = True, activation = 'squash', **kwargs):
        super(Capsule, self).__init__(**kwargs)
        if routings < 1:
            # With no routing iteration `call` would never assign its output.
            raise ValueError('routings must be >= 1, got {}'.format(routings))
        self.num_capsule = num_capsule
        self.dim_capsule = dim_capsule
        self.routings = routings
        self.share_weights = share_weights
        # Remember how the activation was requested so get_config can report it.
        self._squash_requested = activation == 'squash'
        if self._squash_requested:
            self.activation = squash
        else:
            self.activation = activations.get(activation)

    def build(self, input_shape):
        """Create the projection kernel once the input shape is known."""
        super(Capsule, self).build(input_shape)
        input_dim_capsule = input_shape[-1]
        if self.share_weights:
            # A single kernel (leading dimension 1) applied to every input capsule.
            self.W = self.add_weight(name='capsule_kernel',
                                     shape=(1, input_dim_capsule,
                                            self.num_capsule * self.dim_capsule),
                                     initializer='glorot_uniform',
                                     trainable=True)
        else:
            # One kernel per input capsule; needs a static input_num_capsule.
            input_num_capsule = input_shape[-2]
            self.W = self.add_weight(name = 'capsule_kernel',
                                     shape = (input_num_capsule,
                                            input_dim_capsule,
                                            self.num_capsule * self.dim_capsule),
                                     initializer = 'glorot_uniform',
                                     trainable = True)

    def call(self, u_vecs):
        """Project the input capsules and route them to the output capsules."""
        # Project each input capsule to num_capsule * dim_capsule values.
        if self.share_weights:
            u_hat_vecs = K.conv1d(u_vecs, self.W)
        else:
            u_hat_vecs = K.local_conv1d(u_vecs, self.W, [1], [1])

        batch_size = K.shape(u_vecs)[0]
        input_num_capsule = K.shape(u_vecs)[1]
        u_hat_vecs = K.reshape(u_hat_vecs, (batch_size, input_num_capsule,
                                            self.num_capsule, self.dim_capsule))
        u_hat_vecs = K.permute_dimensions(u_hat_vecs, (0, 2, 1, 3))
        #final u_hat_vecs.shape = [None, num_capsule, input_num_capsule, dim_capsule]

        # Routing logits start at zero.
        b = K.zeros_like(u_hat_vecs[:,:,:,0]) #shape = [None, num_capsule, input_num_capsule]
        for i in range(self.routings):
            # Coupling coefficients: softmax over the output-capsule axis.
            c = softmax(b, 1)
            # Weighted combination of the projected vectors per output capsule.
            o = K.batch_dot(c, u_hat_vecs, [2, 2])
            if K.backend() == 'theano':
                o = K.sum(o, axis=1)
            if i < self.routings - 1:
                # Every iteration but the last recomputes (overwrites) the logits
                # from the L2-normalised outputs and the projected vectors.
                o = K.l2_normalize(o, -1)
                b = K.batch_dot(o, u_hat_vecs, [2, 3])
                if K.backend() == 'theano':
                    b = K.sum(b, axis = 1)

        return self.activation(o)

    def compute_output_shape(self, input_shape):
        """Return ``(None, num_capsule, dim_capsule)``."""
        return (None, self.num_capsule, self.dim_capsule)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super(Capsule, self).get_config()
        if self._squash_requested:
            activation = 'squash'
        else:
            activation = activations.serialize(self.activation)
        config.update({
            'num_capsule': self.num_capsule,
            'dim_capsule': self.dim_capsule,
            'routings': self.routings,
            'share_weights': self.share_weights,
            'activation': activation,
        })
        return config

In [None]:
# Build the network: three convolution stages followed by a capsule layer.
import warnings
# Silences every Python warning for the rest of the session.
warnings.filterwarnings('ignore')

# Single-channel input whose height and width are left unspecified.
input_feature = Input(shape = (None,None,1))
# Stage 1: 128 filters of size 3x3, 2x2 max pooling, LeakyReLU with slope 0.001.
cnn = Conv2D(128, (3, 3))(input_feature)
cnn1 = MaxPooling2D((2, 2))(cnn)
cnn1 = LeakyReLU(alpha = 0.001)(cnn1)

# Stage 2: same layout as stage 1.
cnn2 = Conv2D(128, (3, 3))(cnn1)
cnn2 = MaxPooling2D((2, 2))(cnn2)
cnn2 = LeakyReLU(alpha = 0.001)(cnn2)

# Stage 3: convolution and activation only (no pooling).
cnn = Conv2D(128, (3, 3))(cnn2)
cnn = LeakyReLU(alpha = 0.001)(cnn)

# Flatten the spatial positions into a sequence of 128-dimensional vectors,
# which the capsule layer takes as its input capsules.
cnn = Reshape((-1, 128))(cnn)
# 4 output capsules of dimension 16, 3 routing iterations, shared weights.
capsule = Capsule(4, 16, 3, True)(cnn)
# The model output is the Euclidean length of each of the 4 capsule vectors.
output = Lambda(lambda x: K.sqrt(K.sum(K.square(x), 2)), output_shape = (4, ))(capsule)

model = Model(inputs = input_feature, outputs = output)

In [None]:
# Margin-style loss on the capsule lengths: the true class is penalised when its
# length is below 0.9, every other class when its length is above 0.1; the
# second term is weighted by 0.26.
# NOTE(review): the model is compiled again in a later cell with the same loss
# and additional metrics, which supersedes this call.
model.compile(loss = lambda y_true,y_pred: y_true * K.relu(0.9 - y_pred)**2 + 0.26 * (1 - y_true) * K.relu(y_pred - 0.1)**2,
              optimizer = 'adam',
              metrics = ['accuracy'])

In [None]:
# Print the layer-by-layer summary of the model.
model.summary()

In [None]:
import tensorflow.keras as keras
import tensorflow as tf
# Checkpoint file template; the epoch number is formatted into the file name.
checkpoint_path = "checkpoint/cp-cn-{epoch:04d}.ckpt"
# Callback that saves the model weights only (not the full model).
cp_callback = keras.callbacks.ModelCheckpoint(filepath=checkpoint_path,save_weights_only=True,verbose=1)
# Compile with the margin-style loss on the capsule lengths (true class penalised
# below 0.9, other classes above 0.1 with weight 0.26) and track accuracy,
# precision and recall.
model.compile(loss = lambda y_true,y_pred: y_true * K.relu(0.9 - y_pred)**2 + 0.26 * (1 - y_true) * K.relu(y_pred - 0.1)**2,
              optimizer = 'adam',
              metrics = ['accuracy',tf.keras.metrics.Precision(),tf.keras.metrics.Recall()])

In [None]:
# Train for 100 epochs and validate on the held-out split after every epoch.
# The checkpoint callback has to be handed to fit(); when it is only created and
# never passed, no weights are ever written.  The target directory is created
# first so that saving cannot fail on a missing folder.
os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)
history = model.fit(x_train, y_train, epochs = 100, batch_size = 256,
                    validation_data = (x_test, y_test),
                    callbacks = [cp_callback])

In [None]:
import matplotlib.pyplot as plt
# Training loss vs. validation loss per epoch.  The validation curve is labelled
# 'test' in the legend.  The figure is written to a 300-dpi PNG and then shown.
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('MA-CapsNet-Multi-SNR5-EMODB-SMOTETomek')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.savefig('01-MA-CapsNet-loss-SMOTETomek.png', dpi = 300)
plt.show()

In [None]:
def F1(precision,recall):
    """Return the element-wise F1 score ``2 * p * r / (p + r)`` as a list.

    Args:
        precision: Sequence (or scalar) of precision values.
        recall: Sequence (or scalar) of recall values, same shape as ``precision``.

    Returns:
        The F1 scores converted with ``ndarray.tolist()``.  Entries where
        ``precision + recall`` is zero are reported as ``0.0`` instead of the
        NaN that a plain 0/0 division would produce.
    """
    p = np.asarray(precision, dtype=float)
    r = np.asarray(recall, dtype=float)
    numerator = 2.0 * p * r
    denominator = p + r
    # Divide only where the denominator is non-zero; the other entries keep the
    # 0.0 they were initialised with.
    scores = np.divide(numerator, denominator,
                       out=np.zeros_like(numerator),
                       where=denominator != 0)
    return scores.tolist()

In [None]:
# Overlay accuracy, precision, recall and F1 for the training and the
# validation data in one figure, save it as a 300-dpi PNG and show it.
# The accuracy series is stored under 'acc' by older Keras / TF 1.x and under
# 'accuracy' by TF 2.x, so use whichever key this training run produced
# instead of failing with a KeyError.
acc_key = 'acc' if 'acc' in history.history else 'accuracy'
plt.plot(history.history[acc_key])
plt.plot(history.history['precision'])
plt.plot(history.history['recall'])
plt.plot(F1(history.history['precision'],history.history['recall']))
plt.plot(history.history['val_' + acc_key])
plt.plot(history.history['val_precision'])
plt.plot(history.history['val_recall'])
plt.plot(F1(history.history['val_precision'],history.history['val_recall']))
plt.title('MA-CapsNet-Multi-SNR5-EMODB-SMOTETomek')
# plt.ylabel('acc')
plt.xlabel('epoch')
plt.legend(['acc', 'precision', 'recall', 'F1_score', 'val_acc', 'val_precision', 'val_recall','val_F1_score'])
plt.savefig('00-MA-CapsNet-All-Multi-SNR5-EMODB-SMOTETomek.png', dpi = 300)
plt.show()

In [None]:
# Training vs. validation accuracy per epoch, saved as a 300-dpi PNG and shown.
# The accuracy series is stored under 'acc' by older Keras / TF 1.x and under
# 'accuracy' by TF 2.x, so use whichever key this training run produced
# instead of failing with a KeyError.
acc_key = 'acc' if 'acc' in history.history else 'accuracy'
plt.plot(history.history[acc_key])
plt.plot(history.history['val_' + acc_key])
plt.title('MA-CapsNet-Multi-SNR5-EMODB-SMOTETomek')
plt.ylabel('acc')
plt.xlabel('epoch')
plt.legend(['acc', 'val_acc'])
plt.savefig('02-MA-CapsNet-Acc-Multi-SNR5-EMODB-SMOTETomek.png', dpi = 300)
plt.show()

In [None]:
# Training vs. validation precision per epoch, saved as a 300-dpi PNG and shown.
# NOTE(review): assumes the metric is registered under the plain name
# 'precision' -- confirm against history.history.keys().
plt.plot(history.history['precision'])
plt.plot(history.history['val_precision'])
plt.title('MA-CapsNet-Multi-SNR5-EMODB-SMOTETomek')
plt.ylabel('precision')
plt.xlabel('epoch')
plt.legend(['precision','val_precision'])
plt.savefig('03-MA-CapsNet-precision-Multi-SNR5-EMODB-SMOTETomek.png', dpi = 300)
plt.show()

In [None]:
# Training vs. validation recall per epoch, saved as a 300-dpi PNG and shown.
# NOTE(review): assumes the metric is registered under the plain name
# 'recall' -- confirm against history.history.keys().
plt.plot(history.history['recall'])
plt.plot(history.history['val_recall'])
plt.title('MA-CapsNet-Multi-SNR5-EMODB-SMOTETomek')
plt.ylabel('recall')
plt.xlabel('epoch')
plt.legend(['recall','val_recall'])
plt.savefig('04-MA-CapsNet-recall-Multi-SNR5-EMODB-SMOTETomek.png', dpi = 300)
plt.show()

In [None]:
# Training vs. validation F1 per epoch, derived from the logged precision and
# recall series with F1(); saved as a 300-dpi PNG and shown.
plt.plot(F1(history.history['precision'],history.history['recall']))
plt.plot(F1(history.history['val_precision'],history.history['val_recall']))
plt.title('MA-CapsNet-Multi-SNR5-EMODB-SMOTETomek')
plt.ylabel('F1_score')
plt.xlabel('epoch')
plt.legend([ 'F1_score', 'val_F1_score'])
plt.savefig('05-MA-CapsNet-F1-Score-Multi-SNR5-EMODB-SMOTETomek.png', dpi = 300)
plt.show()