In [1]:
import sys

assert sys.version_info >= (3, 10)

In [2]:
from packaging.version import Version
import mne
import sklearn
import torch

# Minimum library versions required by this notebook. Each check names the
# package and reports the installed version when it fails.
assert Version(mne.__version__) >= Version("1.10.1"), (
    f"mne >= 1.10.1 is required, found {mne.__version__}"
)
assert Version(sklearn.__version__) >= Version("1.4.0"), (
    f"scikit-learn >= 1.4.0 is required, found {sklearn.__version__}"
)
assert Version(torch.__version__) >= Version("2.1.0"), (
    f"torch >= 2.1.0 is required, found {torch.__version__}"
)

In [3]:
from pathlib import Path
import importlib

# Root folder of the raw motor-imagery dataset (relative to the working directory).
DATASET_DIR = Path("../../0-raw-data/motor-imaginary")
# Sub-folder the GDF recordings are read from.
EXTRACT_DIR = DATASET_DIR / "data"

def download_and_extract_motor_imaginery_data():
    """Run the dataset's own ``data_fetcher`` module to prepare the raw data.

    The resolved ``DATASET_DIR`` is appended to ``sys.path`` (once) so that the
    ``data_fetcher`` module located there can be imported. The module is
    reloaded on every call, then its ``download_and_extract_data`` entry point
    is invoked with ``delete_zip=False``.
    """
    fetcher_dir = str(DATASET_DIR.resolve())
    if fetcher_dir not in sys.path:
        sys.path.append(fetcher_dir)

    # Import lazily: the module only becomes importable after the path tweak above.
    fetcher = importlib.import_module("data_fetcher")
    # Reload so edits made to data_fetcher.py are picked up without a kernel restart.
    importlib.reload(fetcher)

    fetcher.download_and_extract_data(delete_zip=False)

# Run the fetcher when this cell executes so the recordings exist on disk
# before any cell tries to read them.
download_and_extract_motor_imaginery_data()

[Fetcher] Starting data preparation...
[Download] Skip: BCICIV_2a_gdf.zip already exists
[Extract] Skip: already extracted at /home/kanathipp/Stuffs/Works/final-project-federated-learning/0-raw-data/motor-imaginary/data
[Fetcher] Completed.


In [4]:
import mne
def read_data(path, event_id=None):
    """Load one GDF recording and return its epoched data and event codes.

    The three EOG channels are declared as EOG on load and then dropped, the
    remaining channels are re-referenced with ``raw.set_eeg_reference()``
    defaults, and epochs are cut around the selected events using the default
    ``mne.Epochs`` window and baseline.

    Parameters
    ----------
    path : path-like
        GDF file to read.
    event_id : int | list of int | dict | None
        Event selection forwarded unchanged to ``mne.Epochs``. ``None`` keeps
        the historical selection ``[5, 6, 7, 8]``.
        NOTE(review): these integers are the codes that
        ``mne.events_from_annotations`` assigns to the annotation descriptions
        of *this* file, so the same code can mean a different annotation in
        another recording. Verify the mapping for every file set before
        treating the returned codes as class labels.

    Returns
    -------
    features : numpy.ndarray
        Result of ``epochs.get_data()`` (epochs first, then channels, then
        time samples).
    labels : numpy.ndarray
        Event code of each kept epoch (last column of ``epochs.events``).
    """
    eog_channels = ['EOG-left', 'EOG-central', 'EOG-right']
    if event_id is None:
        event_id = [5, 6, 7, 8]

    raw = mne.io.read_raw_gdf(path, preload=True, eog=eog_channels)
    raw.drop_channels(eog_channels)
    raw.set_eeg_reference()

    # events_from_annotations returns (events array, description -> code map);
    # only the events array is needed here.
    events, _description_to_code = mne.events_from_annotations(raw)

    # Codes missing from a recording only trigger a warning instead of an error.
    epochs = mne.Epochs(raw, events, event_id=event_id, on_missing='warn')

    # Load the data first: epochs.events is read afterwards, as in the
    # original statement order.
    features = epochs.get_data()
    labels = epochs.events[:, -1]
    return features, labels

In [5]:
%%capture
features,labels,groups=[],[],[]
for i in range(1,10):
  feature,label=read_data(Path(EXTRACT_DIR/ f'A0{i}E.gdf'))
  features.append(feature)
  labels.append(label)
  groups.append([i]*len(label))

In [6]:
import numpy as np

# Stack the per-subject lists into single arrays along the epoch axis.
# NOTE: this rebinds `features`/`labels`/`groups` from lists to arrays, so the
# cell is meant to run exactly once after the loading cell.
labels = np.concatenate(labels)
groups = np.concatenate(groups)
# Swap axes 1 and 2 of the stacked epochs so that channels end up last.
features = np.moveaxis(np.concatenate(features), 1, 2)

features.shape, labels.shape, groups.shape

