### Import libraries

In [None]:
import os
import pandas as pd
import numpy as np
import glob
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from transformers import Wav2Vec2FeatureExtractor, Wav2Vec2Model
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
import soundfile as sf

import tensorflow as tf
# Ask TensorFlow to enable memory growth on every visible GPU.
gpus = tf.config.experimental.list_physical_devices('GPU')
try:
    # With no GPU the list is empty and the loop body never runs.
    for device in gpus:
        tf.config.experimental.set_memory_growth(device, True)
except RuntimeError as err:
    # Report the problem and continue; remaining devices are left untouched.
    print(err)

from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, LSTM, Dropout, Flatten, Input, Bidirectional, Concatenate
import torchaudio
from transformers import Wav2Vec2ForCTC, Wav2Vec2Processor
from transformers import RobertaTokenizer, TFRobertaModel
from tensorflow.keras.layers import MultiHeadAttention
from tensorflow.keras.optimizers import Adam
from tqdm import tqdm

In [None]:
# Seed the NumPy and TensorFlow global random generators with the same value.
SEED = 2023
np.random.seed(SEED)
tf.random.set_seed(SEED)

### Load the Audio data

In [None]:
# Load data and preprocess
def load_data(directory, csv_file):
    """Build the WAV path of every labelled segment listed in a CSV file.

    The CSV must provide a ``segment_id`` column (strings) and an ``emotion``
    column. For each row the path is
    ``<directory>/<first 15 chars of segment_id>/WAV_<segment_id>.wav`` with
    backslashes normalised to forward slashes.

    Args:
        directory: Root folder of the split.
        csv_file: Path of the label CSV for the split.

    Returns:
        ``(file_paths, labels)`` where ``file_paths`` is a list of strings in
        CSV row order and ``labels`` is the ``emotion`` column as a NumPy
        array.
    """
    table = pd.read_csv(csv_file)

    def _wav_path(segment_id):
        # The sub-folder name is the 15-character prefix of the segment id.
        folder_name = f"{segment_id[:15]}"
        wav_name = 'WAV_' + f"{segment_id}.wav"
        return os.path.join(directory, folder_name, wav_name).replace('\\', '/')

    file_paths = [_wav_path(segment_id) for segment_id in table['segment_id']]
    return file_paths, table['emotion'].values

# Resolve the audio paths and raw emotion labels of the three splits.
train_paths, train_labels = load_data(
    directory='./data/KEMDy20ERCNew/train',
    csv_file='./data/KEMDy20ERCNew/train_labels.csv',
)
val_paths, val_labels = load_data(
    directory='./data/KEMDy20ERCNew/val',
    csv_file='./data/KEMDy20ERCNew/val_labels.csv',
)
test_paths, test_labels = load_data(
    directory='./data/KEMDy20ERCNew/test',
    csv_file='./data/KEMDy20ERCNew/test_labels.csv',
)

- Label Encoding (train_labels, val_labels, test_labels)

In [None]:
from sklearn.preprocessing import LabelEncoder

# Fit a single encoder on the labels of all three splits so that every split
# shares the same label -> integer mapping.
all_labels = np.concatenate((train_labels, val_labels, test_labels), axis=0)
encoder = LabelEncoder().fit(all_labels)

# Replace the raw labels of each split with their integer codes.
train_labels, val_labels, test_labels = (
    encoder.transform(split_labels)
    for split_labels in (train_labels, val_labels, test_labels)
)

### Extract the Audio features

In [None]:
# Feature extraction using Wav2Vec2
# The processor and the CTC model are loaded from the same checkpoint.
W2V2_CHECKPOINT = "facebook/wav2vec2-base-960h"
processor = Wav2Vec2Processor.from_pretrained(W2V2_CHECKPOINT)
model = Wav2Vec2ForCTC.from_pretrained(W2V2_CHECKPOINT)

def pad_sequences(sequences, max_len=None, padding_value=0.0):
    """Stack variable-length 2-D arrays into one padded 3-D array.

    Args:
        sequences: Non-empty sequence of arrays shaped ``(length_i, dim)``.
            The feature width is taken from the first array.
        max_len: Target length of the time axis. Defaults to the longest
            sequence. Sequences longer than ``max_len`` are truncated.
        padding_value: Value written to positions past a sequence's end.

    Returns:
        Array of shape ``(len(sequences), max_len, dim)``.

    Raises:
        ValueError: If ``sequences`` is empty.
    """
    # len() rather than truthiness: ``sequences`` may itself be a NumPy array.
    if len(sequences) == 0:
        raise ValueError("pad_sequences() requires at least one sequence")

    if max_len is None:
        max_len = max(len(seq) for seq in sequences)

    feature_dim = sequences[0].shape[1]
    padded_sequences = np.full((len(sequences), max_len, feature_dim), padding_value)
    for i, seq in enumerate(sequences):
        # Clip to max_len so an explicit, shorter max_len truncates instead of
        # failing with a broadcast error.
        kept = min(len(seq), max_len)
        padded_sequences[i, :kept] = seq[:kept]

    return padded_sequences

