In [None]:
import json
import pickle

import joblib
import matplotlib.pyplot as plt
import numpy as np
from sklearn import metrics
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from tensorflow import keras
from tensorflow.keras import models, layers
from tensorflow.keras import optimizers
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.layers import Dense, Dropout, Input, LayerNormalization, MultiHeadAttention, GlobalAveragePooling1D
from tensorflow.keras.utils import to_categorical

from dataset import load_dataset, load_labels, split_data, format_labels
import variables as v


# Transformer Model for EEG Stress Detection

This notebook implements a Transformer neural network for classifying EEG stress levels.
Transformers use self-attention mechanisms to capture long-range dependencies in time-series data.


In [None]:
# Experiment configuration: the preprocessed data variant and the test
# condition that the loading cells below pass to load_dataset / format_labels.
data_type, test_type = "wt_filtered", "Arithmetic"
print(f"Data type: {data_type}")
print(f"Test type: {test_type}")


## 1. Load and Prepare Data


In [None]:
# Load the recordings for the configured data/test type, then hand them to
# split_data together with the sampling frequency v.SFREQ.
# NOTE(review): split_data presumably segments each trial into fixed-length
# epochs; confirm against dataset.py.
dataset_ = load_dataset(data_type=data_type, test_type=test_type)
dataset = split_data(dataset_, v.SFREQ)

# The prints below treat the result as a 4-D array.
print(
    f"Dataset shape after splitting: {dataset.shape}",
    f"Shape breakdown: (trials={dataset.shape[0]}, epochs={dataset.shape[1]}, channels={dataset.shape[2]}, timepoints={dataset.shape[3]})",
    sep="\n",
)


In [None]:
# Load the raw labels and format them for the selected test, passing the
# number of epochs per trial (dataset.shape[1]) to format_labels.
label_ = load_labels()
label = format_labels(label_, test_type=test_type, epochs=dataset.shape[1])

# np.bincount counts how many entries fall into each integer class id.
print(
    f"Label shape: {label.shape}",
    f"Label distribution: {np.bincount(label.astype(int))}",
    sep="\n",
)


## 2. Reshape Data for Transformer

Transformers require data in the format: (samples, sequence_length, features)
- Each epoch (1 second) will be one sample
- Sequence length = 128 (timepoints per second)
- Features = 32 (EEG channels)


In [None]:
# Reshape data for Transformer:
# (trials, epochs, channels, timepoints) -> (samples, sequence_length, features)
n_trials, n_epochs, n_channels, n_timepoints = dataset.shape

# reshape() only re-interprets the flat buffer; it never moves data between
# axes. Going straight from (..., channels, timepoints) to
# (..., timepoints, channels) with reshape would therefore mix channel and time
# values within every sample. Swap the last two axes first so that each row of
# a sample really is one timestep holding all channel values.
dataset_time_major = np.transpose(dataset, (0, 1, 3, 2))

# Merge trials and epochs into one sample axis:
# (n_trials * n_epochs, n_timepoints, n_channels)
X = dataset_time_major.reshape(n_trials * n_epochs, n_timepoints, n_channels)
y = label.reshape(-1)

# Every sample needs exactly one label; fail here rather than inside
# train_test_split with a less obvious message.
assert X.shape[0] == y.shape[0], (
    f"Sample/label count mismatch: {X.shape[0]} samples vs {y.shape[0]} labels"
)

print(f"Reshaped X shape: {X.shape} (samples, sequence_length, features)")
print(f"Reshaped y shape: {y.shape}")
print(f"Number of samples: {X.shape[0]}")
print(f"Sequence length: {X.shape[1]}")
print(f"Features per timestep: {X.shape[2]}")


## 3. Split Data into Train, Validation, and Test Sets


In [None]:
# Hold out 20% of all samples as the test set (stratified on the label).
# NOTE(review): the split is done per sample (one sample = one epoch), so
# epochs from the same trial can end up in both the training and the test set.
# Consider a trial-wise (grouped) split if trial-level generalisation matters.
X_trainval, X_test, y_trainval, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
    stratify=y,
)

# Carve the validation set out of the remaining 80%: 25% of it, i.e. a
# 60/20/20 train/validation/test split overall.
X_train, X_val, y_train, y_val = train_test_split(
    X_trainval,
    y_trainval,
    test_size=0.25,
    random_state=42,
    stratify=y_trainval,
)

print(f"Training set: {X_train.shape[0]} samples")
print(f"Validation set: {X_val.shape[0]} samples")
print(f"Test set: {X_test.shape[0]} samples")


## 4. Normalize Data


In [None]:
# Min-max scale each feature column. The scaler statistics come from the
# training split only and are then applied unchanged to validation and test.
n_samples_train, n_seq_len, n_features = X_train.shape

