# **Explainable Deep Multi-branch Neural Network for Recognizing Autonomic Activation States from Multimodal Physiological Signals**

## Introduction
This notebook presents the **experimental pipeline** used in the thesis, including:
- Data preprocessing and feature engineering  
- Model construction and training (Hybrid model and baseline models)  
- Performance evaluation using multiple metrics (accuracy, F1-score, confusion matrix, etc.)  
- Model explainability analysis with attention weights (mean activation, temporal attention, entropy)

## Objective
The objectives of this notebook are:
1. To demonstrate the complete experimental process from raw data to final results.  
2. To compare the performance of the proposed Hybrid model with baseline models (both Machine Learning and Deep Learning).  
3. To provide a reproducible workflow that supports validation and further research.

## Import Required Libraries

Before running the experiments, some Python packages need to be installed (if not already available in the environment).
The following packages are required in addition to standard ML/DL libraries:

- **peakutils**: for signal peak detection and processing  
- **biosppy**: for biomedical signal processing (e.g., ECG, respiration)  
- **tsaug**: for time-series data augmentation  

In [None]:
!pip install peakutils

In [None]:
!pip install biosppy

In [None]:
pip install tsaug

In [None]:
# ============================================================
# 📂 Data Handling & Preprocessing
# ============================================================
import numpy as np
import pandas as pd
import random

from sklearn.preprocessing import StandardScaler, MinMaxScaler, LabelBinarizer
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from scipy import stats

# ============================================================
# 📂 Signal Processing
# ============================================================
from scipy import signal, interpolate
from scipy.signal import (
    find_peaks, butter, filtfilt, detrend, resample
)
from scipy.ndimage import uniform_filter1d
from scipy.interpolate import interp1d
from scipy.optimize import minimize
from scipy.stats import skew, kurtosis

import pywt  # Wavelet transform

# Biosppy: specialized functions for biomedical signals
from biosppy.signals import resp as biosppy_resp
from biosppy.signals import eda as biosppy_eda
from biosppy.signals import ecg as biosppy_ecg

# Tsaug: data augmentation for time-series
from tsaug import TimeWarp, Drift, AddNoise

# ============================================================
# 📂 Visualization & Analysis
# ============================================================
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter

# ============================================================
# 📂 Model Saving / Loading & Utilities
# ============================================================
import pickle
import joblib
import os
from tqdm import tqdm

# ============================================================
# 📂 Deep Learning (TensorFlow / Keras)
# ============================================================
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import (
    Input, Dense, Dropout, Conv1D, MaxPooling1D, GRU, Bidirectional,
    GlobalAveragePooling1D, Add, MultiHeadAttention, Concatenate, 
    BatchNormalization, ReLU, Multiply, Activation, LayerNormalization, ELU
)
from tensorflow.keras.losses import CategoricalCrossentropy
from tensorflow.keras.callbacks import (
    EarlyStopping, ModelCheckpoint, ReduceLROnPlateau, Callback
)
from tensorflow.keras.optimizers import AdamW
from tensorflow.keras import regularizers
from tensorflow.keras.regularizers import l2
from tensorflow.keras.utils import to_categorical
from tensorflow.keras import backend as K
from tensorflow.keras.activations import relu  # Explicit ReLU activation

# ============================================================
# 📂 Classical Machine Learning Models (Baselines)
# ============================================================
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier

# ============================================================
# 📂 Model Evaluation
# ============================================================
from sklearn.metrics import (
    accuracy_score, confusion_matrix, ConfusionMatrixDisplay, 
    classification_report, roc_auc_score, roc_curve, auc, f1_score)

# ============================================================
# 📂 Model Selection & Cross-validation
# ============================================================
from sklearn.model_selection import KFold

# ============================================================
# 📂 Additional
# ============================================================
from sklearn.preprocessing import label_binarize
from collections import defaultdict

# ============================================================
# 📂 Warning Control
# ============================================================
import warnings
warnings.filterwarnings("ignore")

print("✔️ All libraries imported successfully!")

## Convert & Merge (WESAD Dataset)

In this section, raw WESAD data files in pickle format (`.pkl`) are converted into a unified DataFrame structure. 
The process includes:

1. **Configuration**  
   - Define input/output paths.  
   - Specify subject IDs (excluding S12, due to corrupted data).  
   - Select physiological signals of interest: **ECG**, **EDA**, and **Respiration**.  

2. **Conversion**  
   - Each subject's pickle file contains multiple signals.  
   - Signals from the chest device (ECG, EDA, Resp) are extracted.  
   - Labels are aligned with signals and a subject ID (`sid`) column is added.  

3. **Merging**  
   - Data from all subjects are concatenated into one large DataFrame.  
   - Columns: `['sid', 'ecg', 'eda', 'resp', 'label']`.  

4. **Filtering**  
   - Only keep the relevant labels: **Baseline (1)**, **Stress (2)**, and **Amusement (3)**.  
   - Relabel them as `{0: Baseline, 1: Stress, 2: Amusement}` for supervised learning.  

5. **Saving**  
   - Save both raw merged data (`merged_chest.pkl`) and filtered data (`merged_chest_fltr.pkl`) for reproducibility.  

In [None]:
# ==== Step 1. Basic Configuration ====
WESAD_PATH = '/kaggle/input/wesad-dataset/WESAD'
OUTPUT_PATH = '/kaggle/working/wesad_processed'
os.makedirs(OUTPUT_PATH, exist_ok=True)

# Subject IDs (2–17, excluding 12 because of missing/corrupted data)
ids = [i for i in range(2, 18) if i != 12]

# Columns used for chest signals
cols_chest = ['sid', 'ecg', 'eda', 'resp', 'label']
sf_chest = 700  # Sampling frequency

print("✔️ Configuration done.")

In [None]:
# ==== Step 2. Convert from Pickle to DataFrame ====
def pkl_to_chest(filename, sub_id):
    """
    Convert raw pickle dictionary for a subject into a NumPy array.

    Parameters:
        filename (str): Path to pickle file for a subject.
        sub_id (int): Subject ID.

    Returns:
        np.array: Chest data [sid, ecg, eda, resp, label].
    """
    unpickled_df = pd.read_pickle(filename)

    chest_ecg = unpickled_df['signal']['chest']['ECG'][:, 0].reshape(-1, 1)
    chest_eda = unpickled_df['signal']['chest']['EDA'].reshape(-1, 1)
    chest_resp = unpickled_df['signal']['chest']['Resp'].reshape(-1, 1)

    label = unpickled_df['label'].reshape(-1, 1)
    sid = np.full((label.shape[0], 1), sub_id)

    chest_data = np.concatenate((sid, chest_ecg, chest_eda, chest_resp, label), axis=1)
    return chest_data


# ==== Step 3. Merge multiple subjects ====
def merge_chest(ids):
    merged_data = np.empty((0, len(cols_chest)))

    for sid in ids:
        file_path = os.path.join(WESAD_PATH, f'S{sid}', f'S{sid}.pkl')
        print(f"\nProcessing subject: S{sid}")

        if os.path.exists(file_path):
            subject_data = pkl_to_chest(file_path, sid)
            merged_data = np.concatenate((merged_data, subject_data), axis=0)
            print(f"Merged shape so far: {merged_data.shape}")
        else:
            print(f"❌ File not found: {file_path}")

    # Convert to DataFrame with proper types
    merged_df = pd.DataFrame(merged_data, columns=cols_chest).astype({
        'sid': int, 'ecg': float, 'eda': float, 'resp': float, 'label': int
    })

    # Save merged DataFrame
    merged_df.to_pickle(os.path.join(OUTPUT_PATH, 'merged_chest.pkl'))
    print(f"\n✔️ Merged dataset saved at {OUTPUT_PATH}/merged_chest.pkl")


# ==== Step 4. Filter relevant labels ====
def filter_chest_data():
    """
    Keep only baseline, stress, and amusement classes.
    Map labels {1:0, 2:1, 3:2}.
    """
    df = pd.read_pickle(os.path.join(OUTPUT_PATH, "merged_chest.pkl"))
    df_fltr = df[df["label"].isin([1, 2, 3])]
    label_map = {1: 0, 2: 1, 3: 2}
    df_fltr['label'] = df_fltr['label'].map(label_map)

    print(f"Filtered dataset shape: {df_fltr.shape}")
    df_fltr.to_pickle(os.path.join(OUTPUT_PATH, "merged_chest_fltr.pkl"))
    print(f"✔️ Filtered dataset saved at {OUTPUT_PATH}/merged_chest_fltr.pkl")

In [None]:
# ==== Step 5. Run Preprocessing Pipeline ====
def preprocess():
    merge_chest(ids)
    filter_chest_data()

preprocess()

## Exploratory Data Analysis (EDA)

In this section, the filtered WESAD dataset (`merged_chest_fltr.pkl`) is loaded and inspected to ensure data quality before preprocessing. The process includes:

1. **Loading & Inspection**
   - Load the filtered dataset from disk.
   - Display dataset shape, first rows (`head()`), and basic info (`info()`).
   - Summarize descriptive statistics (`describe()`).
   - Check unique subject IDs and label distribution per subject.

2. **Label Distribution**
   - Visualize the number of samples for each condition:
     - **Baseline (0)**
     - **Stress (1)**
     - **Amusement (2)**

3. **Signal Preview**
   - Plot the first 10 seconds of physiological signals to verify signal quality.
   - Signals inspected:
     - **ECG** (Electrocardiogram – heart electrical activity).
     - **EDA** (Electrodermal Activity – skin conductance, related to arousal).
     - **Resp** (Respiration – breathing patterns).

4. **Purpose**
   - Ensure that the dataset is correctly filtered and structured.
   - Identify potential class imbalance in labels.
   - Visually confirm that physiological signals are properly recorded.

In [None]:
fltr_data = pd.read_pickle(os.path.join(OUTPUT_PATH, "merged_chest_fltr.pkl"))
print(f"Filtered Data Shape: {fltr_data.shape}")
fltr_data.head()

In [None]:
fltr_data.info()

In [None]:
fltr_data.describe().T

In [None]:
print(fltr_data['sid'].unique())
fltr_data.groupby(['sid', 'label']).head()

In [None]:
plt.figure(figsize=(6, 4))
ax = sns.countplot(
    data=fltr_data,
    x="label",
    hue="label",
    dodge=False
)
ax.legend_.remove()
plt.xticks([0, 1, 2], ["Baseline", "Stressed", "Amusement"])
plt.title("Number of Baseline, Stressed and Amusement Data Points")
plt.xlabel("Condition")
plt.ylabel("Count")
plt.show()

In [None]:
sampling_rate = sf_chest  # Hz
duration_sec = 10
samples_to_plot = sampling_rate * duration_sec
ecg_raw=fltr_data['ecg']
ecg_signal=ecg_raw

# --- Time axis ---
time_axis = np.linspace(0, duration_sec, samples_to_plot)

# --- Plot ECG ---
plt.figure(figsize=(15, 4))
plt.plot(time_axis, ecg_signal[:samples_to_plot])
plt.title('ecg first 5 sec')
plt.xlabel('Time (seconds)')
plt.ylabel('Amplitude')
plt.grid(True)
plt.show()

In [None]:
sampling_rate = sf_chest  # Hz
duration_sec = 10
samples_to_plot = sampling_rate * duration_sec
eda_raw=fltr_data['eda']
eda_signal=eda_raw

# --- Time axis ---
time_axis = np.linspace(0, duration_sec, samples_to_plot)

# --- Plot EDA ---
plt.figure(figsize=(15, 4))
plt.plot(time_axis, eda_signal[:samples_to_plot])
plt.title('eda first 5 sec')
plt.xlabel('Time (seconds)')
plt.ylabel('Amplitude')
plt.grid(True)
plt.show()

In [None]:
sampling_rate = sf_chest  # Hz
duration_sec = 10
samples_to_plot = sampling_rate * duration_sec
resp_raw=fltr_data['resp']
resp_signal=resp_raw

# --- Time axis ---
time_axis = np.linspace(0, duration_sec, samples_to_plot)

# --- Plot RESP ---
plt.figure(figsize=(15, 4))
plt.plot(time_axis, resp_signal[:samples_to_plot])
plt.title('resp first 5 sec')
plt.xlabel('Time (seconds)')
plt.ylabel('Amplitude')
plt.grid(True)
plt.show()

## Preprocessing  

This section describes the **preprocessing pipeline** applied to the physiological signals (ECG, EDA, RESP) for stress detection.  
The pipeline includes:  

1. **Signal filtering & denoising**  
   - ECG, EDA, and RESP signals are filtered using either `biosppy` functions or custom filters.  
   - Alternative denoising methods (e.g., wavelet, detrending) are also available if `use_biosppy=False`.  

2. **Normalization**  
   - ECG and RESP signals are standardized (`StandardScaler`).  
   - EDA signals are scaled to `[0, 1]` range using `MinMaxScaler`.  

