In [1]:
import os
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import mne
from mne.preprocessing import ICA
from tqdm import tqdm
import numpy as np

import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
class EEG_Dataset(Dataset):
    """Map-style dataset that yields one EEG trial per row of an index CSV.

    The index CSV (``base_path/index_csv``) must provide the columns ``id``,
    ``task``, ``subject_id``, ``trial_session`` and ``trial``; a ``label``
    column is optional.  Each trial is cut out of the session file
    ``base_path/<task>/<split>/<subject_id>/<trial_session>/EEGdata.csv``,
    where ``<split>`` is derived from the row's ``id``.

    Args:
        index_csv: File name of the index CSV, relative to ``base_path``.
        base_path: Root directory of the dataset.
        ch_names: Column names of the EEG channels to extract from a session.
        sfreq: Sampling frequency in Hz, used to convert the trial duration
            into a sample count.
        transform: Optional callable that receives an ``mne.io.RawArray`` and
            returns an object exposing ``get_data()``.
    """

    # Rows with id <= TRAIN_MAX_ID live under "train", ids up to
    # VALIDATION_MAX_ID under "validation", everything above under "test".
    TRAIN_MAX_ID = 4800
    VALIDATION_MAX_ID = 4900

    # Trial duration in seconds.  Any task other than "MI" is treated as SSVEP.
    MI_TRIAL_SECONDS = 9
    SSVEP_TRIAL_SECONDS = 7

    def __init__(
        self,
        index_csv: str,
        base_path: str,
        ch_names: list,
        sfreq: float = 250.0,
        transform: callable = None,  # type: ignore
    ):
        self.base_path = base_path
        self.ch_names = ch_names
        self.sfreq = sfreq
        self.transform = transform

        # Load the index CSV.  A missing 'label' column is tolerated:
        # __getitem__ then returns the tensor alone.
        self.df = pd.read_csv(os.path.join(base_path, index_csv))

    def __len__(self):
        """Return the number of trials listed in the index."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load the trial at position ``idx`` of the index.

        Returns:
            ``(eeg_tensor, label)`` when the index has a ``label`` column,
            otherwise ``eeg_tensor`` alone.  ``eeg_tensor`` is a float32
            tensor laid out as (channels, samples).

        Raises:
            ValueError: If the session file does not contain a full trial at
                the requested position.
        """
        row = self.df.iloc[idx]

        # Determine split from the id range.
        id_num = row["id"]
        if id_num <= self.TRAIN_MAX_ID:
            split = "train"
        elif id_num <= self.VALIDATION_MAX_ID:
            split = "validation"
        else:
            split = "test"

        # Path to session CSV
        eeg_path = os.path.join(
            self.base_path,
            row["task"],
            split,
            f"{row['subject_id']}",
            str(row["trial_session"]),
            "EEGdata.csv",
        )

        # Load full session.  NOTE: the whole file is re-read for every trial.
        sess_df = pd.read_csv(eeg_path)

        # Determine samples per trial (2250 for MI, 1750 for SSVEP at 250 Hz).
        if row["task"] == "MI":
            seconds = self.MI_TRIAL_SECONDS
        else:  # SSVEP
            seconds = self.SSVEP_TRIAL_SECONDS
        n = int(seconds * self.sfreq)

        # Trials are numbered from 1 and stored back to back in the session.
        start = (int(row["trial"]) - 1) * n
        end = start + n

        # Slice out EEG channels only -> (n_channels, n)
        data = sess_df[self.ch_names].iloc[start:end].to_numpy().T

        # iloc slicing clips silently, so a short or out-of-range trial would
        # otherwise surface later as a confusing shape error.
        if data.shape[1] != n:
            raise ValueError(
                f"Trial {row['trial']} in {eeg_path} has {data.shape[1]} "
                f"samples; expected {n}."
            )

        # Optional MNE preprocessing
        if self.transform is not None:
            with mne.utils.use_log_level("WARNING"):
                info = mne.create_info(
                    ch_names=self.ch_names,
                    sfreq=self.sfreq,
                    ch_types=["eeg"] * len(self.ch_names),
                )
                raw = mne.io.RawArray(data, info)
                raw = self.transform(raw)
                data = raw.get_data()

        # To tensor
        eeg_tensor = torch.from_numpy(data).float()

        # Return with/without label
        if "label" in self.df.columns:
            return eeg_tensor, row["label"]
        return eeg_tensor


class MI_Dataset(EEG_Dataset):
    """EEG_Dataset restricted to the rows of the index whose task is "MI"."""

    def __init__(
        self,
        base_path: str,
        index_csv: str,
        ch_names: list,
        sfreq: float = 250.0,
        transform: callable = None,  # type: ignore
    ):
        # Forwarded by keyword because the parent takes index_csv before
        # base_path, the reverse of this signature.
        super().__init__(
            index_csv=index_csv,
            base_path=base_path,
            ch_names=ch_names,
            sfreq=sfreq,
            transform=transform,
        )

        # Keep MI rows only and renumber them 0..n-1 for positional access.
        is_mi = self.df["task"] == "MI"
        self.df = self.df.loc[is_mi].reset_index(drop=True)


