## 0. Import packages and define custom functions

In [28]:
# Import cell
try:
    import os
    import re
    import math
    import json
    import chardet
    import warnings
    import traceback
    import numpy as np
    import pandas as pd
    from pathlib import Path
    import ipywidgets as widgets
    from textwrap import shorten
    from ipyfilechooser import FileChooser
    from IPython.display import display, HTML, clear_output
except ImportError as e:
    print("⚠️ Error: ", e)
else:
    print("✅ Packages and functions successfully imported!")

# custom function to detect automatically and return the encoding of edf file
def detect_encoding(byte_string, min_confidence=0.6):
    """Return the encoding chardet detects for ``byte_string``.

    Parameters
    ----------
    byte_string : bytes
        Raw bytes to analyse.
    min_confidence : float
        Minimum chardet confidence required to accept the detected encoding.

    Returns
    -------
    str
        Name of the detected encoding.

    Raises
    ------
    UnicodeDecodeError
        When chardet reports no encoding, or a confidence below ``min_confidence``.
    """
    guess = chardet.detect(byte_string)
    encoding, confidence = guess['encoding'], guess['confidence']

    # accept the guess only when an encoding was found with enough confidence
    if encoding is not None and not confidence < min_confidence:
        return encoding

    raise UnicodeDecodeError(
        "chardet", byte_string, 0, len(byte_string),
        f"\tUnable to reliably detect encoding. Detected: {encoding} with confidence {confidence}",
    )

# custom function to read information from EDF headers, without using the pyedflib package (that was too strict for ICEBERG)
# EDF file should follow a strict format, dedicating a specific number of octets for each type of information.
# it means that we can read the info octet by octet by specifying the number of octets we expect for the next variable (that is known from the EDF norm)
def read_edf_header_custom(file_path):
    """Read the header of an EDF file field by field and return it as a dict.

    The first 256 bytes are passed to ``detect_encoding`` to choose the codec
    used to decode every field. Each field is then read with its fixed width,
    decoded and stripped of surrounding whitespace.

    Parameters
    ----------
    file_path : path-like
        Path of the EDF file to read.

    Returns
    -------
    dict
        Global fields ('version', 'patient_id', 'recording_id', 'start_date',
        'start_time', 'header_bytes', 'n_data_records', 'duration_data_record',
        'n_channels') plus one list of ``n_channels`` strings per channel field.
        The per-channel 'reserved' list replaces the global 'reserved' value
        because both use the same key.

    Raises
    ------
    UnicodeDecodeError
        Propagated from ``detect_encoding`` or from decoding a field.
    ValueError
        When a numeric global field cannot be converted.
    """
    # (key, width in bytes, converter) for the global part of the header (256 bytes in total)
    global_layout = (
        ('version', 8, str),
        ('patient_id', 80, str),
        ('recording_id', 80, str),
        ('start_date', 8, str),
        ('start_time', 8, str),
        ('header_bytes', 8, int),
        ('reserved', 44, str),
        ('n_data_records', 8, int),
        ('duration_data_record', 8, float),
        ('n_channels', 4, int),
    )
    # (key, width in bytes) of the per-channel fields; each field is stored for
    # all channels in a row before the next field starts
    channel_layout = (
        ('channel', 16),
        ('transducer_type', 80),
        ('dimension', 8),
        ('physical_min', 8),
        ('physical_max', 8),
        ('digital_min', 8),
        ('digital_max', 8),
        ('prefiltering', 80),
        # NOTE(review): in the EDF specification this field is the number of samples
        # per data record; it is a frequency in Hz only if duration_data_record is 1 s
        # - confirm for this dataset.
        ('sampling_frequency', 8),
        ('reserved', 32),
    )

    with open(file_path, 'rb') as fid:  # binary mode, so that fields can be read byte by byte
        # detect the encoding on the global header, then rewind to the start of the file
        codec = detect_encoding(fid.read(256))
        fid.seek(0)

        def next_field(width):
            # read the next `width` bytes as stripped text
            return fid.read(width).decode(codec).strip()

        header = {key: convert(next_field(width)) for key, width, convert in global_layout}

        n_signals = header['n_channels']
        per_channel = {}
        for key, width in channel_layout:
            per_channel[key] = [next_field(width) for _ in range(n_signals)]

        header.update(per_channel)

    return header

# function to extract filter information from the string in headers
def extract_filter_value(s, tag):
    """Extract the number that follows ``tag`` in a prefiltering string.

    Parameters
    ----------
    s : str or missing value
        Prefiltering text taken from the header (e.g. ``'HP:0.3Hz LP:70Hz'``).
    tag : str
        Label to look for. It is inserted as-is in the regular expression and
        matched case-insensitively.

    Returns
    -------
    float or None
        The value found after the tag (optionally separated by ':' and/or
        whitespace), or None when ``s`` is missing or no well-formed number
        follows the tag.
    """
    if pd.isna(s):
        return None
    # Only accept a well-formed decimal number ("70", "70.", "0.3", ".5"); a loose
    # character class such as [\d.]+ would also capture "." or "1.2.3" and make
    # float() raise ValueError.
    match = re.search(rf'{tag}[:\s]*(\d+(?:\.\d*)?|\.\d+)', s, re.IGNORECASE)
    return float(match.group(1)) if match else None

# custom function to get the sampling frequency out of a dataframe (the df needs to have 'subject' and 'channel' as columns)
def get_sf(df, subject, channel):
    """Return the 'sampling_frequency' of the first row matching subject and channel.

    ``df`` must have 'subject', 'channel' and 'sampling_frequency' columns.
    Returns None when no row matches.
    """
    is_match = (df['subject'] == subject) & (df['channel'] == channel)
    matching_rows = df[is_match]
    if matching_rows.empty:
        return None
    return matching_rows.iloc[0]['sampling_frequency']

# function to create a widget slider to select the configuration to inspect
def mk_config_slider(value = 1, min = 1, max = 5):
    """Create the integer slider used to pick the configuration to inspect.

    ``value`` is the initial position; ``min`` and ``max`` are the slider bounds.
    Returns the ``widgets.IntSlider`` instance.
    """
    slider_options = dict(
        value=value,
        min=min,
        max=max,
        step=1,
        description='Selected configuration:',
        style={'description_width': '150px'},   # wide enough for the description text
        layout=widgets.Layout(width='400px'),   # overall widget width
        disabled=False,
        continuous_update=False,
        orientation='horizontal',
        readout=True,
        readout_format='d',
    )
    return widgets.IntSlider(**slider_options)

# function to print the configuration of a dataset parameter
def print_config(i, config_dict, param):
    """Print the i-th entry (1-based) of a configuration dictionary.

    ``config_dict`` maps a configuration (a sized collection of values) to the
    list of participants sharing it; ``param`` is the label used for the
    configuration values in the printed summary.
    """
    # entries are addressed by their insertion order in the dictionary
    config, participants = list(config_dict.items())[i - 1]

    print(f'Selected configuration: # {i}')
    print(f'\t{len(config)} {param}: {config}')
    print(f'\t{len(participants)} participants: {participants}')

# function to create a scrollable box for long output (e.g., cell loading the data) 
def print_in_scrollable_box(text, height=300, font_size="12px", escape=True):
    """Display ``text`` in a fixed-height, scrollable ``<pre>`` box.

    Parameters
    ----------
    text : str
        Text to show (e.g. the accumulated log of the loading cell).
    height : int
        Height of the box in pixels.
    font_size : str
        CSS font size of the text.
    escape : bool
        When True (default), '&', '<' and '>' in ``text`` are HTML-escaped so that
        file paths or error messages containing these characters are shown
        literally instead of being interpreted as markup. Pass False to render
        ``text`` as raw HTML.
    """
    # local import so that the function does not depend on an extra file-level import
    from html import escape as html_escape

    body = html_escape(str(text), quote=False) if escape else text
    display(HTML(f'<pre style="overflow-y:scroll; height:{height}px; border:1px solid black; padding:10px; font-size:{font_size};">{body}</pre>'))


✅ Packages and functions successfully imported!


## 1. Select study folder and extract EEG/EOG information

### 1.1 Select the study folder

The code cell below will open a widget to select the folder containing your data.

In [2]:
# call widget to select your data folder
chooser = FileChooser(os.getcwd())
chooser.title = "<b>Choose your study folder</b>"
chooser.show_only_dirs = True

# Output widget that receives everything printed by the callback
out = widgets.Output()

# callback extracting the folder path once the folder has been chosen
def on_folder_selected(chooser):
    """Store the selected folder, its .edf files and its summary folder on ``chooser``.

    Sets ``chooser.folder_path``, ``chooser.edf_files`` (list of Path objects) and
    ``chooser.summary_path`` (str), creates the summary folder when needed, and
    prints a short report in the ``out`` widget.
    """
    out.clear_output()
    with out:
        chooser.folder_path = chooser.selected_path
        print("📁 Selected Path:", chooser.folder_path)

        # get the edf file list; the extension is compared case-insensitively so that
        # '.EDF' files are also found on case-sensitive file systems
        chooser.edf_files = [
            f for f in Path(chooser.folder_path).rglob('*')
            if f.name.lower().endswith('.edf')
            and not f.name.startswith('._')  # skip '._' files (that can be found on mac for example)
        ]
        if not chooser.edf_files:
            print("⚠️ There is no .edf file in your folder")
        else:
            print(f"\nThere is {len(chooser.edf_files)} .edf files in your folder!")

        # check the existence and/or create the summary folder that will receive the summary tables and the report
        chooser.summary_path = f'{chooser.folder_path}/summary'
        if not os.path.exists(chooser.summary_path):
            # exist_ok avoids a crash if the folder appears between the check and the creation
            os.makedirs(chooser.summary_path, exist_ok=True)
            print("\nCreated summary folder at: " + chooser.summary_path)
        else:
            print("\nSummary folder already exists. \nPrevious summary tables (if any) will be overwritten at: \n" + chooser.summary_path)

# callback to run the function only when a folder is selected
chooser.register_callback(on_folder_selected)
display(chooser, out)

