# Imports

In [17]:
from mpl_toolkits.mplot3d import Axes3D
from sklearn.preprocessing import StandardScaler
from scipy.io import loadmat
import matplotlib.pyplot as plt 
import numpy as np 
import os 
import pandas as pd
from scipy.stats import mode  # For consensus calculation

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import MaxPooling2D, GlobalAveragePooling2D
from tensorflow.keras.layers import Bidirectional

from tensorflow.keras.layers import Conv2D, AveragePooling2D, Dropout, TimeDistributed, Flatten, LSTM, Dense
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.model_selection import train_test_split
from tensorflow.keras.metrics import Precision, Recall, AUC
import tensorflow.keras.backend as K

from tensorflow.keras.layers import BatchNormalization
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
from scipy.signal import butter, lfilter
import numpy as np




# Data labeling

In [18]:
columns = [
    'ED_COUNTER',    'ED_INTERPOLATED',    'ED_RAW_CQ',    'ED_AF3',    'ED_F7',
    'ED_F3',    'ED_FC5',    'ED_T7',    'ED_P7',    'ED_O1',
    'ED_O2',    'ED_P8',    'ED_T8',    'ED_FC6',    'ED_F4',
    'ED_F8',    'ED_AF4',    'ED_GYROX',    'ED_GYROY',    'ED_TIMESTAMP',
    'ED_ES_TIMESTAMP',    'ED_FUNC_ID',    'ED_FUNC_VALUE',    'ED_MARKER',    'ED_SYNC_SIGNAL'
]

In [19]:
FOCUSED_ID = 0
UNFOCUSED_ID = 1
DROWSY_ID = 2

def get_state(timestamp):
    if timestamp <= 10*128*60:
        return FOCUSED_ID
    elif timestamp > 20*128*60:
        return UNFOCUSED_ID
    else:
        return DROWSY_ID

# Data preprocessing

In [20]:
# Bandpass filter function
def bandpass_filter(data, low_freq, high_freq, fs, order=4):
    nyquist = 0.5 * fs
    low = low_freq / nyquist
    high = high_freq / nyquist
    b, a = butter(order, [low, high], btype='band')
    return lfilter(b, a, data)

# Brainwave frequency ranges
brainwave_ranges = {
    "Delta": (0.5, 4),
    "Theta": (4, 8),
    "Alpha": (8, 13),
    "Beta": (13, 30)
}

# Parameters
features = []
delta_features = []
theta_features = []
alpha_features = []
beta_features = []
labels = []
SAMPLE_LENGTH_SECOND = 4
FREQUENCY_HZ = 128
SAMPLE_LENGTH_HZ = FREQUENCY_HZ * SAMPLE_LENGTH_SECOND
scaler = StandardScaler(with_mean=True, with_std=True)

# File extraction loop
for i in range(1, 35):
    print(f"Extracting file {i}")
    mat_data = loadmat(f'/kaggle/input/eeg-data-for-mental-attention-state-detection/EEG Data/eeg_record{i}.mat')
    data = mat_data['o'][0][0]['data']
    eeg_df = pd.DataFrame(data, columns=columns)
    eeg_df.reset_index(inplace=True)
    eeg_df.rename(columns={'index': 'timestamp'}, inplace=True)
    eeg_df['state'] = eeg_df['timestamp'].apply(get_state)

    # Extract original EEG features
    feature = eeg_df.iloc[:, 4:18].values  # Columns 4 to 17 (0-indexed)
    label = eeg_df['state'].values

    # Scale the original EEG features
    feature = scaler.fit_transform(feature)

    # Apply bandpass filters for each brainwave type
    brainwave_features = {}
    for wave, (low, high) in brainwave_ranges.items():
        filtered = np.apply_along_axis(
            bandpass_filter, 0, feature, low, high, FREQUENCY_HZ
        )
        brainwave_features[wave] = filtered  # Do not scale these

    # Reshape for sample segments
    num_samples = len(feature) // SAMPLE_LENGTH_HZ
    feature = feature[:num_samples * SAMPLE_LENGTH_HZ]
    label = label[:num_samples * SAMPLE_LENGTH_HZ]

    # Original feature
    feature = feature.reshape(num_samples, SAMPLE_LENGTH_HZ, 14, 1)
    label = label.reshape(num_samples, SAMPLE_LENGTH_HZ)
    consensus_labels = mode(label, axis=1)[0].flatten()

    # Append original and brainwave-specific features
    features.append(feature)
    labels.append(consensus_labels)

    for wave in brainwave_ranges.keys():
        brainwave_feature = brainwave_features[wave][:num_samples * SAMPLE_LENGTH_HZ]
        brainwave_feature = brainwave_feature.reshape(num_samples, SAMPLE_LENGTH_HZ, 14, 1)
        if wave == "Delta":
            delta_features.append(brainwave_feature)
        elif wave == "Theta":
            theta_features.append(brainwave_feature)
        elif wave == "Alpha":
            alpha_features.append(brainwave_feature)
        elif wave == "Beta":
            beta_features.append(brainwave_feature)

# Combine all features and labels
features = np.vstack(features)
delta_features = np.vstack(delta_features)
theta_features = np.vstack(theta_features)
alpha_features = np.vstack(alpha_features)
beta_features = np.vstack(beta_features)
labels = np.concatenate(labels)

# Print final shapes
print(f"Delta Features Shape: {delta_features.shape}")
print(f"Theta Features Shape: {theta_features.shape}")
print(f"Alpha Features Shape: {alpha_features.shape}")
print(f"Beta Features Shape: {beta_features.shape}")
print(f"Final Labels Shape: {labels.shape}")


Extracting file 1
Extracting file 2
Extracting file 3
Extracting file 4
Extracting file 5
Extracting file 6
Extracting file 7
Extracting file 8
Extracting file 9
Extracting file 10
Extracting file 11
Extracting file 12
Extracting file 13
Extracting file 14
Extracting file 15
Extracting file 16
Extracting file 17
Extracting file 18
Extracting file 19
Extracting file 20
Extracting file 21
Extracting file 22
Extracting file 23
Extracting file 24
Extracting file 25
Extracting file 26
Extracting file 27
Extracting file 28
Extracting file 29
Extracting file 30
Extracting file 31
Extracting file 32
Extracting file 33
Extracting file 34
Delta Features Shape: (24418, 512, 14, 1)
Theta Features Shape: (24418, 512, 14, 1)
Alpha Features Shape: (24418, 512, 14, 1)
Beta Features Shape: (24418, 512, 14, 1)
Final Labels Shape: (24418,)


