In [1]:
%pip install mne

Note: you may need to restart the kernel to use updated packages.


In [1]:
import mne
import numpy as np
import matplotlib.pyplot as plt

In [2]:
all_edf_file_path='/Users/shivanshmundra/Downloads/eeg-during-mental-arithmetic-tasks-1.0.0/edf files'

In [3]:
import os
rest_files = []
stress_files = []

# Iterate through the folder to collect file paths
for file_name in os.listdir(all_edf_file_path):
    if file_name.endswith('_1.edf'):
        rest_files.append(os.path.join(all_edf_file_path, file_name))
    elif file_name.endswith('_2.edf'):
        stress_files.append(os.path.join(all_edf_file_path, file_name))

In [4]:
print(f"\nNumber of rest files: {len(rest_files)}")
print(f"Number of stress files: {len(stress_files)}")


Number of rest files: 36
Number of stress files: 36


In [5]:
def read_raw_data(file_path):
    raw_data=mne.io.read_raw_edf(file_path, preload=True)
    raw_array=raw_data.get_data()
    return raw_array

In [6]:
all_rest_raw_array=[read_raw_data(i) for i in rest_files]
all_stress_raw_array=[read_raw_data(i) for i in stress_files]

Extracting EDF parameters from /Users/shivanshmundra/Downloads/eeg-during-mental-arithmetic-tasks-1.0.0/edf files/Subject20_1.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Reading 0 ... 90999  =      0.000 ...   181.998 secs...
Extracting EDF parameters from /Users/shivanshmundra/Downloads/eeg-during-mental-arithmetic-tasks-1.0.0/edf files/Subject04_1.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Reading 0 ... 84999  =      0.000 ...   169.998 secs...
Extracting EDF parameters from /Users/shivanshmundra/Downloads/eeg-during-mental-arithmetic-tasks-1.0.0/edf files/Subject06_1.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Reading 0 ... 90999  =      0.000 ...   181.998 secs...
Extracting EDF parameters from /Users/shivanshmundra/Downloads/eeg-during-mental-arithmetic-tasks-1.0.0/edf files/Subject22_1.edf...
EDF file detected
Setting channel info structure...

In [7]:
all_rest_raw_array[0].shape

(21, 91000)

In [8]:
def read_data(file_path):
    raw_data=mne.io.read_raw_edf(file_path, preload=True)
    epochs = mne.make_fixed_length_epochs(raw_data, duration=2, overlap=1, preload=False)
    epoch_array=epochs.get_data()
    return epoch_array


In [9]:
all_rest_edf_array=[read_data(i) for i in rest_files]
all_stress_edf_array=[read_data(i) for i in stress_files]

Extracting EDF parameters from /Users/shivanshmundra/Downloads/eeg-during-mental-arithmetic-tasks-1.0.0/edf files/Subject20_1.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Reading 0 ... 90999  =      0.000 ...   181.998 secs...
Not setting metadata
181 matching events found
No baseline correction applied
0 projection items activated
Using data from preloaded Raw for 181 events and 1000 original time points ...
0 bad epochs dropped
Extracting EDF parameters from /Users/shivanshmundra/Downloads/eeg-during-mental-arithmetic-tasks-1.0.0/edf files/Subject04_1.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Reading 0 ... 84999  =      0.000 ...   169.998 secs...
Not setting metadata
169 matching events found
No baseline correction applied
0 projection items activated
Using data from preloaded Raw for 169 events and 1000 original time points ...
0 bad epochs dropped
Extracting EDF parameters from /Users/shiv

In [10]:
all_rest_edf_array[0].shape

(181, 21, 1000)

In [11]:
rest_edf_labels=[len(i)*[0] for i in all_rest_edf_array]
stress_edf_labels=[len(i)*[1] for i in all_stress_edf_array]

print(len(rest_edf_labels),len(stress_edf_labels))

36 36


In [12]:

labeled_edf_data=rest_edf_labels + stress_edf_labels


In [13]:
all_epoch_data=all_rest_edf_array + all_stress_edf_array

In [20]:
##Building the Variational Autoencoders for data Augmentation. The model is referenced from a research paper 

