### Imports & GPU Configuration

In [1]:
# ==============================================================================
# 0. Importing Dependencies
# ==============================================================================
import os
import glob
import numpy as np
import pandas as pd
import tensorflow as tf
import pywt
import datetime
import json
import matplotlib.pyplot as plt
import seaborn as sns
from typing import List, Tuple, Dict, Optional

# Keras and TensorFlow Layers
from tensorflow.keras.models import Model
from tensorflow.keras.layers import (
    Input, Conv1D, BatchNormalization, MaxPooling1D, Bidirectional, LSTM,
    Dropout, Dense, concatenate, Layer, GlobalAveragePooling1D
)
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau, TensorBoard

# Scikit-learn for data splitting and class weights
try:
    from sklearn.model_selection import train_test_split
    from sklearn.utils.class_weight import compute_class_weight
    from sklearn.metrics import classification_report, confusion_matrix
    _HAVE_SKLEARN = True
except ImportError:
    _HAVE_SKLEARN = False

# ==============================================================================
# 1. GPU Memory Configuration
# ==============================================================================
# Configure TensorFlow to grow GPU memory usage as needed.
# This runs in the first notebook cell, i.e. before any model or dataset is built.
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # Memory growth is switched on for every GPU that TensorFlow reports.
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print(f"GPU memory growth configured for {len(gpus)} device(s).")
    except RuntimeError as e:
        # The error is only printed, so the notebook keeps running with
        # TensorFlow's default memory behaviour.
        # NOTE(review): presumably raised when the GPUs were already initialised
        # before this cell ran (e.g. on a re-run without a kernel restart) - confirm.
        print(e)

2025-09-22 05:56:16.069080: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


GPU memory growth configured for 1 device(s).


### Global Constants & Configuration

In [2]:
# ==============================================================================
# 2. Configuration and Constants
# ==============================================================================
# Emotion labels. The position of a label in this list is its integer class
# index (`load_eeg_dataset` uses `CLASS_NAMES.index(cls)`), and the list length
# is the width of the softmax head, so the order must not change between
# training and inference.
CLASS_NAMES = [
    'happy', 'sad', 'surprised', 'satisfied',
    'protected', 'frightened', 'angry', 'unconcerned'
]

# Maps each dataset sub-directory name (key) to its label in CLASS_NAMES (value).
# `load_eeg_dataset` looks for `<data_dir>/<key>/*.csv`.
FOLDER_TO_CLASS = {
    'Happy': 'happy',
    'Sad': 'sad',
    'Surprise': 'surprised',
    'Satisfied': 'satisfied',
    'Protected': 'protected',
    'Frightened': 'frightened',
    'Angry': 'angry',
    'Unconcerned': 'unconcerned'
}

# We will focus on the 14 core EEG channels for a more robust signal
# These are the CSV column names that `_read_and_denoise_csv` looks up; the
# list order determines the column order of the arrays it returns.
EEG_CHANNELS = [
    'EEG.AF3', 'EEG.F7', 'EEG.F3', 'EEG.FC5', 'EEG.T7', 'EEG.P7', 'EEG.O1',
    'EEG.O2', 'EEG.P8', 'EEG.T8', 'EEG.FC6', 'EEG.F4', 'EEG.F8', 'EEG.AF4'
]

### Data Preprocessing Helper Functions

In [3]:
# ==============================================================================
# 3. Data Loading and Preprocessing Functions
# ==============================================================================

def wavelet_denoise(data, wavelet='db4', level=4):
    """Denoise a 1D signal by soft-thresholding its wavelet detail coefficients.

    The signal is decomposed with `pywt.wavedec`, a single threshold
    `sigma * sqrt(2 * ln(len(data)))` is derived from the median absolute
    deviation of the last coefficient array, every coefficient array except
    the first one is soft-thresholded, and the signal is rebuilt with
    `pywt.waverec`.

    Args:
        data: 1D sequence of samples.
        wavelet: Wavelet name passed to PyWavelets.
        level: Decomposition level passed to `pywt.wavedec`.

    Returns:
        The reconstructed signal, cut to the length of `data`.
    """
    n_samples = len(data)
    coeffs = pywt.wavedec(data, wavelet, level=level)

    # Noise scale estimated from the last coefficient array (MAD / 0.6745).
    last_band = coeffs[-1]
    sigma = np.median(np.abs(last_band - np.median(last_band))) / 0.6745
    threshold = sigma * np.sqrt(2 * np.log(n_samples))

    # coeffs[0] is kept untouched; every later array is shrunk.
    shrunk = [coeffs[0]]
    shrunk.extend(
        pywt.threshold(band, value=threshold, mode='soft') for band in coeffs[1:]
    )

    # The reconstruction can be longer than the input, hence the slice.
    return pywt.waverec(shrunk, wavelet)[:n_samples]