def extract_features(file_paths):
    """Turn audio files into padded Wav2Vec2 CTC-logit sequences.

    Each file is loaded with torchaudio, down-mixed to a single channel,
    resampled to the rate the module-level ``processor`` expects, and passed
    through the module-level ``model`` without gradient tracking.

    Args:
        file_paths: Iterable of audio file paths.

    Returns:
        The result of ``pad_sequences`` over the per-file logit arrays, i.e.
        an array shaped ``(n_files, longest_sequence, logit_dim)``.
    """
    target_rate = processor.feature_extractor.sampling_rate
    features = []

    for file_path in file_paths:
        speech, sample_rate = torchaudio.load(file_path)
        # torchaudio returns (channels, samples). Averaging over the channel
        # axis equals the old squeeze(0) for mono files and also collapses
        # multi-channel files, which squeeze(0) left two-dimensional.
        speech = speech.mean(dim=0)
        if sample_rate != target_rate:
            speech = torchaudio.functional.resample(speech, sample_rate, target_rate)
        inputs = processor(speech, return_tensors="pt", padding=True, sampling_rate=target_rate)
        with torch.no_grad():
            outputs = model(**inputs)
        # Drop the batch axis: one (frames, logit_dim) array per file.
        features.append(outputs.logits.squeeze(0).cpu().numpy())

    # Files differ in length, so pad instead of np.stack.
    return pad_sequences(features)

# Run the audio feature extraction once per split, in train/val/test order.
train_features, val_features, test_features = map(
    extract_features, (train_paths, val_paths, test_paths)
)

In [None]:
# Each split was padded to its own longest clip; cut all three down to the
# shortest of those lengths so they share one time dimension.
min_timesteps = min(
    split.shape[1] for split in (train_features, val_features, test_features)
)

train_features, val_features, test_features = (
    split[:, :min_timesteps]
    for split in (train_features, val_features, test_features)
)

In [None]:
# Notebook display: shapes of the three audio feature arrays after trimming.
train_features.shape, val_features.shape, test_features.shape

### Load the Text data

In [None]:
import os

def load_txt_data(folder_path):
    """Collect the stripped lines of every ``.txt`` file under a folder.

    The tree is walked in sorted order (directories and files), so the
    returned list is reproducible across runs and filesystems; ``os.walk``
    alone yields entries in arbitrary order.

    Args:
        folder_path: Root folder to search recursively.

    Returns:
        A list with one stripped string per line, files visited in sorted
        order. Files are decoded as cp949.
    """
    # NOTE(review): downstream code pairs these lines positionally with the
    # label CSV rows - confirm that sorted file order matches the CSV order.
    txt_data = []
    for root, dirs, files in os.walk(folder_path):
        # Sorting in place makes os.walk descend into sub-folders in order.
        dirs.sort()
        for file in sorted(files):
            if file.endswith(".txt"):
                with open(os.path.join(root, file), "r", encoding="cp949") as f:
                    txt_data.extend(line.strip() for line in f)
    return txt_data

# Read the transcript lines of each split, in train/val/test order.
train_text_data, val_text_data, test_text_data = map(
    load_txt_data,
    (
        './data/KEMDy20ERCNew/train',
        './data/KEMDy20ERCNew/val',
        './data/KEMDy20ERCNew/test',
    ),
)

In [None]:
# RoBERTa sub-model for text data
# NOTE(review): the transcripts are decoded as cp949 elsewhere in this
# notebook; confirm that the `roberta-large` vocabulary suits their language.
roberta_model = TFRobertaModel.from_pretrained("roberta-large")
tokenizer = RobertaTokenizer.from_pretrained("roberta-large")

MAX_LENGTH = 128  # Adjust this value according to your GPU memory capacity


def _encode_input_ids(texts):
    """Tokenize ``texts`` into an int tensor of shape (len(texts), MAX_LENGTH).

    ``padding="max_length"`` gives every split exactly MAX_LENGTH columns.
    ``padding=True`` only pads to the longest text of the current call, which
    can differ per split and from the fixed-width model input.
    """
    # NOTE(review): only input_ids are kept; the attention mask is discarded,
    # so padded positions are not masked downstream.
    encoded = tokenizer(
        texts,
        padding="max_length",
        truncation=True,
        max_length=MAX_LENGTH,
        return_tensors="tf",
    )
    return encoded["input_ids"]


