<a href="https://colab.research.google.com/github/Ronit82/Multimodal-Sentiment-Analysis/blob/main/multimodal_emotion_analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Multimodal Emotion Analysis with the IEMOCAP Dataset

In [None]:
# Import modules
import os
import wave
import numpy as np
import pickle
import librosa
import gensim.downloader as api

In [None]:
# Import Modules
import tensorflow as tf
from keras.models import Model, Sequential
from keras.layers import Input, LSTM, Flatten, Concatenate, Attention, Embedding, Dense, Activation, Dropout, Conv2D
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.preprocessing import label_binarize

# 1. Helper functions

In [None]:
def split_wav(wav, emotions):
    """Cut an interleaved recording into per-utterance left/right segments.

    Args:
        wav: ``(params, samples)`` where ``params`` is the 6-tuple
            ``(nchannels, sampwidth, framerate, nframes, comptype, compname)``
            and ``samples`` is a 1-D sequence of interleaved samples.
        emotions: list of dicts, each with ``'start'`` and ``'end'`` times
            in seconds.

    Returns:
        A list with one ``{'left': ..., 'right': ...}`` dict per entry of
        ``emotions``.

    Side effects:
        Every dict in ``emotions`` gains the keys ``'right'`` and ``'left'``
        holding the same segments.
    """
    params, samples = wav
    nchannels, _sampwidth, framerate, _nframes, _comptype, _compname = params

    # De-interleave: every ``nchannels``-th sample starting at offset 0 / 1.
    channels = {'left': samples[0::nchannels], 'right': samples[1::nchannels]}

    frames = []
    for utterance in emotions:
        window = slice(int(utterance['start'] * framerate),
                       int(utterance['end'] * framerate))
        utterance['right'] = channels['right'][window]
        utterance['left'] = channels['left'][window]
        frames.append({'left': utterance['left'], 'right': utterance['right']})
    return frames

In [None]:
def get_field(data, key):
    """Collect ``entry[key]`` for every ``entry`` in ``data`` into a NumPy array."""
    values = []
    for entry in data:
        values.append(entry[key])
    return np.array(values)

In [None]:
def pad_sequence_into_array(Xs, maxlen=None, truncating='post', padding='post', value=0.):
    """Pad and/or truncate a batch of sequences to a common length.

    Args:
        Xs: non-empty sequence of NumPy arrays (or a NumPy array whose rows are
            the sequences). The first axis of each item is the one that is
            padded; trailing dimensions and dtype are taken from ``Xs[0]``.
        maxlen: target length. Defaults to the longest sequence in ``Xs``.
        truncating: ``'pre'`` keeps the last ``maxlen`` steps of an over-long
            sequence, ``'post'`` keeps the first ``maxlen`` steps.
        padding: ``'pre'`` right-aligns the data (fill goes first), ``'post'``
            left-aligns it (fill goes last).
        value: fill value, cast to the dtype of ``Xs[0]``.

    Returns:
        ``(Xout, Mask)``. ``Xout`` has shape ``[len(Xs), maxlen, ...]``;
        ``Mask`` has shape ``[len(Xs), maxlen]`` with 1 where ``Xout`` holds
        real data and 0 where it holds fill. Both use the dtype of ``Xs[0]``.

    Raises:
        ValueError: if ``truncating`` or ``padding`` is not ``'pre'``/``'post'``.
    """
    if truncating not in ('pre', 'post'):
        raise ValueError("Truncating type '%s' not understood" % truncating)
    if padding not in ('pre', 'post'):
        raise ValueError("Padding type '%s' not understood" % padding)

    Nsamples = len(Xs)
    if maxlen is None:
        maxlen = np.max([s.shape[0] for s in Xs])
    maxlen = int(maxlen)

    dtype = Xs[0].dtype
    Xout = np.full([Nsamples, maxlen] + list(Xs[0].shape[1:]), value, dtype=dtype)
    Mask = np.zeros(shape=[Nsamples, maxlen], dtype=dtype)

    for i in range(Nsamples):
        x = Xs[i]
        if truncating == 'pre':
            # Explicit start index: ``x[-maxlen:]`` would keep everything
            # when maxlen == 0 because ``-0`` is just ``0``.
            trunc = x[max(len(x) - maxlen, 0):]
        else:
            trunc = x[:maxlen]

        n = len(trunc)
        if padding == 'post':
            Xout[i, :n] = trunc
            Mask[i, :n] = 1
        else:
            # Explicit start index: ``-n:`` selects the whole row when n == 0,
            # which used to break on empty sequences.
            Xout[i, maxlen - n:] = trunc
            Mask[i, maxlen - n:] = 1
    return Xout, Mask