def _read_and_denoise_csv(filepath: str) -> np.ndarray:
    """
    Read one recording CSV and return its denoised EEG channels as a 2D array.

    The first line of the file is skipped (`skiprows=1`), so the second line is
    used as the header. For every name in `EEG_CHANNELS` the column is looked up,
    NaN entries are dropped, and the remaining samples are passed through
    `wavelet_denoise` unless the signal is empty or constant.

    The result always has exactly one column per entry of `EEG_CHANNELS`, in
    that order. A channel that is absent from the file becomes an all-zero
    column, so that a given column index refers to the same electrode in
    every file instead of shifting left when a channel is missing. Channels
    that end up shorter than the longest one are zero-padded at the end.

    Args:
        filepath: Path to the CSV file.

    Returns:
        float32 array of shape (n_samples, len(EEG_CHANNELS)); n_samples is the
        length of the longest channel found (0 if none contains data).
    """
    df = pd.read_csv(filepath, skiprows=1, low_memory=False)

    columns = []
    for channel in EEG_CHANNELS:
        if channel not in df.columns:
            # Placeholder for a missing channel; it is zero-filled below.
            columns.append(np.zeros(0, dtype=np.float32))
            continue
        signal = df[channel].dropna().values
        # Skip denoising for empty or constant signals; the size check also
        # keeps np.var from being evaluated on an empty array.
        if signal.size > 0 and np.var(signal) > 0:
            signal = wavelet_denoise(signal)
        columns.append(signal)

    max_len = max((len(col) for col in columns), default=0)

    # Writing each channel into a zero matrix pads short/missing channels at the end.
    arr = np.zeros((max_len, len(columns)), dtype=np.float32)
    for col_idx, col in enumerate(columns):
        arr[:len(col), col_idx] = col
    return arr

def _ensure_shape_and_pad(raw: np.ndarray, time_steps: int, channels: int) -> np.ndarray:
    """
    Ensure the data has the correct 2D shape (time_steps, channels) by padding or truncating.
    """
    if raw.shape[0] > time_steps:
        return raw[:time_steps, :]
    elif raw.shape[0] < time_steps:
        padding = np.zeros((time_steps - raw.shape[0], channels), dtype=raw.dtype)
        return np.concatenate([raw, padding], axis=0)
    return raw

def _normalize_per_sample(sample: np.ndarray) -> np.ndarray:
    """Normalize each sample by subtracting mean and dividing by standard deviation."""
    mean = sample.mean(axis=0, keepdims=True)
    std = sample.std(axis=0, keepdims=True)
    std[std < 1e-8] = 1.0
    return (sample - mean) / std

### Main Dataset Loading Function

In [4]:
# ==============================================================================
# 4. Main Dataset Loading Function
# ==============================================================================