#importing all librararies
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, Conv2DTranspose, BatchNormalization, LeakyReLU, Flatten, Dense, Reshape, Lambda
from tensorflow.keras.losses import mse
from tensorflow.keras.losses import binary_crossentropy
from tensorflow.keras import backend as K
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, classification_report

#function for encoder
def build_encoder(input_shape):
    inputs = Input(shape=input_shape)
    
    # Temporal convolution layer
    x = Conv2D(8, (1, 64), padding='same', use_bias=False)(inputs)
    x = BatchNormalization()(x)
    x = LeakyReLU(alpha=0.2)(x)
    
    # Spatial convolution layer
    x = Conv2D(8, (21, 1), padding='same', use_bias=False)(x)
    x = BatchNormalization()(x)
    x = LeakyReLU(alpha=0.2)(x)
    
    shape_before_flattening = K.int_shape(x)
    x = Flatten()(x)
    
    
    # Mean and variance for latent space
    z_mean = Dense(2, name='z_mean')(x)
    z_log_var = Dense(2, name='z_log_var')(x)
    ## sampling layer
    def sampling(args):
        z_mean, z_log_var = args
        batch = K.shape(z_mean)[0]
        dim = K.int_shape(z_mean)[1]
        epsilon = K.random_normal(shape=(batch, dim))
        return z_mean + K.exp(0.5 * z_log_var) * epsilon
    
    z = Lambda(sampling, output_shape=(2,), name='z')([z_mean, z_log_var])
    
    return Model(inputs, [z_mean, z_log_var, z], name='encoder'), shape_before_flattening

def build_decoder(latent_dim, shape_before_flattening):
    latent_inputs = Input(shape=(latent_dim,))
    
    x = Dense(np.prod(shape_before_flattening[1:]), activation='relu')(latent_inputs)
    x = Reshape(shape_before_flattening[1:])(x)
    
    # Inverse of spatial convolution layer
    x = Conv2DTranspose(8, (21, 1), padding='same', use_bias=False)(x)
    x = BatchNormalization()(x)
    x = LeakyReLU(alpha=0.2)(x)
    
    # Inverse of temporal convolution layer
    x = Conv2DTranspose(1, (1, 64), padding='same', use_bias=False)(x)
    x = BatchNormalization()(x)
    x = LeakyReLU(alpha=0.2)(x)

    outputs = x

    return Model(latent_inputs, outputs, name='decoder')

def build_vae(input_shape, latent_dim):
    encoder, shape_before_flattening = build_encoder(input_shape)
    decoder = build_decoder(latent_dim, shape_before_flattening)
    
    inputs = Input(shape=input_shape)
    z_mean, z_log_var, z = encoder(inputs)
    outputs = decoder(z)
    
    vae = Model(inputs, outputs, name='vae')
    ##defining the losses need for VAE
    reconstruction_loss = mse(K.flatten(inputs), K.flatten(outputs))
    reconstruction_loss *= input_shape[0] * input_shape[1]
    kl_loss = 1 + z_log_var - K.square(z_mean) - K.exp(z_log_var)
    kl_loss = K.sum(kl_loss, axis=-1)
    kl_loss *= -0.5
    vae_loss = K.mean(reconstruction_loss + kl_loss)
    
    vae.add_loss(vae_loss)
    vae.compile(optimizer='adam')
    
    return vae, encoder, decoder

In [21]:
## checking if all shapes are in correct order 
from sklearn.preprocessing import MinMaxScaler
scaler=MinMaxScaler()
input_shape=(21,1000,1)
latent_dim=2
x_dummy = np.random.rand(1, *input_shape).astype('float32') #dummy data
print(x_dummy.shape)
x_dummy_reshaped = x_dummy.reshape(-1, 21 * 1000)
x_dummy_reshaped.shape
x_scaled = scaler.fit_transform(x_dummy_reshaped).reshape(1, 21, 1000, 1)


print(x_dummy.shape)

vae, encoder, decoder= build_vae(input_shape, latent_dim)

vae.fit(x_scaled, epochs=1, batch_size=20)