In [None]:
def convert_gt_from_array_to_list(gt_batch, gt_batch_mask=None):
    """Turn a padded 2-D label batch into a list of per-row integer lists.

    Args:
        gt_batch: 2-D array; values are cast to ``int``.
        gt_batch_mask: optional 2-D array aligned with ``gt_batch``. The sum
            of each row gives how many leading entries of that row to keep.
            When omitted, every row is kept in full.

    Returns:
        A list with one Python list of ints per row of ``gt_batch``.
    """
    n_rows, n_cols = gt_batch.shape
    as_int = gt_batch.astype('int')

    if gt_batch_mask is None:
        lengths = [n_cols] * n_rows
    else:
        lengths = [int(gt_batch_mask[row, :].sum()) for row in range(n_rows)]

    return [as_int[row, :length].tolist() for row, length in enumerate(lengths)]

In [None]:
def get_audio(path_to_wav, filename):
    """Read a WAV file and return its parameters and raw 16-bit samples.

    Args:
        path_to_wav: directory prefix; it is concatenated directly with
            ``filename``, so it must end with a path separator.
        filename: name of the WAV file.

    Returns:
        ``((nchannels, sampwidth, framerate, nframes, comptype, compname), samples)``
        where ``samples`` is a writable 1-D ``int16`` array of all frames
        (channels interleaved, as stored in the file).
    """
    # Context manager guarantees the file is closed even if reading fails.
    with wave.open(path_to_wav + filename, mode="r") as wav:
        (nchannels, sampwidth, framerate, nframes, comptype, compname) = wav.getparams()
        content = wav.readframes(nframes)

    # np.fromstring is deprecated for binary data; frombuffer returns a
    # read-only view of the bytes, so copy to keep the array writable.
    samples = np.frombuffer(content, dtype=np.int16).copy()
    return (nchannels, sampwidth, framerate, nframes, comptype, compname), samples

In [None]:
def get_transcriptions(path_to_transcriptions, filename):
    """Parse a transcription file into an ``{utterance_id: text}`` dict.

    Each line is split as ``<id> [<anything>]: <text>``: the id is everything
    before the first ``' ['`` and the text is everything after the first
    ``': '``.

    Args:
        path_to_transcriptions: directory prefix, concatenated directly with
            ``filename`` (must end with a path separator).
        filename: name of the transcription file.

    Returns:
        dict mapping utterance id to its transcribed text.
    """
    # Context manager closes the handle instead of leaking it.
    with open(path_to_transcriptions + filename, 'r') as handle:
        lines = handle.read().split('\n')

    transcription = {}
    # The element after the final newline is skipped, as in the original loop.
    for line in lines[:-1]:
        id_end = line.find(' [')
        text_start = line.find(': ') + 2
        transcription[line[:id_end]] = line[text_start:]
    return transcription

In [None]:
def get_emotions(path_to_emotions, filename):
    """Parse an emotion-evaluation file into a list of utterance records.

    The file is treated as blocks separated by blank lines. The text before
    the first blank line is ignored, and the file is expected to end with a
    blank line followed by a trailing newline. The first line of each block
    has the form
    ``[start - end]<TAB><dialog>_<actor><TAB><emo><TAB>[v, a, d]``
    and is followed by evaluator lines; the leading run of lines starting with
    ``"C"`` is read as ``;``-separated categorical labels.

    Args:
        path_to_emotions: directory prefix, concatenated directly with
            ``filename`` (must end with a path separator).
        filename: evaluation file name; ``filename[:-4]`` is used as the
            dialog name.

    Returns:
        list of dicts with keys ``'start'``, ``'end'`` (seconds), ``'id'``,
        ``'v'``, ``'a'``, ``'d'``, ``'emotion'`` (3-letter label) and
        ``'emo_evo'`` (one list of lower-cased 3-letter labels per ``"C"`` line).
    """
    with open(path_to_emotions + filename, 'r') as handle:
        lines = handle.read().split('\n')

    dialog = filename[:-4]
    blank_idx = [i for i, line in enumerate(lines) if line == '']

    emotion = []
    # The last two blank entries (block terminator + trailing newline) do not
    # open a new block, hence ``- 2``.
    for k in range(len(blank_idx) - 2):
        g = lines[blank_idx[k] + 1:blank_idx[k + 1]]
        if not g:
            # Two adjacent blank lines: nothing to parse.
            continue
        head = g[0]

        dash = head.find(' - ')
        start_time = float(head[head.find('[') + 1:dash])
        end_time = float(head[dash + 3:head.find(']')])

        # Four characters following "<dialog>_" identify the speaker/turn.
        tag_start = head.find(dialog) + len(dialog) + 1
        actor_id = head[tag_start:tag_start + 4]

        vad_tab = head.find('\t[')
        emo = head[vad_tab - 3:vad_tab]
        vad = head[vad_tab + 1:]

        # Fixed-width fields inside "[v.vvvv, a.aaaa, d.dddd]".
        v = float(vad[1:7])
        a = float(vad[9:15])
        d = float(vad[17:23])

        j = 1
        emos = []
        # Bounds check prevents an IndexError when the block ends with a "C" line.
        while j < len(g) and g[j][:1] == "C":
            line = g[j]
            start_idx = line.find("\t") + 1
            evaluator_emo = []
            sep = line.find(";", start_idx)
            while sep != -1:
                evaluator_emo.append(line[start_idx:sep].strip().lower()[:3])
                start_idx = sep + 1
                sep = line.find(";", start_idx)
            emos.append(evaluator_emo)
            j += 1

        emotion.append({'start': start_time,
                        'end': end_time,
                        'id': dialog + '_' + actor_id,
                        'v': v,
                        'a': a,
                        'd': d,
                        'emotion': emo,
                        'emo_evo': emos})
    return emotion