class SSVEP_Dataset(EEG_Dataset):
    """EEG_Dataset restricted to the rows of the index whose task is "SSVEP"."""

    def __init__(
        self,
        base_path: str,
        index_csv: str,
        ch_names: list,
        sfreq: float = 250.0,
        transform: callable = None,  # type: ignore
    ):
        # Forwarded by keyword because the parent takes index_csv before
        # base_path, the reverse of this signature.
        super().__init__(
            index_csv=index_csv,
            base_path=base_path,
            ch_names=ch_names,
            sfreq=sfreq,
            transform=transform,
        )

        # Keep SSVEP rows only and renumber them 0..n-1 for positional access.
        is_ssvep = self.df["task"] == "SSVEP"
        self.df = self.df.loc[is_ssvep].reset_index(drop=True)

In [4]:
# ch_names = ["FZ", "C3", "CZ", "C4", "PZ", "PO7", "OZ", "PO8"]
# LabelEncoder is defined in sklearn.preprocessing; sklearn.calibration only
# happens to re-export it.
from sklearn.preprocessing import LabelEncoder

CH_NAMES = ["C3", "CZ", "C4"]
SFREQ = 250.0


def notch_50hz(raw):
    """Apply a 50 Hz notch filter to the EEG channels of an MNE Raw object."""
    return raw.notch_filter(freqs=50.0, picks="eeg", fir_design="firwin")


# --- Load Datasets ---
# The only dataset-level MNE transform is the 50 Hz notch filter; the same
# function is used for both splits so they are preprocessed identically.
train_dataset = MI_Dataset(
    base_path="",
    index_csv="train.csv",
    ch_names=CH_NAMES,
    sfreq=SFREQ,
    transform=notch_50hz,
)

validation_dataset = MI_Dataset(
    base_path="",
    index_csv="validation.csv",
    ch_names=CH_NAMES,
    sfreq=SFREQ,
    transform=notch_50hz,
)

# --- Create DataLoaders ---
# We can use a batch size of 1 for this kind of feature extraction
train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True)
validation_loader = DataLoader(validation_dataset, batch_size=1, shuffle=False)

print(f"Number of training samples: {len(train_dataset)}")
print(f"Number of validation samples: {len(validation_dataset)}")

# --- Inspect a sample ---
# With batch_size=1 the loader adds a leading batch axis and wraps the label
# in a length-1 sequence, hence label[0].
eeg_tensor, label = next(iter(train_loader))
print(f"\nSample EEG tensor shape: {eeg_tensor.shape}")
print(f"Sample label: {label[0]}")

Number of training samples: 2400
Number of validation samples: 50

Sample EEG tensor shape: torch.Size([1, 3, 2250])
Sample label: Right


In [14]:
# Explicit label encoding: an unexpected label raises KeyError instead of
# being silently counted as class 1.
LABEL_TO_INT = {"Left": 0, "Right": 1}

# Target frequencies: 8..30 Hz inclusive in 1 Hz steps (23 values).
freqs_of_interest = np.arange(8, 31, 1)

X_tr = []
y_tr = []
with mne.utils.use_log_level("WARNING"):
    for i in tqdm(range(len(train_dataset))):
        eeg_tensor, label = train_dataset[i]
        raw = mne.io.RawArray(
            eeg_tensor.numpy(),
            mne.create_info(
                ch_names=CH_NAMES, sfreq=SFREQ, ch_types=["eeg"] * len(CH_NAMES)
            ),
        )
        # Welch PSD of the current trial, restricted to 8-30 Hz.
        psds, freqs = raw.compute_psd(
            method="welch",
            fmin=8.0,
            fmax=30.0,
            n_fft=1024,  # larger n_fft -> finer frequency resolution
        ).get_data(return_freqs=True)

        # For every target frequency pick the closest PSD bin (first bin wins
        # on ties, like the scalar argmin it replaces).
        nearest_bins = np.abs(
            freqs[np.newaxis, :] - freqs_of_interest[:, np.newaxis]
        ).argmin(axis=1)

        # psds is (channels, bins); transpose before flattening so the vector
        # is ordered frequency-major: all channels at 8 Hz, then 9 Hz, ...
        feature_vector = psds[:, nearest_bins].T.ravel()

        X_tr.append(feature_vector)
        y_tr.append(LABEL_TO_INT[label])

X_tr = np.array(X_tr)
y_tr = np.array(y_tr)

X_tr.shape, y_tr.shape

100%|██████████| 2400/2400 [03:36<00:00, 11.09it/s]


((2400, 69), (2400,))

