In [None]:
import pyarrow as pa
import pyarrow.parquet as pq
import polars as pl
import numpy as np
from pathlib import Path
from typing import Any

In [None]:
# Pick one recording and load it as an Arrow table.
# The file name carries the sample rate as its second "_"-separated field,
# e.g. "red_50Hz_2024-04-02_09-46-52" -> "50Hz".
FILE_NAMES = [
    "red_100Hz_2024-04-01_11-29-27",
    "red_100Hz_2024-04-01_14-40-56",
    "red_50Hz_2024-04-02_09-46-52",
]
FILE_NAME = Path(FILE_NAMES[2] + ".parquet")
table = pq.read_table(FILE_NAME)
# read sample rate from filename
sample_rate_str = FILE_NAME.stem.split("_")[1]
# NOTE: str.find returns -1 when "Hz" is missing, which would silently drop the
# last character of the field instead of failing.
_hz_idx = sample_rate_str.find("Hz")
sample_rate = int(sample_rate_str[:_hz_idx])
# Sample rate in Hz and the matching sample period in seconds; later cells
# read these two constants.
SAMPLE_RATE = sample_rate
SAMPLE_INTERVAL = 1 / SAMPLE_RATE
display(f"Sample rate: {SAMPLE_RATE} Hz", f"Sample interval: {SAMPLE_INTERVAL} s")

In [None]:
# Extract the "red" column of the table as a NumPy array and show its shape.
data = table["red"].to_numpy()
data.shape

In [None]:
import matplotlib.pyplot as plt
import plotly.express as px
import plotly.graph_objects as go

# Time axis in seconds, derived from the sample rate parsed out of the file
# name (the recordings are 50/100 Hz, not the 800 Hz a fixed 1.25 ms step
# would imply). Scaling an integer index also guarantees exactly one x value
# per sample, which a float-step np.arange does not.
xs = np.arange(data.shape[0]) * SAMPLE_INTERVAL
px.line(x=xs, y=data, title="Raw data")

In [None]:
from enum import Enum, auto

# Samples strictly above this value are classified as Level.HIGH by
# segment_data (presumably raw ADC counts — TODO confirm the unit).
THRESHOLD = 1.5e6


class Level(Enum):
    LOW = auto()
    HIGH = auto()


Segment = tuple[int, int, Level]


def segment_data(data: np.ndarray, threshold: float | int) -> list[Segment]:
    last_index = 0
    last_state = Level.HIGH if data[0] > threshold else Level.LOW
    segments: list[Segment] = []
    for i, n in enumerate(data):
        if n > threshold:
            if last_state == Level.LOW:
                segments.append((last_index, i, Level.LOW))
                last_index = i
                last_state = Level.HIGH
            else:
                continue
        else:
            if last_state == Level.HIGH:
                segments.append((last_index, i, Level.HIGH))
                last_index = i
                last_state = Level.LOW
            else:
                continue
        if i == len(data) - 1:
            segments.append((last_index, i, last_state))
    return segments

# Split the raw recording into LOW/HIGH runs around THRESHOLD.
segments = segment_data(data, THRESHOLD)

In [None]:
def segment_length(segment: Segment) -> int:
    """Return the span of ``segment``: its stop index minus its start index."""
    start, stop = segment[0], segment[1]
    return stop - start

# Length of every segment, and the 75th percentile of those lengths.
segment_lens = [segment_length(segment) for segment in segments]
np.percentile(segment_lens, 75)

In [None]:
# Keep only segments longer than 100 samples; shorter runs are discarded.
real_segments = [s for s in segments if segment_length(s) > 100]
display(real_segments)

In [None]:
# Shade every kept segment on a time axis: HIGH in red, LOW in blue.
# Indices are converted to seconds with the sample interval parsed from the
# file name rather than a fixed 1.25 ms (800 Hz) step.
for segment in real_segments:
    color = "red" if segment[2] == Level.HIGH else "blue"
    plt.axvspan(segment[0] * SAMPLE_INTERVAL,
                segment[1] * SAMPLE_INTERVAL,
                color=color,
                alpha=0.5)

In [None]:
# we're only interested in the high segments
high_segments_idx = [s for s in real_segments if s[2] == Level.HIGH]
display(high_segments_idx)
# Slice the raw samples of each HIGH segment out of the recording.
high_segments = [data[s[0]:s[1]] for s in high_segments_idx]

# lucky = random.sample(high_segments, 1)[0]
# lucky_idx = random.randint(0, len(high_segments) - 1)
# Index of the HIGH segment analysed in the rest of the notebook.
lucky_idx = 0
display(f"lucky index: {lucky_idx}")
# 2 might be a good one
# 1484 : 70_000
lucky = high_segments[lucky_idx]
# filter out below 1 percentile and above 99 percentile
# filtered_lucky = np.clip(lucky, np.percentile(lucky, 1),
#                          np.percentile(lucky, 99))
# TODO: maybe do some edge detection,
# like a 1D Canny
# It does not seem necessary once the DC offset is removed (segments have significantly different DC offsets)
# Two x axes for the same samples: sample index and time in seconds.
xs = np.array(range(len(lucky)))
xs_time = xs * SAMPLE_INTERVAL
# px.line(y=lucky, x=xs).show()
trace = go.Scatter(x=xs, y=lucky, mode="lines")
trace_time = go.Scatter(x=xs_time, y=lucky, mode="lines")
fig = go.Figure(data=[trace_time, trace])
# https://community.plotly.com/t/can-plotly-support-2-x-axis-and-2-y-axis-in-one-graph/38303/2
fig.update_layout(
    xaxis=dict(title="Sample Index"),
    yaxis=dict(title="Red LED Reading (ADC Value)"),
    xaxis2=dict(title="Time (s)", overlaying="x", side="top"),
)
# The time trace is drawn fully transparent: it exists only to attach the
# secondary (top) time axis to the figure.
fig.data[0].update(xaxis="x2", yaxis="y", line=dict(color="rgba(0,0,0,0)")) # type: ignore
fig.update_layout(showlegend=False)
fig.show()

In [None]:
from typing import Optional


