## Load the EDF recording and compute per-channel sampling rates

In [3]:
import mne
from pathlib import Path
import h5py
import os 


# Source EDF recording. Set the MROS_EDF_PATH environment variable to point at
# a different file; the literal path below is only the default.
edf_path = Path(os.environ.get(
    "MROS_EDF_PATH",
    "/Users/alicealbrecht/Desktop/UCSF/pipeline_project/datasets/raw/MrOS/visit2/EDF/bi0014_20110706_EDF/bi0014_20110706.edf",
))
if not edf_path.is_file():
    # Fail early with an actionable message instead of deep inside the reader.
    raise FileNotFoundError(
        f"EDF recording not found: {edf_path} (set MROS_EDF_PATH to override)"
    )

# Output HDF5 file, placed in the current working directory.
h5_path = os.path.join(os.getcwd(), 'subject_001_signals.h5')

# preload=True asks the reader to load the signal data into memory up front.
raw = mne.io.read_raw_edf(edf_path, preload=True, verbose=False)


In [5]:
def get_channel_fs_dict(raw):
    """
    Estimate the sampling frequency of every channel in a recording.

    The channel with the largest sample count is taken to be sampled at
    raw.info['sfreq']; each channel's rate is its own sample count divided
    by the time span derived from that largest count.

    Args:
        raw: Loaded recording exposing `_raw_extras`, `ch_names` and `info`.

    Returns:
        dict: {channel_name: sampling_frequency}.
    """
    # NOTE(review): `_raw_extras` is a private attribute; this presumes its
    # 'n_samps' entries line up one-to-one with raw.ch_names -- confirm.
    # zip() below silently stops at the shorter of the two sequences.
    samples_per_channel = raw._raw_extras[0]['n_samps']

    # Time span (seconds) covered by the sample counts, estimated from the
    # channel that holds the most samples.
    span_seconds = max(samples_per_channel) / raw.info['sfreq']

    rates = {}
    for name, count in zip(raw.ch_names, samples_per_channel):
        rates[name] = count / span_seconds
    return rates

# Per-channel sampling rates for the loaded recording.
fs_dict = get_channel_fs_dict(raw)

# List the channel names only; iterating the dict directly avoids binding an
# unused `fs` variable that would linger in the notebook namespace.
for ch_name in fs_dict:
    print(ch_name)

Position
LegL
LegR
C3
C4
M1
M2
E2
E1
ECGL
ECGR
LChin
RChin
Airflow
SUM
Chest
ABD
STAT
HR
SpO2
CannulaFlow
DHR


## Metadata

In [3]:
import numpy as np 

def save_metadata_to_h5(raw, h5_path):
    """
    Write recording-level metadata to a fresh HDF5 file.

    The file is opened in "w" mode, so any existing file at `h5_path` is
    truncated before the attributes are written.

    Args:
        raw: Loaded recording exposing `info` and `n_times`.
        h5_path (str or Path): Output HDF5 file.
    """
    recording_info = raw.info

    with h5py.File(h5_path, "w") as h5_file:
        group_attrs = h5_file.require_group("metadata").attrs

        # Store the measurement date as an ISO-like string, or a placeholder
        # when the recording carries none.
        if recording_info['meas_date']:
            started = recording_info['meas_date'].strftime('%Y-%m-%dT%H:%M:%S')
        else:
            started = "unknown"
        group_attrs["start_time"] = started

        group_attrs["sfreq_global"] = raw.info['sfreq']
        # Number of samples divided by the global sampling rate.
        group_attrs["duration"] = raw.n_times / raw.info['sfreq']

# Start the output file with the recording-level metadata.
save_metadata_to_h5(raw=raw, h5_path=h5_path)

## EEG

