In [None]:
"""
Generate Raw ECG Signal Data for CNN Training and Testing

This script processes ECG recordings for apnea detection by:
1. Reading the raw ECG signal and annotations using wfdb.
2. Converting annotation sample indices to seconds.
3. Cleaning each ECG segment using NeuroKit2's cleaning functions.
4. Filtering out segments with poor signal quality.
5. Concatenating the cleaned segments for each record.
6. Saving the processed signals and labels as NumPy arrays for training and testing.

Update file paths as needed.
"""

In [None]:
import wfdb
import mne
import numpy as np
import pandas as pd
import pickle
import torch
import neurokit2 as nk
import warnings
warnings.filterwarnings("ignore")


In [None]:
def data_creation(record_path, window_seconds=60, quality_threshold=0.5, verbose=False):
    """
    Process an ECG record and extract cleaned signal segments along with labels.

    For every annotation except the first and the last, a window of
    ``window_seconds`` on either side of the annotation is cut out of the raw
    signal, cleaned with ``nk.ecg_clean`` and scored with ``nk.ecg_quality``.
    Windows that fall outside the recording, fail to process, or whose mean
    quality is below ``quality_threshold`` are dropped together with their label.

    Parameters:
        record_path (str): Path to the ECG record (without extension).
        window_seconds (float): Half-width of each segment in seconds
            (default 60, i.e. segments span 120 seconds).
        quality_threshold (float): Minimum mean quality score a cleaned
            segment must reach to be kept (default 0.5).
        verbose (bool): If True, print a warning for every segment that
            raises during cleaning/quality scoring (default False).

    Returns:
        clean_signal (np.ndarray): 2D array with one cleaned segment per row,
            shape (n_kept_segments, 2 * int(window_seconds * fs)). When no
            segment is kept the array has zero rows.
        labels (np.ndarray): int32 labels (0 for annotation symbol "N",
            1 for any other symbol), one per row of ``clean_signal``.
    """
    # Read the annotation file (using the extension 'apn')
    annotation = wfdb.rdann(record_path, extension='apn')

    # A single read yields both the samples and the header fields, so the
    # record does not have to be loaded a second time just to obtain `fs`.
    signal, fields = wfdb.rdsamp(record_path)
    sampling_rate = fields['fs']

    # NOTE(review): flatten() assumes a single-channel record; with several
    # channels the samples would be interleaved - confirm against the dataset.
    signal = signal.flatten()

    # Convert annotation sample indices to times in seconds
    annotation_times = np.array(annotation.sample) / sampling_rate

    # Generate labels: "N" is normal (0), others indicate apnea (1)
    annotation_labels = np.where(np.array(annotation.symbol) == "N", 0, 1).astype(np.int32)

    # Exclude the first and last annotations to avoid edge effects
    annotation_times = annotation_times[1:-1]
    annotation_labels = annotation_labels[1:-1]

    # Window half-width in samples; cast to int so slicing also works when
    # the header reports the sampling frequency as a float.
    half_window = int(window_seconds * sampling_rate)
    segment_length = 2 * half_window

    kept_segments = []  # cleaned 1D segments that passed every check
    kept_indices = []   # positions (into the trimmed annotations) of those segments

    for i, annotation_time in enumerate(annotation_times):
        # Centre of the segment in samples (annotation time truncated to whole seconds)
        center = int(int(annotation_time) * sampling_rate)
        start = center - half_window
        stop = center + half_window

        # Skip windows that are not fully inside the recording: a negative
        # start would silently wrap around, and a late stop would truncate.
        if start < 0 or stop > len(signal):
            continue

        segment = signal[start:stop]

        try:
            # Clean the ECG segment and score its quality using NeuroKit2
            clean = np.asarray(nk.ecg_clean(segment, sampling_rate=sampling_rate)).reshape(-1)
            quality = np.mean(nk.ecg_quality(clean, sampling_rate=sampling_rate))
        except Exception as error:
            # Best effort: a segment NeuroKit2 cannot process is treated as bad.
            if verbose:
                print(f"Warning: Error processing segment at index {i}. Error: {error}")
            continue

        # Drop low-quality segments and anything that is not a full-length row
        if quality < quality_threshold or clean.size != segment_length:
            continue

        kept_segments.append(clean)
        kept_indices.append(i)

    # Keep only the labels whose segment survived
    labels = annotation_labels[np.asarray(kept_indices, dtype=np.intp)]

    if kept_segments:
        # Stack once at the end instead of re-allocating on every iteration
        clean_signal = np.stack(kept_segments, axis=0)
    else:
        # Zero-row array (rather than None) so callers can concatenate safely
        clean_signal = np.empty((0, segment_length), dtype=signal.dtype)

    return clean_signal, labels

In [None]:
# Generate Training Data

ECG_labels_train = []
ECG_signal_train = []

# Read the record names (one per line). The context manager closes the file,
# and blank lines (e.g. from a trailing newline) are skipped so they do not
# turn into an empty record name.
with open(r"CNN Model\Lists\list_train") as list_file:
    list_of_file_names_train = [line.strip() for line in list_file if line.strip()]

for file_name in list_of_file_names_train:
    # Change the path to where the apnea data is.
    record_path = fr"C:\Users\piotr\Desktop\PSG data\Apnea ECG\{file_name}"
    clean_data, labels = data_creation(record_path)
    if clean_data is None or len(clean_data) == 0:
        # Record produced no usable segments; nothing to add
        continue
    ECG_labels_train.append(labels)
    ECG_signal_train.append(clean_data)

# Concatenate per-record arrays along the first axis. The lists are passed
# directly: wrapping them in an object array is unnecessary for arrays of
# different lengths and can yield an object-dtype result when shapes match.
training_labels = np.concatenate(ECG_labels_train)
training_signal = np.concatenate(ECG_signal_train)

# Every kept segment must have exactly one label
assert len(training_labels) == len(training_signal), "label/segment count mismatch"

# Save processed training data
np.save("training labels", training_labels)
np.save("training_signal", training_signal)

In [None]:
# Generate Testing Data
ECG_labels_test = []
ECG_signal_test = []

# Read the record names (one per line). Iterating over the raw string returned
# by read() would yield single characters, so the file is split into lines;
# blank lines are skipped and the context manager closes the file.
with open(r"CNN Model\Lists\list_test") as list_file:
    list_of_file_names_test = [line.strip() for line in list_file if line.strip()]

for file_name in list_of_file_names_test:
    # Change the path to where the apnea data is.
    record_path = fr"C:\Users\piotr\Desktop\PSG data\Apnea ECG\{file_name}"
    clean_data, labels = data_creation(record_path)
    if clean_data is None or len(clean_data) == 0:
        # Record produced no usable segments; nothing to add
        continue
    ECG_labels_test.append(labels)
    ECG_signal_test.append(clean_data)

# Concatenate all testing labels and signals along the first axis. The lists
# are passed directly so the result keeps the numeric dtype of its parts.
test_labels = np.concatenate(ECG_labels_test)
test_signal = np.concatenate(ECG_signal_test)

# Every kept segment must have exactly one label
assert len(test_labels) == len(test_signal), "label/segment count mismatch"

# Save processed testing data
np.save("test labels", test_labels)
np.save("test_signal", test_signal)