In [1]:
##### Libraries #####
import dotenv
dotenv.load_dotenv(".env")
import os
import time
import utils
import shutil
import argparse
import numpy as np
np.random.seed(0)
import tensorflow as tf
tf.random.set_seed(0)
tf.keras.utils.set_random_seed(0)
import models_tensorflow.EEGModels
from typing import Literal
from tensorflow.keras import backend
from tensorflow.keras import utils as tf_utils
backend.set_image_data_format("channels_last")
from sklearn.model_selection import train_test_split

import utils
from utils import Metric, plot_confusion_matrix, plot_history
from libs.dataset import BcicIv2aDataset, InnerSpeechDataset

In [2]:
os.environ["DATASET_DIR"] = 'datasets'

In [3]:
dataset = BcicIv2aDataset()  # l_freq=4
inputs, truths = dataset.all_data_and_label

Loading BCIC IV 2a dataset - A09E: 100%|████████| 18/18 [00:22<00:00,  1.27s/it]


In [4]:
inputs.shape

(5184, 22, 257)

In [5]:
truths.shape

(5184, 4)

In [6]:
truths[0]

array([False, False, False,  True])

In [7]:
# !python -m pip install tensorflow-addons

In [8]:
# !conda install -c conda-forge tensorflow-addons -y

In [9]:
inputs = np.transpose(inputs, (0, 2, 1))
print(inputs.shape)

(5184, 257, 22)


## Transformer Build and Training

In [10]:
# from keras import backend as K

In [30]:
from tensorflow import keras
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.layers import MultiHeadAttention, Dense, Dropout
from tensorflow.keras.callbacks import LearningRateScheduler

# Time2Vec layer: This layer provides a method to encode both linear and periodic components of time into the model inputs.
class Time2Vec(keras.layers.Layer):
    def __init__(self, kernel_size=1):
        super(Time2Vec, self).__init__(trainable=True, name='Time2VecLayer')
        self.k = kernel_size
    
    def build(self, input_shape):
        # trend
        self.wb = self.add_weight(name='wb',shape=(input_shape[1],),initializer='uniform',trainable=True)
        self.bb = self.add_weight(name='bb',shape=(input_shape[1],),initializer='uniform',trainable=True)
        # periodic
        self.wa = self.add_weight(name='wa',shape=(1, input_shape[1], self.k),initializer='uniform',trainable=True)
        self.ba = self.add_weight(name='ba',shape=(1, input_shape[1], self.k),initializer='uniform',trainable=True)
        super(Time2Vec, self).build(input_shape)
    
    def call(self, inputs, **kwargs):
        bias = self.wb * inputs + self.bb
        # dp = K.dot(inputs, self.wa) + self.ba
        dp = tf.reduce_sum(inputs * self.wa, axis=-1, keepdims=True) + self.ba  # Weighted sum across the time dimension
        # wgts = K.sin(dp) # or K.cos(.)
        wgts = tf.math.sin(dp)  # Apply the sinusoidal function
        

        ret = K.concatenate([K.expand_dims(bias, -1), wgts], -1)
        ret = K.reshape(ret, (-1, inputs.shape[1]*(self.k+1)))
        
        return ret
    
    def compute_output_shape(self, input_shape):
        return (input_shape[0], input_shape[1]*(self.k + 1))



# AttentionBlock: This is a custom layer that incorporates multi-head self-attention mechanism, allowing the model to focus on different parts of the input sequence.
# class AttentionBlock(keras.Model):
#     def __init__(self, num_heads=2, head_size=128, ff_dim=256, dropout=0.1, feature_dim=515, **kwargs):
#         super(AttentionBlock, self).__init__(**kwargs)
#         self.num_heads = num_heads
#         self.head_size = head_size
#         self.dropout = dropout
#         self.attention = MultiHeadAttention(num_heads=num_heads, key_dim=head_size, dropout=dropout)
#         self.att_norm = keras.layers.LayerNormalization(epsilon=1e-6)
#         self.ff_norm = keras.layers.LayerNormalization(epsilon=1e-6)
#         self.ff_conv1 = Dense(ff_dim, activation='relu')
#         self.ff_conv2 = Dense(feature_dim)  # Ensure this matches the input feature dimension

#     def call(self, inputs, training=False):
#         attn_output = self.attention(query=inputs, key=inputs, value=inputs)
#         attn_output = Dropout(self.dropout)(attn_output, training=training)
#         out1 = self.att_norm(inputs + attn_output)

#         ffn_output = self.ff_conv1(out1)
#         ffn_output = self.ff_conv2(ffn_output)
#         ffn_output = Dropout(self.dropout)(ffn_output, training=training)
#         return self.ff_norm(out1 + ffn_output)

