In [None]:
import sys
import pickle
import matplotlib.pyplot as plt
import numpy as np
from scipy.signal import butter, filtfilt, correlate, find_peaks
from scipy.fft import fft, fftfreq
import pandas as pd


SAMPLING_RATE = 64 #Hz
WINDOW_SIZE = 10
LOW_FREQ = 0.5 #Hz
HIGH_FREQ = 5 #Hz

In [None]:
def bandpass_filter(signal_data: np.ndarray, low_freq: float, high_freq: float, sampling_rate:int) -> np.ndarray: 
    """
    Filter frequencies out of [low_freq, high_freq].
        args:
            signal_data: 
                data in time domain
            low_freq:
                low frequency
            high_freq:
                high frequency
            sampling_rate:
                sampling rate
    """
    nyquist_freq = 0.5 * sampling_rate
    low = low_freq / nyquist_freq
    high = high_freq / nyquist_freq
    order = 4  # Filter order
    b, a = butter(order, [low, high], btype='band')
    filtered_signal = filtfilt(b, a, signal_data)
    return filtered_signal

In [None]:
hr_history: list[float] = []
def get_hr_from_bvp(bvp: np.ndarray, reset=False, plot=False) -> float:
    global hr_history
    # print(hr_history)
    if reset:
        hr_history = []
    filtered_bvp_signal = bandpass_filter(bvp, LOW_FREQ, HIGH_FREQ, sampling_rate=SAMPLING_RATE)
    r = correlate(filtered_bvp_signal, filtered_bvp_signal)
    r = r[len(r)//2:]
    peaks,_ = find_peaks(r, prominence=r[0]/4)
    def b(peaks):
        return 60/((peaks[1]-peaks[0])/64)
    MIN_HR, MAX_HR, FALLBACK_HR = 55, 160, 60
    # Edge cases
    if len(peaks) < 2 or b(peaks) < MIN_HR or b(peaks) > MAX_HR:
        if len(hr_history) == 0:
            return FALLBACK_HR
        else:
            # Returns latest HR
            return hr_history[-1]
    hr = b(peaks)
    # Updating history by removing last HR and adding newest HR
    if len(hr_history) > 2:
        hr_history.pop(0)
    hr_history.append(hr)
    w = np.array([i+1 for i in range(len(hr_history))])
    # Update newest HR by weighted average of history
    hr_history[-1] = sum([hr_history[i]*w[i] for i in range(len(hr_history))])/sum(w)
    if plot:
        plt.plot(r)
        plt.scatter(peaks, r[peaks], color='r')
        plt.show()
    
    return hr_history[-1]

In [None]:
all_bvp = np.load('./../../recordings/empatica/may_15min_run_metronome/bvp.pkl', allow_pickle=True)


In [None]:
START_WINDOW_IDX = 1000
WIN_IDX = 8
reset = False
bvp = all_bvp[START_WINDOW_IDX+640*WIN_IDX:START_WINDOW_IDX+640*(WIN_IDX+1)]
plt.plot(bvp)

In [None]:
# reshape bvp signal to non overllaping windows 10 sec length
bvp_cutted = bvp[:len(bvp)- len(bvp)%(SAMPLING_RATE*WINDOW_SIZE)]
bvp_reshaped = bvp_cutted.reshape(len(bvp_cutted)//(WINDOW_SIZE*SAMPLING_RATE), WINDOW_SIZE*SAMPLING_RATE)

# print 10 bvp windows, in each windows we estimate the HR
for i in range(bvp_reshaped.shape[0]):
    # plt.figure()
    # plt.plot(bvp_reshaped[i])
    # plt.title(f'BVP in window[{i}] | HR={get_hr_from_bvp(bvp_reshaped[i]/2)}')
    print(f"window[{i}] | HR={get_hr_from_bvp(bvp_reshaped[i], reset=reset, plot=True)}")

In [None]:
hr_history