3. **Segmentation**  
   - Continuous labeled signals are split into windows of fixed length (e.g., 30 seconds).  
   - Overlapping windows are generated with a defined stride (e.g., 5 seconds).  
   - Very short segments are discarded.  

4. **Downsampling**  
   - Each window is downsampled to a fixed length (`target_len=3500`) to ensure uniform input size across subjects and modalities.  

5. **Data aggregation**  
   - Segments are collected from all subjects.  
   - Only valid samples (with consistent shapes) are kept.  
   - Final output includes preprocessed ECG, EDA, RESP arrays, labels, subject IDs, and sample IDs.

6. **Feature extraction for machine learning**
   - For **ECG**, HRV-based features are extracted: mean RR interval, SDNN, RMSSD, NN50, pNN50.
   - For **EDA**, statistical and dynamic features are extracted: mean, standard deviation, max, min, mean/SD of differences, signal power.
   - For **RESP**, respiratory features are extracted: mean, standard deviation, max, min, and estimated breath rate.
   - Features from all modalities are concatenated into a single feature vector per segment, producing a final feature matrix suitable for classical ML models.

In [None]:
# ===== ECG Preprocessing =====
def preprocess_ecg_raw(ecg_signal, fs=sf_chest, target_len=None, use_biosppy=True):
    if use_biosppy:
        ecg_processed = biosppy_ecg.ecg(signal=ecg_signal, sampling_rate=fs, show=False)
        filtered = ecg_processed['filtered']
    else:
        filtered = highpass_filter(ecg_signal, cutoff=0.5, fs=fs)
        filtered = wavelet_denoise(filtered)

    # Adjust signal length if target length is specified
    if target_len is not None:
        if len(filtered) > target_len:
            filtered = filtered[:target_len]
        elif len(filtered) < target_len:
            filtered = np.pad(filtered, (0, target_len - len(filtered)), mode='constant')
    return filtered


# ===== EDA Preprocessing =====
def preprocess_eda_raw(eda_signal_raw, fs=sf_chest, target_len=None, use_biosppy=True):
    if use_biosppy:
        eda_processed = biosppy_eda.eda(signal=eda_signal_raw, sampling_rate=fs, show=False)
        eda_filtered = eda_processed['filtered']
    else:
        eda_detrended = detrend(eda_signal_raw, type='linear')
        eda_filtered = lowpass_filter_eda(eda_detrended, cutoff=1.5, fs=fs)
    
    # Adjust signal length if target length is specified
    if target_len:
        eda_filtered = eda_filtered[:target_len] if len(eda_filtered) >= target_len else np.pad(eda_filtered, (0, target_len - len(eda_filtered)))
    return eda_filtered


# ===== RESP Preprocessing =====
def preprocess_resp_raw(resp_signal_raw, fs=sf_chest, target_len=None, use_biosppy=True):
    if use_biosppy:
        resp_processed = biosppy_resp.resp(signal=resp_signal_raw, sampling_rate=fs, show=False)
        resp_filtered = resp_processed['filtered']
    else:
        resp_detrended = detrend(resp_signal_raw, type='linear')
        resp_filtered = lowpass_filter_resp(resp_detrended, cutoff=0.6, fs=fs)

    # Adjust signal length if target length is specified
    if target_len:
        resp_filtered = resp_filtered[:target_len] if len(resp_filtered) >= target_len else np.pad(resp_filtered, (0, target_len - len(resp_filtered)))
    return resp_filtered

print('✔️ Done!')