In [4]:
def extract_eeg_channel(raw, fs_dict, target, refs):
    """
    Build the harmonized EEG derivation for one electrode (e.g. EEG_C3).

    Resolution order:
      1. A channel whose name starts with "<target>-" is used as-is.
      2. Otherwise, if `target` and one of `refs` both exist, the first such
         reference is subtracted from the target.
      3. Otherwise nothing is extracted.

    Args:
        raw: Recording exposing `ch_names` and `get_data(picks=...)`.
        fs_dict (dict): {channel_name: sampling frequency}.
        target (str): Electrode name, e.g. 'C3'.
        refs (list of str): Reference candidates in preference order,
            e.g. ['A2', 'M2'].

    Returns:
        tuple: (signal, attrs) where signal is a 1-D array or None, and attrs
        is a dict with keys 'harmonized_name', 'raw_names', 'fs', 'type'.
    """
    # NOTE(review): 'fs' is looked up in fs_dict and is not checked against
    # the sampling rate of the array returned by raw.get_data -- confirm
    # the two agree.
    available = raw.ch_names

    def describe(sources, rate):
        # Metadata shared by every outcome; only sources and rate vary.
        return {
            'harmonized_name': f'EEG_{target}',
            'raw_names': sources,
            'fs': rate,
            'type': 'eeg',
        }

    # 1. Pre-referenced channel such as 'C3-A2'.
    prefix = target + '-'
    derived = next((name for name in available if name.startswith(prefix)), None)
    if derived is not None:
        return raw.get_data(picks=derived)[0], describe([derived], fs_dict.get(derived))

    # 2. Manual re-referencing against the first usable reference.
    reference = next(
        (ref for ref in refs if target in available and ref in available),
        None,
    )
    if reference is not None:
        primary = raw.get_data(picks=target)[0]
        secondary = raw.get_data(picks=reference)[0]
        common = min(len(primary), len(secondary))
        difference = primary[:common] - secondary[:common]
        # The rate reported is the target electrode's.
        return difference, describe([target, reference], fs_dict.get(target))

    # 3. Nothing usable.
    return None, describe([], None)

def save_eeg_signals_to_h5(raw, fs_dict, targets, h5_path):
    """
    Extract EEG derivations and store them under signals/EEG/ in an HDF5 file.

    Each signal is obtained via extract_eeg_channel, stored as float32, and
    annotated with the attributes that function returns. A dataset left over
    from a previous run is replaced, so the function can be called repeatedly
    on the same file.

    Args:
        raw (mne.io.Raw): The loaded raw EDF data.
        fs_dict (dict): Sampling frequency dict {channel_name: fs}.
        targets (dict): Dict of {target_channel: [list_of_refs]}.
        h5_path (str or Path): Path to output HDF5 file (opened in append mode).
    """
    with h5py.File(h5_path, "a") as f:
        for target, refs in targets.items():
            signal, attrs = extract_eeg_channel(raw, fs_dict, target=target, refs=refs)

            if signal is None:
                print(f"⚠️ Could not extract {target}")
                continue

            path = f"signals/EEG/{attrs['harmonized_name']}"
            # create_dataset raises if the name already exists (e.g. when this
            # cell is re-run); drop the stale dataset first.
            if path in f:
                del f[path]
            dset = f.create_dataset(path, data=signal.astype("float32"))

            for k, v in attrs.items():
                if isinstance(v, list):
                    # Lists are stored as lists of strings.
                    dset.attrs[k] = [str(i) for i in v]
                elif v is None:
                    # None is written as the literal string "None".
                    dset.attrs[k] = "None"
                else:
                    dset.attrs[k] = v
                
# EEG electrodes to harmonize, each with its reference candidates in the order
# they should be tried.
targets = dict(
    C3=['A2', 'M2'],
    C4=['A1', 'M1'],
)
save_eeg_signals_to_h5(raw=raw, fs_dict=fs_dict, targets=targets, h5_path=h5_path)

## ECG

In [5]:
import numpy as np
import re