# Signal handed to the filters below; by default the whole selected segment.
# The commented block keeps manual trims that were used for specific
# segments of one recording.
workable_data:Optional[np.ndarray] = lucky
# if FILE_NAME.stem == "red_100Hz_2024-04-01_11-29-27":
#     if lucky_idx == 1:
#         workable_data = lucky[4299:-100]
#     if lucky_idx == 2:
#         workable_data = lucky[765:-50]
#     if lucky_idx == 6:
#         workable_data = lucky[1678:-200]

# Time axis (seconds) matching workable_data; reused by later plots.
xs_time = np.array(range(len(workable_data))) * SAMPLE_INTERVAL # type: ignore
px.line(y=workable_data, x=xs_time).show()

In [None]:
import heartpy as hp
from scipy.signal import butter, detrend, filtfilt, iirnotch, savgol_filter, wiener, sosfilt, sosfiltfilt, freqz, sosfreqz, ellip
from scipy.io import loadmat
from heartpy import filter_signal
# https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.ellip.html

# Load filter coefficients from a MATLAB file; the next cell reads its "b"
# and "a" entries. NOTE(review): the file name suggests the filter was
# designed for 50 Hz data — confirm before using it with other sample rates.
mat = loadmat("HR_filter_ba.2.50Hz.mat")

In [None]:
# Flatten the MATLAB coefficient arrays into 1D numerator (b) and
# denominator (a) vectors and show them.
b_2 = mat["b"].flatten()
a_2 = mat["a"].flatten()
display({
    "b": b_2,
    "a": a_2
})

In [None]:
# Butterworth band-pass (0.8-5 Hz) designed with scipy, once as second-order
# sections for filtering and once as (b, a) so its coefficient count can be
# compared with the MATLAB filter.
scipy_bp_2 = butter(1, [0.8, 5], btype="band", fs=SAMPLE_RATE, output="sos")
b_s_2, a_s_2 = butter(1, [0.8, 5], btype="band", fs=SAMPLE_RATE, output="ba")
filtered_scipy = sosfiltfilt(scipy_bp_2, workable_data)

display(f"scipy: {b_s_2.shape}, {a_s_2.shape}")
display(f"matlab 2nd order: {b_2.shape}, {a_2.shape}")
# in scipy 2nd order is the 4th order in matlab

# Number of frequency points at which both responses are evaluated.
worN = 4000

w, h = sosfreqz(scipy_bp_2, worN=worN)
w_2, h_2 = freqz(b_2, a_2, worN=worN)

fig, ax1 = plt.subplots()
ax1.set_title("Digital filter frequency response")
ax1.set_ylabel("Amplitude (ratio)")
ax1.set_xlabel("Frequency (Hz)")
ax1.grid()
ax1.set_xlim([0, 10])

# w is in rad/sample; 0.5 * SAMPLE_RATE * w / pi converts it to Hz.
ax1.plot(0.5 * SAMPLE_RATE * w / np.pi, np.abs(h), label="scipy (2nd order)")
ax1.plot(0.5 * SAMPLE_RATE * w_2 / np.pi,
         np.abs(h_2),
         label="matlab (2nd order) ellip")

# somehow the matlab filter is significantly worse than scipy
# NOTE(review): the MATLAB response is scaled with SAMPLE_RATE as well, so the
# comparison is only meaningful at the rate that filter was designed for.
ax1.legend()
plt.show()

In [None]:
# 0.4Hz to 100Hz
# https://github.com/paulvangentcom/heartrate_analysis_python/blob/master/examples/1_regular_PPG/Analysing_a_PPG_signal.ipynb
# https://github.com/paulvangentcom/heartrate_analysis_python/blob/master/examples/5_noisy_ECG/Analysing_Noisy_ECG.ipynb
# https://github.com/paulvangentcom/heartrate_analysis_python/blob/master/docs/algorithmfunctioning.rst
# https://github.com/paulvangentcom/heartrate_analysis_python/blob/master/docs/heartrateanalysis.rst

# remove_baseline_wander is just a notch filter applied to low frequency (to remove DC offset)
# notch filter to remove DC offset
# enhance_ecg_peaks is useless
# the high pass/low pass/band pass filters here are all Butterworth filters

# We will use the bandpass variant.
# we filter out frequencies below 0.8Hz (<= 48 bpm) (bpm = 60 x Hz)
# and above 3Hz (>= 180 bpm)
# NOTE(review): the scipy band-pass built earlier uses a 5 Hz upper edge, not 3 Hz.
# Second-order sections (SOS) matrix and gain values (G) from MATLAB

# by default it only has 2nd order filter

# Zero-phase filtering of the same signal with the MATLAB (b, a) coefficients
# and with the scipy SOS filter.
filtered_mat = filtfilt(b_2, a_2, workable_data)
filtered_scipy = sosfiltfilt(scipy_bp_2, workable_data)

# drop the ridiculously high values
# I'm not sure about the value range
# Both outputs are clamped to [-255, 254].
filtered_scipy = np.clip(filtered_scipy, -255, 255 - 1)
filtered_mat = np.clip(filtered_mat, -255, 255 - 1)

trace_bp_matlab = go.Scatter(x=xs_time,
                               y=filtered_mat,
                               mode="lines",
                               name="Bandpass Filtered (MATLAB)")
trace_bp = go.Scatter(x=xs_time,
                      y=filtered_scipy,
                      mode="lines",
                      name="Bandpass Filtered (Scipy)")
fig = go.Figure(data=[trace_bp, trace_bp_matlab])
fig.update_layout(
    xaxis=dict(title="Time (s)"),
    yaxis=dict(title="Red LED Reading (ADC Value)"),
)
fig.show()

In [None]:
from typing import Tuple, Union
# Scalar type accepted wherever an int or a float will do.
number = Union[int, float]
# Short alias for NumPy arrays used in the annotations below.
NDArray = np.ndarray

In [None]:
from dataclasses import dataclass
from jaxtyping import Int, Float, Bool
from typeguard import typechecked
from numpy.lib.stride_tricks import sliding_window_view
# Annotation for an integer-dtype NumPy array.
# NOTE(review): the "..." shape spec presumably accepts any number of
# dimensions despite the "1D" name — confirm against the jaxtyping docs.
IntArray1D = Int[NDArray, "..."]


# https://leetcode.cn/problems/sliding-window-median
# https://ipython-books.github.io/47-implementing-an-efficient-rolling-average-algorithm-with-stride-tricks/
# https://aman.ai/code/sliding-window/
# https://oi-wiki.org/ds/monotonous-queue/