In [None]:
def preprocess(df, sid, fs=700, win_sec=30, stride=5,
               use_biosppy=True, target_len=3500):
    """Preprocess signals for one subject: filtering, normalization, segmentation, and downsampling"""

    sub_df = df[df['sid'] == sid]
    ecg_raw = sub_df['ecg'].values.astype(float)
    resp_raw = sub_df['resp'].values.astype(float)
    eda_raw = sub_df['eda'].values.astype(float)
    labels = sub_df['label'].values.astype(int)

    # === Filtering & denoising
    ecg_filtered = preprocess_ecg_raw(ecg_raw, fs=fs, target_len=len(ecg_raw), use_biosppy=use_biosppy)
    resp_filtered = preprocess_resp_raw(resp_raw, fs=fs, target_len=len(resp_raw), use_biosppy=use_biosppy)
    eda_filtered = preprocess_eda_raw(eda_raw, fs=fs, target_len=len(eda_raw), use_biosppy=use_biosppy)

    # === Global normalization
    ecg_scaled = StandardScaler().fit_transform(ecg_filtered.reshape(-1, 1)).flatten()
    resp_scaled = StandardScaler().fit_transform(resp_filtered.reshape(-1, 1)).flatten()
    eda_scaled = MinMaxScaler().fit_transform(eda_filtered.reshape(-1, 1)).flatten()

    # === Window and stride parameters
    win_len = fs * win_sec
    stride_len = fs * stride

    X_ecg, X_resp, X_eda = [], [], []
    y, sub_ids, sample_ids = [], [], []
    sample_counter = 0

    # Downsampling function
    def downsample(signal, target_len=3500):
        return signal[::max(1, len(signal) // target_len)][:target_len]

    # Segment processing function
    def process_segment(pos, seg_label):
        nonlocal sample_counter
        ecg_win = ecg_scaled[pos: pos + win_len]
        resp_win = resp_scaled[pos: pos + win_len]
        eda_win = eda_scaled[pos: pos + win_len]

        # === Downsample
        ecg_down = downsample(ecg_win, target_len)
        resp_down = downsample(resp_win, target_len)
        eda_down = downsample(eda_win, target_len)

        X_ecg.append(ecg_down[:, np.newaxis])
        X_resp.append(resp_down[:, np.newaxis])
        X_eda.append(eda_down[:, np.newaxis])

        y.append(seg_label)
        sub_ids.append(f"S{int(sid)}")
        sample_ids.append(f"S{int(sid)}_sample{sample_counter}")
        sample_counter += 1

    # === Segment signals based on continuous labels
    start_idx = 0
    for i in range(1, len(labels)):
        if labels[i] != labels[start_idx] or i == len(labels) - 1:
            seg_end = i + 1 if i == len(labels) - 1 else i
            seg_label = labels[start_idx]
            seg_len = seg_end - start_idx

            if seg_len >= win_len:
                pos = start_idx
                while pos + win_len <= seg_end:
                    process_segment(pos, seg_label)
                    pos += stride_len
            elif seg_len >= win_len // 2:
                # If segment is at least half the window, take one centered sample
                center = start_idx + seg_len // 2
                pos = max(0, center - win_len // 2)
                process_segment(pos, seg_label)
            else:
                print(f"[!] Segment too short and skipped (label={seg_label}, len={seg_len})")

            start_idx = i

    # === Handle the last segment
    seg_label = labels[start_idx]
    if len(labels) - start_idx >= win_len:
        pos = start_idx
        while pos + win_len <= len(labels):
            process_segment(pos, seg_label)
            pos += stride_len

    return X_ecg, X_eda, X_resp, y, sub_ids, sample_ids

print('✔️ Done!')

In [None]:
resample_len = 3500  # Number of points after resampling for all signals

# === Aggregate data from all subjects ===
X_ecg_all, X_eda_all, X_resp_all = [], [], []
y_all, sid_all, sample_ids_all = [], [], []

for sid in fltr_data['sid'].unique():
    ecg_segs, eda_segs, resp_segs, y, sids, sample_ids = preprocess(fltr_data, sid)
    
    X_ecg_all.extend(ecg_segs)
    X_eda_all.extend(eda_segs)
    X_resp_all.extend(resp_segs)
    
    y_all.extend(y)
    sid_all.extend(sids)
    sample_ids_all.extend(sample_ids)

# === Filter out invalid segments with wrong shapes ===
X_ecg, X_eda, X_resp, y_clean, sid_clean, sample_ids_clean = [], [], [], [], [], []
expected_shape = (resample_len, 1)

for ecg, eda, resp, label, sid, samp_id in zip(
        X_ecg_all, X_eda_all, X_resp_all, y_all, sid_all, sample_ids_all):

    if ecg.shape == expected_shape and resp.shape == expected_shape and eda.shape == expected_shape:
        X_ecg.append(ecg)
        X_eda.append(eda)
        X_resp.append(resp)
        y_clean.append(label)
        sid_clean.append(sid)
        sample_ids_clean.append(samp_id)

# === Convert to numpy arrays ===
X_ecg = np.stack(X_ecg)
X_eda = np.stack(X_eda)
X_resp = np.stack(X_resp)
y = np.array(y_clean)
sid_clean = np.array(sid_clean)
sample_ids = np.array(sample_ids_clean)

# === Print final results ===
print("✔️ Valid data after shape check:")
print("ECG:", X_ecg.shape, "| EDA:", X_eda.shape, "| RESP:", X_resp.shape, "| Labels:", y.shape)
print("Number of unique sample IDs:", len(np.unique(sample_ids)))
print("Label distribution:", Counter(y))

In [None]:
from scipy.signal import find_peaks

def extract_hrv_features(ecg_signal, sampling_rate=700):
    ecg_signal = ecg_signal.squeeze()
    try:
        # Phát hiện R-peaks đơn giản
        peaks, _ = find_peaks(ecg_signal, distance=sampling_rate*0.6, height=np.mean(ecg_signal))

        if len(peaks) < 2:
            raise ValueError("Not enough R-peaks")

        rr_intervals = np.diff(peaks) / sampling_rate  # chuyển sang giây

        # HRV features
        mean_rr = np.mean(rr_intervals)
        std_rr = np.std(rr_intervals)
        rmssd = np.sqrt(np.mean(np.square(np.diff(rr_intervals))))
        nn50 = np.sum(np.abs(np.diff(rr_intervals)) > 0.05)
        pnn50 = nn50 / len(rr_intervals)

        return [mean_rr, std_rr, rmssd, nn50, pnn50]

    except Exception as e:
        print("ECG HRV error:", e)
        return [np.nan] * 5

def extract_eda_features(eda_signal):
    eda_signal = eda_signal.squeeze()
    try:
        diff = np.diff(eda_signal)
        power = np.mean(eda_signal ** 2)

        return [
            np.mean(eda_signal),
            np.std(eda_signal),
            np.max(eda_signal),
            np.min(eda_signal),
            np.mean(diff),
            np.std(diff),
            power
        ]
    except Exception as e:
        print("EDA error:", e)
        return [np.nan] * 7

def extract_resp_features(resp_signal, sampling_rate=700):
    resp_signal = resp_signal.squeeze()
    try:
        # Phát hiện đỉnh (chu kỳ thở)
        peaks, _ = find_peaks(resp_signal, distance=sampling_rate*1.5)  # giả sử tần số thở ~ 0.3–0.7 Hz

        if len(peaks) < 2:
            raise ValueError("Not enough breaths")

        intervals = np.diff(peaks) / sampling_rate  # giây
        mean_breath_rate = 60 / np.mean(intervals)

        return [
            np.mean(resp_signal),
            np.std(resp_signal),
            np.max(resp_signal),
            np.min(resp_signal),
            mean_breath_rate
        ]
    except Exception as e:
        print("RESP error:", e)
        return [np.nan] * 5

In [None]:
def extract_all_features_custom(X_ecg, X_eda, X_resp, sampling_rate=700):
    all_features = []
    for ecg, eda, resp in zip(X_ecg, X_eda, X_resp):
        feats = (
            extract_hrv_features(ecg, sampling_rate) +
            extract_eda_features(eda) +
            extract_resp_features(resp, sampling_rate)
        )
        all_features.append(feats)
    return np.array(all_features)

## Train/Validation/Test Split

In this step, the dataset is **split by subject IDs** into three non-overlapping subsets: **train**, **validation**, and **test**.  
This approach ensures that data from the same subject never appears in multiple splits, avoiding **data leakage**.  

### Main Functions
1. **`split_by_sid`**  
   - Filters arrays (`X_ecg`, `X_eda`, `X_resp`, `y`, `sid_arr`, `sample_ids`) according to a given subject ID list.  
   - Returns only the samples belonging to the specified subjects.

2. **`split_dataset_by_subjects`**  
   - Randomly splits subject IDs into **train/val/test** groups based on:  
     - `test_size`: proportion of subjects reserved for val+test.  
     - `val_ratio`: percentage of val inside the temp set.  
     - `seed`: random seed for reproducibility.  
   - Ensures **no subject overlap** between train, val, and test.  
   - Returns a dictionary containing the three splits and the subject mapping.  

### Verification & Analysis
- After splitting, the code prints:  
  1. **Dataset shape** (number of samples per modality, unique subjects, and sample IDs).  
  2. **Label distribution** (class balance within each split).  

- Finally, a **bar chart** is plotted to show the number of samples for each class (Baseline = 0, Stress = 1, Amusement = 2) across the three subsets.  
  - Bars are grouped by split (Train, Val, Test).  
  - Each bar displays the exact count of samples for clarity.  
  - The plot is saved as `cls_distribution.png`.

### Why Split by Subjects?
- Physiological signals vary strongly between individuals.  
- Splitting by samples (random rows) could cause **information leakage** if signals from the same subject appear in both train and test.  
- By splitting on **subject IDs**, the model is forced to generalize to unseen individuals, making evaluation more realistic.

In [None]:
def split_by_sid(X_ecg, X_eda, X_resp, y, sid_arr, sample_ids, sid_list):
    """
    Filter data based on a list of subject IDs (sid_list).
    All input data should be numpy arrays.
    """
    mask = np.isin(sid_arr, sid_list)
    if not np.any(mask):
        raise ValueError("No subjects found matching sid_list.")

    return (
        X_ecg[mask],
        X_eda[mask],
        X_resp[mask],
        y[mask],
        sid_arr[mask],
        sample_ids[mask],
    )

def split_dataset_by_subjects(X_ecg, X_eda, X_resp, y, sid_arr, sample_ids,
                               test_size=0.4, val_ratio=0.5, seed=42):
    """
    Split the dataset into train / validation / test sets by subject ID 
    (ensures no subject leakage across different sets).

    Parameters:
        - test_size: proportion of subjects assigned to val+test 
                     (e.g., 0.3 -> train gets 70%)
        - val_ratio: fraction of subjects in val+test that goes to validation 
                     (e.g., 0.5 -> validation = test)
        - seed: random seed for reproducibility

    Returns:
        dict containing train / validation / test splits,
        and the mapping of subject IDs for each split.
    """
    unique_sids = np.unique(sid_arr)

    # Train / temp split
    sids_train, sids_temp = train_test_split(
        unique_sids, test_size=test_size, random_state=seed
    )

    # Temp => val / test
    sids_val, sids_test = train_test_split(
        sids_temp, test_size=val_ratio, random_state=seed
    )

    # Sanity check
    assert not (set(sids_train) & set(sids_val)), "Subjects overlap between train and validation"
    assert not (set(sids_train) & set(sids_test)), "Subjects overlap between train and test"
    assert not (set(sids_val) & set(sids_test)), "Subjects overlap between validation and test"

    # Extract data for each split
    X_ecg_train, X_eda_train, X_resp_train, y_train, sid_train, sample_ids_train = split_by_sid(
        X_ecg, X_eda, X_resp, y, sid_arr, sample_ids, sids_train)
    X_ecg_val, X_eda_val, X_resp_val, y_val, sid_val, sample_ids_val = split_by_sid(
        X_ecg, X_eda, X_resp, y, sid_arr, sample_ids, sids_val)
    X_ecg_test, X_eda_test, X_resp_test, y_test, sid_test, sample_ids_test = split_by_sid(
        X_ecg, X_eda, X_resp, y, sid_arr, sample_ids, sids_test)

    # Final check: ensure all subjects are included after splitting
    all_split_sids = set(sid_train.tolist() + sid_val.tolist() + sid_test.tolist())
    assert all_split_sids == set(unique_sids), "Some subjects are missing after splitting!"

    return {
        "train": (X_ecg_train, X_eda_train, X_resp_train, y_train, sid_train, sample_ids_train),
        "val": (X_ecg_val, X_eda_val, X_resp_val, y_val, sid_val, sample_ids_val),
        "test": (X_ecg_test, X_eda_test, X_resp_test, y_test, sid_test, sample_ids_test),
        "sid_split": {
            "train": sids_train,
            "val": sids_val,
            "test": sids_test
        }
    }

print('✔️ Done!')

In [None]:
splits = split_dataset_by_subjects(X_ecg, X_eda, X_resp, y, sid_clean, sample_ids)

# === Extract the training set ===
X_ecg_train, X_eda_train, X_resp_train, y_train, sid_train, sample_ids_train = splits["train"]

# === Extract the validation set ===
X_ecg_val, X_eda_val, X_resp_val, y_val, sid_val, sample_ids_val = splits["val"]

# === Extract the test set ===
X_ecg_test, X_eda_test, X_resp_test, y_test, sid_test, sample_ids_test = splits["test"]

print('✔️ Done!')

In [None]:
# === Check dataset dimensions and number of subjects/sample IDs ===
print("\nDATASET SHAPE INFORMATION")
print("=" * 60)

def print_data_shape(name, X_ecg, X_eda, X_resp, y, sid, sample_ids):
    """
    Print the shape of ECG, EDA, and RESP arrays, 
    along with the number of unique subjects and total samples in each split.
    """
    print(f"{name:<6} - ECG: {X_ecg.shape}, EDA: {X_eda.shape}, RESP: {X_resp.shape}, "
          f"Subjects: {len(np.unique(sid))}, Samples: {len(sample_ids)}")

# Display dataset information for Train, Validation, and Test splits
print_data_shape("Train", X_ecg_train, X_eda_train, X_resp_train, y_train, sid_train, sample_ids_train)
print_data_shape("Val",   X_ecg_val,   X_eda_val,   X_resp_val,   y_val,   sid_val,   sample_ids_val)
print_data_shape("Test",  X_ecg_test,  X_eda_test,  X_resp_test,  y_test,  sid_test,  sample_ids_test)


# === Check label distribution (number of samples per class) ===
print("\nLABEL DISTRIBUTION")
print("=" * 60)

def print_label_dist(name, y):
    """
    Print the distribution of labels for a given dataset split.
    Each class is shown with the corresponding sample count.
    """
    counter = Counter(y)
    label_str = ", ".join([f"{k}: {v}" for k, v in sorted(counter.items())])
    print(f"{name:<6}: {label_str}")

# Display label distribution for Train, Validation, and Test splits
print_label_dist("Train", y_train)
print_label_dist("Val",   y_val)
print_label_dist("Test",  y_test)

In [None]:
# === Count labels in each split (Train, Validation, Test) ===
counts_train = Counter(y_train)
counts_val = Counter(y_val)
counts_test = Counter(y_test)

# === Get the set of class labels (e.g., 0, 1, 2) ===
classes = sorted(set(y_train) | set(y_val) | set(y_test))

# === Prepare data structure to store class counts across splits ===
labels = ['Train', 'Val', 'Test']
data_by_class = {cls: [] for cls in classes}

# Fill the dictionary with sample counts per class for each split
for cls in classes:
    data_by_class[cls] = [
        counts_train.get(cls, 0),
        counts_val.get(cls, 0),
        counts_test.get(cls, 0)
    ]


# === Plot grouped bar chart of class distribution ===
x = range(len(labels))
bar_width = 0.2  # adjust width so 3 bars fit within each group

fig, ax = plt.subplots(figsize=(9, 5))

# Define offsets for each class (to shift bars side by side)
offsets = [-bar_width, 0, bar_width]
colors = ['skyblue', 'salmon', 'lightgreen']

# Draw bars for each class
for i, cls in enumerate(classes):
    x_positions = [pos + offsets[i] for pos in x]
    heights = data_by_class[cls]

    bars = ax.bar(
        x_positions,
        heights,
        width=bar_width,
        label=f'Class {cls}',
        color=colors[i % len(colors)]
    )

    # Add sample counts on top of each bar
    for bar in bars:
        height = bar.get_height()
        ax.text(
            bar.get_x() + bar.get_width() / 2,
            height + 5,
            f'{int(height)}',
            ha='center',
            va='bottom',
            fontsize=9
        )

# === Customize chart labels and appearance ===
ax.set_xticks(x)
ax.set_xticklabels(labels)
ax.set_xlabel('Data splits')
ax.set_ylabel('Sample count')
ax.set_title('Class label distribution among the data splits')
ax.legend()
plt.grid(axis='y', linestyle='--', alpha=0.2)
plt.tight_layout()

# Save the figure as a PNG file
plt.savefig('/kaggle/working/cls_distribution.png', dpi=300, bbox_inches='tight')
plt.show()

## Data Augmentation

In this step, additional training samples are generated by applying transformations to the original signals.  
This increases dataset diversity and helps the model generalize better.

### Main Functions
1. **add_noise(signal, noise_level=0.01)**  
   - Adds Gaussian noise to the signal.  
   - `noise_level`: controls the amplitude of the noise.  

2. **time_shift(signal, shift_max=100)**  
   - Shifts the signal left or right by a random number of samples.  
   - `shift_max`: maximum shift allowed.  

3. **scaling(signal, scale_range=(0.9, 1.1))**  
   - Multiplies the signal by a random scaling factor.  
   - `scale_range`: defines min and max scaling factors.  

4. **augment_dataset(X, y, sid, sample_ids)**  
   - Applies one or more augmentation functions to the dataset.  
   - Returns augmented arrays together with original labels and IDs.  

### Verification & Analysis
- After augmentation, the code prints:  
  1. New dataset size (original + augmented samples).  
  2. Proportion of augmented vs. original samples.  
- Optionally, plots a few augmented signals for visual inspection.  

### Why Data Augmentation?
- Prevents overfitting by exposing the model to signal variations.  
- Simulates realistic noise and variability in physiological data.  
- Helps the model learn more robust features.

In [None]:
# === Define augmenters for each physiological signal ===
def create_ecg_augmenter(length):
    return (
        AddNoise(scale=0.01) +          
        Drift(max_drift=(0.02, 0.02))  
        # TimeWarp is omitted because ECG can be distorted too much by time warping
    ), length

def create_eda_augmenter(length):
    return (
        AddNoise(scale=0.05) * 2 +                      
        Drift(max_drift=(0.1, 0.1)) +                   
        TimeWarp(n_speed_change=2, max_speed_ratio=1.5) 
    ), length

def create_resp_augmenter(length):
    return (
        AddNoise(scale=0.02) +                          
        TimeWarp(n_speed_change=2, max_speed_ratio=1.3) 
        + Drift(max_drift=(0.05, 0.05))                 
    ), length


# === Padding function to ensure all augmented signals have the same length ===
def pad_to_length(x_aug, final_len):
    return np.array([
        np.pad(x, ((0, final_len - x.shape[0]), (0, 0)), mode='constant')
        if x.shape[0] < final_len else x
        for x in x_aug
    ])


# === Augment samples for a specific class with a given number of new samples ===
def augment_class(X, y, target_class, n_aug, augmenter, final_len):
    idx = np.where(y == target_class)[0]
    if len(idx) == 0:
        raise ValueError(f"No samples found for class {target_class}")
    
    # Select all samples belonging to this class
    X_class = X[idx]
    
    # Repeat samples if needed until reaching the target augmentation size
    n_repeat = int(np.ceil(n_aug / len(X_class)))
    X_repeat = np.repeat(X_class, n_repeat, axis=0)[:n_aug]
    
    # Apply augmentation pipeline
    X_aug = augmenter.augment(X_repeat)
    
    # Pad augmented samples to ensure equal length
    X_aug = pad_to_length(X_aug, final_len)
    
    # Assign the same label for all augmented samples
    y_aug = np.full((n_aug,), target_class)
    return X_aug, y_aug


# === Augment the training set to balance class distributions ===
def augment_train_set(X_ecg, X_eda, X_resp, y, use_random=True, seed=42, plot_distribution=True):
    # === Automatically compute augmentation plan based on class imbalance ===
    class_counts = Counter(y)
    max_count = max(class_counts.values())
    
    # Augment minority classes until they match the majority class count
    augment_plan = {cls: max_count - count for cls, count in class_counts.items() if count < max_count}

    print(f"[INFO] Original label counts: {dict(class_counts)}")
    print(f"[INFO] Auto augment plan: {augment_plan}")

    # === Create augmenters for each signal type ===
    augmenter_ecg, len_ecg = create_ecg_augmenter(X_ecg.shape[1])
    augmenter_eda, len_eda = create_eda_augmenter(X_eda.shape[1])
    augmenter_resp, len_resp = create_resp_augmenter(X_resp.shape[1])

    # === Perform augmentation for each underrepresented class ===
    X_ecg_aug_all, X_eda_aug_all, X_resp_aug_all, y_aug_all = [], [], [], []

    for cls, n_aug in augment_plan.items():
        ecg_aug, y_cls = augment_class(X_ecg, y, cls, n_aug, augmenter_ecg, len_ecg)
        eda_aug, _    = augment_class(X_eda, y, cls, n_aug, augmenter_eda, len_eda)
        resp_aug, _   = augment_class(X_resp, y, cls, n_aug, augmenter_resp, len_resp)

        # Collect augmented data
        X_ecg_aug_all.append(ecg_aug)
        X_eda_aug_all.append(eda_aug)
        X_resp_aug_all.append(resp_aug)
        y_aug_all.append(y_cls)

    # === Merge original and augmented data ===
    X_ecg_all = np.concatenate([X_ecg] + X_ecg_aug_all, axis=0)
    X_eda_all = np.concatenate([X_eda] + X_eda_aug_all, axis=0)
    X_resp_all = np.concatenate([X_resp] + X_resp_aug_all, axis=0)
    y_all = np.concatenate([y] + y_aug_all, axis=0)

    # === Shuffle dataset for randomness ===
    idx = list(range(len(y_all)))
    if use_random:
        random.seed(seed)
        random.shuffle(idx)
    else:
        np.random.seed(seed)
        idx = np.random.permutation(len(y_all))

    # === Analyze class distribution before and after augmentation ===
    class_labels = {0: 'Baseline', 1: 'Stress', 2: 'Amusement'}

    if plot_distribution:
        plt.figure(figsize=(10, 4))

        # Before Augmentation
        plt.subplot(1, 2, 1)
        plt.title("Before Augmentation")
        keys = list(class_counts.keys())
        values = list(class_counts.values())
        label_names = [class_labels[k] for k in keys]
        bars1 = plt.bar(label_names, values, color='skyblue')
        plt.xlabel("Class")
        plt.ylabel("Count")
        for bar in bars1:
            height = bar.get_height()
            plt.text(bar.get_x() + bar.get_width() / 2.0, height + 1, str(int(height)),
                     ha='center', va='bottom')

        # After Augmentation
        class_counts_aug = Counter(y_all)
        keys_aug = list(class_counts_aug.keys())
        values_aug = list(class_counts_aug.values())
        label_names_aug = [class_labels[k] for k in keys_aug]
        plt.subplot(1, 2, 2)
        plt.title("After Augmentation")
        bars2 = plt.bar(label_names_aug, values_aug, color='lightgreen')
        plt.xlabel("Class")
        plt.ylabel("Count")
        for bar in bars2:
            height = bar.get_height()
            plt.text(bar.get_x() + bar.get_width() / 2.0, height + 1, str(int(height)),
                     ha='center', va='bottom')

        plt.tight_layout()
        plt.savefig('/kaggle/working/cls_augment.png', dpi=300, bbox_inches='tight')
        plt.show()

    return X_ecg_all[idx], X_eda_all[idx], X_resp_all[idx], y_all[idx]


print('✔️ Done!')

In [None]:
# === Apply augmentation to balance the training set ===
X_ecg_train_aug, X_eda_train_aug, X_resp_train_aug, y_train_aug = augment_train_set(
    X_ecg_train, X_eda_train, X_resp_train, y_train, use_random=True, seed=42
)

In [None]:
# === Visualization of Augmented Samples ===
def plot_samples_by_class(X_ecg, X_eda, X_resp, y, classes_to_plot=None, n_samples=2):
    unique_classes = sorted(set(y)) if classes_to_plot is None else classes_to_plot
    for cls in unique_classes:
        idxs = np.where(y == cls)[0][:n_samples]
        print(f"\n[INFO] Plotting {len(idxs)} samples for class {cls}")
        for i, idx in enumerate(idxs):
            plt.figure(figsize=(15, 4))
            
            plt.subplot(3, 1, 1)
            plt.plot(X_ecg[idx], color='blue')
            plt.title(f'Class {cls} - Sample {i+1} - ECG')
            plt.ylabel("ECG")
            
            plt.subplot(3, 1, 2)
            plt.plot(X_eda[idx], color='orange')
            plt.title(f'EDA')
            plt.ylabel("EDA")
            
            plt.subplot(3, 1, 3)
            plt.plot(X_resp[idx], color='green')
            plt.title(f'RESP')
            plt.ylabel("RESP")
            plt.xlabel("Time")

            plt.tight_layout()
            plt.show()

plot_samples_by_class(X_ecg_train_aug, X_eda_train_aug, X_resp_train_aug, y_train_aug, n_samples=1)

## Training Utilities

In this section, several helper components are defined to support model training and evaluation:

- **One-hot Encoding**: Converts categorical labels into one-hot vectors for multi-class classification.  
- **Custom Loss (Focal Loss)**: Designed to handle class imbalance by focusing more on hard-to-classify samples.  
- **F1 Callback**: Custom callback to monitor the macro-F1 score on the validation set and apply early stopping when no improvement is observed.
- **Threshold Optimization**: Functions to assign labels based on adaptive thresholds for each class and to search for optimal thresholds that maximize the macro F1 score. 
- **ROC Curve Visualization**: Function to compute and plot ROC curves for each class, including AUC scores.  

In [None]:
# === One-hot encoding for labels ===
y_train_cat = to_categorical(y_train_aug, num_classes = 3)
y_val_cat   = to_categorical(y_val, num_classes = 3)
y_test_cat  = to_categorical(y_test, num_classes = 3)

print('✔️ Done!')

In [None]:
# === Custom Loss: Focal Loss for handling class imbalance ===
@tf.keras.utils.register_keras_serializable()
class FocalLoss(tf.keras.losses.Loss):
    def __init__(self, gamma=2.0, alpha=1.0, name='focal_loss'):
        super().__init__(name=name)
        self.gamma = gamma
        if isinstance(alpha, (list, tuple)):
            self.alpha = tf.convert_to_tensor(alpha, dtype=tf.float32)
        else:
            self.alpha = tf.constant(alpha, dtype=tf.float32)

    def call(self, y_true, y_pred):
        y_true = tf.cast(y_true, tf.float32)
        y_pred = tf.clip_by_value(y_pred, K.epsilon(), 1. - K.epsilon())
        ce = -y_true * tf.math.log(y_pred)
        weight = self.alpha * tf.pow(1 - y_pred, self.gamma)
        return tf.reduce_sum(weight * ce, axis=1)

print('✔️ Done!')

In [None]:
# === Custom Callback: Monitor macro-F1 and apply early stopping ===
class F1Callback(Callback):
    def __init__(self, val_data):
        super().__init__()
        self.validation_data = val_data
        self.best_f1 = -np.inf
        self.best_weights = None
        self.patience = 10
        self.wait = 0

    def on_epoch_end(self, epoch, logs=None):
        x_val, y_val_true = self.validation_data
        y_pred_prob = self.model.predict(x_val)
        y_pred = np.argmax(y_pred_prob, axis=1)
        try:
            y_true = np.argmax(y_val_true, axis=1)
        except:
            y_true = y_val_true  # handle sparse labels if needed

        f1_macro = f1_score(y_true, y_pred, average='macro')
        print(f'\nEpoch {epoch+1} — val_macro_f1: {f1_macro:.4f}')

        if f1_macro > self.best_f1:
            self.best_f1 = f1_macro
            self.best_weights = self.model.get_weights()
            self.wait = 0
        else:
            self.wait += 1
            if self.wait >= self.patience:
                print("Early stopping triggered by F1")
                self.model.stop_training = True
                self.model.set_weights(self.best_weights)

print('✔️ Done!')

In [None]:
def apply_thresholds(y_probs, thresholds):
    """
    Assign class labels based on class-specific thresholds.
    """
    y_pred = np.full(y_probs.shape[0], -1)
    for i, row in enumerate(y_probs):
        mask = row >= thresholds
        if np.any(mask):
            y_pred[i] = np.argmax(row * mask)  # select the highest probability among classes above threshold
        else:
            y_pred[i] = np.argmax(row)  # fallback if no class exceeds threshold
    return y_pred

def optimize_thresholds(y_true, y_probs, metric='f1'):
    """
    Find optimal thresholds for each class by maximizing the macro F1 score.
    """
    n_classes = y_probs.shape[1]
    
    def objective(thresholds):
        y_pred = apply_thresholds(y_probs, thresholds)
        score = f1_score(y_true, y_pred, average='macro')
        return -score  # minimize negative F1 to maximize F1

    init_thresholds = np.full(n_classes, 0.5)
    bounds = [(0.1, 0.9)] * n_classes

    result = minimize(objective, init_thresholds, method='Powell', bounds=bounds)
    best_thresholds = result.x
    return best_thresholds

print('✔️ Done!')

In [None]:
# === Function to plot ROC curves for multi-class classification ===
def plot_multiclass_roc(y_true, y_pred_prob, class_names, save_path=None):

    from sklearn.preprocessing import label_binarize
    from sklearn.metrics import roc_curve, auc
    import numpy as np
    import matplotlib.pyplot as plt

    n_classes = len(class_names)
    y_true_bin = label_binarize(y_true, classes=range(n_classes))

    fpr = {}
    tpr = {}
    roc_auc = {}

    for i in range(n_classes):
        fpr[i], tpr[i], _ = roc_curve(y_true_bin[:, i], y_pred_prob[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])

    plt.figure(figsize=(8, 6))
    for i in range(n_classes):
        plt.plot(fpr[i], tpr[i], label=f"{class_names[i]} (AUC = {roc_auc[i]:.2f})")

    plt.plot([0, 1], [0, 1], 'k--', label='Random (AUC = 0.5)')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.title("ROC Curve - Multi-class")
    plt.legend(loc="lower right")
    plt.grid(True)
    plt.tight_layout()

    # Save the ROC curve if a save_path is provided
    if save_path:
        plt.savefig(save_path, dpi=300, bbox_inches='tight')
        print(f"📁 ROC curve saved to: {save_path}")

    plt.show()

    macro_roc_auc = np.mean(list(roc_auc.values()))
    print(f"\nMacro ROC AUC: {macro_roc_auc:.4f}")

print('✔️ Done!')

## Models  

In this section, several machine learning and deep learning models are defined and implemented for physiological signal classification.  
Each model follows a consistent workflow, including **model construction (build)**, **training**, and **evaluation** before moving to the next architecture.  
This structured approach ensures a fair comparison across different methods, ranging from traditional classifiers to hybrid deep learning frameworks.  

### Hybrid Model  

A hybrid deep learning architecture is implemented to integrate ECG, EDA, and Resp signals.  
Each physiological modality is processed in an independent branch that combines **CNN** layers for feature extraction, **RNN (GRU)** layers for temporal modeling, and an **attention mechanism** for focusing on relevant signal patterns.  

The extracted representations are fused through concatenation, followed by fully connected layers to perform final classification.  

In [None]:
def attention_block(x, heads=2, key_dim=8, name=None):
    attn_layer = MultiHeadAttention(num_heads=heads, key_dim=key_dim, name=name)
    attn_output, attn_weights = attn_layer(x, x, return_attention_scores=True)
    attn_output = Add()([x, attn_output])
    return attn_output, attn_weights 

def build_hybrid_model(input_shape_ecg=(3500, 1), input_shape_eda=(3500, 1), input_shape_resp=(3500, 1), num_classes=3):
    l2_reg = regularizers.l2(1e-4)

    # === ECG branch ===
    inp_ecg = Input(shape=input_shape_ecg, name="ecg")
    x_ecg = Conv1D(16, 7, padding='same', kernel_regularizer=l2_reg)(inp_ecg)
    x_ecg = BatchNormalization()(x_ecg)
    x_ecg = Activation('relu')(x_ecg)
    x_ecg = MaxPooling1D(2)(x_ecg)
    x_ecg = Bidirectional(GRU(16, return_sequences=True))(x_ecg)
    x_ecg, attn_ecg = attention_block(x_ecg, name="attn_ecg")
    x_ecg = Dropout(0.4)(x_ecg)
    x_ecg = GlobalAveragePooling1D(name="gap_ecg")(x_ecg)

    # === EDA branch ===
    inp_eda = Input(shape=input_shape_eda, name="eda")
    x_eda = Conv1D(16, 5, padding='same', kernel_regularizer=l2_reg)(inp_eda)
    x_eda = BatchNormalization()(x_eda)
    x_eda = Activation('relu')(x_eda)
    x_eda = MaxPooling1D(2)(x_eda)
    x_eda = GRU(16, return_sequences=True)(x_eda)
    x_eda, attn_eda = attention_block(x_eda, name="attn_eda")
    x_eda = Dropout(0.4)(x_eda)
    x_eda = GlobalAveragePooling1D(name="gap_eda")(x_eda)

    # === RESP branch ===
    inp_resp = Input(shape=input_shape_resp, name="resp")
    x_resp = Conv1D(16, 7, padding='same', kernel_regularizer=l2_reg)(inp_resp)
    x_resp = BatchNormalization()(x_resp)
    x_resp = Activation('relu')(x_resp)
    x_resp = MaxPooling1D(2)(x_resp)
    x_resp = Bidirectional(GRU(16, return_sequences=True))(x_resp)
    x_resp, attn_resp = attention_block(x_resp, name="attn_resp")
    x_resp = Dropout(0.4)(x_resp)
    x_resp = GlobalAveragePooling1D(name="gap_resp")(x_resp)

    # === Fusion block ===
    fused = Concatenate()([x_ecg, x_eda, x_resp])
    fused = Dense(16, activation='relu', kernel_regularizer=l2_reg)(fused)
    fused = Dropout(0.3)(fused)
    output = Dense(num_classes, activation='softmax', kernel_regularizer=l2_reg)(fused)

    return Model(inputs=[inp_ecg, inp_eda, inp_resp], outputs=output)

print('✔️ Done!')

In [None]:
# === Loss + Optimizer ===
alpha_list = [1.0, 1.7, 1.2]  # 0: baseline, 1: stress, 2: amusement
loss_fn = FocalLoss(gamma=1.0, alpha=alpha_list)
optimizer = AdamW(learning_rate=1e-4, weight_decay=5e-5)

# === Input dicts ===
x_train = {'ecg': X_ecg_train_aug, 'eda': X_eda_train_aug, 'resp': X_resp_train_aug}
x_val = {'ecg': X_ecg_val, 'eda': X_eda_val, 'resp': X_resp_val}

# === Callbacks ===
lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6, mode='min', verbose=1)
ckpt = ModelCheckpoint('model_hybrid_best.keras', monitor='val_loss', save_best_only=True, mode='min', verbose=1)

f1_callback = F1Callback(val_data=(x_val, y_val_cat))

# === Build & Compile ===
model = build_hybrid_model()
model.compile(optimizer=optimizer, loss=loss_fn, metrics=['accuracy'])

# === Training ===
history = model.fit(
    x=x_train,
    y=y_train_cat,
    validation_data=(x_val, y_val_cat),
    epochs=50,
    batch_size=32,
    callbacks=[f1_callback, lr_scheduler, ckpt],
    shuffle=True,
    verbose=1
)

In [None]:
model.summary()

In [None]:
fig, axs = plt.subplots(1, 2, figsize=(14, 5))

# ---- Accuracy ----
axs[0].plot(history.history['accuracy'], label='Train Accuracy')
axs[0].plot(history.history['val_accuracy'], label='Val Accuracy')
axs[0].set_title('Training and Validation Accuracy')
axs[0].set_xlabel('Epoch')
axs[0].set_ylabel('Accuracy')
axs[0].set_ylim(bottom=0)
axs[0].legend()
axs[0].grid(True)

# ---- Loss ----
axs[1].plot(history.history['loss'], label='Train Loss')
axs[1].plot(history.history['val_loss'], label='Val Loss')
axs[1].set_title('Training and Validation Loss')
axs[1].set_xlabel('Epoch')
axs[1].set_ylabel('Loss')
axs[1].set_ylim(bottom=0)
axs[1].legend()
axs[1].grid(True)

# ==== Overall title for the figure ====
fig.suptitle('Late Fusion Hybrid Model', fontsize=16)

plt.tight_layout()
plt.show()

In [None]:
# === Prediction Phase ===
y_test_true = np.argmax(y_test_cat, axis=1)

x_test_dict = {
    'ecg': X_ecg_test,
    'eda': X_eda_test,
    'resp': X_resp_test,
}

# Obtain predicted class probabilities from the trained model
y_pred_probs = model.predict(x_test_dict, verbose=0)

# --- Threshold Optimization ---
# Determine the optimal threshold for each class to maximize macro-F1
optimal_thresholds = optimize_thresholds(y_test_true, y_pred_probs)
print(" Optimal thresholds:", np.round(optimal_thresholds, 3))

# --- Apply Thresholds ---
# Generate final predicted labels based on optimized thresholds
y_pred_cls = apply_thresholds(y_pred_probs, optimal_thresholds)

# === Confusion Matrix ===
# Visualize classification performance using a confusion matrix
cm = confusion_matrix(y_test_true, y_pred_cls)
class_names = ['Baseline (0)', 'Stress (1)', 'Amusement (2)']

plt.figure(figsize=(6, 5))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=class_names, yticklabels=class_names,
            annot_kws={"size": 14})  # Increase annotation font size
plt.xlabel('Predicted Label', fontsize=12)
plt.ylabel('True Label', fontsize=12)
plt.title('Confusion Matrix (Test Set, Thresholded)', fontsize=14)
plt.xticks(fontsize=12)
plt.yticks(fontsize=12)
plt.tight_layout()
plt.savefig('/kaggle/working/confusion_matrix_test_thresholded.png', dpi=300)
plt.show()

# === Classification Report ===
# Print a detailed classification report including precision, recall, and F1-score
print("\n Classification Report (with thresholds):")
print(classification_report(y_test_true, y_pred_cls,
                            target_names=class_names, digits=4))

# === Macro F1-score ===
# Compute and report the macro-averaged F1 score
macro_f1 = f1_score(y_test_true, y_pred_cls, average='macro')
print(f"\n Macro F1-score: {macro_f1:.4f}")

In [None]:
plot_multiclass_roc(
    y_true=y_test_true,
    y_pred_prob=y_pred_probs,
    class_names=['Baseline', 'Stress', 'Amusement'],
    save_path='/kaggle/working/roc_test_hybrid.png')

### CNN-Only Model

A CNN-only architecture processes ECG, EDA, and Resp signals independently.  
Each modality passes through a CNN branch (Conv1D → BatchNorm → ReLU → MaxPool → GlobalAveragePooling).  
Features from all branches are concatenated and passed through Dense layers with Dropout for final classification.

In [None]:
def build_cnn_only_model(input_shape=(3500, 1), num_classes=3):
    l2_reg = regularizers.l2(1e-4)

    # Each signal passes through its own CNN branch
    def cnn_branch(inp):
        x = Conv1D(16, 7, padding='same', kernel_regularizer=l2_reg)(inp)
        x = BatchNormalization()(x)
        x = Activation('relu')(x)
        x = MaxPooling1D(2)(x)
        x = GlobalAveragePooling1D()(x)
        return x

    inp_ecg = Input(shape=input_shape, name="ecg")
    inp_eda = Input(shape=input_shape, name="eda")
    inp_resp = Input(shape=input_shape, name="resp")

    x_ecg = cnn_branch(inp_ecg)
    x_eda = cnn_branch(inp_eda)
    x_resp = cnn_branch(inp_resp)

    fused = Concatenate()([x_ecg, x_eda, x_resp])
    fused = Dense(16, activation='relu', kernel_regularizer=l2_reg)(fused)
    fused = Dropout(0.3)(fused)
    output = Dense(num_classes, activation='softmax', kernel_regularizer=l2_reg)(fused)

    return Model(inputs=[inp_ecg, inp_eda, inp_resp], outputs=output)

print('✔️ Done!')

In [None]:
# === Loss + Optimizer ===
alpha_list = [1.0, 1.7, 1.2]
loss_fn = FocalLoss(gamma=1.0, alpha = alpha_list)
optimizer = AdamW(learning_rate=1e-4, weight_decay=5e-5)

# === Input dicts ===
x_train = {'ecg': X_ecg_train_aug, 'eda': X_eda_train_aug, 'resp': X_resp_train_aug}
x_val = {'ecg': X_ecg_val, 'eda': X_eda_val, 'resp': X_resp_val}

# === Callbacks ===
lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6, mode='min', verbose=1)
ckpt_cnn = ModelCheckpoint('model_hcnn_only.keras', monitor='val_loss', save_best_only=True, mode='min', verbose=1)

