# Check export parameters from .edf files of an EEG database
---
EDF is the European Data Format for the storage of multichannel biological and physical signals (EEG, EMG, ECG, but also intracranial data): https://www.edfplus.info/ \
It was published in 1992, followed by an upgraded version (EDF+) in 2003 that adds support for discontinuous recordings, annotations, stimuli and events in UTF-8 format. \
It is a 16-bit format: each sample is stored as a 16-bit integer, meaning that each measured data point can take one of 2^16 values between the min and max of a dynamic range.
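
To make the 16-bit storage concrete, here is a minimal sketch of the linear mapping between a stored integer and its physical value. The function name `digital_to_physical` and the -500/+500 uV range are illustrative assumptions, not part of this notebook; the digital bounds default to the full signed 16-bit range.

```python
def digital_to_physical(digital, phys_min, phys_max, dig_min=-32768, dig_max=32767):
    """Map a stored 16-bit integer to its physical value (e.g. in uV)."""
    # size of one digital step, expressed in physical units
    gain = (phys_max - phys_min) / (dig_max - dig_min)
    return phys_min + (digital - dig_min) * gain

# hypothetical channel exported with a -500..+500 uV dynamic range
lowest = digital_to_physical(-32768, -500.0, 500.0)
highest = digital_to_physical(32767, -500.0, 500.0)
step = digital_to_physical(1, -500.0, 500.0) - digital_to_physical(0, -500.0, 500.0)
print(lowest, highest, step)
```

The smallest and largest integers land on the physical boundaries, and `step` is the amplitude difference between two neighbouring integer values.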

---
When exporting to the .edf format (with software such as Compumedics), many parameters need to be set for each recorded channel, such as the channel configuration, sampling frequency, filtering, units and dynamic range. \
Exporting to .edf is tedious and time-consuming, and mistakes in the parameters are easily made.  \
To avoid mistakes, some software lets you implement export routines (insert a link on how to make a routine in Compumedics). \
Inspecting those parameters can also be necessary if you want to work on an existing dataset, to make sure that every participant's data is usable. 

---
The script reads information directly from the headers of the .edf files, instead of relying on existing packages such as MNE-Python or pyedflib.  \
MNE-Python does not return all the information from the headers (such as the boundaries of the dynamic range, or the filtering parameters when different channel types have different filters).\
pyedflib is too strict, and some headers cannot be read (in the ICEBERG database at least).\
\
To use this notebook, read the markdown cells, then run the code cells and read the output below.
**The script saves summary dataframes as .tsv files so that you can visually inspect them (or reload them later) if needed.** Those summaries are stored in a summary folder within the study folder. \
\
\
It was developed on the ICEBERG database and tested on APOMORPHEE (from Noémie's internship).  \
Last update: 30/06/2025, YN
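
As an illustration of reloading one of the saved summaries, here is a minimal sketch. The temporary folder and the tiny demo table are made up so that the example runs on its own; only the file name `full_summary_table_edf.tsv` and the `summary` folder come from this notebook.

```python
import tempfile
from pathlib import Path

import pandas as pd

# stand-in for the study folder (assumption); in practice, use your own study folder
study_folder = Path(tempfile.mkdtemp())
summary_folder = study_folder / "summary"
summary_folder.mkdir(exist_ok=True)

# tiny made-up table, written the same way the notebook writes its summaries
demo = pd.DataFrame({
    "subject": ["sub-01", "sub-01"],
    "channel": ["C3", "O1"],
    "sampling_frequency": [256, 256],
})
demo.to_csv(summary_folder / "full_summary_table_edf.tsv", sep="\t")

# index_col=0 restores the row index that to_csv wrote as the first column
reloaded = pd.read_csv(summary_folder / "full_summary_table_edf.tsv", sep="\t", index_col=0)
print(reloaded)
```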

## 0. Import packages and define custom functions

If a package is missing (you get an error when importing it), there are two solutions: \
1. Install from within the Jupyter notebook:
- run "%conda install package_name --yes" in a new cell
- re-run the import cell

2. Install from a terminal: 
- go back to your terminal
- activate the virtual environment you are working in (the one from which you installed Jupyter Notebook) with "conda activate my_virtual_environment"
- run: "conda install -k my_package"
- restart the kernel of the Jupyter notebook

In [1]:
import os
from pathlib import Path
import pandas as pd
import chardet
import re
from ipyfilechooser import FileChooser
import ipywidgets as widgets
import warnings

# custom function to automatically detect and return the encoding of an EDF file (from ChatGPT)
def detect_encoding(byte_string, min_confidence=0.6):
    result = chardet.detect(byte_string)
    encoding = result['encoding']
    confidence = result['confidence']
    if encoding is None or confidence < min_confidence:
        raise UnicodeDecodeError("chardet", byte_string, 0, len(byte_string),
                                 f"\tUnable to reliably detect encoding. Detected: {encoding} with confidence {confidence}")
    return encoding

# custom function to read information from EDF headers, without using the pyedflib package (which was too strict for ICEBERG)
# An EDF file should follow a strict format that dedicates a specific number of bytes (octets) to each type of information.
# This means we can read the header field by field, by specifying the number of bytes expected for the next variable (known from the EDF specification)
def read_edf_header_custom(file_path):
    with open(file_path, 'rb') as f: # open the file in binary mode, to read octet by octet. 
        header = {}
        # detect encoding
        raw_header = f.read(256)
        encoding = detect_encoding(raw_header)
        print(f"\tDetected encoding for {file_path} : {encoding}")
        # Rewind to beginning
        f.seek(0)
        
        # the first 256 octets are subject global info
        header['version'] = f.read(8).decode(encoding).strip()
        header['patient_id'] = f.read(80).decode(encoding).strip()
        header['recording_id'] = f.read(80).decode(encoding).strip()
        header['start_date'] = f.read(8).decode(encoding).strip()
        header['start_time'] = f.read(8).decode(encoding).strip()
        header['header_bytes'] = int(f.read(8).decode(encoding).strip())
        header['reserved'] = f.read(44).decode(encoding).strip()
        header['n_data_records'] = int(f.read(8).decode(encoding).strip())
        header['duration_data_record'] = float(f.read(8).decode(encoding).strip())
        header['n_channels'] = int(f.read(4).decode(encoding).strip())
        
        # get info per channel
        n = header['n_channels']
        channel_fields = {
            'channel': [],
            'transducer_type': [],
            'dimension': [],
            'physical_min': [],
            'physical_max': [],
            'digital_min': [],
            'digital_max': [],
            'prefiltering': [],
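            'sampling_frequency': [],  # EDF field "number of samples per data record": equals the sampling frequency in Hz only when duration_data_record is 1 s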
            'reserved': [],
        }

        for key in channel_fields:
            length = {
                'channel': 16,
                'transducer_type': 80,
                'dimension': 8,
                'physical_min': 8,
                'physical_max': 8,
                'digital_min': 8,
                'digital_max': 8,
                'prefiltering': 80,
                'sampling_frequency': 8,
                'reserved': 32,
            }[key]
            channel_fields[key] = [f.read(length).decode(encoding).strip() for _ in range(n)]

        header.update(channel_fields)
    
    return header

# function to extract filters information from the string in headers
def extract_filter_value(s, tag):
    if pd.isna(s):
        return None
    match = re.search(rf'{tag}[:\s]*([\d\.]+)\s*', s, re.IGNORECASE)
    return float(match.group(1)) if match else None

# custom function to get the sampling frequency out of a dataframe (the df needs to have 'subject' and 'channel' as column)
def get_sf(df, subject, channel):
    df_sf = df[(df['subject'] == subject) & (df['channel'] == channel)]
    if not df_sf.empty:
        return df_sf.iloc[0]['sampling_frequency']
    else:
        return None

# function to create a widget slider to select configuration to inspect
def mk_config_slider(value = 1, min = 1, max = 5):
    config_slider = widgets.IntSlider(
        value=value,
        min=min,
        max=max,
        step=1,
        description='Selected configuration:',
        style={'description_width': '150px'},  # widen the description label
        layout=widgets.Layout(width='400px'),   # adjust the total widget width if needed
        disabled=False,
        continuous_update=False,
        orientation='horizontal',
        readout=True,
        readout_format='d'
    )
    return config_slider

# function to print the configuration of a dataset parameter
def print_config(i, config_dict, param):
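    # the slider is 1-based, while the lists below are 0-based
    idx = i - 1
    # get the participant IDs of the selected configuration (dictionary values)
    value = list(config_dict.values())  
    v = value[idx]  
    # get the configuration itself (dictionary keys)
    key = list(config_dict.keys())
    k = key[idx]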
    
    # print info
    print(f'Selected configuration: # {i}')
    print(f'\t{param}: {k}')
    print(f'\t{len(v)} participants: {v}')
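
Because the header layout is fixed, its total size can be predicted from the number of channels, which gives a cheap sanity check on a parsed header. The sketch below assumes a dictionary with the same `header_bytes` and `n_channels` keys as the one returned by `read_edf_header_custom`; the helper names and the demo values are hypothetical.

```python
def expected_header_bytes(n_channels):
    """256 bytes of global fields, plus 256 bytes of per-channel fields for each channel."""
    # field widths of the per-channel part of the header, in bytes
    per_channel = 16 + 80 + 8 + 8 + 8 + 8 + 8 + 80 + 8 + 32  # = 256
    return 256 + n_channels * per_channel

def header_is_consistent(header):
    """True when the declared header size matches the declared number of channels."""
    return header["header_bytes"] == expected_header_bytes(header["n_channels"])

demo_header = {"header_bytes": 4864, "n_channels": 18}  # made-up values
print(header_is_consistent(demo_header))
```

A mismatch would suggest a non-standard or corrupted header, which is worth checking before trusting the per-channel fields.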


## 1. Select data folder and get the file list of your database
The first cell below will open a widget to select the folder containing your data. 

In [2]:
chooser = FileChooser(os.getcwd())
chooser.title = "<b>Choose your study folder</b>"
chooser.show_only_dirs = True
display(chooser)

FileChooser(path='/Users/thandrillon/WorkGit/Dream-Toolkit/python/Check_EDF', filename='', title='<b>Choose yo…

In [3]:
folder_path = chooser.selected_path

# get the edf file list 
edf_files = list(Path(folder_path).rglob('*.edf'))

# initialise the list of dataframes storing file info, which will be concatenated at the end (this is better for performance)
df_list = []

# check the existence and/or create the summary folder that will receive the summary tables and the report
summary_path = f'{folder_path}/summary'
if not os.path.exists(summary_path):
    os.makedirs(summary_path)

## 2. Loop over the edf file list to extract parameters from each participant

In [5]:
# initialise an empty list for files that could not be read
failed_list = []

for e, edf_path in enumerate(edf_files):
    print(f'file {e+1}/{len(edf_files)}, currently opening file: {edf_path}')
    # read file with the custom function
    try:
        edf_header = read_edf_header_custom(edf_path) 
        
        # get subject name (corresponding to file_name)
        sub_name = edf_path.stem
        
        # get subject group (from the parent folder, in ICEBERG database subfolder were created per patient group)
        sub_folder = edf_path.parent.name # get the parent folder of the subject file (path)
        
        # create df from signal info
        df = pd.DataFrame(edf_header)
            
        # theoretical resolution (EDF samples are 16-bit, so the signal can take 2^16 values within the dynamic range)
        # the dynamic range is the distance between the physical boundaries (abs() keeps it positive when the polarity is inverted)
        df['res_theoretical'] = abs(pd.to_numeric(df['physical_max']) - pd.to_numeric(df['physical_min']))/pow(2,16)
        # convert the theoretical resolution to uV if the dimension is mV (if no dimension is given, the unit is unknown)
        df.loc[df['dimension'].str.contains('mv', case=False, na=False), 'res_theoretical'] *= 1000
        
        # get filtering info in different columns
        df['lowpass']   = df['prefiltering'].apply(lambda x: extract_filter_value(x, 'LP'))
        df['highpass']  = df['prefiltering'].apply(lambda x: extract_filter_value(x, 'HP'))
        df['notch']  = df['prefiltering'].apply(lambda x: extract_filter_value(x, 'NOTCH'))
        
        # add subject info in the dataframe
        df['subject'] = sub_name
        df['group'] = sub_folder
        df['path'] = str(edf_path)
        
        # select only the columns of interest
        df = df[['subject', 'group', 'path', 'channel', 'transducer_type', 'dimension', 'sampling_frequency', 
             'highpass', 'lowpass', 'notch', 'physical_min', 'physical_max', 'res_theoretical']]
        
        # store subject data
        df_list.append(df)

    except UnicodeDecodeError as err:  # named err so that it does not shadow the loop counter e
        print(f"[⚠️] Encoding problem for {edf_path}")
        failed_list.append((edf_path, 'encoding'))
    except Exception as err:
        print(f"[❌] Unknown problem for {edf_path} : {err}")
        failed_list.append((edf_path, 'other'))
   
# concatenate the dataframes into a single one
with warnings.catch_warnings(): # this is to skip a warning not affecting our operation
    warnings.simplefilter("ignore", FutureWarning)
    df_full = pd.concat(df_list, ignore_index=True)

# save summary table containing full info
df_full.to_csv(f'{summary_path}/full_summary_table_edf.tsv', sep = '\t')
print(f'\nSaving full information from headers of the dataset to:\n{summary_path}/full_summary_table_edf.tsv')

# save the failed list if not empty:
failed_df = pd.DataFrame(failed_list)
if not failed_df.empty:
    failed_df.to_csv(f'{summary_path}/failed_edf_read.tsv', sep = '\t')
    print(f'\nSaving the list of files that could not be read to: \n{summary_path}/failed_edf_read.tsv')    

file 1/68, currently opening file: /Users/thandrillon/Data/Apomorphee/data/17_N2.edf
	Detected encoding for /Users/thandrillon/Data/Apomorphee/data/17_N2.edf : ascii
file 2/68, currently opening file: /Users/thandrillon/Data/Apomorphee/data/21_N2.edf
	Detected encoding for /Users/thandrillon/Data/Apomorphee/data/21_N2.edf : ascii
file 3/68, currently opening file: /Users/thandrillon/Data/Apomorphee/data/2_N1.edf
	Detected encoding for /Users/thandrillon/Data/Apomorphee/data/2_N1.edf : ascii
file 4/68, currently opening file: /Users/thandrillon/Data/Apomorphee/data/33_N2.edf
	Detected encoding for /Users/thandrillon/Data/Apomorphee/data/33_N2.edf : ascii
file 5/68, currently opening file: /Users/thandrillon/Data/Apomorphee/data/17_N1.edf
	Detected encoding for /Users/thandrillon/Data/Apomorphee/data/17_N1.edf : ascii
file 6/68, currently opening file: /Users/thandrillon/Data/Apomorphee/data/21_N1.edf
	Detected encoding for /Users/thandrillon/Data/Apomorphee/data/21_N1.edf : ascii
file 7

## 3. Inspect dataset general info (# participants, groups, recorded sensors)

In [6]:
print(f'\n>>> There are {len(df_full["subject"].unique())} participants in your dataset, within {len(df_full["group"].unique())} group(s): {df_full["group"].unique()} <<<')
print(df_full.drop_duplicates().groupby('group').agg(n_subjects=('subject', 'nunique')))


>>> There are 68 participants in your dataset, within 2 group(s): ['data' 'bin'] <<<
       n_subjects
group            
bin             4
data           64


In [7]:
print('\nFull recorded sensors configuration of your database (across participants): ')
print(*df_full['channel'].unique(), sep='\n')


Full recorded sensors configuration of your database (across participants): 
F4-M1
C4-M1
C3-M2
O2-M1
EMG_L
ECG
Ronfl
Flux
Therm
Thor
Abdo
Sum
SpO2
FC
Pos
Jambe_R
Jambe_L
EDF Annotations
F4
C3
C4
O2
M1
M2
Chin1
Chin2
ECG1
ECG2
Fp1
O1
A2
EOG G
EOG D
EMG 1
EMG 2
EMG JD
EMG JG
Mic
Thermistance
Thoracic
Abdominal
Pulse
Pleth
Position
EEG F2
EEG C4
EEG O2
EEG T4
EEG F1
EEG C3
EEG O1
EEG T3
EEG A2
MO-D
MO-G
MENTON
JAM-Dt
JAM-Ga
SAT
ABD
THO
RONF
PRES
THERM
PTT
Pouls
FCbb
POS
E1-M2
E2-M2
F3-M2
O1-M2
EMG_R
Pl?th
E1
E2
F3
Chin3
LLeg1-LLeg2
RLeg1-RLeg2


## 4. Inspect EEG and EOG channels only 

### 4.1 Select only the EEG/EOG channels

In [9]:
# select only EEG and EOG channels; a warning is printed below if this changes the number of participants
mask_ch = df_full['transducer_type'].str.contains(r'EEG|EOG|AGAGCL ELECTRODE', na=False) # mask that is True for rows whose transducer_type contains EEG, EOG or AGAGCL ELECTRODE
df_ch = df_full[mask_ch]
# remove the EMG/ECG channels that were captured with the AGAGCL ELECTRODE transducer type 
df_ch = df_ch[~df_ch['channel'].str.contains(r'emg|ecg', case=False, na=False)] # the ~ inverts the mask (logical NOT)

# Check whether the number of participants with EEG/EOG channels is the same as in df_full. 
# If not, it might be because the transducer type was not correctly detected. 
# One possibility is to add the missing transducer type to the pattern on the second line of this cell.
if len(df_full['subject'].unique()) > len(df_ch['subject'].unique()):
    # identify missing subjects
    missing_sub = set(df_full['subject'].unique()) - set(df_ch['subject'].unique())
    print('\n!!! There are fewer participants in the dataset with only EEG/EOG channels !!!')
    print(f'Missing participants: {missing_sub}')
    print("\nEither these participants don't have any EEG/EOG channel,")
    print("or the transducer type was not correctly detected.")
    # get df of missing sub to save and inspect
    df_miss = df_full[df_full['subject'].isin(missing_sub)]
    df_miss.to_csv(f'{summary_path}/EEG-EOG_missing_edf.tsv', sep = '\t')
    print(f'\nSaving information from missing participants to:\n{summary_path}/EEG-EOG_missing_edf.tsv')
    print('Please inspect the file, and specifically the column transducer_type')
elif len(df_full['subject'].unique()) < len(df_ch['subject'].unique()):
    print('\n!!! There are more participants in the dataset with only EEG/EOG channels !!!')
    print('This should not be the case.')
    print('Please inspect what is happening in a code editor (spyder..), or ask Yvan.')
    more_sub = set(df_ch['subject'].unique()) - set(df_full['subject'].unique())
    df_more = df_ch[df_ch['subject'].isin(more_sub)]
    df_more.to_csv(f'{summary_path}/EEG-EOG_suspect_edf.tsv', sep = '\t')
    print(f'\nSaving information from suspect participants to:\n{summary_path}/EEG-EOG_suspect_edf.tsv')

### 4.2 Inspect EEG/EOG channels configurations

There will likely be many channel configurations (especially with a multicentric dataset).  \
\
For polysomnographic EEG, there should be at least:
- 4 EEG (Fp1, C3, O1 and A2 that will be used as the reference) (or less frequently Fp2, C4, O2 and A1) \
- 2 EOG (EOG D, EOG G)

Depending on the analysis you plan, if a configuration does not contain those electrodes, you will need either to re-export the data or to exclude the participant. \
\
If the channel label is Fp1-A2, it means that your data is already referenced to A2.
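
The minimal montage listed above can be checked against a channel configuration with a simple set difference. This is only a sketch: the name `missing_channels` and the exact-label matching are assumptions, and labels differ across centres (for example E1/E2 instead of EOG G/EOG D, or already referenced labels such as C3-M2), so the required set would need adapting to your dataset.

```python
# minimal montage described above (exact labels are an assumption)
REQUIRED = {"Fp1", "C3", "O1", "A2", "EOG D", "EOG G"}

def missing_channels(config, required=REQUIRED):
    """Return the required labels that are absent from a channel configuration."""
    return sorted(required - set(config))

# two configurations written as tuples, like the keys of ch_config_dict
config_a = ('A2', 'C3', 'EOG D', 'EOG G', 'Fp1', 'O1')
config_b = ('C3-M2', 'C4-M1', 'F4-M1', 'O2-M1')

print(missing_channels(config_a))  # nothing missing
print(missing_channels(config_b))  # every required label is missing under exact matching
```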

In [10]:
# get the channels configuration per participant 
ch_per_sub = df_ch.groupby('subject')['channel'].apply(lambda x: tuple(sorted(set(x))))

# identify the channel configuration of each participant and store them in a dict to print per channel config
ch_config_dict = {}
for config in ch_per_sub.unique():
    sub = ch_per_sub[ch_per_sub == config].index.tolist()
    ch_config_dict[config] = sub

if len(ch_config_dict) > 1:
    print('\n>>> There are multiple channel configurations in your dataset! <<<')    
    print(f'\n\nNumber of different channel configurations: {len(ch_config_dict)}\n')
else:
    print('\n>>> There is only one channel configuration in your dataset! <<<')

# # print info per channel configuration
# for i, (config, participants) in enumerate(ch_config_dict.items(), 1):
#     print(f'Configuration #{i} ({len(participants)} participants):')
#     print(f'Channels ({len(config)}) : {config}\n')


>>> There are multiple channel configurations in your dataset! <<<


Number of different channel configurations: 9

Configuration #1 (26 participants):
Channels (6) : ('A2', 'C3', 'EOG D', 'EOG G', 'Fp1', 'O1')

Configuration #2 (2 participants):
Channels (6) : ('C3-M2', 'C4-M1', 'E1-M2', 'E2-M2', 'F4-M1', 'O2-M1')

Configuration #3 (13 participants):
Channels (4) : ('C3-M2', 'C4-M1', 'F4-M1', 'O2-M1')

Configuration #4 (7 participants):
Channels (6) : ('C3', 'C4', 'F4', 'M1', 'M2', 'O2')

Configuration #5 (6 participants):
Channels (8) : ('C3-M2', 'C4-M1', 'E1-M2', 'E2-M2', 'F3-M2', 'F4-M1', 'O1-M2', 'O2-M1')

Configuration #6 (2 participants):
Channels (10) : ('C3', 'C4', 'E1', 'E2', 'F3', 'F4', 'M1', 'M2', 'O1', 'O2')

Configuration #7 (1 participants):
Channels (6) : ('C3-M2', 'C4-M1', 'F3-M2', 'F4-M1', 'O1-M2', 'O2-M1')

Configuration #8 (1 participants):
Channels (8) : ('C3', 'C4', 'F3', 'F4', 'M1', 'M2', 'O1', 'O2')

Configuration #9 (10 participants):
Channels (9) : ('EEG A2',

To inspect the participant IDs in one configuration, run the cell just below and select the configuration with the slider.

In [11]:
# widget to select the configuration of interest
config_ch_slider = mk_config_slider(value = 1, min = 1, max = len(ch_config_dict))

# print the configuration selected
# interact with the slider output through the printing function 
widgets.interact(lambda i: print_config(i, config_dict=ch_config_dict, param="Channels"), i=config_ch_slider);

interactive(children=(IntSlider(value=1, continuous_update=False, description='Selected configuration:', layou…

### 4.3 Inspect sampling frequency

Ideally, you expect a single sampling frequency for all channels and participants. \
In practice, you might have different sampling frequencies across participants (especially with a multicentric dataset), and two sampling frequencies within a participant (one for EEG, the other for EOG).  \
\
Each EEG analysis software handles multiple sampling frequencies within a participant differently. For example:
- MNE-Python will automatically upsample channels to the highest sampling frequency
- FieldTrip will load only a subset of channels (those with the most represented sampling frequency)   
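
One caveat when reading sampling frequencies straight from the header: the EDF per-channel field holds the number of samples per data record, so it is a frequency in Hz only when the data record lasts 1 s. A minimal sketch of the conversion follows; the function name is hypothetical, and the inputs are strings because header fields are read as text.

```python
def sampling_frequency_hz(samples_per_record, record_duration_s):
    """Convert 'samples per data record' into a sampling frequency in Hz."""
    duration = float(record_duration_s)
    if duration <= 0:
        # a duration of 0 can occur in annotation-only files: no frequency can be derived
        return None
    return float(samples_per_record) / duration

print(sampling_frequency_hz("256", "1"))    # 1 s records: the field is already in Hz
print(sampling_frequency_hz("512", "2"))    # 2 s records: 512 samples per record is 256 Hz
print(sampling_frequency_hz("6000", "30"))  # 30 s records: 6000 samples per record is 200 Hz
```

In this notebook, the record duration is available as `duration_data_record` in the parsed header.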

In [13]:
# the sampling frequency configuration
sf_per_sub = df_ch.groupby('subject')['sampling_frequency'].apply(lambda x: tuple(sorted(set(x))))
# identify the sampling frequency configuration of each participant and store them in a dict to print per sampling configuration config
sf_config_dict = {}
for config in sf_per_sub.unique():
    sub = sf_per_sub[sf_per_sub == config].index.tolist()
    sf_config_dict[config] = sub

# print info per sf configuration (maybe print it only for multiple config)
if len(sf_config_dict) > 1:
    print('\n>>> There are multiple sampling frequency configurations in your dataset! <<<')    
    print(f'\n\nNumber of different sampling frequency configurations: {len(sf_config_dict)}\n')
    for s, sf in enumerate(df_ch['sampling_frequency'].unique()):
        # select only rows with the current sf
        df_sf = df_ch[df_ch['sampling_frequency'] == sf].copy()
        print(f'\n{sf} Hz: {df_sf["channel"].unique()}')
else:
    # double quotes inside the f-string: reusing single quotes is a syntax error before Python 3.12
    print(f'\n>>> There is only one sampling frequency configuration in your dataset: {df_ch["sampling_frequency"].unique()} <<<')

# print('\nSampling frequency configurations:\n')
# for i, (config, participants) in enumerate(sf_config_dict.items(), 1):
#     print(f'Configuration #{i} ({len(participants)} participants):')
#     print(f'Sampling frequency ({len(config)}) : {config}\n')


>>> There are multiple sampling frequency configurations in your dataset! <<<


Number of different sampling frequency configurations: 4


Sampling frequency configurations:

Configuration #1 (37 participants):
Sampling frequency (1) : ('256',)

Configuration #2 (21 participants):
Sampling frequency (1) : ('200',)

Configuration #3 (9 participants):
Sampling frequency (1) : ('512',)

Configuration #4 (1 participants):
Sampling frequency (2) : ('128', '256')



To inspect the participant IDs in one configuration, run the cell just below and select the configuration with the slider.

In [14]:
# widget to select the configuration of interest
config_sf_slider = mk_config_slider(value = 1, min = 1, max = len(sf_config_dict))

# print the configuration selected
# interact with the slider output through the printing function 
widgets.interact(lambda i: print_config(i, config_dict=sf_config_dict, param="Sampling frequencies"), i=config_sf_slider);

interactive(children=(IntSlider(value=1, continuous_update=False, description='Selected configuration:', layou…

### 4.4 Inspect filtering parameters

Ideally with EEG sleep data, you want no lowpass or notch filter, and a very low highpass cutoff, around 0.05 Hz (to remove slow drifts in long recordings).
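
The filter values are parsed from the free-text `prefiltering` field of the header, whose format depends on the export software. The sketch below uses a made-up string and a standalone copy of the regular-expression approach (the name `parse_filter` is hypothetical) to show how a tag that is abbreviated differently ends up reported as missing.

```python
import re

def parse_filter(prefiltering, tag):
    """Return the number that follows `tag` in a prefiltering string, or None if absent."""
    match = re.search(rf"{tag}[:\s]*([\d\.]+)", prefiltering, re.IGNORECASE)
    return float(match.group(1)) if match else None

example = "HP:0.05Hz LP:70Hz N:50Hz"  # made-up header string

highpass = parse_filter(example, "HP")
lowpass = parse_filter(example, "LP")
notch = parse_filter(example, "NOTCH")  # this string abbreviates the notch filter as "N"
print(highpass, lowpass, notch)
```

Here the notch value is not found because the tag searched for does not appear in the string, so a "missing" filter does not necessarily mean that no filter was applied.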

In [15]:
if len(df_ch['highpass'].unique())+len(df_ch['lowpass'].unique())+len(df_ch['notch'].unique()) == 3:
    print('\n>>> All channels have the same filtering parameters! <<<')
elif len(df_ch['highpass'].unique())+len(df_ch['lowpass'].unique())+len(df_ch['notch'].unique()) > 3:
    print('\n>>> Filtering parameters are not fully consistent across the dataset! <<<')
else:
    print('\n>>> There may have been a problem in reading the filtering parameters. Here is the output: <<<')

# Get the list of participants with different filtering parameters
# 1st replace NaN because groupby does not like NaN
df_filt = df_ch.copy()
df_filt[['lowpass', 'highpass', 'notch']] = df_filt[['lowpass', 'highpass', 'notch']].fillna('missing')

config_filters = (
    df_filt.groupby(['lowpass', 'highpass', 'notch'])['subject']
    .apply(lambda x: sorted(set(x)))
    .reset_index(name = 'subjects')
)

# print filter configuration
print(f'\n\nNumber of different filters configurations: {len(config_filters)}\n')
# print('\nFilters configurations: ')
# r=1
# for row in config_filters.itertuples(index=False):
#     print(f'Configuration #{r} ({len(row.subjects)} participants)')
#     print(f'highpass: {row.highpass}, lowpass: {row.lowpass}, notch: {row.notch}\n')
#     r=r+1



>>> Filtering parameters are not fully consistent across the dataset! <<<


Number of different filters configurations: 4


Filters configurations: 
Configuration #1 (26 participants)
highpass: 0.05, lowpass: 0.0, notch: 0.0

Configuration #2 (1 participants)
highpass: 0.17, lowpass: 0.0, notch: 0.0

Configuration #3 (10 participants)
highpass: 10.0, lowpass: 100.0, notch: 50.0

Configuration #4 (43 participants)
highpass: missing, lowpass: missing, notch: missing



To inspect the participant IDs in one configuration, run the cell just below and select the configuration with the slider.

In [16]:
# widget to select the configuration of interest
config_filter_slider = mk_config_slider(value = 1, min = 1, max = len(config_filters))

# function to print filter configurations
def print_filters(config_slider):
    # get the info from the dataframe
    idx = config_slider - 1
    sID = config_filters.iloc[idx]['subjects']
    hpass = config_filters.iloc[idx]['highpass']
    lpass = config_filters.iloc[idx]['lowpass']
    notch = config_filters.iloc[idx]['notch']
    
    # print info
    print(f'Selected configuration: # {config_slider}')
    print(f'\tFilters configuration: highpass: {hpass}; lowpass: {lpass}; notch: {notch}')
    print(f'\tParticipants: {sID}')

widgets.interact(print_filters, config_slider = config_filter_slider);

interactive(children=(IntSlider(value=1, continuous_update=False, description='Selected configuration:', layou…

### 4.5 Inspect units in the dataset

At export, channels can be saved in different units. \
Each analysis software handles units differently, so it helps to know which units your dataset contains. 
- MNE-Python will automatically detect the units and convert the data to volts. However, if the unit is not read correctly, the data will **not** be converted (e.g. "UV" is not interpreted as uV, so the data are not converted to volts)
- FieldTrip loads the data in their original unit, so you might want to convert all channels to the same unit before your analysis  
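
If you decide to harmonise units yourself, one possible approach is a case-insensitive lookup from the unit label to a scale factor in volts. This is a sketch under assumptions: the table `UNIT_TO_VOLT` and the function `scale_to_volt` are hypothetical, and the list of spellings is not exhaustive.

```python
# scale factors to volts, keyed by lower-cased unit label (assumed spellings)
UNIT_TO_VOLT = {
    "v": 1.0,
    "mv": 1e-3,
    "uv": 1e-6,
    "\u00b5v": 1e-6,  # micro sign followed by v
    "nv": 1e-9,
}

def scale_to_volt(unit_label):
    """Return the factor converting a unit label to volts, or None if unknown."""
    key = unit_label.strip().lower()
    return UNIT_TO_VOLT.get(key)

for label in ["uV", "UV", " mV ", ""]:
    print(repr(label), scale_to_volt(label))
```

Returning `None` for an unknown or empty label keeps unrecognised channels visible instead of silently scaling them.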

In [17]:
if len(df_ch['dimension'].unique()) == 1:
    print(f'\n>>> All channels have the same unit: {df_ch["dimension"].unique()} <<<\n')
elif len(df_ch['dimension'].unique()) > 1:
    print('\n>>> Multiple units were found! <<<\n')
    # double quotes inside the f-string: reusing single quotes is a syntax error before Python 3.12
    print(f'\n\tNumber of different units: {len(df_ch["dimension"].unique())}\n')
    print('Quick overview of the channels associated with each unit:')
    for u, unit in enumerate(df_ch['dimension'].unique()):
        # select only rows with the current unit
        df_unit = df_ch[df_ch['dimension'] == unit].copy()
        print(f'\n{unit}: {df_unit["channel"].unique()}')

# get the unit configuration per participant
# (and the channels associated with each unit)
unit_per_sub = df_ch.groupby('subject')['dimension'].apply(lambda x: tuple(sorted(set(x))))
ch_per_unit = df_ch.groupby('dimension')['channel'].apply(lambda x: tuple(sorted(set(x))))
# identify the unit configuration of each participant and store them in a dict to print per unit configuration
unit_config_dict = {}
for config in unit_per_sub.unique():
    sub = unit_per_sub[unit_per_sub == config].index.tolist()
    unit_config_dict[config] = sub

# # print info per unit configuration
# print('\nUnits configurations:')
# for i, (config, participants) in enumerate(unit_config_dict.items(), 1):
#     print(f'Configuration #{i} ({len(participants)} participants):')
#     print(f'Unit ({len(config)}) : {config}\n')
#     # print(f"Participants : {participants}\n")


>>> Multiple units were found: <<<


Units configurations:
Configuration #1 (26 participants):
Unit (2) : ('mV', 'uV')

Configuration #2 (42 participants):
Unit (1) : ('uV',)



To inspect the participant IDs in one configuration, run the cell just below and select the configuration with the slider.

In [18]:
# widget to select the configuration of interest
config_unit_slider = mk_config_slider(value = 1, min = 1, max = len(unit_config_dict))

# print the configuration selected
# interact with the slider output through the printing function 
widgets.interact(lambda i: print_config(i, config_dict=unit_config_dict, param="Units"), i=config_unit_slider);

interactive(children=(IntSlider(value=1, continuous_update=False, description='Selected configuration:', layou…

### 4.6 Inspect signal inversion

Some software (e.g. Profusion from Compumedics) allows you to invert the polarity of the exported data. This can be extremely confusing and can lead to wrong results. \
Here, we check whether the signal is inverted by testing if the minimum physical boundary is higher than the maximum physical boundary. \
For .edf files, the physical boundaries are set when exporting the data, by specifying the scale of the data. \
\
In Profusion (from Compumedics), a scale of 1 mV leads to a min physical boundary of -500 uV and a max physical boundary of +500 uV.\
\
For other EEG formats and software, the dynamic range might be set before recording (e.g. specified in the montage) and cannot be changed at export.
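
Header fields are read as text, and comparing text does not order numbers reliably, so the boundaries are best converted to numbers before testing for inversion. The sketch below uses a made-up table with the same `physical_min` and `physical_max` column names as the summary dataframe.

```python
import pandas as pd

# made-up boundaries, stored as text like the values read from the headers
demo = pd.DataFrame({
    "channel": ["F4", "C3", "O1"],
    "physical_min": ["500.00", "-500.00", "50"],
    "physical_max": ["-500.00", "500.00", "100"],
})

# convert to numbers first; unreadable values become NaN and never count as inverted
p_min = pd.to_numeric(demo["physical_min"], errors="coerce")
p_max = pd.to_numeric(demo["physical_max"], errors="coerce")
inverted = demo[p_min > p_max]
print(inverted)

# why the conversion matters: as text, "50" sorts after "100"
print("50" > "100")
```

Only F4 is inverted here; a text comparison would also have flagged O1.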

In [20]:
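# select rows where the physical min is greater than the physical max
# (header values are read as text, so they are converted to numbers before comparing)
df_inv = df_ch[pd.to_numeric(df_ch['physical_min'], errors='coerce') > pd.to_numeric(df_ch['physical_max'], errors='coerce')]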

if not df_inv.empty:
    print('\n>>> Inverted polarity detected ! <<<')
    print(str(df_inv.shape[0]) + ' channel(s) have an inverted polarity (from ' + str(len(edf_files)) + ' edf files)')
    print(df_inv[['subject', 'channel', 'dimension', 'physical_min', 'physical_max']])
else:
    print('\n>>> No inverted polarity was detected <<<')
df_inv.to_csv(f'{summary_path}/inverted_polarity_edf.tsv', sep = '\t')
print(f'\nSaving information from inverted polarity channels to:\n{summary_path}/inverted_polarity_edf.tsv \n(will be empty if no inverted polarity)')



>>> Inverted polarity detected ! <<<
1 channel(s) have an inverted polarity (from 68 edf files)
     subject channel dimension physical_min physical_max
1442   27_N2      F4        uV       500.00      -500.00

Saving information from inverted polarity channels to:
/Users/thandrillon/Data/Apomorphee/data/summary/inverted_polarity_edf.tsv 
(will be empty if no inverted polarity)


### 4.7 Inspect theoretical resolution

The theoretical resolution of an .edf file is the smallest amplitude variation that can be recorded between two samples. \
Since .edf is a 16-bit format (a data point can take 2^16 values between a min and a max), we compute the theoretical resolution by dividing the dynamic range (the distance between the lower and upper physical boundaries) by 2^16. \
Ideally, the resolution should be around 0.01 uV (or lower). \
\
To improve the theoretical resolution, we can reduce the min and max values of the dynamic range. However, reducing the dynamic range can lead to a loss of information (because the signal cannot go above or below the boundaries), an issue called signal clipping.\
\
Below, we detect channels whose resolution is coarser than 0.1 uV (i.e. a value of 0.1 uV or more).\
You can change the resolution threshold with the widget:

In [24]:
# res_theoretical has been converted to uV, but this may fail if the dimension was not read or not indicated in the headers. A more robust check may be needed
r_thres = widgets.BoundedFloatText(
    value=0.1,
    min=0,
    max=10.0,
    step=0.1,
    style={'description_width': '150px'},  # widen the description label
    layout=widgets.Layout(width='230px'),   # adjust the total widget width if needed
    description='Resolution threshold (uV):',
    disabled=False
);

# define a function to interact with the widget
def check_bad_res(threshold):
    r_mask = df_ch['res_theoretical'] >= threshold
    bad_res = df_ch[r_mask]
    
    if not bad_res.empty:
        print(f'\n>>> Poor resolution detected! (>= {threshold} uV) <<<')
        print(f'{bad_res.shape[0]} channels have a very poor resolution (from {len(edf_files)} edf files)')
        print(bad_res[['subject', 'channel', 'dimension', 'physical_min', 'physical_max', 'res_theoretical']])
    else:
        print(f'\n>>> No channel with a poor resolution (>= {threshold} uV) was detected! <<<')
    bad_res.to_csv(f'{summary_path}/bad_resolution_edf.tsv', sep = '\t')
    print(f'\nSaving information from bad resolution channels to:\n{summary_path}/bad_resolution_edf.tsv \n(will be empty if no bad resolution)')

widgets.interact(check_bad_res, threshold=r_thres);

interactive(children=(BoundedFloatText(value=0.1, description='Resolution threshold (uV):', layout=Layout(widt…
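
As a worked example of the resolution computation described in this section, the sketch below divides the dynamic range by 2^16 for two made-up export scales. The function name `theoretical_resolution` is hypothetical and the boundaries are assumed to be in uV.

```python
def theoretical_resolution(phys_min, phys_max, n_bits=16):
    """Smallest representable amplitude step: dynamic range divided by 2**n_bits."""
    return abs(phys_max - phys_min) / 2 ** n_bits

# a 1 mV scale (-500..+500 uV) versus a 10 mV scale (-5000..+5000 uV)
narrow = theoretical_resolution(-500.0, 500.0)
wide = theoretical_resolution(-5000.0, 5000.0)
print(f"{narrow:.4f} uV per step, {wide:.4f} uV per step")

# with the default threshold of 0.1 uV, only the wide range would be flagged
print(narrow >= 0.1, wide >= 0.1)
```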

### 4.8 Inspect dynamic range

As mentioned above, a small dynamic range gives a good signal resolution, but can lead to signal clipping.\
Signal clipping happens when the signal reaches the physical boundaries (min or max) and is therefore stuck at this value. It results in a loss of information.\
\
Typical good-quality physiological EEG data stays within +/-200 uV.\
Below, we check whether the dynamic range (the distance between the physical boundaries) is lower than or equal to 400 uV (+/- 200 uV). \
You can change the dynamic range threshold with the widget:

In [30]:
dr_thres = widgets.BoundedFloatText(
    value=400,
    min=0,
    max=2000,
    step=0.1,
    style={'description_width': '200px'},  # widen the description label
    layout=widgets.Layout(width='270px'),   # adjust the total widget width if needed
    description='Dynamic range threshold (uV):',
    disabled=False
);


def check_bad_dr(threshold):
    dr_mask = df_ch['res_theoretical']*pow(2,16) <= threshold
    bad_dr = df_ch[dr_mask]
    
    if not bad_dr.empty:
        print(f'\n>>> Small dynamic range detected! (<= {threshold} uV) <<<\n')
        print(f'{bad_dr.shape[0]} channels have a small dynamic range (from {len(edf_files)} edf files)')
        print(bad_dr[['subject', 'channel', 'dimension', 'physical_min', 'physical_max', 'res_theoretical']])
    else:
        print(f'\n>>> No channel with a small dynamic range (<= {threshold} uV) was detected! <<<')
    bad_dr.to_csv(f'{summary_path}/bad_dynamic_range_edf.tsv', sep = '\t')
    print(f'\nSaving information from bad dynamic range channels to:\n{summary_path}/bad_dynamic_range_edf.tsv \n(will be empty if no bad dynamic range)')

widgets.interact(check_bad_dr, threshold = dr_thres);

interactive(children=(BoundedFloatText(value=1e-06, description='Dynamic range threshold (uV):', layout=Layout…
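
The header check above only flags a risk of clipping. To see what clipping looks like in the samples themselves, here is a minimal sketch on a synthetic signal; this notebook does not load the signal data, and `clipped_fraction` is a hypothetical helper.

```python
import numpy as np

def clipped_fraction(signal, phys_min, phys_max, tol=1e-9):
    """Fraction of samples sitting on (or beyond) one of the physical boundaries."""
    signal = np.asarray(signal, dtype=float)
    low, high = min(phys_min, phys_max), max(phys_min, phys_max)
    at_bound = (signal <= low + tol) | (signal >= high - tol)
    return float(at_bound.mean())

# synthetic 5 Hz oscillation of +/-300 uV, stored with a +/-200 uV dynamic range
t = np.linspace(0, 1, 1000, endpoint=False)
raw = 300 * np.sin(2 * np.pi * 5 * t)
stored = np.clip(raw, -200, 200)

print(clipped_fraction(stored, -200, 200))  # a large share of samples is stuck at the bounds
print(clipped_fraction(raw, -500, 500))     # a wider range leaves the signal untouched
```

A non-zero fraction on real data does not prove clipping on its own, but long runs of samples at exactly the boundary value are a strong hint.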