In [15]:
# dev
# Explicit label encoding: an unexpected label raises KeyError instead of
# being silently counted as class 1.
LABEL_TO_INT = {"Left": 0, "Right": 1}

# Target frequencies: 8..30 Hz inclusive in 1 Hz steps (23 values).
freqs_of_interest = np.arange(8, 31, 1)

X_dev = []
y_dev = []

with mne.utils.use_log_level("WARNING"):
    for i in tqdm(range(len(validation_dataset))):
        eeg_tensor, label = validation_dataset[i]
        raw = mne.io.RawArray(
            eeg_tensor.numpy(),
            mne.create_info(
                ch_names=CH_NAMES, sfreq=SFREQ, ch_types=["eeg"] * len(CH_NAMES)
            ),
        )
        # Welch PSD of the current trial, restricted to 8-30 Hz.
        psds, freqs = raw.compute_psd(
            method="welch",
            fmin=8.0,
            fmax=30.0,
            n_fft=1024,  # larger n_fft -> finer frequency resolution
        ).get_data(return_freqs=True)

        # For every target frequency pick the closest PSD bin (first bin wins
        # on ties, like the scalar argmin it replaces).
        nearest_bins = np.abs(
            freqs[np.newaxis, :] - freqs_of_interest[:, np.newaxis]
        ).argmin(axis=1)

        # psds is (channels, bins); transpose before flattening so the vector
        # is ordered frequency-major: all channels at 8 Hz, then 9 Hz, ...
        feature_vector = psds[:, nearest_bins].T.ravel()

        X_dev.append(feature_vector)
        y_dev.append(LABEL_TO_INT[label])

X_dev = np.array(X_dev)
y_dev = np.array(y_dev)

100%|██████████| 50/50 [00:05<00:00,  9.36it/s]


In [17]:
# KNN baseline on the PSD features
from sklearn.neighbors import KNeighborsClassifier

# fit() returns the estimator, so construction and training are chained.
knn_classifier = KNeighborsClassifier(n_neighbors=9, n_jobs=-1).fit(X_tr, y_tr)

# Accuracy on the fitted data and on the held-out validation split.
train_accuracy_knn, validation_accuracy_knn = (
    knn_classifier.score(X_tr, y_tr),
    knn_classifier.score(X_dev, y_dev),
)
print(f"KNN Training accuracy: {train_accuracy_knn:.4f}")
print(f"KNN Validation accuracy: {validation_accuracy_knn:.4f}")

KNN Training accuracy: 0.6508
KNN Validation accuracy: 0.6000


In [18]:
# Sweep the neighbourhood size k = 2..9 on the PSD features.
for i in range(2, 10):
    knn_classifier = KNeighborsClassifier(n_neighbors=i, n_jobs=-1).fit(X_tr, y_tr)
    # Score on the fitted data and on the validation split.
    train_accuracy_knn, validation_accuracy_knn = (
        knn_classifier.score(X_tr, y_tr),
        knn_classifier.score(X_dev, y_dev),
    )
    print(
        f"KNN Training accuracy (k={i}): {train_accuracy_knn:.4f} | Validation accuracy: {validation_accuracy_knn:.4f}"
    )

KNN Training accuracy (k=2): 0.7692 | Validation accuracy: 0.6800
KNN Training accuracy (k=3): 0.7742 | Validation accuracy: 0.4400
KNN Training accuracy (k=4): 0.7083 | Validation accuracy: 0.5400
KNN Training accuracy (k=5): 0.7004 | Validation accuracy: 0.5600
KNN Training accuracy (k=6): 0.6675 | Validation accuracy: 0.6200
KNN Training accuracy (k=7): 0.6737 | Validation accuracy: 0.6200
KNN Training accuracy (k=8): 0.6512 | Validation accuracy: 0.6200
KNN Training accuracy (k=9): 0.6508 | Validation accuracy: 0.6000


In [22]:
from lazypredict.Supervised import LazyClassifier, LazyRegressor

# Fit lazypredict's model zoo on the PSD features and score each model on
# the validation split.
lazy_classifier = LazyClassifier(ignore_warnings=True, random_state=42)
lazy_results = lazy_classifier.fit(X_tr, X_dev, y_tr, y_dev)
models, predictions = lazy_results
print(models)

 97%|█████████▋| 30/31 [00:17<00:00,  2.93it/s]

[LightGBM] [Info] Number of positive: 1213, number of negative: 1187
[LightGBM] [Info] Auto-choosing row-wise multi-threading, the overhead of testing was 0.001828 seconds.
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total Bins 17595
[LightGBM] [Info] Number of data points in the train set: 2400, number of used features: 69
[LightGBM] [Info] [binary:BoostFromScore]: pavg=0.505417 -> initscore=0.021668
[LightGBM] [Info] Start training from score 0.021668


100%|██████████| 31/31 [00:18<00:00,  1.68it/s]

                               Accuracy  Balanced Accuracy  ROC AUC  F1 Score  \