f1_callback = F1Callback(val_data=(x_val, y_val_cat))

# === Build & Compile ===
cnn_model = build_cnn_only_model()
cnn_model.compile(optimizer=optimizer, loss=loss_fn, metrics=['accuracy'])

# === Fit ===
cnn_history = cnn_model.fit(
    x=x_train,
    y=y_train_cat,
    validation_data=(x_val, y_val_cat),
    epochs=50,
    batch_size=32,
    callbacks=[f1_callback, lr_scheduler, ckpt_cnn],
    shuffle=True,
    verbose=1
)

In [None]:
fig, axs = plt.subplots(1, 2, figsize=(14, 5))

# ---- Accuracy ----
axs[0].plot(cnn_history.history['accuracy'], label='Train Accuracy')
axs[0].plot(cnn_history.history['val_accuracy'], label='Val Accuracy')
axs[0].set_title('Training and Validation Accuracy')
axs[0].set_xlabel('Epoch')
axs[0].set_ylabel('Accuracy')
axs[0].set_ylim(bottom=0)
axs[0].legend()
axs[0].grid(True)

# ---- Loss ----
axs[1].plot(cnn_history.history['loss'], label='Train Loss')
axs[1].plot(cnn_history.history['val_loss'], label='Val Loss')
axs[1].set_title('Training and Validation Loss')
axs[1].set_xlabel('Epoch')
axs[1].set_ylabel('Loss')
axs[1].set_ylim(bottom=0)
axs[1].legend()
axs[1].grid(True)