def extract_ecg_channel(raw, fs_dict):
    """
    Build the harmonized ECG signal from a recording.

    Resolution order:
      1. The first channel whose name matches "ECG...-..." (case-insensitive)
         is used as-is.
      2. Otherwise a left lead ('ECGL' or 'ECG1') and a right lead ('ECGR' or
         'ECG2') are looked up and the right lead is subtracted from the left.
      3. Otherwise nothing is extracted.

    Args:
        raw: Recording exposing `ch_names` and `get_data(picks=...)`.
        fs_dict (dict): {channel_name: sampling_frequency}.

    Returns:
        tuple: (signal, attrs) where signal is a 1-D array or None, and attrs
        is a dict with keys 'harmonized_name', 'raw_names', 'fs', 'type'.
    """
    available = raw.ch_names

    def describe(sources, rate):
        # Metadata shared by every outcome; only sources and rate vary.
        return {
            'harmonized_name': "ECG",
            'raw_names': sources,
            'fs': rate,
            'type': 'ecg',
        }

    def first_present(candidates):
        # First candidate name that exists in the recording, else None.
        for candidate in candidates:
            if candidate in available:
                return candidate
        return None

    # 1. Channel that is already a lead difference.
    bipolar_pattern = re.compile(r"ECG.*-.*", re.I)
    for name in available:
        if bipolar_pattern.match(name):
            return raw.get_data(picks=name)[0], describe([name], fs_dict.get(name))

    # 2. Two separate leads that must be subtracted.
    left = first_present(['ECGL', 'ECG1'])
    right = first_present(['ECGR', 'ECG2'])
    if not (left and right):
        # 3. Nothing usable.
        return None, describe([], None)

    left_trace = raw.get_data(picks=left)[0]
    right_trace = raw.get_data(picks=right)[0]
    common = min(len(left_trace), len(right_trace))
    difference = left_trace[:common] - right_trace[:common]

    # Prefer the left lead's rate; fall back to the right lead's when the left
    # one is missing or falsy.
    rate = fs_dict.get(left)
    if not rate:
        rate = fs_dict.get(right)

    return difference, describe([left, right], rate)

def save_ecg_signal_to_h5(raw, fs_dict, h5_path):
    """
    Extract the ECG signal and store it under signals/ECG/ in an HDF5 file.

    The signal is obtained via extract_ecg_channel, stored as float32, and
    annotated with the attributes that function returns. A dataset left over
    from a previous run is replaced, so the function can be called repeatedly
    on the same file.

    Args:
        raw (mne.io.Raw): The loaded raw EDF data.
        fs_dict (dict): Sampling frequency dict {channel_name: fs}.
        h5_path (str or Path): Path to output HDF5 file (opened in append mode).
    """
    signal, attrs = extract_ecg_channel(raw, fs_dict)

    with h5py.File(h5_path, "a") as f:  # "a": keep what is already in the file
        if signal is None:
            print("⚠️ Could not extract ECG signal")
            return

        path = f"signals/ECG/{attrs['harmonized_name']}"
        # create_dataset raises if the name already exists (e.g. when this
        # cell is re-run); drop the stale dataset first.
        if path in f:
            del f[path]
        dset = f.create_dataset(path, data=signal.astype("float32"))

        for k, v in attrs.items():
            if isinstance(v, list):
                # Lists are stored as lists of strings.
                dset.attrs[k] = [str(i) for i in v]
            elif v is None:
                # None is written as the literal string "None".
                dset.attrs[k] = "None"
            else:
                dset.attrs[k] = v

# Append the harmonized ECG signal to the output file.
save_ecg_signal_to_h5(raw=raw, fs_dict=fs_dict, h5_path=h5_path)

## EOG

In [6]:
def extract_eog_channel(raw, fs_dict, target, refs):
    """
    Extract a harmonized EOG signal (EOG_L / EOG_R) from raw EDF data.

    The harmonized side is 'L' for 'E1' and 'LOC', 'R' for 'E2' and 'ROC'.
    Any other target falls back to its last character, upper-cased.

    Resolution order:
      1. A channel whose name starts with "<target>-" is used as-is.
      2. Otherwise, if `target` and one of `refs` both exist, the first such
         reference is subtracted from the target.
      3. Otherwise nothing is extracted.

    Args:
        raw (mne.io.Raw): The raw MNE object.
        fs_dict (dict): Dictionary of {channel_name: fs}.
        target (str): Main channel (e.g., 'E1', 'LOC').
        refs (list of str): Reference candidates (e.g., ['M1', 'M2']).

    Returns:
        tuple: (signal, attrs) where signal is a 1-D array or None, and attrs
        is a dict with keys 'harmonized_name', 'raw_names', 'fs', 'type'.
    """
    # 'LOC' and 'ROC' need explicit entries: the last-character fallback would
    # label both of them 'C', giving two targets the same harmonized name.
    side_by_target = {'E1': 'L', 'E2': 'R', 'LOC': 'L', 'ROC': 'R'}
    side = side_by_target.get(target)
    if side is None:
        side = target[-1].upper()
    harmonized_name = f"EOG_{side}"

    ch_names = raw.ch_names

    # Case 1: Bipolar form like 'E1-M2'
    for ch in ch_names:
        if ch.startswith(target + '-'):
            signal = raw.get_data(picks=ch)[0]
            fs = fs_dict.get(ch)
            return signal, {
                'harmonized_name': harmonized_name,
                'raw_names': [ch],
                'fs': fs,
                'type': 'eog',
            }

    # Case 2: Separate subtraction against the first reference that exists
    for ref in refs:
        if target in ch_names and ref in ch_names:
            sig1 = raw.get_data(picks=target)[0]
            sig2 = raw.get_data(picks=ref)[0]
            # Trim to a common length before subtracting.
            min_len = min(len(sig1), len(sig2))
            signal = sig1[:min_len] - sig2[:min_len]
            fs = fs_dict.get(target)  # rate reported is the target channel's
            return signal, {
                'harmonized_name': harmonized_name,
                'raw_names': [target, ref],
                'fs': fs,
                'type': 'eog',
            }

    # Case 3: Not found
    return None, {
        'harmonized_name': harmonized_name,
        'raw_names': [],
        'fs': None,
        'type': 'eog',
    }