# Data classes distribution

In [21]:
import numpy as np

# Use numpy's unique and counts to count class samples
unique_classes, class_counts = np.unique(labels, return_counts=True)

# Display the counts
for cls, count in zip(unique_classes, class_counts):
    print(f"Class {cls}: {count} samples")


Class 0: 5100 samples
Class 1: 14218 samples
Class 2: 5100 samples


In [22]:
# Prepare the data
X = features  # Already shaped as (samples, 728, 14, 1)
y = labels    # Class labels (numeric)

del features
del labels
# One-hot encode the labels (3 classes)
y = to_categorical(y, num_classes=3)

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)


In [23]:
eeg_df

Unnamed: 0,timestamp,ED_COUNTER,ED_INTERPOLATED,ED_RAW_CQ,ED_AF3,ED_F7,ED_F3,ED_FC5,ED_T7,ED_P7,...,ED_AF4,ED_GYROX,ED_GYROY,ED_TIMESTAMP,ED_ES_TIMESTAMP,ED_FUNC_ID,ED_FUNC_VALUE,ED_MARKER,ED_SYNC_SIGNAL,state
0,0,120.0,0.0,0.0,4445.641026,3955.384615,5040.000000,3830.769231,4047.179487,4200.512821,...,4076.410256,1569.0,1715.0,242.409,0.031277,0.0,0.0,0.0,0.0,0
1,1,121.0,0.0,0.0,4447.179487,3960.512821,5047.692308,3831.282051,4050.256410,4208.205128,...,4086.666667,1570.0,1715.0,242.417,0.031277,0.0,0.0,0.0,0.0,0
2,2,122.0,0.0,0.0,4446.666667,3960.000000,5046.153846,3831.282051,4045.641026,4201.538462,...,4089.230769,1572.0,1714.0,242.426,0.031277,0.0,0.0,0.0,0.0,0
3,3,123.0,0.0,429.0,4446.666667,3954.358974,5036.923077,3830.769231,4044.102564,4189.743590,...,4076.923077,1574.0,1714.0,242.433,0.031277,0.0,0.0,0.0,0.0,0
4,4,124.0,0.0,0.0,4448.717949,3952.820513,5034.358974,3833.333333,4046.153846,4187.692308,...,4068.205128,1577.0,1713.0,242.440,0.031277,0.0,0.0,0.0,0.0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
408971,408971,48.0,0.0,0.0,4449.230769,3960.000000,5003.076923,3832.820513,4048.205128,4270.769231,...,4272.820513,1574.0,1739.0,3440.381,3197.802734,0.0,0.0,0.0,0.0,1
408972,408972,49.0,0.0,0.0,4448.205128,3959.487179,5008.717949,3833.846154,4045.641026,4276.410256,...,4278.974359,1574.0,1740.0,3440.389,3197.802734,0.0,0.0,0.0,0.0,1
408973,408973,50.0,0.0,0.0,4445.641026,3960.000000,5014.871795,3831.794872,4046.666667,4273.333333,...,4275.897436,1575.0,1740.0,3440.397,3197.802734,0.0,0.0,0.0,0.0,1
408974,408974,51.0,0.0,0.0,4446.666667,3959.487179,5024.102564,3828.717949,4045.128205,4274.871795,...,4272.820513,1575.0,1740.0,3440.405,3197.802734,0.0,0.0,0.0,0.0,1


In [24]:
y

array([[1., 0., 0.],
       [1., 0., 0.],
       [1., 0., 0.],
       ...,
       [0., 1., 0.],
       [0., 1., 0.],
       [0., 1., 0.]])

In [25]:
X.shape

(24418, 512, 14, 1)

# Helper functions

In [26]:
def f1_score(precision, recall):
    return 2 * ((precision * recall) / (precision + recall))


In [27]:
# Evaluate the model
def evaluate_model(model):
    results = model.evaluate(X_test, y_test)
    test_loss, test_accuracy, test_precision, test_recall, test_auc = results
    test_f1 = f1_score(test_precision, test_recall)
    # Print results
    print(f"Test Loss: {test_loss:.4f}")
    print(f"Test Accuracy: {test_accuracy:.4f}")
    print(f"Test Precision: {test_precision:.4f}")
    print(f"Test Recall: {test_recall:.4f}")
    print(f"Test AUC: {test_auc:.4f}")
    print(f"Test F1: {test_f1:.4f}")


# Model

****13/04/2025

In [30]:
import tensorflow as tf
import gc

# Force garbage collection
gc.collect()

# Force TensorFlow to use CPU
with tf.device('/CPU:0'):
    # Simple CNN model for EEG
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import Dense, Dropout, Flatten, Conv2D, MaxPooling2D
    from tensorflow.keras.callbacks import EarlyStopping
    from tensorflow.keras.metrics import Precision, Recall, AUC

    model = Sequential([
        # First convolutional layer
        Conv2D(32, (1, 14), activation='relu', input_shape=(SAMPLE_LENGTH_HZ, 14, 1)),
        MaxPooling2D(pool_size=(4, 1)),
        Dropout(0.3),
        
        # Second convolutional layer
        Conv2D(64, (5, 1), activation='relu', padding='same'),
        MaxPooling2D(pool_size=(4, 1)),
        Dropout(0.4),
        
        # Classification layers
        Flatten(),
        Dense(128, activation='relu'),
        Dropout(0.5),
        Dense(3, activation='softmax')
    ])

    # Compile the model
    model.compile(
        optimizer='adam',
        loss='categorical_crossentropy',
        metrics=[
            'accuracy',
            Precision(name='precision'),
            Recall(name='recall'),
            AUC(name='auc')
        ]
    )

    # Define callbacks
    early_stopping = EarlyStopping(
        monitor='val_loss',
        patience=15,
        restore_best_weights=True
    )

    # Train on CPU
    history = model.fit(
        X_train, y_train,
        validation_data=(X_test, y_test),
        epochs=100,
        batch_size=32,  # Can use larger batch size on CPU
        callbacks=[early_stopping]
    )

    # Evaluate using your function
    evaluate_model(model)

