# Emotion Recognition on IEMOCAP Transcripts - ALBERT and Bi-LSTM Models

## Importing needed libraries

In [1]:
!pip install tensorflow-addons
!pip install sentencepiece
!pip install tensorflow_hub
!pip install transformers
!pip install pytest


# System Libraries
import os
from pathlib import Path
import tarfile
from IPython.display import Audio, clear_output

# Handling Data Libraries
import pandas as pd
import numpy as np
import sentencepiece
from sklearn.model_selection import train_test_split
from sklearn import preprocessing

# Plot Libraries
import matplotlib.pyplot as plt
import plotly.express as px
import seaborn as sns

# Transformer models Libraries and utilities, metrics
from sklearn.metrics import classification_report, f1_score
from keras.utils.data_utils import get_file
from keras.utils.data_utils import get_file
from transformers import AlbertTokenizer, TFAlbertModel
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense
import tensorflow_hub as hub
import tensorflow_addons as tfa
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.initializers import TruncatedNormal
from tensorflow.keras.losses import CategoricalCrossentropy
from tensorflow.keras.metrics import CategoricalAccuracy
from tensorflow.keras.utils import to_categorical

# Download of the chosen model and its tokenizer
print('Using TensorFlow version', tf.__version__)
tokenizer = AlbertTokenizer.from_pretrained("albert-base-v2")
albert = TFAlbertModel.from_pretrained("albert-base-v2")

# Single seed shared by the dataset splits and the model initialisation.
random_seed = 42
# Seed NumPy and TensorFlow here so the ALBERT section is seeded as well;
# otherwise the TensorFlow seed is only set once the Bi-LSTM section starts.
np.random.seed(random_seed)
tf.random.set_seed(random_seed)






















ModuleNotFoundError: No module named 'plotly'

## Preprocessing

### Dataset download

In [None]:
from google.colab import files

def download_dataset_from_kaggle_to_colab():
    # Downloads the Kaggle dataset `riccardopaolini/nlp-project-work` and unzips it
    # into the current working directory. Takes no arguments and returns nothing;
    # every step except the upload is an IPython shell command.
    # Upload via `google.colab.files`; the following steps expect a file named
    # `kaggle.json` (the Kaggle API credentials) to be among the uploaded files.
    files.upload()
    # Fails visibly if kaggle.json was not uploaded.
    !ls -lha kaggle.json
    !pip install -q kaggle # Install kaggle API
    # Place the credentials file in ~/.kaggle.
    !mkdir -p ~/.kaggle
    !cp kaggle.json ~/.kaggle/
    # NOTE(review): hard-coded /root path, while the copy above targets ~ ;
    # the two only coincide when the home directory is /root.
    !chmod 600 /root/.kaggle/kaggle.json
    !kaggle datasets download -d riccardopaolini/nlp-project-work
    !unzip nlp-project-work.zip
    # Wipe the shell output produced by the commands above.
    clear_output()

# Run the download when the cell is executed.
download_dataset_from_kaggle_to_colab()

### Dataset conversion into table

In [None]:
def _find_iemocap_evaluation(emo_lines, utterance_id):
    """Look up the evaluation block of one utterance.

    Args:
        emo_lines: all lines of one EmoEvaluation file.
        utterance_id: utterance name, e.g. ``Ses01F_impro01_F000``.

    Returns:
        ``(emotion, vad, counts)`` where ``emotion`` is the label found on the
        header line, ``vad`` is the list of three floats found in the trailing
        ``[v, a, d]`` field and ``counts`` maps each categorical emotion name to
        the number of times it appears on the ``C-...`` lines that follow the
        header. ``None`` when no line mentions ``utterance_id``.

    Raises:
        KeyError: if a ``C-...`` line names a category that is not in ``counts``.
    """
    counts = {'Anger': 0, 'Happiness': 0, 'Sadness': 0, 'Neutral': 0, 'Frustration': 0,
              'Excited': 0, 'Fear': 0, 'Surprise': 0, 'Disgust': 0, 'Other': 0}
    emotion = None
    vad = None
    reached = False
    for emo_line in emo_lines:
        if utterance_id in emo_line:
            # Header line: the last two tab-separated fields are label and [v, a, d].
            emotion, raw_vad = emo_line.split('\t')[-2:]
            vad = [float(value) for value in raw_vad.strip().strip('[]').split(',')]
            reached = True
        elif reached:
            if not emo_line.startswith('C'):
                # First non-categorical line ends this utterance's block.
                break
            # "C-E2:\tAnger; Frustration;\t(comment)" -> ['Anger', 'Frustration']
            labels = emo_line.split(':')[1].split('(')[0].split(';')
            for label in labels:
                label = label.strip()
                if label != '':
                    counts[label] += 1
    if not reached:
        return None
    return emotion, vad, counts