In [None]:
def get_mocap_rot(path_to_mocap_rot, filename, start, end):
    """Load rotated-mocap rows inside a time window and average them into 200 bins.

    The first two lines of the file are skipped. Every remaining line is split
    on single spaces; field 1 is compared against the window and fields 2+ are
    the feature values.

    Args:
        path_to_mocap_rot: directory prefix, concatenated directly with
            ``filename`` (must end with a path separator).
        filename: name of the mocap text file.
        start: window start; rows with ``time <= start`` are dropped.
        end: window end; rows with ``time >= end`` are dropped.

    Returns:
        Array with one mean feature vector per bin (200 bins). Bins that
        receive no rows yield NaN.
    """
    with open(path_to_mocap_rot + filename, 'r') as handle:
        lines = handle.read().split('\n')

    rows = []
    for line in lines[2:]:
        fields = line.split(' ')
        if len(fields) < 2:
            continue
        if start < float(fields[1]) < end:
            # Builtin float: the ``np.float`` alias was removed in NumPy 1.24.
            rows.append(np.array(fields[2:]).astype(float))

    chunks = np.array_split(np.array(rows), 200)
    return np.array([np.mean(chunk, axis=0) for chunk in chunks])

In [None]:
def get_mocap_hand(path_to_mocap_hand, filename, start, end):
    """Load hand-mocap rows inside a time window and average them into 200 bins.

    The first two lines of the file are skipped. Every remaining line is split
    on single spaces; field 1 is compared against the window and fields 2+ are
    the feature values.

    Args:
        path_to_mocap_hand: directory prefix, concatenated directly with
            ``filename`` (must end with a path separator).
        filename: name of the mocap text file.
        start: window start; rows with ``time <= start`` are dropped.
        end: window end; rows with ``time >= end`` are dropped.

    Returns:
        Array with one mean feature vector per bin (200 bins). Bins that
        receive no rows yield NaN.
    """
    with open(path_to_mocap_hand + filename, 'r') as handle:
        lines = handle.read().split('\n')

    rows = []
    for line in lines[2:]:
        fields = line.split(' ')
        if len(fields) < 2:
            continue
        if start < float(fields[1]) < end:
            # Builtin float: the ``np.float`` alias was removed in NumPy 1.24.
            rows.append(np.array(fields[2:]).astype(float))

    chunks = np.array_split(np.array(rows), 200)
    return np.array([np.mean(chunk, axis=0) for chunk in chunks])

In [None]:
def get_mocap_head(path_to_mocap_head, filename, start, end):
    """Load head-mocap rows inside a time window and average them into 200 bins.

    The first two lines of the file are skipped. Every remaining line is split
    on single spaces; field 1 is compared against the window and fields 2+ are
    the feature values.

    Args:
        path_to_mocap_head: directory prefix, concatenated directly with
            ``filename`` (must end with a path separator).
        filename: name of the mocap text file.
        start: window start; rows with ``time <= start`` are dropped.
        end: window end; rows with ``time >= end`` are dropped.

    Returns:
        Array with one mean feature vector per bin (200 bins). Bins that
        receive no rows yield NaN.
    """
    with open(path_to_mocap_head + filename, 'r') as handle:
        lines = handle.read().split('\n')

    rows = []
    for line in lines[2:]:
        fields = line.split(' ')
        if len(fields) < 2:
            continue
        if start < float(fields[1]) < end:
            # Builtin float: the ``np.float`` alias was removed in NumPy 1.24.
            rows.append(np.array(fields[2:]).astype(float))

    chunks = np.array_split(np.array(rows), 200)
    return np.array([np.mean(chunk, axis=0) for chunk in chunks])

