https://github.com/gslapnicar/bp-estimation-mimic3/blob/master/cleaning_scripts/flat_lines.m  
https://github.com/gslapnicar/bp-estimation-mimic3/blob/master/cleaning_scripts/flat_peaks.m  
https://github.com/gslapnicar/bp-estimation-mimic3/blob/master/cleaning_scripts/main.m

ChatGPT

In [None]:
import numpy as np
import matplotlib.pyplot as plt

def flat_lines(data, window, incline, show):
    """Measure how much of a PPG/ABP recording lies on flat lines.

    A position starts a flat run when each of the following ``window - 1``
    samples compares equal to the sample at that position (or, with
    ``incline``, differs from it by exactly 1). Every sample covered by at
    least one such run is counted as flat.

    Args:
        data: 2xN array-like; row 0 is read as PPG and row 1 as ABP.
        window: Number of consecutive samples that make up a flat run.
        incline: If true, a sample that is exactly 1 above or below the
            first sample of the run is also accepted as "equal".
        show: If true, plot both rows with the flat samples marked in red.

    Returns:
        Tuple ``(per_ppg, per_abp)`` with the fraction (0..1) of samples in
        each row that were marked as flat.

    Raises:
        ValueError: If ``window`` is smaller than 1 or larger than N.
    """
    data = np.asarray(data)
    len_data = data.shape[1]
    if not 1 <= window <= len_data:
        raise ValueError(
            f"window must be between 1 and the signal length ({len_data}), got {window}"
        )

    # Number of positions at which a full window still fits into the signal.
    n_starts = len_data - window + 1
    ppg_head = data[0, :n_starts]
    abp_head = data[1, :n_starts]
    flat_locs_ppg = np.ones(n_starts, dtype=bool)
    flat_locs_abp = np.ones(n_starts, dtype=bool)

    # Compare the first sample of every window with the sample `offset`
    # positions later; a window start stays flagged only if all offsets match.
    for offset in range(1, window):
        ppg_next = data[0, offset:offset + n_starts]
        abp_next = data[1, offset:offset + n_starts]
        tmp_ppg = ppg_head == ppg_next
        tmp_abp = abp_head == abp_next

        if incline:
            # Also accept a difference of exactly +1 or -1.
            tmp_ppg = tmp_ppg | (ppg_head == ppg_next + 1) | (ppg_head == ppg_next - 1)
            tmp_abp = tmp_abp | (abp_head == abp_next + 1) | (abp_head == abp_next - 1)

        flat_locs_ppg &= tmp_ppg
        flat_locs_abp &= tmp_abp

    # Extend to the same size as data; the last window - 1 positions cannot
    # start a full window.
    tail = np.zeros(window - 1, dtype=bool)
    flat_locs_ppg = np.concatenate((flat_locs_ppg, tail))
    flat_locs_abp = np.concatenate((flat_locs_abp, tail))

    # So far only window starts are flagged; propagate each flag forward so
    # that every sample inside a flat window is marked.
    starts_ppg = flat_locs_ppg.copy()
    starts_abp = flat_locs_abp.copy()
    for offset in range(1, window):
        flat_locs_ppg[offset:] |= starts_ppg[:len_data - offset]
        flat_locs_abp[offset:] |= starts_abp[:len_data - offset]

    # Fractions of flat samples per channel.
    per_abp = np.sum(flat_locs_abp) / len_data
    per_ppg = np.sum(flat_locs_ppg) / len_data

    if show:
        # Plot the flat line points (x axis is the 1-based sample number).
        x = np.arange(1, len_data + 1)

        plt.subplot(2, 1, 1)
        plt.plot(x, data[0, :], 'black')
        plt.scatter(x[flat_locs_ppg], data[0, flat_locs_ppg], color='red')

        plt.subplot(2, 1, 2)
        plt.plot(x, data[1, :], 'black')
        plt.scatter(x[flat_locs_abp], data[1, flat_locs_abp], color='red')

        plt.show()

    return per_ppg, per_abp

