In [24]:
import pandas as pd
import numpy as np
import wfdb
import ast

def load_raw_data(df, sampling_rate, path):
    if sampling_rate == 100:
        data = [wfdb.rdsamp(path+f) for f in df.filename_lr]
    else:
        data = [wfdb.rdsamp(path+f) for f in df.filename_hr]
    data = np.array([signal for signal, meta in data])
    return data

path = r'D:\archive\ptb-xl-a-large-publicly-available-electrocardiography-dataset-1.0.1\ptb-xl-a-large-publicly-available-electrocardiography-dataset-1.0.1\\'
sampling_rate=500

# load and convert annotation data
Y = pd.read_csv(path+'ptbxl_database.csv', index_col='ecg_id')
Y.scp_codes = Y.scp_codes.apply(lambda x: ast.literal_eval(x))

# Load raw signal data--
X = load_raw_data(Y, sampling_rate, path)

# Load scp_statements.csv for diagnostic aggregation
agg_df = pd.read_csv(path+'scp_statements.csv', index_col=0)
agg_df = agg_df[agg_df.diagnostic == 1]

def aggregate_diagnostic(y_dic):
    tmp = []
    for key in y_dic.keys():
        if key in agg_df.index:
            tmp.append(agg_df.loc[key].diagnostic_class)
    return list(set(tmp))

# Apply diagnostic superclass
Y['diagnostic_superclass'] = Y.scp_codes.apply(aggregate_diagnostic)


KeyboardInterrupt: 

In [None]:
X[1]
print(X.shape)

In [None]:
import matplotlib.pyplot as plt
plt.plot(X[1000][:,4])

In [None]:
import numpy as np

def combine_12_leads(weights, signal):
    """
    Combine 12-lead signals into a single combined signal based on specified weights for each lead.

    :param weights: Dictionary containing weights for each lead.
    :param signal: 3D array representing the 12-lead signal.
    :return: Combined weighted signal.
    """
    # Calculate the total weight
    total_weight = sum(weights.values())

    # Initialize combined weighted signal with proper shape
    combined_weighted_signal = np.zeros((signal.shape[0], signal.shape[1], 1))

    # Calculate the weighted sum of signals across leads using mapping
    for i in range(signal.shape[0]):
        for lead, weight in weights.items():
            # Perform element-wise multiplication and addition to calculate combined signal
            combined_weighted_signal[i] += (signal[i, :, lead] * weight).reshape(-1, 1)

    # Normalize by dividing by total weight
    combined_weighted_signal /= total_weight

    print("New shape:", combined_weighted_signal.shape)
    return combined_weighted_signal
combined_weighted_signal=combine_12_leads(weights = {0: 1, 1: 1, 2: 1, 3: 2, 4: 2, 5: 2, 6: 1, 7: 1, 8: 1, 9: 1, 10: 1, 11: 1},signal=X)

In [None]:
print(combined_weighted_signal[:5])
plt.figure(figsize=(30,10))
plt.plot(combined_weighted_signal[1000])


In [None]:
combined_weighted_signal[1000].shape

In [None]:

def plot_wavelet_analysis(ECG,wavelet,level=3):
    """
    Takes the ECG signal and decomposes it into its CD and CA compositions
    in the case level=3, we have CA3,CD3,CD2,CD1. change them if you chose a different level of decomposition
    :param ECG: 1D array
    :param wavelet: for ECG sym4 is usually chosen.
    :param level: usually 3 or 4 is chosen.
    :return: plots CD and CA compositions.
    """
    import pywt
    coefs=pywt.swt(ECG,wavelet=wavelet,level=level, axis=0,trim_approx=True)
    CA3,CD3,CD2,CD1=coefs
    coef_list=[CA3,CD3,CD2,CD1]
    n=0
    for coef in coef_list:
        plt.figure(figsize=(30,5))
        plt.plot(coef)
        plt.xlabel('Amplitude')
        plt.ylabel('Time/Samples')
        plt.title(f'Coefficient{n}')
        plt.show()
        n+=1

plot_wavelet_analysis(combined_weighted_signal[2000],'sym4',3)

In [None]:
def wavelet_analysis(ECG,wavelet,level=3):
    """
    Based on the ECG signal, wavelet form and level of decompostion, reconstrucs the signal
    :param ECG: 1D array
    :param wavelet: Usually sym4 is chosen.
    :param level: change this according to your needs
    :return: reconstructed signal
    """
    import pywt
    coefs=pywt.swt(ECG,wavelet=wavelet,level=level, axis=0,trim_approx=True)

    # I chose not to consider the coefficients below. you can change this according to your needs

    coefs[-1]=np.zeros_like(coefs[-1])
    coefs[0]=np.zeros_like(coefs[0])

    signal_rec=pywt.iswt(coefs,wavelet=wavelet, axis=0)

    return signal_rec