# Encode the text data using the RoBERTa tokenizer
train_input_ids = _encode_input_ids(train_text_data)
val_input_ids = _encode_input_ids(val_text_data)
test_input_ids = _encode_input_ids(test_text_data)

# Freeze the RoBERTa layers
for layer in roberta_model.layers:
    layer.trainable = False

### Load the Bio-signal (EDA, TEMP)
- Extract the features using tsfresh

In [None]:
from tsfresh import extract_features

train_folder = "./data/KEMDy20ERCNew/train"

# Long-format frames (id, time, value), one per signal file.
train_eda_data = []
train_temp_data = []

# One counter is shared by both signal types, so every file gets a unique id.
id_counter = 0

# os.listdir() returns entries in arbitrary order. Sorting makes the id
# assignment - and therefore the row order of the feature tables, which are
# later paired positionally with train_labels - reproducible.
# NOTE(review): confirm that sorted order matches the row order of the label CSV.
for script_folder in sorted(os.listdir(train_folder)):
    script_path = os.path.join(train_folder, script_folder)
    if os.path.isdir(script_path):
        for csv_file in sorted(os.listdir(script_path)):
            if "EDA" in csv_file:
                eda_csv_path = os.path.join(script_path, csv_file)
                eda_df = pd.read_csv(eda_csv_path)
                eda_df["id"] = id_counter
                # The row index serves as the sort key inside one recording.
                eda_df["time"] = eda_df.index
                train_eda_data.append(eda_df[["id", "time", "eda_value"]])
                id_counter += 1
            elif "TEMP" in csv_file:
                temp_csv_path = os.path.join(script_path, csv_file)
                temp_df = pd.read_csv(temp_csv_path)
                temp_df["id"] = id_counter
                temp_df["time"] = temp_df.index
                train_temp_data.append(temp_df[["id", "time", "temp_value"]])
                id_counter += 1

# Concatenate all the EDA and Temp data
train_eda_data = pd.concat(train_eda_data)
train_temp_data = pd.concat(train_temp_data)

# Extract features using tsfresh (`extract_features` is the tsfresh function
# imported at the top of this cell, not the audio helper defined earlier).
train_eda_features = extract_features(train_eda_data, column_id="id", column_sort="time")
train_temp_features = extract_features(train_temp_data, column_id="id", column_sort="time")

In [None]:
val_folder = "./data/KEMDy20ERCNew/val"

# Long-format frames (id, time, value), one per signal file.
val_eda_data = []
val_temp_data = []

# One counter is shared by both signal types, so every file gets a unique id.
id_counter = 0

# os.listdir() returns entries in arbitrary order. Sorting makes the id
# assignment - and therefore the row order of the feature tables, which are
# later paired positionally with val_labels - reproducible.
# NOTE(review): confirm that sorted order matches the row order of the label CSV.
for script_folder in sorted(os.listdir(val_folder)):
    script_path = os.path.join(val_folder, script_folder)
    if os.path.isdir(script_path):
        for csv_file in sorted(os.listdir(script_path)):
            if "EDA" in csv_file:
                eda_csv_path = os.path.join(script_path, csv_file)
                eda_df = pd.read_csv(eda_csv_path)
                eda_df["id"] = id_counter
                # The row index serves as the sort key inside one recording.
                eda_df["time"] = eda_df.index
                val_eda_data.append(eda_df[["id", "time", "eda_value"]])
                id_counter += 1
            elif "TEMP" in csv_file:
                temp_csv_path = os.path.join(script_path, csv_file)
                temp_df = pd.read_csv(temp_csv_path)
                temp_df["id"] = id_counter
                temp_df["time"] = temp_df.index
                val_temp_data.append(temp_df[["id", "time", "temp_value"]])
                id_counter += 1

# Concatenate all the EDA and Temp data
val_eda_data = pd.concat(val_eda_data)
val_temp_data = pd.concat(val_temp_data)

# Extract features using tsfresh (`extract_features` is expected to be the
# tsfresh function imported in the bio-signal section above).
val_eda_features = extract_features(val_eda_data, column_id="id", column_sort="time")
val_temp_features = extract_features(val_temp_data, column_id="id", column_sort="time")

In [None]:
test_folder = "./data/KEMDy20ERCNew/test"

# Long-format frames (id, time, value), one per signal file.
test_eda_data = []
test_temp_data = []

# One counter is shared by both signal types, so every file gets a unique id.
id_counter = 0