def load_eeg_dataset(
    data_dir: str, time_steps: int, channels: int,
    stressed_classes: Optional[List[str]] = None, test_size: float = 0.15,
    val_size: float = 0.15, random_state: int = 42, batch_size: int = 4
) -> Tuple[Dict[str, tf.data.Dataset], Dict]:
    """Load the EEG CSV files and build train/val/test datasets with sample weights.

    Every `<data_dir>/<folder>/*.csv` for the folders listed in `FOLDER_TO_CLASS`
    is read with `_read_and_denoise_csv`, forced to shape `(time_steps, channels)`
    and normalised per sample. Two targets are produced per recording: a binary
    "stressed" flag and a one-hot emotion class.

    Files are processed in sorted path order so that, for a given `random_state`,
    the stratified split is the same on every machine. Only the training dataset
    is shuffled; validation and test keep a fixed order so that predictions and
    labels collected in separate passes line up.

    Args:
        data_dir: Root directory containing one sub-directory per class.
        time_steps: Number of rows each sample is truncated/zero-padded to.
        channels: Number of columns each sample must have.
        stressed_classes: Class names counted as "stressed" for the binary
            target. Defaults to ['frightened', 'angry'].
        test_size: Fraction of all samples used for the test split.
        val_size: Fraction of all samples used for the validation split.
        random_state: Seed for the stratified splits and the training shuffle.
        batch_size: Batch size of the returned datasets.

    Returns:
        (datasets, meta): `datasets` maps 'train' / 'val' / 'test' to batched
        `tf.data.Dataset`s yielding `(x, y_dict, sample_weight_dict)`; `meta`
        holds per-class file counts, the total sample count, the balanced class
        weights and the index-to-class mapping.

    Raises:
        ImportError: If scikit-learn could not be imported.
        ValueError: If no CSV file is found under `data_dir`.
    """
    if not _HAVE_SKLEARN:
        raise ImportError("Scikit-learn is required. Please run 'pip install scikit-learn'.")

    if stressed_classes is None:
        stressed_classes = ['frightened', 'angry']

    # --- Collect files -----------------------------------------------------
    files, labels = [], []
    for folder, cls in FOLDER_TO_CLASS.items():
        cls_folder = os.path.join(data_dir, folder)
        if os.path.isdir(cls_folder):
            # glob order depends on the filesystem; sort it so the split
            # below is reproducible for a fixed random_state.
            found = sorted(glob.glob(os.path.join(cls_folder, "*.csv")))
            files.extend(found)
            labels.extend([cls] * len(found))

    if not files:
        raise ValueError(f"No CSV files found in subdirectories of {data_dir}.")

    # --- Read and preprocess every recording --------------------------------
    X_list, y_multi_idx, y_binary = [], [], []
    for fpath, cls in zip(files, labels):
        raw = _read_and_denoise_csv(fpath)
        if raw.shape[1] != channels:
            # Column-count mismatch: copy the leading columns into a zero
            # matrix of the requested width (extra columns are dropped).
            temp = np.zeros((raw.shape[0], channels), dtype=raw.dtype)
            min_cols = min(raw.shape[1], channels)
            temp[:, :min_cols] = raw[:, :min_cols]
            raw = temp
        sample = _ensure_shape_and_pad(raw, time_steps, channels)
        sample = _normalize_per_sample(sample)
        X_list.append(sample.astype(np.float32))
        y_multi_idx.append(CLASS_NAMES.index(cls))
        y_binary.append(1 if cls in stressed_classes else 0)

    X = np.stack(X_list, axis=0)
    y_multi_idx = np.array(y_multi_idx, dtype=np.int32)
    y_binary = np.array(y_binary, dtype=np.float32)
    y_multi_onehot = tf.keras.utils.to_categorical(y_multi_idx, num_classes=len(CLASS_NAMES))

    # --- Sample weights -----------------------------------------------------
    # Emotion head: balanced class weights; stress head: uniform weights.
    weights = compute_class_weight('balanced', classes=np.arange(len(CLASS_NAMES)), y=y_multi_idx)
    class_weights = {i: float(w) for i, w in enumerate(weights)}
    emotion_sample_weights = np.array([class_weights[label] for label in y_multi_idx], dtype=np.float32)
    stress_sample_weights = np.ones_like(y_binary, dtype=np.float32)

    # --- Stratified train / val / test split --------------------------------
    indices = np.arange(len(X))
    holdout_fraction = test_size + val_size
    train_indices, temp_indices = train_test_split(
        indices, test_size=holdout_fraction,
        random_state=random_state, stratify=y_multi_idx
    )
    val_indices, test_indices = train_test_split(
        temp_indices, test_size=(test_size / holdout_fraction),
        random_state=random_state, stratify=y_multi_idx[temp_indices]
    )

    def make_ds(inds, shuffle=False):
        """Build a batched dataset of (x, targets, sample_weights) for `inds`."""
        x = X[inds]
        y = {'stressed_not_stressed_output': y_binary[inds], 'emotion_class_output': y_multi_onehot[inds]}
        sw = {'stressed_not_stressed_output': stress_sample_weights[inds], 'emotion_class_output': emotion_sample_weights[inds]}
        ds = tf.data.Dataset.from_tensor_slices((x, y, sw))
        if shuffle:
            ds = ds.shuffle(len(inds), seed=random_state)
        return ds.batch(batch_size).prefetch(tf.data.AUTOTUNE)

    datasets = {
        'train': make_ds(train_indices, shuffle=True),
        'val': make_ds(val_indices),
        'test': make_ds(test_indices),
    }
    meta = {
        'counts': {cls: labels.count(cls) for cls in CLASS_NAMES},
        'total_samples': len(X),
        'class_weights': class_weights,
        'index_to_class': {i: c for i, c in enumerate(CLASS_NAMES)},
    }
    return datasets, meta

### Model (Conv1D-LSTM)

