In [None]:
import pickle
import mne
from pathlib import Path
import numpy as np
import pandas as pd

In [None]:
eeg_electrodes_32 = ['FP1', 'FP2', 'F7', 'F3', 'Fz', 'F4', 'F8', 'FT9', 'FC5', 'FC1', 'FC2', 'FC6', 'FT10', 'T7', 'C3', 'Cz', 'C4', 'T8', 'TP9', 'CP5', 'CP1', 'CP2', 'CP6', 'TP10', 'P7', 'P3', 'Pz', 'P4', 'P8', 'O1', 'Oz', 'O2']
eeg_electrodes_29 = ['F1', 'F2', 'Fz', 'CP3', 'C3', 'FC3', 'FCz', 'Cz', 'CPz', 'FC4', 'C4', 'CP4', 'FC5', 'C5', 'CP5', 'FC1', 'C1', 'CP1', 'FC2', 'C2', 'CP2', 'FC6', 'C6', 'CP6', 'P4', 'P3', 'P1', 'Pz', 'P2']
common_electrodes = set(eeg_electrodes_32).intersection(set(eeg_electrodes_29))

In [None]:
data_indexes = {'left_real': [], 'right_real': []}
data_stat = {'subject': [], 'left': [], 'right': []} # subject_trial_ids
data_subjects_32 = {'left_real': [], 'right_real': []} # data without trials
data_subjects_29 = {'left_real': [], 'right_real': []} # data without trials
data_subjects = {'left_real': [], 'right_real': []} # data without trials

Read 29 electrodes data

In [None]:
data_29_path = 'E:/YandexDisk/EEG/raw/MI_NN/29_chanels/Real/'
data_codes = {'left_real': 1.0, 'right_real': 2.0}
subject_id = 0
for subject_path in Path(data_29_path).iterdir():
    data_stat['subject'].append(f"S{subject_id}")
    with open(f'{subject_path}/data_full.pkl', "rb") as f:
        data_full = pickle.load(f, encoding="bytes")
    with open(f'{subject_path}/states_full.pkl', "rb") as f:
        states_full = pickle.load(f, encoding="bytes")
    for movement in data_codes:
        movement_ids = np.argwhere(states_full == data_codes[movement])
        diff_values = np.diff(movement_ids[:, 1])
        split_indexes_ids = np.where(diff_values > 1.0)[0]
        split_indexes = movement_ids[:, 1][split_indexes_ids]
        data_stat[movement[:-5]].append(len(split_indexes_ids) + 1)
        for trial_id in range(0, len(split_indexes_ids) + 1):
            if trial_id == 0:
                trial_start = movement_ids[0, 1]
                trial_end = movement_ids[split_indexes_ids[trial_id], 1] + 1
            elif trial_id == len(split_indexes_ids):
                trial_start = movement_ids[split_indexes_ids[trial_id - 1], 1]
                trial_end = movement_ids[-1, 1] + 1
            else:
                trial_start = movement_ids[split_indexes_ids[trial_id - 1] + 1, 1]
                trial_end = movement_ids[split_indexes_ids[trial_id], 1] + 1
            if trial_id == 0 and states_full[:, trial_start - 1] in [0.0, 3.0]:
                data_subjects_29[movement].append(data_full[:, trial_start-2500:trial_start+2500] * 10 ** 6)
            if trial_id > 0 and states_full[:, trial_start - 1] in [0.0, 3.0]:
                data_subjects_29[movement][subject_id] = np.concatenate((data_subjects_29[movement][subject_id], data_full[:, trial_start-2000:trial_start+2000] * 10 ** 6), axis=1)
            data_indexes[movement].append(f"S{subject_id}_{movement}_29")
    subject_id += 1

num_subjects_29 = subject_id - 1
mne_python_raw = mne.io.read_raw_fif(f'{subject_path}/mne_python_raw.fif')
sample_frequency_29 = mne_python_raw.info['sfreq']
electrodes_names_29 = mne_python_raw.info['ch_names']

Read 32 electrodes data

In [None]:
from pymatreader import read_mat

data_32_path = 'E:/YandexDisk/EEG/raw/'
data_32_file = '1st_Day.mat'

curr_32_data = read_mat(data_32_path + data_32_file)