((5256, 176, 22), (5256,), (5256,))

In [7]:
# Sanity check: total number of NaN entries in the stacked feature array.
np.isnan(features).sum()

np.int64(0)

In [8]:
# Label distribution: number of epochs per event code.
# NOTE(review): the recorded output lists only codes 5, 6 and 7 with counts
# 72 / 2592 / 2592 rather than four balanced classes — confirm that the event
# codes [5, 6, 7, 8] selected in read_data really are the motor-imagery cues
# for these files.
unique, counts = np.unique(labels, return_counts=True)
unique, counts

(array([5, 6, 7]), array([  72, 2592, 2592]))

In [9]:
# Epochs per subject id (`groups` holds the file index 1..9 of every epoch).
unique, counts = np.unique(groups, return_counts=True)
unique, counts

(array([1, 2, 3, 4, 5, 6, 7, 8, 9]),
 array([584, 584, 584, 584, 584, 584, 584, 584, 584]))

In [10]:
from sklearn.model_selection import GroupKFold, LeaveOneGroupOut
from sklearn.preprocessing import StandardScaler

# Group-aware K-fold splitter built with scikit-learn's default arguments.
# NOTE(review): presumably meant to split by the subject ids in `groups`;
# it is not used in the cells visible here — confirm.
gkf = GroupKFold()

from sklearn.base import TransformerMixin, BaseEstimator

#https://stackoverflow.com/questions/50125844/how-to-standard-scale-a-3d-matrix
class StandardScaler3D(BaseEstimator,TransformerMixin):
    """Standardise arrays whose last axis is the channel axis.

    Thin wrapper around ``StandardScaler``: the input is flattened to
    ``(-1, n_channels)`` so that the wrapped scaler sees one column per
    channel, and the scaled result is reshaped back to the input shape.
    Written for ``(batch, sequence, channels)`` arrays; any array with at
    least two dimensions is accepted, the last axis always being treated as
    the channel axis.
    """

    #batch, sequence, channels
    def __init__(self):
        self.scaler = StandardScaler()

    @staticmethod
    def _as_rows(X):
        """Collapse every axis except the last into rows of ``n_channels`` values."""
        if X.ndim < 2:
            raise ValueError(
                f"expected an array with at least 2 dimensions, got shape {X.shape}"
            )
        return X.reshape(-1, X.shape[-1])

    def fit(self,X,y=None):
        """Fit the wrapped scaler on ``X`` and return ``self``; ``y`` is ignored."""
        self.scaler.fit(self._as_rows(X))
        return self

    def transform(self,X):
        """Scale ``X`` with the fitted scaler and return it in its original shape."""
        return self.scaler.transform(self._as_rows(X)).reshape(X.shape)

In [16]:
# Standardise the features with a scaler fitted on all loaded epochs, then
# swap axes 1 and 2 back so channels precede the time axis again.
# NOTE: `features` is rebound in place, so re-running this cell changes its layout.
scaler = StandardScaler3D()
features = np.moveaxis(scaler.fit_transform(features), 1, 2)



In [17]:
import torch

# Convert the NumPy arrays to tensors. torch.Tensor(...) builds tensors of
# torch's default floating dtype, so the integer event codes in `labels`
# become floats at this point.
test_features = torch.Tensor(features)
test_labels = torch.Tensor(labels)

# Both tensors must have the same number of epochs.
len(test_features), len(test_labels)

(5256, 5256)

In [18]:
# Layout check: the recorded shape is (5256, 22, 176), i.e. channels on
# axis 1 and time samples on axis 2.
test_features.shape

torch.Size([5256, 22, 176])

In [None]:
import numpy as np

def remap_np(y: torch.Tensor) -> torch.Tensor:
    """Map arbitrary label values to contiguous class indices ``0..k-1``.

    The distinct values of ``y`` are sorted and each element is replaced by
    the rank of its value, e.g. labels ``[5, 6, 7, 8]`` become ``[0, 1, 2, 3]``.

    Parameters
    ----------
    y : torch.Tensor
        Labels of any shape and numeric dtype; may be empty or non-contiguous.

    Returns
    -------
    torch.Tensor
        1-D ``torch.long`` tensor with one index per element of ``y``
        (flattened), placed on the same device as ``y``.
    """
    # reshape (not view) also accepts non-contiguous input; detach allows
    # tensors that track gradients to be converted to NumPy.
    flat = y.detach().reshape(-1).cpu().numpy()
    # `return_inverse` yields, for every element, the index of its value in
    # the sorted unique array — exactly the remapped label. It is vectorised
    # and also works for empty input.
    _, inverse = np.unique(flat, return_inverse=True)
    return torch.as_tensor(inverse.reshape(-1), dtype=torch.long, device=y.device)

# Replace the raw event codes by contiguous class indices (torch.long).
test_labels = remap_np(test_labels)