# os.listdir() returns entries in arbitrary order. Sorting makes the id
# assignment - and therefore the row order of the feature tables, which are
# later paired positionally with test_labels - reproducible.
# NOTE(review): confirm that sorted order matches the row order of the label CSV.
for script_folder in sorted(os.listdir(test_folder)):
    script_path = os.path.join(test_folder, script_folder)
    if os.path.isdir(script_path):
        for csv_file in sorted(os.listdir(script_path)):
            if "EDA" in csv_file:
                eda_csv_path = os.path.join(script_path, csv_file)
                eda_df = pd.read_csv(eda_csv_path)
                eda_df["id"] = id_counter
                # The row index serves as the sort key inside one recording.
                eda_df["time"] = eda_df.index
                test_eda_data.append(eda_df[["id", "time", "eda_value"]])
                id_counter += 1
            elif "TEMP" in csv_file:
                temp_csv_path = os.path.join(script_path, csv_file)
                temp_df = pd.read_csv(temp_csv_path)
                temp_df["id"] = id_counter
                temp_df["time"] = temp_df.index
                test_temp_data.append(temp_df[["id", "time", "temp_value"]])
                id_counter += 1

# Concatenate all the EDA and Temp data
test_eda_data = pd.concat(test_eda_data)
test_temp_data = pd.concat(test_temp_data)

# Extract features using tsfresh (`extract_features` is expected to be the
# tsfresh function imported in the bio-signal section above).
test_eda_features = extract_features(test_eda_data, column_id="id", column_sort="time")
test_temp_features = extract_features(test_temp_data, column_id="id", column_sort="time")

In [None]:
from sklearn.preprocessing import MinMaxScaler
from sklearn.impute import SimpleImputer

# Infinite feature values are turned into NaN (in place) so that the imputer
# below treats them as missing.
for _frame in (train_eda_features, train_temp_features):
    _frame.replace([np.inf, -np.inf], np.nan, inplace=True)

# One imputer (column mean) and one [0, 1] scaler are reused for both signal
# types; each fit_transform call re-fits them on the data it receives.
imputer = SimpleImputer(strategy="mean")
scaler = MinMaxScaler()

# EDA: fill missing values, then normalize.
train_eda_features_filled = imputer.fit_transform(train_eda_features)
train_eda_features_norm = scaler.fit_transform(train_eda_features_filled)

# TEMP: fill missing values, then normalize.
train_temp_features_filled = imputer.fit_transform(train_temp_features)
train_temp_features_norm = scaler.fit_transform(train_temp_features_filled)

In [None]:
# Infinite feature values are turned into NaN (in place) so that the imputer
# below treats them as missing.
for _frame in (val_eda_features, val_temp_features):
    _frame.replace([np.inf, -np.inf], np.nan, inplace=True)

# NOTE(review): the imputer and scaler are re-fit on the validation data
# itself rather than reusing statistics fitted on the training split -
# confirm this is intended.
imputer = SimpleImputer(strategy="mean")
scaler = MinMaxScaler()

# EDA: fill missing values, then normalize.
val_eda_features_filled = imputer.fit_transform(val_eda_features)
val_eda_features_norm = scaler.fit_transform(val_eda_features_filled)

# TEMP: fill missing values, then normalize.
val_temp_features_filled = imputer.fit_transform(val_temp_features)
val_temp_features_norm = scaler.fit_transform(val_temp_features_filled)

In [None]:
# Infinite feature values are turned into NaN (in place) so that the imputer
# below treats them as missing.
for _frame in (test_eda_features, test_temp_features):
    _frame.replace([np.inf, -np.inf], np.nan, inplace=True)

# NOTE(review): the imputer and scaler are re-fit on the test data itself
# rather than reusing statistics fitted on the training split - confirm this
# is intended.
imputer = SimpleImputer(strategy="mean")
scaler = MinMaxScaler()

# EDA: fill missing values, then normalize.
test_eda_features_filled = imputer.fit_transform(test_eda_features)
test_eda_features_norm = scaler.fit_transform(test_eda_features_filled)

# TEMP: fill missing values, then normalize.
test_temp_features_filled = imputer.fit_transform(test_temp_features)
test_temp_features_norm = scaler.fit_transform(test_temp_features_filled)

### Borderline SMOTE for train data

In [None]:
from imblearn.over_sampling import BorderlineSMOTE

# One Borderline-SMOTE sampler per modality, all configured identically.
# NOTE(review): every modality is oversampled independently, so synthetic row
# i of one modality is not derived from the same source samples as synthetic
# row i of another - verify that the resampled arrays are still meant to be
# paired row by row (and that they end up with equal lengths).
# NOTE(review): the text sampler interpolates between token-id vectors -
# confirm that synthetic token ids are acceptable as model input.
text_smote = BorderlineSMOTE(k_neighbors=5, random_state=2023)
audio_smote = BorderlineSMOTE(k_neighbors=5, random_state=2023)
bio_smote = BorderlineSMOTE(k_neighbors=5, random_state=2023)