Model                                                                           
LGBMClassifier                     0.62               0.63     0.63      0.62   
ExtraTreeClassifier                0.60               0.61     0.61      0.60   
ExtraTreesClassifier               0.54               0.54     0.54      0.54   
KNeighborsClassifier               0.54               0.54     0.54      0.54   
RandomForestClassifier             0.54               0.54     0.54      0.54   
BernoulliNB                        0.56               0.54     0.54      0.55   
PassiveAggressiveClassifier        0.56               0.54     0.54      0.55   
SGDClassifier                      0.56               0.53     0.53      0.54   
XGBClassifier                      0.52               0.52     0.52      0.52   
CalibratedClassifierCV             0.48               0.52     0.52      0.43   
NuSVC                       




In [30]:
from sklearn.ensemble import RandomForestClassifier

# Unconstrained 100-tree forest on the PSD features; fit() returns the
# estimator, so construction and training are chained.
rf_classifier = RandomForestClassifier(
    n_estimators=100, random_state=42, n_jobs=-1
).fit(X_tr, y_tr)

# Score on the fitted data and on the validation split.
train_accuracy_rf, validation_accuracy_rf = (
    rf_classifier.score(X_tr, y_tr),
    rf_classifier.score(X_dev, y_dev),
)
print(f"RF Training accuracy: {train_accuracy_rf:.4f}")
print(f"RF Validation accuracy: {validation_accuracy_rf:.4f}")

RF Training accuracy: 1.0000
RF Validation accuracy: 0.5400


In [32]:
# Grid over (max_depth, n_estimators), scored on the validation split.
# NOTE(review): the configuration is selected on the same validation data
# whose accuracy is reported, so the "best" score is optimistic.
grid = [(d, n) for d in range(2, 10) for n in range(50, 101, 20)]

# [best validation accuracy, max_depth, n_estimators].  All three slots are
# seeded so the summary print cannot raise IndexError when no configuration
# scores above 0.
best = [0.0, None, None]
for d, n in tqdm(grid):
    rf_classifier = RandomForestClassifier(
        n_estimators=n, max_depth=d, random_state=42, n_jobs=-1
    )
    rf_classifier.fit(X_tr, y_tr)
    # Evaluate on validation set
    train_accuracy_rf = rf_classifier.score(X_tr, y_tr)
    validation_accuracy_rf = rf_classifier.score(X_dev, y_dev)
    # Strict '>' keeps the first configuration that reaches a given score.
    if validation_accuracy_rf > best[0]:
        best = [validation_accuracy_rf, d, n]
        print(
            f"New best RF Validation accuracy: {validation_accuracy_rf:.4f} with max_depth={d} and n_estimators={n}"
        )
print(
    f"Best RF Validation accuracy: {best[0]:.4f} with max_depth={best[1]} and n_estimators={best[2]}"
)

  8%|▊         | 2/24 [00:00<00:05,  4.01it/s]

New best RF Validation accuracy: 0.5600 with max_depth=2 and n_estimators=50


 17%|█▋        | 4/24 [00:01<00:05,  3.90it/s]

New best RF Validation accuracy: 0.6400 with max_depth=3 and n_estimators=50
New best RF Validation accuracy: 0.6800 with max_depth=3 and n_estimators=70


100%|██████████| 24/24 [00:06<00:00,  3.91it/s]

Best RF Validation accuracy: 0.6800 with max_depth=3 and n_estimators=70





In [39]:
# An MI trial is 9 s long, i.e. 2250 samples at SFREQ = 250 Hz.
n_samples = int(9 * SFREQ)
window = np.hamming(n_samples)

# The frequency axis and the 8-30 Hz band mask are the same for every trial,
# so they are computed once instead of inside the loop.
freqs = np.fft.rfftfreq(n_samples, d=1 / SFREQ)
mask = (freqs >= 8) & (freqs <= 30)

X_fft = []
y_fft = []
for i in tqdm(range(len(train_dataset))):
    eeg_tensor, label = train_dataset[i]
    # (samples, channels), tapered with the Hamming window along time.
    x = eeg_tensor.numpy().T * window[:, None]
    fft_vals = np.fft.rfft(x, axis=0)
    spectrum = fft_vals[mask, :]
    # NOTE(review): only the real part of the complex spectrum is kept, so the
    # imaginary part is discarded.  If magnitude was intended this should be
    # np.abs(spectrum) -- but it must then change identically wherever the
    # validation/test FFT features are built.
    x_features = spectrum.flatten().real  # (n_bins * n_channels,)
    X_fft.append(x_features)
    # Labels are kept as strings here; they are integer-encoded separately.
    y_fft.append(label)

X_fft = np.array(X_fft)
y_fft = np.array(y_fft)

100%|██████████| 2400/2400 [03:47<00:00, 10.56it/s]


