In [2]:
import numpy as np
from scipy.signal import find_peaks

def extract_p_peaks(ecg_lead, amplitude_range=(50, 150), peaks=None):
    """
    Extract P-peak locations from the ECG lead based on an amplitude range,
    excluding the R-peak regions.
    """
    if peaks is None:
        peaks, _ = find_peaks(ecg_lead, distance=350)

    p_peaks = []
    for i in range(len(peaks) - 1):
        r_peak = peaks[i]
        r_next_peak = peaks[i+1]
        
        # Define a search window before the R-peak
        window_start = r_peak - 200  # Adjust this value as needed
        window_end = r_peak - 50  # Adjust this value as needed
        
        # Find peaks within the search window and amplitude range
        window_peaks, _ = find_peaks(ecg_lead[window_start:window_end], height=amplitude_range)
        
        # Add the peak locations to p_peaks, adjusting for the window offset
        p_peaks.extend([window_start + peak for peak in window_peaks])
    
    return p_peaks

def extract_t_peaks(ecg_lead, amplitude_range=(150, 250), peaks=None):
    """
    Extract T-peak locations from the ECG lead based on an amplitude range,
    excluding the R-peak regions.
    """
    if peaks is None:
        peaks, _ = find_peaks(ecg_lead, distance=350)

    t_peaks = []
    for i in range(len(peaks) - 1):
        r_peak = peaks[i]
        r_next_peak = peaks[i+1]
        
        # Define a search window after the R-peak
        window_start = r_peak + 50  # Adjust this value as needed
        window_end = r_next_peak - 200  # Adjust this value as needed
        
        # Find peaks within the search window and amplitude range
        window_peaks, _ = find_peaks(ecg_lead[window_start:window_end], height=amplitude_range)
        
        # Add the peak locations to t_peaks, adjusting for the window offset
        t_peaks.extend([window_start + peak for peak in window_peaks])
    
    return t_peaks

def calculate_intervals(ecg_lead, p_peaks, r_peaks, t_peaks, sampling_rate):
    intervals = []
    for i, r_peak in enumerate(r_peaks):
        if i == 0 or i == len(r_peaks) - 1:
            continue  # Skip the first and last R-peak

        # Find the nearest P-peak before the R-peak
        p_peak_idx = np.argmax(np.array(p_peaks) < r_peak)
        p_peak = p_peaks[p_peak_idx]

        # Find the nearest T-peak after the R-peak
        t_peak_idx = np.argmax(np.array(t_peaks) > r_peak)
        t_peak = t_peaks[t_peak_idx]

        # Calculate PR, RT, and PT intervals in seconds
        pr_interval = (r_peak - p_peak) / sampling_rate
        rt_interval = (t_peak - r_peak) / sampling_rate
        pt_interval = (t_peak - p_peak) / sampling_rate

        intervals.append([pr_interval, rt_interval, pt_interval])

    return np.array(intervals)

# Example usage
sampling_rate = 500  # Assuming a sampling rate of 500 Hz
file_path = "./124.asc"  # Replace with the actual path to your file
ecg_data = np.loadtxt(file_path)
# ecg_data = np.loadtxt("ecg_data.txt")  # Replace with your ECG data loading method

# Select the desired leads (1, 2, 5, 6, 7, 8)
selected_leads = [0, 1, 4, 5, 6, 7]  # Indices of the selected leads
ecg_data_selected = ecg_data[:, selected_leads]

# Create a list to store the 2D arrays for each lead
lead_intervals = []

for i, ecg_lead in enumerate(ecg_data_selected.T):
    # Find R-peak locations
    peaks, _ = find_peaks(ecg_lead, distance=350)

    # Extract P-peaks and T-peaks
    p_peaks = extract_p_peaks(ecg_lead, amplitude_range=(50, 150), peaks=peaks)
    t_peaks = extract_t_peaks(ecg_lead, amplitude_range=(150, 250), peaks=peaks)

    # Calculate PR, RT, and PT intervals
    intervals = calculate_intervals(ecg_lead, p_peaks, peaks, t_peaks, sampling_rate)
    lead_intervals.append(intervals)

# Convert the list of arrays to a single 2D array
all_lead_intervals = np.array(lead_intervals)
print(all_lead_intervals.shape)

(6, 10, 3)


In [4]:
# all_lead_intervals