In [3]:
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from database_tools.preprocessing.functions import bandpass, find_peaks

# Route every pandas `.plot()` call in this notebook through plotly.
pd.set_option('plotting.backend', 'plotly')

In [4]:
# Settings used while loading the recordings. The values are the ones this
# cell has always used; their physical meaning is not visible from here.
N_SKIPPED_LINES = 4        # leading lines of every file that are dropped (presumably a header -- TODO confirm)
BANDPASS_LOW = 0.5         # passed to bandpass() as `low`
BANDPASS_HIGH = 3.0        # passed to bandpass() as `high`
SAMPLING_RATE = 200        # passed to bandpass() as `fs`
FLIP_OFFSET = 4000         # added after negating the filtered signal (NOTE(review): origin of 4000 unknown)


def _read_ppg_channel(path):
    """Read one recording file and return its samples as an integer array.

    Args:
        path (str): Path of the text file, one sample per line.

    Returns:
        np.ndarray: Samples found after the first N_SKIPPED_LINES lines.
    """
    with open(path, 'r') as f:
        return np.array([line.strip('\n') for line in f][N_SKIPPED_LINES:]).astype(int)


# The filtered / flipped arrays are consumed by the cells further down, so they
# must be populated here for the notebook to survive Restart & Run All.
ppg = []
ppg_filt = []
ppg_filt_flip = []
for x in range(35, 44):
    # Each recording number has two files: ...PPG0.txt and ...PPG1.txt.
    ppg_i = _read_ppg_channel(f'data/TX{x}DATAPPG0.txt')
    ppg_r = _read_ppg_channel(f'data/TX{x}DATAPPG1.txt')

    ppg_i_filt = bandpass(ppg_i, low=BANDPASS_LOW, high=BANDPASS_HIGH, fs=SAMPLING_RATE)
    ppg_r_filt = bandpass(ppg_r, low=BANDPASS_LOW, high=BANDPASS_HIGH, fs=SAMPLING_RATE)

    # Mirror the filtered signal vertically and shift it by FLIP_OFFSET.
    ppg_i_filt_flip = ppg_i_filt * -1 + FLIP_OFFSET
    ppg_r_filt_flip = ppg_r_filt * -1 + FLIP_OFFSET

    ppg.append([ppg_i, ppg_r])
    ppg_filt.append([ppg_i_filt, ppg_r_filt])
    ppg_filt_flip.append([ppg_i_filt_flip, ppg_r_filt_flip])

# Stack into arrays indexed as [recording, channel, sample].
ppg = np.array(ppg)
ppg_filt = np.array(ppg_filt)
ppg_filt_flip = np.array(ppg_filt_flip)
ppg.shape

(9, 2, 4100)

In [6]:
import pickle as pkl

# Persist the raw PPG array to disk ...
with open('test-data.pkl', 'wb') as out_file:
    pkl.dump(ppg, out_file)

# ... and read it straight back into `data`.
# The file was written by this notebook one statement earlier; do not point
# this at a pickle from an untrusted source.
with open('test-data.pkl', 'rb') as in_file:
    data = pkl.load(in_file)

In [5]:
# Lay index 0 of axis 1 (the samples read from the ...PPG0.txt files) of every
# recording end to end and plot the result as one long trace.
pd.Series(ppg[:, 0, :].ravel()).plot()
# Other views that can be swapped in for the line above:
# pd.Series(ppg[:, 1, :].ravel()).plot()
# pd.Series(ppg_filt[:, 0, :].ravel()).plot()
# pd.Series(ppg_filt[:, 1, :].ravel()).plot()
# pd.Series(ppg_filt_flip[:, 0, :].ravel()).plot()

In [None]:
# Plot one flipped, filtered trace: entry [0, 0] of ppg_filt_flip.
# This only works when ppg_filt_flip is a populated 3-D array.
pd.Series(ppg_filt_flip[0, 0]).plot()

