# Imports

In [1]:
import pandas as pd
import numpy as np
import wfdb
import ast
import biosppy.signals.ecg as ecg
import os
from scipy.signal import resample

def save_ecg_recording(file_name, signals, annotations, sample_rate, write_dir, additional_info):
    """
    Save an ECG recording to .mat, .hea, and .atr files.

    Args:
        file_name (str): Base name for the files to create.
        signals (list): List of signal arrays, one for each channel.
        annotations (list): List of annotation samples.
        sample_rate (int): Sampling rate of the signals.
        units (list): List of measurement units for each channel.
        write_dir (str, optional): Directory to save the files. Defaults to current directory.
        additional_info (list): List of additional information lines to append to the .hea file.
    """
    # Define the channel names in the desired order
    channel_names = ['I', 'II', 'III', 'AVR', 'AVL', 'AVF', 'V1', 'V2', 'V3', 'V4', 'V5', 'V6']

    # Ensure signal array is properly shaped (channels x samples)
    signals = np.array(signals).T  # Transpose to make channels as columns
    
    # Create a list of signal formats and units
    fmt = ['16'] * len(channel_names)
    units = ['mV'] * len(channel_names)

    # Check if the write directory exists, if not, create it
    if not os.path.exists(write_dir):
        os.makedirs(write_dir)

    # Save the signal using the wfdb format
    wfdb.wrsamp(file_name, fs=sample_rate, units=units, sig_name=channel_names, p_signal=signals, fmt=fmt, write_dir=write_dir)

    # Append additional information to .hea file
    hea_file = os.path.join(write_dir, f"{file_name}.hea")
    with open(hea_file, 'a') as f:
        for line in additional_info:
            f.write(line + '\n')

    # Save .atr file if annotations exist
    if len(annotations) > 0:
        wfdb.wrann(file_name, extension='atr', sample=annotations, symbol=['N'] * len(annotations), write_dir=write_dir)

# Load Data

In [2]:
root_path = '\\path\\ptb-xl-a-large-publicly-available-electrocardiography-dataset-1.0.3\\'

In [3]:
def load_raw_data(df, sampling_rate, path):
    if sampling_rate == 100:
        data = [wfdb.rdsamp(path+f) for f in df.filename_lr]
    else:
        data = [wfdb.rdsamp(path+f) for f in df.filename_hr]
    data = np.array([signal for signal, meta in data])
    return data

sampling_rate=100

# load and convert annotation data
Y = pd.read_csv(root_path+'ptbxl_database.csv', index_col='ecg_id')
Y.scp_codes = Y.scp_codes.apply(lambda x: ast.literal_eval(x))

# Load raw signal data
X = load_raw_data(Y, sampling_rate, root_path)

# Load scp_statements.csv for diagnostic aggregation
agg_df = pd.read_csv(root_path+'scp_statements.csv', index_col=0)
agg_df = agg_df[agg_df.diagnostic == 1]

def aggregate_diagnostic(y_dic):
    tmp = []
    for key in y_dic.keys():
        if key in agg_df.index:
            tmp.append(agg_df.loc[key].diagnostic_class)
    return list(set(tmp))

# Apply diagnostic superclass
Y['diagnostic_superclass'] = Y.scp_codes.apply(aggregate_diagnostic)

# Filtering ECG Records NORM, MI & MI-CD, HYP

In [5]:
# Find unique sets of diagnostic_superclass values
unique_diagnostic_classes = Y['diagnostic_superclass'].apply(tuple).unique()  # Convert lists to tuples for uniqueness

# Print each unique set of classes
for classes in unique_diagnostic_classes:
    print(list(classes))  # Convert tuples back to list for better readability

['NORM']
['MI']
[]
['STTC']
['HYP']
['CD']
['MI', 'STTC']
['CD', 'HYP']
['MI', 'CD']
['CD', 'STTC']
['MI', 'HYP']
['MI', 'HYP', 'STTC']
['MI', 'HYP', 'STTC', 'CD']
['HYP', 'STTC']
['CD', 'NORM']
['MI', 'STTC', 'CD']
['CD', 'HYP', 'STTC']
['CD', 'MI']
['NORM', 'STTC']
['MI', 'HYP', 'CD']
['CD', 'NORM', 'STTC']
['CD', 'HYP', 'NORM']
['HYP', 'NORM']
['CD', 'HYP', 'MI']
['CD', 'MI', 'STTC']
['STTC', 'CD', 'MI', 'HYP']
['CD', 'MI', 'HYP']
['CD', 'MI', 'NORM', 'HYP']
['CD', 'MI', 'STTC', 'HYP']


In [7]:
# First, filter for ecg_ids where 'diagnostic_superclass' is exactly ['NORM']
norm_ids = Y[Y['diagnostic_superclass'].apply(lambda x: x == ['NORM'])].index