z_mean, z_log_var, z = encoder.predict(x_dummy)
print(f'Shape of z_mean: {z_mean.shape}')
print(f'Shape of z_log_var: {z_log_var.shape}')
print(f'Shape of z: {z.shape}')

reconstructed_x = decoder.predict(z)
print(f'Shape of reconstructed_x: {reconstructed_x.shape}')

reconstruction_loss_value = mse(K.flatten(x_scaled), K.flatten(reconstructed_x)).numpy()
reconstruction_loss_value *= 21*1000
print(f'Reconstruction loss for each sample: {reconstruction_loss_value}')
# print(f'Reconstruction loss for each sample: {reconstructed_loss_value}')

# vae_loss_value=K.eval(vae_loss)
# print(f'Reconstruction loss for each sample: {vae_loss_value}')

# kl_loss_value=K.eval(kl_loss)
# print(f'Reconstruction loss for each sample: {kl_loss_value}')


(1, 21, 1000, 1)
(1, 21, 1000, 1)
Shape of z_mean: (1, 2)
Shape of z_log_var: (1, 2)
Shape of z: (1, 2)
Shape of reconstructed_x: (1, 21, 1000, 1)
Reconstruction loss for each sample: 0.003322522502458014


In [24]:
## stacking all epoched data and labeled array
epoch_array=np.vstack(all_epoch_data)

label_edf_arr=np.hstack(labeled_edf_data)
print(epoch_array.shape,label_edf_arr.shape)
epoch_array = epoch_array[:, :, :, np.newaxis]
epoch_array.shape

(8604, 21, 1000) (8604,)


(8604, 21, 1000, 1)

In [25]:
##spillting the data 
x_train, x_test, y_train, y_test = train_test_split(epoch_array,label_edf_arr, test_size=0.35, random_state=42)

#defining the input_shape and latent dimension
input_shape = (21,1000,1)
latent_dim = 2
batch_size = 64

# Build and compile the VAE model
input_shape
# Train the VAE model
# vae.fit(x_train,x_train, epochs=2, batch_size=batch_size, validation_data=(x_test, x_test))

(21, 1000, 1)

In [27]:
from tensorflow.keras.callbacks import EarlyStopping
## callback function for Early stopping
vae, encoder,decoder = build_vae(input_shape, latent_dim)
callbacks = EarlyStopping(monitor = 'val_loss',
                          mode='min',
                          patience =50,
                          verbose = 1,
                          restore_best_weights = True)


In [28]:
#fitting the model 
vae.fit(x_train, epochs=100, batch_size=batch_size, validation_data=(x_test, None),callbacks=callbacks)

In [29]:
##taking the data augmented 
generated_data=vae.predict(x_test)

In [None]:
#shape of genertaed data
generated_data.shape,y_test.shape

In [None]:
from tensorflow.keras.utils import to_categorical
## making the augmented data ready for classification
assert len(np.unique(y_test)) == 2, "Labels should be binary for binary classification."
X_train_vae, X_test_vae, y_train_vae, y_test_vae = train_test_split(generated_data, y_test, test_size=0.2, random_state=42)

#converting to categorical
y_train_vae = to_categorical(y_train_vae)
y_test_vae = to_categorical(y_test_vae)

In [None]:
## Classifcation Model Architecture for augmented data
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv1D, BatchNormalization, ReLU, AveragePooling1D, Dropout, Dense, Flatten, Lambda
from tensorflow.keras.models import Model
from tensorflow.keras import backend as K
inputs = Input(shape=input_shape)
x = Dense(64, activation='relu')(inputs)
x = BatchNormalization()(x)
x = ReLU()(x)
x = Dropout(0.25)(x)
x = Dense(32, activation='relu')(x)
x = BatchNormalization()(x)
x = ReLU()(x)
x = Dropout(0.25)(x)
x = Dense(16, activation='relu')(x)
x = BatchNormalization()(x)
x = ReLU()(x)
x = Dropout(0.25)(x)
x = Flatten()(x)
outputs = Dense(1, activation='sigmoid')(x)  # 


In [None]:
#calling the model and compiling the model
cnn_model = Model(inputs, outputs)
cnn_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])