def save_eog_signals_to_h5(raw, fs_dict, targets, h5_path):
    """
    Extract EOG signals and store them under signals/EOG/ in an HDF5 file.

    Each signal is obtained via extract_eog_channel, stored as float32, and
    annotated with the attributes that function returns. A dataset left over
    from a previous run is replaced, so the function can be called repeatedly
    on the same file.

    Args:
        raw (mne.io.Raw): EDF data.
        fs_dict (dict): Sampling rate info {channel_name: fs}.
        targets (dict): {target: [refs]}
        h5_path (str or Path): Output path (opened in append mode).
    """
    with h5py.File(h5_path, "a") as f:
        for target, refs in targets.items():
            signal, attrs = extract_eog_channel(raw, fs_dict, target=target, refs=refs)

            if signal is None:
                print(f"⚠️ Could not extract EOG for {target}")
                continue

            path = f"signals/EOG/{attrs['harmonized_name']}"
            # create_dataset raises if the name already exists (e.g. when this
            # cell is re-run); drop the stale dataset first.
            if path in f:
                del f[path]
            dset = f.create_dataset(path, data=signal.astype("float32"))

            for k, v in attrs.items():
                if isinstance(v, list):
                    # Lists are stored as lists of strings.
                    dset.attrs[k] = [str(i) for i in v]
                elif v is None:
                    # None is written as the literal string "None".
                    dset.attrs[k] = "None"
                else:
                    dset.attrs[k] = v


# EOG channels to harmonize, each with the reference channel(s) to subtract.
eog_targets = dict(
    E1=['M2'],
    E2=['M1'],
)

save_eog_signals_to_h5(raw=raw, fs_dict=fs_dict, targets=eog_targets, h5_path=h5_path)

## EMG

In [7]:
def extract_emg_channel(raw, fs_dict, target, refs, label=None):
    """
    Build a harmonized EMG signal named "EMG_<LABEL>" from a recording.

    Resolution order:
      1. A channel whose name starts with "<target>-" is used as-is.
      2. Otherwise, if `target` and one of `refs` both exist, the first such
         reference is subtracted from the target.
      3. Otherwise, if `target` exists, it is used on its own.
      4. Otherwise nothing is extracted.

    Args:
        raw: Recording exposing `ch_names` and `get_data(picks=...)`.
        fs_dict (dict): {channel_name: sampling rate}.
        target (str): Main EMG channel (e.g., 'LChin', 'LegL', 'Chin1').
        refs (list of str): Reference channels to subtract (e.g., ['RChin']).
        label (str): Optional harmonized label (e.g., 'Chin', 'Leg_L');
            falls back to `target` when omitted or empty.

    Returns:
        tuple: (signal, attrs) where signal is a 1-D array or None, and attrs
        is a dict with keys 'harmonized_name', 'raw_names', 'fs', 'type'.
    """
    available = raw.ch_names
    display_label = label if label else target
    harmonized = f'EMG_{display_label.upper()}'

    def describe(sources, rate):
        # Metadata shared by every outcome; only sources and rate vary.
        return {
            'harmonized_name': harmonized,
            'raw_names': sources,
            'fs': rate,
            'type': 'emg',
        }

    # 1. Pre-referenced channel such as 'Chin1-Chin2'.
    prefix = target + '-'
    derived = next((name for name in available if name.startswith(prefix)), None)
    if derived is not None:
        return raw.get_data(picks=derived)[0], describe([derived], fs_dict.get(derived))

    # 2. Manual subtraction against the first usable reference.
    partner = next(
        (ref for ref in refs if target in available and ref in available),
        None,
    )
    if partner is not None:
        primary = raw.get_data(picks=target)[0]
        secondary = raw.get_data(picks=partner)[0]
        common = min(len(primary), len(secondary))
        difference = primary[:common] - secondary[:common]
        return difference, describe([target, partner], fs_dict.get(target))

    # 3. Target channel alone.
    if target in available:
        return raw.get_data(picks=target)[0], describe([target], fs_dict.get(target))

    # 4. Nothing usable.
    return None, describe([], None)