def build_IEMOCAP_dataframe(folder=None):
    """Collect every evaluated IEMOCAP utterance into a DataFrame.

    For each ``<session>/dialog/transcriptions/<dialog>.txt`` file, the file of
    the same name under ``<session>/dialog/EmoEvaluation`` is read, and one row
    is produced per transcript line whose name contains the dialog name.

    Args:
        folder: dataset root containing the session folders. Defaults to
            ``IEMOCAP/IEMOCAP`` under the current working directory.

    Returns:
        pandas.DataFrame with columns ``conv_id``, ``turn_id``, ``sentence``,
        ``path`` (expected wav location), ``emotion``, ``valence``,
        ``activation``, ``dominance`` and one vote-count column per categorical
        emotion.

    Notes:
        * Sessions and transcripts are visited in sorted order so that
          ``conv_id`` is reproducible (``os.listdir`` order is arbitrary).
        * Only the first ``:`` of a transcript line separates name and text, so
          sentences containing a colon are kept intact; lines without a colon
          are skipped.
        * Utterances with no entry in the evaluation file are skipped instead
          of inheriting the labels of the previous utterance. They still
          consume a ``turn_id``, so turn ids follow the transcript position.
    """
    if folder is None:
        folder = os.path.join(os.getcwd(), 'IEMOCAP/IEMOCAP')

    conv_id = 0
    rows = []
    for session in sorted(os.listdir(folder)):
        session_path = os.path.join(folder, session)
        # 'dialog' folder contains Emotions and Transcripts
        # 'sentences' folder contains Audios
        trans_folder = os.path.join(session_path, 'dialog', 'transcriptions')
        if not os.path.isdir(trans_folder):
            # Stray entries in the dataset root (e.g. metadata files) are not sessions.
            continue

        for trans_name in sorted(os.listdir(trans_folder)):
            if trans_name.startswith('._'):
                continue
            dialog_name = trans_name.split('.')[0]
            emo_path = os.path.join(session_path, 'dialog', 'EmoEvaluation', trans_name)
            # Read the evaluation file once; each lookup then scans the list from
            # its start, independent of where the previous lookup stopped.
            with open(emo_path, encoding='utf8') as emo_file:
                emo_lines = emo_file.readlines()

            conv_id += 1
            turn_id = 0
            with open(os.path.join(trans_folder, trans_name), encoding='utf8') as trans_file:
                for line in trans_file:
                    audio_name, separator, text = line.partition(':')
                    if not separator or dialog_name not in audio_name:
                        continue
                    turn_id += 1

                    utterance_id = audio_name.split(' ')[0]
                    evaluation = _find_iemocap_evaluation(emo_lines, utterance_id)
                    if evaluation is None:
                        continue
                    emotion, vad, count_em = evaluation

                    wav_path = os.path.join(session_path, 'sentences', 'wav', dialog_name, utterance_id + '.wav')
                    row = {'conv_id': conv_id,
                           'turn_id': turn_id,
                           'sentence': text.strip(),
                           'path': wav_path,
                           'emotion': emotion,
                           'valence': vad[0],
                           'activation': vad[1],
                           'dominance': vad[2]
                           }
                    rows.append(dict(**row, **count_em))

    return pd.DataFrame(rows)

# Build the utterance table and preview its first rows.
df = build_IEMOCAP_dataframe()
df.head()

### Dataset inspection

In [None]:
X = df["sentence"].copy()
y = df["emotion"].copy()


# drawing plot of the sentences length distribution