signals=wavelet_analysis(combined_weighted_signal[2000],'sym4',3)
plt.plot(signals)
plt.plot(combined_weighted_signal[2000],'r')

In [None]:
def R_finder(signals):
    """
    Based on the reconstructed signa, finds the R peaks of the ECG signal
    :param signals: reconstructed signal/ The original sigan could also be used.
    :return: R peak indexes and amplitudes
    """
    from scipy.signal import argrelmax
    from scipy.signal import find_peaks

    signals_1d = np.squeeze(signals)
    y_prime=np.abs(signals_1d)**2
    average=y_prime.mean()
    R_peaks, properties = find_peaks(signals_1d, height=9*average,distance=75)
    peak_indices = argrelmax(properties['peak_heights'])
    R_peaks=R_peaks[peak_indices]
    peak_values=properties['peak_heights'][peak_indices]

    return R_peaks,peak_values

# plt.plot(R_peaks,properties, 'r', label='Detected QRS Complexes')
# plt.scatter(R_peaks[peak_indices],peak_values)

peak_indices,peak_values=R_finder(signals)
plt.figure(figsize=(30,10))
plt.plot(peak_indices,peak_values, 'r', label='Detected QRS Complexes')
plt.xlabel('Time (s)')
plt.ylabel('Amplitude')
plt.title('ECG Signal with Detected QRS Complexes')
plt.legend()
plt.grid(True)
plt.plot(combined_weighted_signal[1000])
plt.show()
plt.tight_layout()

In [None]:
def R_plot(signals):
    """

    :param signals: 1D array of a signal
    :return: plots the signal and its identified R peaks
    """
    from scipy.signal import argrelmax
    from scipy.signal import find_peaks
    signals_1d = np.squeeze(signals)
    y_prime=np.abs(signals_1d)**2
    average=y_prime.mean()

    # Height and distance values can be changed based on your needs
    R_peaks, properties = find_peaks(signals_1d, height=9*average,distance=75)
    peak_indices = argrelmax(properties['peak_heights'])
    peak_values=properties['peak_heights'][peak_indices]
    plt.figure(figsize=(20,10))
    plt.plot(R_peaks,properties['peak_heights'], 'r', label='Detected QRS Complexes')
    plt.scatter(R_peaks[peak_indices],peak_values,label='Peaks')
    plt.xlabel('Time')
    plt.ylabel('Amplitude')
    plt.title('ECG Signal with Detected QRS Complexes')
    plt.legend()

R_plot(signals)




In [None]:
def PQRS_extraction(ECG,sampling_rate=500):
    """

    :param ECG: Signal array
    :param sampling_rate: based your own data
    :return: PQRS indexes and amplitudes based on the ECG provided
    """
    import neurokit2 as nk

    # Assuming combined_weighted_signal[2000] contains your ECG signal data
    ecg=wavelet_analysis(ECG,'sym4')
    # Ensure the ECG signal data is in the correct format (1-dimensional array)
    ecg_signal = np.squeeze(ecg)
    original_ECG= np.squeeze(ECG)

    ecg_info = nk.ecg_process(ecg_signal, sampling_rate=sampling_rate)

    # Get the R-peaks indices
    r_peaks = ecg_info[1]["ECG_R_Peaks"]

    r_amplitude=original_ECG[r_peaks]

    p_peaks = ecg_info[1]["ECG_P_Peaks"]
    q_peaks = ecg_info[1]["ECG_Q_Peaks"]
    s_peaks = ecg_info[1]["ECG_S_Peaks"]

    valid_p_peaks = [peak for peak in p_peaks if not np.isnan(peak)]
    valid_q_peaks = [peak for peak in q_peaks if not np.isnan(peak)]
    valid_s_peaks = [peak for peak in s_peaks if not np.isnan(peak)]

    # You can also get the amplitudes of the peaks
    p_amplitudes = original_ECG[valid_p_peaks]
    q_amplitudes = original_ECG[valid_q_peaks]
    s_amplitudes = original_ECG[valid_s_peaks]

    return r_peaks, r_amplitude,valid_p_peaks,p_amplitudes,valid_q_peaks,q_amplitudes,valid_s_peaks,s_amplitudes