In [50]:
# Sanity check: (n_trials, n_bins * n_channels) features and (n_trials,) labels.
(X_fft.shape, y_fft.shape)

((2400, 597), (2400,))

In [44]:
def encode_labels(labels):
    """Return labels as a 0/1 integer array ("Left" -> 0, "Right" -> 1).

    Values that are already 0 or 1 pass through unchanged, so re-running the
    cell, or feeding it an already-encoded array, cannot collapse every entry
    to 1.  Any other value raises KeyError.
    """
    lookup = {"Left": 0, "Right": 1, 0: 0, 1: 1}
    return np.array([lookup[label] for label in labels])


y_fft = encode_labels(y_fft)
# y_dev is already integer-encoded; comparing its entries against "Left"
# would mark every validation trial as class 1.
y_fft_dev = encode_labels(y_dev)

In [40]:
# The frequency axis and the 8-30 Hz band mask are the same for every trial
# (2250 samples at 250 Hz), so they are computed once instead of inside the
# loop.
freqs = np.fft.rfftfreq(2250, d=1 / 250)
mask = (freqs >= 8) & (freqs <= 30)

X_fft_dev = []
y_fft_dev = []
for i in tqdm(range(len(validation_dataset))):
    eeg_tensor, label = validation_dataset[i]
    # (samples, channels), tapered with the Hamming window defined with the
    # training FFT features.
    x = eeg_tensor.numpy().T * window[:, None]
    fft_vals = np.fft.rfft(x, axis=0)
    spectrum = fft_vals[mask, :]
    # NOTE(review): only the real part of the complex spectrum is kept; this
    # must stay identical to how the training FFT features are built.
    x_features = spectrum.flatten().real  # (n_bins * n_channels,)
    X_fft_dev.append(x_features)
    # Encode to 0/1 so y_fft_dev matches the integer encoding of y_dev;
    # an unexpected label raises KeyError.
    y_fft_dev.append({"Left": 0, "Right": 1}[label])
X_fft_dev = np.array(X_fft_dev)
y_fft_dev = np.array(y_fft_dev)

100%|██████████| 50/50 [00:04<00:00, 11.21it/s]


In [47]:
# Number of trials whose FFT-loop label disagrees with the PSD-loop label
# (expected to be 0).
np.sum(y_fft != y_tr)

0

In [52]:
# Put the PSD and FFT feature blocks side by side (column-wise) per trial.
X_big = np.hstack([X_tr, X_fft])
X_big_dev = np.hstack([X_dev, X_fft_dev])

(X_big.shape, X_big_dev.shape, y_tr.shape)

((2400, 666), (50, 666), (2400,))

In [53]:
# KNN on the combined PSD + FFT features
knn_classifier = KNeighborsClassifier(n_neighbors=9, n_jobs=-1).fit(X_big, y_tr)

# Score on the fitted data and on the validation split.
train_accuracy_knn, validation_accuracy_knn = (
    knn_classifier.score(X_big, y_tr),
    knn_classifier.score(X_big_dev, y_dev),
)
print(f"KNN Training accuracy: {train_accuracy_knn:.4f}")
print(f"KNN Validation accuracy: {validation_accuracy_knn:.4f}")

KNN Training accuracy: 0.6396
KNN Validation accuracy: 0.4200


In [55]:
# Sweep even neighbourhood sizes k = 2, 4, ..., 18 on the combined features.
for k in range(2, 20, 2):
    knn_classifier = KNeighborsClassifier(n_neighbors=k, n_jobs=-1).fit(X_big, y_tr)
    # Score on the fitted data and on the validation split.
    train_accuracy_knn, validation_accuracy_knn = (
        knn_classifier.score(X_big, y_tr),
        knn_classifier.score(X_big_dev, y_dev),
    )
    print(
        f"KNN Training accuracy (k={k}): {train_accuracy_knn:.4f} | Validation accuracy: {validation_accuracy_knn:.4f}"
    )

KNN Training accuracy (k=2): 0.7692 | Validation accuracy: 0.7200
KNN Training accuracy (k=4): 0.7104 | Validation accuracy: 0.5400
KNN Training accuracy (k=6): 0.6833 | Validation accuracy: 0.4800
KNN Training accuracy (k=8): 0.6417 | Validation accuracy: 0.5200
KNN Training accuracy (k=10): 0.6346 | Validation accuracy: 0.5200
KNN Training accuracy (k=12): 0.6317 | Validation accuracy: 0.4800
KNN Training accuracy (k=14): 0.6058 | Validation accuracy: 0.5000
KNN Training accuracy (k=16): 0.6092 | Validation accuracy: 0.4600
KNN Training accuracy (k=18): 0.6025 | Validation accuracy: 0.4600


