In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import (Input, Conv2D, MaxPooling2D, Flatten, Dense, Dropout,
                                     Concatenate, BatchNormalization, Conv1D, MaxPooling1D)
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.utils import resample
from tensorflow.keras.utils import to_categorical

# Load and duplicate dataset
data_path = '/content/drive/MyDrive/dataset/preprocessed-data/mahnob_HCI_preprocessed_all.npy'
data = np.load(data_path, allow_pickle=True)

# Extract features and labels
X_eeg = np.array([entry['eeg_data'] for entry in data])
X_video = np.array([entry['video_data'] for entry in data])
X_ecg = np.array([entry['ecg_data'] for entry in data])
y = np.array([entry['labels'] for entry in data])

print(f"Number of samples: {len(data)}")

# Video padding if needed
if X_video.shape[1:] != (32, 64, 64):
    padded_video = []
    for video in X_video:
        pad_shape = [
            (0, max(0, 32 - video.shape[0])),
            (0, max(0, 64 - video.shape[1])),
            (0, max(0, 64 - video.shape[2]))
        ]
        padded_video.append(np.pad(video, pad_shape, mode='constant'))
    X_video = np.array(padded_video)

# Class balancing
def balance_classes(X1, X2, X3, y, class_idx):
    labels = y[:, class_idx]
    unique_classes, class_counts = np.unique(labels, return_counts=True)
    max_count = np.max(class_counts)
    balanced_X1, balanced_X2, balanced_X3, balanced_y = [], [], [], []
    for cls in unique_classes:
        cls_mask = labels == cls
        cls_X1, cls_X2, cls_X3, cls_y = X1[cls_mask], X2[cls_mask], X3[cls_mask], y[cls_mask]
        if len(cls_y) < max_count:
            X1_oversampled, X2_oversampled, X3_oversampled, y_oversampled = resample(
                cls_X1, cls_X2, cls_X3, cls_y,
                replace=True,
                n_samples=max_count - len(cls_y),
                random_state=42
            )
            cls_X1 = np.concatenate([cls_X1, X1_oversampled])
            cls_X2 = np.concatenate([cls_X2, X2_oversampled])
            cls_X3 = np.concatenate([cls_X3, X3_oversampled])
            cls_y = np.concatenate([cls_y, y_oversampled])
        balanced_X1.append(cls_X1)
        balanced_X2.append(cls_X2)
        balanced_X3.append(cls_X3)
        balanced_y.append(cls_y)
    return (np.concatenate(balanced_X1),
            np.concatenate(balanced_X2),
            np.concatenate(balanced_X3),
            np.concatenate(balanced_y))

X_eeg, X_video, X_ecg, y = balance_classes(X_eeg, X_video, X_ecg, y, class_idx=1)
X_eeg, X_video, X_ecg, y = balance_classes(X_eeg, X_video, X_ecg, y, class_idx=2)

# Split data
X_eeg_train, X_eeg_test, X_video_train, X_video_test, X_ecg_train, X_ecg_test, y_train, y_test = train_test_split(
    X_eeg, X_video, X_ecg, y, test_size=0.2, random_state=42
)

num_valence_classes = int(np.max(y[:, 1])) + 1
num_arousal_classes = int(np.max(y[:, 2])) + 1

y_valence_train = to_categorical(y_train[:, 1], num_classes=num_valence_classes)
y_arousal_train = to_categorical(y_train[:, 2], num_classes=num_arousal_classes)
y_valence_test = to_categorical(y_test[:, 1], num_classes=num_valence_classes)
y_arousal_test = to_categorical(y_test[:, 2], num_classes=num_arousal_classes)

# --------- 2D CNN BRANCH (single facial frame per video) ----------
X_video_train_2d = X_video_train[:, 0, :, :][..., np.newaxis]
X_video_test_2d  = X_video_test[:, 0, :, :][..., np.newaxis]

video_input = Input(shape=(64, 64, 1), name="video_input")
x = Conv2D(32, (3, 3), activation='relu', padding='same')(video_input)
x = MaxPooling2D((2, 2))(x)
x = BatchNormalization()(x)
x = Conv2D(64, (3, 3), activation='relu', padding='same')(x)
x = MaxPooling2D((2, 2))(x)
x = BatchNormalization()(x)
x = Flatten()(x)
cnn_output = Dense(128, activation='relu')(x)