# Sentence length measured in space-separated words.
lengths = [len(t.split(' ')) for t in X]
quantile = 0.95
# Length at the chosen quantile of the sentence lengths; later cells reuse it as
# the truncation/padding length.
thresh = int(np.quantile(lengths, quantile))
plt.hist(lengths, bins = len(set(lengths)))
plt.title(f"Sentence length distribution, thresh: (quantile ={quantile} -->{thresh} words) ")
plt.axvline(x = thresh, color = 'r', label = 'axvline - full height')
plt.show()

# drawing plot of the emotions distribution
# One bar per emotion label. The previous histogram sized its bins from the
# number of distinct sentence lengths, which is unrelated to the label count.
emotion_counts = y.value_counts().sort_index()
plt.bar(emotion_counts.index, emotion_counts.values)
plt.title("Emotions distribution")
plt.show()

### Dataset split

In [None]:
from sklearn.model_selection import train_test_split


# Hold out 10% of the samples as the test set (no stratification).
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.1,
    random_state=random_seed,
)
# Split 11% of the remaining samples off as the validation set.
X_train, X_val, y_train, y_val = train_test_split(
    X_train,
    y_train,
    test_size=0.11,
    random_state=random_seed,
)

In [None]:
# One labelled histogram per split, each shown as its own figure.
for split_labels, split_name in ((y_train, "train set"), (y_val, "val set"), (y_test, "test set")):
    plt.hist(split_labels, label=split_name)
    plt.legend()
    plt.show()

## ALBERT

### Tokenization

In [None]:
# setting length cut-limit for the sentences
# NOTE(review): `thresh` was computed in space-separated words, whereas
# `max_length` is applied to the tokenizer's own tokens - confirm this is intended.
maxlen = thresh

# Options shared by the train / validation / test tokenizer calls: every sentence
# is truncated or padded to `maxlen`, and TensorFlow tensors are returned with an
# attention mask and without token type ids.
albert_tokenizer_options = dict(
    add_special_tokens=True,
    max_length=maxlen,
    truncation=True,
    padding='max_length',
    return_tensors='tf',
    return_token_type_ids=False,
    return_attention_mask=True,
    verbose=True,
)

x_train_albert = tokenizer(text=X_train.tolist(), **albert_tokenizer_options)

x_val_albert = tokenizer(text=X_val.tolist(), **albert_tokenizer_options)

x_test_albert = tokenizer(text=X_test.tolist(), **albert_tokenizer_options)
     

### Integer encoding of labels (one-hot conversion happens at training time)

In [None]:
# Integer id assigned to each emotion code.
encoded_dict = dict(ang=0, dis=1, exc=2, fea=3, fru=4, hap=5, neu=6, oth=7, sad=8, sur=9, xxx=10)

# Map every split's emotion codes to their integer ids.
y_train_albert = y_train.map(encoded_dict)
y_val_albert = y_val.map(encoded_dict)
y_test_albert = y_test.map(encoded_dict)

# Show the training labels and their per-class counts before and after the mapping.
for heading, shown_labels in (("Before one-hot encoding: \n", y_train),
                              ("After one-hot encoding: \n", y_train_albert)):
    print(heading)
    print(shown_labels)
    print("Label count by class: \n")
    print(np.unique(shown_labels, return_counts=True))

label_pairs = ((y_train, y_train_albert), (y_val, y_val_albert), (y_test, y_test_albert))

# testing if the label vector has still same dimension
for raw_labels, encoded_labels in label_pairs:
    assert len(raw_labels) == len(encoded_labels)

# checking if new vectors contain only integers
for _, encoded_labels in label_pairs:
    assert np.array_equal(encoded_labels, encoded_labels.astype(int))


### Model definition

In [None]:
# input
input_ids = Input(shape=(maxlen,), dtype=tf.int32, name="input_ids")
input_mask = Input(shape=(maxlen,), dtype=tf.int32, name="attention_mask")
# pretrained
embeddings = albert([input_ids, input_mask])[0] #(0 is the last hidden layer, 1 means pooler output)
# last layers and output
out = tf.keras.layers.GlobalMaxPool1D()(embeddings)
out = Dense(128, activation='relu')(out)
out = tf.keras.layers.Dropout(0.1)(out)
out = Dense(32, activation = 'relu')(out)
# Bound to its own name (not `y`) so the label Series `y` from the inspection
# cell is not overwritten; 11 units = one per emotion class.
albert_outputs = Dense(11, activation = 'softmax')(out)