# ==== Overall title for the figure ====
fig.suptitle('CNN-only Model', fontsize=16)

plt.tight_layout()
plt.show()

In [None]:
# === Prediction Phase ===
y_pred_probs_cnn = cnn_model.predict(x_test_dict, verbose=0)

# --- Threshold Optimization ---
# Determine the optimal threshold for each class to maximize macro-F1
optimal_thresholds_cnn = optimize_thresholds(y_test_true, y_pred_probs_cnn)
print(" Optimal thresholds:", np.round(optimal_thresholds_cnn, 3))

# --- Apply Thresholds ---
# Generate final predicted labels based on optimized thresholds
y_pred_cls_cnn = apply_thresholds(y_pred_probs_cnn, optimal_thresholds_cnn)

# === Confusion Matrix ===
# Visualize classification performance using a confusion matrix
cnn_cm = confusion_matrix(y_test_true, y_pred_cls_cnn)
class_names = ['Baseline (0)', 'Stress (1)', 'Amusement (2)']

plt.figure(figsize=(6, 5))
sns.heatmap(cnn_cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=class_names, yticklabels=class_names,
            annot_kws={"size": 14}) 
plt.xlabel('Predicted Label', fontsize=12)
plt.ylabel('True Label', fontsize=12)
plt.title('Confusion Matrix (Test Set, Thresholded)', fontsize=14)
plt.xticks(fontsize=12)
plt.yticks(fontsize=12)
plt.tight_layout()
plt.savefig('/kaggle/working/cnn_confusion_matrix_test.png', dpi=300)
plt.show()

# === Classification Report ===
# Print a detailed classification report including precision, recall, and F1-score
print("\n Classification Report (with thresholds):")
print(classification_report(y_test_true, y_pred_cls_cnn,
                            target_names=class_names, digits=4))

# === Macro F1-score ===
# Compute and report the macro-averaged F1 score
cnn_macro_f1 = f1_score(y_test_true, y_pred_cls_cnn, average='macro')
print(f"\n Macro F1-score: {cnn_macro_f1:.4f}")

In [None]:
plot_multiclass_roc(
    y_true=y_test_true,
    y_pred_prob=y_pred_probs_cnn,
    class_names=['Baseline', 'Stress', 'Amusement'],
    save_path='/kaggle/working/roc_test_cnn.png')

### RNN-Only Model

A RNN-only architecture processes ECG, EDA, and Resp signals independently.  
Each modality passes through a GRU branch (Bidirectional or unidirectional GRU → GlobalAveragePooling) to capture temporal patterns.  

Features from all branches are concatenated and passed through Dense layers with Dropout for final classification.

In [None]:
def build_bigru_only_model(input_shape=(3500, 1), num_classes=3):
    l2_reg = regularizers.l2(1e-4)

    def rnn_branch(inp, use_bi=True):
        if use_bi:
            x = Bidirectional(GRU(16, return_sequences=True))(inp)
        else:
            x = GRU(16, return_sequences=True)(inp)
        x = GlobalAveragePooling1D()(x)
        return x

    inp_ecg = Input(shape=input_shape, name="ecg")
    inp_eda = Input(shape=input_shape, name="eda")
    inp_resp = Input(shape=input_shape, name="resp")

    x_ecg = rnn_branch(inp_ecg, use_bi=True)
    x_eda = rnn_branch(inp_eda, use_bi=False) 
    x_resp = rnn_branch(inp_resp, use_bi=True)

    fused = Concatenate()([x_ecg, x_eda, x_resp])
    fused = Dense(16, activation='relu', kernel_regularizer=l2_reg)(fused)
    fused = Dropout(0.3)(fused)
    output = Dense(num_classes, activation='softmax', kernel_regularizer=l2_reg)(fused)

    return Model(inputs=[inp_ecg, inp_eda, inp_resp], outputs=output)

print('✔️ Done!')

In [None]:
# === Loss + Optimizer ===
alpha_list = [1.0, 1.7, 1.2]
loss_fn = FocalLoss(gamma=1.0, alpha = alpha_list)
optimizer = AdamW(learning_rate=1e-4, weight_decay=5e-5)

# === Input dicts ===
x_train = {'ecg': X_ecg_train_aug, 'eda': X_eda_train_aug, 'resp': X_resp_train_aug}
x_val = {'ecg': X_ecg_val, 'eda': X_eda_val, 'resp': X_resp_val}

# === Callbacks ===
lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6, mode='min', verbose=1)
ckpt_rnn = ModelCheckpoint('model_rnn_only.keras', monitor='val_loss', save_best_only=True, mode='min', verbose=1)