# --------- 1D CNN EEG BRANCH ---------
# Transpose: (samples, channels, time) -> (samples, time, channels)
X_eeg_train_1d = np.transpose(X_eeg_train, (0, 2, 1))
X_eeg_test_1d = np.transpose(X_eeg_test, (0, 2, 1))
eeg_input = Input(shape=(512, 5), name="eeg_input")
eeg_cnn = Conv1D(64, 7, activation='relu', padding='same')(eeg_input)
eeg_cnn = MaxPooling1D(2)(eeg_cnn)
eeg_cnn = BatchNormalization()(eeg_cnn)
eeg_cnn = Conv1D(128, 5, activation='relu', padding='same')(eeg_cnn)
eeg_cnn = MaxPooling1D(2)(eeg_cnn)
eeg_cnn = BatchNormalization()(eeg_cnn)
eeg_cnn = Flatten()(eeg_cnn)
eeg_output = Dense(128, activation='relu')(eeg_cnn)

# --------- 1D CNN ECG BRANCH ---------
# Transpose: (samples, channels, time) -> (samples, time, channels)
X_ecg_train_1d = np.transpose(X_ecg_train, (0, 2, 1))
X_ecg_test_1d = np.transpose(X_ecg_test, (0, 2, 1))
ecg_input = Input(shape=(512, 2), name="ecg_input")
ecg_cnn = Conv1D(64, 7, activation='relu', padding='same')(ecg_input)
ecg_cnn = MaxPooling1D(2)(ecg_cnn)
ecg_cnn = BatchNormalization()(ecg_cnn)
ecg_cnn = Conv1D(128, 5, activation='relu', padding='same')(ecg_cnn)
ecg_cnn = MaxPooling1D(2)(ecg_cnn)
ecg_cnn = BatchNormalization()(ecg_cnn)
ecg_cnn = Flatten()(ecg_cnn)
ecg_output = Dense(64, activation='relu')(ecg_cnn)

# --------- FUSION AND OUTPUT ---------
combined = Concatenate()([cnn_output, eeg_output, ecg_output])
combined = Dense(256, activation='relu')(combined)
combined = Dropout(0.3)(combined)
combined = Dense(128, activation='relu')(combined)
combined = Dropout(0.2)(combined)

valence_output = Dense(num_valence_classes, activation='softmax', name="valence_output")(combined)
arousal_output = Dense(num_arousal_classes, activation='softmax', name="arousal_output")(combined)

model = Model(
    inputs=[video_input, eeg_input, ecg_input],
    outputs=[valence_output, arousal_output]
)

model.compile(
    optimizer='adam',
    loss=['categorical_crossentropy', 'categorical_crossentropy'],
    metrics=['accuracy', 'accuracy']
)
model.summary()

# Train
history = model.fit(
    [X_video_train_2d, X_eeg_train_1d, X_ecg_train_1d],
    [y_valence_train, y_arousal_train],
    validation_data=(
        [X_video_test_2d, X_eeg_test_1d, X_ecg_test_1d],
        [y_valence_test, y_arousal_test]
    ),
    epochs=50,
    batch_size=128
)

# Evaluate
y_valence_pred, y_arousal_pred = model.predict([X_video_test_2d, X_eeg_test_1d, X_ecg_test_1d])
y_valence_pred_classes = np.argmax(y_valence_pred, axis=1)
y_arousal_pred_classes = np.argmax(y_arousal_pred, axis=1)
y_valence_true = np.argmax(y_valence_test, axis=1)
y_arousal_true = np.argmax(y_arousal_test, axis=1)

valence_accuracy = accuracy_score(y_valence_true, y_valence_pred_classes)
arousal_accuracy = accuracy_score(y_arousal_true, y_arousal_pred_classes)

print(f"Valence Accuracy: {valence_accuracy * 100:.2f}%")
print(f"Arousal Accuracy: {arousal_accuracy * 100:.2f}%")


Number of samples: 186


Epoch 1/50
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m18s[0m 3s/step - arousal_output_accuracy: 0.1062 - arousal_output_loss: 4.4630 - loss: 8.0113 - valence_output_accuracy: 0.1203 - valence_output_loss: 3.5556 - val_arousal_output_accuracy: 0.0426 - val_arousal_output_loss: 3.2281 - val_loss: 5.7058 - val_valence_output_accuracy: 0.1596 - val_valence_output_loss: 2.4777
Epoch 2/50
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 92ms/step - arousal_output_accuracy: 0.1695 - arousal_output_loss: 3.3707 - loss: 6.6665 - valence_output_accuracy: 0.1449 - valence_output_loss: 3.2816 - val_arousal_output_accuracy: 0.1170 - val_arousal_output_loss: 2.5623 - val_loss: 4.8169 - val_valence_output_accuracy: 0.1170 - val_valence_output_loss: 2.2546
Epoch 3/50
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 76ms/step - arousal_output_accuracy: 0.2446 - arousal_output_loss: 2.2407 - loss: 4.5841 - valence_output_accuracy: 0.1813 - valence_output_loss: 2.34