# np.pad(input, (size_before, size_after), mode="edge")
# https://github.com/scipy/scipy/blob/2ecac3e596fdb458c85000e7707a8f5f46926621/scipy/ndimage/src/ni_support.c#L222
@typechecked
def extend_input(input: NDArray, size_before: int,
                 size_after: int) -> NDArray:
    """
    Pad a 1D array on both sides with reversed copies of its own edges.

    The edge sample itself is repeated, e.g. with two samples per side::

        [1, 2, 3, 4] -> [2, 1 | 1, 2, 3, 4 | 4, 3]

    When a requested size exceeds the input length, the reversed input is
    followed by the reversed first ``size - len(input)`` samples::

        extend_input([1, 2, 3, 4], 5, 0) -> [4, 3, 2, 1, 1 | 1, 2, 3, 4]

    Parameters
    ----------
    input : NDArray
        1D array to pad.
    size_before : int
        Number of samples to prepend.
    size_after : int
        Number of samples to append.

    Returns
    -------
    NDArray
        The padded array, with the same dtype as ``input``. If a size is
        larger than twice the input length, only ``2 * len(input)`` samples
        can be produced on that side, so the result is shorter than requested.
    """
    line_len = len(input)
    # Zero-length slices (rather than np.array([])) keep the input dtype, so
    # integer input is not silently upcast to float64 by the concatenation.
    before = input[:0]
    after = input[:0]

    if size_before != 0:
        before = input[:size_before][::-1]
        overflow = size_before - line_len
        if overflow > 0:
            before = np.concatenate([before, input[:overflow][::-1]])

    if size_after != 0:
        after = input[-size_after:][::-1]
        overflow = size_after - line_len
        if overflow > 0:
            after = np.concatenate([after, input[:overflow][::-1]])

    return np.concatenate([before, input, after])


def rolling_mean(input: NDArray, window_size: int) -> Tuple[NDArray, number]:
    """
    Centered moving average of a 1D array, one output value per input sample.

    The input is padded with ``extend_input`` (``window_size // 2`` samples
    before and the remainder after) and a running window sum is slid over the
    padded array.

    Parameters
    ----------
    input : NDArray
        1D array of samples (integer or floating dtype).
    window_size : int
        Number of samples per window; must be greater than 0.

    Returns
    -------
    Tuple[NDArray, number]
        The moving average as a float array shaped like ``input``, and the
        mean of those moving-average values.
    """
    assert window_size > 0, "Window size must be greater than 0"
    size_1 = window_size // 2
    size_2 = window_size - size_1 - 1
    padded = extend_input(input, size_1, size_2)

    # Always float: with zeros_like(input) an integer input would truncate
    # every window mean to an integer.
    output = np.zeros_like(input, dtype=float)

    var_summation = np.sum(padded[:window_size])
    div = var_summation / window_size
    output[0] = div
    summation = div

    # Slide the window one sample at a time: add the entering sample, drop
    # the leaving one. The window ending at padded[i] belongs to output index
    # i - window_size + 1.
    for i in range(window_size, len(padded)):
        var_summation += padded[i]
        var_summation -= padded[i - window_size]
        div = var_summation / window_size
        summation += div
        output[i - window_size + 1] = div

    approx_mean = summation / len(input)
    return output, approx_mean


In [None]:
import unittest


class TestRollingMean(unittest.TestCase):
    """Sanity checks for ``rolling_mean``."""

    def test_sz(self):
        """The moving average has one value per input sample."""
        samples = np.array([1, 2, 3, 4, 5])
        smoothed, _ = rolling_mean(samples, 3)
        self.assertEqual(len(smoothed), len(samples))

    def test_approx_mean(self):
        """The reported mean is close to the plain mean of the input."""
        samples = np.array([1, 2, 3, 4, 5, 52])
        _, approx = rolling_mean(samples, 10)
        expected = np.mean(samples)
        np.testing.assert_almost_equal(approx, expected, decimal=0)




class TestExtendInput(unittest.TestCase):
    """
    Pins the exact padding produced by ``extend_input``.

    The expected arrays show the edge sample being repeated and, for sizes
    larger than the input, the reversed leading samples being appended after
    the reversed input.
    """

    def test_normal_case(self):
        """Test case where size_before and size_after are less than the array length"""
        input_array = np.array([1, 2, 3, 4])
        expected_output = np.array([2, 1, 1, 2, 3, 4, 4, 3])
        np.testing.assert_array_equal(extend_input(input_array, 2, 2),
                                      expected_output)

    def test_size_before_larger(self):
        """Test case where size_before is larger than the array length"""
        input_array = np.array([1, 2, 3, 4])
        expected_output = np.array([4, 3, 2, 1, 1, 1, 2, 3, 4, 4, 3])
        np.testing.assert_array_equal(extend_input(input_array, 5, 2),
                                      expected_output)

    def test_size_after_larger(self):
        """Test case where size_after is larger than the array length"""
        input_array = np.array([1, 2, 3, 4])
        expected_output = np.array([2, 1, 1, 2, 3, 4, 4, 3, 2, 1, 1])
        np.testing.assert_array_equal(extend_input(input_array, 2, 5),
                                      expected_output)

    def test_both_sizes_larger(self):
        """Test case where size_before and size_after are larger than the array length"""
        input_array = np.array([1, 2, 3, 4])
        expected_output = np.array([4, 3, 2, 1, 1, 1, 2, 3, 4, 4, 3, 2, 1, 1])
        np.testing.assert_array_equal(extend_input(input_array, 5, 5),
                                      expected_output)

    def test_empty_array(self):
        """Test case where the input array is empty"""
        input_array = np.array([])
        expected_output = np.array([])
        np.testing.assert_array_equal(extend_input(input_array, 2, 2),
                                      expected_output)

    def test_zero_sizes(self):
        """Test case where size_before and size_after are zero"""
        input_array = np.array([1, 2, 3, 4])
        expected_output = np.array([1, 2, 3, 4])
        np.testing.assert_array_equal(extend_input(input_array, 0, 0),
                                      expected_output)

    def test_zero_size_before(self):
        """Test case where size_before is zero and size_after is larger than the array length"""
        input_array = np.array([1, 2, 3, 4])
        expected_output = np.array([1, 2, 3, 4, 4, 3, 2, 1, 1])
        np.testing.assert_array_equal(extend_input(input_array, 0, 5),
                                      expected_output)

    def test_zero_size_after(self):
        """Test case where size_before is larger than the array length and size_after is zero"""
        input_array = np.array([1, 2, 3, 4])
        expected_output = np.array([4, 3, 2, 1, 1, 1, 2, 3, 4])
        np.testing.assert_array_equal(extend_input(input_array, 5, 0),
                                      expected_output)

    def test_single_element_array(self):
        """Test case where the input array has a single element"""
        input_array = np.array([1])
        expected_output = np.array([1, 1, 1])
        np.testing.assert_array_equal(extend_input(input_array, 1, 1),
                                      expected_output)