In [None]:
def repair_peaks_troughs_idx(peaks: list, troughs: list):
    """Remove out-of-order peaks and troughs so the two sequences alternate.

    Regardless of which occurs first, a peak must be followed by a trough
    and vice versa. Whichever sequence starts at the lower index is treated
    as the "first" sequence; the other one is the "second". The result is
    built from (first, second) pairs:

    * when several "first" indices occur before the next "second" index,
      only the last of them is kept;
    * when several "second" indices occur before the next "first" index,
      only the earliest of them is kept;
    * when "first" indices remain after the last "second" index, the
      earliest of them is kept unpaired, so the output may end on an index
      from the "first" sequence (that output is then one element longer).

    Both inputs are assumed to be sorted in ascending order, because the
    algorithm only ever compares neighbouring entries.

    Args:
        peaks (list): Signal peaks.
        troughs (list): Signal troughs.

    Returns:
        peaks_repaired (list): Peaks with out of order items removed.
        troughs_repaired (list): Troughs with out of order items removed.

        Items are always returned with peaks idx as first tuple item.
        Two empty lists are returned when either input is empty.
    """
    # Nothing can alternate if one side is missing. len() is used instead of
    # truthiness so numpy arrays are accepted as well as lists.
    if len(peaks) == 0 or len(troughs) == 0:
        return ([], [])

    # Configure algorithm to start with lowest index.
    peaks_first = peaks[0] < troughs[0]
    first, second = (peaks, troughs) if peaks_first else (troughs, peaks)

    first_repaired, second_repaired = [], []  # lists to store outputs
    i_first, i_second = 0, 0  # cursors into `first` and `second`
    n_first, n_second = len(first), len(second)

    # Walk both sequences until one is exhausted. A while loop is used so
    # that steps which only skip an item do not use up a fixed iteration
    # budget and cut the scan short.
    while i_first < n_first and i_second < n_second:
        poi_1 = first[i_first]
        poi_2 = second[i_second]

        if poi_1 >= poi_2:
            # `second` item is not after the current `first` item: drop it.
            i_second += 1
            continue

        if i_first + 1 < n_first and first[i_first + 1] <= poi_2:
            # Another `first` item occurs before poi_2: drop the current one.
            i_first += 1
            continue

        # poi_1 is directly followed by poi_2: keep the pair.
        first_repaired.append(poi_1)
        second_repaired.append(poi_2)
        i_first += 1
        i_second += 1

    # `second` ran out while `first` items remain: keep one unpaired item so
    # callers can tell that the sequence ends on a `first` index.
    if i_first < n_first:
        first_repaired.append(first[i_first])

    # place indices in the correct order
    if peaks_first:
        return (first_repaired, second_repaired)
    return (second_repaired, first_repaired)

def _max_velocity_offset(segment, dx, skip_start, skip_end):
    """Return the offset into `segment` with the largest gradient, or None.

    Only offsets in [skip_start, len(segment) - skip_end) are searched.
    None is returned when that window is empty or the segment has fewer
    than two samples (np.gradient cannot be evaluated on it).
    """
    stop = len(segment) - skip_end
    if len(segment) < 2 or stop <= skip_start:
        return None
    vel = np.gradient(segment, dx)
    # argmax is relative to the trimmed window, so add skip_start back.
    return skip_start + int(np.argmax(vel[skip_start:stop]))


def detect_notches(sig, peaks, troughs, dx=10, skip_start=10, skip_end=30):
    """Detect dichrotic notches as the point of maximum velocity between
       each peak and the trough that follows it.

    The search window starts `skip_start` samples after the peak and ends
    `skip_end` samples before the subsequent trough. Peak/trough pairs that
    are too close together to leave a non-empty window are skipped.

    Args:
        sig (np.ndarray): Cardiac signal.
        peaks (list): List of signal peak indices.
        troughs (list): List of signal trough indices.
        dx (int, optional): Spacing between sig values (for np.gradient). Defaults to 10.
        skip_start (int, optional): Samples ignored directly after a peak. Defaults to 10.
        skip_end (int, optional): Samples ignored directly before a trough
            (or before the end of the signal for a trailing peak). Defaults to 30.

    Returns:
        notches (list): List of dichrotic notch indices (indices into sig).
    """
    if len(peaks) == 0:
        return []

    # Pairing is peak -> following trough, so a trough that precedes the
    # first peak has no partner and is dropped.
    if len(troughs) > 0 and peaks[0] > troughs[0]:
        troughs = troughs[1:]

    notches = []
    for i, j in zip(peaks, troughs):
        offset = _max_velocity_offset(sig[i:j], dx, skip_start, skip_end)
        if offset is not None:
            notches.append(int(i) + offset)  # convert slice offset to an index into sig

    # look for a notch after the last peak if the highest index is a peak.
    if len(troughs) == 0 or peaks[-1] > troughs[-1]:
        offset = _max_velocity_offset(sig[peaks[-1]:], dx, skip_start, skip_end)
        if offset is not None:
            notches.append(int(peaks[-1]) + offset)
    return notches

In [None]:
# Work on one flipped, filtered trace: entry [5, 0] of ppg_filt_flip.
x = ppg_filt_flip[5, 0, :]

# Extend the trace by 40 samples of its own mean on each side before
# running peak detection on it.
x_pad = np.pad(x, pad_width=40, mode='constant', constant_values=np.mean(x))

# find_peaks returns a mapping with two values, unpacked here as
# (peaks, troughs) -- presumably in that order; verify against find_peaks.
peaks, troughs = find_peaks(x_pad).values()

# Shift the detected indices back towards the unpadded trace.
# NOTE(review): the pad width is 40 but 41 is subtracted -- confirm that the
# extra 1 matches the indexing convention of find_peaks.
peaks = np.asarray(peaks) - 41
troughs = np.asarray(troughs) - 41

# Enforce peak/trough alternation, then locate one notch per peak.
peaks, troughs = repair_peaks_troughs_idx(peaks, troughs)
notches = detect_notches(x, peaks, troughs, dx=10)

# Signal as a line, with peaks, troughs and notches overlaid as markers.
fig = go.Figure()
fig.add_scatter(y=x)
for marker_idx in (peaks, troughs, notches):
    fig.add_scatter(x=marker_idx, y=x[marker_idx], mode='markers')
fig