# Five-Point Task — Cleaned EEG Analysis

This notebook loads ICA/AutoReject-cleaned five-point data, extracts stimulus- and response-locked epochs, applies robust pre-event z-scoring, and generates ERPs and time–frequency summaries. Settings follow slow-wave friendly filters (HPF 0.1–0.3 Hz, LPF 40 Hz).

In [None]:
%load_ext autoreload
%autoreload 2

import os
import sys
import json
from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt
import mne
import yaml

# Add repo root to path for scr/* imports
ROOT = Path('..').resolve().parents[0]  # project root
if str(ROOT) not in sys.path:
    sys.path.append(str(ROOT))

from scr.steps.project_paths import ProjectPaths

mne.set_log_level('INFO')
plt.rcParams['figure.figsize'] = (10, 5)
plt.rcParams['figure.dpi'] = 120


In [None]:
# Configuration
cfg_path = ROOT / 'configs' / 'fivepoint_pipeline.yml'
with open(cfg_path, 'r') as f:
    CFG = yaml.safe_load(f)

sub = CFG.get('default_subject', '01')
ses = CFG.get('default_session', '001')
task = '5pt'
run = CFG.get('default_run', '01')

paths = ProjectPaths(CFG)
RAW_DIR = Path(CFG['directory']['root']) / CFG['directory']['raw_data_dir']
PROC_DIR = Path(CFG['directory']['root']) / CFG['directory']['processed_dir']
REPORTS_DIR = Path(CFG['directory']['root']) / CFG['directory']['reports_dir']

report_dir = REPORTS_DIR / 'fivepoint' / 'analysis' / sub / ses / f'task-{task}' / f'run-{run}'
report_dir.mkdir(parents=True, exist_ok=True)

print('PROC_DIR:', PROC_DIR)
print('Report dir:', report_dir)


In [None]:
# Data loading helpers
def try_read_raw(path: Path):
    if path.exists():
        try:
            if path.suffix.lower() == '.edf':
                return mne.io.read_raw_edf(path, preload=True)
            return mne.io.read_raw_fif(path, preload=True)
        except Exception as e:
            print(f'Failed to read raw: {path} -> {e}')
    return None

def try_read_epochs(path: Path):
    if path.exists():
        try:
            return mne.read_epochs(path, preload=True, verbose=False)
        except Exception as e:
            print(f'Failed to read epochs: {path} -> {e}')
    return None

# Candidate files (most to least preferred)
cand_epochs = [
    PROC_DIR / f'sub-{sub}' / f'ses-{ses}' / f'sub-{sub}_ses-{ses}_task-{task}_run-{run}_desc-ica_epo.fif',
    PROC_DIR / 'autoreject' / f'sub-{sub}_ses-{ses}_run-{run}_ar_final_pass_epo.fif',
    PROC_DIR / f'sub-{sub}' / f'ses-{ses}' / f'sub-{sub}_ses-{ses}_task-{task}_run-{run}_preprocessed-epoched.fif',
]

cand_raw = [
    PROC_DIR / f'sub-{sub}' / f'ses-{ses}' / f'sub-{sub}_ses-{ses}_desc-ica_cleaned.fif',
    RAW_DIR / f'sub-{sub}_ses-{ses}_task-{task}_run-{run}_raw.edf',
]

epochs = None
for p in cand_epochs:
    epochs = try_read_epochs(p)
    if epochs is not None:
        print('Loaded epochs:', p)
        break

raw = None if epochs is not None else None
if epochs is None:
    for p in cand_raw:
        raw = try_read_raw(p)
        if raw is not None:
            print('Loaded raw:', p)
            break

if (epochs is None) and (raw is None):
    raise FileNotFoundError('Could not find cleaned epochs or raw for five-point task. Check paths.')

raw if raw is not None else epochs.info


In [None]:
# Filter settings to preserve slow components
HPF = 0.1
LPF = 40.0

if raw is None:
    # If we already have epochs, ensure they were filtered reasonably; optionally refilter
    print('Using precomputed epochs; skipping refilter of raw.')
    sfreq = epochs.info['sfreq']
    picks_eeg = mne.pick_types(epochs.info, eeg=True, eog=True)
else:
    print(f'Filtering raw: {HPF}–{LPF} Hz (zero-phase)')
    raw = raw.copy().filter(l_freq=HPF, h_freq=LPF, phase='zero-double', fir_design='firwin')
    sfreq = raw.info['sfreq']
    picks_eeg = mne.pick_types(raw.info, eeg=True, eog=True)