# Run the tests inside the notebook: a dummy argv keeps unittest from parsing
# the kernel's command line, and exit=False keeps it from stopping the kernel.
unittest.main(argv=[''], exit=False)


In [None]:
@typechecked
def detect_peaks(
        hr_data: Float[NDArray, "sz"],  # noqa: F821
        rol_mean: Float[NDArray, "sz"],  # noqa: F821
        mean: number,
        ma_perc: number) -> Optional[IntArray1D]:
    """
    Detect peaks in heart rate data based on a raised rolling-mean threshold.

    The threshold is ``rol_mean + mean * ma_perc``. Samples above it are
    grouped into runs of consecutive indices, and the index of the largest
    sample of every run is reported as one peak. All runs are evaluated,
    including the first and the last one.

    Parameters:
    -----------
    hr_data : NDArray
        An array containing the heart rate data points.
    rol_mean : NDArray
        An array containing the rolling mean values corresponding to each data point in hr_data.
        The length of rol_mean must be the same as hr_data.
    mean : number
        The mean value that is scaled by ma_perc and added to the rolling mean.
    ma_perc : number
        Scale applied to ``mean``; must be in the range (0, 3].
        For example, 0.1 raises the rolling mean by 10% of ``mean``.

    Returns:
    --------
    Optional[NDArray]
        The indices (into hr_data) of the detected peaks in ascending order,
        or None when no sample exceeds the threshold.
    """
    assert len(hr_data) == len(
        rol_mean), "Length of input data and rolling mean must be the same"
    assert ma_perc > 0, "Percentage must be greater than 0"
    assert 0 < ma_perc <= 3, "Percentage must be in the range (0, 3] (0.1 means 10% of the mean)"
    assert len(hr_data) > 0, "Input data must not be empty"

    # this comment exists in heartpy already
    # might be an alternative way to calculate the scaled rolling mean
    #
    # r_mean = rol_mean + rol_mean * ma_perc + mn
    r_mean = rol_mean + mean * ma_perc

    # Indices of every sample above the raised rolling mean.
    above_idxs = np.flatnonzero(hr_data > r_mean)
    if len(above_idxs) == 0:
        return None

    # A gap larger than one sample between neighbouring indices starts a new
    # run; +1 turns "last element of a run" into "first element of the next".
    run_starts = np.flatnonzero(np.diff(above_idxs) > 1) + 1
    runs = np.split(above_idxs, run_starts)

    # One peak per run: the index of its largest sample (first one on ties).
    peak_idxs = [int(run[np.argmax(hr_data[run])]) for run in runs]
    return np.array(peak_idxs, dtype=int)


In [None]:
# Moving average over a 0.75 s window, expressed in samples at the current
# sample rate. `r` (the moving average) is plotted in a later cell.
window_size = int(0.75 / SAMPLE_INTERVAL)
r, a = rolling_mean(filtered_mat, window_size)
display(
    f"Window size: {window_size}, Approximate mean: {a}, data mean: {np.mean(filtered_mat)}, rolling mean: {np.mean(r)}"
)

In [None]:
from warnings import warn


@dataclass
class RRInterval:
    """RR intervals and the peak-index pairs they were computed from (built by ``calc_rr_list``)."""
    # the RR interval in milliseconds
    rr: Float[NDArray, "..."]
    # the index of the peaks in pairs
    rr_idxs: Int[NDArray, "... 2"]


@typechecked
def preprocess_peaks_idxs(peaks_idxs: IntArray1D,
                          sample_rate: int) -> IntArray1D:
    """
    Drop the first peak when it lies within the first 150 ms of the signal.

    Parameters
    ----------
    peaks_idxs : IntArray1D
        1D array of peak sample indices with at least two elements.
    sample_rate : int
        Sample rate in Hz; must be greater than 0.

    Returns
    -------
    IntArray1D
        ``peaks_idxs`` without its first element if that index is at or
        before the 150 ms mark, otherwise ``peaks_idxs`` unchanged.
    """
    assert sample_rate > 0, "Sample rate must be greater than 0"
    assert peaks_idxs.ndim == 1, "Peaks must be a 1D array"
    assert len(peaks_idxs) > 1, "Peaks must contain at least 2 elements"
    # Sample index that corresponds to 150 ms.
    cutoff_sample = sample_rate / 1000 * 150
    first_too_early = peaks_idxs[0] <= cutoff_sample
    return peaks_idxs[1:] if first_too_early else peaks_idxs


@typechecked
def calc_rr_list(peaks_idxs: IntArray1D, sample_rate: int) -> RRInterval:
    """
    Convert peak sample indices into RR intervals.

    Parameters
    ----------
    peaks_idxs : IntArray1D
        1D array of peak sample indices with at least two elements.
    sample_rate : int
        Sample rate in Hz.

    Returns
    -------
    RRInterval
        ``rr`` holds the distance between consecutive peaks in milliseconds;
        ``rr_idxs`` holds the matching (previous peak, next peak) index pairs.
    """
    assert peaks_idxs.ndim == 1, "Peaks must be a 1D array"
    assert len(peaks_idxs) > 1, "Peaks must contain at least 2 elements"
    intervals_ms = np.diff(peaks_idxs) / sample_rate * 1000
    peak_pairs = sliding_window_view(peaks_idxs, 2).astype(int)
    return RRInterval(rr=intervals_ms, rr_idxs=peak_pairs)