In [5]:
# ==============================================================================
# 5. A Better Model Architecture (Conv1D-LSTM)
# ==============================================================================
class Attention(Layer):
    """Attention pooling that collapses axis 1 of its input into a weighted sum.

    A score per position is computed as `tanh(x . W + b)`, the scores are
    soft-maxed along the last axis of the score tensor, and the input is summed
    over axis 1 using those weights. The bias has one entry per position along
    axis 1, so the layer is built for a fixed length of that axis.

    NOTE(review): written for a 3D input (batch, steps, features), which is how
    `create_eeg_model` uses it - confirm before reusing elsewhere.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def build(self, input_shape):
        n_steps = input_shape[1]
        n_features = input_shape[-1]
        # One projection weight per feature, one bias per position on axis 1.
        self.W = self.add_weight(name="att_weight", shape=(n_features, 1), initializer="normal")
        self.b = self.add_weight(name="att_bias", shape=(n_steps, 1), initializer="zeros")
        super().build(input_shape)

    def call(self, x):
        K = tf.keras.backend
        # Raw scores, with the trailing singleton axis removed.
        scores = K.squeeze(K.tanh(K.dot(x, self.W) + self.b), axis=-1)
        # Normalised weights, re-expanded so they broadcast against x.
        weights = K.expand_dims(K.softmax(scores), axis=-1)
        weighted = x * weights
        return K.sum(weighted, axis=1)

def create_eeg_model(input_shape):
    """Build the two-headed Conv1D + BiLSTM + attention network.

    Architecture, in order:
      * two convolutional blocks (32 then 64 filters), each made of two
        Conv1D(kernel 3, relu, same padding) + BatchNormalization pairs followed
        by MaxPooling1D(2) and Dropout(0.25);
      * two bidirectional LSTM layers (64 then 32 units, sequences returned),
        each followed by Dropout(0.3);
      * the `Attention` output concatenated after a global average pooling of the
        same sequence;
      * two heads of Dense(32, relu) -> Dropout(0.5) -> output layer.

    Args:
        input_shape: Shape of one sample, passed to `Input(shape=...)`.

    Returns:
        An uncompiled Keras `Model` whose outputs are a dict with keys
        'stressed_not_stressed_output' (1 sigmoid unit) and
        'emotion_class_output' (softmax over `CLASS_NAMES`).
    """
    inputs = Input(shape=input_shape)
    features = inputs

    # Blocks 1 and 2: temporal feature extraction.
    for n_filters in (32, 64):
        for _ in range(2):
            features = Conv1D(filters=n_filters, kernel_size=3, activation='relu', padding='same')(features)
            features = BatchNormalization()(features)
        features = MaxPooling1D(pool_size=2)(features)
        features = Dropout(0.25)(features)

    # Block 3: recurrent layers for sequence modelling.
    for n_units in (64, 32):
        features = Bidirectional(LSTM(n_units, return_sequences=True))(features)
        features = Dropout(0.3)(features)

    # Two summaries of the sequence, joined into one shared feature vector.
    attended = Attention()(features)
    averaged = GlobalAveragePooling1D()(features)
    shared = concatenate([averaged, attended])

    def _head(n_outputs, activation, name):
        """Dense(32) -> Dropout(0.5) -> named output layer on the shared vector."""
        hidden = Dense(32, activation='relu')(shared)
        hidden = Dropout(0.5)(hidden)
        return Dense(n_outputs, activation=activation, name=name)(hidden)

    stress_output = _head(1, 'sigmoid', 'stressed_not_stressed_output')
    emotion_output = _head(len(CLASS_NAMES), 'softmax', 'emotion_class_output')

    return Model(
        inputs=inputs,
        outputs={
            "stressed_not_stressed_output": stress_output,
            "emotion_class_output": emotion_output
        }
    )

### Main Execution Block

In [6]:
# ==============================================================================
# 6. Main Execution Block
# ==============================================================================
if __name__ == '__main__':
    # --- Path and Model Parameters ---
    # The dataset location can be overridden with the EEG_DATASET_DIR
    # environment variable; the original local path stays as the default.
    dataset_path = os.environ.get(
        "EEG_DATASET_DIR",
        "/media/kd/New Volume/Github/EEG-Emotion-Detection/dataset",
    )

    SEED = 42
    INPUT_TIME_STEPS = 1024
    INPUT_CHANNELS = len(EEG_CHANNELS)
    INPUT_2D_SHAPE = (INPUT_TIME_STEPS, INPUT_CHANNELS)
    BATCH_SIZE = 4
    EPOCHS = 100

    # Seed Python, NumPy and TensorFlow so weight initialisation and dropout
    # are repeatable across Restart & Run All.
    tf.keras.utils.set_random_seed(SEED)

    # --- Load Data ---
    print("--- Loading and Preprocessing Dataset ---")
    datasets, meta = load_eeg_dataset(
        data_dir=dataset_path,
        time_steps=INPUT_TIME_STEPS,
        channels=INPUT_CHANNELS,
        random_state=SEED,
        batch_size=BATCH_SIZE
    )

    # --- Build and Compile Model ---
    print("\n--- Building Model ---")
    model = create_eeg_model(INPUT_2D_SHAPE)
    optimizer = Adam(learning_rate=5e-4)

    # The emotion head dominates the total loss; the stress head is weighted 0.5.
    model.compile(
        optimizer=optimizer,
        loss={'stressed_not_stressed_output': 'binary_crossentropy', 'emotion_class_output': 'categorical_crossentropy'},
        loss_weights={'stressed_not_stressed_output': 0.5, 'emotion_class_output': 1.0},
        metrics={'stressed_not_stressed_output': 'accuracy', 'emotion_class_output': 'accuracy'}
    )
    model.summary()

    # --- Define Callbacks ---
    os.makedirs('models', exist_ok=True)
    os.makedirs('logs', exist_ok=True)
    timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")

    # All monitoring callbacks track validation accuracy of the emotion head.
    callbacks = [
        ModelCheckpoint(
            filepath=f'models/best_model_{timestamp}.keras',
            monitor='val_emotion_class_output_accuracy',
            save_best_only=True,
            mode='max',
            verbose=1
        ),
        # NOTE(review): patience equals the number of epochs, so this callback
        # can never stop training early - confirm that is intended.
        EarlyStopping(
            monitor='val_emotion_class_output_accuracy',
            patience=100,
            restore_best_weights=True,
            mode='max',
            verbose=1
        ),
        ReduceLROnPlateau(
            monitor='val_emotion_class_output_accuracy',
            factor=0.5,
            patience=5,
            min_lr=1e-7,
            mode='max',
            verbose=1
        ),
        TensorBoard(log_dir=f'logs/fit/{timestamp}', histogram_freq=1)
    ]

    # --- Train the Model ---
    print("\n--- Starting Model Training ---")
    history = model.fit(
        datasets['train'],
        validation_data=datasets['val'],
        epochs=EPOCHS,
        callbacks=callbacks,
        verbose=1
    )

    # --- Evaluate and Save ---
    print("\n--- Evaluating Model on Test Set ---")
    # return_dict=True pairs every value with its own metric name. Zipping
    # model.metrics_names with the result list can mislabel or drop entries
    # on multi-output models.
    results = model.evaluate(datasets['test'], verbose=1, return_dict=True)
    print("\nTest Results:")
    for name, value in results.items():
        print(f"  {name}: {value:.4f}")

    model.save(f'models/final_model_{timestamp}.keras')
    with open(f'models/model_metadata_{timestamp}.json', 'w') as f:
        json.dump(meta, f, indent=4)
    print(f"\nFinal model and metadata saved with timestamp: {timestamp}")

--- Loading and Preprocessing Dataset ---


I0000 00:00:1758500795.364729  109211 gpu_device.cc:2020] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 1133 MB memory:  -> device: 0, name: NVIDIA GeForce RTX 3070 Ti, pci bus id: 0000:01:00.0, compute capability: 8.6



--- Building Model ---



--- Starting Model Training ---
Epoch 1/100


2025-09-22 05:56:40.868555: I external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:473] Loaded cuDNN version 91300


[1m42/42[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 31ms/step - emotion_class_output_accuracy: 0.1776 - emotion_class_output_loss: 2.0833 - loss: 2.4180 - stressed_not_stressed_output_accuracy: 0.6140 - stressed_not_stressed_output_loss: 0.6694
Epoch 1: val_emotion_class_output_accuracy improved from None to 0.11111, saving model to models/best_model_20250922-055636.keras
[1m42/42[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 53ms/step - emotion_class_output_accuracy: 0.1325 - emotion_class_output_loss: 2.0977 - loss: 2.4197 - stressed_not_stressed_output_accuracy: 0.6807 - stressed_not_stressed_output_loss: 0.6418 - val_emotion_class_output_accuracy: 0.1111 - val_emotion_class_output_loss: 2.0814 - val_loss: 2.4368 - val_stressed_not_stressed_output_accuracy: 0.2500 - val_stressed_not_stressed_output_loss: 0.7108 - learning_rate: 5.0000e-04
Epoch 2/100
[1m41/42[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 27ms/step - emotion_class_output_accuracy: 0.1401