FileChooser(path='C:\Users\yvan.nedelec\OneDrive - ICM\Documents\RE_yvan\projects\Inspect_EDF\tools', filename…

Output()

### 1.2 Extract the recording parameters from each participant's file

The code cell below will loop across the .edf files to extract the information of each subject.\
It returns a table that can easily be manipulated to access specific information in the rest of the notebook.\
If some files failed to load, their names will be saved in a .tsv file in the summary folder.

In [3]:
# get variables from the chooser widget (set by the folder-selection callback)
folder_path = chooser.folder_path
summary_path = chooser.summary_path
edf_files = chooser.edf_files

# flags about an optional participants table; both are False here.
# `found_group` is checked in the loading loop before `subj_table` is read.
table_found = False
found_group = False

# Initialize a list of dataframes to store file info, which will be concatenated at the end (this is better for performance)
df_list = []
# Initialize an empty list for files that could not be read
failed_list = []

# initialize the dynamic output: `output` accumulates the log text shown in `dynamic_out`
output = ""
dynamic_out = widgets.Output()
display(dynamic_out)

# Pattern splitting a file name around its first run of digits (we assume the subject
# name contains an incrementing number):
#   ^(.*?)  → group 1: as few chars as possible, up to the first digit
#   (\d+)   → group 2: the number itself
#   (.*)$   → group 3: the rest of the string
# Compiled once, outside the loop.
pattern = re.compile(r'^(.*?)(\d+)(.*)$')

# Loop over the .edf files: read each header, build one dataframe per file (one row per
# channel) and log the progress / failures in the dynamic output.
for file_idx, edf_path in enumerate(edf_files):
    with dynamic_out:
        output += f'file {file_idx+1}/{len(edf_files)}, currently opening file: {edf_path}\n'
        dynamic_out.clear_output(wait=True)
        print_in_scrollable_box(output, font_size="12px")

        # read file with the custom function
        try:
            edf_header = read_edf_header_custom(edf_path)

            # get subject name (corresponding to file_name)
            sub_name = edf_path.stem

            # get subject group (from the parent folder because in the ICEBERG database subfolders were created per patient group)
            sub_folder = edf_path.parent.name

            # create df from signal info
            df = pd.DataFrame(edf_header)

            # theoretical resolution (edf are 16bit files so the eeg signal can take 2^16 values within the dynamic range).
            # The dynamic range is the span between the two physical bounds; the absolute value of the
            # difference stays correct when both bounds have the same sign or when the polarity is inverted.
            phys_min = pd.to_numeric(df['physical_min'])
            phys_max = pd.to_numeric(df['physical_max'])
            df['res_theoretical'] = (phys_max - phys_min).abs() / pow(2, 16)
            # turn theoretical resolution to uV if dimension is mV (other or missing units are left unconverted)
            df.loc[df['dimension'].str.contains('mv', case=False, na=False), 'res_theoretical'] *= 1000

            # get filtering info in different columns
            df['lowpass'] = df['prefiltering'].apply(lambda x: extract_filter_value(x, 'LP'))
            df['highpass'] = df['prefiltering'].apply(lambda x: extract_filter_value(x, 'HP'))
            df['notch'] = df['prefiltering'].apply(lambda x: extract_filter_value(x, 'NOTCH'))

            # add subject info in the dataframe
            df['subject'] = sub_name
            df['sub_folder'] = sub_folder
            df['group'] = np.nan  # initialize column 'group' with NaN
            # get group from participants table if any (else group will be inferred from subfolder or filename extension later)
            if found_group:
                df['group'] = subj_table.loc[subj_table['participant_id'] == sub_name, 'group'].iloc[0]

            # extract filename components before and after the subject number
            pre_comp = sub_num = post_comp = np.nan
            m = pattern.match(sub_name)
            if m:
                # empty components are stored as NaN
                pre_comp = m.group(1) or np.nan
                sub_num = m.group(2) or np.nan
                post_comp = m.group(3) or np.nan
            df['pre_fn_comp'] = pre_comp
            df['post_fn_comp'] = post_comp
            df['sub_num'] = sub_num

            df['path'] = str(edf_path)
            df['session'] = np.nan  # session will be inferred later from file name component

            # select only the columns of interest
            df = df[['subject', 'group', 'session', 'path', 'sub_folder', 'sub_num', 'pre_fn_comp', 'post_fn_comp', 'channel', 'transducer_type', 'dimension', 'sampling_frequency',
                     'highpass', 'lowpass', 'notch', 'physical_min', 'physical_max', 'res_theoretical']]

            # store subject data
            df_list.append(df)

        except UnicodeDecodeError:
            # encoding could not be detected or a field could not be decoded
            output += f"⚠️ Encoding problem for {edf_path}\n"
            dynamic_out.clear_output(wait=True)
            print_in_scrollable_box(output, font_size="12px")
            failed_list.append((edf_path, 'encoding'))
        except Exception as exc:
            # best effort: any other problem is logged and the file is skipped
            output += f"❌ Unexpected problem for {edf_path} : {exc}\n"
            dynamic_out.clear_output(wait=True)
            print_in_scrollable_box(output, font_size="12px")
            failed_list.append((edf_path, 'other'))
   
# save the failed list (if not empty) BEFORE concatenating, so that it is also written
# when none of the files could be read
failed_df = pd.DataFrame(failed_list)
if not failed_df.empty:
    failed_df.to_csv(f'{summary_path}/failed_edf_read.tsv', sep = '\t')
    print(f'\nSaving the list of files that could not be read to: \n{summary_path}/failed_edf_read.tsv')

# pd.concat raises an unclear "No objects to concatenate" on an empty list: fail with an explicit message instead
if not df_list:
    raise ValueError(
        "No .edf header could be read: check the selected folder "
        f"({len(failed_list)} file(s) failed to load)."
    )

# concatenate the per-file dataframes into a single one
with warnings.catch_warnings(): # this is to skip a warning not affecting our operation
    warnings.simplefilter("ignore", FutureWarning)
    df_full = pd.concat(df_list, ignore_index=True)

# keep the EEG and EOG channels: rows whose transducer type mentions EEG / AGAGCL ELECTRODE / EOG,
# or whose channel label mentions EOG
is_eeg_transducer = df_full['transducer_type'].str.contains(r'\bEEG\b|\bAGAGCL ELECTRODE\b|\bEOG\b', case=False, na=False)
is_eog_label = df_full['channel'].str.contains(r'EOG', case=False, na=False)
mask = is_eeg_transducer | is_eog_label
df_ch = df_full[mask]
# drop the emg/ecg channels that were captured with the AGAGCL ELECTRODE transducer type
is_emg_or_ecg = df_ch['channel'].str.contains(r'emg|ecg', case=False, na=False)
df_ch = df_ch[~is_emg_or_ecg]

# channel configuration of each participant: sorted tuple of its distinct channel labels
ch_per_sub = df_ch.groupby('subject')['channel'].apply(lambda x: tuple(sorted(set(x))))

# group the participants by channel configuration (configurations are kept in order of first appearance)
ch_config_dict = {}
for subject_id, config in ch_per_sub.items():
    ch_config_dict.setdefault(config, []).append(subject_id)

n_ch_configs = len(ch_config_dict)
if n_ch_configs > 1:
    print('\n>>> There is multiple EEG configurations in your dataset! <<<')
    print(f'\n\tNumber of different configuration: {n_ch_configs}\n')
else:
    print('\n>>> There is only one EEG configuration in your dataset! <<<\n')


Output()


>>> There is multiple EEG configurations in your dataset! <<<

	Number of different configuration: 7



### 2.2 Display channels configurations

In [4]:
# ------- A. Build an "aligned" table with one column per configuration -------
# ch_config_dict: { tuple(sorted(set(channels))) : [list_of_subjects] }
configs = list(ch_config_dict.keys())
n_configs = len(configs)

# column headers: configuration number and number of participants
col_labels = [
    f"config. {i}<br>(n={len(ch_config_dict[cfg])})"
    for i, cfg in enumerate(configs, start=1)
]

# sorted list of channels for each configuration
cfg_channel_lists = [sorted(cfg) for cfg in configs]
max_len = max((len(lst) for lst in cfg_channel_lists), default=0)

# pad every column with empty strings so that they all have the same height
data = {
    label: ch_list + [""] * (max_len - len(ch_list))
    for label, ch_list in zip(col_labels, cfg_channel_lists)
}

df_configs_aligned = pd.DataFrame(data)
df_configs_aligned.index = pd.Index(range(1, max_len+1), name="rank")

# escape=False so that the <br> of the column headers is rendered as a line break
display(HTML(df_configs_aligned.to_html(escape=False)))

Unnamed: 0_level_0,config 1 (n=26),config 2 (n=2),config 3 (n=14),config 4 (n=6),config 5 (n=6),config 6 (n=2),config 7 (n=10)
rank,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
1,A2,C3-M2,C3-M2,C3,C3-M2,C3,EEG A2
2,C3,C4-M1,C4-M1,C4,C4-M1,C4,EEG C3
3,EOG D,E1-M2,F4-M1,F4,E1-M2,E1,EEG C4
4,EOG G,E2-M2,O2-M1,M1,E2-M2,E2,EEG F1
5,Fp1,F4-M1,,M2,F3-M2,F3,EEG F2
6,O1,O2-M1,,O2,F4-M1,F4,EEG O1
7,,,,,O1-M2,M1,EEG O2
8,,,,,O2-M1,M2,EEG T3
9,,,,,,O1,EEG T4
10,,,,,,O2,


In [21]:
def normalize_label(x):
    """Return ``x`` as a string without surrounding whitespace ('' for None)."""
    if x is None:
        return ""
    return str(x).strip()

# --- Prepare the data from ch_config_dict / configs ---
# configs        : list of channel tuples (already built by the previous cell)
# ch_config_dict : { tuple(sorted(set(channels))) : [list_of_subjects] }

if 'configs' not in globals():
    configs = list(ch_config_dict.keys())

# parallel lists: one label and one sorted list of raw channel names per configuration
config_labels = []
channels_by_cfg = []
for i, cfg in enumerate(configs, start=1):
    config_labels.append(f"config. {i} (n={len(ch_config_dict[cfg])})")
    channels_by_cfg.append(sorted(cfg))

# --- Channels common to all configurations (optional: for a "common channels" button) ---
canonical_by_cfg = []
for raw_channels in channels_by_cfg:
    cleaned = {normalize_label(ch) for ch in raw_channels}
    cleaned.discard("")  # ignore labels that are empty once normalized
    canonical_by_cfg.append(sorted(cleaned))

common_canonical = set()
if canonical_by_cfg:
    common_canonical = set.intersection(*(set(names) for names in canonical_by_cfg))

# --------- Build one panel per configuration (with search box) ---------
def build_config_panel(ch_list):
    """
    Build the channel-selection panel of one configuration.

    ``ch_list`` is the list of channel labels; one checkbox (ticked by default)
    is created per label.
    Returns (container_widget, state_dict) where state_dict contains:
    'checkboxes', 'filter', 'count_label'.
    """
    # Control widgets
    filter_box = widgets.Text(placeholder="Filter channels (regex or text)…", layout=widgets.Layout(width="320px"))
    btn_all = widgets.Button(description="select all", tooltip="Sélectionner tous les canaux")
    btn_none = widgets.Button(description="deselect all", tooltip="Déselectionner tous les canaux")
    btn_invert = widgets.Button(description="inverse selection", tooltip="Inverser la sélection")
    # btn_common   = widgets.Button(description="Commun(s)", tooltip="Garder uniquement les canaux communs (canonisés)")

    # One checkbox per channel, all ticked by default
    checkboxes = [widgets.Checkbox(value=True, description=ch) for ch in ch_list]
    count_label = widgets.HTML()  # shows "X / N sélectionnés"

    def update_count():
        # refresh the "selected / total" counter
        n_checked = sum(cb.value for cb in checkboxes)
        count_label.value = f"<b>{n_checked}</b> / {len(checkboxes)} sélectionnés"

    update_count()

    def make_bulk_handler(new_value_of):
        # button handler setting every checkbox to new_value_of(current value)
        def handler(_):
            for cb in checkboxes:
                cb.value = new_value_of(cb.value)
            update_count()
        return handler

    # def on_common(_):
    #     # keep ticked only the channels whose normalized label is in the intersection
    #     keep = {cb: (normalize_label(cb.description) in common_canonical) for cb in checkboxes}
    #     for cb, k in keep.items():
    #         cb.value = bool(k)
    #     update_count()

    btn_all.on_click(make_bulk_handler(lambda _current: True))
    btn_none.on_click(make_bulk_handler(lambda _current: False))
    btn_invert.on_click(make_bulk_handler(lambda current: not current))
    # btn_common.on_click(on_common)

    # Scrollable box holding the checkboxes
    out_box = widgets.VBox(checkboxes, layout=widgets.Layout(max_height="300px", overflow="auto", border="1px solid #ddd", padding="4px"))

    def apply_filter(*args):
        # Show/hide the checkboxes according to the filter text. The text is used as a
        # case-insensitive regex; if it is not a valid regex, fall back to a
        # case-insensitive "contains" test.
        patt = filter_box.value.strip()
        if not patt:
            is_visible = lambda label: True
        else:
            try:
                compiled = re.compile(patt, flags=re.IGNORECASE)
            except re.error:
                is_visible = lambda label: patt.lower() in label.lower()
            else:
                is_visible = lambda label: bool(compiled.search(label))
        for cb in checkboxes:
            cb.layout.display = "" if is_visible(cb.description) else "none"

    filter_box.observe(apply_filter, names="value")

    def on_checkbox_change(_change):
        # every checkbox change refreshes the counter
        update_count()

    for cb in checkboxes:
        cb.observe(on_checkbox_change, names="value")

    controls = widgets.HBox([filter_box, btn_all, btn_none, btn_invert], layout=widgets.Layout(gap="8px", flex_flow="row wrap"))
    footer = widgets.HBox([count_label])

    panel = widgets.VBox([controls, out_box, footer])
    state = {"checkboxes": checkboxes, "filter": filter_box, "count_label": count_label}
    return panel, state

# --------- Build the global Accordion ---------
# one (panel, state) pair per configuration, in the same order as config_labels
built = [build_config_panel(ch_list) for ch_list in channels_by_cfg]
panels = [panel for panel, _ in built]
states = [st for _, st in built]

acc = widgets.Accordion(children=panels)
for position, title in enumerate(config_labels):
    acc.set_title(position, title)

display(acc)

# --------- Button storing the selection in two global variables ---------
btn_save = widgets.Button(description="Save selection", button_style="success", icon="save")
save_out = widgets.Output()

def collect_selection(_=None):
    """
    Build two dicts from the ticked checkboxes:
      - selected_by_config_raw:       {config_label: [ticked channel labels]}
      - selected_by_config_canonical: {config_label: [normalized labels (unique, sorted)]}
    Both variables are created/overwritten in the notebook global namespace,
    and a summary of the raw selection is printed in ``save_out``.
    """
    selected_raw = {}
    selected_canon = {}

    # the three lists are parallel (one entry per configuration)
    for lbl, st, _ch_list in zip(config_labels, states, channels_by_cfg):
        checked = [cb.description for cb in st["checkboxes"] if cb.value]
        selected_raw[lbl] = checked
        # normalized version: unique, sorted, without empty labels
        normalized = {normalize_label(x) for x in checked}
        selected_canon[lbl] = sorted(name for name in normalized if name)

    globals()['selected_by_config_raw'] = selected_raw
    globals()['selected_by_config_canonical'] = selected_canon

    with save_out:
        clear_output()
        print("✅ Sélections enregistrées dans :")
        print("   - selected_by_config_raw")
        # short summary per configuration
        for k, chosen in selected_raw.items():
            print(f"  • {k}: {len(chosen)} canaux choisis => {chosen}")

btn_save.on_click(collect_selection)
display(widgets.HBox([btn_save]), save_out)

Accordion(children=(VBox(children=(HBox(children=(Text(value='', layout=Layout(width='320px'), placeholder='Fi…

HBox(children=(Button(button_style='success', description='Save selection', icon='save', style=ButtonStyle()),…

Output()

The sampling frequency is the number of recorded samples per time unit (expressed in Hz). It is set at the acquisition.\
Ideally, you expect to have only one sampling frequency for all the EEGs and participants.\
In multicentric dataset, you might end up with different sampling frequencies across participants (specific to each recording center).\
\
If you have multiple sampling frequencies across participants in your dataset, we recommend that you harmonize your dataset by downsampling your data to the lowest sampling frequency before your analyses.\
\
The code cell below checks how many different sampling frequencies your dataset contains.
___
_Side note: With multiple sampling frequencies within participants (that can happen for EEG and EOG), each EEG analysis software behaves differently. For example:_
- _MNE python will automatically upsample channels to the highest sampling frequency (with .edf/.bdf/.gdf format)_
- _Fieldtrip will load only a subset of channels (with the sampling frequency the most represented)_   

In [None]:
# sampling frequency configuration of each participant: sorted tuple of its distinct values
sf_per_sub = df_ch.groupby('subject')['sampling_frequency'].apply(lambda x: tuple(sorted(set(x))))
# identify the sampling frequency configuration of each participant and store them in a dict
# {configuration: [participants]} to print per sampling frequency configuration
sf_config_dict = {}
for config in sf_per_sub.unique():
    sub = sf_per_sub[sf_per_sub == config].index.tolist()
    sf_config_dict[config] = sub

# print info per sf configuration
if len(sf_config_dict) > 1:
    print('\n>>> There is multiple sampling frequency for EEGs in your dataset! <<<')
    print(f'\n\tNumber of different sampling frequency configuration: {len(sf_config_dict)}\n')
    print('Quick overlook of the EEGs associated to sampling frequencies:')
    for sf in df_ch['sampling_frequency'].unique():
        # select only rows with the current sf
        df_sf = df_ch[df_ch['sampling_frequency'] == sf]
        print(f'\n{sf} Hz: {df_sf["channel"].unique()}\n')
else:
    # double quotes inside the f-string: reusing the enclosing quote is a SyntaxError before Python 3.12
    print(f'\n>>> There is only one sampling frequency for EEGs in your dataset: {df_ch["sampling_frequency"].unique()} <<<\n')

# print('\nSampling frequency configurations:\n')
# for i, (config, participants) in enumerate(sf_config_dict.items(), 1):
#     print(f'Configuration #{i} ({len(participants)} participants):')
#     print(f'Sampling frequency ({len(config)}) : {config}\n')

Run the code cell below and move the slider to explore each sampling frequency configuration.\
The output will display the sampling frequency as well as the participant IDs associated with it.

In [None]:
if sf_config_dict:
    # slider selecting the configuration of interest
    config_sf_slider = mk_config_slider(value=1, min=1, max=len(sf_config_dict))

    def show_sf_config(i):
        # print the sampling frequency configuration selected with the slider
        print_config(i, config_dict=sf_config_dict, param="sampling frequencies")

    # re-run the printing function each time the slider value changes
    widgets.interact(show_sf_config, i=config_sf_slider);
else:
    print("No EEG sampling frequency found")

When we visualize EEG data, signals are classically filtered by the software (like compumedics).\
For analyses, we classically apply high-pass (to remove very low frequency), low-pass (to remove high frequency), and notch (to remove electric noise) filters.\
When we export the data, we can specify whether we want the data to be filtered or not.\
A good practice is to export the data without any filter, so that you can apply filters later according to your analyses.\
However, for whole-night recordings, we recommend exporting the data with a high-pass filter of 0.01 Hz in order to remove slow drifts on such long recordings.\
\
If you have multiple filter configurations, we recommend re-exporting the data without filters if possible.\
\
The code cell below checks which filters were applied and counts how many different ones were used when exporting your dataset.

In [None]:
# total number of distinct values over the three filter columns (NaN counts as a value):
# exactly 3 means one value per column, i.e. the same filters everywhere
n_filter_values = sum(len(df_ch[col].unique()) for col in ('highpass', 'lowpass', 'notch'))
if n_filter_values == 3:
    print('\n>>> All EEGs have the same filters! <<<')
elif n_filter_values > 3:
    print('\n>>> Filters are not fully consistent in EEGs across the dataset! <<<')
else:
    print('\n>>> There may have been a problem in reading the filters. Here is the output: <<<')

# Get the list of participants for each filtering configuration.
# NaN are replaced first because groupby does not like NaN
filter_cols = ['lowpass', 'highpass', 'notch']
df_filt = df_ch.copy()
df_filt[filter_cols] = df_filt[filter_cols].fillna('missing')

subjects_per_filters = df_filt.groupby(filter_cols)['subject'].apply(lambda x: sorted(set(x)))
config_filters = subjects_per_filters.reset_index(name = 'subjects')

# print the number of filter configurations
print(f'\n\tNumber of different EEG filters configurations: {len(config_filters)}\n')
# print('\nFilters configurations: ')
# r=1
# for row in config_filters.itertuples(index=False):
#     print(f'Configuration #{r} ({len(row.subjects)} participants)')
#     print(f'highpass: {row.highpass}, lowpass: {row.lowpass}, notch: {row.notch}\n')
#     r=r+1


Run the code cell below and move the slider to explore each filters' configuration.\
The output will display the filtering parameters as well as the participant IDs associated with those parameters.

In [None]:
if len(config_filters) >= 1:
    # slider selecting the configuration of interest
    config_filter_slider = mk_config_slider(value=1, min=1, max=len(config_filters))

    def print_filters(config_slider):
        """Print the filters and the participants of the selected configuration (1-based)."""
        selected = config_filters.iloc[config_slider - 1]
        sID = selected['subjects']

        print(f'Selected configuration: # {config_slider}')
        print(f"\tFilters configuration: highpass: {selected['highpass']}; lowpass: {selected['lowpass']}; notch: {selected['notch']}")
        print(f'\t{len(sID)} participants: {sID}')

    widgets.interact(print_filters, config_slider = config_filter_slider);
else:
    print("No EEG filters found")

At the exportation, channels can be converted to different units.\
Each analysis software will handle units differently, so it can be helpful to know which units your dataset contains. 
- MNE python will automatically detect the units and convert the data to volts. However, if the unit is not read correctly, the data will **not** be converted (e.g. "UV" is not interpreted as µV, therefore data are not converted to Volt while "uV" and "uv" are correctly detected)
- fieldtrip is loading the data with their unit of origin, so you might want to convert all channels to the same unit before your analysis

The code cell below checks how many different units your dataset contains.

In [None]:
# distinct units found in the EEG/EOG channels
units_found = df_ch['dimension'].unique()
if len(units_found) == 1:
    print(f'\n>>> All EEGs have the same unit: {units_found} <<<\n')
elif len(units_found) > 1:
    print('\n>>> Multiple units were found! <<<')
    # no nested single quotes inside the f-string: that is a SyntaxError before Python 3.12
    print(f'\n\tNumber of different units configurations: {len(units_found)}\n')
    print('Quick overlook of EEGs associated to units:')
    for unit in units_found:
        # select only rows with the current unit
        df_unit = df_ch[df_ch['dimension'] == unit]
        print(f'\n{unit}: {df_unit["channel"].unique()}')
    print('\n')

# unit configuration of each participant (sorted tuple of its distinct units)
unit_per_sub = df_ch.groupby('subject')['dimension'].apply(lambda x: tuple(sorted(set(x))))
# channels recorded with each unit
ch_per_unit = df_ch.groupby('dimension')['channel'].apply(lambda x: tuple(sorted(set(x))))
# identify the unit configuration of each participant and store them in a dict
# {configuration: [participants]} to print per unit configuration
unit_config_dict = {}
for config in unit_per_sub.unique():
    sub = unit_per_sub[unit_per_sub == config].index.tolist()
    unit_config_dict[config] = sub

# print info per sf configuration
# print('\nUnits configurations:')
# for i, (config, participants) in enumerate(unit_config_dict.items(), 1):
#     print(f'Configuration #{i} ({len(participants)} participants):')
#     print(f'Unit ({len(config)}) : {config}\n')
#     # print(f"Participants : {participants}\n")

Run the code cell below and move the slider to explore each EEG units.\
The output will display the EEG units as well as the participant IDs associated with those units.

In [None]:
if unit_config_dict:
    # slider selecting the configuration of interest
    config_unit_slider = mk_config_slider(value=1, min=1, max=len(unit_config_dict))

    def show_unit_config(i):
        # print the unit configuration selected with the slider
        print_config(i, config_dict=unit_config_dict, param="Units")

    # re-run the printing function each time the slider value changes
    widgets.interact(show_unit_config, i=config_unit_slider);
else:
    print("No EEG unit found")

Some software (e.g. Profusion from Compumedics) allows you to invert the polarity when exporting data. This feature can be extremely confusing and can lead to wrong results.\
\
Here, we inspect if the signal is inverted by checking if the minimum physical boundary is higher than the maximum physical boundary.\
For .edf file, the physical boundaries are values that are set when exporting the data by specifying the scale of the data.\
In Profusion (from Compumedics), a scale of 1mV will lead to a minimum physical boundary of -500 µV and a maximum physical boundary of +500 µV.\
\
For other EEG formats and software, the dynamical range might be set before recording (e.g. to be specified in the montage) and can't be changed when exporting.\
\
The code cell below checks, for each EEG channel, if the minimum physical boundary is greater than the maximum physical boundary, and saves a table containing the channels with inverted polarity.  

In [None]:
# select rows where the physical min is greater than the physical max.
# The bounds are stored as text, so they are converted to numbers first: comparing the
# strings directly would be lexicographic (e.g. '20' > '100'). Unparsable values become
# NaN and are never flagged.
phys_min = pd.to_numeric(df_ch['physical_min'], errors='coerce')
phys_max = pd.to_numeric(df_ch['physical_max'], errors='coerce')
df_inv = df_ch[phys_min > phys_max]

if not df_inv.empty:
    print('\n>>> Inverted polarity detected in EEGs! <<<')
    print(f'{df_inv.shape[0]} EEGs have an inverted polarity (from {df_ch.shape[0]} EEGs in {len(edf_files)} edf files)')
    print(df_inv[['subject', 'channel', 'dimension', 'physical_min', 'physical_max']])
else:
    print('\n>>> No inverted polarity was detected in EEGs <<<')
# the table is always written (it is empty when no inverted polarity was found)
df_inv.to_csv(f'{summary_path}/EEG_inverted_polarity_edf.tsv', sep = '\t')
print(f'\nSaving informations from inverted polarity EEGs to:\n{summary_path}/EEG_inverted_polarity_edf.tsv \n(will be empty if no inverted polarity)')


The EDF format stores signals in 16 bits, meaning that each sample can take 65 536 discrete values (2¹⁶).\
In order to convert these values into real EEG amplitudes, a dynamic range (a minimum and maximum value) must be defined when exporting the data.\
Each sample is then given a value between this minimum and maximum (among the 2¹⁶ levels).\
\
This choice of dynamic range can lead to two opposing problems:
- **Clipping:**\
If the dynamic range is too small, certain signal amplitudes exceed the limits.\
The exceeding values are then cut off (therefore "locked" at the min/max), and information is lost.\
Example of data with a dynamic range of ± 100 µV:
<img src="images/clipped.png" width="250"/>

- **Loss of resolution:**\
If the dynamic range is too large, the 65 536 levels are spread over a too-wide amplitude.\
Each quantization step then becomes too large, and small variations in signal amplitude are no longer visible with precision.\
Example of data with a resolution of 30 µV:\
<img src="images/low_resolution.png" width="250"/>
\
Example of clean data (dynamic range = ± 500 µV; resolution = 0.01 µV):
<img src="images/clean.png" width="250"/>


#### Dynamic range
Typical physiological EEG data (good quality) stays within ± 500 µV.\
The code cell below flags the channels whose dynamic range (the span between the physical minimum and maximum) is lower than or equal to 500 µV (i.e. ± 250 µV).\
You can change the dynamic range threshold with the widget.\
Detected bad channels are saved to a summary table.

In [None]:
# numeric input for the dynamic range threshold (in µV)
dr_thres = widgets.BoundedFloatText(
    value=500,
    min=0,
    max=5000,
    step=0.1,
    description='Dynamic range threshold (µV):',
    style={'description_width': '200px'},   # widen the description area
    layout=widgets.Layout(width='270px'),   # overall widget width
    disabled=False,
)

def check_bad_dr(threshold):
    """Report and save the channels whose dynamic range is at most ``threshold``.

    The dynamic range is recomputed as ``res_theoretical * 2**16``. The selected
    rows are printed and always written to EEG_bad_dynamic_range_edf.tsv in the
    summary folder.
    """
    dynamic_range = df_ch['res_theoretical'] * pow(2, 16)
    bad_dr = df_ch[dynamic_range <= threshold]

    if bad_dr.empty:
        print(f'\n>>> No EEG with a dynamic range <= {threshold} µV was detected! <<<')
    else:
        print(f'\n>>> Dynamic range <= {threshold} µV detected in EEGs! <<<\n')
        print(f'{bad_dr.shape[0]} EEGs detected (from {df_ch.shape[0]} EEGs in {len(edf_files)} edf files)')
        print(bad_dr[['subject', 'channel', 'dimension', 'physical_min', 'physical_max', 'res_theoretical']])

    out_file = f'{summary_path}/EEG_bad_dynamic_range_edf.tsv'
    bad_dr.to_csv(out_file, sep = '\t')
    print(f'\nSaving informations from bad dynamic range EEGs to:\n{out_file} \n(will be empty if no bad resolution)')

# re-run the check each time the threshold changes
widgets.interact(check_bad_dr, threshold = dr_thres);

#### Resolution
The theoretical resolution of an .edf file is the smallest amplitude variation that can be recorded between two samples (it depends on the dynamic range, as stated above).\
The code cell below detects EEG channels whose resolution step is 0.1 µV or larger (i.e. a coarser resolution).\
You can change the resolution threshold with the widget.\
Channels whose resolution step is at or above the threshold are saved to a summary table. 

In [None]:
# NOTE: res_theoretical is expected to be expressed in µV already. If the unit ("dimension")
# was missing or could not be read from the header, that conversion may not have happened.
# TODO: make this more robust.
r_thres = widgets.BoundedFloatText(
    description='Resolution threshold (µV):',
    value=0.1,
    min=0,
    max=10.0,
    step=0.1,
    disabled=False,
    layout=widgets.Layout(width='230px'),   # overall widget width, adjust if needed
    style={'description_width': '150px'},  # widen the description label
);

def check_bad_res(threshold):
    """Print and save the EEG channels whose resolution step is >= `threshold`.

    Rows of ``df_ch`` with ``res_theoretical >= threshold`` are printed and
    written to ``{summary_path}/EEG_bad_resolution_edf.tsv``. The file is
    rewritten on every call and is empty when no channel is flagged.
    """
    bad_res = df_ch[df_ch['res_theoretical'] >= threshold]

    if bad_res.empty:
        print(f'\n>>> No EEG with a resolution >= {threshold} µV was detected! <<<')
    else:
        shown_columns = ['subject', 'channel', 'dimension', 'physical_min', 'physical_max', 'res_theoretical']
        print(f'\n>>> EEGs with a resolution >= {threshold} µV detected! <<<')
        print(f'{bad_res.shape[0]} EEGs detected (from {df_ch.shape[0]} EEGs in {len(edf_files)} edf files)')
        print(bad_res[shown_columns])

    out_file = f'{summary_path}/EEG_bad_resolution_edf.tsv'
    bad_res.to_csv(out_file, sep='\t')
    print(f'\nSaving informations from bad resolution EEGs to:\n{out_file} \n(will be empty if no bad resolution)')

# Re-run the check each time the threshold widget changes
widgets.interact(check_bad_res, threshold=r_thres);

In [None]:
# EOG configuration of each participant: a tuple of the sorted, de-duplicated channel names
eog_per_sub = df_eog.groupby('subject')['channel'].apply(lambda chans: tuple(sorted(set(chans))))

# Map every distinct EOG configuration to the list of participants that share it
eog_config_dict = {
    config: eog_per_sub[eog_per_sub == config].index.tolist()
    for config in eog_per_sub.unique()
}

n_eog_configs = len(eog_config_dict)
if n_eog_configs <= 1:
    print('\n>>> There is only one EOG configuration in your dataset! <<<')
else:
    print('\n>>> There is multiple EOG configurations in your dataset! <<<')
    print(f'\n\tNumber of different configuration: {n_eog_configs}\n')

# Disabled debug output kept from the original cell:
# # print info per channel configuaration
# for i, (config, participants) in enumerate(ch_config_dict.items(), 1):
#     print(f'Configuration #{i} ({len(participants)} participants):')
#     print(f'Channels ({len(config)}) : {config}\n')

In [None]:
if not eog_config_dict:
    print("No EOG configuration found")
else:
    # Slider selecting one configuration, numbered from 1 to the number of configurations
    config_eog_slider = mk_config_slider(value=1, min=1, max=len(eog_config_dict))

    def show_eog_config(i):
        """Print the EOG channel configuration selected with the slider."""
        print_config(i, config_dict=eog_config_dict, param="Channels")

    # Refresh the printed configuration whenever the slider moves
    widgets.interact(show_eog_config, i=config_eog_slider);

In [None]:
# Sampling-frequency configuration of each participant: a tuple of the sorted,
# de-duplicated sampling frequencies found across that participant's EOG channels
sfeog_per_sub = df_eog.groupby('subject')['sampling_frequency'].apply(lambda x: tuple(sorted(set(x))))
# Map every distinct sampling-frequency configuration to the participants that share it
sfeog_config_dict = {}
for config in sfeog_per_sub.unique():
    sub = sfeog_per_sub[sfeog_per_sub == config].index.tolist()
    sfeog_config_dict[config] = sub

# Distinct sampling frequencies over all EOG rows (computed once, reused in the messages below)
eog_sfs = df_eog['sampling_frequency'].unique()

# print info per sf configuration (maybe print it only for multiple config)
if len(sfeog_config_dict) > 1:
    print('\n>>> There is multiple sampling frequency for EOGs in your dataset! <<<')
    print(f'\n\tNumber of different sampling frequency configuration: {len(sfeog_config_dict)}\n')
    print('Quick overlook of the EOGs associated to sampling frequencies:')
    for sf in eog_sfs:
        # select only rows with the current sf
        df_sf = df_eog[df_eog['sampling_frequency'] == sf]
        print(f'\n{sf} Hz: {df_sf["channel"].unique()}')
else:
    # The original f-string reused single quotes inside the replacement field,
    # which is a SyntaxError on Python versions older than 3.12.
    print(f'\n>>> There is only one sampling frequency for EOGs in your dataset: {eog_sfs} <<<')

# print('\nSampling frequency configurations:\n')
# for i, (config, participants) in enumerate(sf_config_dict.items(), 1):
#     print(f'Configuration #{i} ({len(participants)} participants):')
#     print(f'Sampling frequency ({len(config)}) : {config}\n')

In [None]:
if not sfeog_config_dict:
    print("No EOG sampling frequency found")
else:
    # Slider selecting one configuration, numbered from 1 to the number of configurations
    config_sfeog_slider = mk_config_slider(value=1, min=1, max=len(sfeog_config_dict))

    def show_eog_sf_config(i):
        """Print the EOG sampling-frequency configuration selected with the slider."""
        print_config(i, config_dict=sfeog_config_dict, param="Sampling frequencies")

    # Refresh the printed configuration whenever the slider moves
    widgets.interact(show_eog_sf_config, i=config_sfeog_slider);

In [None]:
# Total number of distinct values over the three filter columns:
# exactly 3 means one single value per column
eog_filter_cols = ['lowpass', 'highpass', 'notch']
n_eog_filter_values = sum(len(df_eog[col].unique()) for col in eog_filter_cols)

if n_eog_filter_values > 3:
    print('\n>>> Filters are not fully consistent across the dataset! <<<')
elif n_eog_filter_values == 3:
    print('\n>>> All EOGs have the same filters! <<<')
else:
    print('\n>>> There may have been a problem in reading the filters. Here is the output: <<<')

# List the participants behind each filter combination.
# NaN is replaced first because groupby would otherwise drop those rows.
df_eogfilt = df_eog.copy()
df_eogfilt[eog_filter_cols] = df_eogfilt[eog_filter_cols].fillna('missing')

eog_subjects_by_filter = df_eogfilt.groupby(eog_filter_cols)['subject'].apply(lambda subs: sorted(set(subs)))
config_eogfilters = eog_subjects_by_filter.reset_index(name='subjects')

# print filter configuration
print(f'\n\tNumber of different EOG filters configurations: {len(config_eogfilters)}\n')
# print('\nFilters configurations: ')
# r=1
# for row in config_filters.itertuples(index=False):
#     print(f'Configuration #{r} ({len(row.subjects)} participants)')
#     print(f'highpass: {row.highpass}, lowpass: {row.lowpass}, notch: {row.notch}\n')
#     r=r+1

In [None]:
if len(config_eogfilters) < 1:
    print("No EOG filters found")
else:
    # Slider selecting one filter configuration, numbered from 1
    config_eogfilter_slider = mk_config_slider(value=1, min=1, max=len(config_eogfilters))

    def print_eogfilters(config_slider):
        """Print the filter settings and participants of the selected EOG filter configuration.

        `config_slider` is the 1-based configuration number coming from the slider.
        """
        selected = config_eogfilters.iloc[config_slider - 1]  # slider is 1-based, iloc is 0-based
        participants = selected['subjects']

        print(f'Selected configuration: # {config_slider}')
        print(f'\tFilters configuration: highpass: {selected["highpass"]}; lowpass: {selected["lowpass"]}; notch: {selected["notch"]}')
        print(f'\t{len(participants)} participants: {participants}')

    # Refresh the printed configuration whenever the slider moves
    widgets.interact(print_eogfilters, config_slider=config_eogfilter_slider);

In [None]:
# Distinct units ("dimension" column) over all EOG rows, computed once and reused below
eog_units = df_eog['dimension'].unique()

if len(eog_units) == 1:
    print(f'\n>>> All EOGs have the same unit: {eog_units} <<<\n')
elif len(eog_units) > 1:
    print('\n>>> Multiple units were found! <<<')
    # The original f-string reused single quotes inside the replacement field,
    # which is a SyntaxError on Python versions older than 3.12.
    print(f'\n\tNumber of different EOG units configurations: {len(eog_units)}\n')
    print('Quick overlook of EOGs associated to units:')
    for unit in eog_units:
        # select only rows with the current unit
        df_unit = df_eog[df_eog['dimension'] == unit]
        print(f'\n{unit}: {df_unit["channel"].unique()}')


# Unit configuration of each participant (tuple of sorted, de-duplicated units),
# and the channels associated with each unit
eogunit_per_sub = df_eog.groupby('subject')['dimension'].apply(lambda x: tuple(sorted(set(x))))
eog_per_unit = df_eog.groupby('dimension')['channel'].apply(lambda x: tuple(sorted(set(x))))
# Map every distinct unit configuration to the participants that share it
eogunit_config_dict = {}
for config in eogunit_per_sub.unique():
    sub = eogunit_per_sub[eogunit_per_sub == config].index.tolist()
    eogunit_config_dict[config] = sub

# print info per unit configuration
# print('\nUnits configurations:')
# for i, (config, participants) in enumerate(unit_config_dict.items(), 1):
#     print(f'Configuration #{i} ({len(participants)} participants):')
#     print(f'Unit ({len(config)}) : {config}\n')
#     # print(f"Participants : {participants}\n")

In [None]:
if not eogunit_config_dict:
    print("No EOG unit found")
else:
    # Slider selecting one configuration, numbered from 1 to the number of configurations
    config_eogunit_slider = mk_config_slider(value=1, min=1, max=len(eogunit_config_dict))

    def show_eog_unit_config(i):
        """Print the EOG unit configuration selected with the slider."""
        print_config(i, config_dict=eogunit_config_dict, param="Units")

    # Refresh the printed configuration whenever the slider moves
    widgets.interact(show_eog_unit_config, i=config_eogunit_slider);

In [None]:
# An EOG row is flagged when its physical minimum is greater than its physical maximum
eog_inverted_mask = df_eog['physical_min'] > df_eog['physical_max']
df_eoginv = df_eog[eog_inverted_mask]

if df_eoginv.empty:
    print('\n>>> No inverted polarity was detected in EOGs <<<')
else:
    print('\n>>> Inverted polarity detected in EOGs! <<<')
    print(f'{df_eoginv.shape[0]} EOGs have an inverted polarity (from {df_eog.shape[0]} EOGs in {len(edf_files)} edf files)')
    print(df_eoginv[['subject', 'channel', 'dimension', 'physical_min', 'physical_max']])

# The table is written in both cases (it is empty when nothing was flagged)
eoginv_file = f'{summary_path}/EOG_inverted_polarity_edf.tsv'
df_eoginv.to_csv(eoginv_file, sep='\t')
print(f'\nSaving informations from inverted polarity EOGs to:\n{eoginv_file} \n(will be empty if no inverted polarity)')


#### Dynamic range

In [None]:
# Numeric input holding the dynamic range threshold used by check_bad_eogdr
dr_eogthres = widgets.BoundedFloatText(
    value=500,
    min=0,
    max=5000,
    step=0.1,
    style={'description_width': '200px'},  # widen the description label
    layout=widgets.Layout(width='270px'),   # overall widget width, adjust if needed
    description='Dynamic range threshold (µV):',
    disabled=False
);


def check_bad_eogdr(threshold):
    """Print and save the EOG channels whose dynamic range is at or below `threshold`.

    The dynamic range of a channel is computed as ``res_theoretical * 2**16``
    (resolution step multiplied by the number of 16-bit levels). The flagged
    rows of ``df_eog`` are printed and written to
    ``{summary_path}/EOG_bad_dynamic_range_edf.tsv``; the file is rewritten on
    every call and is empty when no channel is flagged.

    Parameters
    ----------
    threshold : float
        Dynamic range limit. Assumed to be in µV, like ``res_theoretical``
        (see the widget label) -- TODO confirm the unit conversion upstream.
    """
    dr_mask = df_eog['res_theoretical']*pow(2,16) <= threshold
    bad_dr = df_eog[dr_mask]
    
    if not bad_dr.empty:
        print(f'\n>>> Dynamic range <= {threshold} µV detected in EOGs! <<<\n')
        print(f'{bad_dr.shape[0]} EOGs detected (from {df_eog.shape[0]} EOGs in {len(edf_files)} edf files)')
        print(bad_dr[['subject', 'channel', 'dimension', 'physical_min', 'physical_max', 'res_theoretical']])
    else:
        print(f'\n>>> No EOG with a dynamic range <= {threshold} µV was detected! <<<')
    bad_dr.to_csv(f'{summary_path}/EOG_bad_dynamic_range_edf.tsv', sep = '\t')
    # This message used to say "bad resolution", which belongs to the resolution check
    print(f'\nSaving informations from bad dynamic range EOGs to:\n{summary_path}/EOG_bad_dynamic_range_edf.tsv \n(will be empty if no bad dynamic range)')

# Re-run the check each time the threshold widget changes
widgets.interact(check_bad_eogdr, threshold = dr_eogthres);

#### Resolution

In [None]:
# NOTE: res_theoretical is expected to be expressed in µV already. If the unit ("dimension")
# was missing or could not be read from the header, that conversion may not have happened.
# TODO: make this more robust.
eogr_thres = widgets.BoundedFloatText(
    description='Resolution threshold (µV):',
    value=0.1,
    min=0,
    max=10.0,
    step=0.1,
    disabled=False,
    layout=widgets.Layout(width='230px'),   # overall widget width, adjust if needed
    style={'description_width': '150px'},  # widen the description label
);

def check_bad_eogres(threshold):
    """Print and save the EOG channels whose resolution step is >= `threshold`.

    Rows of ``df_eog`` with ``res_theoretical >= threshold`` are printed and
    written to ``{summary_path}/EOG_bad_resolution_edf.tsv``. The file is
    rewritten on every call and is empty when no channel is flagged.
    """
    bad_res = df_eog[df_eog['res_theoretical'] >= threshold]

    if bad_res.empty:
        print(f'\n>>> No EOG with a resolution >= {threshold} µV was detected! <<<')
    else:
        shown_columns = ['subject', 'channel', 'dimension', 'physical_min', 'physical_max', 'res_theoretical']
        print(f'\n>>> EOGs with a resolution >= {threshold} µV detected! <<<')
        print(f'{bad_res.shape[0]} EOGs detected (from {df_eog.shape[0]} EOGs in {len(edf_files)} edf files)')
        print(bad_res[shown_columns])

    out_file = f'{summary_path}/EOG_bad_resolution_edf.tsv'
    bad_res.to_csv(out_file, sep='\t')
    print(f'\nSaving informations from bad resolution EOGs to:\n{out_file} \n(will be empty if no bad resolution)')

# Re-run the check each time the threshold widget changes
widgets.interact(check_bad_eogres, threshold=eogr_thres);

In [None]:
# Select the ECG rows and warn when the set of participants differs from the full table.
# A row is kept when its channel label contains "ecg" (case-insensitive); missing labels never match.
mask_ecg = df_full['channel'].str.contains(r'ecg', case = False, na=False)
df_ecg = df_full[mask_ecg]

# Participants present in the full table and in the ECG-only table (computed once)
all_subjects = set(df_full['subject'].unique())
ecg_subjects = set(df_ecg['subject'].unique())

# Check that every participant of df_full still appears once only ECG rows are kept.
# If not, the ECG channel may not have been detected correctly; one option is to also
# match on the transducer type in the mask above.
if len(all_subjects) > len(ecg_subjects):
    # identify missing subjects
    missing_sub = all_subjects - ecg_subjects
    print('\n!!! There is less participants in the dataset with only ECGs !!!')
    print(f'Missing participants: {missing_sub}')
    print("\nEither these participants don't have ECGs.")
    print("Or the transducer type was not correctly detected.")
    # save every row of the missing participants for inspection
    df_ecgmiss = df_full[df_full['subject'].isin(missing_sub)]
    df_ecgmiss.to_csv(f'{summary_path}/ECG_missing_edf.tsv', sep = '\t')
    print(f'\nSaving informations from missing participants to:\n{summary_path}/ECG_missing_edf.tsv')
    print('Please inspect the file, and specifically the column transducer_type')
elif len(all_subjects) < len(ecg_subjects):
    # Sanity check: df_ecg is a selection of df_full rows, so this is not expected to happen
    print('\n!!! There is more participants in the dataset with only ECGs !!!')
    print('This should not be the case.')
    print('Please inspect what is happening in a code editor (spyder..), or ask Yvan.')
    more_sub = ecg_subjects - all_subjects
    df_more = df_ecg[df_ecg['subject'].isin(more_sub)]
    # The file is tab-separated and the message below announces a .tsv file:
    # the original wrote "ECG_suspect_edf.csv", so the announced path did not exist.
    df_more.to_csv(f'{summary_path}/ECG_suspect_edf.tsv', sep = '\t')
    print(f'\nSaving informations from suspect participants to:\n{summary_path}/ECG_suspect_edf.tsv')

# saving info from ECG
df_ecg.to_csv(f'{summary_path}/ECG_summary_table.tsv', sep = '\t')
print(f'\nSaving informations from ECGs to:\n{summary_path}/ECG_summary_table.tsv')

In [None]:
# ECG configuration of each participant: a tuple of the sorted, de-duplicated channel names
ecg_per_sub = df_ecg.groupby('subject')['channel'].apply(lambda chans: tuple(sorted(set(chans))))

# Map every distinct ECG configuration to the list of participants that share it
ecg_config_dict = {
    config: ecg_per_sub[ecg_per_sub == config].index.tolist()
    for config in ecg_per_sub.unique()
}

n_ecg_configs = len(ecg_config_dict)
if n_ecg_configs <= 1:
    print('\n>>> There is only one ECG configuration in your dataset! <<<')
else:
    print('\n>>> There is multiple ECG configurations in your dataset! <<<')
    print(f'\n\tNumber of different ECG configuration: {n_ecg_configs}\n')

# Disabled debug output kept from the original cell:
# # print info per channel configuaration
# for i, (config, participants) in enumerate(ch_config_dict.items(), 1):
#     print(f'Configuration #{i} ({len(participants)} participants):')
#     print(f'Channels ({len(config)}) : {config}\n')

In [None]:
if not ecg_config_dict:
    print("No ECG  configuration found")
else:
    # Slider selecting one configuration, numbered from 1 to the number of configurations
    config_ecg_slider = mk_config_slider(value=1, min=1, max=len(ecg_config_dict))

    def show_ecg_config(i):
        """Print the ECG channel configuration selected with the slider."""
        print_config(i, config_dict=ecg_config_dict, param="Channels")

    # Refresh the printed configuration whenever the slider moves
    widgets.interact(show_ecg_config, i=config_ecg_slider);
    

In [None]:
# Sampling-frequency configuration of each participant: a tuple of the sorted,
# de-duplicated sampling frequencies found across that participant's ECG channels
ecgsf_per_sub = df_ecg.groupby('subject')['sampling_frequency'].apply(lambda x: tuple(sorted(set(x))))
# Map every distinct sampling-frequency configuration to the participants that share it
ecgsf_config_dict = {}
for config in ecgsf_per_sub.unique():
    sub = ecgsf_per_sub[ecgsf_per_sub == config].index.tolist()
    ecgsf_config_dict[config] = sub

# Distinct sampling frequencies over all ECG rows (computed once, reused in the messages below)
ecg_sfs = df_ecg['sampling_frequency'].unique()

# print info per sf configuration (maybe print it only for multiple config)
if len(ecgsf_config_dict) > 1:
    print('\n>>> There is multiple sampling frequency for ECGs in your dataset! <<<')
    print(f'\n\tNumber of different sampling frequency configuration: {len(ecgsf_config_dict)}\n')
    print('Quick overlook of the ECGs associated to sampling frequencies:')
    for sf in ecg_sfs:
        # select only rows with the current sf
        df_sf = df_ecg[df_ecg['sampling_frequency'] == sf]
        print(f'\n{sf} Hz: {df_sf["channel"].unique()}')
else:
    # The original f-string reused single quotes inside the replacement field,
    # which is a SyntaxError on Python versions older than 3.12.
    print(f'\n>>> There is only one sampling frequency for ECGs in your dataset: {ecg_sfs} <<<')

# print('\nSampling frequency configurations:\n')
# for i, (config, participants) in enumerate(sf_config_dict.items(), 1):
#     print(f'Configuration #{i} ({len(participants)} participants):')
#     print(f'Sampling frequency ({len(config)}) : {config}\n')

In [None]:
if not ecgsf_config_dict:
    print("No ECG sampling frequency found")
else:
    # Slider selecting one configuration, numbered from 1 to the number of configurations
    config_ecgsf_slider = mk_config_slider(value=1, min=1, max=len(ecgsf_config_dict))

    def show_ecg_sf_config(i):
        """Print the ECG sampling-frequency configuration selected with the slider."""
        print_config(i, config_dict=ecgsf_config_dict, param="Sampling frequencies")

    # Refresh the printed configuration whenever the slider moves
    widgets.interact(show_ecg_sf_config, i=config_ecgsf_slider);

In [None]:
# Total number of distinct values over the three filter columns:
# exactly 3 means one single value per column
ecg_filter_cols = ['lowpass', 'highpass', 'notch']
n_ecg_filter_values = sum(len(df_ecg[col].unique()) for col in ecg_filter_cols)

if n_ecg_filter_values > 3:
    print('\n>>> Filters are not fully consistent across the dataset! <<<')
elif n_ecg_filter_values == 3:
    print('\n>>> All ECGs have the same filters! <<<')
else:
    print('\n>>> There may have been a problem in reading the filters. Here is the output: <<<')

# List the participants behind each filter combination.
# NaN is replaced first because groupby would otherwise drop those rows.
df_ecgfilt = df_ecg.copy()
df_ecgfilt[ecg_filter_cols] = df_ecgfilt[ecg_filter_cols].fillna('missing')

ecg_subjects_by_filter = df_ecgfilt.groupby(ecg_filter_cols)['subject'].apply(lambda subs: sorted(set(subs)))
config_ecgfilters = ecg_subjects_by_filter.reset_index(name='subjects')

# print filter configuration
print(f'\n\tNumber of different ECG filters configurations: {len(config_ecgfilters)}\n')
# print('\nFilters configurations: ')
# r=1
# for row in config_filters.itertuples(index=False):
#     print(f'Configuration #{r} ({len(row.subjects)} participants)')
#     print(f'highpass: {row.highpass}, lowpass: {row.lowpass}, notch: {row.notch}\n')
#     r=r+1

In [None]:
if len(config_ecgfilters) < 1:
    print("No ECG filters found")
else:
    # Slider selecting one filter configuration, numbered from 1
    config_ecgfilter_slider = mk_config_slider(value=1, min=1, max=len(config_ecgfilters))

    def print_ecgfilters(config_slider):
        """Print the filter settings and participants of the selected ECG filter configuration.

        `config_slider` is the 1-based configuration number coming from the slider.
        """
        selected = config_ecgfilters.iloc[config_slider - 1]  # slider is 1-based, iloc is 0-based
        participants = selected['subjects']

        print(f'Selected configuration: # {config_slider}')
        print(f'\tFilters configuration: highpass: {selected["highpass"]}; lowpass: {selected["lowpass"]}; notch: {selected["notch"]}')
        print(f'\t{len(participants)} participants: {participants}')

    # Refresh the printed configuration whenever the slider moves
    widgets.interact(print_ecgfilters, config_slider=config_ecgfilter_slider);

In [None]:
# Distinct units ("dimension" column) over all ECG rows, computed once and reused below
ecg_units = df_ecg['dimension'].unique()

if len(ecg_units) == 1:
    print(f'\n>>> All ECGs have the same unit: {ecg_units} <<<\n')
elif len(ecg_units) > 1:
    print('\n>>> Multiple units were found for ECGs! <<<')
    # The original f-string reused single quotes inside the replacement field,
    # which is a SyntaxError on Python versions older than 3.12.
    print(f'\n\tNumber of different units configurations: {len(ecg_units)}\n')
    print('Quick overlook of ECGs associated to units:')
    for unit in ecg_units:
        # select only rows with the current unit
        df_unit = df_ecg[df_ecg['dimension'] == unit]
        print(f'\n{unit}: {df_unit["channel"].unique()}')


# Unit configuration of each participant (tuple of sorted, de-duplicated units),
# and the channels associated with each unit
ecgunit_per_sub = df_ecg.groupby('subject')['dimension'].apply(lambda x: tuple(sorted(set(x))))
ecg_per_unit = df_ecg.groupby('dimension')['channel'].apply(lambda x: tuple(sorted(set(x))))
# Map every distinct unit configuration to the participants that share it
ecgunit_config_dict = {}
for config in ecgunit_per_sub.unique():
    sub = ecgunit_per_sub[ecgunit_per_sub == config].index.tolist()
    ecgunit_config_dict[config] = sub

# print info per unit configuration
# print('\nUnits configurations:')
# for i, (config, participants) in enumerate(unit_config_dict.items(), 1):
#     print(f'Configuration #{i} ({len(participants)} participants):')
#     print(f'Unit ({len(config)}) : {config}\n')
#     # print(f"Participants : {participants}\n")

In [None]:
if not ecgunit_config_dict:
    print("No ECG unit found")
else:
    # Slider selecting one configuration, numbered from 1 to the number of configurations
    config_ecgunit_slider = mk_config_slider(value=1, min=1, max=len(ecgunit_config_dict))

    def show_ecg_unit_config(i):
        """Print the ECG unit configuration selected with the slider."""
        print_config(i, config_dict=ecgunit_config_dict, param="Units")

    # Refresh the printed configuration whenever the slider moves
    widgets.interact(show_ecg_unit_config, i=config_ecgunit_slider);

In [None]:
# An ECG row is flagged when its physical minimum is greater than its physical maximum
ecg_inverted_mask = df_ecg['physical_min'] > df_ecg['physical_max']
df_ecginv = df_ecg[ecg_inverted_mask]

if df_ecginv.empty:
    print('\n>>> No inverted polarity was detected in ECGs <<<')
else:
    print('\n>>> Inverted polarity detected in ECGs! <<<')
    print(f'{df_ecginv.shape[0]} ECGs have an inverted polarity (from {df_ecg.shape[0]} ECGs in {len(edf_files)} edf files)')
    print(df_ecginv[['subject', 'channel', 'dimension', 'physical_min', 'physical_max']])

# The table is written in both cases (it is empty when nothing was flagged)
ecginv_file = f'{summary_path}/ECG_inverted_polarity_edf.tsv'
df_ecginv.to_csv(ecginv_file, sep='\t')
print(f'\nSaving informations from inverted polarity ECGs to:\n{ecginv_file} \n(will be empty if no inverted polarity)')


#### Dynamic range

In [None]:
# Numeric input holding the dynamic range threshold used by check_bad_ecgdr
ecgdr_thres = widgets.BoundedFloatText(
    value=500,
    min=0,
    max=5000,
    step=0.1,
    style={'description_width': '200px'},  # widen the description label
    layout=widgets.Layout(width='270px'),   # overall widget width, adjust if needed
    description='Dynamic range threshold (µV):',
    disabled=False
);


def check_bad_ecgdr(threshold):
    """Print and save the ECG channels whose dynamic range is at or below `threshold`.

    The dynamic range of a channel is computed as ``res_theoretical * 2**16``
    (resolution step multiplied by the number of 16-bit levels). The flagged
    rows of ``df_ecg`` are printed and written to
    ``{summary_path}/ECG_bad_dynamic_range_edf.tsv``; the file is rewritten on
    every call and is empty when no channel is flagged.

    Parameters
    ----------
    threshold : float
        Dynamic range limit. Assumed to be in µV, like ``res_theoretical``
        (see the widget label) -- TODO confirm the unit conversion upstream.
    """
    dr_mask = df_ecg['res_theoretical']*pow(2,16) <= threshold
    bad_dr = df_ecg[dr_mask]
    
    if not bad_dr.empty:
        print(f'\n>>> Dynamic range <= {threshold} µV detected in ECGs! <<<\n')
        print(f'{bad_dr.shape[0]} ECGs detected (from {df_ecg.shape[0]} ECGs in {len(edf_files)} edf files)')
        print(bad_dr[['subject', 'channel', 'dimension', 'physical_min', 'physical_max', 'res_theoretical']])
    else:
        print(f'\n>>> No ECG with a dynamic range <= {threshold} µV was detected! <<<')
    bad_dr.to_csv(f'{summary_path}/ECG_bad_dynamic_range_edf.tsv', sep = '\t')
    # This message used to say "bad resolution", which belongs to the resolution check
    print(f'\nSaving informations from bad dynamic range ECGs to:\n{summary_path}/ECG_bad_dynamic_range_edf.tsv \n(will be empty if no bad dynamic range)')

# Re-run the check each time the threshold widget changes
widgets.interact(check_bad_ecgdr, threshold = ecgdr_thres);

#### Resolution

In [None]:
# NOTE: res_theoretical is expected to be expressed in µV already. If the unit ("dimension")
# was missing or could not be read from the header, that conversion may not have happened.
# TODO: make this more robust.
ecgr_thres = widgets.BoundedFloatText(
    description='Resolution threshold (µV):',
    value=0.1,
    min=0,
    max=10.0,
    step=0.1,
    disabled=False,
    layout=widgets.Layout(width='230px'),   # overall widget width, adjust if needed
    style={'description_width': '150px'},  # widen the description label
);

def check_bad_ecgres(threshold):
    """Print and save the ECG channels whose resolution step is >= `threshold`.

    Rows of ``df_ecg`` with ``res_theoretical >= threshold`` are printed and
    written to ``{summary_path}/ECG_bad_resolution_edf.tsv``. The file is
    rewritten on every call and is empty when no channel is flagged.
    """
    bad_res = df_ecg[df_ecg['res_theoretical'] >= threshold]

    if bad_res.empty:
        print(f'\n>>> No ECG with a resolution >= {threshold} µV was detected! <<<')
    else:
        shown_columns = ['subject', 'channel', 'dimension', 'physical_min', 'physical_max', 'res_theoretical']
        print(f'\n>>> ECGs with a resolution >= {threshold} µV detected! <<<')
        print(f'{bad_res.shape[0]} ECGs detected (from {df_ecg.shape[0]} ecgs in {len(edf_files)} edf files)')
        print(bad_res[shown_columns])

    out_file = f'{summary_path}/ECG_bad_resolution_edf.tsv'
    bad_res.to_csv(out_file, sep='\t')
    print(f'\nSaving informations from bad resolution ECGs to:\n{out_file} \n(will be empty if no bad resolution)')

# Re-run the check each time the threshold widget changes
widgets.interact(check_bad_ecgres, threshold=ecgr_thres);

In [26]:
# -----------------------------
# 1) Normalization and synonyms
# -----------------------------
# Keys are labels after upper-casing and removal of every non-alphanumeric character;
# values are the canonical label to use instead.
SYNONYMS = {
    # Eyes / EOG
    "LOC": "EOG_L", "ROC": "EOG_R",
    "E1": "EOG_L", "E2": "EOG_R",
    "EOGLEFT": "EOG_L", "EOGRIGHT": "EOG_R",
    # Canonical EOG labels once their underscore has been dropped, so that
    # normalize_label("EOG_L") gives back "EOG_L"
    "EOGL": "EOG_L", "EOGR": "EOG_R",
    # Mastoids / alternates
    "A1": "M1", "A2": "M2",
    # Hard-coded REF variants
    "FZREF": "FZ", "CZREF": "CZ", "PZREF": "PZ",
}

def normalize_label(raw: str) -> str:
    """Return a canonical EEG label from a raw label (deterministic rules).

    Steps: upper-case and drop every character outside A-Z / 0-9, apply
    ``SYNONYMS``, drop a trailing reference suffix (M1, M2, A1, A2, REF) and
    apply ``SYNONYMS`` again on what is left.

    The trailing reference is only dropped when something remains afterwards:
    a label that *is* a reference name ("M1", "A1", "REF") is kept instead of
    being turned into an empty string.

    Parameters
    ----------
    raw : str
        Raw channel label; ``None`` is accepted and gives ``""``.

    Returns
    -------
    str
        Upper-case canonical label (``""`` for ``None`` or a label without
        any alphanumeric character).
    """
    if raw is None:
        return ""
    s = str(raw).strip().upper()
    s = re.sub(r"[^A-Z0-9]", "", s)  # drop separators

    # Direct mapping first (handles whole-label synonyms such as "LOC" or "FZREF")
    s = SYNONYMS.get(s, s)

    # Drop a trailing reference, e.g. "F3M2" -> "F3", unless the label is only a reference
    stripped = re.sub(r"(M1|M2|A1|A2|REF)$", "", s)
    if stripped:
        # Map again so that derivations such as "E1-M2" end up as "EOG_L", not "E1"
        s = SYNONYMS.get(stripped, stripped)

    return s

# Common labels of a 10-20 montage (old and new temporal names), plus mastoids and EOG
COMMON_10_20 = (
    "Fp1 Fp2 F7 F3 Fz F4 F8 "
    "T3 C3 Cz C4 T4 "
    "T5 P3 Pz P4 T6 "
    "O1 O2 T7 T8 P7 P8 "
    "M1 M2 EOG_L EOG_R"
).split()

# 10-10 labels grouped by scalp region, listed from front to back
_REGIONS_10_10 = {
    "frontal pole": "Fp1 Fpz Fp2",
    "frontal": "AF7 AF3 AFz AF4 AF8 F7 F5 F3 F1 Fz F2 F4 F6 F8",
    "frontocentral": "FT7 FC5 FC3 FC1 FCz FC2 FC4 FC6 FT8",
    "central": "T7 C5 C3 C1 Cz C2 C4 C6 T8",
    "centroparietal": "TP7 CP5 CP3 CP1 CPz CP2 CP4 CP6 TP8",
    "parietal": "P7 P5 P3 P1 Pz P2 P4 P6 P8",
    "parieto-occipital": "PO7 PO5 PO3 POz PO4 PO6 PO8",
    "occipital": "O1 Oz O2",
    "mastoid": "M1 M2",
    # EOG (optional, not part of the strict 10-10 system but commonly added)
    "eog": "EOG_L EOG_R",
}

# Flat list of the labels above, in region order
COMMON_10_10 = [
    label
    for region_labels in _REGIONS_10_10.values()
    for label in region_labels.split()
]

# ----------------------------------------------------------------
# 2) Inputs expected from previous step: selected_by_config_raw, etc.
# ----------------------------------------------------------------
# Stop early with an explicit message when the selection step has not been run
if 'selected_by_config_raw' not in globals():
    raise RuntimeError("selected_by_config_raw not found. Run the selection widget first.")

config_labels = list(selected_by_config_raw)

# Suggestion pool = common 10-10 labels + normalized form of every selected raw label
suggest_pool = set(COMMON_10_10) | {
    normalize_label(raw_name)
    for label in config_labels
    for raw_name in selected_by_config_raw[label]
    if raw_name
}
# Sorted suggestions, without empty labels
SUGGESTIONS = sorted(filter(None, suggest_pool))

# -----------------------------------------------------------
# 3) Editor: one Accordion tab per config, rows with Combobox
# -----------------------------------------------------------
# Registry of the editable fields: {cfg_label: {raw_label: Combobox}}
row_widgets_by_cfg = {}

def make_row(raw_label: str):
    """Build one editor row for a raw channel label.

    Returns a tuple ``(row, combo)``: ``row`` is an HBox showing the raw label
    next to ``combo``, a Combobox pre-filled with ``normalize_label(raw_label)``
    that offers ``SUGGESTIONS`` but also accepts free text.
    """
    label_widget = widgets.Label(raw_label, layout=widgets.Layout(width="220px"))
    combo = widgets.Combobox(
        value=normalize_label(raw_label),        # rule-based suggestion as starting value
        options=SUGGESTIONS,
        ensure_option=False,                     # values outside the options list are allowed
        placeholder="Type or pick a canonical label…",
        description="",                          # the raw label is shown by the Label widget instead
        layout=widgets.Layout(width="240px"),
    )
    return widgets.HBox([label_widget, combo]), combo

def _make_reapply_handler(rows_map, raws):
    """Return the click callback that resets every row of one configuration
    to its rule-based suggestion, ``normalize_label(raw)``."""
    def _on_click(_button):
        for raw_name in raws:
            rows_map[raw_name].value = normalize_label(raw_name)
    return _on_click

# One editor panel per configuration
panels = []
for cfg_label in config_labels:
    # Stable, case-insensitive ordering of the raw labels
    raw_list = sorted(selected_by_config_raw[cfg_label], key=lambda name: name.upper())

    # One (raw label, row, combobox) triple per raw label
    built_rows = [(raw, *make_row(raw)) for raw in raw_list]
    rows = [hbox for _raw, hbox, _combo in built_rows]
    row_widgets_by_cfg[cfg_label] = {raw: combo for raw, _hbox, combo in built_rows}

    # Local toolbar: a "(re)apply rules" button and a short hint
    btn_apply_rules = widgets.Button(
        description="(Re)apply rules to all",
        tooltip="Re-run normalize_label(raw) for every row in this configuration",
        button_style="info"
    )
    # The handler receives this configuration's own widgets and labels as arguments,
    # so each button only acts on its own panel
    btn_apply_rules.on_click(_make_reapply_handler(row_widgets_by_cfg[cfg_label], raw_list))
    info = widgets.HTML(value="<i>You can type freely or pick a suggestion.</i>")

    toolbar = widgets.HBox([btn_apply_rules, info])
    rows_box = widgets.VBox(
        rows,
        layout=widgets.Layout(max_height="380px", overflow="auto", border="1px solid #ddd", padding="6px")
    )
    panels.append(widgets.VBox([toolbar, rows_box]))

# Accordion with one titled section per configuration
acc = widgets.Accordion(children=panels)
for position, title in enumerate(config_labels):
    acc.set_title(position, title)

display(acc)

# --------------------------------------------
# 4) Save mapping -> remap_by_config + summary
# --------------------------------------------
# "Save mapping" button and the Output area (`out`) displayed with it;
# on_save (defined below) prints its report inside `out`.
btn_save = widgets.Button(description="Save mapping", button_style="success", icon="save")
out = widgets.Output()
display(widgets.HBox([btn_save]), out)

def on_save(_=None):
    """
    Read the raw -> canonical mapping from the UI, publish it and export it.

    For every configuration in ``config_labels`` the current (stripped) value
    of each combobox in ``row_widgets_by_cfg`` is collected. Two notebook
    globals are then (re)assigned:
      - remap_by_config: {config: {raw_label: canonical_label}}
      - selected_by_config_canonical: {config: sorted unique non-empty canonicals}

    Both tables are also written as TSV files under ``summary_path``. The export
    is best-effort: any failure is reported in the output area instead of raised.

    Parameters:
      _ : the clicked button (ignored), so the function can also be called directly.
    """
    remap = {}
    canonical_lists = {}

    for cfg_label in config_labels:
        # An empty/None combobox value is stored as "" (raw label left unmapped)
        mapping = {
            raw: (combo.value or "").strip()
            for raw, combo in row_widgets_by_cfg[cfg_label].items()
        }
        remap[cfg_label] = mapping

        # unique, sorted canonical targets (empty removed)
        canonical_lists[cfg_label] = sorted({can for can in mapping.values() if can})

    globals()["remap_by_config"] = remap
    globals()["selected_by_config_canonical"] = canonical_lists

    # Check duplicates: multiple raws -> same canonical within a config (not an error, just info).
    # Named duplicate_report (not `warnings`) so the imported `warnings` module is not shadowed.
    duplicate_report = []
    for cfg_label, mapping in remap.items():
        raws_by_canonical = {}
        for raw, can in mapping.items():
            if can:
                raws_by_canonical.setdefault(can, []).append(raw)
        dups = {can: raws for can, raws in raws_by_canonical.items() if len(raws) > 1}
        if dups:
            duplicate_report.append((cfg_label, dups))

    # Optional exports
    try:
        df_rows = [
            {"config": cfg_label, "raw_channel": raw, "canonical_channel": can}
            for cfg_label, mapping in remap.items()
            for raw, can in mapping.items()
        ]
        # Explicit columns keep the TSV header even when there is nothing to map
        df_map = pd.DataFrame(df_rows, columns=["config", "raw_channel", "canonical_channel"])
        df_map.to_csv(f"{summary_path}/EEG_remap_raw_to_canonical.tsv", sep="\t", index=False)

        # One column per configuration; pd.Series pads the shorter lists with NaN
        df_canon = pd.DataFrame({cfg: pd.Series(chs) for cfg, chs in canonical_lists.items()})
        df_canon.to_csv(f"{summary_path}/EEG_selected_canonical_by_config.tsv", sep="\t", index=False)

        export_msg = (f"\n📝 Exports:\n"
                      f" - {summary_path}/EEG_remap_raw_to_canonical.tsv\n"
                      f" - {summary_path}/EEG_selected_canonical_by_config.tsv")
    except Exception as e:
        # Deliberately broad: the in-memory variables are already saved, so a
        # failed export is reported rather than aborting the callback.
        export_msg = f"\n(Export skipped: {e})"

    with out:
        clear_output()
        print("✅ Mapping saved to variables:")
        print("   - remap_by_config  (dict: raw -> canonical per configuration)")
        print("   - selected_by_config_canonical (dict: unique canonical list per configuration)")
        print(export_msg)
        if duplicate_report:
            print("\n⚠️ Multiple raw labels mapped to the same canonical within a configuration:")
            for cfg_label, dups in duplicate_report:
                print(f"  • {cfg_label}: {dups}")
        else:
            print("\nNo duplicates detected.")
btn_save.on_click(on_save)

Accordion(children=(VBox(children=(HBox(children=(Button(button_style='info', description='(Re)apply rules to …

HBox(children=(Button(button_style='success', description='Save mapping', icon='save', style=ButtonStyle()),))

Output()

In [27]:
# Inspect the raw -> canonical mapping stored by the "Save mapping" button
remap_by_config

{'config. 1 (n=26)': {'A2': 'M2', 'C3': 'C', 'Fp1': 'F', 'O1': 'O'},
 'config. 2 (n=2)': {'C4-M1': 'C', 'F4-M1': 'F', 'O2-M1': 'O'},
 'config. 3 (n=14)': {'C4-M1': 'C', 'F4-M1': 'F', 'O2-M1': 'O'},
 'config. 4 (n=6)': {'C4': 'C', 'F4': 'F', 'M1': 'M1', 'O2': 'O'},
 'config. 5 (n=6)': {'C3-M2': 'C', 'F3-M2': 'F', 'O1-M2': 'O'},
 'config. 6 (n=2)': {'C3': 'C', 'F3': 'F', 'M2': 'M2', 'O1': 'O'},
 'config. 7 (n=10)': {'EEG A2': 'M2',
  'EEG C3': 'C',
  'EEG F1': 'F',
  'EEG O1': 'O'}}

In [29]:
# --- Inputs expected from previous steps ---
# selected_by_config_canonical: { "Cfg 1 (n=…)" : ["F3","F4","C3","C4", ...], ... }
# TEN_TEN_LABELS: list of 10–10 standard labels (you pasted earlier)
if 'selected_by_config_canonical' not in globals():
    raise RuntimeError("selected_by_config_canonical not found. Run the previous mapping step first.")
if 'TEN_TEN_LABELS' not in globals():
    TEN_TEN_LABELS = []  # fallback if you forgot to paste it; still works

# One accordion section will be built per configuration, in dict order
config_labels = list(selected_by_config_canonical.keys())

# Build options per config: sorted union of 10–10 and the config's canonical labels
options_by_cfg = {
    cfg: sorted(set(TEN_TEN_LABELS).union(set(selected_by_config_canonical[cfg])))
    for cfg in config_labels
}

# Keep UI state for each config (filled by build_panel_for_config)
state_by_cfg = {}  # cfg -> dict with keys "mode", "ref_select", "projection", "info" (widget handles)

def build_panel_for_config(cfg_label):
    """
    Returns an HBox panel for one config with:
      - mode selection (None / Average / Custom)
      - ref channel picker (SelectMultiple), enabled only in Custom mode
      - projection checkbox, enabled only in Average mode
      - info area + quick-pick buttons

    Parameters:
      cfg_label : configuration label; must be a key of ``options_by_cfg``.

    Side effect:
      stores the created widgets in ``state_by_cfg[cfg_label]`` under the keys
      "mode", "ref_select", "projection" and "info" so they can be read at save time.
    """
    # Mode selection
    mode = widgets.RadioButtons(
        options=[("None (keep as-is)", "none"),
                 ("Average reference", "average"),
                 ("Custom reference (pick channels)", "custom")],
        value="none",
        description="Mode:",
        layout=widgets.Layout(width="330px")
    )

    # Projection (only meaningful for average reference in MNE).
    # Starts disabled because the initial mode is "none", mirroring ref_select below.
    projection = widgets.Checkbox(
        value=False,
        description="Use projection (average ref only)",
        indent=False,
        disabled=True
    )

    # Candidate reference channels (multi-select)
    ref_options = options_by_cfg[cfg_label]
    ref_select = widgets.SelectMultiple(
        options=ref_options,
        value=tuple(),  # start empty
        rows=min(12, max(6, len(ref_options))),
        disabled=True,  # enabled only when mode == "custom"
        layout=widgets.Layout(width="260px")
    )

    # Info / validation area
    info = widgets.HTML(value="<i>Select re-reference mode. For 'Custom', pick channels on the right.</i>")

    # Quick picks row (for convenience)
    btn_mastoids = widgets.Button(description="M1 + M2", tooltip="Pick M1 and M2 if available")
    btn_cz       = widgets.Button(description="Cz", tooltip="Pick Cz")
    btn_clear    = widgets.Button(description="Clear selection", tooltip="Clear all picked reference channels")

    def average_message():
        # Rebuilt on demand so the text always shows the current checkbox state
        return "<i>MNE call: <code>raw.set_eeg_reference('average', projection={})</code>.</i>".format(projection.value)

    def on_mode_change(change):
        if change["name"] != "value":
            return
        m = change["new"]
        # Enable/disable widgets depending on mode
        ref_select.disabled = (m != "custom")
        projection.disabled = (m != "average")

        # Set helper message
        if m == "none":
            info.value = "<i>No re-referencing will be applied for this configuration.</i>"
        elif m == "average":
            info.value = average_message()
        else:
            info.value = "<i>Pick one or more channels to use as reference (MNE: <code>raw.set_eeg_reference(ref_channels=[...])</code>).</i>"

    def on_projection_change(_change):
        # Keep the displayed MNE call in sync when the checkbox is toggled
        if mode.value == "average":
            info.value = average_message()

    mode.observe(on_mode_change, names="value")
    projection.observe(on_projection_change, names="value")

    def pick_if_available(labels):
        # Add the requested labels to the current selection, ignoring those not offered.
        # An exact match wins; otherwise fall back to a case-insensitive match so that
        # e.g. "CZ" also selects an option spelled "Cz".
        exact = set(ref_options)
        by_folded = {}
        for opt in ref_options:
            by_folded.setdefault(str(opt).casefold(), opt)

        chosen = []
        for label in labels:
            if label in exact:
                chosen.append(label)
                continue
            match = by_folded.get(str(label).casefold())
            if match is not None:
                chosen.append(match)
        ref_select.value = tuple(sorted(set(ref_select.value).union(chosen)))

    def on_mastoids(_):
        pick_if_available(["M1", "M2"])

    def on_cz(_):
        pick_if_available(["CZ"])

    def on_clear(_):
        ref_select.value = tuple()

    btn_mastoids.on_click(on_mastoids)
    btn_cz.on_click(on_cz)
    btn_clear.on_click(on_clear)

    # Layout
    left = widgets.VBox([mode, projection, widgets.HBox([btn_mastoids, btn_cz, btn_clear]), info])
    right = widgets.VBox([widgets.Label("Custom reference channels:"), ref_select])
    panel = widgets.HBox([left, right], layout=widgets.Layout(justify_content="space-between", align_items="flex-start", gap="16px"))

    # Store state handles
    state_by_cfg[cfg_label] = {
        "mode": mode,
        "ref_select": ref_select,
        "projection": projection,
        "info": info
    }
    return panel

# Build accordion: one re-reference panel per configuration, in config_labels order
panels = [build_panel_for_config(cfg) for cfg in config_labels]
acc_reref = widgets.Accordion(children=panels)
for i, cfg in enumerate(config_labels):
    acc_reref.set_title(i, f"{cfg} — re-reference")
display(acc_reref)

# Save plan button, and the Output area the save report is printed into
btn_save_plan = widgets.Button(description="Save re-reference plan", button_style="success", icon="save")
out_plan = widgets.Output()
display(widgets.HBox([btn_save_plan]), out_plan)

def validate_config_choice(cfg_label, mode, ref_chans):
    """
    Check the re-reference choice made for one configuration.

    Only 'custom' mode is validated: every picked channel must belong to the
    configuration's canonical channels, and at least one must be picked.

    Returns (ok: bool, message: str).
    """
    # Looked up in every mode, so an unknown cfg_label raises KeyError regardless.
    available = set(selected_by_config_canonical[cfg_label])

    if mode != "custom":
        return True, "OK"

    absent = [name for name in ref_chans if name not in available]
    if absent:
        return False, f"Warning: some chosen ref channels are not present in this configuration: {absent}"

    if len(ref_chans) == 0:
        return False, "Please pick at least one reference channel for 'Custom' mode."

    return True, "OK"

def on_save_plan(_=None):
    """
    Build a dict ready for MNE re-referencing, per configuration.
    JSON is also exported for reuse.

    Each configuration maps to one of:
      - {"mode": "none"}
      - {"mode": "average", "projection": bool}
      - {"mode": "custom", "ref_channels": [...]}

    The plan is stored in the notebook global ``reref_plan_by_config`` and written
    to ``mne_rereference_plan.json`` under ``summary_path`` (best-effort: an export
    failure is reported, not raised). The plan is saved even when validation fails;
    in that case a closing warning is printed after the per-configuration messages.

    Parameters:
      _ : the clicked button (ignored), so the function can also be called directly.
    """
    plan = {}  # cfg -> dict for MNE

    messages = []
    ok_all = True

    for cfg in config_labels:
        st = state_by_cfg[cfg]
        mode = st["mode"].value
        proj = bool(st["projection"].value)
        ref_chans = list(st["ref_select"].value)

        # Basic validation
        ok, msg = validate_config_choice(cfg, mode, ref_chans)
        if not ok:
            ok_all = False
        messages.append(f"{cfg}: {msg}")

        # Build MNE-friendly spec
        if mode == "none":
            spec = {"mode": "none"}  # you can use this key to skip in your pipeline
        elif mode == "average":
            spec = {"mode": "average", "projection": proj}
        else:
            # Note: MNE does not support projection=True for explicit ref_channels
            spec = {"mode": "custom", "ref_channels": ref_chans}

        plan[cfg] = spec

    # Save to globals and to disk
    globals()["reref_plan_by_config"] = plan

    try:
        out_json = os.path.join(summary_path, "mne_rereference_plan.json")
        with open(out_json, "w", encoding="utf-8") as f:
            json.dump(plan, f, indent=2, ensure_ascii=False)
        saved_msg = f"Saved JSON:\n  - {out_json}"
    except Exception as e:
        # Deliberately broad: the in-memory plan is already saved
        saved_msg = f"(JSON export skipped: {e})"

    with out_plan:
        clear_output()
        print("✅ Re-reference plan saved to variable: reref_plan_by_config")
        print(saved_msg)
        print("\nSummary / validation:")
        for m in messages:
            print(" - " + m)
        # Make validation failures visible instead of leaving ok_all unused
        if not ok_all:
            print("\n⚠️ Please fix the warnings above before applying to MNE.")

btn_save_plan.on_click(on_save_plan)



Accordion(children=(HBox(children=(VBox(children=(RadioButtons(description='Mode:', layout=Layout(width='330px…

HBox(children=(Button(button_style='success', description='Save re-reference plan', icon='save', style=ButtonS…

Output()

In [30]:
# Inputs expected (produced by the previous mapping step)
if 'selected_by_config_canonical' not in globals():
    raise RuntimeError("selected_by_config_canonical not found. Run the previous mapping step first.")
if 'TEN_TEN_LABELS' not in globals():
    TEN_TEN_LABELS = []  # optional

# One accordion section will be built per configuration, in dict order
config_labels = list(selected_by_config_canonical.keys())

# Shared part of the suggestions: the 10–10 labels only.
# Each panel adds its own configuration channels on top of this set.
base_suggestions = set(TEN_TEN_LABELS)

# State storage (filled by build_panel_for_config)
state_by_cfg = {}  # cfg -> dict with keys "mode", "combo", "add_btn", "chosen_list_ref", "config_channels", "info"

def build_panel_for_config(cfg_label):
    """
    Build the two-column re-reference panel for one configuration.

    Left column: the configuration's channels, shown read-only.
    Right column: mode radio buttons (None / Average / Custom), an info line,
    and, for Custom mode, a combobox + "Add" button feeding a list of chosen
    reference channels where each entry has its own remove (×) button.

    The widgets and the live list of chosen channels are registered in
    ``state_by_cfg[cfg_label]`` so they can be read back at save time.
    Returns the HBox holding both columns.
    """
    cfg_channels = sorted(selected_by_config_canonical[cfg_label])
    # Combobox suggestions: 10-10 labels plus this configuration's own channels
    suggestions = sorted(base_suggestions.union(cfg_channels))

    bordered_scroll = dict(overflow="auto", border="1px solid #ddd", padding="6px")

    # ---------------- Left column ----------------
    channels_html = widgets.HTML(", ".join(cfg_channels))
    left_box = widgets.VBox(
        [
            widgets.HTML(f"<b>Configuration channels ({len(cfg_channels)}):</b>"),
            widgets.VBox([channels_html], layout=widgets.Layout(max_height="150px", **bordered_scroll)),
        ],
        layout=widgets.Layout(width="50%"),
    )

    # ---------------- Right column widgets ----------------
    mode = widgets.RadioButtons(
        options=[
            ("None (keep as-is)", "none"),
            ("Average reference", "average"),
            ("Custom reference (pick)", "custom"),
        ],
        value="none",
        description="Mode:",
        layout=widgets.Layout(width="330px"),
    )

    # Free-text input with suggestions; one reference channel is added per click
    combo = widgets.Combobox(
        options=suggestions,
        value="",
        placeholder="Type or pick a reference channel…",
        ensure_option=False,
        layout=widgets.Layout(width="260px"),
    )
    add_btn = widgets.Button(
        description="Add",
        button_style="primary",
        tooltip="Add channel to custom reference list",
    )

    chosen_box = widgets.VBox(
        [],
        layout=widgets.Layout(max_height="200px", width="260px", **bordered_scroll),
    )
    chosen_label = widgets.HTML("<b>Custom reference channels:</b>")
    info = widgets.HTML("<i>Select re-reference mode. For 'Custom', add channels using the combobox.</i>")

    # ---------------- Chosen-channel list ----------------
    chosen = []  # unique channel names, kept sorted; shared with state_by_cfg

    def remove_channel(name):
        if name in chosen:
            chosen.remove(name)
            refresh_chosen_box()

    def build_chosen_row(name):
        # The default argument pins this row's channel name to its button
        remove_btn = widgets.Button(description="×", tooltip=f"Remove {name}", layout=widgets.Layout(width="28px"))
        remove_btn.on_click(lambda _btn, name=name: remove_channel(name))
        return widgets.HBox([remove_btn, widgets.Label(name)])

    def refresh_chosen_box():
        # Throw away the old rows and rebuild them from the current list
        chosen_box.children = [build_chosen_row(name) for name in chosen]

    def on_add(_):
        entry = (combo.value or "").strip()
        if not entry:
            return
        if entry not in chosen:
            chosen.append(entry)
            chosen.sort()
            refresh_chosen_box()
        combo.value = ""  # ready for the next entry

    add_btn.on_click(on_add)

    # ---------------- Mode handling ----------------
    def set_custom_enabled(enabled: bool):
        # Only the input controls are toggled; the chosen list stays usable
        for control in (combo, add_btn):
            control.disabled = not enabled

    fixed_mode_messages = {
        "none": "<i>No re-referencing will be applied for this configuration.</i>",
        "average": "<i>MNE: <code>raw.set_eeg_reference('average')</code>.</i>",
    }
    custom_message = "<i>Pick one or more channels to use as reference (MNE: <code>raw.set_eeg_reference(ref_channels=[...])</code>).</i>"

    def on_mode_change(change):
        if change["name"] != "value":
            return
        selected = change["new"]
        # Anything other than "none"/"average" is treated as the custom mode
        is_custom = selected not in fixed_mode_messages
        info.value = custom_message if is_custom else fixed_mode_messages[selected]
        set_custom_enabled(is_custom)

    mode.observe(on_mode_change, names="value")
    set_custom_enabled(False)  # initial mode is "none"

    # ---------------- Assemble ----------------
    right_box = widgets.VBox(
        [
            widgets.VBox([mode, info]),
            widgets.VBox([widgets.HBox([combo, add_btn]), chosen_label, chosen_box]),
        ],
        layout=widgets.Layout(width="50%"),
    )

    # Handles needed at save time; "chosen_list_ref" is the live list above
    state_by_cfg[cfg_label] = {
        "mode": mode,
        "combo": combo,
        "add_btn": add_btn,
        "chosen_list_ref": chosen,
        "config_channels": cfg_channels,
        "info": info
    }

    return widgets.HBox([left_box, right_box], layout=widgets.Layout(gap="16px", align_items="flex-start"))

# Build accordion: one re-reference panel per configuration, in config_labels order
panels = [build_panel_for_config(cfg) for cfg in config_labels]
acc_reref = widgets.Accordion(children=panels)
for i, cfg in enumerate(config_labels):
    acc_reref.set_title(i, f"{cfg} — re-reference")
display(acc_reref)

# Save button, and the Output area the save report is printed into
btn_save_plan = widgets.Button(description="Save re-reference plan", button_style="success", icon="save")
out_plan = widgets.Output()
display(widgets.HBox([btn_save_plan]), out_plan)

def validate_choice(cfg_label, mode, ref_chans, cfg_channels):
    """
    Validate one configuration's re-reference choice; return (ok, message).

    Modes other than 'custom' always pass. In 'custom' mode at least one
    reference channel is required and every one of them must be listed in
    cfg_channels. cfg_label is accepted but not used in the checks.
    """
    if mode != "custom":
        return True, "OK"

    if len(ref_chans) == 0:
        return False, "Please add at least one reference channel for 'Custom' mode."

    unknown = [name for name in ref_chans if name not in cfg_channels]
    if unknown:
        return False, f"Some chosen reference channels are not in this configuration: {unknown}"

    return True, "OK"

def on_save_plan(_=None):
    """
    Turn the UI state into a per-configuration re-reference plan for MNE.

    Each configuration maps to one of:
      - {"mode": "none"}
      - {"mode": "average"}
      - {"mode": "custom", "ref_channels": [...]}

    The plan is stored in the notebook global ``reref_plan_by_config`` and
    written as JSON under ``summary_path``; an export failure is reported in
    the output area rather than raised. Validation messages are printed for
    every configuration, followed by a warning if any of them failed.
    """
    plan = {}
    messages = []
    ok_all = True

    for cfg in config_labels:
        state = state_by_cfg[cfg]
        chosen_mode = state["mode"].value
        ref_list = list(state["chosen_list_ref"])  # snapshot of the live list
        channels = state["config_channels"]

        ok, msg = validate_choice(cfg, chosen_mode, ref_list, channels)
        messages.append(f"{cfg}: {msg}")
        if not ok:
            ok_all = False

        # Only the custom mode carries extra data
        if chosen_mode in ("none", "average"):
            plan[cfg] = {"mode": chosen_mode}
        else:
            plan[cfg] = {"mode": "custom", "ref_channels": ref_list}

    globals()["reref_plan_by_config"] = plan

    # Export JSON (best-effort)
    try:
        out_json = os.path.join(summary_path, "mne_rereference_plan.json")
        with open(out_json, "w", encoding="utf-8") as handle:
            json.dump(plan, handle, indent=2, ensure_ascii=False)
    except Exception as err:
        saved_msg = f"(JSON export skipped: {err})"
    else:
        saved_msg = f"Saved JSON:\n  - {out_json}"

    with out_plan:
        clear_output()
        print("✅ Re-reference plan saved to variable: reref_plan_by_config")
        print(saved_msg)
        print("\nSummary / validation:")
        for line in messages:
            print(" - " + line)
        if not ok_all:
            print("\n⚠️ Please fix the warnings above before applying to MNE.")

btn_save_plan.on_click(on_save_plan)

Accordion(children=(HBox(children=(VBox(children=(HTML(value='<b>Configuration channels (4):</b>'), VBox(child…

HBox(children=(Button(button_style='success', description='Save re-reference plan', icon='save', style=ButtonS…

Output()

In [None]:
# ---- Optional helper to apply plan to an MNE Raw ----
def apply_mne_rereference(raw, cfg_label, plan):
    """
    Re-reference ``raw`` in place following the plan entry for ``cfg_label``.

    The entry's "mode" selects the action:
      - "average": raw.set_eeg_reference('average', projection=<bool of entry's "projection", default False>)
      - "custom":  raw.set_eeg_reference(ref_channels=<entry's "ref_channels", default []>)
      - "none", a missing entry, or any unknown mode: nothing is done.

    The same ``raw`` object is always returned.
    Example:
        raw = apply_mne_rereference(raw, cfg_label, reref_plan_by_config)
    """
    entry = plan.get(cfg_label, {"mode": "none"})
    kind = entry.get("mode", "none")

    if kind == "average":
        use_projection = bool(entry.get("projection", False))
        raw.set_eeg_reference('average', projection=use_projection)
    elif kind == "custom":
        # No projection argument here: it is only passed for the average reference
        raw.set_eeg_reference(ref_channels=entry.get("ref_channels", []))

    # "none" and unrecognised modes fall through untouched
    return raw