In [None]:
def flat_peaks(signal, abp_peaks, abp_valleys, ppg_peaks, ppg_valleys, abp_thresh, ppg_thresh, window, graphs):
    """Decide whether a recording must be discarded because of flat peaks.

    Flat peaks are an anomaly in the collected data; signals with too many
    of them are not useful and must be discarded. A peak or valley counts as
    flat when the ``window`` samples starting at its location are all equal.

    Args:
        signal: 2xN array-like; row 0 is read as PPG and row 1 as ABP.
        abp_peaks: Peak locations for ABP, used as column indices of
            ``signal``.
        abp_valleys: Cycle start/end locations for ABP.
        ppg_peaks: Peak locations for PPG.
        ppg_valleys: Cycle start/end locations for PPG.
        abp_thresh: Fraction of flat ABP peaks (or valleys) at which the
            ABP channel is marked for skipping.
        ppg_thresh: Same as ``abp_thresh`` for the PPG channel.
        window: How many equal consecutive samples form a flat top.
        graphs: If truthy, print and plot the result whenever at least one
            channel is marked for skipping.

    Returns:
        Tuple ``(skip_ppg, skip_abp)`` of ints; 1 = skip this signal due to
        flat peaks, 0 = keep it. Because the comparison is ``>=``, a channel
        with no peaks or no valleys is always marked for skipping.

    Raises:
        ValueError: If ``window`` is smaller than 1 or larger than N.
    """
    # NOTE(review): locations are used as 0-based NumPy indices here; the
    # MATLAB code this was ported from is 1-based -- confirm what callers pass.
    show = bool(graphs)

    signal = np.asarray(signal)
    n_samples = signal.shape[1]
    if not 1 <= window <= n_samples:
        raise ValueError(
            f"window must be between 1 and the signal length ({n_samples}), got {window}"
        )

    abp_peaks = np.asarray(abp_peaks, dtype=np.intp).ravel()
    abp_valleys = np.asarray(abp_valleys, dtype=np.intp).ravel()
    ppg_peaks = np.asarray(ppg_peaks, dtype=np.intp).ravel()
    ppg_valleys = np.asarray(ppg_valleys, dtype=np.intp).ravel()

    number_of_peaks_abp = abp_peaks.size
    number_of_valleys_abp = abp_valleys.size
    number_of_peaks_ppg = ppg_peaks.size
    number_of_valleys_ppg = ppg_valleys.size

    # First get the flat lines: flag every position whose following
    # window - 1 samples are all equal to it (sliding-window comparison).
    n_starts = n_samples - window + 1
    flat_locs_abp = np.ones(n_starts, dtype=bool)
    flat_locs_ppg = np.ones(n_starts, dtype=bool)
    for offset in range(1, window):
        flat_locs_abp &= signal[1, :n_starts] == signal[1, offset:offset + n_starts]
        flat_locs_ppg &= signal[0, :n_starts] == signal[0, offset:offset + n_starts]

    # Extend to the same size as the signal; the last window - 1 positions
    # cannot start a full window.
    tail = np.zeros(window - 1, dtype=bool)
    flat_locs_abp = np.concatenate((flat_locs_abp, tail))
    flat_locs_ppg = np.concatenate((flat_locs_ppg, tail))

    # Boolean masks with True at every peak / valley location.
    abp_peak_mask = np.zeros(n_samples, dtype=bool)
    abp_peak_mask[abp_peaks] = True
    abp_valley_mask = np.zeros(n_samples, dtype=bool)
    abp_valley_mask[abp_valleys] = True
    ppg_peak_mask = np.zeros(n_samples, dtype=bool)
    ppg_peak_mask[ppg_peaks] = True
    ppg_valley_mask = np.zeros(n_samples, dtype=bool)
    ppg_valley_mask[ppg_valleys] = True

    # Peaks / valleys that coincide with the start of a flat window.
    locs_of_flat_peaks_abp = np.flatnonzero(flat_locs_abp & abp_peak_mask)
    locs_of_flat_valleys_abp = np.flatnonzero(flat_locs_abp & abp_valley_mask)
    number_of_flat_peaks_abp = locs_of_flat_peaks_abp.size
    number_of_flat_valleys_abp = locs_of_flat_valleys_abp.size

    locs_of_flat_peaks_ppg = np.flatnonzero(flat_locs_ppg & ppg_peak_mask)
    locs_of_flat_valleys_ppg = np.flatnonzero(flat_locs_ppg & ppg_valley_mask)
    number_of_flat_peaks_ppg = locs_of_flat_peaks_ppg.size
    number_of_flat_valleys_ppg = locs_of_flat_valleys_ppg.size

    # Thresholding and plotting.
    skip_ppg = 0
    skip_abp = 0
    abp_conclusion = ' KEEP!'
    ppg_conclusion = ' KEEP!'
    abp_mark = '*g'
    ppg_mark = '*g'

    if (number_of_flat_peaks_abp >= abp_thresh * number_of_peaks_abp) or (
            number_of_flat_valleys_abp >= abp_thresh * number_of_valleys_abp):
        abp_conclusion = ' SKIP!'
        abp_mark = '*r'
        skip_abp = 1

    if (number_of_flat_peaks_ppg >= ppg_thresh * number_of_peaks_ppg) or (
            number_of_flat_valleys_ppg >= ppg_thresh * number_of_valleys_ppg):
        ppg_conclusion = ' SKIP!'
        ppg_mark = '*r'
        skip_ppg = 1

    if show and (skip_abp or skip_ppg):
        print(skip_abp)
        print(skip_ppg)
        # NOTE: the "10%" in the messages is fixed text; the thresholds
        # actually applied are abp_thresh / ppg_thresh.
        print(f'This ABP signal has more than 10% flat peaks, thus{abp_conclusion}')
        # Successive plot calls draw onto the same axes, so no hold toggle
        # is needed (pyplot.hold no longer exists in current Matplotlib).
        plt.figure()
        plt.plot(signal[1])
        plt.plot(locs_of_flat_peaks_abp, signal[1, locs_of_flat_peaks_abp], abp_mark)
        plt.plot(locs_of_flat_valleys_abp, signal[1, locs_of_flat_valleys_abp], abp_mark)
        plt.title(f'ABP{abp_conclusion}')

        print(f'This signal PPG has more than 10% flat peaks, thus{ppg_conclusion}')
        plt.figure()
        plt.plot(signal[0])
        plt.plot(locs_of_flat_peaks_ppg, signal[0, locs_of_flat_peaks_ppg], ppg_mark)
        plt.plot(locs_of_flat_valleys_ppg, signal[0, locs_of_flat_valleys_ppg], ppg_mark)
        plt.title(f'PPG{ppg_conclusion}')

    return skip_ppg, skip_abp