# 2. Load and Preprocess Dataset

### Load dataset

In [None]:
# Emotion labels kept for classification; also the class order used for the one-hot labels.
emotions_used = np.array(['ang', 'exc', 'neu', 'sad'])

# NOTE: read_iemocap_mocap() reads a module-level `data_path` (root folder of the
# IEMOCAP release, ending with a path separator). It is left undefined here
# because the notebook loads preprocessed pickles; define it before re-running
# the raw-data extraction.

# Session folder names Session1 .. Session5.
sessions = ['Session%d' % number for number in range(1, 6)]

# Sampling rate (Hz) handed to calculate_features.
framerate = 16000

In [None]:
def read_iemocap_mocap():
    """Walk every session folder and collect one record per usable utterance.

    Reads the module-level ``data_path``, ``sessions`` and ``emotions_used``.
    For each ``.wav`` dialog it loads the audio, transcriptions and emotion
    annotations, then, for every utterance whose label is in
    ``emotions_used`` and whose id has not been seen yet, attaches:

    * ``'signal'``: the left-channel audio segment,
    * ``'transcription'``: the utterance text,
    * ``'mocap_hand'`` / ``'mocap_rot'`` / ``'mocap_head'``: binned mocap features.

    Returns:
        NumPy object array of the utterance dicts, sorted by ``'id'``.
    """
    data = []
    seen_ids = set()
    for session in sessions:
        dialog_dir = data_path + session + '/dialog/'
        path_to_wav = dialog_dir + 'wav/'
        path_to_emotions = dialog_dir + 'EmoEvaluation/'
        path_to_transcriptions = dialog_dir + 'transcriptions/'
        path_to_mocap_hand = dialog_dir + 'MOCAP_hand/'
        path_to_mocap_rot = dialog_dir + 'MOCAP_rotated/'
        path_to_mocap_head = dialog_dir + 'MOCAP_head/'

        # Dot-files have their two-character prefix stripped, which maps them
        # onto the real dialog name; de-duplicate so each dialog is read once.
        files = []
        for f in os.listdir(path_to_wav):
            if not f.endswith(".wav"):
                continue
            stem = f[2:-4] if f[0] == '.' else f[:-4]
            if stem not in files:
                files.append(stem)

        for f in files:
            print(f)
            # This dialog reuses the mocap recording of 'Ses05M_script01_1'.
            mocap_f = 'Ses05M_script01_1' if f == 'Ses05M_script01_1b' else f

            wav = get_audio(path_to_wav, f + '.wav')
            transcriptions = get_transcriptions(path_to_transcriptions, f + '.txt')
            emotions = get_emotions(path_to_emotions, f + '.txt')
            sample = split_wav(wav, emotions)

            for ie, e in enumerate(emotions):
                # split_wav stored both channels on the record; drop them.
                e.pop("left", None)
                e.pop("right", None)

                # Filter first: each mocap call below re-parses a whole file,
                # so skip utterances that would be discarded anyway.
                if e['emotion'] not in emotions_used or e['id'] in seen_ids:
                    continue

                # The left channel is used for every speaker.
                e['signal'] = sample[ie]['left']
                e['transcription'] = transcriptions[e['id']]
                e['mocap_hand'] = get_mocap_hand(path_to_mocap_hand, mocap_f + '.txt', e['start'], e['end'])
                e['mocap_rot'] = get_mocap_rot(path_to_mocap_rot, mocap_f + '.txt', e['start'], e['end'])
                e['mocap_head'] = get_mocap_head(path_to_mocap_head, mocap_f + '.txt', e['start'], e['end'])
                data.append(e)
                seen_ids.add(e['id'])

    sort_key = get_field(data, "id")
    return np.array(data)[np.argsort(sort_key)]

In [None]:
# Run only once then use pickle file
# data = read_iemocap_mocap()
# data.shape

In [None]:
# Store data to pickle file
# with open(data_path +'data_collected.pickle', 'wb') as handle:
#     pickle.dump(data, handle, protocol=pickle.HIGHEST_PROTOCOL)

In [None]:
# Read the preprocessed utterance records from the pickle file.
# NOTE(review): pickle.load can execute arbitrary code -- only load files from a trusted source.
file_path = '/kaggle/input/iemocap/data_collected.pickle'
with open(file_path, 'rb') as handle:
    data2 = pickle.load(handle)

# 3. Extract Features

### Text

In [None]:
# Dimensionality of the pre-trained Word2Vec vectors.
embedding_dim = 300