# Text: the resampled labels of this call are used for every modality.
text_train_resampled, labels_train_resampled = text_smote.fit_resample(train_input_ids, train_labels)

# Audio: flatten (samples, timesteps, features) to 2-D for SMOTE, then restore
# the original trailing dimensions.
_n_timesteps, _n_audio_features = train_features.shape[1:3]
train_features_2d = train_features.reshape(len(train_features), -1)
audio_train_resampled = audio_smote.fit_resample(train_features_2d, train_labels)[0]
audio_train_resampled = audio_train_resampled.reshape(-1, _n_timesteps, _n_audio_features)

# Bio signals: the same sampler object is fitted twice, once per signal type.
eda_train_resampled = bio_smote.fit_resample(train_eda_features_norm, train_labels)[0]
temp_train_resampled = bio_smote.fit_resample(train_temp_features_norm, train_labels)[0]

In [None]:
# Insert a length-1 time axis: (samples, features) -> (samples, 1, features),
# the 3-D layout consumed by the recurrent bio-signal models.
eda_train_resampled_3d = eda_train_resampled[:, np.newaxis, :]
temp_train_resampled_3d = temp_train_resampled[:, np.newaxis, :]

val_eda_features_3d = val_eda_features_norm[:, np.newaxis, :]
val_temp_features_3d = val_temp_features_norm[:, np.newaxis, :]

test_eda_features_3d = test_eda_features_norm[:, np.newaxis, :]
test_temp_features_3d = test_temp_features_norm[:, np.newaxis, :]

In [None]:
# Smallest EDA feature count found in any of the three splits.
# NOTE(review): trimming keeps the first `ead_min_len` columns of each split;
# if the splits lost different columns upstream, the kept columns may not
# describe the same features - verify.
ead_min_len = min(
    arr.shape[2]
    for arr in (eda_train_resampled_3d, val_eda_features_3d, test_eda_features_3d)
)

# Cut the last axis of every split down to that common width.
eda_train_resampled_3d = eda_train_resampled_3d[..., :ead_min_len]
val_eda_features_3d = val_eda_features_3d[..., :ead_min_len]
test_eda_features_3d = test_eda_features_3d[..., :ead_min_len]

In [None]:
# Smallest TEMP feature count found in any of the three splits.
# NOTE(review): trimming keeps the first `temp_min_len` columns of each split;
# if the splits lost different columns upstream, the kept columns may not
# describe the same features - verify.
temp_min_len = min(
    arr.shape[2]
    for arr in (temp_train_resampled_3d, val_temp_features_3d, test_temp_features_3d)
)

# Cut the last axis of every TEMP split down to that common width.
temp_train_resampled_3d = temp_train_resampled_3d[..., :temp_min_len]
val_temp_features_3d = val_temp_features_3d[..., :temp_min_len]
test_temp_features_3d = test_temp_features_3d[..., :temp_min_len]

In [None]:
# Notebook display: shapes of the three TEMP feature arrays after trimming.
temp_train_resampled_3d.shape, val_temp_features_3d.shape, test_temp_features_3d.shape

### Bio-signal Model

In [None]:
def create_eda_model(eda_input_shape):
    """Build the EDA branch: two stacked bidirectional LSTMs and a dense head.

    Args:
        eda_input_shape: Per-sample input shape ``(timesteps, features)``.

    Returns:
        A Keras ``Model`` mapping the EDA input to a 32-unit GELU embedding.
    """
    eda_input = Input(shape=eda_input_shape)
    # Layers are listed in the order they are applied.
    stack = (
        Bidirectional(LSTM(64, return_sequences=True)),
        Dropout(0.5),
        Bidirectional(LSTM(64)),
        Dropout(0.5),
        Dense(32, activation=tf.nn.gelu),
    )
    hidden = eda_input
    for layer in stack:
        hidden = layer(hidden)

    return Model(inputs=eda_input, outputs=hidden)

def create_temp_model(temp_input_shape):
    """Build the TEMP branch: two stacked bidirectional LSTMs and a dense head.

    Args:
        temp_input_shape: Per-sample input shape ``(timesteps, features)``.

    Returns:
        A Keras ``Model`` mapping the TEMP input to a 32-unit GELU embedding.
    """
    temp_input = Input(shape=temp_input_shape)
    # Layers are listed in the order they are applied.
    stack = (
        Bidirectional(LSTM(64, return_sequences=True)),
        Dropout(0.5),
        Bidirectional(LSTM(64)),
        Dropout(0.5),
        Dense(32, activation=tf.nn.gelu),
    )
    hidden = temp_input
    for layer in stack:
        hidden = layer(hidden)

    return Model(inputs=temp_input, outputs=hidden)