In [None]:
# Final KNN with k=2 on the combined features; this is the model reused
# below for the report and the test predictions.  The recorded run of this
# cell prints 0.7692 training / 0.7200 validation accuracy.
knn = KNeighborsClassifier(n_neighbors=2, n_jobs=-1).fit(X_big, y_tr)

# Score on the fitted data and on the validation split.
train_accuracy_knn, validation_accuracy_knn = (
    knn.score(X_big, y_tr),
    knn.score(X_big_dev, y_dev),
)
print(f"KNN Training accuracy: {train_accuracy_knn:.4f}")
print(f"KNN Validation accuracy: {validation_accuracy_knn:.4f}")

KNN Training accuracy: 0.7692
KNN Validation accuracy: 0.7200


In [57]:
# Classification report for the final KNN on the validation split
from sklearn.metrics import classification_report

y_pred = knn.predict(X_big_dev)
# Class 0 is "Left" and class 1 is "Right", matching the label encoding.
knn_report = classification_report(y_dev, y_pred, target_names=["Left", "Right"])
print(knn_report)

              precision    recall  f1-score   support

        Left       0.68      0.93      0.79        28
       Right       0.83      0.45      0.59        22

    accuracy                           0.72        50
   macro avg       0.76      0.69      0.69        50
weighted avg       0.75      0.72      0.70        50



In [59]:
# Grid over (max_depth, n_estimators) on the combined features, scored on the
# validation split.
# NOTE(review): the configuration is selected on the same validation data
# whose accuracy is reported, so the "best" score is optimistic.
grid = [(d, n) for d in range(2, 10) for n in range(50, 101, 20)]

# [best validation accuracy, max_depth, n_estimators].  All three slots are
# seeded so the summary print cannot raise IndexError when no configuration
# scores above 0.
best = [0.0, None, None]
for d, n in tqdm(grid):
    rf_classifier = RandomForestClassifier(
        n_estimators=n, max_depth=d, random_state=42, n_jobs=-1
    )
    rf_classifier.fit(X_big, y_tr)
    # Evaluate on validation set
    train_accuracy_rf = rf_classifier.score(X_big, y_tr)
    validation_accuracy_rf = rf_classifier.score(X_big_dev, y_dev)
    # Strict '>' keeps the first configuration that reaches a given score.
    if validation_accuracy_rf > best[0]:
        best = [validation_accuracy_rf, d, n]
        print(
            f"New best RF Validation accuracy: {validation_accuracy_rf:.4f} with max_depth={d} and n_estimators={n}"
        )
print(
    f"Best RF Validation accuracy: {best[0]:.4f} with max_depth={best[1]} and n_estimators={best[2]}"
)

  4%|▍         | 1/24 [00:00<00:05,  4.07it/s]

New best RF Validation accuracy: 0.4200 with max_depth=2 and n_estimators=50


 42%|████▏     | 10/24 [00:03<00:05,  2.69it/s]

New best RF Validation accuracy: 0.4600 with max_depth=5 and n_estimators=50


 46%|████▌     | 11/24 [00:03<00:05,  2.60it/s]

New best RF Validation accuracy: 0.4800 with max_depth=5 and n_estimators=70


 58%|█████▊    | 14/24 [00:05<00:04,  2.14it/s]

New best RF Validation accuracy: 0.5000 with max_depth=6 and n_estimators=70


 67%|██████▋   | 16/24 [00:06<00:03,  2.16it/s]

New best RF Validation accuracy: 0.5200 with max_depth=7 and n_estimators=50


 83%|████████▎ | 20/24 [00:08<00:02,  1.78it/s]

New best RF Validation accuracy: 0.5400 with max_depth=8 and n_estimators=70


 88%|████████▊ | 21/24 [00:09<00:01,  1.70it/s]

New best RF Validation accuracy: 0.5600 with max_depth=8 and n_estimators=90


100%|██████████| 24/24 [00:11<00:00,  2.10it/s]

Best RF Validation accuracy: 0.5600 with max_depth=8 and n_estimators=90





In [60]:
# lazypredict model zoo on the combined PSD + FFT features, scored on the
# validation split.
lazy_classifier = LazyClassifier(ignore_warnings=True, random_state=42)
lazy_results = lazy_classifier.fit(X_big, X_big_dev, y_tr, y_dev)
models, predictions = lazy_results
print(models)

 97%|█████████▋| 30/31 [03:21<00:02,  2.80s/it]