# Now, from these ids, select those where 'NORM' value in 'scp_codes' is greater than or equal to 80.0
norm_greater_or_equal_80_ids = Y.loc[norm_ids][Y.loc[norm_ids, 'scp_codes'].apply(lambda x: x.get('NORM', 0) >= 80)].index

# Print the ecg_ids where 'NORM' value in 'scp_codes' is greater than or equal to 80.0
print("ECG IDs where 'NORM' value in 'scp_codes' is greater than or equal to 80.0:")
print(norm_greater_or_equal_80_ids)
print("--------------------------------------------------------------------")

# ------------------------------------------------------------------------------------------

# Filter for ecg_ids with exactly ['MI'] or ['MI', 'CD']
mi_ids = Y[Y['diagnostic_superclass'].apply(lambda x: x == ['MI'])].index
cd_mi_ids = Y[Y['diagnostic_superclass'].apply(lambda x: x == ['CD', 'MI'])].index
mi_cd_ids = Y[Y['diagnostic_superclass'].apply(lambda x: x == ['MI', 'CD'])].index

# Combine the indices from both filters without duplicates
combined_mi_ids = mi_ids.union(mi_cd_ids)
combined_mi_ids = combined_mi_ids.union(cd_mi_ids)

# Print the ecg_ids
print("ECG IDs for records with exactly ['MI'] or ['MI', 'CD'] or ['CD', 'MI']:")
print(combined_mi_ids)
print("--------------------------------------------------------------------")

# ------------------------------------------------------------------------------------------

# Filter for ecg_ids with exactly ['HYP']
hyp_ids = Y[Y['diagnostic_superclass'].apply(lambda x: x == ['HYP'])].index

# Print the ecg_ids
print("ECG IDs for records with exactly ['HYP']:")
print(hyp_ids)

ECG IDs where 'NORM' value in 'scp_codes' is greater than or equal to 80.0:
Index([    1,     2,     3,     4,     5,     6,     7,     9,    10,    11,
       ...
       21810, 21813, 21814, 21818, 21822, 21825, 21831, 21834, 21836, 21837],
      dtype='int64', name='ecg_id', length=8577)
--------------------------------------------------------------------
ECG IDs for records with exactly ['MI'] or ['MI', 'CD'] or ['CD', 'MI']:
Index([    8,    50,    63,    77,   103,   131,   153,   155,   161,   175,
       ...
       21786, 21788, 21793, 21796, 21799, 21811, 21815, 21820, 21824, 21826],
      dtype='int64', name='ecg_id', length=3829)
--------------------------------------------------------------------
ECG IDs for records with exactly ['HYP']:
Index([   30,    96,   138,   313,   333,   502,   542,   616,   640,   697,
       ...
       21076, 21134, 21285, 21351, 21523, 21537, 21598, 21697, 21772, 21775],
      dtype='int64', name='ecg_id', length=535)


In [8]:
# Step 1: Define filter keys in Y DataFrame
Y['filtered_mi'] = Y.index.isin(combined_mi_ids)
Y['filtered_norm'] = Y.index.isin(norm_greater_or_equal_80_ids)
Y['filtered_hyp'] = Y.index.isin(hyp_ids)

# Step 2: Create masks for filtering X based on the filtered Y
mask_norm = Y['filtered_norm'] # Y.index.isin(norm_greater_or_equal_80_ids)
mask_mi = Y['filtered_mi'] # Y.index.isin(combined_mi_ids)
mask_hyp = Y['filtered_hyp'] # Y.index.isin(hyp_ids)

# Step 3: Apply the masks to X and Y
X_filtered_mi = X[mask_mi]
Y_filtered_mi = Y[mask_mi]

X_filtered_norm = X[mask_norm]
Y_filtered_norm = Y[mask_norm]

X_filtered_hyp = X[mask_hyp]
Y_filtered_hyp = Y[mask_hyp]

# Optionally, print out some of the filtered data to check
print("Filtered NORM Records:")
print(Y_filtered_norm)
print("Filtered MI Records:")
print(Y_filtered_mi)
print("Filtered HYP Records:")
print(Y_filtered_hyp)

Filtered NORM Records:
        patient_id    age  sex  height  weight  nurse  site      device  \
ecg_id                                                                    
1          15709.0   56.0    1     NaN    63.0    2.0   0.0   CS-12   E   
2          13243.0   19.0    0     NaN    70.0    2.0   0.0   CS-12   E   
3          20372.0   37.0    1     NaN    69.0    2.0   0.0   CS-12   E   
4          17014.0   24.0    0     NaN    82.0    2.0   0.0   CS-12   E   
5          17448.0   19.0    1     NaN    70.0    2.0   0.0   CS-12   E   
...            ...    ...  ...     ...     ...    ...   ...         ...   
21825      11197.0   59.0    0     NaN     NaN    1.0   2.0  AT-60    3   
21831      11905.0   55.0    1     NaN     NaN    1.0   2.0  AT-60    3   
21834      20703.0  300.0    0     NaN     NaN    1.0   2.0  AT-60    3   
21836       8873.0   64.0    1     NaN     NaN    1.0   2.0  AT-60    3   
21837      11744.0   68.0    0     NaN     NaN    1.0   2.0  AT-60    3   

 