Epoch 1/100


I0000 00:00:1744563261.911321     122 service.cc:145] XLA service 0x7e980c003010 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
I0000 00:00:1744563261.911369     122 service.cc:153]   StreamExecutor device (0): Host, Default Version


[1m611/611[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 29ms/step - accuracy: 0.7074 - auc: 0.8658 - loss: 0.7146 - precision: 0.7843 - recall: 0.6079 - val_accuracy: 0.7662 - val_auc: 0.9220 - val_loss: 0.5463 - val_precision: 0.8436 - val_recall: 0.6536
Epoch 2/100
[1m611/611[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 26ms/step - accuracy: 0.7747 - auc: 0.9218 - loss: 0.5467 - precision: 0.8228 - recall: 0.7022 - val_accuracy: 0.7852 - val_auc: 0.9365 - val_loss: 0.5121 - val_precision: 0.8483 - val_recall: 0.7088
Epoch 3/100
[1m611/611[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m15s[0m 25ms/step - accuracy: 0.7991 - auc: 0.9369 - loss: 0.4830 - precision: 0.8331 - recall: 0.7460 - val_accuracy: 0.7807 - val_auc: 0.9300 - val_loss: 0.5294 - val_precision: 0.8324 - val_recall: 0.7230
Epoch 4/100
[1m611/611[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 26ms/step - accuracy: 0.8008 - auc: 0.9412 - loss: 0.4616 - precision: 0.8302 - recall: 0.7565 -

In [None]:
import gc

# Force garbage collection
gc.collect()

# Force TensorFlow to use CPU
with tf.device('/CPU:0'):
    # Simple CNN model for EEG
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import Dense, Dropout, Flatten, Conv2D, MaxPooling2D
    from tensorflow.keras.callbacks import EarlyStopping
    from tensorflow.keras.metrics import Precision, Recall, AUC

    model = Sequential([
        # First convolutional layer
        Conv2D(32, (1, 14), activation='relu', input_shape=(SAMPLE_LENGTH_HZ, 14, 1)),
        MaxPooling2D(pool_size=(4, 1)),
        Dropout(0.3),
        
        # Second convolutional layer
        Conv2D(64, (5, 1), activation='relu', padding='same'),
        MaxPooling2D(pool_size=(4, 1)),
        Dropout(0.4),
        
        # Classification layers
        Flatten(),
        Dense(128, activation='relu'),
        Dropout(0.5),
        Dense(3, activation='softmax')
    ])

    # Compile the model
    model.compile(
        optimizer='adam',
        loss='categorical_crossentropy',
        metrics=[
            'accuracy',
            Precision(name='precision'),
            Recall(name='recall'),
            AUC(name='auc')
        ]
    )

    # Define callbacks
    # early_stopping = EarlyStopping(
    #     monitor='val_loss',
    #     patience=15,
    #     restore_best_weights=True
    # )

    # Train on CPU
    history = model.fit(
        X_train, y_train,
        validation_data=(X_test, y_test),
        epochs=100,
        batch_size=32,  # Can use larger batch size on CPU
    )

    # Evaluate using your function
    evaluate_model(model)

Epoch 1/100
[1m611/611[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 30ms/step - accuracy: 0.7089 - auc: 0.8655 - loss: 0.7140 - precision: 0.7826 - recall: 0.6065 - val_accuracy: 0.7670 - val_auc: 0.9167 - val_loss: 0.6390 - val_precision: 0.8874 - val_recall: 0.5565
Epoch 2/100
[1m611/611[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m15s[0m 25ms/step - accuracy: 0.7611 - auc: 0.9167 - loss: 0.5665 - precision: 0.8226 - recall: 0.6788 - val_accuracy: 0.7928 - val_auc: 0.9380 - val_loss: 0.4873 - val_precision: 0.8310 - val_recall: 0.7369
Epoch 3/100
[1m611/611[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 26ms/step - accuracy: 0.7917 - auc: 0.9333 - loss: 0.4978 - precision: 0.8283 - recall: 0.7365 - val_accuracy: 0.8127 - val_auc: 0.9475 - val_loss: 0.4572 - val_precision: 0.8635 - val_recall: 0.7486
Epoch 4/100
[1m611/611[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m15s[0m 25ms/step - accuracy: 0.8076 - auc: 0.9428 - loss: 0.4591 - precision: 0.8426 - reca

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Dropout, Reshape
from tensorflow.keras.layers import MultiHeadAttention, LayerNormalization, Dense, GlobalAveragePooling1D
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import Precision, Recall, AUC
import gc
from tensorflow.keras import backend as K

# Clear memory if re-running
gc.collect()
K.clear_session()

with tf.device('/CPU:0'):
    inp = Input(shape=(SAMPLE_LENGTH_HZ, 14, 1))  # EEG input shape

    # CNN block
    x = Conv2D(32, (1, 3), activation='relu', padding='same')(inp)
    x = MaxPooling2D(pool_size=(2, 1))(x)
    x = Dropout(0.2)(x)

    x = Conv2D(64, (1, 3), activation='relu', padding='same')(x)
    x = MaxPooling2D(pool_size=(2, 1))(x)
    x = Dropout(0.3)(x)

    # Reshape for transformer
    x = Reshape((x.shape[1], x.shape[2] * x.shape[3]))(x)

    # Transformer encoder block
    attn = MultiHeadAttention(num_heads=1, key_dim=16)(x, x)
    attn = LayerNormalization()(attn)

    ffn = Dense(64, activation='relu')(attn)
    ffn = LayerNormalization()(ffn)

    x = GlobalAveragePooling1D()(ffn)

    # Classifier
    x = Dense(64, activation='relu')(x)
    x = Dropout(0.4)(x)
    out = Dense(3, activation='softmax')(x)

    model = Model(inputs=inp, outputs=out)

    model.compile(
        optimizer=Adam(),
        loss='categorical_crossentropy',
        metrics=['accuracy', Precision(name='precision'), Recall(name='recall'), AUC(name='auc')]
    )

    model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=100, batch_size=16)

    evaluate_model(model)


End of 13/04/2025

## CNN

In [None]:
# Define the CNN model
model = Sequential([
    Conv2D(40, (1, 14), activation='relu', input_shape=(SAMPLE_LENGTH_HZ, 14, 1)),
    Conv2D(40, (1, 1), activation='relu'),
    AveragePooling2D(pool_size=(15, 1)),
    Dropout(0.5),
    Flatten(),
    Dense(80, activation='relu'),
    Dropout(0.5),
    Dense(3, activation='softmax')  # 3 classes for classification
])

# Compile the model
model.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=[
        'accuracy',             # Include default accuracy metric
        Precision(name='precision'),
        Recall(name='recall'),
        AUC(name='auc')         # Add AUC metric
    ]
)
# Define EarlyStopping
early_stopping = EarlyStopping(
    monitor='val_loss',        # Monitor validation loss
    patience=10,               # Stop after 10 epochs with no improvement
    restore_best_weights=True  # Restore the best weights
)




In [None]:
plot_model(model, to_file='model_architecture.png', show_shapes=True, show_layer_names=True)

In [None]:
model.summary()

In [None]:
# Train the model
history = model.fit(
    X_train, y_train,
    validation_data=(X_test, y_test),
    epochs=100,                # Max epochs
    batch_size=32,
    callbacks=[early_stopping]  # Add EarlyStopping callback
)


In [None]:
evaluate_model(model)

### Final result: CNN - 0.86 F1 score, 0.86 accuracy

## CRNN

In [None]:
model = Sequential([
    # Convolutional Layers
    Conv2D(40, (1, 14), activation='relu', input_shape=(SAMPLE_LENGTH_HZ, 14, 1)),
    Conv2D(40, (1, 1), activation='relu'),
    AveragePooling2D(pool_size=(5, 1)),  # Adjusted pooling to match (153, 1, 40) as in the architecture
    Dropout(0.5),
    
    # TimeDistributed Flatten for RNN compatibility
    TimeDistributed(Flatten()),  # Flatten while retaining sequence dimensions
    
    # LSTM Layer
    LSTM(40, return_sequences=False),  # Extract features from the temporal dimension
    
    # Fully Connected Layers
    Dense(15, activation='relu'),
    Dense(3, activation='softmax')  # 3 classes for classification
])

# Compile the model
model.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=[
        'accuracy',             # Include default accuracy metric
        Precision(name='precision'),
        Recall(name='recall'),
        AUC(name='auc')         # Add AUC metric
    ]
)
# Define EarlyStopping
early_stopping = EarlyStopping(
    monitor='val_loss',        # Monitor validation loss
    patience=10,               # Stop after 20 epochs with no improvement
    restore_best_weights=True  # Restore the best weights
)




In [None]:
model.summary()

In [None]:
# Train the model
history = model.fit(
    X_train, y_train,
    validation_data=(X_test, y_test),
    epochs=100,                # Max epochs
    batch_size=32,
    callbacks=[early_stopping]  # Add EarlyStopping callback
)

In [None]:
evaluate_model(model)

### Final result: CRNN - 0.89 F1 score, 0.89 accuracy

## Enhanced CRNN

In [None]:
model = Sequential([
    # Convolutional Layers
    Conv2D(128, (3, 3), activation='relu', input_shape=(SAMPLE_LENGTH_HZ, 14, 1)),
    BatchNormalization(),
    MaxPooling2D(pool_size=(2, 2)),
    Conv2D(256, (3, 3), activation='relu'),
    BatchNormalization(),
    MaxPooling2D(pool_size=(2, 2)),
    Dropout(0.3),
    
    # Global Pooling for dimensionality reduction
    TimeDistributed(Flatten()),
    
    # LSTM Layers
    Bidirectional(LSTM(64, return_sequences=True)),
    Dropout(0.3),
    LSTM(40, return_sequences=False),
    Dropout(0.3),
    
    # Fully Connected Layers
    Dense(128, activation='relu'),
    Dropout(0.5),
    Dense(64, activation='relu'),
    Dropout(0.5),
    Dense(3, activation='softmax')
])


# Compile the model
model.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=[
        'accuracy',             # Include default accuracy metric
        Precision(name='precision'),
        Recall(name='recall'),
        AUC(name='auc')         # Add AUC metric
    ]
)




In [None]:
model.summary()

In [None]:
# model.summary()
# Define EarlyStopping
# early_stopping = EarlyStopping(
#     monitor='val_accuracy',       
#     patience=10,               # Stop after 20 epochs with no improvement
#     restore_best_weights=True  # Restore the best weights
# )

# Train the model
history = model.fit(
    X_train, y_train,
    validation_data=(X_test, y_test),
    epochs=100,                # Max epochs
    batch_size=32  # Add EarlyStopping callback
)

In [None]:
evaluate_model(model)

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import (Input, Conv2D, BatchNormalization, MaxPooling2D, Dropout,
                                     TimeDistributed, Flatten, Bidirectional, LSTM,
                                     Dense, GlobalAveragePooling1D, Reshape)
from tensorflow.keras.metrics import Precision, Recall, AUC
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau



# Model inside strategy scope
with strategy.scope():
    input_layer = Input(shape=(SAMPLE_LENGTH_HZ, 14, 1))

    # Convolutional Layers
    x = Conv2D(128, (3, 3), activation='relu')(input_layer)
    x = BatchNormalization()(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)

    x = Conv2D(256, (3, 3), activation='relu')(x)
    x = BatchNormalization()(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Dropout(0.3)(x)

    # Flatten spatial dims to feed into LSTM
    x = TimeDistributed(Flatten())(x)

    # LSTM Layers
    x = Bidirectional(LSTM(64, return_sequences=True))(x)
    x = Dropout(0.3)(x)

    # Instead of Attention (not TPU-compatible), use GlobalAveragePooling
    x = GlobalAveragePooling1D()(x)
    
    x = Reshape((1, -1))(x)  # optional if Dense expects 2D

    x = LSTM(40, return_sequences=False)(x)
    x = Dropout(0.3)(x)

    # Fully Connected Layers
    x = Dense(128, activation='relu')(x)
    x = Dropout(0.5)(x)
    x = Dense(64, activation='relu')(x)
    x = Dropout(0.5)(x)
    output = Dense(3, activation='softmax')(x)

    # Build model
    model = Model(inputs=input_layer, outputs=output)

    model.compile(
        optimizer='adam',
        loss='categorical_crossentropy',
        metrics=[
            'accuracy',
            Precision(name='precision'),
            Recall(name='recall'),
            AUC(name='auc')
        ]
    )

model.summary()

# Callbacks
early_stopping = EarlyStopping(monitor='val_accuracy', patience=20, restore_best_weights=True)
lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6, verbose=1)

# Fit the model
history = model.fit(
    X_train, y_train,
    validation_data=(X_test, y_test),
    epochs=100,
    batch_size=32 * strategy.num_replicas_in_sync,  # optimize for TPU
    callbacks=[early_stopping, lr_scheduler]
)


In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import (Input, SeparableConv2D, BatchNormalization, MaxPooling2D, Dropout, 
                                     TimeDistributed, Flatten, Bidirectional, LSTM, Dense, Attention, 
                                     Reshape)
from tensorflow.keras.losses import CategoricalCrossentropy
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.metrics import Precision, Recall, AUC

# Input layer
input_layer = Input(shape=(SAMPLE_LENGTH_HZ, 14, 1))

# Convolutional Block
x = SeparableConv2D(64, (3, 3), activation='relu')(input_layer)
x = BatchNormalization()(x)
x = MaxPooling2D(pool_size=(2, 2))(x)

x = SeparableConv2D(128, (3, 3), activation='relu')(x)
x = BatchNormalization()(x)
x = MaxPooling2D(pool_size=(2, 2))(x)
x = Dropout(0.3)(x)

# Flatten spatial dims to feed into LSTM
x = TimeDistributed(Flatten())(x)

# BiLSTM Layer
x = Bidirectional(LSTM(64, return_sequences=True))(x)

# Attention block
attention_out = Attention()([x, x])  # Query = Value = x
x = Dropout(0.3)(attention_out)

# Optional second LSTM
x = LSTM(40, return_sequences=False)(x)
x = Dropout(0.3)(x)

# Dense Layers
x = Dense(128, activation='relu')(x)
x = Dropout(0.5)(x)
x = Dense(64, activation='relu')(x)
x = Dropout(0.5)(x)
output = Dense(3, activation='softmax')(x)

# Model
model = Model(inputs=input_layer, outputs=output)

# Compile the model
model.compile(
    optimizer=Adam(),
    loss=CategoricalCrossentropy(label_smoothing=0.1),
    metrics=['accuracy', Precision(name='precision'), Recall(name='recall'), AUC(name='auc')]
)

model.summary()

# Callbacks
early_stopping = EarlyStopping(monitor='val_accuracy', patience=10, restore_best_weights=True)
lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6, verbose=1)



In [None]:
history = model.fit(
    X_train, y_train,
    validation_data=(X_test, y_test),
    epochs=100,
    batch_size=32,
    callbacks=[early_stopping, lr_scheduler],
    # class_weight=class_weights  # Uncomment if needed
)


In [None]:
evaluate_model(model)

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.metrics import Precision, Recall, AUC
from tensorflow.keras.callbacks import EarlyStopping

# Reshape X_train and X_test if needed
# From shape: (samples, SAMPLE_LENGTH_HZ, 14, 1) → (samples, SAMPLE_LENGTH_HZ, 14)
# X_train_rnn = X_train.reshape((-1, SAMPLE_LENGTH_HZ, 14))
# X_test_rnn = X_test.reshape((-1, SAMPLE_LENGTH_HZ, 14))

model = Sequential([
    LSTM(64, return_sequences=True, input_shape=(SAMPLE_LENGTH_HZ, 14)),
    Dropout(0.5),
    LSTM(32),
    Dropout(0.5),
    Dense(80, activation='relu'),
    Dense(3, activation='softmax')  # 3 output classes
])

model.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=[
        'accuracy',
        Precision(name='precision'),
        Recall(name='recall'),
        AUC(name='auc')
    ]
)

early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True
)




In [None]:
model.summary()

In [None]:
history = model.fit(
    X_train, y_train,
    validation_data=(X_test, y_test),
    epochs=100,
    batch_size=32,
    callbacks=[early_stopping]
)

In [None]:
evaluate_model(model)

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import SimpleRNN, Dense, Dropout
from tensorflow.keras.metrics import Precision, Recall, AUC
from tensorflow.keras.callbacks import EarlyStopping

# Reshape input: from (samples, SAMPLE_LENGTH_HZ, 14, 1) to (samples, SAMPLE_LENGTH_HZ, 14)
# X_train_rnn = X_train.reshape((-1, SAMPLE_LENGTH_HZ, 14))
# X_test_rnn = X_test.reshape((-1, SAMPLE_LENGTH_HZ, 14))

# Build the RNN model
model = Sequential([
    SimpleRNN(64, return_sequences=True, input_shape=(SAMPLE_LENGTH_HZ, 14)),
    Dropout(0.5),
    SimpleRNN(32),
    Dropout(0.5),
    Dense(80, activation='relu'),
    Dropout(0.5),
    Dense(3, activation='softmax')  # 3 classes for classification
])

# Compile the model
model.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=[
        'accuracy',
        Precision(name='precision'),
        Recall(name='recall'),
        AUC(name='auc')
    ]
)

# Early stopping
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True
)



In [None]:
model.summary()

In [None]:
# Train the model
history = model.fit(
    X_train, y_train,
    validation_data=(X_test, y_test),
    epochs=100,
    batch_size=32,
    callbacks=[early_stopping]
)


In [None]:
evaluate_model(model)

In [None]:
def print_confusion_matrix(y_true, y_pred, report=True):
    labels = sorted(list(set(y_true)))  # Ensure y_true contains class labels
    cmx_data = confusion_matrix(y_true, y_pred, labels=labels)

    df_cmx = pd.DataFrame(cmx_data, index=labels, columns=labels)

    fig, ax = plt.subplots(figsize=(14, 12))
    sns.heatmap(df_cmx, annot=True, fmt='g', square=False)
    ax.set_ylim(len(set(y_true)), 0)
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.title("Confusion Matrix")
    plt.show()

    if report:
        print('Classification Report')
        print(classification_report(y_true, y_pred))

# Predict on the test set
Y_pred = model.predict(X_test)
y_pred = np.argmax(Y_pred, axis=1)

# Convert y_test from one-hot encoding to class labels
y_test_labels = np.argmax(y_test, axis=1)

# Call the function
print_confusion_matrix(y_test_labels, y_pred)

### Final result: enhanced CRNN - 0.93 F1 score, 0.93 accuracy

## Transformer

In [None]:
import torch
import torch.nn as nn

class Transformer(nn.Module):
    def __init__(self, input_dim=14, output_dim=3, seq_len=512, d_model=32, nhead=4, num_encoder_layers=2):
        super(Transformer, self).__init__()
        self.input_dim = input_dim
        self.d_model = d_model
        self.seq_len = seq_len
        
        # Linear layer to project the input to the desired embedding dimension
        self.embedding = nn.Linear(input_dim, d_model)
        
        # Positional encoding
        self.positional_encoding = nn.Parameter(torch.zeros(seq_len, d_model))
        
        # Transformer encoder
        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_encoder_layers,
            dim_feedforward=256,
            dropout=0.1,
            batch_first=True
        )
        
        # Classification head
        self.classifier = nn.Sequential(
            nn.Linear(d_model, 64),
            nn.ReLU(),
            nn.Linear(64, output_dim),
            nn.Softmax(dim=-1)
        )

    def forward(self, x):
        # x shape: (batch_size, seq_len, input_dim, 1) -> (batch_size, seq_len, input_dim)
        x = x.squeeze(-1)
        
        # Project input to d_model dimension
        x = self.embedding(x)  # (batch_size, seq_len, d_model)
        
        # Add positional encoding
        x = x + self.positional_encoding
        
        # Pass through Transformer encoder
        x = self.transformer(x, x)  # (batch_size, seq_len, d_model)
        
        # Take the mean across the sequence length (global pooling)
        x = x.mean(dim=1)  # (batch_size, d_model)
        
        # Pass through classification head
        out = self.classifier(x)  # (batch_size, output_dim)
        return out


In [None]:
batch_size = 32
seq_len = 512
input_dim = 14
model = Transformer(input_dim=input_dim)

In [None]:
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f"Number of trainable parameters: {count_parameters(model)}")


In [None]:
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# Check if GPU is available and set the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Move the model to the GPU
model = model.to(device)

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Convert data to PyTorch tensors and move to GPU
X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(device)
y_train_tensor = torch.tensor(y_train.argmax(axis=1), dtype=torch.long).to(device)

X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(device)
y_test_tensor = torch.tensor(y_test.argmax(axis=1), dtype=torch.long).to(device)

# Create DataLoaders
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)

test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)




In [None]:
epochs = 30
for epoch in range(epochs):
    print(epoch)
    model.train()  # Set the model to training mode
    running_loss = 0.0
    correct = 0
    total = 0
    
    for X_batch, y_batch in train_loader:
        
        # Move data to GPU
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        
        optimizer.zero_grad()  # Clear gradients
        
        # Forward pass
        outputs = model(X_batch)
        
        # Calculate loss
        loss = criterion(outputs, y_batch)
        
        # Backward pass and optimization
        loss.backward()
        optimizer.step()
        
        # Calculate running loss and accuracy
        running_loss += loss.item()
        _, predicted = outputs.max(1)
        total += y_batch.size(0)
        correct += (predicted == y_batch).sum().item()
        
    epoch_loss = running_loss / len(train_loader)
    epoch_accuracy = 100.0 * correct / total
    print(f"Epoch {epoch+1}/{epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.2f}%")

# Evaluation on test set
model.eval()  # Set the model to evaluation mode
test_correct = 0
test_total = 0

with torch.no_grad():
    for X_batch, y_batch in test_loader:
        # Move data to GPU
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        
        outputs = model(X_batch)
        _, predicted = outputs.max(1)
        test_total += y_batch.size(0)
        test_correct += (predicted == y_batch).sum().item()

test_accuracy = 100.0 * test_correct / test_total
print(f"Test Accuracy: {test_accuracy:.2f}%")

In [None]:
from sklearn.metrics import f1_score

model.eval()  # Set the model to evaluation mode
test_correct = 0
test_total = 0

all_predictions = []
all_labels = []

with torch.no_grad():
    for X_batch, y_batch in test_loader:
        # Move data to GPU
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        
        outputs = model(X_batch)
        _, predicted = outputs.max(1)
        
        # Save predictions and true labels
        all_predictions.extend(predicted.cpu().numpy())
        all_labels.extend(y_batch.cpu().numpy())
        
        test_total += y_batch.size(0)
        test_correct += (predicted == y_batch).sum().item()

test_accuracy = 100.0 * test_correct / test_total
f1 = f1_score(all_labels, all_predictions, average='weighted')  
print(f"Test Accuracy: {test_accuracy:.2f}%")
print(f"F1 Score: {f1:.2f}")


### Final result: Transformer - 0.86 F1 score, 0.86 accuracy

**07/04/2025**

In [None]:
from sklearn.decomposition import PCA

# Flatten 4D to 2D: (samples, time_steps * channels)
X_flattened = X.reshape(X.shape[0], -1)
y_labels = np.argmax(y, axis=1)

# Reduce to 100 components
pca = PCA(n_components=100)
X_pca = pca.fit_transform(X_flattened)

X_train_svm, X_test_svm, y_train_svm, y_test_svm = train_test_split(
    X_pca, y_labels, test_size=0.2, random_state=42
)


In [None]:
from sklearn.svm import SVC
from sklearn.metrics import classification_report, confusion_matrix

In [None]:
svm_model = SVC(kernel='rbf', C=1)
svm_model.fit(X_train_svm, y_train_svm)

In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score, classification_report, confusion_matrix, roc_auc_score

def evaluate_svm_model(model, X_test, y_test):
    y_pred = model.predict(X_test)

    precision = precision_score(y_test, y_pred, average='macro')
    recall = recall_score(y_test, y_pred, average='macro')
    f1 = f1_score(y_test, y_pred, average='macro')
    accuracy = accuracy_score(y_test, y_pred)

    # Print all metrics
    print(f"Test Accuracy: {accuracy:.4f}")
    print(f"Test Precision (macro): {precision:.4f}")
    print(f"Test Recall (macro): {recall:.4f}")
    print(f"Test F1 Score (macro): {f1:.4f}")
    print("\nClassification Report:\n", classification_report(y_test, y_pred))

    # Confusion Matrix
    cm = confusion_matrix(y_test, y_pred)
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title("Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.show()


In [None]:
evaluate_svm_model(svm_model, X_test_svm, y_test_svm)

PCA 150 COMPONENTS

In [None]:
X_flattened = X.reshape(X.shape[0], -1)
y_labels = np.argmax(y, axis=1)

# Step 2: Standardize before PCA (optional but helps)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_flattened)

# Step 3: PCA
pca = PCA(n_components=150)  # Try 100, 150, 200 and compare
X_pca = pca.fit_transform(X_scaled)

# Step 4: Train-test split
X_train_svm, X_test_svm, y_train_svm, y_test_svm = train_test_split(
    X_pca, y_labels, test_size=0.2, random_state=42, stratify=y_labels
)

# Step 5: SVM with optimized hyperparameters
svm_model = SVC(kernel='rbf', C=10, gamma='scale', probability=True)
svm_model.fit(X_train_svm, y_train_svm)

In [None]:
evaluate_svm_model(svm_model, X_test_svm, y_test_svm)

**ANN**

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Flatten
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import Precision, Recall, AUC
from tensorflow.keras.callbacks import EarlyStopping

# Flatten data for ANN
X_flattened = X.reshape(X.shape[0], -1)  # Shape: (samples, 728 * 14)

# Split into train/test
X_train_flat, X_test_flat, y_train_ann, y_test_ann = train_test_split(
    X_flattened, y, test_size=0.2, random_state=42
)



In [None]:
# Build ANN model
ann_model = Sequential([
    Dense(256, activation='relu', input_shape=(X_flattened.shape[1],)),
    Dropout(0.3),
    Dense(128, activation='relu'),
    Dropout(0.3),
    Dense(64, activation='relu'),
    Dropout(0.2),
    Dense(3, activation='softmax')  # 3 classes
])



In [None]:
# Compile model
ann_model.compile(
    optimizer=Adam(learning_rate=0.001),
    loss='categorical_crossentropy',
    metrics=['accuracy', Precision(), Recall(), AUC()]
)

# Train the model
early_stop = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)


In [None]:

history = ann_model.fit(
    X_train_flat, y_train_ann,
    validation_split=0.2,
    epochs=50,
    batch_size=64,
    callbacks=[early_stop],
    verbose=1
)

In [None]:
# Evaluate the model
def evaluate_ann_model(model,X_test,y_test):
    results = model.evaluate(X_test, y_test)
    test_loss, test_accuracy, test_precision, test_recall, test_auc = results
    test_f1 = f1_score(test_precision, test_recall)
    # Print results
    print(f"Test Loss: {test_loss:.4f}")
    print(f"Test Accuracy: {test_accuracy:.4f}")
    print(f"Test Precision: {test_precision:.4f}")
    print(f"Test Recall: {test_recall:.4f}")
    print(f"Test AUC: {test_auc:.4f}")
    print(f"Test F1: {test_f1:.4f}")


In [None]:
evaluate_ann_model(ann_model,X_test_flat,y_test_ann)

**CNN + LSTM**

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import TimeDistributed, Conv2D, MaxPooling2D, Flatten
from tensorflow.keras.layers import LSTM, Dropout, Dense, BatchNormalization
from tensorflow.keras.metrics import Precision, Recall, AUC
from tensorflow.keras.optimizers import Adam

model = Sequential()

# Apply Conv2D across time-distributed EEG frames
model.add(TimeDistributed(Conv2D(32, (3, 3), activation='relu'), input_shape=(128, 4, 14, 1)))
model.add(TimeDistributed(MaxPooling2D((2, 2))))
model.add(TimeDistributed(BatchNormalization()))
model.add(TimeDistributed(Flatten()))

# LSTM to learn sequence
model.add(LSTM(64, return_sequences=False))
model.add(Dropout(0.5))

# Fully connected output
model.add(Dense(64, activation='relu'))
model.add(Dense(3, activation='softmax'))

In [None]:
# Compile
model.compile(
    optimizer=Adam(learning_rate=0.001),
    loss='categorical_crossentropy',
    metrics=['accuracy', Precision(), Recall(), AUC()]
)

In [None]:

model.summary()

In [None]:
X_cnn_lstm = X.reshape(X.shape[0], 128, 4, 14, 1)

In [None]:
model.fit(X_cnn_lstm, y, validation_split=0.2, epochs=30, batch_size=64)

In [None]:
from sklearn.model_selection import train_test_split

# Make sure you split after reshaping
X_train, X_test, y_train, y_test = train_test_split(X_cnn_lstm, y, test_size=0.2, random_state=42)

In [None]:
from sklearn.metrics import f1_score as sk_f1_score, classification_report, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np

def evaluate_cnn_lstm_model(model, X_test, y_test):
    # Get predictions
    y_pred_prob = model.predict(X_test)
    y_pred = np.argmax(y_pred_prob, axis=1)
    y_true = np.argmax(y_test, axis=1)

    # Evaluate metrics
    test_loss, test_accuracy, test_precision, test_recall, test_auc = model.evaluate(X_test, y_test, verbose=0)
    test_f1 = 2 * ((test_precision * test_recall) / (test_precision + test_recall + 1e-8))

    # Print results
    print(f"Test Loss: {test_loss:.4f}")
    print(f"Test Accuracy: {test_accuracy:.4f}")
    print(f"Test Precision: {test_precision:.4f}")
    print(f"Test Recall: {test_recall:.4f}")
    print(f"Test AUC: {test_auc:.4f}")
    print(f"Test F1 Score: {test_f1:.4f}")

    # Classification Report
    print("\nClassification Report:\n", classification_report(y_true, y_pred))

    # Confusion Matrix
    cm = confusion_matrix(y_true, y_pred)
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title("Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.show()


In [None]:
evaluate_cnn_lstm_model(model, X_test, y_test)

CNN + lstm + svm

In [None]:
import numpy as np
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, BatchNormalization, Flatten
from tensorflow.keras.layers import TimeDistributed, LSTM
from tensorflow.keras.optimizers import Adam

# Reshape your data (Assumes original X shape is (samples, 128, 4, 14))
X_cnn_lstm = X.reshape(X.shape[0], 128, 4, 14, 1)
y_labels = np.argmax(y, axis=1)

# Build the CNN-LSTM Feature Extractor
input_layer = Input(shape=(128, 4, 14, 1))

x = TimeDistributed(Conv2D(64, (3, 3), activation='relu', padding='same'))(input_layer)
x = TimeDistributed(BatchNormalization())(x)
x = TimeDistributed(MaxPooling2D((2, 2)))(x)

x = TimeDistributed(Conv2D(128, (3, 3), activation='relu', padding='same'))(x)
x = TimeDistributed(BatchNormalization())(x)
x = TimeDistributed(MaxPooling2D((2, 2)))(x)

x = TimeDistributed(Flatten())(x)
x = LSTM(128, return_sequences=False)(x)

# Define feature extractor model
feature_extractor = Model(inputs=input_layer, outputs=x)
feature_extractor.compile(optimizer=Adam(), loss='mse')
feature_extractor.summary()


In [None]:
# Extract CNN-LSTM features
features = feature_extractor.predict(X_cnn_lstm, verbose=1)


In [None]:
from sklearn.model_selection import train_test_split

X_train_svm, X_test_svm, y_train_svm, y_test_svm = train_test_split(
    features, y_labels, test_size=0.2, random_state=42
)


In [None]:
from sklearn.svm import SVC

svm_model = SVC(kernel='rbf', C=10, gamma='scale', probability=True)  # You can tune C, gamma
svm_model.fit(X_train_svm, y_train_svm)


In [None]:
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, f1_score
import seaborn as sns
import matplotlib.pyplot as plt

def evaluate_svm_model(model, X_test, y_test):
    y_pred = model.predict(X_test)

    print(f"Accuracy: {accuracy_score(y_test, y_pred):.4f}")
    print(f"F1 Score (macro): {f1_score(y_test, y_pred, average='macro'):.4f}")
    print("\nClassification Report:\n", classification_report(y_test, y_pred))

    cm = confusion_matrix(y_test, y_pred)
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title("Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.show()

# Call the evaluation
evaluate_svm_model(svm_model, X_test_svm, y_test_svm)


**RANDOM FOREST**

In [None]:
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, f1_score
import seaborn as sns
import matplotlib.pyplot as plt

def evaluate_random_model(model, X_test, y_test):
    y_pred = model.predict(X_test)

    print(f"Accuracy: {accuracy_score(y_test, y_pred):.4f}")
    print(f"F1 Score (macro): {f1_score(y_test, y_pred, average='macro'):.4f}")
    print("\nClassification Report:\n", classification_report(y_test, y_pred))

    cm = confusion_matrix(y_test, y_pred)
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title("Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.show()

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.decomposition import PCA

X_flattened = X.reshape(X.shape[0], -1)
y_labels = np.argmax(y, axis=1)

# Optional: Reduce dimensions
pca = PCA(n_components=100)
X_pca = pca.fit_transform(X_flattened)

X_train_rf, X_test_rf, y_train_rf, y_test_rf = train_test_split(X_pca, y_labels, test_size=0.2)

rf_model = RandomForestClassifier(n_estimators=200)
rf_model.fit(X_train_rf, y_train_rf)
y_pred = rf_model.predict(X_test_rf)

print(classification_report(y_test_rf, y_pred))


In [None]:
evaluate_random_model(rf_model, X_test_rf, y_test_rf)

**TRANSFORMER MODEL**

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.layers import Dense, Dropout, LayerNormalization, MultiHeadAttention
from tensorflow.keras.models import Model
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
import matplotlib.pyplot as plt
import seaborn as sns

# ✅ Assuming you have EEG data ready as (samples, 512, 14)
# X shape: (samples, 512, 14)
# y shape: one-hot encoded with 3 classes

# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# -------------------------------
# Transformer Encoder Block
# -------------------------------
def transformer_encoder(inputs, head_size=64, num_heads=4, ff_dim=128, dropout=0.1):
    # Multi-head self-attention
    x = MultiHeadAttention(num_heads=num_heads, key_dim=head_size)(inputs, inputs)
    x = Dropout(dropout)(x)
    x = LayerNormalization(epsilon=1e-6)(x + inputs)

    # Feed-forward network with residual
    ff = layers.Dense(ff_dim, activation="relu")(x)
    ff = layers.Dense(x.shape[-1])(ff)  # Ensures matching dimensions
    ff = Dropout(dropout)(ff)
    x = LayerNormalization(epsilon=1e-6)(x + ff)
    return x

# -------------------------------
# Build the Transformer Model
# -------------------------------
input_layer = layers.Input(shape=(512, 14))

# Positional Encoding (simple learnable projection here)
x = layers.Dense(64)(input_layer)

# Stack multiple transformer blocks
for _ in range(2):  # try 2 or 3 layers
    x = transformer_encoder(x)

# Global pooling and dense layers
x = layers.GlobalAveragePooling1D()(x)
x = Dropout(0.3)(x)
x = Dense(64, activation='relu')(x)
x = Dropout(0.3)(x)
output_layer = Dense(3, activation='softmax')(x)

model = Model(inputs=input_layer, outputs=output_layer)

# Compile
model.compile(optimizer='adam',
              loss='categorical_crossentropy',
              metrics=['accuracy',
                       tf.keras.metrics.Precision(name='precision'),
                       tf.keras.metrics.Recall(name='recall'),
                       tf.keras.metrics.AUC(name='auc')])

model.summary()


In [None]:
history = model.fit(X_train, y_train, validation_split=0.2, epochs=30, batch_size=64)

In [None]:
# -------------------------------
# Evaluation Function
# -------------------------------
def evaluate_model(model, X_test, y_test):
    results = model.evaluate(X_test, y_test)
    test_loss, test_accuracy, test_precision, test_recall, test_auc = results
    test_f1 = 2 * (test_precision * test_recall) / (test_precision + test_recall + 1e-7)

    print(f"\nTest Loss: {test_loss:.4f}")
    print(f"Test Accuracy: {test_accuracy:.4f}")
    print(f"Test Precision: {test_precision:.4f}")
    print(f"Test Recall: {test_recall:.4f}")
    print(f"Test AUC: {test_auc:.4f}")
    print(f"Test F1 Score: {test_f1:.4f}")

    # Classification report
    y_pred = model.predict(X_test)
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_true = np.argmax(y_test, axis=1)

    print("\nClassification Report:\n", classification_report(y_true, y_pred_classes))

    cm = confusion_matrix(y_true, y_pred_classes)
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title("Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.show()

# -------------------------------
# Evaluate
# -------------------------------
evaluate_model(model, X_test, y_test)