model = tf.keras.Model(inputs=[input_ids, input_mask], outputs = albert_outputs)
# Fine-tune the pretrained ALBERT weights together with the new head. Referencing
# the layer object directly avoids depending on its position in `model.layers`.
albert.trainable = True

In [None]:
# Print the layer-by-layer summary of the ALBERT classifier.
model.summary()

In [None]:
# Render the model graph, annotated with tensor shapes.
tf.keras.utils.plot_model(model,show_shapes=True)

### Model training

In [None]:
# utility to plot train history
def plot_history(model_history,keys):
    """Plot a training metric and its validation counterpart against the epoch.

    Args:
        model_history: object exposing a ``history`` mapping from metric name to
            a per-epoch sequence of values (as returned by ``model.fit``).
        keys: pair ``(train_metric_name, validation_metric_name)``.
    """
    train_key, val_key = keys
    for key in (train_key, val_key):
        values = model_history.history[key]
        # Epochs are numbered from 1 on the x axis.
        plt.plot(range(1, len(values) + 1), values)
    plt.ylabel(train_key)
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'], loc='upper left')
    plt.show()

#### Training settings

In [None]:
optimizer=Adam(
    learning_rate = 1e-05, # this learning rate is for bert model, taken from hugging face site
)

# set loss and metrics
# The string form computes the loss from probabilities, matching the softmax
# output of the model (so no `from_logits=True`).
loss = 'categorical_crossentropy'
# Plain categorical accuracy. It is reported as 'accuracy' because it is not a
# class-balanced accuracy, which the previous name 'balanced_accuracy' implied.
metric = CategoricalAccuracy(name='accuracy',dtype='float32')
# Macro-averaged F1 over the 11 classes; logged as 'f1_score' / 'val_f1_score'.
f1 = tfa.metrics.F1Score(num_classes=11,average='macro')

# compile the model
model.compile(
    optimizer = optimizer,
    loss = loss,
    metrics = [metric,f1]
)

#### Training

In [None]:
# `num_classes=11` pins the one-hot width to the 11 output units. Without it
# `to_categorical` infers the width from the largest label present, so a split
# lacking the highest class id would produce targets of the wrong shape.
albert_history = model.fit(
    x = {'input_ids': x_train_albert['input_ids'],'attention_mask':x_train_albert['attention_mask']},
    y = to_categorical(y_train_albert, num_classes=11),
    validation_data = ({'input_ids':x_val_albert['input_ids'],'attention_mask':x_val_albert['attention_mask']}, to_categorical(y_val_albert, num_classes=11)),
    epochs=7,
    batch_size=80,
    # Stop after 2 epochs without a better validation F1 and restore the best weights.
    callbacks = [tf.keras.callbacks.EarlyStopping(monitor='val_f1_score', patience=2, verbose=1, mode='max', restore_best_weights=True)]
)

In [None]:
# Training vs. validation macro F1 per epoch for the ALBERT run.
plot_history(albert_history,['f1_score','val_f1_score'])

### Testing

In [None]:
# Test inputs are passed by name, matching the model's input layer names.
albert_test_inputs = {
    'input_ids': x_test_albert['input_ids'],
    'attention_mask': x_test_albert['attention_mask'],
}
# conversion of the predicted class scores into integer labels
y_pred_albert = np.argmax(model.predict(albert_test_inputs), axis=1)

f1_macro = f1_score(y_true=y_test_albert, y_pred=y_pred_albert, average='macro')

print("The final F1-score macro avg obtained on the test set is F1 = {}".format(f1_macro))

In [None]:
# Per-class precision / recall / F1 on the test set (classes shown by integer id).
print(classification_report(y_test_albert,y_pred_albert))

In [None]:
from sklearn.metrics import accuracy_score
# Overall test accuracy of the ALBERT classifier (displayed as the cell output).
accuracy_score(y_test_albert,y_pred_albert)

## Bi-LSTM

In [None]:
# text_hammer provides the `th.*` cleaning helpers used by `text_lite_preprocessing`.
!pip install text_hammer
import text_hammer as th
import re
# Seed TensorFlow's global random generator with the notebook-wide seed.
tf.random.set_seed(random_seed)