def load_embeddings(tokenizer):
    """Return the tokenizer vocabulary and the pre-trained Word2Vec vectors.

    Args:
        tokenizer: fitted tokenizer exposing a ``word_index`` mapping.

    Returns:
        ``(word_index, word2vec_model)`` where ``word2vec_model`` is whatever
        ``gensim.downloader`` returns for ``"word2vec-google-news-300"``.
    """
    vocabulary = tokenizer.word_index
    vectors = api.load("word2vec-google-news-300")
    return vocabulary, vectors

In [None]:
def create_embedding_matrix(word_index, word2vec_model):
    """Build the embedding matrix for the tokenizer vocabulary.

    Row ``i`` holds the vector of the word whose index is ``i``. Words missing
    from ``word2vec_model.key_to_index`` (and row 0) stay all-zero.

    Args:
        word_index: mapping ``word -> integer index``.
        word2vec_model: object supporting ``key_to_index`` membership tests
            and ``model[word]`` lookup.

    Returns:
        Array of shape ``(len(word_index) + 1, embedding_dim)``.
    """
    vocabulary = word2vec_model.key_to_index
    embedding_matrix = np.zeros((len(word_index) + 1, embedding_dim))
    known = ((row, word) for word, row in word_index.items() if word in vocabulary)
    for row, word in known:
        embedding_matrix[row] = word2vec_model[word]
    return embedding_matrix

In [None]:
# with open(data_path +'embedding_matrix.pickle', 'wb') as handle:
#     pickle.dump(embedding_matrix, handle, protocol=pickle.HIGHEST_PROTOCOL)
# with open(data_path +'word_index.pickle', 'wb') as handle:
#     pickle.dump(word_index, handle, protocol=pickle.HIGHEST_PROTOCOL)

In [None]:
# Transcription of every utterance, in dataset order.
text = [ses_mod['transcription'] for ses_mod in data2]

In [None]:
# Every transcription is padded/truncated to this many tokens.
MAX_SEQUENCE_LENGTH = 50

# Fit the vocabulary on all transcriptions.
tokenizer = Tokenizer()
tokenizer.fit_on_texts(text)

# Map words to integer ids, then pad to a fixed length.
# (The former `x_train_text = []` placeholder was dead code and is dropped.)
token_tr_X = tokenizer.texts_to_sequences(text)
x_train_text = pad_sequences(token_tr_X, maxlen=MAX_SEQUENCE_LENGTH)

In [None]:
# Run once then use pickle files
# word_index, word2vec_model = load_embeddings(tokenizer)
# embedding_matrix = create_embedding_matrix(word_index, word2vec_model)

In [None]:
def _load_pickle(path):
    """Unpickle and return the object stored at ``path``.

    NOTE(review): pickle.load can execute arbitrary code -- only use on trusted files.
    """
    with open(path, 'rb') as handle:
        return pickle.load(handle)


# Cached embedding matrix and tokenizer vocabulary from an earlier run.
file_path = r'/kaggle/input/iemocap/embedding_matrix.pickle'
embedding_matrix = _load_pickle(file_path)
file_path = r'/kaggle/input/iemocap/word_index.pickle'
word_index = _load_pickle(file_path)

#### Audio

In [None]:
def calculate_features(frames, freq):
    """Compute 13 MFCCs of an audio signal.

    Args:
        frames: NumPy array of audio samples; cast to ``float32``.
        freq: sampling rate passed to librosa as ``sr``.

    Returns:
        The array returned by ``librosa.feature.mfcc``.
    """
    window_sec = 0.2
    signal = frames.astype(np.float32)
    # Hop of half a window (in samples).
    hop = int(freq * window_sec / 2)
    return librosa.feature.mfcc(y=signal, sr=freq, n_mfcc=13, hop_length=hop)

In [None]:
x_train_speech = []

# For each utterance: MFCCs -> pad/truncate every coefficient track to 100
# frames -> transpose so frames come first.
counter = 0
for counter, ses_mod in enumerate(data2, start=1):
    mfcc = calculate_features(ses_mod['signal'], framerate)
    padded, _ = pad_sequence_into_array(mfcc, maxlen=100)
    x_train_speech.append(padded.T)
    # Progress report every 1000 utterances.
    if counter % 1000 == 0:
        print(counter)

In [None]:
# Stack the per-utterance feature matrices into one array and display its shape.
x_train_speech = np.array(x_train_speech)
x_train_speech.shape

#### Mocap

