# Train XGBoost classifier using the eeg2vec embeddings

In [11]:
from eeg2vec.models.eeg2vec import EEG2Vec

import numpy as np
import torch
from sklearn.metrics import f1_score, accuracy_score
from sklearn.model_selection import train_test_split
import xgboost as xgb
from sklearn.multioutput import MultiOutputClassifier

from pathlib import Path
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from scipy.signal import butter, lfilter
import pandas as pd
import pickle

In [2]:
# Directory that holds the paired data_<i>.npy / target_<i>.npy files.
ROOT_PATH = Path("train/")

# Load the four (data, target) array pairs, indexed 0..3, as a list of tuples.
training_data = [
    (
        np.load(ROOT_PATH / f"data_{idx}.npy"),
        np.load(ROOT_PATH / f"target_{idx}.npy"),
    )
    for idx in range(4)
]

In [3]:
def butter_bandpass(lowcut, highcut, fs, order=5):
    """Design a Butterworth band-pass filter.

    Parameters:
    lowcut (float): Lower edge of the pass band, in the same units as ``fs``.
    highcut (float): Upper edge of the pass band, in the same units as ``fs``.
    fs (float): Sampling frequency of the signal the filter will be applied to.
    order (int): Order passed to ``scipy.signal.butter``.

    Returns:
    Whatever ``scipy.signal.butter`` returns for its default output form,
    i.e. the ``(b, a)`` numerator / denominator coefficient arrays.
    """
    passband_edges = [lowcut, highcut]
    coefficients = butter(order, passband_edges, btype='band', fs=fs)
    return coefficients

def butter_bandpass_filter(data, lowcut, highcut, fs, order=5):
    """Band-pass filter ``data`` along its last axis with a Butterworth filter.

    The filter is designed and applied as cascaded second-order sections (SOS)
    instead of a single ``(b, a)`` transfer function. For higher orders and
    pass-band edges that are small relative to ``fs`` the ``(b, a)`` polynomial
    form is numerically ill-conditioned, and SciPy recommends the SOS form for
    filtering. The filtering itself is still causal, single-pass, and starts
    from a zero initial state.

    Parameters:
    data (array-like): Signal(s) to filter; filtering runs along the last axis.
    lowcut (float): Lower edge of the pass band, in the same units as ``fs``.
    highcut (float): Upper edge of the pass band, in the same units as ``fs``.
    fs (float): Sampling frequency of ``data``.
    order (int): Butterworth order passed to ``scipy.signal.butter``.

    Returns:
    numpy array: The filtered signal, same shape as ``data``.
    """
    # Imported here because the module-level import only brings in butter/lfilter.
    from scipy.signal import sosfilt

    sos = butter(order, [lowcut, highcut], fs=fs, btype='band', output='sos')
    return sosfilt(sos, data)

In [6]:
# Band-pass every recording (pass band 0.1-18 with fs given as 100); the
# targets are carried through unchanged.
# NOTE(review): fs is 100 here, while the windowing cell below uses a sample
# rate of 250 - confirm which one matches the recordings.
training_data_filtered = [
    (butter_bandpass_filter(signal, 0.1, 18, 100), labels)
    for signal, labels in training_data
]

In [7]:
# First we need to get the point that maps to a label

def reshape_array_into_windows(x, sample_rate, window_duration_in_seconds):
    """
    Reshape the data into an array of shape (C, T, window) where 'window' contains
    the points corresponding to 'window_duration' seconds of data.

    Trailing samples that do not fill a complete window are dropped (the data
    is truncated, never padded).

    Parameters:
    x (numpy array): The input data array; windows are cut along its last axis
        and its first axis is kept as C.
    sample_rate (int): The number of samples per second.
    window_duration_in_seconds (float): The duration of each window in seconds.

    Returns:
    reshaped_x (numpy array): The reshaped array with shape (C, T, window).

    Raises:
    ValueError: If the window would contain fewer than one sample.
    """
    # Calculate the number of samples in one window. A product that is an
    # integer up to floating-point error (e.g. 0.29 * 100 == 28.999999999999996)
    # is snapped to that integer; a genuinely fractional product is truncated.
    exact_size = window_duration_in_seconds * sample_rate
    nearest_size = int(round(exact_size))
    if abs(exact_size - nearest_size) <= 1e-9 * max(1.0, abs(exact_size)):
        window_size = nearest_size
    else:
        window_size = int(exact_size)

    if window_size < 1:
        raise ValueError(
            "window_duration_in_seconds * sample_rate must be at least one sample, "
            f"got {exact_size!r}"
        )

    # Drop the trailing samples so the length is a multiple of window_size
    total_samples = x.shape[-1]
    remainder = total_samples % window_size
    if remainder != 0:
        x = x[..., :total_samples - remainder]

    # Reshape x into (C, T, window)
    reshaped_x = x.reshape(x.shape[0], -1, window_size)

    return reshaped_x

In [8]:
# Window every recording and stack the results into one sample-major array,
# with the matching label rows stacked alongside. Target layout:
#   all_data    -> [num_samples, num_channels (5), sequence_length]
#   all_targets -> [num_samples, 5]
# NOTE(review): this loop reads `training_data`, not `training_data_filtered`,
# so the band-passed signals are never used here - confirm that is intended.
all_data, all_targets = [], []

