In [1]:
pip install neurokit2





In [3]:
pip install mne neurokit2 numpy pandas scipy

Note: you may need to restart the kernel to use updated packages.


In [5]:
pip install mne neurokit2 numpy pandas scipy joblib tqdm

Note: you may need to restart the kernel to use updated packages.


In [7]:
import os
import numpy as np
import pandas as pd
import mne
import neurokit2 as nk
from scipy.signal import butter, filtfilt, resample, find_peaks
import logging
from joblib import Parallel, delayed
from tqdm import tqdm
from multiprocessing import cpu_count

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# ECG Signal Preprocessing
def filter_signal(signal, sfreq, low=0.5, high=40.0):
    """Applies a 4th-order Butterworth bandpass filter."""
    b, a = butter(4, [low/(sfreq/2), high/(sfreq/2)], btype='band')
    return filtfilt(b, a, signal)

def normalize(signal):
    """Normalizes the signal between 0 and 1."""
    return (signal - np.min(signal)) / (np.max(signal) - np.min(signal)) if np.max(signal) != np.min(signal) else signal

def calculate_edr(signal, r_peaks, sfreq):
    """Calculates ECG-Derived Respiration (EDR) using R-peak amplitude modulation."""
    if len(r_peaks) < 2:
        return 0
    edr_signal = signal[r_peaks]  # Extract ECG amplitudes at R-peaks
    peaks, _ = find_peaks(edr_signal, height=np.percentile(edr_signal, 95), distance=sfreq // 2)
    if len(peaks) < 2:
        return 0
    rr_intervals = np.diff(peaks) / sfreq  # Convert to seconds
    return 60 / np.mean(rr_intervals)  # Convert to breaths per minute

def calculate_cvhr(r_peaks, sfreq):
    """Calculates Cyclic Variation of Heart Rate (CVHR) based on RR intervals."""
    if len(r_peaks) < 2:
        return 0
    rr_intervals = np.diff(r_peaks) / sfreq  # Convert to seconds
    return np.sum(np.abs(np.diff(rr_intervals))) / len(rr_intervals) * 100  # CVHR as a percentage

def process_interval(signal, sfreq):
    """Processes a single ECG interval and extracts features."""
    # Downsample early to save computation
    downsampled_signal = resample(signal, len(signal) // 4)
    sfreq_downsampled = sfreq / 4

    normalized_signal = normalize(downsampled_signal)

    # Detect R-peaks using NeuroKit2
    r_peaks = nk.ecg_peaks(normalized_signal, sampling_rate=sfreq_downsampled)[1]['ECG_R_Peaks']

    # Extract HRV features
    hrv_features = nk.hrv(r_peaks, sampling_rate=sfreq_downsampled, show=False).iloc[0].to_dict()

    # Compute Approximate Entropy (ApEn)
    apen = nk.entropy_approximate(normalized_signal)

    # Compute EDR and CVHR
    edr_rate = calculate_edr(normalized_signal, r_peaks, sfreq_downsampled)
    cvhr = calculate_cvhr(r_peaks, sfreq_downsampled)

    return {
        'Mean RR Interval (ms)': hrv_features.get('HRV_MeanNN', 0),
        'Overall HRV': hrv_features.get('HRV_SDNN', 0),
        'RMS_SD (ms)': hrv_features.get('HRV_RMSSD', 0),
        'pNN50 (%)': hrv_features.get('HRV_pNN50', 0),
        'VLF Power': hrv_features.get('HRV_VLF', 0),
        'EDR Rate (breaths per min)': edr_rate,
        'CVHR (%)': cvhr
    }

def process_file(file_path, interval_length=300):
    """Processes a single EDF file and extracts features per interval."""
    try:
        logging.info(f"Processing: {os.path.basename(file_path)}")

        # Load EDF file
        raw = mne.io.read_raw_edf(file_path, preload=True, verbose=False)
        sfreq = raw.info['sfreq']

        # Extract ECG signal (assuming channel 2 contains ECG)
        signal = raw.get_data(picks='chan 2')[0]  # Change 'chan 2' to correct channel if needed

        # Apply bandpass filter
        filtered_signal = filter_signal(signal, sfreq)

        # Determine interval size
        interval_samples = int(interval_length * sfreq)
        num_intervals = len(filtered_signal) // interval_samples

        # Process intervals in parallel
        features_list = Parallel(n_jobs=cpu_count())(
            delayed(process_interval)(filtered_signal[i * interval_samples:(i + 1) * interval_samples], sfreq)
            for i in range(num_intervals)
        )

        # Add file and interval information
        for i, features in enumerate(features_list):
            features['File'] = os.path.basename(file_path)
            features['Interval'] = f"{i * interval_length}-{(i + 1) * interval_length}s"

        return features_list

    except Exception as e:
        logging.error(f"Failed to process {os.path.basename(file_path)}: {e}")
        return []

def process_edf(directory, output_csv="sleep_apnea_features.csv", interval_length=300):
    """Processes all EDF files in a directory and extracts ECG features."""
    # Get list of EDF files
    edf_files = [os.path.join(directory, file) for file in os.listdir(directory) if file.endswith('.edf')]

    logging.info(f"Found {len(edf_files)} EDF files. Starting processing...")

    # Parallel processing with progress bar
    features_list = Parallel(n_jobs=cpu_count())(
        delayed(process_file)(file_path, interval_length) for file_path in tqdm(edf_files, desc="Processing EDF files")
    )

    # Flatten list
    features_list = [item for sublist in features_list for item in sublist]

    # Save to CSV
    df = pd.DataFrame(features_list)
    df.to_csv(output_csv, index=False)
    logging.info(f"Feature extraction completed. Saved to '{output_csv}'")

# Run script
if __name__ == "__main__":
    process_edf('dataset', interval_length=300)  # Adjust path & interval length as needed


2025-03-30 11:57:11,121 - INFO - Found 25 EDF files. Starting processing...
Processing EDF files: 100%|█████████████████████████████████████████████████████████████████████████| 25/25 [13:55<00:00, 33.41s/it]
2025-03-30 12:22:28,626 - INFO - Feature extraction completed. Saved to 'sleep_apnea_features.csv'


In [1]:
import os
import pandas as pd


# File paths
ECG_FEATURES_CSV = "sleep_apnea_features.csv"  # Your main ECG features file
STAGE_FILES_DIR = "dataset/"                   # Directory with 25 stage files (e.g., ucddb001_stage.txt)
RESPEVT_FILES_DIR = "dataset/"                  # Directory with respiratory event files
OUTPUT_CSV = "labeled_ecg_features.csv"

# ======================================================
# Step 1: Parse Respiratory Events from All Files
# ======================================================
def parse_resp_events(file_path):
    """Extracts apnea/hypopnea events from a .txt file."""
    events = []
    
    with open(file_path, 'r') as f:
        for line in f:
            if line.startswith("Time") or not line.strip():
                continue
            parts = line.strip().split()
            if len(parts) < 5:
                continue

            try:
                # Parse time (HH:MM:SS to seconds)
                h, m, s = map(int, parts[0].split(':'))
                total_sec = h * 3600 + m * 60 + s

                # Improved parsing with error handling
                low = float(parts[3]) if parts[3].replace('.', '', 1).isdigit() else None
                percent_drop = float(parts[4]) if parts[4].replace('.', '', 1).isdigit() else None

                events.append({
                    "time": total_sec,
                    "type": parts[1],
                    "duration": int(parts[2]),
                    "low": low,
                    "percent_drop": percent_drop
                })
            except (ValueError, IndexError):
                continue

    return events


# Load all respiratory events into a dictionary {patient_id: events}
resp_events_all = {}
for filename in os.listdir(RESPEVT_FILES_DIR):
    if filename.endswith("_respevt.txt"):
        patient_id = filename.split('_')[0]  # e.g., "ucddb001"
        resp_events_all[patient_id] = parse_resp_events(os.path.join(RESPEVT_FILES_DIR, filename))

# ======================================================
# Step 2: Label ECG Intervals Based on Respiratory Events
# ======================================================
def label_apnea(row, resp_events):
    """Labels a 5-minute interval as 1 (apnea) or 0 (no apnea)."""
    if resp_events is None:
        return 0  # No event data → assume no apnea

    start = row["start_sec"]
    end = start + 300  # 5-minute window

    apnea_count = 0
    hypopnea_count = 0

    for event in resp_events:
        # Detect overlapping events
        event_start = event["time"]
        event_end = event_start + event["duration"]

        if (event_start < end) and (event_end > start):
            if "APNEA" in event["type"]:
                apnea_count += 1
            elif "HYP" in event["type"]:
                percent_drop = event.get("percent_drop")
                # Use a more appropriate threshold for hypopnea
                if percent_drop is not None and percent_drop >= 30:
                    hypopnea_count += 1

    return 1 if (apnea_count >= 1) or (hypopnea_count >= 1) else 0

# ======================================================
# Step 3: Process ECG Features CSV
# ======================================================
# Load ECG features
ecg_df = pd.read_csv(ECG_FEATURES_CSV)

# Extract patient ID and interval start time from filename/interval
ecg_df["patient_id"] = ecg_df["File"].str.extract(r'([a-z0-9]+)_')[0]  # e.g., "ucddb002"
ecg_df["start_sec"] = ecg_df["Interval"].str.extract(r'^(\d+)').astype(int)

# Check for missing patient IDs
missing_patients = set(ecg_df["patient_id"]) - set(resp_events_all.keys())
print("Missing patients with no event files:", missing_patients)

# Add apnea labels with improved event matching
ecg_df["Apnea_Label"] = ecg_df.apply(
    lambda row: label_apnea(row, resp_events_all.get(row["patient_id"])),
    axis=1
)

# ======================================================
# Step 4: Add Sleep Stage Features (Optional)
# ======================================================
def get_dominant_sleep_stage(patient_id, start_sec, stage_files_dir):
    """Returns the dominant sleep stage for a 5-minute interval."""
    stage_file = os.path.join(stage_files_dir, f"{patient_id}_stage.txt")
    
    if not os.path.exists(stage_file):
        return None

    with open(stage_file, 'r') as f:
        stages = [int(line.strip()) for line in f if line.strip().isdigit()]

    # 30-second epochs → 10 epochs per 5-minute window
    epoch = start_sec // 30
    window_stages = stages[epoch: epoch + 10]

    if not window_stages:
        return None
    
    return max(set(window_stages), key=window_stages.count)

# Add sleep stage (if needed)
ecg_df["Dominant_Sleep_Stage"] = ecg_df.apply(
    lambda row: get_dominant_sleep_stage(row["patient_id"], row["start_sec"], STAGE_FILES_DIR),
    axis=1
)

# ======================================================
# Step 5: Save Final Labeled Dataset
# ======================================================
ecg_df.to_csv(OUTPUT_CSV, index=False)
print(f"Labeled dataset saved to {OUTPUT_CSV}")
print("Summary of Apnea Labels:")
print(ecg_df["Apnea_Label"].value_counts())


Missing patients with no event files: set()
Labeled dataset saved to labeled_ecg_features.csv
Summary of Apnea Labels:
Apnea_Label
0    2158
1     275
Name: count, dtype: int64