@typechecked
def calc_bpm_by_len(rr_len: int, hr_sample_len: int,
                    sample_rate: number) -> float:
    """
    Estimate beats per minute from a beat count and the signal duration.

    Parameters
    ----------
    rr_len : int
        Number of beats counted (``fit_peaks`` passes the number of peaks).
    hr_sample_len : int
        Length of the signal in samples; must exceed ``rr_len``.
    sample_rate : number
        Sample rate in Hz.

    Returns
    -------
    float
        ``rr_len`` divided by the signal duration in seconds, times 60.
    """
    assert rr_len > 0, "RR interval length must be greater than 0"
    assert hr_sample_len > 0, "Heart rate sample length must be greater than 0"
    assert sample_rate > 0, "Sample rate must be greater than 0"
    assert hr_sample_len > rr_len, "Heart rate sample length must be greater than RR interval length"
    duration_s = hr_sample_len / sample_rate
    return rr_len / duration_s * 60


@typechecked
def fit_peaks(data: Float[NDArray, "..."] | Int[NDArray, "..."],
              sample_rate: int,
              hr_max: int = 190,
              hr_min: int = 48,
              rrsd_min: float = 0.1,
              rrsd_max: float = 1_000) -> Tuple[IntArray1D, RRInterval]:
    """
    Find heart-beat peaks by trying increasing rolling-mean thresholds.

    The signal is shifted to be non-negative, a 0.75 s rolling mean is
    computed, and ``detect_peaks`` is run with each candidate scale in
    ascending order. The first candidate whose result has a plausible heart
    rate and RR-interval spread is returned.

    Parameters
    ----------
    data : NDArray
        1D signal (float or integer dtype); integer input is converted to float.
    sample_rate : int
        Sample rate in Hz.
    hr_max, hr_min : int
        Accepted heart-rate range in bpm.
    rrsd_min, rrsd_max : float
        Accepted range for the standard deviation of the RR intervals (ms).

    Returns
    -------
    Tuple[IntArray1D, RRInterval]
        The peak indices and the RR intervals computed from them.

    Raises
    ------
    ValueError
        If no candidate produces an acceptable peak set.
    """
    assert sample_rate > 0, "Sample rate must be greater than 0"
    assert hr_max > 0, "Maximum heart rate must be greater than 0"
    assert hr_min > 0, "Minimum heart rate must be greater than 0"
    assert hr_max > hr_min, "Maximum heart rate must be greater than minimum heart rate"
    assert len(data) > 0, "Data must not be empty"
    bl_val = np.min(data)
    # detect_peaks only accepts floating arrays, so convert up front; astype
    # also gives a private copy of the caller's data.
    workable_data = data.astype(float)
    if bl_val < 0:
        # Lift the signal so that its minimum sits at zero.
        workable_data = workable_data + abs(bl_val)
    sample_interval = 1 / sample_rate
    # 0.75 s window, expressed in samples
    window_size = int(0.75 / sample_interval)
    r, a = rolling_mean(workable_data, window_size)

    # ma_perc_list = [0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1, 1.1, 1.2, 1.5, 2, 3]
    ma_perc_candidate: list[float] = [0.04, 0.1, 0.25, 0.5, 0.75, 1.25, 3]
    # candidate should be sorted in ascending order
    ma_perc_candidate = sorted(ma_perc_candidate)
    peak_idxs: Optional[NDArray] = None
    rr: Optional[RRInterval] = None
    for ma_perc in ma_perc_candidate:
        _peak_idxs = detect_peaks(workable_data, r, a, ma_perc)
        if _peak_idxs is None:
            warn(f"No peaks detected with ma_perc: {ma_perc}")
            continue
        bpm = calc_bpm_by_len(len(_peak_idxs), len(workable_data), sample_rate)
        if bpm > hr_max or bpm < hr_min:
            warn(
                f"Detected heart rate is out of range: {bpm}bpm from ma_perc: {ma_perc}"
            )
            continue
        if len(_peak_idxs) < 2:
            # An RR interval needs two peaks; try the next candidate instead
            # of failing the assertion in calc_rr_list.
            warn(f"Fewer than two peaks detected with ma_perc: {ma_perc}")
            continue
        _rr = calc_rr_list(_peak_idxs, sample_rate)
        rr_std = np.std(_rr.rr)
        if rr_std < rrsd_min or rr_std > rrsd_max:
            warn(
                f"RR interval standard deviation is out of range: {rr_std} from ma_perc: {ma_perc}"
            )
            continue
        peak_idxs = _peak_idxs
        rr = _rr
        break

    if peak_idxs is None or rr is None:
        raise ValueError("No valid peak detection found")
    return peak_idxs, rr

In [None]:
# Show the 0.1th percentile of the filtered signal (informational only),
# then detect peaks and RR intervals on the MATLAB-filtered signal.
pc = np.percentile(filtered_mat, 0.1)
display(f"Percentile: {pc}")
peaks_idx, rr = fit_peaks(filtered_mat, SAMPLE_RATE)

In [None]:
@typechecked
def filter_rr_by_time(rr: Float[NDArray, "sz"]) -> Bool[NDArray, "sz"]:
    """
    Flag RR intervals that lie close enough to the mean interval.

    The accepted band is the mean +/- 30% of the mean, but never narrower
    than +/- 300.

    Returns a boolean mask that is True for intervals strictly inside the band.
    """
    centre = np.mean(rr)
    relative_margin = centre * 0.3
    # Use the 30% margin only when it is wider than the fixed 300 margin.
    margin = 300 if relative_margin <= 300 else relative_margin
    lower_thresh = centre - margin
    upper_thresh = centre + margin
    below_upper = rr < upper_thresh
    above_lower = rr > lower_thresh
    return np.logical_and(below_upper, above_lower)


