# Mental imagery

This notebook provides loading functions and a basic analysis pipeline for mental imagery recordings.

## Imports


In [None]:
from pathlib import Path
from itertools import chain
from datetime import datetime

import numpy as np
import pandas as pd

import pyxdf
import mne
from mne.decoding import CSP

from sklearn.linear_model import LogisticRegression
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import (
    KFold,
    LeaveOneGroupOut,
    LeavePGroupsOut,
    cross_val_score,
    cross_val_predict,
)
from sklearn.metrics import confusion_matrix

## Loading functions

Please refer to the docstrings of the functions below to understand what they do.


In [None]:
# Configuration shared by the cells below.

# Folder that contains the recordings.
# Change this according to where you saved the data.
root = Path.home() / "Desktop" / "imagery_2025"

# Recording to analyse.
subject = "VPtest"
session = "S001"

# Block numbers (see README.md file in the data for more details).
blocks_exe = [5, 6, 8]  # blocks with executed movements
blocks_ima = [3, 4, 7]  # blocks with imagined movements

# Names of the streams looked up inside each `.xdf` file.
eeg_stream_name = "BrainVision RDA"
imagery_markers_stream_name = "ImageryParadigmMarkerStream"

# Marker value -> class label for the markers kept as events.
id_event = {
    110: "left_hand",
    111: "right_hand",
    112: "feet",
    113: "mental_arithmetic",
}
classes = [label for label in id_event.values()]

# Used both as annotation duration and as epoch end time, in seconds.
cue_time_s = 4.0

In [None]:
def get_filepath(
    block: int,
    task: str,
    subject: str = subject,
    session: str = session,
    root: Path = root,
):
    """
    Build the path of one recording saved in a BIDS data structure.

    The result is
    ``<root>/sub-<subject>/ses-<session>/eeg/sub-<subject>_ses-<session>_task-<task>_run-<block>_eeg.xdf``
    where ``<block>`` is zero-padded to three digits.

    for example: sub-VPtest/ses-S001/eeg/sub-VPtest_ses-S001_task-execution_run-002_eeg.xdf

    The defaults of ``subject``, ``session`` and ``root`` are the values those
    module-level variables hold when this function is defined.

    See https://bids.neuroimaging.io
    """
    sub_label = f"sub-{subject}"
    ses_label = f"ses-{session}"
    file_name = f"{sub_label}_{ses_label}_task-{task}_run-{block:03d}_eeg.xdf"
    return root / sub_label / ses_label / "eeg" / file_name

In [None]:
def load_imagery_xdf_mne(filepath: Path):
    """
    Used to load a `.xdf` file and place it in an `mne.io.Raw` object.

    The stream named `eeg_stream_name` provides the signal; only the channels
    whose declared type is "eeg" are kept. Markers of the stream named
    `imagery_markers_stream_name` whose value is a key of `id_event` are
    attached as annotations lasting `cue_time_s` seconds, with onsets
    expressed relative to the first EEG time stamp.

    Parameters
    ----------
    filepath : Path
        Location of the `.xdf` recording.

    Returns
    -------
    mne.io.RawArray
        The EEG channels, with the selected markers set as annotations.

    Raises
    ------
    ValueError
        If one of the two required streams is missing from the file, or if
        the EEG stream declares no channel of type "eeg".
    """
    streams, _ = pyxdf.load_xdf(filepath)
    stream_names = [s["info"]["name"][0] for s in streams]
    missing = [
        name
        for name in (eeg_stream_name, imagery_markers_stream_name)
        if name not in stream_names
    ]
    if missing:
        # Same exception type as the bare `list.index` failure, but actionable.
        raise ValueError(
            f"Stream(s) {missing} not found in {filepath}; "
            f"available streams: {stream_names}"
        )
    eeg_stream = streams[stream_names.index(eeg_stream_name)]
    markers_stream = streams[stream_names.index(imagery_markers_stream_name)]
    eeg_times = eeg_stream["time_stamps"]
    sfreq = float(eeg_stream["info"]["nominal_srate"][0])

    # (label, lower-cased type) of every channel declared by the EEG stream.
    channels = [
        (d["label"][0], d["type"][0].lower())
        for d in eeg_stream["info"]["desc"][0]["channels"][0]["channel"]
    ]
    is_eeg = [ch_type == "eeg" for _, ch_type in channels]
    if not any(is_eeg):
        raise ValueError(
            f"No channel of type 'eeg' in stream {eeg_stream_name!r} of {filepath}"
        )
    ch_names = [name for (name, _), keep in zip(channels, is_eeg) if keep]
    # `time_series` is transposed so that channels come first, then filtered.
    data = eeg_stream["time_series"].T[is_eeg]

    # NOTE(review): MNE expects EEG data in volts; confirm the unit of the
    # recorded stream. Subject and measurement-date metadata are not filled in.
    info = mne.create_info(ch_names=ch_names, sfreq=sfreq, ch_types="eeg")
    raw = mne.io.RawArray(data, info)

    markers_time = markers_stream["time_stamps"]
    markers = markers_stream["time_series"].flatten()
    is_cue = np.isin(markers, list(id_event.keys()))

    # A plain comprehension (rather than `np.vectorize`) also copes with a
    # recording that contains none of the markers of interest.
    annotations = mne.Annotations(
        onset=markers_time[is_cue] - eeg_times[0],
        duration=cue_time_s,
        description=[id_event[int(code)] for code in markers[is_cue]],
    )
    raw.set_annotations(annotations)
    return raw