In [None]:
electrodes_names_32 = curr_32_data['subs_ica'][0]['right_real']['label']
sample_frequency_32 = curr_32_data['subs_ica'][0]['right_real']['fsample']
for subject_id in range(0, len(curr_32_data['subs_ica'])):
    data_stat['subject'].append(f"S{num_subjects_29 + 1 + subject_id}")
    for key in data_codes:
        data_stat[key[:-5]].append(len(curr_32_data['subs_ica'][subject_id][key]['trial']))
        for trial_id in range(0, len(curr_32_data['subs_ica'][subject_id][key]['trial'])):
            if trial_id == 0:
                data_subjects_32[key].append(curr_32_data['subs_ica'][subject_id][key]['trial'][trial_id][:, 2500:7500] * 10 ** 6)
            if trial_id > 0:
                data_subjects_32[key][subject_id] = np.concatenate((data_subjects_32[key][subject_id], curr_32_data['subs_ica'][subject_id][key]['trial'][trial_id][:, 3000:7000] * 10 ** 6), axis=1)
            data_indexes[key].append(f"S{num_subjects_29 + 1 + subject_id}_{key}_32")

Data for common electrodes

In [None]:
electrodes_names = list(set(electrodes_names_32).intersection(set(electrodes_names_29)))
electrodes_32_ids = []
electrodes_29_ids = []
for common_electrode in common_electrodes:
    electrodes_32_ids.append(electrodes_names_32.index(common_electrode))
    electrodes_29_ids.append(electrodes_names_29.index(common_electrode))
for movement in data_codes:
    for subject_id in range(0, len(data_subjects_29[movement])):
        if data_indexes[movement][subject_id].endswith('29'):
            data_subjects[movement].append(data_subjects_29[movement][subject_id][electrodes_29_ids, :])
    for subject_id in range(0, len(data_subjects_32[movement])):
        if data_indexes[movement][subject_id].endswith('32'):
            data_subjects[movement].append(data_subjects_32[movement][subject_id][electrodes_32_ids, :])

Create raw data for mne

In [None]:
ch_names = electrodes_names
sfreq = sample_frequency_29
ch_types = ['eeg'] * len(ch_names)
info = mne.create_info(ch_names, sfreq, ch_types)

raw_data = mne.io.RawArray(data_subjects['left_real'][1], info)

events = mne.make_fixed_length_events(raw_data, id=1, start=2.5, stop=None, duration=2.5, first_samp=True, overlap=0.0)

event_ids = dict(left=1)
t_min = -2.5
t_max = 2.5
epochs = mne.Epochs(raw_data, events, event_id=event_ids, tmin=t_min, tmax=t_max, baseline=None, preload=True)

In [None]:
import matplotlib.pyplot as plt
from matplotlib.colors import TwoSlopeNorm
import seaborn as sns
import mne
from mne.time_frequency import tfr_multitaper
from mne.stats import permutation_cluster_1samp_test as pcluster_test

freqs = np.arange(2, 36)  # frequencies from 2-35Hz
vmin, vmax = -1, 1.5  # set min and max ERDS values in plot
baseline = (-1, 0)  # baseline interval (in s)
cnorm = TwoSlopeNorm(vmin=vmin, vcenter=0, vmax=vmax)  # min, center & max ERDS

kwargs = dict(
    n_permutations=100, step_down_p=0.05, seed=1, buffer_size=None, out_type="mask"
)  # for cluster test

tfr = tfr_multitaper(
    epochs,
    freqs=freqs,
    n_cycles=freqs,
    use_fft=True,
    return_itc=False,
    average=False,
    decim=2,
)
tfr.crop(t_min, t_max).apply_baseline(baseline, mode="percent")

for event in event_ids:
    # select desired epochs for visualization
    tfr_ev = tfr[event]
    fig, axes = plt.subplots(
        1, 16, figsize=(12, 4), gridspec_kw={"width_ratios": [20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 1]}
    )
    for ch, ax in enumerate(axes[:-1]):  # for each channel
        # positive clusters
        _, c1, p1, _ = pcluster_test(tfr_ev.data[:, ch], tail=1, **kwargs)
        # negative clusters
        _, c2, p2, _ = pcluster_test(tfr_ev.data[:, ch], tail=-1, **kwargs)

        # note that we keep clusters with p <= 0.05 from the combined clusters
        # of two independent tests; in this example, we do not correct for
        # these two comparisons
        c = np.stack(c1 + c2, axis=2)  # combined clusters
        p = np.concatenate((p1, p2))  # combined p-values
        mask = c[..., p <= 0.05].any(axis=-1)

        # plot TFR (ERDS map with masking)
        tfr_ev.average().plot(
            [ch],
            cmap="RdBu",
            cnorm=cnorm,
            axes=ax,
            colorbar=False,
            show=False,
            mask=mask,
            mask_style="mask",
        )

        ax.set_title(epochs.ch_names[ch], fontsize=10)
        ax.axvline(0, linewidth=1, color="black", linestyle=":")  # event
        if ch != 0:
            ax.set_ylabel("")
            ax.set_yticklabels("")
    fig.colorbar(axes[0].images[-1], cax=axes[-1]).ax.set_yscale("linear")
    fig.suptitle(f"ERDS ({event})")
    plt.show()