## **Description**



This script is a semi-automated script for subject inclusion/exclusion, preprocessing and ΔERN value extraction.


### Modules and packages

- Python version: 3.15.5  
- Required packages:
    - `mne`
    - `autoreject`
    - `asrpy`
    - `matplotlib`
    - `pandas`
    - `numpy`
    - `csv` 
    - `os` 
- Run the cell below for all imports 

In [7]:
import mne
from mne import preprocessing
import os
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
from autoreject import AutoReject
import csv 
from mne.preprocessing import peak_finder
import pandas as pd
from mne import Annotations
import asrpy
import time 
from mne.time_frequency import psd_array_welch
import seaborn as sns
import plotly.express as px

### Data BIDS structure

```text
project_root
├── rawdata
│   └── sub-AG04EN28
│       └── eeg
│           ├── sub-AG04EN28_task-ern_eeg.set
│           ├── sub-AG04EN28_task-ern_channels.tsv
│           └── …
├── derivatives
│   └── ernpipeline
│       ├── plots (For subject-specific ERN plots)
│       │   ├── sub-AG04EN28_plot.png
│       │   ├── sub-CK06ST11_plot.png
│       │   └── …
│       ├── sub-AG04EN28
│       │   ├── sub-AG04EN28_desc-epochs_eeg.fif
│       │   ├── sub-AG04EN28_desc-evokedCorrect_eeg.fif
│       │   ├── sub-AG04EN28_desc-evokedIncorrect_eeg.fif
│       │   ├── sub-AG04EN28_desc-preprocSummary.txt
│       │   └── sub-AG04EN28_desc-ernValues.tsv
│       └── …


The scripts follows a BIDS valid organization and syntax. It creates all the necessary folders for the outputs in your project directory folder. The only path to update will be the root_path to the project directory below. 

In [8]:
######## Setting the paths to fit the BIDS requirements ########

pipeline_name = "ernpipeline"  # change this if needed

# Set the root path to your project directory !!
root_path = "C:/Users/qmoreau/Documents/Work/EEGManyAnalysts"

# Base directories
paths = {
    "raw": os.path.join(root_path, "raw"),
    "deriv": os.path.join(root_path, "derivatives", pipeline_name),
    "plots": os.path.join(root_path, "derivatives", pipeline_name, "plots"),
    "results": os.path.join(root_path, "results")
}

os.makedirs(paths["plots"], exist_ok=True)
os.makedirs(paths["results"], exist_ok=True)

# Utility functions to construct paths (BIDS compliant - adjust if necessary)

def raw_set_path(participant):
    return os.path.join(paths["raw"], participant, "eeg", f"{participant}_task-Flanker_eeg.set")

def events_json_path(participant):
    return os.path.join(paths["raw"], participant, "eeg", f"{participant}_task-Flanker_events.json")

def participant_dir(participant):
    return os.path.join(paths["deriv"], f"{participant}")

def epochs_file(participant):
    return os.path.join(participant_dir(participant), f"{participant}_desc-epochs_eeg.fif")

def evoked_file(participant, kind):  # kind in ['Correct', 'Incorrect', 'dif']
    suffix = {
        'Correct': '_desc-evokedCorrect_eeg.fif',
        'Incorrect': '_desc-evokedIncorrect_eeg.fif',
        'dif': '_desc-evokedDiff_eeg.fif'
    }[kind]
    return os.path.join(participant_dir(participant), suffix.replace('-', f"-{participant}-"))

def summary_txt_path(participant):
    return os.path.join(participant_dir(participant), f"{participant}_desc-preprocSummary.txt")

def ern_values_tsv(participant):
    return os.path.join(participant_dir(participant), f"{participant}_desc-ernValues.tsv")

def out_plot(participant):
    return os.path.join(paths["plots"], f"{participant}")



In [9]:
######## Gather participantects from raw folder )
# Only include directories starting with 'sub-'
files = [d for d in os.listdir(paths["raw"]) \
                if d.startswith("sub-") and os.path.isdir(os.path.join(paths["raw"], d))]
print(files)

['sub-AL07NK06', 'sub-CH05NK28', 'sub-ER05RT01', 'sub-ER06OR21', 'sub-ER07RL02', 'sub-FF07NS13', 'sub-IR05ET18', 'sub-LS08LF22', 'sub-NG03YI17', 'sub-NN07EN14', 'sub-NN09EN10', 'sub-ON05EL22', 'sub-RI04ZA30', 'sub-RT09CH24', 'sub-SE05ZI24', 'sub-TI04NS30', 'sub-US08RI28', 'sub-VA06OV12', 'sub-XX04AS28']


## **Subject quality check for inclusion/exclusion**

This section allows to perfrom an automatic quality control across all subjects using four quality metrics (OHA, THV, CHV, BCR) (Extracted from Automagic's pipeline [DOI:http://dx.doi.org/10.1101/460469doi]). A quality score is computed based on OHA, THV, and CHV, where higher scores indicate noisier data. Subjects with a quality score in the top 5% (i.e., above the 95th percentile) are excluded. Additionally, any subject with a Bad Channels Ratio (BCR) above 20% is also excluded.

 

In [10]:
subject_list = files

######## Defining a function that computes our 3 metrics of interest
def compute_oha_thv_chv(raw, theta_oha, theta_thv, theta_chv, window_s=1.0, step_s=0.5):
    
    # Extraction EEG data (n_chan × n_times)
    picks = mne.pick_types(raw.info, eeg=True, meg=False)
    data = raw.get_data(picks=picks)             
    sfreq = raw.info['sfreq']

    # OHA : Overall High Amplitude
    oha = np.mean(np.abs(data) > theta_oha)          

    # THV : Timepoints of high variance
    w = int(window_s * sfreq)
    st = int(step_s  * sfreq)
    n_win = n_bad = 0
    for start in range(0, data.shape[1] - w + 1, st):
        seg = data[:, start:start + w]               
        n_win += 1
        std_ch_t = np.nanstd(seg, axis=0)          
        if np.any(std_ch_t > theta_thv):
            n_bad += 1
    thv = n_bad / n_win if n_win > 0 else np.nan

    # CHV : Channels of high variance
    chan_sd = np.nanstd(data, axis=1)                
    chv = np.mean(chan_sd > theta_chv)              

    return oha, thv, chv

qc_rows = []

######## Loop on all subjects 

for participant in subject_list : 
    
    # load subject
    raw = mne.io.read_raw_eeglab(raw_set_path(participant), preload=True, verbose=False)
    unused_channels = ['MASTl','MASTr']
    raw = raw.drop_channels(unused_channels)
  
    # automatically detect bad channels using Local Outlier Factof (LOF), interpolate and keep them
    bad_channels = mne.preprocessing.find_bad_channels_lof(raw, n_neighbors=20, picks=None, metric="euclidean", threshold=1.5, return_scores=False, verbose=None)
    bad_channels_raw = raw.copy()
    for ch in bad_channels:
        if ch not in raw.info["bads"]:
            bad_channels_raw.info["bads"].append(ch)

    # computing the ratio of bad channels to good channels (BCR)
    n_bads = len(bad_channels_raw.info["bads"])
    n_eeg = len(mne.pick_types(raw.info, eeg=True))
    n_goods = n_eeg - n_bads
    BCR = n_bads / n_goods
    
    # Rereferencing to average 
    bad_channels_raw.set_eeg_reference('average')

    # filtering 
    bad_channels_raw.filter(l_freq=.1, h_freq=30)

    # compute OHA at 30µV
    oha30, _, _ = compute_oha_thv_chv(bad_channels_raw, theta_oha=30e-6, theta_thv=30e-6, theta_chv=30e-6)
    # compute THV and CHV  at 15µV
    _, thv15, chv15 = compute_oha_thv_chv(bad_channels_raw, theta_oha=15e-6, theta_thv=15e-6, theta_chv=15e-6)

    qc_rows.append({
        'sub':    participant,
        'OHA_30': oha30*100,
        'THV_15': thv15*100,
        'CHV_15': chv15*100,
        'BCR' : BCR*100
    })

######## build DataFrame
qc_df = pd.DataFrame(qc_rows)
df = qc_df.copy()



  raw = mne.io.read_raw_eeglab(raw_set_path(participant), preload=True, verbose=False)
  raw = mne.io.read_raw_eeglab(raw_set_path(participant), preload=True, verbose=False)
['VOGbelow']
Consider setting the channel types to be of EEG/sEEG/ECoG/DBS/fNIRS using inst.set_channel_types before calling inst.set_montage, or omit these channels when creating your montage.
  raw = mne.io.read_raw_eeglab(raw_set_path(participant), preload=True, verbose=False)


AttributeError: module 'mne.preprocessing' has no attribute 'find_bad_channels_lof'

The cell below allows to plot the number of subjects for each quality score value for data visualization 


In [None]:

######## Computing quality score using adjustable weights 

# Weight definition for each metric (Adjust the weights)
w = {"OHA_30":0.50, "THV_15":0.15, "CHV_15":0.30, "BCR":0.05}

# Calculate quality score and add to dataframe 
df["Qscore"]= (df[list(w)].mul(w).sum(axis=1))

######## Plotting for visual inspection allowing wieghts and thresholds adjustment

# 4d scattered plot (interactive)
fig = px.scatter_3d(df, x="OHA_30", y="THV_15", z="CHV_15", color="BCR", hover_name="sub", opacity=0.8)
fig.update_layout(scene=dict(xaxis_title="OHA_30 (%)", yaxis_title="THV_15 (%)", zaxis_title="CHV_15 (%)"), title="EEG QC space")
fig.show()

# Distribution plot 
plt.figure(figsize=(8, 5))
sns.histplot(df["Qscore"], bins=30, kde=True)
plt.xlabel("Qscore")
plt.title("Quality Score Distribution")
plt.grid(True, linestyle='--', alpha=0.5)
plt.show()


In [None]:
######## Setting threshold at 5th percentile of Qscore distribution 

qscore_thresh = df['Qscore'].quantile(0.05)

# Keep only subjects above the Qscore threshold
df_filtered = df[df['Qscore'] >= qscore_thresh]

# Identify subjects to exclude based on Qscore OR BCR ≥ 20
exclude = df[(df['Qscore'] < qscore_thresh) | (df['BCR'] >= 20)]['sub'].tolist()

# Save filtered dataframe (only subjects above threshold) to CSV
df_filtered.to_csv(os.path.join(paths['deriv'], 'quality_control.csv'), index=False)

# Print how many subjects were excluded
print("Number of subjects excluded based on quality label and BCR threshold:", len(exclude))

## **Preprocessing and ERN analysis**


In [None]:
######## Defining used functions for the pipeline

def compute_congruency_ERPs(epochs, congruency, participant, channel="CZ", plot_dir=out_plot):
        """
        Computes ERPs, Diff, mean_amp_diff, and plots + saves result for a given congruency level.

        Parameters:
        - epochs : mne.Epochs
        - congruency : str, one of ["0", "33", "66", "100", "all"]
        - participant : str, participant ID for filenames
        - channel : str
        - plot_dir : str, folder to save plots. If None, no saving.

        Returns:
        - ERP_correct, ERP_incorrect, Diff, mean_amp_diff
        """
        event_codes = {
            "0":    {"correct": ["106", "107"], "incorrect": ["108", "109"]},
            "33":   {"correct": ["116", "117"], "incorrect": ["118", "119"]},
            "66":   {"correct": ["126", "127"], "incorrect": ["128", "129"]},
            "100":  {"correct": ["136", "137"], "incorrect": ["138", "139"]},
            "all":  {"correct": ["106", "107", "116", "117", "126", "127", "136", "137"], "incorrect": ["108", "109", "118", "119", "128", "129", "138", "139"]}
        }

        if congruency not in event_codes:
            raise ValueError(f"Invalid congruency: {congruency}")

        correct_ids = [e for e in event_codes[congruency]["correct"] if e in epochs.event_id]
        incorrect_ids = [e for e in event_codes[congruency]["incorrect"] if e in epochs.event_id]

        epochs_correct = epochs[correct_ids] if correct_ids else None
        epochs_incorrect = epochs[incorrect_ids] if incorrect_ids else None

        def safe_avg(e, name):
            if e is not None and len(e) > 0:
                evoked = e.average()
                evoked.comment = name
                return evoked
            else:
                print(f"No epochs found for {name}")
                return None

        ERP_correct = safe_avg(epochs_correct, f"Correct {congruency}%")
        ERP_incorrect = safe_avg(epochs_incorrect, f"Incorrect {congruency}%")

        mean_amp_diff = None
        Diff = None

        if ERP_correct and ERP_incorrect:
            Diff = ERP_correct.copy()
            Diff._data = ERP_incorrect._data - ERP_correct._data
            Diff.comment = f"Difference Wave {congruency}%"

            # Peak detection in the 0–0.5 s window
            ch_idx = Diff.ch_names.index(channel)
            data = Diff.data[ch_idx]
            mask = (Diff.times >= 0) & (Diff.times <= 0.5)
            data_win = data[mask]
            times_win = Diff.times[mask]

            peak_locs, peak_vals = mne.preprocessing.peak_finder(data_win, extrema=-1)

            if len(peak_locs) > 0:
                # Take the strongest negative peak (minimum value)
                peak = np.argmin(peak_vals)
                peak_time = times_win[peak_locs[peak]]
                tmin_win = peak_time - 0.1
                tmax_win = peak_time + 0.1
                Diff_cropped = Diff.copy().pick(channel).crop(tmin_win, tmax_win)
                mean_amp_diff = Diff_cropped.data.mean(axis=1) * 1e6  
            else:
                mean_amp_diff = None

        # Plot & save
        if ERP_correct and ERP_incorrect and Diff and plot_dir:
            os.makedirs(plot_dir, exist_ok=True)
            out_plot = os.path.join(plot_dir, f"{participant}_cong-{congruency}_plot.png")
            fig = mne.viz.plot_compare_evokeds(
                [ERP_correct, ERP_incorrect, Diff], picks=channel
            )[0]
            fig.savefig(out_plot, dpi=300)
            print(f"Saved plot for {congruency}% → {out_plot}")

        return ERP_correct, ERP_incorrect, Diff, mean_amp_diff


In [None]:
subject_list = files

failed_subjects = []

for participant in subject_list : 
    
    # Define preprocessed epochs file
    preproc_file = epochs_file(participant)
    # Skip if QC excluded or already preprocessed
    if participant in exclude or os.path.exists(preproc_file):
        reason = "excluded because too noisy :(" if participant in exclude else "already preprocessed :)!"
        print(f"Subject {participant} {reason}, skipping...")
        continue
    
    #################
    # PREPROCESSING #
    #################
    
    print(f"Processing subject {participant}...")
  
    #load subject
    raw = mne.io.read_raw_eeglab(raw_set_path(participant), preload=True, verbose=False)
    unused_channels = ['MASTl','MASTr']
    raw = raw.drop_channels(unused_channels)

    #Adding HEOG as EOG
    heog_data = raw.copy().pick_channels(['F7','F8']).get_data()
    heog = heog_data[0] - heog_data[1]
    heog_info = mne.create_info(['HEOG'], raw.info['sfreq'], 'eog')
    heog_raw = mne.io.RawArray(heog[np.newaxis,:], heog_info)
    raw_heog = raw.copy()
    raw_heog = raw_heog.add_channels([heog_raw], force_update_info=True)
  
    # automatically detect bad channels using Local Outlier Factof (LOF), interpolate and keep them
    bad_channels = mne.preprocessing.find_bad_channels_lof(raw_heog, n_neighbors=20, picks=None, metric="euclidean", threshold=1.5, return_scores=False, verbose=None)
    bad_channels_raw = raw_heog.copy()
    for ch in bad_channels:
        if ch not in raw.info["bads"]:
            bad_channels_raw.info["bads"].append(ch)

    # computing the ratio of bad channels to good channels (BCR)
    n_bads = len(bad_channels_raw.info["bads"])
    n_eeg = len(mne.pick_types(raw.info, eeg=True))
    n_goods = n_eeg - n_bads
    BCR = n_bads / n_goods
    
    raw_interp = bad_channels_raw.copy().interpolate_bads()
    
    # Rereferencing to average 
    raw_interp.set_eeg_reference('average')
    
    # filtering to remove slow drifts
    filt_raw = raw_interp.copy()
    filt_raw.load_data().filter(l_freq=1., h_freq=None)
    
    # cleaning with artifact subspace reconstruction (ASR)
    try : 
        eeg_picks = mne.pick_types(filt_raw.info, eeg=True)
        asr = asrpy.ASR(sfreq=filt_raw.info['sfreq'], cutoff=15)
        asr.fit(filt_raw.copy().pick(eeg_picks))
        asr_raw = asr.transform(filt_raw.copy())
    except Exception as e:
        print(f"ASR failed for {participant}: {e}, skipping...")
        # log failed particpants 
        failed_subjects.append(participant)
        with open(os.path.join(paths['deriv'], "asr_failed_subjects.txt"), "a") as f:
            f.write(f"{participant}: {e}\n")
        continue

    # VEOG: VOG_below and FP1 
    raw_eog = asr_raw.copy()
    raw_eog = mne.set_bipolar_reference(asr_raw, anode="VOGbelow", cathode="FP1", ch_name='VEOG', copy=False, drop_refs=False)
    raw_eog.set_channel_types({"VEOG": 'eog'})
    raw_eog.drop_channels(['VOGbelow'])

    # ICA - Only computing the 25 first components for speed issue, using 42 as a random seed
    ica = mne.preprocessing.ICA(n_components=25, max_iter=1000, random_state=42)
    
    # applying ICA only to EEG channels only, to filtered raw
    ica.fit(raw_eog)
    
    # Automatically detect bad EOG components using correlation 
    eog_idx_v, eog_scores_v = ica.find_bads_eog(raw_eog, ch_name='VEOG', measure="zscore", threshold=3)
    eog_idx_h, eog_scores_h = ica.find_bads_eog(raw_eog, ch_name='HEOG', measure="zscore", threshold=3)
    ica.exclude = eog_idx_v + eog_idx_h 
    clean_raw = ica.apply(raw_eog.copy())
    ica.plot_overlay(raw_eog, exclude = ica.exclude, picks="eeg")

    postica = ica.apply(raw_interp)
    prep_continuous = postica.filter(l_freq=.1, h_freq=30)
    
    
    # defining epochs of interest 
    events, event_id = mne.events_from_annotations(prep_continuous)
    epochs = mne.Epochs(prep_continuous, events, event_id, tmin=-1, tmax=1.5, baseline= (-.2, 0), preload=True, event_repeated="drop")
    
    correct_events = [e for e in ["106", "107", "116", "117", "126", "127", "136", "137"] if e in epochs.event_id]
    incorrect_events = [e for e in ["108", "109", "118", "119",  "128", "129", "138", "139"] if e in epochs.event_id]
    event_codes_of_interest = correct_events + incorrect_events
    epochs_of_interest = epochs[event_codes_of_interest]
    
    # applying autoreject on epochs of interest 
    ar = AutoReject()
    epochs_clean = ar.fit_transform(epochs_of_interest)
    
    # Track number of retained epochs
    n_retained_epochs = len(epochs_clean)
    n_total_epochs = len(epochs_of_interest)

    # Get reject log (based on original epochs)
    reject_log = ar.get_reject_log(epochs_of_interest)
    removed_indices = np.flatnonzero(reject_log.bad_epochs)   
    
    # Count incorrect trials per congruency
    trial_counts = {}
    for label, codes in [('0',['108','109']),('33',['118','119']),
                         ('66',['128','129']),('100',['138','139'])]:
        valid = [e for e in codes if e in epochs_clean.event_id]
        trial_counts[f'n_trials_incorrect_{label}pct'] = len(epochs_clean[valid]) if valid else 0

    # Save cleaned epochs to preproc folder
    os.makedirs(os.path.dirname(preproc_file), exist_ok=True)
    epochs_clean.save(preproc_file, overwrite=True)

    # Write summary
    summary = {'subject_ID': participant,
        'interpolated_channels': bad_channels_raw.info['bads'],
        'ica_excluded': np.unique(ica.exclude).tolist(),
        'n_total_epochs': n_total_epochs,
        'n_retained_epochs': n_retained_epochs,
        'removed_epoch_indices': removed_indices.tolist(),
        **trial_counts}
        
    with open(summary_txt_path(participant), 'w') as f:
        for k, v in summary.items():
            f.write(f"{k}: {v}\n")
         
    print(f"Subject {participant} preprocessed successfully :)!")

    #################
    #  PROCESSING   #
    #################

    # Only include event codes that exist in epochs.event_id
    codes = {
        'correct': [c for c in ["106","107","116","117","126","127","136","137"] if c in epochs_clean.event_id],
        'incorrect': [c for c in ["108","109","118","119","128","129","138","139"] if c in epochs_clean.event_id]
    }
    
    epochs_corr = epochs_clean[codes['correct']] if codes['correct'] else None
    epochs_inc  = epochs_clean[codes['incorrect']] if codes['incorrect'] else None
    
    ERP_corr = epochs_corr.average() if epochs_corr else None
    ERP_inc  = epochs_inc.average()  if epochs_inc  else None
    
    if ERP_corr: ERP_corr.comment = 'Correct'
    if ERP_inc:  ERP_inc.comment  = 'Incorrect'
    Diff = None
    if ERP_corr and ERP_inc:
        Diff = ERP_inc.copy(); Diff._data = ERP_inc._data - ERP_corr._data; Diff.comment='Difference Wave'
    
    # Save evokeds
    ERP_corr.save(evoked_file(participant, 'Correct'), overwrite=True)
    ERP_inc.save(evoked_file(participant, 'Incorrect'), overwrite=True)
    Diff.save(evoked_file(participant, 'dif'), overwrite=True)
    
    # Run congruency-specific analysis
    plot_dir = paths['plots']
    ern_vals = []
    for cong in ["all","0","33","66","100"]:
        ERPc, ERPi, DIF, mean_amp = compute_congruency_ERPs(epochs_clean, cong, participant, plot_dir=plot_dir)
        ern_vals.append((cong, mean_amp[0] if mean_amp is not None else np.nan))
        
    # save ERPs in tsv file 
    csv_dir = os.path.join(paths['results'], 'ern_values')
    os.makedirs(csv_dir, exist_ok=True)
    with open(os.path.join(csv_dir, f"{participant}_desc-ernvalues.tsv"), 'w', newline='') as f:
        writer = csv.writer(f, delimiter='\t')
        writer.writerow(["Condition","MeanAmp_µV"])
        writer.writerows(ern_vals)
        
    print(f"Processing done :) Saved ERN values for {participant} to {csv_dir}")
   