def get_epochs(raw):
    """
    Split an annotated `mne.io.Raw` object into epochs.

    Annotations whose description is one of the labels of `id_event` are
    converted to events carrying the matching numeric code. One epoch is
    defined per event, from 0 to `cue_time_s` seconds after it, with no
    baseline correction.
    Returns an `mne.Epochs` object.
    """
    # `id_event` maps code -> label; MNE wants label -> code here.
    label_to_code = {label: code for code, label in id_event.items()}
    events, event_id = mne.events_from_annotations(raw, event_id=label_to_code)
    return mne.Epochs(
        raw,
        events,
        event_id,
        tmin=0,
        tmax=cue_time_s,
        baseline=None,
    )


def get_X_y(fnames, fmin=1, fmax=40, resample=128, get_fname=False):
    """
    Takes `.xdf` file paths as input,
    loads them into an mne format with `load_imagery_xdf_mne`,
    pre-processes them according to the parameters given,
    splits them into epochs with `get_epochs`,
    and returns concatenated numpy arrays.

    Parameters
    ----------
    fnames : iterable of paths
        Files to load. Any iterable is accepted (it is materialised once).
    fmin, fmax :
        Passed to `raw.filter` as the lower and upper edges.
    resample :
        Passed to `raw.resample` as the new sampling frequency.
    get_fname : bool
        If True, also return the originating file of every epoch.

    Returns
    -------
    X : np.ndarray
        Epoch data of all files, concatenated along the first axis.
    y : np.ndarray
        Class label (a value of `id_event`) of every epoch.
    fnames_prod : list
        Only if `get_fname` is True: one entry of `fnames` per epoch,
        aligned with `X` and `y`.

    Raises
    ------
    ValueError
        If `fnames` is empty.
    """
    # `fnames` is traversed twice when `get_fname` is set; a generator would be
    # exhausted after the loading loop and silently give an empty list.
    fnames = list(fnames)
    if not fnames:
        raise ValueError("`fnames` is empty: at least one `.xdf` file is required.")

    X_list, y_list = [], []
    for fname in fnames:
        raw = load_imagery_xdf_mne(fname)
        raw = raw.filter(fmin, fmax).resample(resample)
        epochs = get_epochs(raw)
        # The data is fetched before the event codes are read (order preserved
        # from the original implementation).
        X_list.append(epochs.get_data())
        y_list.append(np.vectorize(id_event.get)(epochs.events[:, 2]))
        # Free the recording before loading the next one.
        del raw, epochs
    X = np.concatenate(X_list, axis=0)
    y = np.concatenate(y_list, axis=0)
    if get_fname:
        # Repeat each file name once per epoch it contributed.
        fnames_prod = [
            fname for fname, labels in zip(fnames, y_list) for _ in range(len(labels))
        ]
        return X, y, fnames_prod
    return X, y

