In [None]:
import os
import requests
import numpy as np
import xarray as xr
import hmp
from mne.io import read_info
import matplotlib.pyplot as plt

# Declaring path where the EEG data will be stored
epoch_data_path = os.path.join('sample_data', 'eeg')
os.makedirs(epoch_data_path, exist_ok=True)

# URLs of the first 5 participants in the SAT experiment, navigate the osf folder and adapt those if you want to do this tutorial on other data (e.g. P3, N2pc)
file_urls = [
    "https://osf.io/download/92n3g/",
    "https://osf.io/download/fdqc7/",
    "https://osf.io/download/ezfdq/",
    "https://osf.io/download/4zu35/",
    "https://osf.io/download/d8432/",
    "https://osf.io/download/p2c4k/",
]

# Download and save each file if not already in folder
for i, url in enumerate(file_urls, start=1):
    file_path = os.path.join(epoch_data_path, f'participant{i}_epo.fif')
    if not os.path.exists(file_path):
        response = requests.get(url)
        with open(file_path, 'wb') as f:
            f.write(response.content)

# Recovering individual files and participant names
subj_files = [os.path.join(epoch_data_path, f) for f in os.listdir(epoch_data_path) if f.endswith('.fif')]  # Create a list of files with full paths
subj_names = [os.path.splitext(f)[0] for f in os.listdir(epoch_data_path) if f.endswith('.fif')]  # Extract subject names based on file names

# Recovering channel information (assuming the same for all participant)
info = read_info(subj_files[0], verbose=False)

# Importing an example epoch file just to show the format, of the metadata
import mne
epochs = mne.read_epochs(os.path.join(epoch_data_path,os.listdir(epoch_data_path)[0]))
epochs.metadata

In [None]:
# At what frequency we want the data, upsample only if you have a large amount of RAM (and time)
# Note that upsampling usually doesn't improve the results, it is however useful if you want to use patterns that are shorter than the standard 50ms Halfsines
sfreq = 250

# Then we read the data as shown in Tutorial 1
epoch_data = hmp.io.read_mne_data(subj_files, data_format='epochs', sfreq=sfreq,
                            lower_limit_rt=0.2, upper_limit_rt=2,
                            reject_threshold=1e-4,
                            rt_col = 'rt', scale = 1, #In this case the rts are contained in the dataframe column "RT" and is in milliseconds, thus we adapt
                            verbose=False, subj_name=subj_names)#Turning verbose off for the documentation but it is recommended to leave it on as some output from MNE might be useful

print(epoch_data)

In [None]:
preprocessed = hmp.preprocessing.Standard(epoch_data, n_comp=10)
# Defining the expected HMP pattern
event_properties = hmp.patterns.HalfSine.create_expected(sfreq=epoch_data.sfreq)
# Performing the crosscorrelation between the preprocessed data and the expected pattern
trial_data = hmp.trialdata.TrialData.from_preprocessed(preprocessed=preprocessed, pattern=event_properties.template)

In [None]:
model = hmp.models.CumulativeMethod(event_properties)
_, estimates_cumulative = model.fit_transform(trial_data)

In [None]:
hmp.visu.plot_topo_timecourse(epoch_data, estimates_cumulative, info, as_time=True)

In [None]:
import matplotlib.pyplot as plt
from scipy.stats import ttest_1samp
from mne.viz import plot_topomap, plot_montage
bin_diff = [{'condition':['speed','accuracy']},
           {'side':['left', 'right']}]

fig, ax = plt.subplots(len(bin_diff), len(estimates_cumulative.event)+1, figsize=(len(estimates_cumulative.event)+1,len(bin_diff)), dpi=300)

for d, diff in enumerate(bin_diff):
    a = hmp.utils.condition_selection_epoch(epoch_data, list(diff.values())[0][0], variable=list(diff.keys())[0])
    b = hmp.utils.condition_selection_epoch(epoch_data, list(diff.values())[0][1], variable=list(diff.keys())[0])
    a = hmp.utils.event_channels(a, estimates_cumulative, mean=False).unstack().mean('epoch')
    b = hmp.utils.event_channels(b, estimates_cumulative, mean=False).unstack().mean('epoch')

    for event in a.event:
        t_vals = np.zeros(a.sizes['channel'])
        for c, channel in enumerate(a.channel):
            t_val, _ = ttest_1samp(a.sel(event=event, channel=channel) - b.sel(event=event, channel=channel), popmean=0)
            t_vals[c] = t_val
        _ = plot_topomap(t_vals, info, axes=ax[d, event+1], show=False, sensors=False, cmap='Spectral_r', vlim=(-5, 5))
    ax[d, 0].text(0,0, str(list(diff.values())[0][0])+ '-' + list(diff.values())[0][1], rotation=0, fontsize=4)
    ax[d, 0].axis('off')
plt.tight_layout()