f1_callback = F1Callback(val_data=(x_val, y_val_cat))

# === Build & Compile ===
rnn_model = build_bigru_only_model()
rnn_model.compile(optimizer=optimizer, loss=loss_fn, metrics=['accuracy'])

# === Fit ===
rnn_history = rnn_model.fit(
    x=x_train,
    y=y_train_cat,
    validation_data=(x_val, y_val_cat),
    epochs=50,
    batch_size=32,
    callbacks=[f1_callback, lr_scheduler, ckpt_rnn],
    shuffle=True,
    verbose=1
)

In [None]:
fig, axs = plt.subplots(1, 2, figsize=(14, 5))

# ---- Accuracy ----
axs[0].plot(rnn_history.history['accuracy'], label='Train Accuracy')
axs[0].plot(rnn_history.history['val_accuracy'], label='Val Accuracy')
axs[0].set_title('Training and Validation Accuracy')
axs[0].set_xlabel('Epoch')
axs[0].set_ylabel('Accuracy')
axs[0].set_ylim(bottom=0)
axs[0].legend()
axs[0].grid(True)

# ---- Loss ----
axs[1].plot(rnn_history.history['loss'], label='Train Loss')
axs[1].plot(rnn_history.history['val_loss'], label='Val Loss')
axs[1].set_title('Training and Validation Loss')
axs[1].set_xlabel('Epoch')
axs[1].set_ylabel('Loss')
axs[1].set_ylim(bottom=0)
axs[1].legend()
axs[1].grid(True)

# ==== Overall title for the figure ====
fig.suptitle('RNN-only Model', fontsize=16)

plt.tight_layout()
plt.show()

In [None]:
# === Prediction Phase ===
y_pred_probs_rnn = rnn_model.predict(x_test_dict, verbose=0)

# --- Threshold Optimization ---
# Determine the optimal threshold for each class to maximize macro-F1
optimal_thresholds_rnn = optimize_thresholds(y_test_true, y_pred_probs_rnn)
print(" Optimal thresholds:", np.round(optimal_thresholds_rnn, 3))

# --- Apply Thresholds ---
# Generate final predicted labels based on optimized thresholds
y_pred_cls_rnn = apply_thresholds(y_pred_probs_rnn, optimal_thresholds_rnn)

# === Confusion Matrix ===
# Visualize classification performance using a confusion matrix
rnn_cm = confusion_matrix(y_test_true, y_pred_cls_rnn)
class_names = ['Baseline (0)', 'Stress (1)', 'Amusement (2)']

plt.figure(figsize=(6, 5))
sns.heatmap(rnn_cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=class_names, yticklabels=class_names,
            annot_kws={"size": 14})  
plt.xlabel('Predicted Label', fontsize=12)
plt.ylabel('True Label', fontsize=12)
plt.title('Confusion Matrix (Test Set, Thresholded)', fontsize=14)
plt.xticks(fontsize=12)
plt.yticks(fontsize=12)
plt.tight_layout()
plt.savefig('/kaggle/working/rnn_confusion_matrix_test.png', dpi=300)
plt.show()

# === Classification Report ===
# Print a detailed classification report including precision, recall, and F1-score
print("\n Classification Report (with thresholds):")
print(classification_report(y_test_true, y_pred_cls_rnn,
                            target_names=class_names, digits=4))

# === Macro F1-score ===
# Compute and report the macro-averaged F1 score
rnn_macro_f1 = f1_score(y_test_true, y_pred_cls_rnn, average='macro')
print(f"\n Macro F1-score: {rnn_macro_f1:.4f}")

In [None]:
plot_multiclass_roc(
    y_true=y_test_true,
    y_pred_prob=y_pred_probs_rnn,
    class_names=['Baseline', 'Stress', 'Amusement'],
    save_path='/kaggle/working/roc_test_rnn.png')

### SVM Model

The SVM model uses handcrafted features extracted from ECG, EDA, and Resp signals.  

**Pipeline:**
1. Feature extraction (`extract_all_features_custom`) for each modality  
2. Standardization using `StandardScaler`  
3. Train SVM classifier with RBF kernel (`SVC`)  
4. Predict class labels and probabilities on the test set

In [None]:
# Feature extraction
X_train_feat = extract_all_features_custom(X_ecg_train, X_eda_train, X_resp_train)
X_test_feat  = extract_all_features_custom(X_ecg_test,  X_eda_test,  X_resp_test)

# Standardization
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train_feat)
X_test_scaled = scaler.transform(X_test_feat)

# Train SVM
svm_clf = SVC(probability=True, kernel='rbf', C=1.0, random_state=42)
svm_clf.fit(X_train_scaled, y_train)

# Prediction
y_pred_svm = svm_clf.predict(X_test_scaled)
y_prob_svm = svm_clf.predict_proba(X_test_scaled)

In [None]:
class_names = ['Baseline', 'Stress', 'Amusement']

# === Confusion Matrix ===
# Visualize classification performance using a confusion matrix
cm_svm = confusion_matrix(y_test, y_pred_svm)

plt.figure(figsize=(6, 5))
sns.heatmap(cm_svm, annot=True, fmt='d', cmap='Blues',
            xticklabels=class_names, yticklabels=class_names,
            annot_kws={"size": 14})
plt.xlabel('Predicted Label', fontsize=12)
plt.ylabel('True Label', fontsize=12)
plt.title('Confusion Matrix (SVM)', fontsize=14)
plt.xticks(fontsize=12)
plt.yticks(fontsize=12)
plt.tight_layout()
plt.savefig('/kaggle/working/confusion_matrix_svm.png', dpi=300)
plt.show()

# === Classification Report ===
# Print a detailed classification report including precision, recall, and F1-score
print("\n Classification Report (SVM):")
print(classification_report(y_test, y_pred_svm,
                            target_names=class_names,
                            digits=4))

# === Macro F1-score ===
# Compute and report the macro-averaged F1 score
macro_f1_svm = f1_score(y_test, y_pred_svm, average='macro')
print(f"\n Macro F1-score: {macro_f1_svm:.4f}")

In [None]:
plot_multiclass_roc(
    y_true=y_test,
    y_pred_prob=y_prob_svm,
    class_names=class_names,
    save_path='/kaggle/working/roc_svm_baseline.png'  
)

### Random Forest Model

A Random Forest classifier is trained on handcrafted features from ECG, EDA, and Resp signals.  

**Pipeline:**
1. Feature extraction for each modality  
2. Initialize `RandomForestClassifier` (e.g., 200 trees, max depth 10, balanced class weights)  
3. Train the model on the training set  
4. Predict on the test set

In [None]:
# Initialize model
rf = RandomForestClassifier(
    n_estimators=200,
    max_depth=10,
    random_state=42,
    class_weight='balanced'  
)

# Train the model
rf.fit(X_train_feat, y_train)

# Predict labels
y_pred_rf = rf.predict(X_test_feat)

# Predict probabilities (for ROC)
y_prob_rf = rf.predict_proba(X_test_feat)

In [None]:
# === Confusion Matrix ===
# Visualize classification performance using a confusion matrix
cm_rf = confusion_matrix(y_test, y_pred_rf)

plt.figure(figsize=(6, 5))
sns.heatmap(cm_rf, annot=True, fmt='d', cmap='Blues',
            xticklabels=class_names, yticklabels=class_names,
            annot_kws={"size": 14})
plt.xlabel('Predicted Label', fontsize=12)
plt.ylabel('True Label', fontsize=12)
plt.title('Confusion Matrix (Random Forest)', fontsize=14)
plt.xticks(fontsize=12)
plt.yticks(fontsize=12)
plt.tight_layout()
plt.savefig('/kaggle/working/confusion_matrix_rf.png', dpi=300)
plt.show()

# === Classification Report ===
# Print a detailed classification report including precision, recall, and F1-score
print("\n Classification Report (Random Forest):")
print(classification_report(y_test, y_pred_rf,
                            target_names=class_names,
                            digits=4))

# === Macro F1-score ===
# Compute and report the macro-averaged F1 score
macro_f1_rf = f1_score(y_test, y_pred_rf, average='macro')
print(f"\n Macro F1-score (Random Forest): {macro_f1_rf:.4f}")

In [None]:
plot_multiclass_roc(
    y_true=y_test,
    y_pred_prob=y_prob_rf,
    class_names=class_names,
    save_path='/kaggle/working/roc_curve_rf.png'
)

## Error Analysis of the Hybrid Model

This section investigates the misclassifications of the hybrid model to better understand its limitations and strengths.

In [None]:
# Mapping nhãn số sang tên lớp
class_map = {0: 'Baseline', 1: 'Stress', 2: 'Amusement'}

# Danh sách lưu kết quả lỗi
error_rows = []

for i in range(len(y_test_true)):
    if y_pred_cls[i] != y_test_true[i]:
        # Trích sample gây lỗi
        input_sample = {
            'ecg': x_test_dict['ecg'][i:i+1],
            'eda': x_test_dict['eda'][i:i+1],
            'resp': x_test_dict['resp'][i:i+1]
        }

        # Lấy attention từ model phụ
        attn_ecg, attn_eda, attn_resp, _ = attn_model.predict(input_sample, verbose=0)

        # Tổng attention từng modality
        attn_total = {
            'ECG': np.sum(attn_ecg[0]),    # shape: (time, dim)
            'EDA': np.sum(attn_eda[0]),
            'RESP': np.sum(attn_resp[0])
        }

        # Modal chính được chú ý nhiều nhất
        dominant_modality = max(attn_total, key=attn_total.get)

        # Ghi lại thông tin lỗi
        error_rows.append({
            'Sample Index': i,
            'True Label': class_map[y_test_true[i]],
            'Predicted': class_map[y_pred_cls[i]],
            'Dominant Modality': dominant_modality,
            'ECG Attention Sum': round(attn_total['ECG'], 3),
            'EDA Attention Sum': round(attn_total['EDA'], 3),
            'RESP Attention Sum': round(attn_total['RESP'], 3),
        })

# Chuyển thành DataFrame
df_error_by_class = pd.DataFrame(error_rows)

# Tạo bảng tóm tắt theo (True Label → Predicted → Dominant Modality)
summary = df_error_by_class.groupby(['True Label', 'Predicted', 'Dominant Modality']).size().unstack(fill_value=0)

# 🔍 Hiển thị bảng tóm tắt
print("\n Bảng phân tích lỗi theo nhãn thật, dự đoán sai và modality trội:")
print(summary)

In [None]:
# Tạo DataFrame tổng hợp lỗi
df_errors = pd.DataFrame({
    'Sample': np.arange(len(y_test_true)),
    'Subject': sid_test,
    'True Label': y_test_true,
    'Predicted': y_pred_cls
})

# Thêm cột đánh dấu đúng/sai
df_errors['Correct'] = df_errors['True Label'] == df_errors['Predicted']

# Ánh xạ nhãn số sang tên lớp
df_errors['True Label Name'] = df_errors['True Label'].map(class_map)
df_errors['Predicted Name'] = df_errors['Predicted'].map(class_map)

# (Tùy chọn) Hiển thị một số dòng đầu tiên để kiểm tra
print("\n Tổng hợp lỗi dự đoán:")
print(df_errors.head())

In [None]:
# Tạo bảng đếm lỗi theo Subject và lớp thật
errors_by_subject_class = df_errors[df_errors['Correct'] == False].groupby(['Subject', 'True Label Name']).agg(
    misclassified=('Sample', 'count')
).reset_index()

print("\n Misclassified Samples per Subject per Class:")
print(errors_by_subject_class.head(10))

In [None]:
# Tính accuracy theo từng Subject và Class
acc_by_subject_class = df_errors.groupby(['Subject', 'True Label Name']).agg(
    total=('Sample', 'count'),
    correct=('Correct', 'sum')
).reset_index()

# Thêm cột accuracy nếu cần (tùy mục đích)
acc_by_subject_class['accuracy'] = (acc_by_subject_class['correct'] / acc_by_subject_class['total']).round(3)

print("\n Accuracy per Subject per Class:")
print(acc_by_subject_class.head(10))

In [None]:
# Gộp với bảng tổng số mẫu để tính error rate
error_rate_by_subject_class = pd.merge(
    acc_by_subject_class,
    errors_by_subject_class,
    on=['Subject', 'True Label Name'],
    how='left'
)

# Điền 0 nếu không có lỗi nào
error_rate_by_subject_class['misclassified'] = error_rate_by_subject_class['misclassified'].fillna(0).astype(int)

# Tính error rate
error_rate_by_subject_class['error_rate'] = (error_rate_by_subject_class['misclassified'] / error_rate_by_subject_class['total']).round(3)

print("\n Error Rate per Subject per Class:")
print(error_rate_by_subject_class.head(10))

## Mean Activation Analysis

This section analyzes the contributions of each modality branch (ECG, EDA, Resp) in the hybrid model.