In [None]:
def _mocap_block(features, expected_shape):
    """Return ``features`` with NaNs replaced by 0, or zeros if its shape is unexpected.

    A new array is returned; the input (which lives inside ``data2``) is left untouched.
    """
    if features.shape != expected_shape:
        return np.zeros(expected_shape)
    return np.where(np.isnan(features), 0, features)


x_train_mocap = []
for ses_mod in data2:
    # Head (18) + hand (6) + rotated (165) features -> 189 columns per frame.
    x_mocap = np.concatenate(
        (_mocap_block(ses_mod['mocap_head'], (200, 18)),
         _mocap_block(ses_mod['mocap_hand'], (200, 6)),
         _mocap_block(ses_mod['mocap_rot'], (200, 165))),
        axis=1)
    x_train_mocap.append(x_mocap)

x_train_mocap = np.array(x_train_mocap)
# Trailing singleton axis is the channel dimension expected by Conv2D.
x_train_mocap = x_train_mocap.reshape(-1,200,189,1)
x_train_mocap.shape

#### Load Labels

In [None]:
# One-hot encode each utterance's emotion label; columns follow `emotions_used`.
Y = label_binarize([ses_mod['emotion'] for ses_mod in data2], classes = emotions_used)
Y.shape

# 4. Training, Testing, Validation Dataset Split

In [None]:
# Position of the first utterance whose id starts with "Ses05"
# (or the dataset length when there is none).
counter = next(
    (position for position, ses_mod in enumerate(data2) if ses_mod['id'][:5] == "Ses05"),
    len(data2),
)
print(counter)

In [None]:
# Row boundaries of the train / validation / test partitions.
# NOTE(review): VAL_END presumably matches the first-"Ses05" index printed by
# the previous cell -- confirm against that output.
TRAIN_END = 3000
VAL_END = 3838


def _split_rows(rows):
    """Slice ``rows`` into (train, validation, test) at TRAIN_END / VAL_END."""
    return rows[:TRAIN_END], rows[TRAIN_END:VAL_END], rows[VAL_END:]


xtrain_sp, xval_sp, xtest_sp = _split_rows(x_train_speech)

xtrain_tx, xval_tx, xtest_tx = _split_rows(x_train_text)

# Channel-less view of the mocap features (not used by the split below).
x_train_mocap2 = x_train_mocap.reshape(-1,200,189)
xtrain_mo, xval_mo, xtest_mo = _split_rows(x_train_mocap)

ytrain, yval, ytest = _split_rows(Y)

# 5. Individual Modality Training

### Text Model (LSTM)

In [None]:
# Text classifier: embedding -> two stacked LSTMs -> dense head over 4 classes.
model_text = Sequential()
model_text.add(Embedding(len(word_index) + 1, embedding_dim, input_length = MAX_SEQUENCE_LENGTH,
                    trainable = True))

# Build first so the embedding weights exist, then load the pre-trained matrix
# and keep the layer trainable.
model_text.build(input_shape=(None, MAX_SEQUENCE_LENGTH))
embedding_layer = model_text.layers[0]
embedding_layer.set_weights([embedding_matrix])
embedding_layer.trainable = True

for layer in (
    LSTM(units = 256, return_sequences=True, recurrent_dropout = 0.2),
    Dropout(0.2),
    LSTM(units = 256, return_sequences=False, recurrent_dropout = 0.2),
    Dropout(0.2),
    Dense(512),
    Activation('relu'),
    Dense(4),
    Activation('softmax'),
):
    model_text.add(layer)

In [None]:
# Categorical cross-entropy loss, Adam optimizer; accuracy is logged under the key 'acc'.
model_text.compile(loss='categorical_crossentropy',optimizer='adam' ,metrics=['acc'])
# Print the layer-by-layer summary.
model_text.summary()

In [None]:
# Write the architecture diagram (with shapes and layer names) to text.png.
tf.keras.utils.plot_model(model_text, to_file='text.png', show_shapes=True, show_layer_names=True)

In [None]:
# Train for 30 epochs with batch size 64, evaluating on the validation split after each epoch.
hist_text = model_text.fit(xtrain_tx, ytrain, batch_size=64, epochs=30, verbose=1, validation_data=(xval_tx, yval))

In [None]:
# Best validation accuracy reached in any epoch.
print(max(hist_text.history['val_acc']))

### Speech Model (LSTM)

In [None]:
# Speech classifier: two parallel LSTMs over the MFCC sequence feed an
# attention layer (query / value), followed by a dense head over 4 classes.
input_sequence = Input(shape=(100, 13))
query = LSTM(units=128, return_sequences=True, recurrent_dropout=0.2)(input_sequence)
value = LSTM(units=128, return_sequences=True, recurrent_dropout=0.2)(input_sequence)