sfreq


In [None]:
# Five-point event extraction (pattern-based)
# Assumptions: all triggers share a code (often 8); first=START, last=END; middle alternate onset/response
STIM_CH = 'Trigger'
ONSET_CODE = 801
RESP_CODE = 802

def extract_5pt_events(raw_like):
    ev = mne.find_events(raw_like, stim_channel=STIM_CH, shortest_event=1, verbose=True)
    if len(ev) < 3:
        raise RuntimeError('Not enough events detected for 5PT (need at least start, one pair, end).')
    sf = raw_like.info['sfreq']
    # First and last as boundary
    start, end = ev[0], ev[-1]
    middle = ev[1:-1]
    onset_ev, resp_ev = [], []
    for i, e in enumerate(middle):
        if i % 2 == 0:
            onset_ev.append(e.copy())
        else:
            resp_ev.append(e.copy())
    # Truncate to pairs
    n = min(len(onset_ev), len(resp_ev))
    onset_ev, resp_ev = onset_ev[:n], resp_ev[:n]
    # Make coded event arrays
    on = np.array([[e[0], 0, ONSET_CODE] for e in onset_ev], dtype=int) if n>0 else np.empty((0,3), int)
    rp = np.array([[e[0], 0, RESP_CODE] for e in resp_ev], dtype=int) if n>0 else np.empty((0,3), int)
    # Reaction times in seconds
    rts = (np.array([r[0] for r in resp_ev]) - np.array([o[0] for o in onset_ev])) / sf if n>0 else np.array([])
    return on, rp, rts, start, end

if epochs is not None and hasattr(epochs, 'events') and epochs.events is not None:
    # If epochs exist but we still want response-locked, re-derive from underlying raw
    raw_for_events = epochs.copy().average()._project_as_onto_raw() if hasattr(epochs, 'copy') else None
    # Fallback: try to use epochs._raw if present (MNE stores original Raw in private attr sometimes)
    if raw_for_events is None and hasattr(epochs, '_raw') and epochs._raw is not None:
        raw_for_events = epochs._raw
else:
    raw_for_events = raw

on_events, resp_events, RTs, start_evt, end_evt = extract_5pt_events(raw_for_events)
print(f'N onset={len(on_events)}, N response={len(resp_events)}')
print(f'RT: mean={RTs.mean():.3f}s, median={np.median(RTs):.3f}s')

# RT trimming: remove <150 ms and >95th percentile
rt_lo, rt_hi = 0.150, np.percentile(RTs, 95) if len(RTs) else (0.150, 0.0)
keep = (RTs >= rt_lo) & (RTs <= rt_hi) if len(RTs) else np.array([], dtype=bool)
on_events_trim = on_events[keep] if len(keep) else on_events
resp_events_trim = resp_events[keep] if len(keep) else resp_events
RTs_trim = RTs[keep] if len(keep) else RTs
print(f'Kept {keep.sum() if len(RTs) else len(on_events)} trials after RT trimming')


In [None]:
# Epoching: stimulus-locked and response-locked
tmin_stim, tmax_stim = -0.5, 3.0
tmin_resp, tmax_resp = -2.0, 0.8

if raw is not None:
    e_stim = mne.Epochs(raw, on_events_trim, event_id={'onset': ONSET_CODE},
                        tmin=tmin_stim, tmax=tmax_stim, baseline=None, preload=True)
    e_resp = mne.Epochs(raw, resp_events_trim, event_id={'resp': RESP_CODE},
                        tmin=tmin_resp, tmax=tmax_resp, baseline=None, preload=True)
else:
    # If we already have onset-centered epochs, re-epoch response-locked from raw_for_events if possible
    e_stim = mne.EpochsArray(epochs.get_data(), info=epochs.info, events=on_events_trim,
                             tmin=tmin_stim) if False else epochs.copy()  # keep existing stim epochs
    if raw_for_events is not None:
        e_resp = mne.Epochs(raw_for_events, resp_events_trim, event_id={'resp': RESP_CODE},
                            tmin=tmin_resp, tmax=tmax_resp, baseline=None, preload=True)
    else:
        raise RuntimeError('Cannot create response-locked epochs without access to raw-like data.')

e_stim, e_resp