for data, target in training_data:
    # Cut into 2 s windows at 250 samples/s, then bring the window axis to
    # the front so each window is one sample.
    reshaped_data = reshape_array_into_windows(data, 250, 2)
    reshaped_data = np.transpose(reshaped_data, (1, 0, 2))
    # One row of 5 label values per sample.
    target = target.reshape(-1, 5)
    all_data.append(reshaped_data)
    all_targets.append(target)

# Join the per-recording pieces along the sample axis.
all_data = np.concatenate(all_data, axis=0)
all_targets = np.concatenate(all_targets, axis=0)

In [13]:
# Sanity check: show the dimensions of the stacked window array.
print(all_data.shape)

(52351, 5, 500)


In [44]:
# Keep only samples 10000..19999 (and their labels) for this experiment.
data = all_data[10000:20000]
labels = all_targets[10000:20000]

In [45]:
# Hold out 20% of the selected samples for evaluation; the fixed seed keeps
# the split reproducible across runs.
X_train, X_test, y_train, y_test = train_test_split(
    data,
    labels,
    random_state=42,
    test_size=0.2,
)

## Compute embeddings

In [46]:
## Load the pretrained eeg2vec model
# The constructor arguments mirror the "8_2_5_2" in the checkpoint file name.
eeg2vec_model = EEG2Vec(8, 2, 5, 2)
model_path = "eeg2vec/data/saved_models/eeg2vec_8_2_5_2_11dec_10000points.pth"
# weights_only=True limits unpickling to tensors and plain containers, which is
# all a state dict needs, and avoids executing arbitrary pickled code from the
# file. map_location="cpu" lets a checkpoint saved on a GPU load on a CPU-only
# machine; the model is moved to the chosen device later.
eeg2vec_model.load_state_dict(
    torch.load(model_path, map_location="cpu", weights_only=True)
)


  eeg2vec_model.load_state_dict(torch.load(model_path))


<All keys matched successfully>

In [47]:
# Prefer the GPU when torch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [48]:
# Compute embeddings for xgboost training data
# The model is fed at most 10000 samples at a time to bound CUDA memory use.
# Batching steps over the sample axis (axis 0); stepping over
# X_train.shape[1] would make the loop run only once and silently drop every
# sample past the first chunk.
eeg2vec_model.to(device)
eeg2vec_model.eval()

training_embedding_chunks = []
with torch.no_grad():  # inference only, no autograd bookkeeping needed
    for i in range(0, len(X_train), 10000):
        batch = torch.tensor(X_train[i:i + 10000], dtype=torch.float32).to(device)
        # Move each chunk back to host memory right away to free GPU memory.
        training_embedding_chunks.append(eeg2vec_model(batch).cpu().numpy())

training_embeddings_all = np.concatenate(training_embedding_chunks, axis=0)
training_embeddings = training_embeddings_all

In [49]:
# Compute embeddings for xgboost test data
# The model is fed at most 10000 samples at a time to bound CUDA memory use.
# Batching steps over the sample axis (axis 0); stepping over
# X_test.shape[1] would make the loop run only once and silently drop every
# sample past the first chunk.
eeg2vec_model.to(device)
eeg2vec_model.eval()

test_embedding_chunks = []
with torch.no_grad():  # inference only, no autograd bookkeeping needed
    for i in range(0, len(X_test), 10000):
        batch = torch.tensor(X_test[i:i + 10000], dtype=torch.float32).to(device)
        # Move each chunk back to host memory right away to free GPU memory.
        test_embedding_chunks.append(eeg2vec_model(batch).cpu().numpy())

test_embeddings_all = np.concatenate(test_embedding_chunks, axis=0)
test_embeddings = test_embeddings_all

In [50]:
# Flatten each sample's embedding into a single feature vector so both
# arrays are 2-D: one row per sample.
training_embeddings = np.reshape(training_embeddings, (len(training_embeddings), -1))
test_embeddings = np.reshape(test_embeddings, (len(test_embeddings), -1))

## Train XGBOOST model

In [51]:
# Hyper-parameters shared by every per-label XGBoost classifier.
params = {
    'objective': 'binary:logistic',  # Binary logistic objective; use 'multi:softmax' for multi-class
    'eval_metric': 'logloss',        # Evaluation metric (logarithmic loss)
    'learning_rate': 0.1,            # Step size shrinkage
    'max_depth': 4,                  # Maximum tree depth
    'subsample': 0.7,                # Fraction of training rows sampled per tree
    'colsample_bytree': 0.7,         # Fraction of features sampled per tree
    'lambda': 1,                     # L2 regularization term
    'alpha': 0                       # L1 regularization term
}

# MultiOutputClassifier fits one independent copy of the classifier per target column.
xgb_model = MultiOutputClassifier(xgb.XGBClassifier(**params))

In [52]:
# Fit the multi-output classifier on the flattened training embeddings.
xgb_model.fit(training_embeddings, y_train)

## Evaluate classification

In [53]:
# Predict every label column for the held-out embeddings.
predictions = xgb_model.predict(test_embeddings)


# NOTE(review): for 2-D multilabel targets sklearn's accuracy_score is subset
# accuracy (a sample counts only if all of its labels match), which would
# explain it sitting well below the F1 score - confirm this is the metric wanted.
accuracy = accuracy_score(y_test, predictions)
print(f'Accuracy: {accuracy:.2f}')
# F1 score, averaged over the labels and weighted by each label's support
f1 = f1_score(y_test, predictions, average='weighted')
print(f'F1 Score: {f1:.2f}')

Accuracy: 0.57
F1 Score: 0.85