output = Attention()([query, value])
for layer in (
    Dropout(0.2),
    Flatten(),
    Dense(256),
    Activation('relu'),
    Dense(4),
    Activation('softmax'),
):
    output = layer(output)

model_speech = Model(inputs=input_sequence, outputs=output)

In [None]:
# Categorical cross-entropy loss, Adam optimizer; accuracy is logged under the key 'acc'.
model_speech.compile(loss='categorical_crossentropy',optimizer='adam' ,metrics=['acc'])
# Print the layer-by-layer summary.
model_speech.summary()

In [None]:
# Write the architecture diagram (with shapes and layer names) to speech.png.
tf.keras.utils.plot_model(model_speech, to_file='speech.png', show_shapes=True, show_layer_names=True)

In [None]:
# Train for 30 epochs with batch size 64, evaluating on the validation split after each epoch.
hist_speech = model_speech.fit(xtrain_sp, ytrain, batch_size=64, epochs=30, verbose=1, validation_data=(xval_sp, yval))

In [None]:
# Best validation accuracy reached in any epoch.
print(max(hist_speech.history['val_acc']))

### Mocap Model (2D CNN)

In [None]:
# Mocap classifier: five stride-2 convolutions followed by a dense head over 4 classes.
model_mocap = Sequential()

# The input shape must match the mocap feature array, which is reshaped to
# (-1, 200, 189, 1): 18 head + 6 hand + 165 rotated columns. The previous
# width of 165 made fit() fail with a shape mismatch.
model_mocap.add(Conv2D(32, 3, strides=(2, 2), padding='same', input_shape=(200, 189, 1)))
model_mocap.add(Dropout(0.2))
model_mocap.add(Activation('relu'))

# Remaining convolution stages: Conv2D -> Dropout -> ReLU.
for n_filters in (64, 64, 128, 128):
    model_mocap.add(Conv2D(n_filters, 3, strides=(2, 2), padding='same'))
    model_mocap.add(Dropout(0.2))
    model_mocap.add(Activation('relu'))

model_mocap.add(Flatten())
model_mocap.add(Dropout(0.2))
model_mocap.add(Dense(256))

model_mocap.add(Activation('relu'))
model_mocap.add(Dropout(0.2))
model_mocap.add(Dense(4))
model_mocap.add(Activation('softmax'))

In [None]:
# Categorical cross-entropy loss, Adam optimizer; accuracy is logged under the key 'acc'.
model_mocap.compile(loss='categorical_crossentropy',optimizer='adam' ,metrics=['acc'])
# Print the layer-by-layer summary.
model_mocap.summary()

In [None]:
# Write the architecture diagram (with shapes and layer names) to mocap.png.
tf.keras.utils.plot_model(model_mocap, to_file='mocap.png', show_shapes=True, show_layer_names=True)

In [None]:
# Train for 30 epochs with batch size 64, evaluating on the validation split after each epoch.
hist_mocap = model_mocap.fit(xtrain_mo, ytrain, batch_size=64, epochs=30, verbose=1, validation_data=(xval_mo, yval))

In [None]:
# Best validation accuracy reached in any epoch.
print(max(hist_mocap.history['val_acc']))

# 6. Ensemble Model

In [None]:
# Text
model_text1 = Sequential()
model_text1.add(Embedding(len(word_index) + 1, embedding_dim, input_length = MAX_SEQUENCE_LENGTH,
                    trainable = True))

model_text1.build(input_shape=(None, MAX_SEQUENCE_LENGTH))
model_text1.layers[0].set_weights([embedding_matrix])
model_text1.layers[0].trainable = True

model_text1.add(LSTM(units = 256, return_sequences=True, recurrent_dropout = 0.2))
model_text1.add(Dropout(0.2))
model_text1.add(LSTM(units = 256, return_sequences=False, recurrent_dropout = 0.2))
model_text1.add(Dropout(0.2))
model_text1.add(Dense(512))

In [None]:
# Audio
input_sequence1 = Input(shape=(100, 13))
query1 = LSTM(units=128, return_sequences=True, recurrent_dropout=0.2)(input_sequence1)
value1 = LSTM(units=128, return_sequences=True, recurrent_dropout=0.2)(input_sequence1)
attention1 = Attention()([query1, value1])
dropout1 = Dropout(0.2)(attention1)
flatten1 = Flatten()(dropout1)
dense11 = Dense(256)(flatten1)
act1 = Activation('relu')(dense11)
dense21 = Dense(4)(act1)
output1 = Activation('softmax')(dense21)
model_speech1 = Model(inputs=input_sequence1, outputs=output1)

In [None]:
# Mocap
model_mocap1 = Sequential()