def text_lite_preprocessing(data, col_name):
  """Return a one-column DataFrame holding a cleaned copy of ``data``.

  Args:
    data: object with ``.copy().to_frame()`` (a pandas Series here); it is not modified.
    col_name: name of the column to clean in the resulting frame.
      NOTE(review): presumably must equal the Series name - confirm.

  Returns:
    DataFrame whose ``col_name`` column went through the steps below, in order.

  NOTE(review): ``progress_apply`` needs tqdm's pandas integration to be registered;
  presumably that happens when ``text_hammer`` is imported - confirm.
  """
  cleaning_steps = (
      lambda x: str(x).lower(),
      lambda x: th.cont_exp(x),  # you're --> you are
      lambda x: th.remove_special_chars(x),
      lambda x: th.remove_accented_chars(x),
      lambda x: re.sub(r'\d', "number", x),  # each single digit becomes "number"
      lambda x: re.sub("_", "number", x),  # each underscore becomes "number"
  )
  cleaned = data.copy().to_frame()
  for step in cleaning_steps:
    cleaned[col_name] = cleaned[col_name].progress_apply(step)
  return cleaned

In [None]:
# Clean the three splits in the same order as before: train, test, validation.
train_clean_litemodel, test_clean_litemodel, val_clean_litemodel = (
    text_lite_preprocessing(split_sentences, "sentence")
    for split_sentences in (X_train, X_test, X_val)
)

In [None]:
# Text dump of the cleaned training sentences.
print(train_clean_litemodel)

### Tokenization

In [None]:
from tensorflow.keras.preprocessing.text import Tokenizer
# creating tokenizer and fitting it on the cleaned training set only.
# Fitting on the raw `X` would (a) build the vocabulary from validation/test
# sentences as well and (b) keep the original casing (`lower=False`) while the
# sentences encoded below are lower-cased, sending matching words to the OOV index.
# NOTE: this rebinds `tokenizer`, which held the ALBERT tokenizer until now.
tokenizer = Tokenizer(num_words = None, oov_token='', split=' ', lower=False)
tokenizer.fit_on_texts(train_clean_litemodel.sentence)
print(tokenizer.get_config())


# printing a tokenization example
print("## Before tokenization: ")
print(train_clean_litemodel.iloc[2])
print("## After tokenization: ")
print(tokenizer.texts_to_sequences(train_clean_litemodel.sentence)[2])

In [None]:
from tensorflow.keras.preprocessing.sequence import pad_sequences

def get_sequences(tokenizer, dataset, maxlen):
  """Convert texts to fixed-length integer sequences.

  Args:
    tokenizer: fitted tokenizer exposing ``texts_to_sequences``.
    dataset: iterable of texts to encode.
    maxlen: length every sequence is padded or truncated to.

  Returns:
    The result of ``pad_sequences``: longer sequences lose their tail
    (``truncating='post'``), shorter ones are padded at the end (``padding='post'``).
  """
  sequences = tokenizer.texts_to_sequences(dataset)
  # The former `print(sequences)` dumped every encoded sentence to the cell
  # output on each call; it was debugging output and has been removed.
  padded = pad_sequences(sequences, truncating = 'post', padding = 'post', maxlen= maxlen)
  return padded



In [None]:
# Encode and pad the three cleaned splits to `thresh` tokens (train, validation, test).
padded_train_seq, padded_val_seq, padded_test_seq = (
    get_sequences(tokenizer, cleaned_split.sentence, maxlen=thresh)
    for cleaned_split in (train_clean_litemodel, val_clean_litemodel, test_clean_litemodel)
)

### Integer encoding of labels (one-hot conversion happens at training time)

In [None]:
# Integer id assigned to each emotion code.
class_to_index = {'ang':0, 'dis':1, 'exc':2, 'fea':3, 'fru':4, 'hap':5,'neu':6, 'oth':7, 'sad':8, 'sur':9, 'xxx':10}