# MinMaxScaler works on 2-D input, so collapse (samples, timesteps) into rows
# while keeping the feature axis as columns.
X_train_reshaped, X_val_reshaped, X_test_reshaped = (
    split.reshape(-1, n_features) for split in (X_train, X_val, X_test)
)

# fit() returns the fitted scaler itself.
scaler = MinMaxScaler().fit(X_train_reshaped)

# Scale, then restore each split's original 3-D shape.
X_train_scaled = scaler.transform(X_train_reshaped).reshape(X_train.shape)
X_val_scaled = scaler.transform(X_val_reshaped).reshape(X_val.shape)
X_test_scaled = scaler.transform(X_test_reshaped).reshape(X_test.shape)

print(f"Scaled training data shape: {X_train_scaled.shape}")
print(f"Data range after scaling: [{X_train_scaled.min():.3f}, {X_train_scaled.max():.3f}]")


In [None]:
# One-hot encode the integer labels of every split with v.N_CLASSES columns,
# matching the width of the model's softmax output.
y_train_cat, y_val_cat, y_test_cat = (
    to_categorical(split_labels, num_classes=v.N_CLASSES)
    for split_labels in (y_train, y_val, y_test)
)

print(f"Categorical labels shape: {y_train_cat.shape}")


## 5. Build Transformer Model


In [None]:
# Transformer Encoder Block
def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0.3):
    """
    Build one encoder block: self-attention followed by a feed-forward network.

    Each of the two sub-layers is followed by dropout, a residual addition and
    layer normalisation, so the output has the same last dimension as `inputs`.

    Args:
        inputs: Keras tensor fed to the block; it is used as both query and
            value of the attention layer, and its last dimension sets the
            output width of the feed-forward network.
        head_size: `key_dim` passed to MultiHeadAttention.
        num_heads: Number of attention heads.
        ff_dim: Units of the hidden (ReLU) layer of the feed-forward network.
        dropout: Rate used inside the attention layer and by both Dropout
            layers.

    Returns:
        The Keras tensor produced by the second layer normalisation.
    """
    # --- Sub-layer 1: multi-head self-attention (query = value = inputs) ---
    self_attention = MultiHeadAttention(
        key_dim=head_size,
        num_heads=num_heads,
        dropout=dropout,
    )
    attended = self_attention(inputs, inputs)
    attended = Dropout(dropout)(attended)
    # Residual connection around the attention sub-layer, then normalise.
    normed_attention = LayerNormalization(epsilon=1e-6)(inputs + attended)

    # --- Sub-layer 2: position-wise feed-forward network ---
    hidden = Dense(ff_dim, activation="relu")(normed_attention)
    # Project back to the input width so the residual addition is valid.
    projected = Dense(inputs.shape[-1])(hidden)
    projected = Dropout(dropout)(projected)

    return LayerNormalization(epsilon=1e-6)(normed_attention + projected)


In [None]:
# Clear any existing models so that re-running this cell starts from a clean
# Keras state instead of stacking layers on a previous graph.
keras.backend.clear_session()

# Model parameters
sequence_length = X_train_scaled.shape[1]   # timesteps per sample
num_features = X_train_scaled.shape[2]      # features per timestep
head_size = 64                # key_dim of each attention head
num_heads = 4                 # attention heads per encoder block
ff_dim = 128                  # hidden units of the encoder feed-forward network
num_transformer_blocks = 2    # number of stacked encoder blocks
mlp_units = [64, 32]          # widths of the classification head layers
mlp_dropout = 0.4             # dropout rate inside the classification head
dropout = 0.3                 # dropout rate inside the encoder blocks

# Input layer: one sample is a (sequence_length, num_features) matrix
inputs = Input(shape=(sequence_length, num_features))

# Transformer blocks
x = inputs
for _ in range(num_transformer_blocks):
    x = transformer_encoder(x, head_size, num_heads, ff_dim, dropout)

# Global average pooling: average over the time axis to get one vector per sample
x = GlobalAveragePooling1D()(x)

# MLP head
for dim in mlp_units:
    x = Dense(dim, activation="relu")(x)
    x = Dropout(mlp_dropout)(x)

# Output layer: one softmax probability per class
outputs = Dense(v.N_CLASSES, activation="softmax", name="output")(x)

model = models.Model(inputs, outputs)