model_mocap1.add(Conv2D(32, 3, strides=(2, 2), padding='same', input_shape=(200, 189, 1)))
model_mocap1.add(Dropout(0.2))
model_mocap1.add(Activation('relu'))

model_mocap1.add(Conv2D(64, 3, strides=(2, 2), padding='same'))
model_mocap1.add(Dropout(0.2))
model_mocap1.add(Activation('relu'))

model_mocap1.add(Conv2D(64, 3, strides=(2, 2), padding='same'))
model_mocap1.add(Dropout(0.2))
model_mocap1.add(Activation('relu'))

model_mocap1.add(Conv2D(128, 3, strides=(2, 2), padding='same'))
model_mocap1.add(Dropout(0.2))
model_mocap1.add(Activation('relu'))

model_mocap1.add(Conv2D(128, 3, strides=(2, 2), padding='same'))
model_mocap1.add(Dropout(0.2))
model_mocap1.add(Activation('relu'))

model_mocap1.add(Flatten())
model_mocap1.add(Dropout(0.2))
model_mocap1.add(Dense(256))

In [None]:
# NOTE(review): this Sequential definition is superseded -- `model_combined` is
# reassigned by the functional-API cell that follows, so the model built here
# is never compiled or trained.
# NOTE(review): the list of sub-models is passed to the Concatenate constructor
# (whose first parameter is `axis` in the Keras API) instead of the layer being
# applied to tensors -- confirm and consider deleting this cell.
model_combined = Sequential()
model_combined.add(Concatenate([model_text1, model_speech1, model_mocap1]))
model_combined.add(Activation('relu'))
model_combined.add(Dense(256))
model_combined.add(Activation('relu'))
model_combined.add(Dense(4))
model_combined.add(Activation('softmax'))

In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Concatenate, Activation, Dense

# One input tensor per modality.
text_input = Input(shape=(MAX_SEQUENCE_LENGTH,))
audio_input = Input(shape=(100, 13))
mocap_input = Input(shape=(200, 189, 1))

# Run each branch on its own input, then fuse the branch outputs by concatenation.
text_features = model_text1(text_input)
audio_features = model_speech1(audio_input)
mocap_features = model_mocap1(mocap_input)
combined_output = Concatenate()([text_features, audio_features, mocap_features])

# Fusion head: ReLU -> Dense(256) -> ReLU -> 4-way softmax.
x = Activation('relu')(combined_output)
x = Dense(256)(x)
x = Activation('relu')(x)
output = Dense(4, activation='softmax')(x)

# Multi-input model; inputs are ordered text, audio, mocap.
model_combined = Model(inputs=[text_input, audio_input, mocap_input], outputs=output)

In [None]:
# Categorical cross-entropy loss, Adam optimizer; accuracy is logged under the key 'acc'.
model_combined.compile(loss='categorical_crossentropy',optimizer='adam' ,metrics=['acc'])
# Print the layer-by-layer summary.
model_combined.summary()

In [None]:
# Write the architecture diagram (with shapes and layer names) to combined.png.
tf.keras.utils.plot_model(model_combined, to_file='combined.png', show_shapes=True, show_layer_names=True)

In [None]:
# Train the fused model for 30 epochs with batch size 64; inputs are passed in
# the order text, speech, mocap for both the training and validation splits.
hist_combined = model_combined.fit([xtrain_tx,xtrain_sp,xtrain_mo], ytrain,
                     batch_size=64, epochs=30, verbose=1,
                     validation_data=([xval_tx,xval_sp,xval_mo], yval))

In [None]:
# Best validation accuracy reached in any epoch.
print(max(hist_combined.history['val_acc']))

### Test

In [None]:
# Import Modules
import numpy as np
from sklearn.metrics import confusion_matrix, accuracy_score

In [None]:
# Class probabilities for the held-out test split (inputs ordered text, speech, mocap).
test_inputs = [xtest_tx, xtest_sp, xtest_mo]
y_pred = model_combined.predict(test_inputs)

# Most probable class per utterance, and the true class from the one-hot labels.
y_pred_classes = np.argmax(y_pred, axis=1)
y_true = np.argmax(ytest, axis=1)

# Rows are true classes, columns are predicted classes.
conf_matrix = confusion_matrix(y_true, y_pred_classes)

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Heat map of the confusion matrix with integer counts in each cell.
class_ticks = [0, 1, 2, 3]
fig, ax = plt.subplots(figsize=(10, 8))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues',
            xticklabels=class_ticks, yticklabels=class_ticks, ax=ax)
ax.set_xlabel('Predicted labels')
ax.set_ylabel('True labels')
ax.set_title('Confusion Matrix')
plt.show()