In [1]:
!pip install pyedflib

Collecting pyedflib
  Downloading pyEDFlib-0.1.37-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.7 MB)
Installing collected packages: pyedflib
Successfully installed pyedflib-0.1.37


## EEG Data Preprocessing and EEGNet Model Training
### Introduction

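This documentation provides a step-by-step guide on how to preprocess EEG data from EDF files, segment the data, and train a neural-network classifier using TensorFlow/Keras. The model built in the code below is a small Transformer (ViT-style) network. The dataset is the PhysioNet "EEG During Mental Arithmetic Tasks" collection, which contains recordings of subjects before and during mental arithmetic tasks.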



In [2]:
import os
import pyedflib
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from scipy.signal import resample
import matplotlib.pyplot as plt


In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Step 1: Loading EDF Files and Extracting Signals
### Objective: Load EEG data from EDF files and extract the signals into numpy arrays.

Explanation: EDF (European Data Format) files are commonly used for storing EEG data. The pyedflib library provides a reader for EDF files. In this step, we read the EDF files from the specified folder, extract the signal of every channel, zero-pad shorter recordings to the length of the longest one, and stack the results into a single NumPy array.

In [4]:
# Step 1: Load and Extract Signals from EDF Files
def load_edf_files(folder_path):
    edf_files = [os.path.join(folder_path, f) for f in os.listdir(folder_path) if f.endswith('.edf')]
    all_signals = []
    max_samples = 0

    # Determine the maximum number of samples across all files
    for edf_file in edf_files:
        edf_reader = pyedflib.EdfReader(edf_file)
        max_samples = max(max_samples, edf_reader.getNSamples()[0])
        edf_reader.close()

    for edf_file in edf_files:
        edf_reader = pyedflib.EdfReader(edf_file)
        num_channels = edf_reader.signals_in_file
        signals = np.zeros((num_channels, max_samples))

        for i in range(num_channels):
            signal = edf_reader.readSignal(i)
            if len(signal) < max_samples:
                # Pad with zeros if the signal is shorter than the max_samples
                signal = np.pad(signal, (0, max_samples - len(signal)), 'constant')
            signals[i, :] = signal

        all_signals.append(signals)
        edf_reader.close()

    return np.array(all_signals), edf_files

folder_path = "/content/drive/MyDrive/eeg-during-mental-arithmetic-tasks-1.0.0/"
X_raw, edf_files = load_edf_files(folder_path)
print(f"X_raw shape: {X_raw.shape}")

X_raw shape: (72, 21, 94000)
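
Because the loader pads every recording with zeros up to the longest one, the tail of a shorter recording carries no signal. The following is a minimal sketch, on toy data, of padding while also keeping the original lengths so that later steps can skip windows that fall inside the padding. `pad_with_lengths` is a hypothetical helper introduced here for illustration; it is not part of pyedflib or of the notebook above.

```python
import numpy as np

def pad_with_lengths(recordings):
    """Zero-pad (channels, n_samples) arrays to a common length.

    Returns the stacked array and the original length of each recording,
    so later steps can ignore windows that lie entirely in the padding.
    """
    lengths = np.array([r.shape[1] for r in recordings])
    max_len = lengths.max()
    padded = np.stack([
        np.pad(r, ((0, 0), (0, max_len - r.shape[1])), mode="constant")
        for r in recordings
    ])
    return padded, lengths

# Toy data: two "recordings" with 3 channels and different durations.
toy = [np.ones((3, 5)), np.ones((3, 8))]
padded, lengths = pad_with_lengths(toy)
print(padded.shape)  # (2, 3, 8)
print(lengths)       # [5 8]
```

With the lengths available, a segmentation step could, for example, stop at `lengths[k]` for recording `k` instead of running to the padded end.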


In [5]:
import pandas as pd

# Load subject information
subject_info = pd.read_csv("/content/subject-info.csv")

# Extract labels (assuming 'Count quality' is the column for labels)
labels_dict = {str(row['Subject']): row['Count quality'] for _, row in subject_info.iterrows()}

# Extract labels from file names
def get_labels(edf_files, labels_dict):
    labels = []
    for file in edf_files:
        subject_id = os.path.basename(file).split('_')[0]
        if subject_id in labels_dict:
            labels.append(labels_dict[subject_id])
        else:
            labels.append(None)  # Handle missing labels if necessary
    return np.array(labels)

y_raw = get_labels(edf_files, labels_dict)
print(f"y_raw shape: {y_raw.shape}")
print(f"y_raw: {y_raw}")


y_raw shape: (72,)
y_raw: [1 1 1 1 1 0 1 0 1 0 1 0 0 1 1 1 1 0 0 1 1 1 1 1 1 1 0 0 0 1 1 1 1 1 0 1 0
 1 1 1 1 1 0 1 1 0 1 1 1 1 1 1 1 1 1 0 1 0 1 1 1 1 1 0 1 0 1 1 1 0 1 0]
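
The printed labels contain noticeably more 1s than 0s, so a reported accuracy is easier to interpret next to the accuracy of always predicting the majority class. A minimal sketch of that check follows; the label values are toy data chosen for illustration, and `class_summary` is a hypothetical helper.

```python
import numpy as np

def class_summary(y):
    """Return classes, their counts, and the majority-class baseline accuracy."""
    classes, counts = np.unique(y, return_counts=True)
    baseline = counts.max() / counts.sum()
    return classes, counts, baseline

# Toy labels for illustration only (not the dataset's labels).
y_toy = np.array([1, 1, 1, 0, 1, 0, 1, 1])
classes, counts, baseline = class_summary(y_toy)
print(classes, counts, baseline)  # [0 1] [2 6] 0.75
```

Calling `class_summary(y_raw)` on the real labels would give the corresponding baseline for this dataset.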


In [6]:
from sklearn.preprocessing import StandardScaler

def preprocess_signals(signals):
    # Transpose from (samples, channels, time_points) to (samples, time_points, channels)
    signals = np.transpose(signals, (0, 2, 1))

    # Add a trailing axis for CNN input: (samples, time_points, channels, 1)
    samples, time_points, channels = signals.shape
    signals = signals.reshape(samples, time_points, channels, 1)

    # Normalize
    scaler = StandardScaler()
    signals_scaled = scaler.fit_transform(signals.reshape(-1, channels)).reshape(signals.shape)

    return signals_scaled

X_scaled = preprocess_signals(X_raw)
print(f"X_scaled shape: {X_scaled.shape}")


X_scaled shape: (72, 94000, 21, 1)


## Step 2: Downsampling the Signals
### Objective: Reduce the sampling rate of the EEG signals to make the data more manageable for processing.

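Explanation: EEG data is often recorded at a high sampling rate. At 1000 Hz, for example, each channel produces 1000 data points per second. Downsampling reduces the number of data points, which lowers memory use and computation time. Here, the code assumes an original rate of 1000 Hz and downsamples by a factor of 4 to 250 Hz. The actual rate is stored in each EDF header (pyedflib exposes it through `getSampleFrequency`) and should be checked before relying on these numbers.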

In [7]:
from scipy.signal import resample

def downsample_signals(signals, target_length):
    downsampled_signals = []
    for signal in signals:
        num_channels = signal.shape[0]
        downsampled_signal = np.zeros((num_channels, target_length))
        for i in range(num_channels):
            downsampled_signal[i, :] = resample(signal[i, :], target_length)
        downsampled_signals.append(downsampled_signal)
    return np.array(downsampled_signals)

# Downsample by a factor of 4 (1000 Hz -> 250 Hz, assuming the recordings are sampled at 1000 Hz).
# At 1000 Hz, the padded length of 94000 samples corresponds to 94 seconds, not 60.
original_length = X_raw.shape[-1]  # 94000
target_length = int(original_length * 250 / 1000)

X_downsampled = downsample_signals(X_raw, target_length)
print(f"X_downsampled shape: {X_downsampled.shape}")


X_downsampled shape: (72, 21, 23500)
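
The loop above resamples one channel at a time with the FFT-based `scipy.signal.resample`. As an alternative sketch, `scipy.signal.resample_poly` applies a low-pass FIR filter before decimating and can process the whole array in one call. The integer factor of 4 is an assumption carried over from the 1000 Hz to 250 Hz figures in the text, and `downsample_by_factor` is a hypothetical helper.

```python
import numpy as np
from scipy.signal import resample_poly

def downsample_by_factor(signals, factor):
    """Downsample along the last (time) axis by an integer factor.

    resample_poly low-pass filters the signal before keeping every
    `factor`-th sample, and works on all recordings and channels at once.
    """
    return resample_poly(signals, up=1, down=factor, axis=-1)

# Toy array: 2 recordings, 3 channels, 1000 time points.
rng = np.random.default_rng(0)
toy = rng.standard_normal((2, 3, 1000))
down = downsample_by_factor(toy, factor=4)
print(down.shape)  # (2, 3, 250)
```

Applied to `X_raw` with `factor=4`, this would yield the same `(72, 21, 23500)` shape as the loop, although the sample values differ slightly because the two functions use different filters.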


## Step 3: Segmenting the Signals
### Objective: Split the continuous EEG signals into shorter, fixed-length segments.

Explanation: Segmenting the EEG data into shorter chunks (e.g., 1-second segments) can improve the performance of machine learning models by providing more training samples. Overlapping segments can also be used to increase the dataset size further. In this step, we split the signals into 1-second segments with a 50% overlap.

In [8]:
def segment_signals(signals, segment_length, overlap):
    segments = []
    for signal in signals:
        num_channels, num_time_points = signal.shape
        for start in range(0, num_time_points - segment_length + 1, segment_length // overlap):
            segments.append(signal[:, start:start + segment_length])
    return np.array(segments)

segment_length = 250  # 1-second segments at 250 Hz
overlap = 2  # step = segment_length // overlap, so 2 gives a 50% overlap

X_segmented = segment_signals(X_downsampled, segment_length, overlap)
print(f"X_segmented shape: {X_segmented.shape}")


X_segmented shape: (13464, 21, 250)
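
The segment count in the output can be checked by hand: with a window of 250 points and a step of 125, each recording of 23500 points yields 1 + (23500 - 250) // 125 = 187 windows, and 72 recordings give 72 × 187 = 13464. The sketch below reproduces that arithmetic and shows an equivalent windowing of a single recording with NumPy's `sliding_window_view` (available in NumPy 1.20 and later). `count_windows` and `segment_one` are hypothetical helpers.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

def count_windows(n_points, segment_length, step):
    """Number of full windows that fit into n_points."""
    return 1 + (n_points - segment_length) // step

def segment_one(signal, segment_length, step):
    """Split a (channels, time) array into (n_windows, channels, segment_length)."""
    # All windows with stride 1: (channels, time - segment_length + 1, segment_length)
    windows = sliding_window_view(signal, segment_length, axis=-1)
    # Keep every `step`-th window and move the window axis to the front.
    return windows[:, ::step].transpose(1, 0, 2)

print(count_windows(23500, 250, 125))       # 187
print(72 * count_windows(23500, 250, 125))  # 13464

toy = np.arange(2 * 1000).reshape(2, 1000)
segs = segment_one(toy, 250, 125)
print(segs.shape)  # (7, 2, 250)
```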


## Step 4: Preprocessing the Signals
### Objective: Normalize and reshape the segmented signals to prepare them for input into the neural network.

Explanation: Normalization ensures that the features have similar scales, which helps training converge faster. The segmented signals are reshaped to (samples, time points, channels, 1), the layout used by the later steps.

In [9]:
def preprocess_signals(signals):
    samples, channels, time_points = signals.shape
    signals = signals.transpose((0, 2, 1)).reshape(samples, time_points, channels, 1)

    scaler = StandardScaler()
    signals_scaled = scaler.fit_transform(signals.reshape(-1, channels)).reshape(signals.shape)

    return signals_scaled

X_preprocessed = preprocess_signals(X_segmented)
print(f"X_preprocessed shape: {X_preprocessed.shape}")

X_preprocessed shape: (13464, 250, 21, 1)
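
In the cell above, the scaler is fitted on all segments before the data is split in Step 6, so statistics from the future test set influence the scaling. A minimal sketch of the alternative, fitting per-channel statistics on the training portion only and reusing them for the test portion, is shown below on random toy data. `fit_channel_stats` and `apply_channel_stats` are hypothetical helpers that mirror what `StandardScaler` computes per channel.

```python
import numpy as np

def fit_channel_stats(X_train):
    """Per-channel mean and std for arrays shaped (samples, time_points, channels, 1)."""
    mean = X_train.mean(axis=(0, 1, 3), keepdims=True)
    std = X_train.std(axis=(0, 1, 3), keepdims=True)
    # Guard against constant channels to avoid division by zero.
    return mean, np.where(std == 0, 1.0, std)

def apply_channel_stats(X, mean, std):
    """Standardize X with statistics computed elsewhere (e.g. on the training set)."""
    return (X - mean) / std

rng = np.random.default_rng(0)
X_tr = rng.normal(5.0, 2.0, size=(40, 250, 21, 1))  # toy training segments
X_te = rng.normal(5.0, 2.0, size=(10, 250, 21, 1))  # toy test segments

mean, std = fit_channel_stats(X_tr)
X_tr_z = apply_channel_stats(X_tr, mean, std)
X_te_z = apply_channel_stats(X_te, mean, std)
print(X_tr_z.shape, X_te_z.shape)
```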


## Step 5: Creating Labels
### Objective: Ensure that each segmented signal has a corresponding label.

Explanation: Since we have segmented the data, we need to repeat the labels for each segment. This step ensures that each segment has a label corresponding to its original signal.



In [11]:
# Repeat labels to match the number of segments per original sample
num_segments_per_sample = X_segmented.shape[0] // len(X_downsampled)
y_repeated = np.repeat(y_raw, num_segments_per_sample)
print(f"y_repeated shape: {y_repeated.shape}")



y_repeated shape: (13464,)
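
`np.repeat` repeats each label consecutively, which matches the order in which `segment_signals` appends segments (all windows of the first recording, then all windows of the second, and so on). The same call can also produce a recording index per segment. The toy sketch below illustrates both; the `groups` array is an addition for illustration and does not appear in the notebook.

```python
import numpy as np

y_rec = np.array([1, 0, 1])  # toy recording-level labels
n_seg = 4                    # toy number of segments per recording

y_seg = np.repeat(y_rec, n_seg)                    # one label per segment
groups = np.repeat(np.arange(len(y_rec)), n_seg)   # recording index per segment

print(y_seg)   # [1 1 1 1 0 0 0 0 1 1 1 1]
print(groups)  # [0 0 0 0 1 1 1 1 2 2 2 2]
```

`np.tile` would instead cycle through the labels (`1 0 1 1 0 1 ...`) and misalign them with the segments.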


## Step 6: Splitting the Dataset
### Objective: Split the dataset into training and testing sets.

Explanation: To evaluate the performance of the model, we need to separate the data into training and testing sets. The training set is used to train the model, while the testing set is used to evaluate its performance on unseen data.

In [12]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X_preprocessed, y_repeated, test_size=0.2, random_state=42)
print(f"X_train shape: {X_train.shape}")
print(f"X_test shape: {X_test.shape}")
print(f"y_train shape: {y_train.shape}")
print(f"y_test shape: {y_test.shape}")


X_train shape: (10771, 250, 21, 1)
X_test shape: (2693, 250, 21, 1)
y_train shape: (10771,)
y_test shape: (2693,)
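
A random split over segments places overlapping windows from the same recording in both the training and the test set, which can make the test accuracy look better than it would on unseen recordings. One way to avoid this, sketched below on toy data, is scikit-learn's `GroupShuffleSplit` with one group id per recording. `split_by_recording` and the toy arrays are assumptions for illustration; grouping by subject rather than by file would be stricter still.

```python
import numpy as np
from sklearn.model_selection import GroupShuffleSplit

def split_by_recording(X, y, groups, test_size=0.2, seed=42):
    """Return train/test indices such that no group appears in both sets."""
    splitter = GroupShuffleSplit(n_splits=1, test_size=test_size, random_state=seed)
    train_idx, test_idx = next(splitter.split(X, y, groups))
    return train_idx, test_idx

# Toy setup: 10 recordings with 5 segments each.
n_rec, n_seg = 10, 5
X_toy = np.zeros((n_rec * n_seg, 250, 21, 1))
y_toy = np.repeat(np.arange(n_rec) % 2, n_seg)
groups = np.repeat(np.arange(n_rec), n_seg)

train_idx, test_idx = split_by_recording(X_toy, y_toy, groups)
shared = set(groups[train_idx]) & set(groups[test_idx])
print(len(shared))  # 0
```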


In [13]:
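def create_patches(X, patch_size):
    # Split each (time_points, channels, 1) segment into non-overlapping blocks
    # of patch_size time points: (n_blocks, patch_size, channels, 1) per segment.
    patches = []
    for signal in X:
        signal_patches = np.array([signal[i:i + patch_size] for i in range(0, signal.shape[0] - patch_size + 1, patch_size)])
        patches.append(signal_patches)
    return np.array(patches)

def to_tokens(patches):
    # (samples, n_blocks, patch_size, channels, 1) -> (samples, n_blocks * channels, patch_size).
    # The channel axis is moved ahead of the time axis before reshaping, so each
    # token holds patch_size consecutive samples from a single channel.
    samples, n_blocks, patch_size, channels, _ = patches.shape
    return patches[..., 0].transpose(0, 1, 3, 2).reshape(samples, n_blocks * channels, patch_size)

patch_size = 25  # Adjust this based on your data and ViT configuration
X_patches = to_tokens(create_patches(X_train, patch_size))
print(f"X_patches shape: {X_patches.shape}")

# Prepare the test data similarly
X_test_patches = to_tokens(create_patches(X_test, patch_size))
print(f"X_test_patches shape: {X_test_patches.shape}")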


X_patches shape: (10771, 210, 25)
X_test_patches shape: (2693, 210, 25)
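
The reshape that follows `create_patches` determines what each 25-value token actually contains. The toy check below, with 6 time points, 2 channels, and a patch size of 3, compares a direct reshape of the patch array with a transpose-then-reshape. Values are encoded as `100 * channel + time` so the contents of a token can be read off directly; all names here are illustrative.

```python
import numpy as np

patch_size = 3
t = np.arange(6)
# value = 100 * channel + time index; shape (1, time=6, channels=2, 1)
X_toy = (t[:, None] + 100 * np.arange(2)[None, :])[None, :, :, None]

# Non-overlapping time blocks: (1, n_blocks=2, patch_size=3, channels=2, 1)
patches = np.stack(
    [X_toy[:, i:i + patch_size] for i in range(0, 6 - patch_size + 1, patch_size)],
    axis=1,
)

# Direct reshape: consecutive values alternate between channels.
naive = patches.reshape(1, -1, patch_size)
# Channel axis moved ahead of time first: one channel per token.
per_channel = patches[..., 0].transpose(0, 1, 3, 2).reshape(1, -1, patch_size)

print(naive[0, 0])        # [  0 100   1]
print(per_channel[0, 0])  # [0 1 2]
print(per_channel[0, 1])  # [100 101 102]
```

Both variants produce the same output shape, so the difference does not show up in the printed shapes; only the content of each token changes.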


In [14]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, LayerNormalization, Dropout
from tensorflow.keras.models import Model

def ViT(patch_size, num_patches, num_classes, d_model, num_heads, num_layers, mlp_dim, dropout_rate):
    inputs = Input(shape=(num_patches, patch_size))
    x = Dense(d_model)(inputs)
    x = LayerNormalization(epsilon=1e-6)(x)

    for _ in range(num_layers):
        # Multi-head Self-Attention
        attn_output = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=d_model)(x, x)
        attn_output = Dropout(dropout_rate)(attn_output)
        x = tf.keras.layers.Add()([x, attn_output])
        x = LayerNormalization(epsilon=1e-6)(x)

        # Feed-Forward Network
        ffn = tf.keras.Sequential([
            Dense(mlp_dim, activation='relu'),
            Dense(d_model),
        ])
        ffn_output = ffn(x)
        ffn_output = Dropout(dropout_rate)(ffn_output)
        x = tf.keras.layers.Add()([x, ffn_output])
        x = LayerNormalization(epsilon=1e-6)(x)

    x = tf.keras.layers.GlobalAveragePooling1D()(x)
    outputs = Dense(num_classes, activation='softmax')(x)

    return Model(inputs, outputs)

# Parameters
num_classes = len(np.unique(y_train))
num_patches = X_patches.shape[1]
d_model = 64  # Embedding dimension
num_heads = 4  # Number of attention heads
num_layers = 2  # Number of transformer layers
mlp_dim = 64  # Dimension of the feed-forward layer
dropout_rate = 0.1  # Dropout rate

# Create the model
vit_model = ViT(patch_size, num_patches, num_classes, d_model, num_heads, num_layers, mlp_dim, dropout_rate)

# Compile the model
vit_model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Print the model summary
vit_model.summary()


Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 210, 25)]            0         []                            
                                                                                                  
 dense (Dense)               (None, 210, 64)              1664      ['input_1[0][0]']             
                                                                                                  
 layer_normalization (Layer  (None, 210, 64)              128       ['dense[0][0]']               
 Normalization)                                                                                   
                                                                                                  
 multi_head_attention (Mult  (None, 210, 64)              66368     ['layer_normalization[0][0
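
The model defined above contains no positional information: self-attention treats the tokens as an unordered set, and `GlobalAveragePooling1D` averages over them, so shuffling the 210 tokens would not change the prediction. A common remedy is to add a position encoding to the token embeddings. The sketch below builds the fixed sine/cosine encoding in NumPy; `sinusoidal_positions` is a hypothetical helper, and adding its output to the result of the first `Dense(d_model)` layer is one possible way to use it, not something the notebook does.

```python
import numpy as np

def sinusoidal_positions(num_patches, d_model):
    """Fixed sine/cosine position encoding of shape (num_patches, d_model).

    Even columns hold sin(pos * freq), odd columns hold cos(pos * freq),
    with freq = 10000 ** (-2i / d_model). d_model must be even.
    """
    positions = np.arange(num_patches)[:, None]
    freqs = np.exp(-np.log(10000.0) * np.arange(0, d_model, 2) / d_model)
    pe = np.zeros((num_patches, d_model))
    pe[:, 0::2] = np.sin(positions * freqs)
    pe[:, 1::2] = np.cos(positions * freqs)
    return pe

pe = sinusoidal_positions(num_patches=210, d_model=64)
print(pe.shape)  # (210, 64)
```

A learned embedding (one trainable vector per token position) is an equally common choice; the fixed version is used here only because it needs no training.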

In [15]:
# Train the model
history = vit_model.fit(X_patches, y_train, epochs=100, batch_size=32, validation_data=(X_test_patches, y_test))

# Evaluate the model
score = vit_model.evaluate(X_test_patches, y_test, verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])


Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78