class AttentionBlock(keras.Model):
    def __init__(self, name='AttentionBlock', num_heads=2, head_size=128, ff_dim=None, dropout=0, **kwargs):
        super().__init__(name=name, **kwargs)

        if ff_dim is None:
            ff_dim = head_size

        self.attention = MultiHeadAttention(num_heads=num_heads, key_dim=head_size, dropout=dropout)
        self.attention_dropout = keras.layers.Dropout(dropout)
        self.attention_norm = keras.layers.LayerNormalization(epsilon=1e-6)

        self.ff_conv1 = keras.layers.Conv1D(filters=ff_dim, kernel_size=1, activation='relu')
        # self.ff_conv2 at build()
        self.ff_dropout = keras.layers.Dropout(dropout)
        self.ff_norm = keras.layers.LayerNormalization(epsilon=1e-6)

    def build(self, input_shape):
        self.ff_conv2 = keras.layers.Conv1D(filters=input_shape[-1], kernel_size=1) 

    def call(self, inputs):
        x = self.attention(query=inputs, key=inputs, value=inputs)
        x = self.attention_dropout(x)
        x = self.attention_norm(inputs + x)

        x = self.ff_conv1(x)
        x = self.ff_conv2(x)
        x = self.ff_dropout(x)

        x = self.ff_norm(inputs + x)
        return x



# CAN NAME THIS EEGTransformerModel:
# TransformerModel: This class defines the overall model architecture combining Time2Vec and multiple AttentionBlocks.
class TransformerModel(keras.Model):
    def __init__(self, time2vec_dim=1, num_heads=2, head_size=128, ff_dim=None, num_layers=1, dropout=0.1):
        super().__init__()
        self.time2vec = Time2Vec(kernel_size=time2vec_dim)
        self.attention_layers = [
            AttentionBlock(num_heads=num_heads, head_size=head_size, ff_dim=head_size if ff_dim is None else ff_dim, dropout=dropout)
            for _ in range(num_layers)
        ]
        self.global_pool = layers.GlobalAveragePooling1D()
        self.output_layer = layers.Dense(22, activation='linear')  # Assuming regression type output for each channel

    def call(self, inputs):
        time_embeddings = layers.TimeDistributed(self.time2vec)(inputs)
        x = tf.concat([inputs, time_embeddings], axis=-1)
        for attention_layer in self.attention_layers:
            x = attention_layer(x)
        x = self.global_pool(x)
        return self.output_layer(x)

In [31]:
inputs.shape

(5184, 257, 22)

In [13]:
# Example function to load dataset (this should be replaced with actual data loading)
def load_dataset():
    x =  inputs #  np.random.rand(100, 22, 257)  # Simulated input data
    y =  inputs # np.random.rand(100, 22, 257)  # Simulated ground truth data
    return x, y


In [14]:
inputs, truths = load_dataset()

In [15]:
truths.shape

(5184, 257, 22)

In [16]:
# Normalize data to range [0, 1] for consistency and to aid learning.
inputs = (inputs - np.min(inputs)) / (np.max(inputs) - np.min(inputs))
truths = (truths - np.min(truths)) / (np.max(truths) - np.min(truths))

In [17]:
# Split the data into training and testing sets to evaluate model performance.
x_train, x_test, y_train, y_test = train_test_split(inputs, truths, test_size=0.2, random_state=42)

# X_train, X_val, y_train, y_val = train_test_split(scaled_inputs, truths, test_size=0.2, random_state=42)

# history = model.fit(X_train, y_train, epochs=50, batch_size=32, validation_data=(X_val, y_val))


In [18]:
print(x_train.shape)
print(y_train.shape)
print(x_test.shape)
print(y_test.shape)


(4147, 257, 22)
(4147, 257, 22)
(1037, 257, 22)
(1037, 257, 22)


In [32]:
# # Configure and compile the model
# model = TransformerModel(time2vec_dim=1, num_heads=2, head_size=128, ff_dim=256, num_layers=3, dropout=0.1)
# model.compile(optimizer='adam', loss='mean_squared_error')  # Use Adam optimizer and MSE loss function

# Model instantiation and compilation
model = TransformerModel(time2vec_dim=1, num_heads=2, head_size=128, num_layers=1, dropout=0.1)
model.compile(optimizer='adam', loss='mse')

In [20]:
# model.summary()

In [21]:
# Define a Learning Rate Scheduler to adjust the learning rate dynamically during training for better performance.
def lr_scheduler(epoch, lr):
    warmup_epochs = 15
    decay_epochs = 100
    initial_lr = 1e-6
    base_lr = 1e-3
    min_lr = 5e-5
    if epoch <= warmup_epochs:
        pct = epoch / warmup_epochs
        return ((base_lr - initial_lr) * pct) + initial_lr
    if epoch > warmup_epochs and epoch < warmup_epochs + decay_epochs:
        pct = 1 - ((epoch - warmup_epochs) / decay_epochs)
        return ((base_lr - min_lr) * pct) + min_lr
    return min_lr


In [22]:
# from keras import backend as K
import tensorflow.keras.backend as K