def names_to_ids(labels_to_conv):
    """Convert an iterable of emotion codes to a NumPy array of class ids.

    Args:
        labels_to_conv: iterable of keys of ``class_to_index``.

    Returns:
        numpy.ndarray with one integer id per input label, in input order.

    Raises:
        KeyError: if a label is not in ``class_to_index``. (A ``.get`` lookup
            would silently put ``None`` into the array instead.)
    """
    return np.array([class_to_index[x] for x in labels_to_conv])

# Integer class ids for the train / validation / test labels.
y_train_litemodel, y_val_litemodel, y_test_litemodel = (
    names_to_ids(split_labels) for split_labels in (y_train, y_val, y_test)
)

# Number of training labels.
print(len(y_train_litemodel))

### Model definition

In [None]:

# The embedding table needs one row per token id. Ids taken from
# `tokenizer.word_index` start at 1 and 0 is the padding value, hence the +1.
# (`tokenizer.document_count` is the number of fitted texts, not the vocabulary size.)
vocab_size = len(tokenizer.word_index) + 1

# NOTE: this rebinds `model`, which held the ALBERT classifier until now.
model = tf.keras.models.Sequential([
        tf.keras.layers.Embedding(vocab_size, 64, input_length=thresh),
        # First Bi-LSTM returns the full sequence so the second one can consume it.
        tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(64, return_sequences=True)),
        tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(64)),
        # One output unit per emotion class.
        tf.keras.layers.Dense(11,activation='softmax')

])


In [None]:
# Print the layer-by-layer summary of the Bi-LSTM classifier.
model.summary()

In [None]:
# Render the model graph.
tf.keras.utils.plot_model(model)

### Model training

In [None]:
# utility to plot train history
def plot_history(model_history,keys):
    """Plot a training metric and its validation counterpart against the epoch.

    Args:
        model_history: object exposing a ``history`` mapping from metric name to
            a per-epoch sequence of values (as returned by ``model.fit``).
        keys: pair ``(train_metric_name, validation_metric_name)``.
    """
    train_key, val_key = keys
    for key in (train_key, val_key):
        values = model_history.history[key]
        # Epochs are numbered from 1 on the x axis.
        plt.plot(range(1, len(values) + 1), values)
    plt.ylabel(train_key)
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'], loc='upper left')
    plt.show()

#### Training settings

In [None]:
# Metrics tracked during training: Keras' built-in accuracy and macro F1 over 11 classes.
accuracy = 'accuracy'
f1 = tfa.metrics.F1Score(average='macro', num_classes=11)

model.compile(
    optimizer=Adam(learning_rate=5e-3),
    loss='categorical_crossentropy',
    metrics=[accuracy, f1],
)

#### Training

In [None]:
# `num_classes=11` pins the one-hot width to the 11 output units. Without it
# `to_categorical` infers the width from the largest label present, so a split
# lacking the highest class id would produce targets of the wrong shape.
bilstm_history = model.fit(
    padded_train_seq, to_categorical(y_train_litemodel, num_classes=11),
    validation_data=(padded_val_seq, to_categorical(y_val_litemodel, num_classes=11)),
    epochs =15,
    # Stop after 3 epochs without a better validation F1 and restore the best weights.
    callbacks=[tf.keras.callbacks.EarlyStopping(monitor='val_f1_score', patience=3, verbose=1, mode='max', restore_best_weights=True)],
    batch_size=70
)

In [None]:
# Training vs. validation macro F1 per epoch for the Bi-LSTM run.
plot_history(bilstm_history,['f1_score','val_f1_score'])

### Testing

In [None]:
# conversion of the predicted class scores into integer labels
y_pred_litemodel = np.argmax(model.predict(padded_test_seq), axis=1)

In [None]:
# Macro-averaged F1 of the Bi-LSTM predictions on the test set.
f1_macro = f1_score(y_true=y_test_litemodel, y_pred=y_pred_litemodel, average='macro')

print("The final F1-score macro avg obtained on the test set is F1 = {}".format(f1_macro))

In [None]:
from sklearn.metrics import classification_report

# Per-class precision / recall / F1 on the test set (classes shown by integer id).
print(classification_report(y_test_litemodel,y_pred_litemodel))
     

In [None]:
from sklearn.metrics import accuracy_score
# Overall test accuracy of the Bi-LSTM classifier (displayed as the cell output).
accuracy_score(y_test_litemodel,y_pred_litemodel)