s=wavelet_analysis(combined_weighted_signal[1000],'sym4')
r_peaks, r_amplitude,valid_p_peaks,p_amplitudes,valid_q_peaks,q_amplitudes,valid_s_peaks,s_amplitudes = PQRS_extraction(s)

plt.figure(figsize=(12, 6))
plt.plot(s)
plt.plot(valid_q_peaks, q_amplitudes, "ro")
plt.xlabel("Samples")
plt.ylabel("Amplitude")
plt.title("ECG Signal with Detected R-peaks")
plt.grid(True)
plt.show()

In [None]:
def HR_counter(ECG,sample_rate=500):
    """
    Counts the heart rate based on the data provided
    :param ECG: 1D signal array
    :param sample_rate: based on your own data
    :return: rounded Heart Rate
    """
    import neurokit2 as nk

    # Assuming that youre using sym4 wavelet
    ecg=wavelet_analysis(ECG,'sym4')

    # Ensure the ECG signal data is in the correct format (1-dimensional array)
    ecg_signal = np.squeeze(ecg)

    ecg_info = nk.ecg_process(ecg_signal, sampling_rate=sample_rate)

    # Get the R-peaks indices
    r_peaks = ecg_info[1]["ECG_R_Peaks"]
    RR_intervals = np.mean(np.diff(r_peaks)/sample_rate)
    heart_rate = 60 / RR_intervals  # Convert to beats per minute (BPM)
    return round(heart_rate,0)

HR_counter(combined_weighted_signal[176])


In [None]:
def mean_PQRS_amplitude(ECG,sampling_rate=500):
    """
    based on the signal, extracts and calculates the mean of PQRS amplitudes
    :param ECG: 1D signal array
    :param sampling_rate: based on your own data
    :return: mean values of PQRS peaks
    """
    r_peaks, r_amplitude,valid_p_peaks,p_amplitudes,valid_q_peaks,q_amplitudes,valid_s_peaks,s_amplitudes =PQRS_extraction(ECG,sampling_rate=sampling_rate)
    mean_r_peaks=r_amplitude.mean()
    mean_p_peaks=p_amplitudes.mean()
    mean_q_peaks=q_amplitudes.mean()
    mean_s_peaks=s_amplitudes.mean()

    return mean_r_peaks,mean_p_peaks,mean_q_peaks,mean_s_peaks
mean_PQRS_amplitude(combined_weighted_signal[1000])


In [None]:
def wave_duration(ECG,sampling_rate=500):
    """
    calculates the PR segment and Rwave and Pwave durations
    :param ECG: 1D signal array
    :param sampling_rate: based on your own data
    :return: Rwave and Pwave duration and PR segment
    """
    import neurokit2 as nk

    # Assuming combined_weighted_signal[2000] contains your ECG signal data
    ecg=wavelet_analysis(ECG,'sym4')

    # Ensure the ECG signal data is in the correct format (1-dimensional array)
    ecg_signal = np.squeeze(ecg)

    ecg_info = nk.ecg_process(ecg_signal, sampling_rate=sampling_rate)
    # Get the wave onset and offsets
    r_onset = ecg_info[1]["ECG_Q_Peaks"]
    r_offset=ecg_info[1]["ECG_S_Peaks"]
    p_onset = ecg_info[1]["ECG_P_Onsets"]
    p_offset=ecg_info[1]["ECG_P_Offsets"]

    VALID_r_onset=np.array(r_onset)
    VALID_r_offset=np.array(r_offset)

    # valid_r_onsets = [onset for onset in r_onset if not np.isnan(onset)]
    # valid_r_offsets = [offset for offset in r_offset if not np.isnan(offset)]

    VALID_p_onset=np.array(p_onset)
    VALID_p_offset=np.array(p_offset)

    # valid_p_onsets = [onset for onset in p_onset if not np.isnan(onset)]
    # valid_p_offsets = [offset for offset in p_offset if not np.isnan(offset)]

    pwaves=VALID_p_offset-VALID_p_onset
    P_waves=[wave for wave in pwaves if not np.isnan(wave)]


    rwaves=VALID_r_offset-VALID_r_onset
    R_waves=[wave for wave in rwaves if not np.isnan(wave)]


    mean_rwave_duration=np.mean(R_waves)/sampling_rate # in seconds
    mean_pwave_duration=np.mean(P_waves)/sampling_rate # in seconds

    # Convert lists to numpy arrays to handle nan values
    ECG_P_onset_arr = np.array(p_onset)
    ECG_Q_Peaks_arr = np.array(r_onset)
    Difference = ECG_Q_Peaks_arr - ECG_P_onset_arr

    valid_PR_segment=[segment for segment in Difference if not np.isnan(segment)]
    mean_PR_segment=np.mean(valid_PR_segment)/sampling_rate


    return round(mean_rwave_duration,2), round(mean_pwave_duration,2),round(mean_PR_segment,3)