# Base Wander Filter and Upsampling

In [None]:
# Function to filter and interpolate ECG signals
def filter_and_interpolate_ecg_signals(X_signals, original_sampling_rate, target_sampling_rate):
    resize_factor = target_sampling_rate / original_sampling_rate
    new_size = int(X_signals.shape[1] * resize_factor)
    X_filtered = []  # Use a list to collect all records
    count = 0
    
    for i in range(X_signals.shape[0]):  # Loop over each ECG record
        record_filtered = []  # This will hold filtered signals for one record
        if len(X_signals[i, :, ]) == 1000:  # Ensure the record has the expected length
            print(count)
            for j in range(X_signals.shape[2]):  # Loop over each channel
                signal = X_signals[i, :, j]
                try:
                    # Attempt to filter and resample the signal
                    ecg_analysis = ecg.ecg(signal=signal, sampling_rate=original_sampling_rate, show=False)
                    resampled_sig = ecg_analysis['filtered']
                    #resampled_sig = resample(filtered_signal, new_size)
                except Exception as e:
                    # If there's an error, fall back to adding the original signal
                    print(f"Error processing signal in record {count}, channel {j}: {e}")
                    resampled_sig = resample(signal, new_size)

                record_filtered.append(resampled_sig)  # Add the signal (processed or original) to the record

            X_filtered.append(record_filtered)  # Add the filtered record to the main list
            count = count + 1

    return X_filtered

# Process the three classes using the updated function
X_interpolated_norm = filter_and_interpolate_ecg_signals(X_filtered_norm, 100, 100)
X_interpolated_mi = filter_and_interpolate_ecg_signals(X_filtered_mi, 100, 100)
X_interpolated_hyp = filter_and_interpolate_ecg_signals(X_filtered_hyp, 100, 100)


# Saving Filtered ECG Records

In [None]:
root_dir = root_path+"ptbxl_filtered_100hz\\"
# Directory paths for filtered ECG signals
dir_norm = root_dir+"NORM"
dir_mi = root_dir+"MI"
dir_hyp = root_dir+"HYP"

# List of directories to check and create if necessary
directories = [dir_norm, dir_mi, dir_hyp]

# Function to create directories if they don't exist
def create_directory_if_not_exists(directory):
    if not os.path.exists(directory):
        os.makedirs(directory)
        print(f"Directory '{directory}' created successfully.")

# Apply the function to each directory
for directory in directories:
    create_directory_if_not_exists(directory)
    
# --------------------------------------------------------------------

# Prepare to save labels
labels_info = []

def save_individual_signals_and_labels(X_signals, Y_labels, label, write_dir):
    count = 0
    for i in range(len(X_signals)):
        original_ecg_id = Y_labels.index[i]
        signal = X_signals[i]
                        
        ecg_analysis = ecg.ecg(signal=signal[0], sampling_rate=100, show=False)
        rpeaks = ecg_analysis['rpeaks']
        
        # Generate a unique filename for each signal
        file_name = f'signal_{original_ecg_id}'        
        
        # Extract additional information
        age = int(Y_labels.iloc[i]['age'])
        sex = 'M' if Y_labels.iloc[i]['sex'] == 0 else 'F'
        
        additional_info = [
            f"# Age: {age}",
            f"# Sex: {sex}",
            f"# Diagnosis: {label}"
        ]
        
        save_ecg_recording(file_name, signal, rpeaks, 100, write_dir, additional_info)
        print(count)
        count = count + 1
        # Record the filename and its corresponding label
        labels_info.append({'filename': file_name, 'label': label}) 

save_individual_signals_and_labels(X_interpolated_norm, Y_filtered_norm, "NORM", dir_norm)
save_individual_signals_and_labels(X_interpolated_mi, Y_filtered_mi, "MI", dir_mi)
save_individual_signals_and_labels(X_interpolated_hyp, Y_filtered_hyp, "HYP", dir_hyp)

# Convert labels information to DataFrame
labels_df = pd.DataFrame(labels_info)

labels_path = os.path.join(root_dir, 'labels.csv')

# Check if the labels CSV file exists and delete it if it does
if os.path.exists(labels_path):
    os.remove(labels_path)

# Save the new labels CSV file
labels_df.to_csv(labels_path, index=False)
print(f"Signals and labels saved successfully in '{root_dir}'.")