# Per-sample input shapes: a single time step whose width is the (trimmed)
# number of features of the corresponding bio-signal array.
eda_input_shape = (1, eda_train_resampled_3d.shape[-1])
temp_input_shape = (1, temp_train_resampled_3d.shape[-1])

# Instantiate one branch model per bio signal.
eda_model = create_eda_model(eda_input_shape)
temp_model = create_temp_model(temp_input_shape)

### Text Model

In [None]:
def create_roberta_text_model():
    """Build the text branch on top of the module-level ``roberta_model``.

    The model takes ``MAX_LENGTH`` int32 token ids, runs them through RoBERTa,
    keeps the hidden vector at sequence position 0 and projects it to 32 units
    with a GELU dense layer.

    Returns:
        A ``tf.keras.Model`` from token ids to the 32-unit text embedding.
    """
    token_ids = Input(shape=(MAX_LENGTH,), dtype=tf.int32)
    # Element 0 of the RoBERTa output holds the per-token hidden states.
    token_states = roberta_model(token_ids)[0]
    first_token_state = token_states[:, 0, :]
    projected = Dense(32, activation=tf.nn.gelu)(first_token_state)
    return tf.keras.Model(inputs=token_ids, outputs=projected)


text_model = create_roberta_text_model()

### Audio Model

In [None]:
# Per-sample audio shape: (timesteps, features) of the padded feature array.
audio_input_shape = tuple(train_features.shape[1:3])

# Audio LSTM branch: two stacked bidirectional LSTMs with dropout, followed by
# a 32-unit GELU embedding.
lstm_model = Sequential()
lstm_model.add(Input(shape=audio_input_shape))
lstm_model.add(Bidirectional(LSTM(64, return_sequences=True)))
lstm_model.add(Dropout(0.5))
lstm_model.add(Bidirectional(LSTM(64)))
lstm_model.add(Dropout(0.5))
lstm_model.add(Dense(32, activation=tf.nn.gelu))

# MLP-Mixer Model
from tensorflow.keras.layers import Layer, Add
class MixerLayer(Layer):
    """Single MLP-Mixer style block followed by flattening.

    The layer expects a 3-D input ``(batch, tokens, channels)``. It applies a
    token-mixing MLP (across the token axis) and a channel-mixing MLP (across
    the channel axis), each with a residual connection, and returns the result
    flattened to ``(batch, tokens * channels)``.

    Args:
        tokens_mlp_dim: Hidden width of the token-mixing MLP.
        channels_mlp_dim: Hidden width of the channel-mixing MLP.
        **kwargs: Standard Keras ``Layer`` arguments (e.g. ``name``).
    """

    def __init__(self, tokens_mlp_dim, channels_mlp_dim, **kwargs):
        super().__init__(**kwargs)
        self.tokens_mlp_dim = tokens_mlp_dim
        self.channels_mlp_dim = channels_mlp_dim

    def build(self, input_shape):
        """Create the sub-layers once the token/channel sizes are known."""
        num_tokens, num_channels = input_shape[1], input_shape[2]
        self.layer_norm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layer_norm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        # Token-mixing MLP: operates on the transposed tensor, so its output
        # width must restore the number of tokens.
        self.dense1 = Dense(self.tokens_mlp_dim, activation=tf.nn.gelu)
        self.dense2 = Dense(num_tokens, activation=tf.nn.gelu)
        # Channel-mixing MLP: output width restores the number of channels.
        self.dense3 = Dense(self.channels_mlp_dim, activation=tf.nn.gelu)
        self.dense4 = Dense(num_channels, activation=tf.nn.gelu)
        # Created here rather than in call() so that no new layer object is
        # instantiated on every forward pass.
        self.flatten = tf.keras.layers.Flatten()

    def call(self, inputs):
        """Apply token mixing, channel mixing and flatten the result."""
        # Token mixing
        x = self.layer_norm1(inputs)
        x_t = tf.transpose(x, perm=[0, 2, 1])
        x_t = self.dense1(x_t)
        x_t = self.dense2(x_t)
        x_t = tf.transpose(x_t, perm=[0, 2, 1])
        # Residual is taken from the normalized tensor, as in the original code.
        x = x + x_t

        # Channel mixing
        y = self.layer_norm2(x)
        y = self.dense3(y)
        y = self.dense4(y)
        y = x + y

        return self.flatten(y)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update(
            {
                "tokens_mlp_dim": self.tokens_mlp_dim,
                "channels_mlp_dim": self.channels_mlp_dim,
            }
        )
        return config
    