def _write_emg_dataset(h5_file, path, data, attrs):
    """Create (or replace) the float32 dataset at `path` and attach `attrs`."""
    # create_dataset raises if the name already exists (e.g. when the cell is
    # re-run against the same file); drop the stale dataset first.
    if path in h5_file:
        del h5_file[path]
    dset = h5_file.create_dataset(path, data=data.astype("float32"))
    for k, v in attrs.items():
        if isinstance(v, list):
            # Lists are stored as lists of strings.
            dset.attrs[k] = [str(i) for i in v]
        elif v is None:
            # None is written as the literal string "None".
            dset.attrs[k] = "None"
        else:
            dset.attrs[k] = v


def save_emg_signals_to_h5(raw, fs_dict, targets, h5_path):
    """
    Extract EMG signals and store them under signals/EMG/ in an HDF5 file.

    Signals whose label is "chin" (case-insensitive) are not written
    individually: they are trimmed to a common length, averaged sample by
    sample, and saved once as EMG_CHIN. Every other signal is written under
    its own harmonized name. Datasets left over from a previous run are
    replaced, so the function can be called repeatedly on the same file.

    Args:
        raw (mne.io.Raw): EDF data.
        fs_dict (dict): Sampling rates {channel_name: fs}.
        targets (dict): {target: (refs, label)}; a label of None falls back
            to the target name, as in extract_emg_channel.
        h5_path (Path): Output HDF5 path (opened in append mode).
    """
    chin_signals = []
    chin_raw_names = []
    chin_fs = None

    with h5py.File(h5_path, "a") as f:
        for target, (refs, label) in targets.items():
            signal, attrs = extract_emg_channel(raw, fs_dict, target=target, refs=refs, label=label)

            if signal is None:
                print(f"⚠️ Could not extract EMG for {target}")
                continue

            # label may legitimately be None; mirror extract_emg_channel's
            # fallback instead of calling .lower() on None.
            if (label or target).lower() == "chin":
                chin_signals.append(signal)
                chin_raw_names.extend(attrs["raw_names"])
                # The rate of the last contributing chin signal is recorded.
                chin_fs = attrs["fs"]
            else:
                path = f"signals/EMG/{attrs['harmonized_name']}"
                _write_emg_dataset(f, path, signal, attrs)

        # Combine chin signals if available
        if chin_signals:
            min_len = min(len(s) for s in chin_signals)
            chin_signals_trimmed = [s[:min_len] for s in chin_signals]
            combined_chin = np.mean(chin_signals_trimmed, axis=0)

            _write_emg_dataset(f, "signals/EMG/EMG_CHIN", combined_chin, {
                "harmonized_name": "EMG_CHIN",
                "raw_names": chin_raw_names,
                # A missing or falsy rate is stored as the string "None".
                "fs": chin_fs or "None",
                "type": "emg",
            })

# EMG channels to harmonize: target=(reference candidates, harmonized label).
# Both chin entries feed the single EMG_CHIN dataset; the leg entries are
# stored separately as EMG_LEG_L and EMG_LEG_R.
emg_targets = dict(
    LChin=(['RChin'], 'Chin'),
    Chin1=(['Chin2'], 'Chin'),
    LegL=([], 'Leg_L'),
    LegR=([], 'Leg_R'),
)

save_emg_signals_to_h5(raw=raw, fs_dict=fs_dict, targets=emg_targets, h5_path=h5_path)

⚠️ Could not extract EMG for Chin1