# Compile model.
# The targets are one-hot vectors (to_categorical) and the output is a softmax
# over v.N_CLASSES units, so the matching loss is categorical cross-entropy.
# binary_crossentropy would score every output unit as an independent binary
# problem and would make 'accuracy' resolve to element-wise binary accuracy.
model.compile(
    optimizer=optimizers.Adam(learning_rate=0.001),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

# Display model architecture
model.summary()


## 6. Train the Model


In [None]:
# Stop once validation accuracy has not improved for 15 epochs and roll the
# model back to the best weights seen.
early_stopping = EarlyStopping(
    monitor='val_accuracy', patience=15, restore_best_weights=True, verbose=1
)

# Halve the learning rate when validation loss stalls for 5 epochs, never
# going below 1e-6.
reduce_lr = ReduceLROnPlateau(
    monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6, verbose=1
)

# Fit on the scaled training split; the validation split drives both callbacks.
training_callbacks = [early_stopping, reduce_lr]
history = model.fit(
    X_train_scaled,
    y_train_cat,
    validation_data=(X_val_scaled, y_val_cat),
    epochs=100,
    batch_size=32,
    callbacks=training_callbacks,
    verbose=1,
)


## 7. Evaluate Model Performance


In [None]:
# Score the trained model once on the held-out test split. evaluate() returns
# the loss followed by the compiled metrics (here: accuracy).
test_loss, test_accuracy = model.evaluate(X_test_scaled, y_test_cat, verbose=0)

rule = "=" * 60
print(rule)
print("TEST SET RESULTS")
print(rule)
print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_accuracy:.4f} ({test_accuracy*100:.2f}%)")
print(rule)


In [None]:
# Class probabilities for the test split; the predicted / true class is the
# index of the largest entry in each row.
y_pred_proba = model.predict(X_test_scaled, verbose=0)
y_pred = y_pred_proba.argmax(axis=1)
y_true = y_test_cat.argmax(axis=1)

rule = "=" * 60
class_names = ['Low Stress', 'High Stress']

# Per-class precision / recall / F1
print("\n" + rule)
print("DETAILED CLASSIFICATION REPORT")
print(rule)
print(metrics.classification_report(y_true, y_pred, target_names=class_names))

# Confusion matrix: rows are true classes, columns are predicted classes
print("\n" + rule)
print("CONFUSION MATRIX")
print(rule)
cm = metrics.confusion_matrix(y_true, y_pred)
print(cm)
print("\nConfusion Matrix Breakdown:")
print(f"True Negatives (Low Stress correctly predicted):  {cm[0][0]}")
print(f"False Positives (Low Stress predicted as High):  {cm[0][1]}")
print(f"False Negatives (High Stress predicted as Low):  {cm[1][0]}")
print(f"True Positives (High Stress correctly predicted): {cm[1][1]}")
print(rule)


## 8. Plot Training History

The plotting cell for this section appears further down, after the save cell in Section 9.


## 9. Save Model and Results


In [None]:
# Always derive the best validation epoch from the current `history`.
# Checking locals() for a previously defined value would silently reuse a
# stale number from an earlier training run left in the kernel.
val_accuracy_curve = history.history['val_accuracy']
best_val_acc = max(val_accuracy_curve)
best_val_epoch = int(np.argmax(val_accuracy_curve)) + 1  # epochs are 1-based

# Save the trained model
model.save('transformer_stress_model.h5')
print("Model saved as 'transformer_stress_model.h5'")

# Save training history (the per-epoch metric dict)
with open('transformer_training_history.pkl', 'wb') as f:
    pickle.dump(history.history, f)
print("Training history saved as 'transformer_training_history.pkl'")

# Save the fitted scaler so new data can be normalised exactly like the
# training data
joblib.dump(scaler, 'transformer_scaler.pkl')
print("Scaler saved as 'transformer_scaler.pkl'")

# Save results summary. Values are converted to built-in types / lists so that
# json can serialise them.
results_summary = {
    'test_accuracy': float(test_accuracy),
    'test_loss': float(test_loss),
    'best_val_accuracy': float(best_val_acc),
    'best_val_epoch': int(best_val_epoch),
    'confusion_matrix': cm.tolist(),
    'classification_report': metrics.classification_report(y_true, y_pred, 
                                                          target_names=['Low Stress', 'High Stress'], 
                                                          output_dict=True)
}

with open('transformer_results.json', 'w') as f:
    json.dump(results_summary, f, indent=4)
print("Results summary saved as 'transformer_results.json'")
print("\nAll files saved successfully!")


In [None]:
# Plot training history: accuracy on the left, loss on the right
def _plot_history_panel(ax, metric_key, metric_name):
    """Draw the training and validation curves of one metric on `ax`."""
    ax.plot(history.history[metric_key], label=f'Training {metric_name}')
    ax.plot(history.history[f'val_{metric_key}'], label=f'Validation {metric_name}')
    ax.set_title(f'Model {metric_name}')
    ax.set_xlabel('Epoch')
    ax.set_ylabel(metric_name)
    ax.legend()
    ax.grid(True)


fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))
_plot_history_panel(ax1, 'accuracy', 'Accuracy')
_plot_history_panel(ax2, 'loss', 'Loss')

plt.tight_layout()
plt.show()

# Report the highest validation accuracy and the (1-based) epoch it occurred in
best_val_acc = max(history.history['val_accuracy'])
best_val_epoch = np.argmax(history.history['val_accuracy']) + 1
print(f"\nBest Validation Accuracy: {best_val_acc:.4f} ({best_val_acc*100:.2f}%) at epoch {best_val_epoch}")