@typechecked
def filter_rr_by_quotient_filter_iter(
        rr: Float[NDArray, "sz"],
        upper_ratio: float = 1.25,
        lower_ratio: float = 0.75) -> Bool[NDArray, "..."]:
    """
    applies one pass of a quotient filter

    Function that applies a quotient filter as described in
    "Piskorki, J., Guzik, P. (2005), Filtering Poincare plots"

    Element ``i`` of the result is True when ``rr[i] / rr[i + 1]`` lies in
    ``[lower_ratio, upper_ratio]``. The last element has no successor and is
    always True, so the mask has the same length as ``rr``. An empty input
    yields an empty mask.

    A zero interval in the denominator produces inf/nan (with a NumPy
    runtime warning) and the pair is marked False.
    """
    assert upper_ratio > 0, "Upper ratio must be greater than 0"
    assert lower_ratio > 0, "Lower ratio must be greater than 0"
    assert upper_ratio > lower_ratio, "Upper ratio must be greater than lower ratio"

    if len(rr) == 0:
        return np.zeros(0, dtype=bool)

    # Quotient of every interval and its successor, computed in one
    # vectorized step instead of a per-element Python callback.
    quotients = rr[:-1] / rr[1:]
    within = np.logical_and(quotients >= lower_ratio, quotients <= upper_ratio)
    # Pad with True for the final interval, which has nothing to compare to.
    return np.concatenate([within, [True]])


def filter_rr_by_quotient_filter(
    rr: Float[NDArray, "sz"],
    iterations: int = 1,
    upper_ratio: float = 1.25,
    lower_ratio: float = 0.75,
) -> Bool[NDArray, "..."]:
    """
    applies a quotient filter, optionally several times

    Function that applies a quotient filter as described in
    "Piskorki, J., Guzik, P. (2005), Filtering Poincare plots"

    With ``iterations == 1`` the single-pass mask is returned. Otherwise the
    filter is re-applied to the intervals that survived the previous pass,
    and an interval is False in the result if any pass rejected it.
    """
    assert upper_ratio > 0, "Upper ratio must be greater than 0"
    assert lower_ratio > 0, "Lower ratio must be greater than 0"
    assert upper_ratio > lower_ratio, "Upper ratio must be greater than lower ratio"
    assert iterations > 0, "Iterations must be greater than 0"

    if iterations == 1:
        return filter_rr_by_quotient_filter_iter(rr, upper_ratio, lower_ratio)

    # Column 0: original position of the interval, column 1: its value.
    indexed_rr = np.vstack([np.arange(len(rr)), rr]).T
    # Original positions rejected so far.
    rejected = np.array([])
    # Intervals still in play; shrinks with every pass.
    survivors = indexed_rr
    for _ in range(iterations):
        keep = filter_rr_by_quotient_filter_iter(survivors[:, 1],
                                                 upper_ratio, lower_ratio)
        newly_rejected = survivors[~keep][:, 0]
        rejected = np.unique(np.concatenate([rejected, newly_rejected]))
        survivors = survivors[keep]

    # True for every original position that was never rejected.
    was_rejected = np.isin(indexed_rr[:, 0], rejected)
    return np.logical_not(was_rejected)

In [None]:
# There is one RR interval fewer than there are peaks, so a False is appended
# to align each interval mask with peaks_idx.
# NOTE: because the padding value is False, the last peak always ends up in
# both "rejected" sets below.
rr_bool = filter_rr_by_time(rr.rr)
peaks_bool = np.concatenate([rr_bool, [False]])
rr_bool_quotient = filter_rr_by_quotient_filter(rr.rr, iterations=1)
peaks_bool_quotient = np.concatenate([rr_bool_quotient, [False]])
# Peaks whose following interval failed the time / quotient criterion.
rejected_peaks_idx = peaks_idx[~peaks_bool]
rejected_peaks_idx_quotient = peaks_idx[~peaks_bool_quotient]

In [None]:
# Overview figure: filtered signal, moving average, all detected peaks, and
# the peaks rejected by each criterion. All traces use the sample index as x.
signal_trace = go.Scatter(y=filtered_mat,
                          mode="lines",
                          name="ADC Reading (filtered)")

# `r` is the moving average of filtered_mat computed in an earlier cell.
roll_mean_trace = go.Scatter(y=r,
                             mode="lines",
                             name="moving average",
                             line=dict(color="black", width=0.5))

peaks_trace = go.Scatter(
    x=peaks_idx,
    y=filtered_mat[peaks_idx],
    mode="markers",
    name="peaks",
    marker=dict(color="green"),
)

peaks_rejected_trace = go.Scatter(
    x=rejected_peaks_idx,
    y=filtered_mat[rejected_peaks_idx],
    mode="markers",
    name="rejected peaks (time criteria)",
    marker=dict(color="red"),
)

peaks_rejected_quotient_trace = go.Scatter(
    x=rejected_peaks_idx_quotient,
    y=filtered_mat[rejected_peaks_idx_quotient],
    mode="markers",
    name="rejected peaks (quotient)",
    marker=dict(color="orange"),
)

# Peaks rejected by both criteria; drawn last so they cover the other markers.
rejected_both_idx = np.intersect1d(rejected_peaks_idx,
                                     rejected_peaks_idx_quotient)
peaks_rejected_both_trace = go.Scatter(
    x=rejected_both_idx,
    y=filtered_mat[rejected_both_idx],
    mode="markers",
    name="rejected peaks (both)",
    marker=dict(color="purple"),
)

layout = go.Layout(title="Plot",
                   xaxis=dict(title="Index"),
                   yaxis=dict(title="Value"),
                   showlegend=True)

fig = go.Figure(
    data=[
        signal_trace,
        roll_mean_trace,
        peaks_trace,
        peaks_rejected_trace,
        peaks_rejected_quotient_trace,
        peaks_rejected_both_trace,
    ],
    layout=layout,
)

fig.show()

In [None]:
# RR intervals that pass the time criterion, the quotient filter, and both.
rr_left_time_crit = rr.rr[rr_bool]
rr_left_quotient = rr.rr[rr_bool_quotient]
rr_left_both = rr.rr[(rr_bool & rr_bool_quotient)]
# the count that is left after filtering
display(f"before filtering: {len(rr.rr)}, time_criteria: {len(rr_left_time_crit)}, quotient: {len(rr_left_quotient)}, both: {len(rr_left_both)}")

In [None]:
from scipy.spatial.transform import Rotation as R

@dataclass
class Poincare:
    """Poincare-plot data and descriptors as computed by ``calc_poincare``."""
    # each interval (x_plus) paired with the interval that follows it (x_minus)
    x_plus: Float[NDArray, "l"]
    x_minus: Float[NDArray, "l"]
    # the pairs projected onto the rotated axes:
    # (x_plus - x_minus) / sqrt(2) and (x_plus + x_minus) / sqrt(2)
    x_one: Float[NDArray, "l"]
    x_two: Float[NDArray, "l"]
    # standard deviation of x_one and of x_two
    sd_1: number
    sd_2: number
    # pi * sd_1 * sd_2
    s: number
    # sd_1 / sd_2
    ratio: number