In [None]:
# Robust pre-event z-scoring
def robust_z(epochs, tmin_ref, tmax_ref):
    X = epochs.get_data()  # (n_ep, n_ch, n_time)
    times = epochs.times
    ref = (times >= tmin_ref) & (times <= tmax_ref)
    refdat = X[:, :, ref].reshape(len(epochs), X.shape[1], -1)
    med = np.median(refdat, axis=(0,2), keepdims=True)
    mad = np.median(np.abs(refdat - med), axis=(0,2), keepdims=True)
    # Avoid zeros; use global median of positive MADs as floor
    mad_pos = mad[mad>0]
    floor = np.nanmedian(mad_pos) if mad_pos.size else 1.0
    scale = 1.4826 * np.maximum(mad, floor)
    Z = (X - med) / scale
    eZ = epochs.copy()
    eZ._data = Z
    return eZ

e_stimZ = robust_z(e_stim, -0.6, -0.1)
e_respZ = robust_z(e_resp, -1.2, -0.2)
e_stimZ, e_respZ


In [None]:
# ERPs: quick looks
roi_occ = ['O1','O2','PO7','PO8']
roi_cpar = ['CPz','Pz','CP1','CP2','P1','P2']
roi_cen = ['Cz']

def safe_picks(info, names):
    return [ch for ch in names if ch in info.ch_names]

ev_stim = e_stimZ.average()
ev_resp = e_respZ.average()

fig1 = ev_stim.plot(picks=safe_picks(ev_stim.info, roi_occ), titles='Stim-locked: Occipital (P1/N1)', show=False)
fig2 = ev_stim.plot(picks=safe_picks(ev_stim.info, roi_cpar), titles='Stim-locked: Centro-parietal (P3/CNV)', show=False)
fig3 = ev_resp.plot(picks=safe_picks(ev_resp.info, roi_cen), titles='Response-locked: MRCP/BP @ Cz', show=False)

fig_dir = report_dir / 'figures'
fig_dir.mkdir(parents=True, exist_ok=True)
fig1.savefig(fig_dir / 'erp_stim_occ.png'); plt.close(fig1)
fig2.savefig(fig_dir / 'erp_stim_cpar.png'); plt.close(fig2)
fig3.savefig(fig_dir / 'erp_resp_cz.png'); plt.close(fig3)
print('Saved ERP figures to', fig_dir)


In [None]:
# LRP (C3 - C4) if both present
if all(ch in e_respZ.info.ch_names for ch in ['C3','C4']):
    ev_resp_c3 = e_respZ.copy().pick(['C3']).average()
    ev_resp_c4 = e_respZ.copy().pick(['C4']).average()
    lrp = ev_resp_c3.data.squeeze() - ev_resp_c4.data.squeeze()
    plt.figure(); plt.plot(ev_resp.times, lrp)
    plt.axvline(0, color='k', ls='--'); plt.title('LRP (C3 - C4), response-locked')
    plt.xlabel('Time (s)'); plt.ylabel('Z (a.u.)')
    plt.tight_layout()
    plt.savefig(fig_dir / 'lrp_resp.png'); plt.close()
    print('Saved LRP figure')
else:
    print('C3/C4 not both available; skipping LRP.')


In [None]:
# Time–Frequency (Morlet) — response-locked for mu/beta ERD and PMBR
freqs = np.arange(2, 40, 1.0)
n_cycles = freqs / 2.0
picks_sens = mne.pick_types(e_respZ.info, eeg=True, eog=False)
tfr_resp = mne.time_frequency.tfr_morlet(e_respZ.copy().pick(picks_sens),
                                         freqs=freqs, n_cycles=n_cycles,
                                         use_fft=True, return_itc=False, average=True, n_jobs=1)
# Baseline in TF usually in dB, but here epochs already z-scored; skip further baseline
tfr_fig = tfr_resp.plot(picks='Cz' if 'Cz' in e_respZ.ch_names else None,
                        title='Response-locked TFR', show=False)
tfr_path = fig_dir / 'tfr_resp.png'
tfr_fig[0].savefig(tfr_path) if isinstance(tfr_fig, list) else tfr_fig.savefig(tfr_path)
plt.close('all')
print('Saved TFR figure to', tfr_path)


In [None]:
# Export per-trial table (RTs and basic metadata)
import pandas as pd
df = pd.DataFrame({
    'trial': np.arange(len(RTs_trim)),
    'RT_s': RTs_trim
})
csv_path = report_dir / 'fivepoint_trials.csv'
df.to_csv(csv_path, index=False)
print('Saved trial table to', csv_path)