[LightGBM] [Info] Number of positive: 1213, number of negative: 1187
[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.023777 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 169830
[LightGBM] [Info] Number of data points in the train set: 2400, number of used features: 666
[LightGBM] [Info] [binary:BoostFromScore]: pavg=0.505417 -> initscore=0.021668
[LightGBM] [Info] Start training from score 0.021668


100%|██████████| 31/31 [03:24<00:00,  6.61s/it]

                               Accuracy  Balanced Accuracy  ROC AUC  F1 Score  \
Model                                                                           
SGDClassifier                      0.66               0.67     0.67      0.66   
LinearSVC                          0.66               0.66     0.66      0.66   
LogisticRegression                 0.62               0.62     0.62      0.62   
KNeighborsClassifier               0.60               0.59     0.59      0.60   
BaggingClassifier                  0.62               0.59     0.59      0.60   
RidgeClassifierCV                  0.58               0.59     0.59      0.58   
DecisionTreeClassifier             0.58               0.58     0.58      0.58   
BernoulliNB                        0.56               0.57     0.57      0.56   
ExtraTreesClassifier               0.56               0.57     0.57      0.56   
RidgeClassifier                    0.56               0.57     0.57      0.56   
Perceptron                  




In [62]:
# Logistic regression fitted with the stochastic "saga" solver.  The
# variables keep their historical "sgd" names, but the estimator is
# LogisticRegression, not SGDClassifier.
from sklearn.linear_model import LogisticRegression

sgd_classifier = LogisticRegression(
    solver="saga", max_iter=1000, random_state=42, n_jobs=-1
).fit(X_big, y_tr)

# Score on the fitted data and on the validation split.
train_accuracy_sgd, validation_accuracy_sgd = (
    sgd_classifier.score(X_big, y_tr),
    sgd_classifier.score(X_big_dev, y_dev),
)

In [63]:
# Report the accuracies of the "sgd" (logistic regression) model.
for report_line in (
    f"SGD Training accuracy: {train_accuracy_sgd:.4f}",
    f"SGD Validation accuracy: {validation_accuracy_sgd:.4f}",
):
    print(report_line)

SGD Training accuracy: 0.5679
SGD Validation accuracy: 0.5400


In [65]:
from xgboost import XGBClassifier

# Single XGBoost model on the combined features; hyper-parameters are
# collected in one place before the estimator is built.
xgb_params = dict(
    n_estimators=100,
    max_depth=4,
    learning_rate=0.1,
    random_state=42,
    n_jobs=-1,
)
xg = XGBClassifier(**xgb_params)
xg.fit(X_big, y_tr)

# Score on the fitted data and on the validation split.
train_accuracy_xg, validation_accuracy_xg = (
    xg.score(X_big, y_tr),
    xg.score(X_big_dev, y_dev),
)
print(f"XGBoost Training accuracy: {train_accuracy_xg:.4f}")
print(f"XGBoost Validation accuracy: {validation_accuracy_xg:.4f}")

XGBoost Training accuracy: 0.9962
XGBoost Validation accuracy: 0.6200


In [66]:
# Grid over (max_depth, n_estimators) for XGBoost on the combined features,
# scored on the validation split.
# NOTE(review): the configuration is selected on the same validation data
# whose accuracy is reported, so the "best" score is optimistic.
grid = [(d, n) for d in range(2, 10) for n in range(50, 101, 20)]

# [best validation accuracy, max_depth, n_estimators].  All three slots are
# seeded so the summary print cannot raise IndexError when no configuration
# scores above 0.
best = [0.0, None, None]
for d, n in tqdm(grid):
    xg = XGBClassifier(
        n_estimators=n,
        max_depth=d,
        learning_rate=0.1,
        random_state=42,
        n_jobs=-1,
    )
    xg.fit(X_big, y_tr)
    # Evaluate on validation set
    train_accuracy_xg = xg.score(X_big, y_tr)
    validation_accuracy_xg = xg.score(X_big_dev, y_dev)
    # Strict '>' keeps the first configuration that reaches a given score.
    if validation_accuracy_xg > best[0]:
        best = [validation_accuracy_xg, d, n]
        print(
            f"New best XGBoost Validation accuracy: {validation_accuracy_xg:.4f} with max_depth={d} and n_estimators={n}"
        )
print(
    f"Best XGBoost Validation accuracy: {best[0]:.4f} with max_depth={best[1]} and n_estimators={best[2]}"
)

  4%|▍         | 1/24 [00:01<00:38,  1.65s/it]

New best XGBoost Validation accuracy: 0.4600 with max_depth=2 and n_estimators=50


  8%|▊         | 2/24 [00:03<00:34,  1.58s/it]

New best XGBoost Validation accuracy: 0.4800 with max_depth=2 and n_estimators=70


 12%|█▎        | 3/24 [00:05<00:36,  1.76s/it]

New best XGBoost Validation accuracy: 0.5400 with max_depth=2 and n_estimators=90


 29%|██▉       | 7/24 [00:13<00:34,  2.00s/it]

New best XGBoost Validation accuracy: 0.6400 with max_depth=4 and n_estimators=50


100%|██████████| 24/24 [03:12<00:00,  8.03s/it]

Best XGBoost Validation accuracy: 0.6400 with max_depth=4 and n_estimators=50





In [69]:
# Soft-voting ensemble of the final KNN and the logistic-regression ("sgd")
# model on the combined features.
from sklearn.ensemble import VotingClassifier

ensemble_members = [
    ("knn", knn),
    # ("rf", rf_classifier),
    ("sgd", sgd_classifier),
    # ("xg", xg),
]
voting_classifier = VotingClassifier(
    estimators=ensemble_members,
    voting="soft",
    n_jobs=-1,
)
voting_classifier.fit(X_big, y_tr)

# Score on the fitted data and on the validation split.
train_accuracy_voting, validation_accuracy_voting = (
    voting_classifier.score(X_big, y_tr),
    voting_classifier.score(X_big_dev, y_dev),
)
print(f"Voting Classifier Training accuracy: {train_accuracy_voting:.4f}")
print(f"Voting Classifier Validation accuracy: {validation_accuracy_voting:.4f}")

Voting Classifier Training accuracy: 0.7833
Voting Classifier Validation accuracy: 0.6200


In [71]:
# Validation report for the final KNN (class 0 = "Left", class 1 = "Right")
y_pred = knn.predict(X_big_dev)
knn_report = classification_report(y_dev, y_pred, target_names=["Left", "Right"])
print(knn_report)

              precision    recall  f1-score   support

        Left       0.68      0.93      0.79        28
       Right       0.83      0.45      0.59        22

    accuracy                           0.72        50
   macro avg       0.76      0.69      0.69        50
weighted avg       0.75      0.72      0.70        50



In [72]:
from sklearn.metrics import f1_score

# Support-weighted F1 of the KNN validation predictions.
f1_score(y_true=y_dev, y_pred=y_pred, average="weighted")

0.7000356506238858

In [75]:
# load test dataset
test_dataset = MI_Dataset(
    base_path="",
    index_csv="test.csv",
    ch_names=CH_NAMES,
    sfreq=SFREQ,
    transform=lambda raw: raw.notch_filter(
        freqs=50.0, picks="eeg", fir_design="firwin"
    ),
)

# Per-trial invariants, built once instead of inside the loop.
# PSD targets: 8..30 Hz inclusive in 1 Hz steps (23 values).
freqs_of_interest = np.arange(8, 31, 1)
# FFT frequency axis for a 2250-sample trial at 250 Hz and its 8-30 Hz mask.
# Named fft_freqs so it no longer shares a name with the PSD frequency axis.
fft_freqs = np.fft.rfftfreq(2250, d=1 / 250)
mask = (fft_freqs >= 8) & (fft_freqs <= 30)

X_test = []

with mne.utils.use_log_level("WARNING"):
    for i in tqdm(range(len(test_dataset))):
        # The dataset returns a bare tensor only when the index has no
        # 'label' column -- assumes test.csv is unlabeled.
        eeg_tensor = test_dataset[i]
        raw = mne.io.RawArray(
            eeg_tensor.numpy(),
            mne.create_info(
                ch_names=CH_NAMES, sfreq=SFREQ, ch_types=["eeg"] * len(CH_NAMES)
            ),
        )
        # Welch PSD of the current trial, restricted to 8-30 Hz.
        psds, freqs = raw.compute_psd(
            method="welch",
            fmin=8.0,
            fmax=30.0,
            n_fft=1024,  # larger n_fft -> finer frequency resolution
        ).get_data(return_freqs=True)

        # For every target frequency pick the closest PSD bin, then flatten
        # frequency-major (all channels at 8 Hz, then 9 Hz, ...).
        nearest_bins = np.abs(
            freqs[np.newaxis, :] - freqs_of_interest[:, np.newaxis]
        ).argmin(axis=1)
        feature_vector = psds[:, nearest_bins].T.ravel()

        # FFT features; `window` is the Hamming window defined with the
        # training FFT features.
        x = eeg_tensor.numpy().T * window[:, None]
        fft_vals = np.fft.rfft(x, axis=0)
        spectrum = fft_vals[mask, :]
        # NOTE(review): only the real part of the complex spectrum is kept;
        # this must stay identical to how the training features are built.
        x_features = spectrum.flatten().real  # (n_bins * n_channels,)

        # Combine PSD and FFT features (PSD block first, then FFT block).
        combined_features = np.concatenate((feature_vector, x_features))
        X_test.append(combined_features)


X_test = np.array(X_test)
print(f"Test dataset shape: {X_test.shape}")

100%|██████████| 50/50 [00:04<00:00, 10.36it/s]

Test dataset shape: (50, 666)





In [77]:
# Predict with the final KNN and decode: class 0 -> "Left", anything else ->
# "Right".
predicted_classes = knn.predict(X_test)
predictions = ["Right" if pred != 0 else "Left" for pred in predicted_classes]

# One row per test trial, keyed by the id column of the test index.
submission_columns = {"id": test_dataset.df["id"], "label": predictions}
predictions_df = pd.DataFrame(submission_columns)
predictions_df.to_csv("predictions_KNN_MI.csv", index=False)