@typechecked
def calc_poincare(rr: Float[NDArray, "sz"]):
    """
    Compute Poincare-plot descriptors from a sequence of RR intervals.

    Every interval is paired with its successor; the pairs are projected onto
    the axes rotated by 45 degrees and the spread along each axis is measured.

    Parameters
    ----------
    rr : NDArray
        1D array of RR intervals with at least two elements.

    Returns
    -------
    Poincare
        The paired intervals, their rotated coordinates, SD1, SD2, the
        product ``pi * SD1 * SD2`` and the ratio SD1/SD2.

    Raises
    ------
    ValueError
        If fewer than two intervals are given.
    """
    if len(rr) < 2:
        raise ValueError(
            "At least two RR intervals are required to compute a Poincare plot")
    # Slicing pairs each interval with its successor in one step; np.array
    # copies so the result does not alias the caller's array.
    x_plus = np.array(rr[:-1], dtype=float)
    x_minus = np.array(rr[1:], dtype=float)
    x_one = (x_plus - x_minus) / np.sqrt(2)
    x_two = (x_plus + x_minus) / np.sqrt(2)
    sd_1 = np.sqrt(np.var(x_one))
    sd_2 = np.sqrt(np.var(x_two))
    s = np.pi * sd_1 * sd_2
    ratio = sd_1 / sd_2
    return Poincare(x_plus, x_minus, x_one, x_two, float(sd_1), float(sd_2), float(s), float(ratio))

def plot_poincare(poincare: Poincare, title: str="Poincare Plot"):
    """
    Build a plotly Poincare plot from precomputed Poincare data.

    The figure contains the RR-interval scatter, the identity line, the SD1
    and SD2 vectors starting at the mean point, and an ellipse with semi-axes
    SD2 and SD1 rotated by 45 degrees around that point.

    Parameters
    ----------
    poincare : Poincare
        Data whose x_plus, x_minus, sd_1 and sd_2 fields are plotted.
    title : str
        Figure title.

    Returns
    -------
    The plotly figure (not shown; the caller decides when to display it).
    """
    x_plus = poincare.x_plus
    x_minus = poincare.x_minus
    sd_1 = poincare.sd_1
    sd_2 = poincare.sd_2
    rr = go.Scatter(x=x_plus, y=x_minus,  mode='markers', name='RR intervals',
                         marker=dict(color="grey", opacity=0.75))
    # Overall value range and mean point of the scatter.
    mins = np.min([x_plus, x_minus])
    maxs = np.max([x_plus, x_minus])
    x_mn = np.mean(x_plus)
    y_mn = np.mean(x_minus)
    identity_line = np.linspace(mins, maxs, 100)
    identity = go.Scatter(x=identity_line, y=identity_line, mode='lines', name='identity line',
                      line=dict(color='black', dash='dash'))

    def rotate_vec(x:number, y:number, angle:number)->Tuple[number, number]:
        '''rotates vector around origin point

        Function that takes vector and angle, and rotates around origin point
        with given amount of degrees.

        Helper function for poincare plotting

        Parameters
        ----------
        x : int or float
            vector x coordinate

        y : int or float
            vector y coordinate

        angle: int or float
            the angle of rotation applied to the vector, in degrees

        Returns
        -------
        x_rot : float
            new x coordinate with rotation applied

        y_rot : float
            new y coordinate with rotation applied
        '''
        theta = np.radians(angle)

        cs = np.cos(theta)
        sn = np.sin(theta)

        x_rot = (x * cs) - (y * sn)
        y_rot = (x * sn) + (y * cs)

        return x_rot, y_rot

    # Rotate SD1, SD2 vectors 45 degrees counterclockwise and plot

    sd1_x_rot, sd1_y_rot = rotate_vec(0, sd_1, 45)
    sd2_x_rot, sd2_y_rot = rotate_vec(0, sd_2, 45)

    sd1_line = go.Scatter(x=[np.mean(x_plus), np.mean(x_plus) + sd1_x_rot],
                          y=[np.mean(x_minus), np.mean(x_minus) + sd1_y_rot],
                          mode='lines', name='SD1', line=dict(color='blue'))

    # The x component is subtracted here, which mirrors the rotated vector so
    # that SD2 points along the identity line while SD1 stays perpendicular.
    sd2_line = go.Scatter(x=[np.mean(x_plus), np.mean(x_plus) - sd2_x_rot],
                          y=[np.mean(x_minus), np.mean(x_minus) + sd2_y_rot],
                          mode='lines', name='SD2', line=dict(color='red'))

    # Ellipse
    # 2x2 block of a 45-degree rotation about z, i.e. a planar rotation matrix.
    rotation_matrix = R.from_euler('z', 45, degrees=True).as_matrix()[0:2, 0:2]
    e_xs = sd_2 * np.cos(np.linspace(0, 2 * np.pi, 100))
    e_ys = sd_1 * np.sin(np.linspace(0, 2 * np.pi, 100))
    rotated = np.dot(rotation_matrix, np.vstack([e_xs, e_ys]))
    # make e_xs and e_ys rotate 45 degrees
    ellipse = go.Scatter(x=rotated[0] + x_mn,
                         y=rotated[1] + y_mn,
                         mode='lines', name='Ellipse',
                         line=dict(color='black'))
    
    # Layout
    layout = go.Layout(title=title,
                       xaxis_title='RRi[n] (ms)',
                       yaxis_title='RRi[n+1] (ms)',
                       showlegend=True,
                       legend=dict(x=1, y=1, bgcolor='rgba(255,255,255,0.5)'),
                       xaxis=dict(showline=True, linewidth=2, linecolor='black', mirror=True),
                       yaxis=dict(showline=True, linewidth=2, linecolor='black', mirror=True),
                       plot_bgcolor='white')
    fig = go.Figure(data=[rr, identity, sd1_line, sd2_line, ellipse], layout=layout)
    # Axis limits: 10th-90th percentile of the data widened by 100 on each
    # side, so a few extreme points do not dictate the view.
    xlim_max = np.percentile(x_plus, 90) + 100
    xlim_min = np.percentile(x_plus, 10) - 100
    ylim_max = np.percentile(x_minus, 90) + 100
    ylim_min = np.percentile(x_minus, 10) - 100
    fig.update_layout(xaxis_range=[xlim_min, xlim_max], yaxis_range=[ylim_min, ylim_max])
    # Equal axis scaling keeps the identity line at 45 degrees.
    fig.update_layout(yaxis_scaleanchor="x", width=500, height=500)
    return fig
    