wave_duration(combined_weighted_signal[6])


In [None]:
def RR_ratio(ECG,sampling_rate=500):
    """
    calculates the RR ratio which is: RR(i)/RR(i+1)
    :param ECG:
    :param sampling_rate:
    :return: rounded RR_ratio to 2 decimals
    """
    import neurokit2 as nk
    # Assuming combined_weighted_signal[2000] contains your ECG signal data
    ecg=wavelet_analysis(ECG,'sym4')

    # Ensure the ECG signal data is in the correct format (1-dimensional array)
    ecg_signal = np.squeeze(ecg)

    ecg_info = nk.ecg_process(ecg_signal, sampling_rate=sampling_rate)

    # Get the R-peaks indices
    r_peaks = ecg_info[1]["ECG_R_Peaks"]
    RR_intervals = np.diff(r_peaks)/sampling_rate
    rr_ratios = []
    for i in range(len(RR_intervals) - 1):
        rr_ratio = RR_intervals[i] / RR_intervals[i + 1]
        rr_ratios.append(rr_ratio)

    RR_R = np.mean(rr_ratios)

    return round(RR_R,2)

RR_ratio(combined_weighted_signal[1000])


In [None]:
def extraction(combined_weighted_signal):
    """

    :param combined_weighted_signal: The 1D array of original ECG signal
    :return: A data fram of extracted features
    """
    HR_list = []
    RR_list = []
    R_wave_duration_list = []
    P_wave_duration_list = []
    PR_interval_list = []
    R_peak_list = []
    P_peak_list = []
    Q_peak_list = []
    S_peak_list = []
    for item in range(100):
        print(item)
        signal=combined_weighted_signal[item]
        HR=HR_counter(signal)
        RR=RR_ratio(signal)
        R_wave_duration,P_wave_duration,PR_interval=wave_duration(signal)
        R_peak,P_peak,Q_peak,S_peak=mean_PQRS_amplitude(signal)
        # Append extracted features to lists
        HR_list.append(HR)
        RR_list.append(RR)
        R_wave_duration_list.append(R_wave_duration)
        P_wave_duration_list.append(P_wave_duration)
        PR_interval_list.append(PR_interval)
        R_peak_list.append(R_peak)
        P_peak_list.append(P_peak)
        Q_peak_list.append(Q_peak)
        S_peak_list.append(S_peak)

    # Create a DataFrame from the lists
    data = {
        'HR': HR_list,
        'RR': RR_list,
        'R_wave_duration': R_wave_duration_list,
        'P_wave_duration': P_wave_duration_list,
        'PR_interval': PR_interval_list,
        'R_peak': R_peak_list,
        'P_peak': P_peak_list,
        'Q_peak': Q_peak_list,
        'S_peak': S_peak_list
    }
    df = pd.DataFrame(data)
    return df
df=extraction(combined_weighted_signal)
df.head()

In [None]:
df.describe()

In [None]:
Y.head()

In [None]:
Y['diagnostic_superclass'].head(10)

In [None]:
Y['diagnostic_superclass'] = Y['diagnostic_superclass'].apply(lambda x: str(x)[1:-1])
Y['diagnostic_superclass'].head(10)

In [None]:
Y_subset = Y.iloc[:2000].reset_index(drop=True)

df['diagnostic_superclass']=Y_subset['diagnostic_superclass']
df['weight']=Y_subset['weight']
df['sex']=Y_subset['sex']
df['age']=Y_subset['age']
df['patient_id']=Y_subset['patient_id']
df.head(10)


In [None]:
import seaborn as sns
sns.countplot(df['diagnostic_superclass'])
plt.show()
df['diagnostic_superclass']=df['diagnostic_superclass'].apply(lambda x: 0 if 'NORM' in x else 1)

df['diagnostic_superclass'].hist(legend=True)
plt.ylabel('Number of samples')
plt.xlabel('Categories')
plt.show()
df['diagnostic_superclass'].head(10)

In [None]:
df.to_csv('extracted_features.csv', index=False)