In [33]:
callback_list = [LearningRateScheduler(lr_scheduler, verbose=1)]

# Train the model with the specified configurations.
model.fit(x_train, y_train, epochs=5, validation_data=(x_test, y_test), batch_size=32) # callbacks=callback_list

Epoch 1/5


IndexError: Exception encountered when calling TransformerModel.call().

[1mtuple index out of range[0m

Arguments received by TransformerModel.call():
  • inputs=tf.Tensor(shape=(None, 257, 22), dtype=float32)

In [79]:
# Assuming 'model' is the trained Transformer model and 'x_test' is the test dataset

# Predict the outputs for the test set
predicted_outputs = model.predict(x_train)

# You can now use 'predicted_outputs' for further analysis, visualization, or post-processing
# For example, printing the shape of the outputs and some sample data
print("Shape of the predicted outputs:", predicted_outputs.shape)
# print("Sample predictions:", predicted_outputs[:5])

# If you need to compare these predictions with the actual labels
# Assuming 'y_test' contains the true values for the test set
# print("Actual true outputs:", y_train[:5])

[1m130/130[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 33ms/step
Shape of the predicted outputs: (4147, 22, 257)


In [84]:


# # calculate the performance metrics, 'Mean Squared Error'
# from sklearn.metrics import mean_squared_error

# mse = mean_squared_error(y_train, predicted_outputs)
# print("Mean Squared Error on Test Set:", mse)


In [81]:
# Calculate MSE for each feature and average over all time steps
mse_per_feature = np.mean([mean_squared_error(y_train[:, i, :], predicted_outputs[:, i, :]) for i in range(y_train.shape[1])])
print("Mean Squared Error per feature, averaged over time steps:", mse_per_feature)

Mean Squared Error per feature, averaged over time steps: 0.002535971542483811


In [83]:
from sklearn.metrics import mean_squared_error
import numpy as np

# Initialize a list or numpy array to store the MSE values for each channel
mse_per_channel = np.zeros((y_train.shape[1],))  # y_train.shape[1] should be 22 if there are 22 channels

# Loop through each channel and calculate MSE
for channel_index in range(y_train.shape[1]):
    # Extract the channel data for both true and predicted values
    true_channel_data = y_train[:, channel_index, :]
    predicted_channel_data = predicted_outputs[:, channel_index, :]

    # Compute the MSE for this channel
    mse_per_channel[channel_index] = mean_squared_error(true_channel_data, predicted_channel_data)

    # Optionally, print the MSE for each channel
    print(f"MSE for Channel {channel_index + 1}: {mse_per_channel[channel_index]}")


MSE for Channel 1: 0.002892112152690684
MSE for Channel 2: 0.002365045193164491
MSE for Channel 3: 0.00265804448155546
MSE for Channel 4: 0.002930883197617
MSE for Channel 5: 0.0027586019778895925
MSE for Channel 6: 0.0025873186748943417
MSE for Channel 7: 0.001653848754940294
MSE for Channel 8: 0.002205535325460935
MSE for Channel 9: 0.0024582938756672764
MSE for Channel 10: 0.002783532164773731
MSE for Channel 11: 0.0026102892130813664
MSE for Channel 12: 0.002585270767988162
MSE for Channel 13: 0.002167161265563858
MSE for Channel 14: 0.002296732904307401
MSE for Channel 15: 0.0024126604748039232
MSE for Channel 16: 0.002615999525986123
MSE for Channel 17: 0.0025995530125059595
MSE for Channel 18: 0.0027031042723586005
MSE for Channel 19: 0.0024519940378041174
MSE for Channel 20: 0.002726288487523727
MSE for Channel 21: 0.0026341128879922068
MSE for Channel 22: 0.0026949912860745858


To DO:
We plan to jointly train this Transformer Denoiser network with EEGNet using a joint Loss function: Reconstruction loss for Denoiser + BCE loss for classification. 

Training standalone Transformer on reconstruction task is not necessary unless to make sure it's working. Fine tuning model hyperparameters and architecture has to be performed jointly on Transformer(Denoiser) + EEGNet pipeline, with frozen weights for EEGNet. 

Immediate task: Train the EEGNet by running necessary scripts and save the model weights, as they are currently missing. 

Next, import the EEGNet model from python script as a library into the Transformer notebook, instantiate the model, load the weights, and make trainable=False to freeze the weights. 

Then, build the pipeline of Transformer + EEGNet and optimize(train) the Transformer model using the joint loss function. 

Finally, write this as a script.

Note: For Transformer architecture, you can use classical time encoding methods or use learning methods like time2vec with trainable weights

Note: You can either design the Denoiser as an offset producer (have a skip connection from input and subtract it with pre-final output to effectively output the offset requiret to Denoise EEG data. OR view the Denoiser as a signal enhancer that will produce a cleaner version of raw signal with only important features, removing irrelevant details.