# Poincare analysis of the RR intervals kept by the quotient filter.
# NOTE(review): after masking, neighbouring entries are not necessarily
# consecutive beats — confirm this pairing is intended.
poincare = calc_poincare(rr.rr[rr_bool_quotient])
fig = plot_poincare(poincare)
fig.show()

In [None]:
@typechecked
def bpm_by_rr(rr: Float[NDArray, "..."]) -> float:
    """Return beats per minute from RR intervals given in milliseconds (60 divided by the mean interval in seconds)."""
    mean_interval_s = np.mean(rr) / 1000
    bpm = 60 / mean_interval_s
    return float(bpm)

In [None]:
from pprint import pprint

@dataclass
class TimeDomainHRV:
    """Time-domain heart-rate-variability measures produced by ``calc_time_domain_hrv``."""
    # beats per minute derived from the mean RR interval
    bpm: number
    # i.e. average of the RR intervals
    ibi: number
    # NN interval is RR interval
    # sdnn (standard deviation of NN intervals)
    # Values below 50 ms are considered unhealthy, 50-100 ms indicate
    # compromised health, and above 100 ms suggest a healthy individual
    sdnn: number
    # sdsd (standard deviation of successive differences in interbeat intervals)
    sdsd: number
    # root mean square of the successive differences
    rmssd: number
    # fraction (0..1) of successive differences above 20 / above 50
    # Higher pNN50 suggests a more relaxed state
    pnn20: number
    pnn50: number
    # median absolute deviation of RR intervals
    mad: number


def calc_time_domain_hrv(rr: Float[NDArray, "..."]) -> TimeDomainHRV:
    """
    Compute time-domain HRV measures from RR intervals in milliseconds.

    Parameters
    ----------
    rr : NDArray
        1D array of RR intervals (ms) with at least two elements.

    Returns
    -------
    TimeDomainHRV
        bpm, mean interval (ibi), sdnn, sdsd, rmssd, pnn20, pnn50 and mad.
        pnn20/pnn50 are fractions in [0, 1]. Every field is a plain float.

    Raises
    ------
    ZeroDivisionError
        If ``rr`` has fewer than two elements (no successive differences).
    """

    def calc_mad(data: Float[NDArray, "..."]) -> float:
        '''computes median absolute deviation

        Function that compute median absolute deviation of data slice
        See: https://en.wikipedia.org/wiki/Median_absolute_deviation

        Parameters
        ----------
        data : 1-dimensional numpy array or list
            sequence containing data over which to compute the MAD

        Returns
        -------
        out : float
            the Median Absolute Deviation as computed
        '''
        med = np.median(data)
        return float(np.median(np.abs(data - med)))

    bpm = bpm_by_rr(rr)
    ibi = float(np.mean(rr))

    # Absolute differences between successive intervals.
    rr_diff = np.abs(np.diff(rr))

    sdnn = float(np.std(rr))
    sdsd = float(np.std(rr_diff))
    # Converted to a plain float like every other field.
    rmssd = float(np.sqrt(np.mean(np.power(rr_diff, 2))))

    # Share of successive differences above 20 ms / 50 ms.
    n_diff = len(rr_diff)
    pnn20 = int(np.count_nonzero(rr_diff > 20)) / n_diff
    pnn50 = int(np.count_nonzero(rr_diff > 50)) / n_diff

    mad = calc_mad(rr)
    return TimeDomainHRV(bpm, ibi, sdnn, sdsd, rmssd,
                         pnn20, pnn50, mad)


# Time-domain measures on the intervals that pass both filters, printed next
# to the Poincare descriptors computed earlier (quotient filter only).
time_domain_hrv = calc_time_domain_hrv(rr.rr[(rr_bool & rr_bool_quotient)])

pprint(time_domain_hrv)
pprint({"sd1": poincare.sd_1, "sd2": poincare.sd_2, "s": poincare.s, "sd1/sd2": poincare.ratio})

In [None]:
# Reference run with HeartPy on the same filtered signal, for comparison with
# the hand-written pipeline above.
#
# calc_freq: whether to calculate frequency domain measures
# interp_threshold: the amplitude threshold beyond which will be checked for
# clipping. Recommended is to take this as the maximum value of the ADC with
# some margin for signal noise
# reject_segmentwise: whether to reject segments with more than 30% rejected
# beats. By default looks at segments of 10 beats at a time.

# clean_rr uses by default quotient-filtering, which is a bit aggressive.
# You can set 'iqr' or 'z-score' with the clean_rr_method flag.
from typing import Literal

CLEAN_RR_METHOD = Literal["quotient-filter", "iqr", "z-score"]
clean_rr_method: CLEAN_RR_METHOD = "quotient-filter"
# NOTE: clean_rr is False here, so clean_rr_method is passed but RR cleaning
# is not requested.
working, measures = hp.process(
    filtered_mat,
    sample_rate=SAMPLE_RATE,
    interp_clipping=False,
    clean_rr=False,
    clean_rr_method=clean_rr_method,
)

# Take into consideration that the scale for RMSSD doesn't typically exceed +/-
# 130, SDSD doesn't differ by much. This means that even a few incorrectly
# detected peaks are already introducing large measurement errors into the output
# variables. The algorithm described here is specifically designed to handle noisy
# PPG data from cheap sensors. The main design criterion was to minimise the number
# of incorrectly placed peaks so as to minimise the error introduced into the output
# measures.

display(measures)
hp.plotter(working, measures, figsize=(24, 6), moving_average=True)

In [None]:
# HeartPy's breathing plot for the result of hp.process above.
hp.plot_breathing(working, measures, figsize=(18, 4))

In [None]:
# HeartPy's own Poincare plot, for comparison with plot_poincare above.
hp.plot_poincare(working, measures, figsize=(4, 4))