In [None]:
#fitting the model
cnn_model.fit(X_train_vae, y_train_vae, epochs=100, batch_size=10, validation_data=(X_test_vae, y_test_vae))

In [None]:
# prediction of labels
y_pred = cnn_model.predict(X_test_vae)
y_pred = (y_pred > 0.5).astype(int)

In [None]:
from sklearn.metrics import accuracy_score, classification_report
## printing the accuracy and classifictaion report of the model
print(f"Test Accuracy: {accuracy_score(y_test, y_pred)}")
print(classification_report(y_test, y_pred))

In [None]:
#caaling the eegnet Model 
import keras
from sklearn.model_selection import train_test_split
keras.backend.set_image_data_format('channels_first')

from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Activation, Permute, Dropout
from tensorflow.keras.layers import Conv2D, MaxPooling2D, AveragePooling2D
from tensorflow.keras.layers import SeparableConv2D, DepthwiseConv2D
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import SpatialDropout2D
from tensorflow.keras.regularizers import l1_l2
from tensorflow.keras.layers import Input, Flatten
from tensorflow.keras.constraints import max_norm
from tensorflow.keras import backend as K
def EEGNet(nb_classes, Channels , Samples , dropout_rate = 0.5, kernelLength = 64, F1 = 8, D = 2, F2 = 16):
    
    """     
      Ref: Lawhern, Vernon & Solon, Amelia & Waytowich, Nicholas & Gordon, Stephen & Hung, Chou & Lance, Brent. (2016). 
      EEGNet: A Compact Convolutional Network for EEG-based Brain-Computer Interfaces. 
      Journal of Neural Engineering. 15. 10.1088/1741-2552/aace8c. 

      nb_classes         : number of classes to classify
      Channels, Samples  : number of channels and time steps 
      dropout_rate       : dropout rate
      kernelLength       : length of temporal convolution in first layer.    
      F1, F2             : number of temporal filters (F1) and number of pointwise filters (F2).  
      D                  : number of spatial filters
    """
    
    
    inputs   = Input(shape = (Channels, Samples, 1))

    block1       = Conv2D(F1, (1, kernelLength), padding = 'same', input_shape = (Channels, Samples, 1))(inputs)
    block1       = BatchNormalization()(block1)
    block1       = DepthwiseConv2D((Channels, 1), depth_multiplier = D, depthwise_constraint = max_norm(1.))(block1)
    block1       = BatchNormalization()(block1)
    block1       = Activation('elu')(block1)
    block1       = AveragePooling2D((1, 4))(block1)
    block1       = Dropout(dropout_rate)(block1)
    
    block2       = SeparableConv2D(F2, (1, 16), padding = 'same')(block1)
    block2       = BatchNormalization()(block2)
    block2       = Activation('elu')(block2)
    block2       = AveragePooling2D((1, 8))(block2)
    block2       = Dropout(dropout_rate)(block2)
        
    flatten      = Flatten()(block2)
    
    dense        = Dense(nb_classes)(flatten)
    softmax      = Activation('softmax')(dense)
    
    model = Model(inputs=inputs, outputs=softmax)
    model.summary()
    
    return model

In [None]:
#defing the parameters and compiling it 
eegnet_vae_model = EEGNet(2, Channels = 21, Samples = 1000, dropout_rate = 0.2, kernelLength = 64, F1 = 8,  D = 2, F2 = 16)
eegnet_vae_model.compile(optimizer = 'adam', loss = 'categorical_crossentropy', metrics= ['accuracy'])

In [None]:
#fitting the model
eegnet_vae_model.fit(X_train_vae, y_train_vae, epochs=50, batch_size=10, validation_data=(X_test_vae, y_test_vae))


In [None]:
#prediction using eegnet 
y_pred_eeg = cnn_model.predict(X_test_vae)
y_pred_eeg = (y_pred_eeg > 0.5).astype(int)

In [None]:
from sklearn.metrics import accuracy_score, classification_report
## printing the accuracy and classifictaion report of the model
print(f"Test Accuracy: {accuracy_score(y_test, y_pred)}")
print(classification_report(y_test, y_pred))