**Steps:**
1. Create a sub-model to extract outputs from the three branches and the final softmax output.
2. Predict on the test set using the sub-model.
3. Compute predicted labels by applying optimal thresholds.
4. Calculate mean activation for each branch grouped by predicted emotion labels.
5. Visualize the mean activations using a bar chart.


In [None]:
# === Create sub-model to extract outputs from 3 branches ===
branch_model = Model(
    inputs=model.input,
    outputs=[
        model.get_layer("gap_ecg").output,
        model.get_layer("gap_eda").output,
        model.get_layer("gap_resp").output,
        model.output  # softmax đầu ra
    ]
)

# === Predict on test set ===
ecg_vec, eda_vec, resp_vec, y_pred_prob = branch_model.predict(x_test_dict, verbose=0)

# === Compute predicted labels (after thresholding) ===
optimal_thresholds = optimize_thresholds(np.argmax(y_test_cat, axis=1), y_pred_prob)
y_pred_cls = apply_thresholds(y_pred_prob, optimal_thresholds)

# === Compute mean activation per branch grouped by label ===
df = pd.DataFrame({
    'Label': y_pred_cls,
    'ECG': ecg_vec.mean(axis=1),
    'EDA': eda_vec.mean(axis=1),
    'RESP': resp_vec.mean(axis=1)
})
mean_activation = df.groupby('Label').mean().round(6)
label_map = {0: 'Baseline', 1: 'Stress', 2: 'Amusement'}
mean_activation.index = mean_activation.index.map(label_map)

print(" Mean Activation Per Branch (Grouped by Predicted Emotion):\n")
print(mean_activation)

In [None]:
# === Bar chart ===
mean_activation.plot(kind='bar', figsize=(8, 5), colormap='viridis')
plt.title("Mean Signal Activation per Emotion")
plt.ylabel("Mean Activation")
plt.xticks(rotation=0)
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.tight_layout()
plt.savefig('/kaggle/working/mean_activation.png', dpi=300)
plt.show()

## Attention Weight Analysis

This section analyzes the attention mechanisms of the hybrid model to interpret how the model focuses on important temporal patterns in physiological signals.

**Components:**
1. **Temporal Attention**  
   - Extract attention weights from the RNN/GRU layers.  
   - Visualize the distribution of attention over time to identify which signal segments contribute most to the prediction.

2. **Attention Entropy**  
   - Compute the entropy of attention weights as a measure of how concentrated or distributed the model's focus is.  
   - Lower entropy indicates focused attention on specific segments, while higher entropy indicates more distributed attention.


### Temporal Attention

In [None]:
# === Tạo model phụ để lấy attention ===
attn_model = Model(
    inputs=model.input,
    outputs=[
        model.get_layer("attn_ecg").output[1],   # attention weights ECG
        model.get_layer("attn_eda").output[1],   # attention weights EDA
        model.get_layer("attn_resp").output[1],  # attention weights RESP
        model.output                             # output softmax
    ]
)

In [None]:
# === Tạo thư mục lưu overlay ===
os.makedirs("heatmaps_overlay/correct", exist_ok=True)
os.makedirs("heatmaps_overlay/incorrect", exist_ok=True)

# === Chọn ngẫu nhiên 20 mẫu ===
random_indices = np.random.choice(len(y_test_true), size=50, replace=False)

for idx in random_indices:
    # --- Lấy dữ liệu ---
    sample_input = [
        x_test_dict['ecg'][idx:idx+1],
        x_test_dict['eda'][idx:idx+1],
        x_test_dict['resp'][idx:idx+1]
    ]

    attn_ecg, attn_eda, attn_resp, softmax_out = attn_model.predict(sample_input)

    mean_attn_ecg = attn_ecg.mean(axis=1)[0].mean(axis=0)
    mean_attn_eda = attn_eda.mean(axis=1)[0].mean(axis=0)
    mean_attn_resp = attn_resp.mean(axis=1)[0].mean(axis=0)

    true_label = y_test_true[idx]
    pred_label = np.argmax(softmax_out)

    # --- Chuẩn bị tín hiệu và attention ---
    sig_ecg = x_test_dict['ecg'][idx].squeeze()
    sig_eda = x_test_dict['eda'][idx].squeeze()
    sig_resp = x_test_dict['resp'][idx].squeeze()
    
    signals = {
        'ECG': sig_ecg,
        'EDA': sig_eda,
        'RESP': sig_resp
    }
    signal_cmaps = {
        'ECG': 'Reds',   # đỏ nhạt đến đậm
        'EDA': 'Greens',    # xanh lá nhạt đến đậm
        'RESP': 'Blues'   # xanh dương nhạt đến đậm
    }
    attentions = {
        'ECG': mean_attn_ecg,
        'EDA': mean_attn_eda,
        'RESP': mean_attn_resp
    }

    # === Vẽ overlay attention ===
    fig, axs = plt.subplots(3, 1, figsize=(14, 8), sharex=True)

    for i, key in enumerate(['ECG', 'EDA', 'RESP']):
        sig = signals[key]
        attn = attentions[key]

        # Resample attention nếu không khớp độ dài
        if len(attn) != len(sig):
            attn = np.interp(
                np.linspace(0, 1, len(sig)),
                np.linspace(0, 1, len(attn)),
                attn
            )

        # Chuẩn hóa attention về [0, 1]
        attn_norm = (attn - attn.min()) / (attn.max() - attn.min() + 1e-8)

        axs[i].plot(sig, color='black', label=key)

        # Dùng imshow để overlay attention như heatmap
        axs[i].imshow(
            attn_norm[np.newaxis, :],
            aspect='auto',
            cmap=signal_cmaps[key],
            alpha=0.6,
            extent=[0, len(sig), sig.min(), sig.max()],
            interpolation='bilinear',
            origin='lower'
        )

        axs[i].set_ylabel(key)
        axs[i].legend(loc='upper right')

    axs[2].set_xlabel("Timestep")
    plt.suptitle(f"Overlay Attention + Signal (True: {true_label}, Pred: {pred_label})", fontsize=14)
    plt.tight_layout()

    fname_overlay = f"overlay_{idx}_T{true_label}_P{pred_label}.png"
    overlay_path = f"heatmaps_overlay/{'correct' if true_label == pred_label else 'incorrect'}/{fname_overlay}"
    plt.savefig(overlay_path, dpi=300)
    plt.close()

### Attention Entropy

In [None]:
# ======== Hàm tính entropy và trích vector attention ========

def attention_entropy(attn_weights, epsilon=1e-8):
    attn_weights = np.clip(attn_weights, epsilon, 1.0)
    return -np.sum(attn_weights * np.log(attn_weights))

def extract_attention_vector(attn_matrix):
    """
    Trích vector attention 1D từ tensor [1, heads, T, T]
    - Trung bình qua all heads
    - Trung bình theo trục Q (query)
    - Chuẩn hóa để tổng = 1
    Trả về vector (T,)
    """
    # Shape: (1, heads, T, T)
    attn_avg = np.mean(attn_matrix, axis=1)         # (1, T, T)
    attn_mean_query = np.mean(attn_avg[0], axis=0)  # (T,)
    attn_norm = attn_mean_query / np.sum(attn_mean_query + 1e-8)
    return attn_norm

# ======== Tính entropy cho từng mẫu test ========

class_map = {0: 'Baseline', 1: 'Stress', 2: 'Amusement'}
entropy_by_class = {
    'ECG': defaultdict(list),
    'EDA': defaultdict(list),
    'RESP': defaultdict(list)
}

for i in range(len(y_test_true)):
    label = y_test_true[i]

    sample_input = {
        'ecg': x_test_dict['ecg'][i:i+1],
        'eda': x_test_dict['eda'][i:i+1],
        'resp': x_test_dict['resp'][i:i+1]
    }

    # Trích attention từ model phụ
    attn_ecg, attn_eda, attn_resp, _ = attn_model.predict(sample_input, verbose=0)

    # Tính vector attention và entropy
    vec_ecg = extract_attention_vector(attn_ecg)
    vec_eda = extract_attention_vector(attn_eda)
    vec_resp = extract_attention_vector(attn_resp)

    entropy_by_class['ECG'][label].append(attention_entropy(vec_ecg))
    entropy_by_class['EDA'][label].append(attention_entropy(vec_eda))
    entropy_by_class['RESP'][label].append(attention_entropy(vec_resp))

# ======== Hiển thị entropy trung bình theo class & modality ========

print("\n Entropy trung bình theo lớp và modality:")
for modality in ['ECG', 'EDA', 'RESP']:
    print(f"\n Modality: {modality}")
    for label, ent_list in entropy_by_class[modality].items():
        mean_entropy = np.mean(ent_list)
        std_entropy = np.std(ent_list)
        print(f"  {class_map[label]}: Mean = {mean_entropy:.4f}, Std = {std_entropy:.4f}")

# ======== Tạo DataFrame để vẽ boxplot ========

entropy_records = []

for modality in ['ECG', 'EDA', 'RESP']:
    for label, ent_list in entropy_by_class[modality].items():
        for ent in ent_list:
            entropy_records.append({
                'Modality': modality,
                'Class': class_map[label],
                'Entropy': ent
            })

df_entropy = pd.DataFrame(entropy_records)

## K-Fold Cross-Validation

This section describes the use of K-Fold cross-validation to evaluate model performance more robustly.

**Steps:**
1. Split the dataset into K equal folds.
2. For each fold:
   - Use one fold as the validation set and the remaining K-1 folds as the training set.
   - Train the model on the training set and evaluate on the validation set.
3. Repeat for all K folds and compute the average performance metrics.
4. This approach reduces variance and provides a more reliable estimate of the model's generalization ability.

### Prepare data for K-Fold deep learning

In [None]:
from sklearn.model_selection import KFold
# === Đảm bảo các biến là numpy arrays ===
assert X_ecg.shape[0] == y.shape[0] == sid_clean.shape[0]

# === Danh sách unique subject ID ===
unique_sid = np.unique(sid_clean)
assert len(unique_sid) % 5 == 0, "Số subject phải chia hết cho 5 để chia fold đều"

# === KFold chia theo subject ID ===
kf = KFold(n_splits=5, shuffle=True, random_state=42)
results = []

### Hybrid model 5-fold

In [None]:
# === Định nghĩa hàm build model ===
def build_model_kfold():
    model = build_hybrid_model()
    alpha_list = [1.0, 1.7, 1.2]
    loss_fn = FocalLoss(gamma=1.0, alpha=alpha_list)
    optimizer = AdamW(learning_rate=1e-4, weight_decay=5e-5)
    model.compile(optimizer=optimizer, loss=loss_fn, metrics=['accuracy'])
    return model

print('✔️ Done!')

In [None]:
# === Vòng lặp theo từng fold ===
for fold_idx, (train_subj_idx, test_subj_idx) in enumerate(kf.split(unique_sid)):
    train_subjects = unique_sid[train_subj_idx]
    test_subjects = unique_sid[test_subj_idx]

    train_mask = np.isin(sid_clean, train_subjects)
    test_mask = np.isin(sid_clean, test_subjects)

    X_ecg_train, X_eda_train, X_resp_train = X_ecg[train_mask], X_eda[train_mask], X_resp[train_mask]
    X_ecg_test, X_eda_test, X_resp_test = X_ecg[test_mask], X_eda[test_mask], X_resp[test_mask]
    y_train, y_test = y[train_mask], y[test_mask]

    # === Augmentation ===
    X_ecg_train_aug, X_eda_train_aug, X_resp_train_aug, y_train_aug = augment_train_set(
        X_ecg_train, X_eda_train, X_resp_train, y_train,
        use_random=True, seed=42, plot_distribution=False
    )

    y_train_cat = to_categorical(y_train_aug, num_classes=3)
    y_test_cat = to_categorical(y_test, num_classes=3)

    x_train_dict = {'ecg': X_ecg_train_aug, 'eda': X_eda_train_aug, 'resp': X_resp_train_aug}
    x_test_dict = {'ecg': X_ecg_test, 'eda': X_eda_test, 'resp': X_resp_test}

    model_kfold = build_model_kfold()
    model_kfold.fit(x_train_dict, y_train_cat, validation_data=(x_test_dict, y_test_cat),
                    epochs=10, batch_size=32, verbose=0)

    y_pred_prob = model_kfold.predict(x_test_dict)
    y_pred = np.argmax(y_pred_prob, axis=1)

    acc = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred, average='macro')
    auc = roc_auc_score(y_test_cat, y_pred_prob, average='macro', multi_class='ovr')

    results.append([f"Fold-{fold_idx+1}", acc, f1, auc])

In [None]:
# === Tổng hợp kết quả ===
results_df = pd.DataFrame(results, columns=["Fold", "Accuracy", "F1_macro", "AUC"])