# MLP-Mixer audio branch: one mixer block, then a 32-unit GELU embedding.
# The Flatten layer is kept even though MixerLayer already returns a
# flattened tensor.
mlp_mixer_model = Sequential()
mlp_mixer_model.add(Input(shape=audio_input_shape))
mlp_mixer_model.add(MixerLayer(tokens_mlp_dim=64, channels_mlp_dim=64))
mlp_mixer_model.add(Flatten())
mlp_mixer_model.add(Dense(32, activation=tf.nn.gelu))

### Combined Model (Audio+Text+Bio-signal)

In [None]:
class CombinedModelWithTextAndBioSignals(tf.keras.Model):
    """Late-fusion classifier over text, audio and bio-signal branches.

    Each branch model produces one embedding per sample. The embeddings are
    concatenated, projected to 64 units, passed through dropout and a
    multi-head self-attention layer, and classified with a softmax layer.

    Args:
        text_model: Model mapping token ids to a text embedding.
        lstm_model: Model mapping audio features to an embedding.
        mlp_mixer_model: Second model applied to the same audio features.
        eda_model: Model mapping EDA features to an embedding.
        temp_model: Model mapping TEMP features to an embedding.
        num_classes: Number of output classes (defaults to 7, the value that
            was previously hard-coded).
    """

    def __init__(self, text_model, lstm_model, mlp_mixer_model, eda_model, temp_model, num_classes=7):
        super().__init__()
        self.text_model = text_model
        self.lstm_model = lstm_model
        self.mlp_mixer_model = mlp_mixer_model
        self.eda_model = eda_model
        self.temp_model = temp_model
        self.num_classes = num_classes

        self.combine_layer = Dense(64, activation=tf.nn.gelu)
        self.dropout = Dropout(0.5)
        self.multihead_attention = MultiHeadAttention(num_heads=4, key_dim=32)
        self.classifier = Dense(num_classes, activation='softmax')

    def call(self, inputs):
        """Return class probabilities for a batch.

        Args:
            inputs: 4-tuple ``(text_input, audio_input, eda_input,
                temp_input)``; ``audio_input`` feeds both audio branches.

        Returns:
            The softmax output of the classifier, one row per sample.
        """
        text_input, audio_input, eda_input, temp_input = inputs
        text_output = self.text_model(text_input)
        lstm_output = self.lstm_model(audio_input)
        mlp_mixer_output = self.mlp_mixer_model(audio_input)
        eda_output = self.eda_model(eda_input)
        temp_output = self.temp_model(temp_input)
        x = tf.concat([text_output, lstm_output, mlp_mixer_output, eda_output, temp_output], axis=-1)
        x = self.combine_layer(x)
        x = self.dropout(x)
        # The attention layer expects a sequence axis; the fused vector is
        # treated as a sequence of length 1.
        x = tf.expand_dims(x, axis=1)
        x = self.multihead_attention(query=x, key=x, value=x)
        x = tf.squeeze(x, axis=1)  # Remove the sequence dimension
        x = self.classifier(x)
        return x

# Wire the five branch models into the fusion classifier.
combined_model_with_text_and_bio_signals = CombinedModelWithTextAndBioSignals(
    text_model=text_model,
    lstm_model=lstm_model,
    mlp_mixer_model=mlp_mixer_model,
    eda_model=eda_model,
    temp_model=temp_model,
)

### Model Train and Evaluate