In [None]:
# Path of the recording of every block, keyed by block number.
# Blocks of `blocks_exe` belong to the "execution" task,
# blocks of `blocks_ima` to the "imagery" task.
filenames = {
    block: get_filepath(block, task)
    for task, blocks in zip(["execution", "imagery"], [blocks_exe, blocks_ima])
    for block in blocks
}

# Optional check that the data is correct (it loads every file, so it is
# left disabled; uncomment to run it):
# for filename in filenames.values():
#     raw = load_imagery_xdf_mne(filename)
#     epochs = get_epochs(raw)
#     epochs.load_data()
#     assert len(epochs) == 20

## Analysis


In [None]:
# Pre-processing parameters, forwarded to `get_X_y`:
kwargs = {
    "fmin": 8,  # lower edge given to `raw.filter`
    "fmax": 12,  # upper edge given to `raw.filter`
    "resample": 40,  # new sampling frequency given to `raw.resample`
}

### Simple train/test split

#### load data


In [None]:
# train_blocks = [3]
# test_blocks = [4]

# train_X, train_y = get_X_y([filenames[block] for block in train_blocks], **kwargs)
# test_X, test_y = get_X_y([filenames[block] for block in test_blocks], **kwargs)

#### score


In [None]:
# clf = make_pipeline(CSP(n_components=8), LogisticRegression())

# clf.fit(train_X, train_y)
# clf.score(test_X, test_y)

### Cross-validation

#### load data


In [None]:
# Select which blocks will be used for cross-validation (see blocks_exe and blocks_ima above).
# The paths are built directly with `get_filepath`, so this cell does not
# depend on any variable other than the hyper-parameters and `kwargs`.
X, y, fnames = get_X_y(
    [get_filepath(block, "execution") for block in blocks_exe],
    get_fname=True,
    **kwargs,
)
# X, y, fnames = get_X_y(
#     [get_filepath(block, "imagery") for block in blocks_ima],
#     get_fname=True,
#     **kwargs,
# )

# You can play with the list below to only keep certain classes in the data:
classes_selected = ["left_hand", "right_hand", "feet", "mental_arithmetic"]

# Keep the epochs of the selected classes, and their file names, aligned.
mask = np.isin(y, classes_selected)
X = X[mask]
y = y[mask]
fnames = np.array(fnames)[mask]

#### CV score

We use a leave-p-groups-out cross-validation strategy.
Here, a group is actually a block.
The parameter `n_groups` defines how many blocks will be left out of the training set (and used for testing).


In [None]:
# Splitter: each split leaves one group (here, one file) out for testing.
lgo = LeavePGroupsOut(n_groups=1)
# Classifier: CSP with 8 components, then logistic regression.
clf = make_pipeline(CSP(n_components=8), LogisticRegression())

score_LeavePGroupsOut = cross_val_score(
    clf,
    X,
    y,
    groups=fnames,
    cv=lgo,
    verbose=False,
)

In [None]:
# One score per cross-validation split.
score_LeavePGroupsOut

#### confusion matrix

We use a leave-one-group-out strategy to compute a prediction for every example in `X`.
Then, we display the classification results in a confusion matrix.


In [None]:
# Splitter: every group (here, every file) is used once as the test set.
lgo = LeaveOneGroupOut()
# Classifier: CSP with 8 components, then logistic regression.
clf = make_pipeline(CSP(n_components=8), LogisticRegression())

y_pred = cross_val_predict(
    clf,
    X,
    y,
    groups=fnames,
    cv=lgo,
    verbose=False,
)

In [None]:
# Rows are labelled with the true class, columns with the predicted class;
# both follow the order of `classes`.
true_axis = pd.Index(classes, name="TRUE")
predicted_axis = pd.Index(classes, name="PREDICTED")
conf_mat = pd.DataFrame(
    confusion_matrix(y, y_pred, labels=classes),
    index=true_axis,
    columns=predicted_axis,
)
conf_mat