print(" Kết quả trung bình ± độ lệch chuẩn:")
for col in ["Accuracy", "F1_macro", "AUC"]:
    mean = results_df[col].mean() * 100
    std = results_df[col].std() * 100
    print(f"{col}: {mean:.2f} ± {std:.2f}%")

In [None]:
# Hiển thị bảng kết quả từng fold
print(" Kết quả từng fold:")
print(results_df.to_string(index=False))

In [None]:
# === Vẽ boxplot ===
plt.figure(figsize=(10, 5))
sns.boxplot(data=results_df[["Accuracy", "F1_macro", "AUC"]])
plt.title("5-Fold Cross-Subject Performance")
plt.ylabel("Score")
plt.grid(True)
plt.savefig('/kaggle/working/fold5.png', dpi=300)
plt.show()

### CNN-only 5-fold

In [None]:
# === Định nghĩa hàm build model ===
def build_cnn_model_kfold():
    model = build_cnn_only_model()
    alpha_list = [1.0, 1.7, 1.2]
    loss_fn = FocalLoss(gamma=1.0, alpha=alpha_list)
    optimizer = AdamW(learning_rate=1e-4, weight_decay=5e-5)
    model.compile(optimizer=optimizer, loss=loss_fn, metrics=['accuracy'])
    return model

print('✔️ Done!')

In [None]:
cnn_results = []
# === Vòng lặp theo từng fold ===
for fold_idx, (train_subj_idx, test_subj_idx) in enumerate(kf.split(unique_sid)):
    train_subjects = unique_sid[train_subj_idx]
    test_subjects = unique_sid[test_subj_idx]

    train_mask = np.isin(sid_clean, train_subjects)
    test_mask = np.isin(sid_clean, test_subjects)

    X_ecg_train, X_eda_train, X_resp_train = X_ecg[train_mask], X_eda[train_mask], X_resp[train_mask]
    X_ecg_test, X_eda_test, X_resp_test = X_ecg[test_mask], X_eda[test_mask], X_resp[test_mask]
    y_train, y_test = y[train_mask], y[test_mask]

    # === Augmentation ===
    X_ecg_train_aug, X_eda_train_aug, X_resp_train_aug, y_train_aug = augment_train_set(
        X_ecg_train, X_eda_train, X_resp_train, y_train,
        use_random=True, seed=42, plot_distribution=False
    )

    y_train_cat = to_categorical(y_train_aug, num_classes=3)
    y_test_cat = to_categorical(y_test, num_classes=3)

    x_train_dict = {'ecg': X_ecg_train_aug, 'eda': X_eda_train_aug, 'resp': X_resp_train_aug}
    x_test_dict = {'ecg': X_ecg_test, 'eda': X_eda_test, 'resp': X_resp_test}

    cnn_model_kfold = build_cnn_model_kfold()
    cnn_model_kfold.fit(x_train_dict, y_train_cat, validation_data=(x_test_dict, y_test_cat),
                    epochs=10, batch_size=32, verbose=0)

    y_pred_prob = cnn_model_kfold.predict(x_test_dict)
    y_pred = np.argmax(y_pred_prob, axis=1)

    acc = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred, average='macro')
    auc = roc_auc_score(y_test_cat, y_pred_prob, average='macro', multi_class='ovr')

    cnn_results.append([f"Fold-{fold_idx+1}", acc, f1, auc])

In [None]:
# === Tổng hợp kết quả ===
cnn_results_df = pd.DataFrame(cnn_results, columns=["Fold", "Accuracy", "F1_macro", "AUC"])

print(" Kết quả trung bình ± độ lệch chuẩn:")
for col in ["Accuracy", "F1_macro", "AUC"]:
    cnn_mean = cnn_results_df[col].mean() * 100
    cnn_std = cnn_results_df[col].std() * 100
    print(f"{col}: {cnn_mean:.2f} ± {cnn_std:.2f}%")

In [None]:
# Hiển thị bảng kết quả từng fold
print(" Kết quả từng fold:")
print(cnn_results_df.to_string(index=False))

In [None]:
# === Vẽ boxplot ===
plt.figure(figsize=(10, 5))
sns.boxplot(data=cnn_results_df[["Accuracy", "F1_macro", "AUC"]])
plt.title("5-Fold Cross-Subject Performance")
plt.ylabel("Score")
plt.grid(True)
plt.savefig('/kaggle/working/fold5_cnn.png', dpi=300)
plt.show()

### RNN-only 5-fold

In [None]:
# === Định nghĩa hàm build model ===
def build_rnn_model_kfold():
    model = build_bigru_only_model()
    alpha_list = [1.0, 1.7, 1.2]
    loss_fn = FocalLoss(gamma=1.0, alpha=alpha_list)
    optimizer = AdamW(learning_rate=1e-4, weight_decay=5e-5)
    model.compile(optimizer=optimizer, loss=loss_fn, metrics=['accuracy'])
    return model

print('✔️ Done!')

In [None]:
rnn_results = []
# === Vòng lặp theo từng fold ===
for fold_idx, (train_subj_idx, test_subj_idx) in enumerate(kf.split(unique_sid)):
    train_subjects = unique_sid[train_subj_idx]
    test_subjects = unique_sid[test_subj_idx]

    train_mask = np.isin(sid_clean, train_subjects)
    test_mask = np.isin(sid_clean, test_subjects)

    X_ecg_train, X_eda_train, X_resp_train = X_ecg[train_mask], X_eda[train_mask], X_resp[train_mask]
    X_ecg_test, X_eda_test, X_resp_test = X_ecg[test_mask], X_eda[test_mask], X_resp[test_mask]
    y_train, y_test = y[train_mask], y[test_mask]

    # === Augmentation ===
    X_ecg_train_aug, X_eda_train_aug, X_resp_train_aug, y_train_aug = augment_train_set(
        X_ecg_train, X_eda_train, X_resp_train, y_train,
        use_random=True, seed=42, plot_distribution=False
    )

    y_train_cat = to_categorical(y_train_aug, num_classes=3)
    y_test_cat = to_categorical(y_test, num_classes=3)

    x_train_dict = {'ecg': X_ecg_train_aug, 'eda': X_eda_train_aug, 'resp': X_resp_train_aug}
    x_test_dict = {'ecg': X_ecg_test, 'eda': X_eda_test, 'resp': X_resp_test}

    rnn_model_kfold = build_rnn_model_kfold()
    rnn_model_kfold.fit(x_train_dict, y_train_cat, validation_data=(x_test_dict, y_test_cat),
                    epochs=10, batch_size=32, verbose=0)

    y_pred_prob = rnn_model_kfold.predict(x_test_dict)
    y_pred = np.argmax(y_pred_prob, axis=1)

    acc = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred, average='macro')
    auc = roc_auc_score(y_test_cat, y_pred_prob, average='macro', multi_class='ovr')

    rnn_results.append([f"Fold-{fold_idx+1}", acc, f1, auc])

In [None]:
# === Tổng hợp kết quả ===
rnn_results_df = pd.DataFrame(rnn_results, columns=["Fold", "Accuracy", "F1_macro", "AUC"])

print(" Kết quả trung bình ± độ lệch chuẩn:")
for col in ["Accuracy", "F1_macro", "AUC"]:
    rnn_mean = rnn_results_df[col].mean() * 100
    rnn_std = rnn_results_df[col].std() * 100
    print(f"{col}: {rnn_mean:.2f} ± {rnn_std:.2f}%")

In [None]:
# Hiển thị bảng kết quả từng fold
print(" Kết quả từng fold:")
print(rnn_results_df.to_string(index=False))

In [None]:
# === Vẽ boxplot ===
plt.figure(figsize=(10, 5))
sns.boxplot(data=rnn_results_df[["Accuracy", "F1_macro", "AUC"]])
plt.title("5-Fold Cross-Subject Performance")
plt.ylabel("Score")
plt.grid(True)
plt.savefig('/kaggle/working/fold5_rnn.png', dpi=300)
plt.show()

### Prepare data for K-Fold machine learning

In [None]:
# Trích đặc trưng toàn bộ dataset
X_features = extract_all_features_custom(X_ecg, X_eda, X_resp)
y_labels = y  # giữ nguyên

class_names = ['Baseline', 'Stress', 'Amusement']
n_classes = len(class_names)

# Kiểm tra chiều
assert X_features.shape[0] == y_labels.shape[0] == sid_clean.shape[0]
unique_sid = np.unique(sid_clean)
assert len(unique_sid) % 5 == 0, "Số subject phải chia hết cho 5 để chia đều các fold"

kf = KFold(n_splits=5, shuffle=True, random_state=42)


### SVM 5-fold

In [None]:
svm_results = []

for fold_idx, (train_sid_idx, test_sid_idx) in enumerate(kf.split(unique_sid)):
    train_sid = unique_sid[train_sid_idx]
    test_sid = unique_sid[test_sid_idx]

    train_mask = np.isin(sid_clean, train_sid)
    test_mask = np.isin(sid_clean, test_sid)

    X_train, y_train = X_features[train_mask], y_labels[train_mask]
    X_test, y_test = X_features[test_mask], y_labels[test_mask]

    # Chuẩn hóa
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # Huấn luyện SVM
    clf = SVC(kernel='rbf', C=1.0, probability=True, random_state=42)
    clf.fit(X_train_scaled, y_train)

    # Dự đoán
    y_pred = clf.predict(X_test_scaled)
    y_proba = clf.predict_proba(X_test_scaled)
    y_test_bin = label_binarize(y_test, classes=range(n_classes))

    # Đánh giá
    acc = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred, average='macro')
    auc = roc_auc_score(y_test_bin, y_proba, average='macro', multi_class='ovr')

    svm_results.append([f"Fold-{fold_idx+1}", acc, f1, auc])

In [None]:
svm_results_df = pd.DataFrame(svm_results, columns=["Fold", "Accuracy", "F1_macro", "AUC"])

print(" Kết quả trung bình ± độ lệch chuẩn:")
for col in ["Accuracy", "F1_macro", "AUC"]:
    mean = svm_results_df[col].mean() * 100
    std = svm_results_df[col].std() * 100
    print(f"{col}: {mean:.2f} ± {std:.2f}%")

In [None]:
print("\n Kết quả từng fold:")
print(svm_results_df.to_string(index=False))

In [None]:
# Vẽ boxplot
plt.figure(figsize=(10, 5))
sns.boxplot(data=svm_results_df[["Accuracy", "F1_macro", "AUC"]])
plt.title("5-Fold Cross-Subject Performance (SVM)")
plt.ylabel("Score")
plt.grid(True)
plt.tight_layout()
plt.savefig('/kaggle/working/kfold_svm_boxplot.png', dpi=300)
plt.show()

### Random Forest 5-fold

In [None]:
rf_results = []

# === Vòng lặp K-Fold ===
for fold_idx, (train_sid_idx, test_sid_idx) in enumerate(kf.split(unique_sid)):
    train_sid = unique_sid[train_sid_idx]
    test_sid = unique_sid[test_sid_idx]

    train_mask = np.isin(sid_clean, train_sid)
    test_mask = np.isin(sid_clean, test_sid)

    X_train, y_train = X_features[train_mask], y[train_mask]
    X_test, y_test = X_features[test_mask], y[test_mask]

    # Chuẩn hóa
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # Huấn luyện mô hình Random Forest
    rf_clf = RandomForestClassifier(n_estimators=100, random_state=42)
    rf_clf.fit(X_train_scaled, y_train)

    # Dự đoán
    y_pred_rf = rf_clf.predict(X_test_scaled)
    y_proba_rf = rf_clf.predict_proba(X_test_scaled)
    y_test_bin = label_binarize(y_test, classes=range(n_classes))

    # Đánh giá
    acc = accuracy_score(y_test, y_pred_rf)
    f1 = f1_score(y_test, y_pred_rf, average='macro')
    auc = roc_auc_score(y_test_bin, y_proba_rf, average='macro', multi_class='ovr')

    rf_results.append([f"Fold-{fold_idx+1}", acc, f1, auc])

In [None]:
rf_results_df = pd.DataFrame(rf_results, columns=["Fold", "Accuracy", "F1_macro", "AUC"])

print(" Kết quả trung bình ± độ lệch chuẩn (Random Forest):")
for col in ["Accuracy", "F1_macro", "AUC"]:
    mean = rf_results_df[col].mean() * 100
    std = rf_results_df[col].std() * 100
    print(f"{col}: {mean:.2f} ± {std:.2f}%")

In [None]:
print("\n Kết quả từng fold:")
print(rf_results_df.to_string(index=False))

In [None]:
# Vẽ boxplot
plt.figure(figsize=(10, 5))
sns.boxplot(data=results_rf_df[["Accuracy", "F1_macro", "AUC"]])
plt.title("5-Fold Cross-Subject Performance (Random Forest)")
plt.ylabel("Score")
plt.grid(True)
plt.tight_layout()
plt.savefig('/kaggle/working/kfold_randomforest_boxplot.png', dpi=300)
plt.show()