In [None]:
def train_and_evaluate(model, model_name):
    """Train ``model`` on the resampled training data and report on validation.

    The function reads the module-level training and validation arrays,
    trains for 200 epochs while checkpointing the weights with the lowest
    validation loss, reloads those weights and prints a classification report
    and confusion matrix for the validation split.

    Args:
        model: Keras model accepting the 4-tuple
            ``(text, audio, eda, temp)`` as input.
        model_name: Prefix of the checkpoint file
            (``"<model_name>_best_weights.h5"``) and of the printed headers.
    """
    # Set the learning rate for the Adam optimizer
    learning_rate = 5e-4  # Change this value to adjust the learning rate
    # `learning_rate` is the supported keyword; `lr` is a deprecated alias
    # that newer Keras releases reject.
    optimizer = Adam(learning_rate=learning_rate)

    model.compile(loss='sparse_categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])

    # Keep only the weights of the epoch with the lowest validation loss.
    checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(f"{model_name}_best_weights.h5", monitor='val_loss',
                                                             save_best_only=True, mode='min', save_weights_only=True)

    model.fit((text_train_resampled, audio_train_resampled, eda_train_resampled_3d, temp_train_resampled_3d), labels_train_resampled, epochs=200,
              validation_data=((val_input_ids, val_features, val_eda_features_3d, val_temp_features_3d), val_labels), batch_size=32, callbacks=[checkpoint_callback])

    # Evaluate with the best checkpoint rather than the last epoch's weights.
    model.load_weights(f"{model_name}_best_weights.h5")
    predictions = model.predict((val_input_ids, val_features, val_eda_features_3d, val_temp_features_3d))
    predictions = np.argmax(predictions, axis=1)

    print(f"\n{model_name} - Classification Report:")
    print(classification_report(val_labels, predictions))
    print(f"{model_name} - Confusion Matrix:")
    print(confusion_matrix(val_labels, predictions))

# Train the fusion model and print its validation metrics; the string is used
# as the checkpoint file prefix and as the header of the printed reports.
train_and_evaluate(combined_model_with_text_and_bio_signals, "CombinedModelWithTextAndBioSignals+Att-230413")

In [None]:
# Validation-set predictions: class index with the highest probability.
val_inputs = (val_input_ids, val_features, val_eda_features_3d, val_temp_features_3d)
predictions = combined_model_with_text_and_bio_signals.predict(val_inputs).argmax(axis=1)

val_report = classification_report(val_labels, predictions)
val_matrix = confusion_matrix(val_labels, predictions)

print("Audio_Text_BioSignal Classification Report:")
print(val_report)
print("Audio_Text_BioSignal Confusion Matrix:")
print(val_matrix)

In [None]:
# Test-set predictions: class index with the highest probability.
test_inputs = (test_input_ids, test_features, test_eda_features_3d, test_temp_features_3d)
test_predictions = combined_model_with_text_and_bio_signals.predict(test_inputs).argmax(axis=1)

test_report = classification_report(test_labels, test_predictions)
test_matrix = confusion_matrix(test_labels, test_predictions)

print("Audio_Text_BioSignal - Classification Report:")
print(test_report)
print("Audio_Text_BioSignal - Confusion Matrix:")
print(test_matrix)

In [None]:
from sklearn.metrics import f1_score

# Test-set F1 under three averaging schemes, each rounded to 4 decimals.
for averaging in ('weighted', 'micro', 'macro'):
    print(round(f1_score(test_labels, test_predictions, average=averaging), 4))

### Draw the Confusion Matrix

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix

def plot_confusion_matrix(y_true, y_pred, labels, title='Confusion Matrix'):
    """Show the raw-count confusion matrix as an annotated heatmap.

    Args:
        y_true: Ground-truth class ids.
        y_pred: Predicted class ids.
        labels: Tick labels used for both axes.
        title: Figure title.
    """
    counts = confusion_matrix(y_true, y_pred)

    plt.figure(figsize=(8, 6))
    heatmap_options = dict(
        annot=True,
        cmap='Blues',
        fmt='d',  # integer counts
        xticklabels=labels,
        yticklabels=labels,
    )
    sns.heatmap(counts, **heatmap_options)

    plt.title(title)
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.show()

In [None]:
def plot_confusion_matrix_percent(y_true, y_pred, labels, title='Confusion Matrix', cmap='Blues'):
    """Show the row-normalized confusion matrix (in percent) as a heatmap.

    Each row is divided by the number of true samples of that class. A class
    that appears only in ``y_pred`` has a row total of zero; its row is shown
    as 0.00 instead of NaN.

    Args:
        y_true: Ground-truth class ids.
        y_pred: Predicted class ids.
        labels: Tick labels used for both axes.
        title: Figure title.
        cmap: Matplotlib colormap name passed to seaborn.
    """
    cm = confusion_matrix(y_true, y_pred)
    row_totals = cm.sum(axis=1)[:, np.newaxis]
    # Divide only where the row total is non-zero; other cells keep the 0.0
    # they were initialised with, which avoids a divide-by-zero warning.
    cm_normalized = np.divide(
        cm.astype('float'),
        row_totals,
        out=np.zeros(cm.shape, dtype=float),
        where=row_totals != 0,
    )
    cm_percentage = cm_normalized * 100

    plt.figure(figsize=(10, 10))
    sns.heatmap(cm_percentage, annot=True, fmt='.2f', cmap=cmap, xticklabels=labels, yticklabels=labels)

    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.title(title)
    plt.show()

In [None]:
# Tick labels for the confusion-matrix plots.
# NOTE(review): these names are hard-coded; presumably they match the class
# order produced by the label encoder - confirm.
labels = ['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'surprise']
# Raw counts first, then the row-normalized percentages, both on the test split.
plot_confusion_matrix(test_labels, test_predictions, labels)
plot_confusion_matrix_percent(test_labels, test_predictions, labels)