# AIM: Extract features on labeled data using the pretrained EEGNet

In [1]:
import numpy as np
import pandas as pd
import mne
import lightning.pytorch as pl
import torch
import torch.nn.functional as F
import torch.nn as nn
from torchmetrics import F1Score, Accuracy
import random
import os
import matplotlib.pyplot as plt

%matplotlib inline

# prevent extensive logging
mne.set_log_level('WARNING')

In [93]:
# loading the pretrained weights
pretrained_dict = torch.load('pretext_model_weights.pt')
pretrained_dict.keys()

odict_keys(['EEGNet1.block1.0.weight', 'EEGNet1.block1.1.weight', 'EEGNet1.block1.1.bias', 'EEGNet1.block1.1.running_mean', 'EEGNet1.block1.1.running_var', 'EEGNet1.block1.1.num_batches_tracked', 'EEGNet1.block1.2.weight', 'EEGNet1.block1.3.weight', 'EEGNet1.block1.3.bias', 'EEGNet1.block1.3.running_mean', 'EEGNet1.block1.3.running_var', 'EEGNet1.block1.3.num_batches_tracked', 'EEGNet1.block2.0.weight', 'EEGNet1.block2.1.weight', 'EEGNet1.block2.2.weight', 'EEGNet1.block2.2.bias', 'EEGNet1.block2.2.running_mean', 'EEGNet1.block2.2.running_var', 'EEGNet1.block2.2.num_batches_tracked', 'EEGNet1.lin.weight', 'EEGNet2.block1.0.weight', 'EEGNet2.block1.1.weight', 'EEGNet2.block1.1.bias', 'EEGNet2.block1.1.running_mean', 'EEGNet2.block1.1.running_var', 'EEGNet2.block1.1.num_batches_tracked', 'EEGNet2.block1.2.weight', 'EEGNet2.block1.3.weight', 'EEGNet2.block1.3.bias', 'EEGNet2.block1.3.running_mean', 'EEGNet2.block1.3.running_var', 'EEGNet2.block1.3.num_batches_tracked', 'EEGNet2.block2.0.w

## EEGNet architecture

In [3]:
# create Conv2d with max norm constraint
class Conv2dWithConstraint(nn.Conv2d):
    def __init__(self, *args, max_norm: int = 1, **kwargs):
        self.max_norm = max_norm
        super(Conv2dWithConstraint, self).__init__(*args, **kwargs)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        self.weight.data = torch.renorm(self.weight.data, p=2, dim=0, maxnorm=self.max_norm)
        return super(Conv2dWithConstraint, self).forward(x)
    
class EEGNet(nn.Module):
    """
    Code taken and adjusted from pytorch implementation of EEGNet
    url: https://github.com/torcheeg/torcheeg/blob/v1.1.0/torcheeg/models/cnn/eegnet.py#L5
    """
    def __init__(self,
                 chunk_size: int = 1244, # number of data points in each EEG chunk
                 num_electrodes: int = 26, # number of EEG electrodes
                 F1: int = 8, # number of filters in first convolutional layer
                 F2: int = 16, # number of filters in second convolutional layer
                 D: int = 2, # depth multiplier
                 num_extracted_features: int = 100, # number of features to extract
                 kernel_1: int = 64, # the filter size of block 1 (half of sfreq (125 Hz))
                 kernel_2: int = 16, # the filter size of block 2 (one eight of sfreq (500 Hz))
                 dropout: float = 0.25): # dropout rate
        super(EEGNet, self).__init__()
        self.F1 = F1
        self.F2 = F2
        self.D = D
        self.chunk_size = chunk_size
        self.num_extracted_features = num_extracted_features
        self.num_electrodes = num_electrodes
        self.kernel_1 = kernel_1
        self.kernel_2 = kernel_2
        self.dropout = dropout

        self.block1 = nn.Sequential(
            nn.Conv2d(1, self.F1, (1, self.kernel_1), stride=1, padding=(0, self.kernel_1 // 2), bias=False),
            nn.BatchNorm2d(self.F1, momentum=0.01, affine=True, eps=1e-3),
            Conv2dWithConstraint(self.F1,
                                 self.F1 * self.D, (self.num_electrodes, 1),
                                 max_norm=1,
                                 stride=1,
                                 padding=(0, 0),
                                 groups=self.F1,
                                 bias=False), nn.BatchNorm2d(self.F1 * self.D, momentum=0.01, affine=True, eps=1e-3),
            nn.ELU(), nn.AvgPool2d((1, 4), stride=4), nn.Dropout(p=dropout))

        self.block2 = nn.Sequential(
            nn.Conv2d(self.F1 * self.D,
                      self.F1 * self.D, (1, self.kernel_2),
                      stride=1,
                      padding=(0, self.kernel_2 // 2),
                      bias=False,
                      groups=self.F1 * self.D),
            nn.Conv2d(self.F1 * self.D, self.F2, 1, padding=(0, 0), groups=1, bias=False, stride=1),
            nn.BatchNorm2d(self.F2, momentum=0.01, affine=True, eps=1e-3), nn.ELU(), nn.AvgPool2d((1, 8), stride=8),
            nn.Dropout(p=dropout))

        self.lin = nn.Linear(self.feature_dim(), num_extracted_features, bias=False)


    def feature_dim(self):
        # function to calculate the number of features after the convolutional blocks
        with torch.no_grad():
            mock_eeg = torch.zeros(1, 1, self.num_electrodes, self.chunk_size)

            mock_eeg = self.block1(mock_eeg)
            mock_eeg = self.block2(mock_eeg)

        return self.F2 * mock_eeg.shape[3]

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.block1(x)
        x = self.block2(x)
        x = x.flatten(start_dim=1)
        x = self.lin(x)
        return x

## Transfering pretrained weights

In [12]:
pretrained_EEGNet = EEGNet()
model_dict = pretrained_EEGNet.state_dict()

processed_dict = {}
for k in pretrained_dict.keys(): 
    decomposed_key = k.split(".")
    if("EEGNet1" in decomposed_key):
        pretrained_key = ".".join(decomposed_key[1:])
        processed_dict[pretrained_key] = pretrained_dict[k]

pretrained_EEGNet.load_state_dict(processed_dict, strict=True)

# Assert that the weights are the same
for key in processed_dict:
    assert torch.equal(pretrained_EEGNet.state_dict()[key], processed_dict[key]), f"Mismatch found in key: {key}"

print("All weights match successfully.")

All weights match successfully.


## Extract features

### Load participant data of labeled sample
These dataframes have been filtered and stored in a previous project. See https://github.com/TSmolders/Internship_EEG for original code

In [17]:

df_participants = pd.read_pickle(r'D:\Documents\RU\Master_Neurobiology\Internship_jaar_2\Project\TD-BRAIN\TDBRAIN_participants_V2_data\df_participants.pkl')
sample_df = pd.read_pickle(r'D:\Documents\RU\Master_Neurobiology\Internship_jaar_2\Project\TD-BRAIN\TD-BRAIN_extracted_features\df_selected_stat_features.pkl')
sample_ids = sample_df['ID'].unique() # obtain unique IDs from subsampled dataframe containing epoched features
df_sample = df_participants[df_participants['participants_ID'].isin(sample_ids)] # filter participants dataframe to only include subsampled IDs
df_sample = df_sample[df_sample['sessID'] == 1] # filter first session
print(df_sample.shape)
print(df_sample['diagnosis'].value_counts())



(225, 12)
diagnosis
ADHD       45
HEALTHY    45
MDD        45
OCD        45
SMC        45
Name: count, dtype: int64


In [19]:
df_sample.sample(5)

Unnamed: 0,participants_ID,DISC/REP,indication,formal_status,Dataset,age,gender,sessID,nrSessions,EC,EO,diagnosis
1065,sub-88055301,DISCOVERY,HEALTHY,HEALTHY,,40.02,1,1,1,True,True,HEALTHY
972,sub-88048193,DISCOVERY,OCD,OCD,OCD,53.25,0,1,2,True,True,OCD
557,sub-88017633,DISCOVERY,ADHD,UNKNOWN,,7.33,1,1,1,True,True,ADHD
1038,sub-88053273,DISCOVERY,ADHD,ADHD,ADHD_NF,9.45,1,1,1,True,True,ADHD
1298,sub-88075593,DISCOVERY,ADHD,UNKNOWN,,35.38,1,1,1,True,True,ADHD


### Extract features for these participants

In [59]:
# functions
def get_filepath(epoch_dir, participant_ids):
    """
    Function to get the filepath of the epoched EEG recording
    :param epoch_dir: directory containing the epoched EEG recordings
    :param ID: list of participant IDs to include
    """
    filepaths = []
    for subdir, dirs, files in os.walk(epoch_dir):
        for file in files:
            if any(participant_id in file for participant_id in participant_ids):
                filepaths.append(os.path.join(subdir, file))
    return filepaths

class EpochDataset(torch.utils.data.Dataset):
    def __init__(self, participant_ids, epoch_dir):
        self.filepaths = get_filepath(epoch_dir, participant_ids)
        self.participant_ids = participant_ids
        self.epochs = []
        self.participant_ids = []
        self._load_data()
        print(f"Number of epochs: {self.epochs.shape[0]}")
        print(f"Number of participants: {len(self.participant_ids)}")

    def _load_data(self):
        all_epochs = []
        for filepath in self.filepaths:
            epochs = torch.load(filepath)
            # get participant ID from filepath to make sure the participant ID is correct
            participant_id = filepath.split("\\")[-1].split(".")[0]
            all_epochs.append(epochs)
            self.participant_ids.extend([participant_id]*epochs.shape[0])
        self.epochs = np.concatenate(all_epochs, axis=0)

    def __len__(self):
        return self.epochs.shape[0]
    
    def __getitem__(self, idx):
        epoch = self.epochs[idx]
        participant_id = self.participant_ids[idx]
        return torch.tensor(epoch, dtype=torch.float32), participant_id

In [60]:
participant_ids = df_sample['participants_ID'].tolist()
dataset = EpochDataset(participant_ids, r"D:\Documents\RU\Master_Neurobiology\Internship_jaar_2\Project\TD-BRAIN\TDBRAIN-dataset-derivatives\thesis_epoched_data\EC")
print(len(dataset))
print(dataset[0][0].shape)
print(dataset[0][1])
print(dataset[1][1])

Number of epochs: 2688
Number of participants: 2688
2688
torch.Size([26, 1244])
sub-87964717
sub-87964717


In [96]:
# created with help of GitHub Copilot
dataloader = torch.utils.data.DataLoader(dataset, batch_size=1, shuffle=False)
pretrained_EEGNet.eval() ## TODO: Should I set the model to evaluation mode?

features_list = []
participant_ids = []
with torch.no_grad():  # Disable gradient calculation
    for batch in dataloader:
        epoch, participant_id = batch  # Remove the batch dimension
        epoch = epoch.unsqueeze(0)  # Add dimension
        # print(epoch.shape)
        features = pretrained_EEGNet(epoch)  # Extract features
        features = features.squeeze(0)
        features = features.numpy()
        features_list.append(features)
        participant_ids.append(participant_id[0])

print(len(features_list))
print(features_list[0].shape)

2688
(100,)


In [97]:
# store the features to disk
features_df = pd.DataFrame(features_list)

# Add participant IDs to the DataFrame
features_df['ID'] = participant_ids

# Map the diagnosis values from df_sample to the DataFrame based on participant IDs
features_df['diagnosis'] = features_df['ID'].map(df_sample.set_index('participants_ID')['diagnosis'])

features_df.to_pickle(r'D:\Documents\Master_Data_Science\Thesis\thesis_code\DataScience_Thesis\data\df_ssl_features.pkl')

In [98]:
features_df.sample(5)

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,92,93,94,95,96,97,98,99,ID,diagnosis
1397,3.664361,-3.131516,1.69551,0.155439,-0.828389,-1.076838,-0.74505,1.110973,0.017292,-0.978348,...,-3.024903,0.565701,2.176804,1.036991,1.820049,1.932001,-0.626927,-2.327834,sub-88041305,ADHD
2102,2.625418,-1.087079,0.747795,0.62114,-0.333993,0.171692,-0.697683,0.583098,0.197683,-0.453607,...,-0.38332,0.201196,2.172565,-0.747482,0.811691,0.752884,-0.484254,-1.29053,sub-88059977,OCD
1932,2.466621,2.535407,2.016782,1.503323,1.683569,-0.807519,0.955875,0.192149,3.316376,-0.926541,...,1.24719,-1.724805,-3.091214,-1.538718,-2.412516,-0.33415,1.11913,-0.960216,sub-88056021,ADHD
2625,-0.521252,0.191501,0.153712,-0.137607,0.068807,-0.106556,0.205999,-0.331544,0.819242,-0.566306,...,0.219668,-1.222554,-0.980601,0.792679,-1.333334,0.213945,0.695682,1.165446,sub-88075053,HEALTHY
2604,-0.345381,1.803453,-0.258202,1.175605,-2.197935,1.675406,0.006974,2.722747,0.556865,-1.083907,...,4.931505,-2.216218,-0.089509,-2.244158,-0.412924,-0.674755,-1.632735,0.326206,sub-88074917,OCD


In [99]:
# quick svm model
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.metrics import classification_report, confusion_matrix, f1_score

X = features_df.drop(['ID', 'diagnosis'], axis=1)
y = features_df['diagnosis']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

clf = SVC()
clf.fit(X_train, y_train)
y_pred = clf.predict(X_test)
print(confusion_matrix(y_test, y_pred))
print(classification_report(y_test, y_pred))
print(f1_score(y_test, y_pred, average='macro'))


[[34 20 15 19 18]
 [20 14 15 22 37]
 [ 8 18 23 24 35]
 [14 18 18 23 35]
 [ 8 20 14 16 50]]
              precision    recall  f1-score   support

        ADHD       0.40      0.32      0.36       106
     HEALTHY       0.16      0.13      0.14       108
         MDD       0.27      0.21      0.24       108
         OCD       0.22      0.21      0.22       108
         SMC       0.29      0.46      0.35       108

    accuracy                           0.27       538
   macro avg       0.27      0.27      0.26       538
weighted avg       0.27      0.27      0.26       538

0.2615977739405999


In [94]:
# extracting features with randomly initialized EEGNet
random_EEGNet = EEGNet()
random_features_list = []
random_participant_ids = []
# created with help of GitHub Copilot
dataloader = torch.utils.data.DataLoader(dataset, batch_size=1, shuffle=False)
random_EEGNet.eval() ## TODO: Should I set the model to evaluation mode?


with torch.no_grad():  # Disable gradient calculation
    for batch in dataloader:
        epoch, participant_id = batch  # Remove the batch dimension
        epoch = epoch.unsqueeze(0)  # Add dimension
        # print(epoch.shape)
        features = random_EEGNet(epoch)  # Extract features
        features = features.squeeze(0)
        features = features.numpy()
        random_features_list.append(features)
        random_participant_ids.append(participant_id[0])

print(len(random_features_list))
print(random_features_list[0].shape)

# store the features to disk
random_features_df = pd.DataFrame(random_features_list)
# Add participant IDs to the DataFrame
random_features_df['ID'] = random_participant_ids
# Map the diagnosis values from df_sample to the DataFrame based on participant IDs
random_features_df['diagnosis'] = random_features_df['ID'].map(df_sample.set_index('participants_ID')['diagnosis'])

2688
(100,)


In [95]:
X = random_features_df.drop(['ID', 'diagnosis'], axis=1)
y = random_features_df['diagnosis']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

clf = SVC()
clf.fit(X_train, y_train)
y_pred = clf.predict(X_test)
print(confusion_matrix(y_test, y_pred))
print(classification_report(y_test, y_pred))
print(f1_score(y_test, y_pred, average='macro'))

[[44 10  8 18 26]
 [ 3 24 14 36 31]
 [ 8 13 16 28 43]
 [ 3 14 15 33 43]
 [ 0  8 16 22 62]]
              precision    recall  f1-score   support

        ADHD       0.76      0.42      0.54       106
     HEALTHY       0.35      0.22      0.27       108
         MDD       0.23      0.15      0.18       108
         OCD       0.24      0.31      0.27       108
         SMC       0.30      0.57      0.40       108

    accuracy                           0.33       538
   macro avg       0.38      0.33      0.33       538
weighted avg       0.37      0.33      0.33       538

0.3308233312541893
