In [None]:
import os
import copy
import scipy
import pickle
import numpy as np
import pandas as pd
import heartpy as hp
import matplotlib.pyplot as plt
from tqdm import tqdm
from scipy import signal as sig
from scipy import sparse as sp
from heartpy.datautils import rolling_mean
# Set matplotlib font size
plt.rcParams.update({'font.size': 12})

In [None]:
def calcMetrics(input_data, sampFreq, B, window_size=None, use_harmonic=False, normalize=False):
    # define goodness parameters
    B1 = 0.75 # low cutoff
    B2 = 3 # high cutoff

    if window_size is None:
        pulseCalcLength = len(input_data)
    else:
        pulseCalcLength = np.uint32(window_size/(1.0/sampFreq))
    pulseRate_result = np.empty(len(input_data)-pulseCalcLength + 1)
    goodnessMetric_result = np.empty(len(input_data)-pulseCalcLength + 1)

    for i in range(0, len(pulseRate_result)):
        window = input_data[i:i+pulseCalcLength]
        # sigFreq, sigPower = sig.periodogram(window, fs=sampFreq, nfft=180000)
        sigFreq, sigPower = sig.welch(x=window, nperseg=len(window)//3, fs=sampFreq, nfft=180000)
        maskFreq = (sigFreq > B1)&(sigFreq<B2)
        # sigFreq = sigFreq[maskFreq]
        sigPower = sigPower * maskFreq
        if use_harmonic:
            # harmonic PSD
            harmonicsigPower = sigPower[::2]
            harmonicsigPower = np.pad(harmonicsigPower, (0, len(sigPower) - len(harmonicsigPower)))
            # find peak frequency
            peakFreq = sigFreq[np.argmax(sigPower + harmonicsigPower)]
        else:
            peakFreq = sigFreq[np.argmax(sigPower)]
        pulseRate = 60.0*peakFreq
        pulseRate_result[i] = pulseRate
        # compute goodness
        aroundPulseRate = (sigFreq > peakFreq - B) & (sigFreq < peakFreq + B)
        withinBandpass = (sigFreq >= B1) & (sigFreq <= B2)
        powerPulseRate = np.sum(sigPower[aroundPulseRate])
        powerAll = np.sum(sigPower[withinBandpass])
        if normalize:
            goodnessMetric_result[i] = powerPulseRate / powerAll
        else:
            goodnessMetric_result[i] = powerPulseRate / (powerAll - powerPulseRate)
    timestamps = np.arange(len(pulseRate_result))
    return timestamps, goodnessMetric_result, pulseRate_result

def custom_detrend(sig, Lambda):
    """custom_detrend(sig, Lambda) -> filtered_signal
    This function applies a detrending filter.
    This code is based on the following article "An advanced detrending method with application
    to HRV analysis". Tarvainen et al., IEEE Trans on Biomedical Engineering, 2002.
    *Parameters*
      ``sig`` (1d numpy array):
        The sig where you want to remove the trend.
      ``Lambda`` (int):
        The smoothing parameter.
    *Returns*
      ``filtered_signal`` (1d numpy array):
        The detrended sig.
    """
    signal_length = sig.shape[0]

    # observation matrix
    H = np.identity(signal_length)

    # second-order difference matrix

    ones = np.ones(signal_length)
    minus_twos = -2 * np.ones(signal_length)
    diags_data = np.array([ones, minus_twos, ones])
    diags_index = np.array([0, 1, 2])
    D = sp.spdiags(diags_data, diags_index, (signal_length - 2), signal_length).toarray()
    filtered_signal = np.dot((H - np.linalg.inv(H + (Lambda ** 2) * np.dot(D.T, D))), sig)
    return filtered_signal

def pulse_rate_from_power_spectral_density(pleth_sig: np.array, FS: float,
                                           LL_PR: float, UL_PR: float,
                                           BUTTER_ORDER: int = 6,
                                           DETREND: bool = False,
                                           FResBPM: float = 0.1,
                                           HARMONIC: bool = False,
                                           WELCH = True) -> float:
    """ Function to estimate the pulse rate from the power spectral density of the plethysmography sig.

    Args:
        pleth_sig (np.array): Plethysmography sig.
        FS (float): Sampling frequency.
        LL_PR (float): Lower cutoff frequency for the butterworth filtering.
        UL_PR (float): Upper cutoff frequency for the butterworth filtering.
        BUTTER_ORDER (int, optional): Order of the butterworth filter. Give None to skip filtering. Defaults to 6.
        DETREND (bool, optional): Boolena Flag for executing cutsom_detrend. Defaults to False.
        FResBPM (float, optional): Frequency resolution. Defaults to 0.1.

    Returns:
        pulse_rate (float): _description_
    

    Daniel McDuff, Ethan Blackford, January 2019
    Copyright (c)
    Licensed under the MIT License and the RAIL AI License.
    """

    N = (60*FS)/FResBPM

    # Detrending + nth order butterworth + periodogram
    if DETREND:
        pleth_sig = custom_detrend(pleth_sig, 100)
    if BUTTER_ORDER:
        [b, a] = sig.butter(BUTTER_ORDER, [LL_PR/60, UL_PR/60], btype='bandpass', fs = FS)
    pleth_sig = sig.filtfilt(b, a, np.double(pleth_sig))
    
    # Calculate the PSD and the mask for the desired range
    if WELCH:
        F, Pxx = sig.welch(x=pleth_sig, nperseg=len(pleth_sig)//3, nfft=N, fs=FS)
    else:
        F, Pxx = sig.periodogram(x=pleth_sig,  nfft=N, fs=FS);  
    FMask = (F >= (LL_PR/60)) & (F <= (UL_PR/60))
    
    # Calculate predicted pulse rate:
    FRange = F * FMask
    PRange = Pxx * FMask

    if HARMONIC:
      harmonicsigPower = PRange[::2]
      harmonicsigPower = np.pad(harmonicsigPower, (0, len(PRange) - len(harmonicsigPower)))
      harmonicsigPower[0] = 0
      MaxInd = np.argmax(PRange+harmonicsigPower)
    else:
      MaxInd = np.argmax(PRange)
    pulse_rate_freq = FRange[MaxInd]
    pulse_rate = pulse_rate_freq*60
            
    return pulse_rate

def get_error_metric(pred_values, gt_values):
    """
    Calculate the error metric between predicted and ground truth values.
    """
    # Calculate the mean absolute error
    mae = np.mean(np.abs(pred_values - gt_values))
    # Calculate the root mean squared error
    rmse = np.sqrt(np.mean(np.square(np.abs(pred_values - gt_values))))
    # Calculate the mean absolute percentage error
    mape = np.mean(np.abs((pred_values - gt_values) / gt_values)) * 100
    # Calculate pearson correlation coefficient
    r = np.corrcoef(pred_values, gt_values)[0, 1]
    return mae, rmse, mape, r

def get_ibi(pred, gt, fs=30, smooth_window=7):
    if smooth_window:
        pred = np.convolve(pred, np.ones((smooth_window))/smooth_window, mode='same')
        gt = np.convolve(gt, np.ones((smooth_window))/smooth_window, mode='same')
    rol_mean = rolling_mean(pred, windowsize=1, sample_rate = fs)
    wd = hp.peakdetection.detect_peaks(pred, rol_mean, ma_perc = 20, sample_rate = fs)
    pred_peaks = wd['peaklist']
    # plt.figure(figsize=(12, 6))
    # plt.plot(pred, label='Filtered and Smoothed rPPG Signal', color='b')
    # plt.plot(pred_peaks, pred[pred_peaks], 'rx', label='Detected Peaks', markersize=10)
    # plt.legend()
    # plt.grid(True)

    rol_mean = rolling_mean(gt, windowsize=1, sample_rate = fs)
    wd = hp.peakdetection.detect_peaks(gt, rol_mean, ma_perc = 20, sample_rate = fs)
    gt_peaks = wd['peaklist']
    # plt.figure(figsize=(12, 6))
    # plt.plot(gt, label='Filtered and Smoothed rPPG Signal', color='b')
    # plt.plot(gt_peaks, gt[gt_peaks], 'rx', label='Detected Peaks', markersize=10)
    # plt.legend()
    # plt.grid(True)
    pred_ibi = np.diff(pred_peaks)
    gt_ibi = np.diff(gt_peaks)
    return pred_ibi, gt_ibi

def get_filtered_ibi(pred, gt, fs=30, smooth_window=7):
    pred_ibi, gt_ibi = get_ibi(pred, gt, fs=fs, smooth_window=smooth_window)
    # Filter IBI (read the code above)
    pred_q1 = np.percentile(pred_ibi, 25)
    pred_q2 = np.percentile(pred_ibi, 50)
    pred_q3 = np.percentile(pred_ibi, 75)
    pred_iqr = pred_q3 - pred_q1
    pred_lower_bound = pred_q1 - 1.5 * pred_iqr
    pred_upper_bound = pred_q3 + 1.5 * pred_iqr
    pred_filtered_ibi = [x for x in pred_ibi if pred_lower_bound <= x <= pred_upper_bound]
    gt_q1 = np.percentile(gt_ibi, 25)
    gt_q2 = np.percentile(gt_ibi, 50)
    gt_q3 = np.percentile(gt_ibi, 75)
    gt_iqr = gt_q3 - gt_q1
    gt_lower_bound = gt_q1 - 1.5 * gt_iqr
    gt_upper_bound = gt_q3 + 1.5 * gt_iqr
    gt_filtered_ibi = [x for x in gt_ibi if gt_lower_bound <= x <= gt_upper_bound]
    # Sample to secs
    pred_filtered_ibi = np.array(pred_filtered_ibi) * (1/fs)
    gt_filtered_ibi = np.array(gt_filtered_ibi) * (1/fs)
    return pred_filtered_ibi, gt_filtered_ibi

In [None]:
waveform_file = "waveforms/rppg/test_fusion/pred.pickle"
with open(waveform_file, 'rb') as f:
    data = pickle.load(f)
data.keys(), data['participant_task_chunk_id_list']
all_pred = data['pred']
all_gt = data['gt']
all_participant_task_chunk_list = data['participant_task_chunk_id_list']

In [None]:
metadata_file = "metadata.csv"
metadata = pd.read_csv(metadata_file)
metadata['Int Skin Tone'] = metadata['Skin Tone'].replace({'fp1':1,'fp2':2,'fp3':3,'fp4':4,'fp5':5,'fp6':6})
# Create a dictionary for participant and skin tone
participant_numpy = metadata['Participant'].to_numpy()
int_skin_tone_numpy = metadata['Int Skin Tone'].to_numpy()
fitzpatrick_dict = {par: ist for par, ist in zip(participant_numpy, int_skin_tone_numpy)}
print(len(fitzpatrick_dict), fitzpatrick_dict)

In [None]:
fs = 30
ll_cutoff = 40
ul_cutoff = 180
all_pred_hr = []
all_gt_hr = []
all_pred_ibi = []
all_gt_ibi = []
all_goodness = []
for pred, gt, (participant_task, chunk_id_list) in zip(all_pred, all_gt, all_participant_task_chunk_list):
    # Norm
    pred = (pred - np.mean(pred)) / np.std(pred)
    gt = (gt - np.mean(gt)) / np.std(gt)
    # Detrend and Goodness
    goodness = calcMetrics(pred, fs, 0.1, normalize=True)[1][0]
    # 1-D gauss blur
    pred = custom_detrend(pred, 100)
    gt = custom_detrend(gt, 100)
    pred = np.convolve(pred, np.ones((7))/7, mode='same')
    gt = np.convolve(gt, np.ones((7))/7, mode='same')
    # Norm
    pred = (pred - np.mean(pred)) / np.std(pred)
    gt = (gt - np.mean(gt)) / np.std(gt)
    # HR
    pred_hr = pulse_rate_from_power_spectral_density(pred, fs, ll_cutoff, ul_cutoff, BUTTER_ORDER=6, 
                                                        DETREND=False, WELCH=True)
    gt_hr = pulse_rate_from_power_spectral_density(gt, fs, ll_cutoff, ul_cutoff, BUTTER_ORDER=6, 
                                                        DETREND=False, WELCH=True)
    all_pred_hr.append(pred_hr)
    all_gt_hr.append(gt_hr)
    all_goodness.append(goodness)
    # IBI
    pred_ibi, gt_ibi = get_filtered_ibi(pred, gt, fs=fs, smooth_window=7)
    all_pred_ibi.append(np.mean(pred_ibi))
    all_gt_ibi.append(np.mean(gt_ibi))
    pred_hr_ibi = 60 / np.mean(pred_ibi)
    gt_hr_ibi = 60 / np.mean(gt_ibi)
all_pred_hr = np.array(all_pred_hr)
all_gt_hr = np.array(all_gt_hr)
all_goodness = np.array(all_goodness)
all_pred_ibi = np.array(all_pred_ibi)
all_gt_ibi = np.array(all_gt_ibi)

In [None]:
all_errors = np.abs(all_pred_hr - all_gt_hr)
# MAE, RMSE, MAPE, r
mae, rmse, mape, r = get_error_metric(all_pred_hr, all_gt_hr)
ibi_error = np.mean(np.abs(all_pred_ibi - all_gt_ibi)) * 1000
print("MAE, RMSE, MAPE, r, IBI")
print(np.round(mae,2),"&",np.round(rmse,2),"&",np.round(mape,2),"&",np.round(r,2),"&",np.round(ibi_error,2),r"\\")

In [None]:
fp_pred_dict = {1:[], 2:[], 3:[], 4:[], 5:[], 6:[]}
fp_gt_dict = {1:[], 2:[], 3:[], 4:[], 5:[], 6:[]}
fp_ibi_pred_dict = {1:[], 2:[], 3:[], 4:[], 5:[], 6:[]}
fp_ibi_gt_dict = {1:[], 2:[], 3:[], 4:[], 5:[], 6:[]}
for pred, gt, pred_ibi , gt_ibi, (participant_task, chunk_id_list) in \
        zip(all_pred_hr, all_gt_hr, all_pred_ibi, all_gt_ibi, all_participant_task_chunk_list):
    participant = participant_task.split("_")[0]
    skin_tone = fitzpatrick_dict[participant]
    fp_pred_dict[skin_tone].append(pred)
    fp_gt_dict[skin_tone].append(gt)
    fp_ibi_pred_dict[skin_tone].append(pred_ibi)
    fp_ibi_gt_dict[skin_tone].append(gt_ibi)
# Calculate the error metrics for each skin tone
# Combine (1,2) & (3,4) & (5,6)
lmd_pred_dict = {'l':[], 'm':[], 'd':[]}
lmd_gt_dict = {'l':[], 'm':[], 'd':[]}
lmd_ibi_pred_dict = {'l':[], 'm':[], 'd':[]}
lmd_ibi_gt_dict = {'l':[], 'm':[], 'd':[]}
for key in fp_pred_dict.keys():
    if key == 1 or key == 2:
        lmd_pred_dict['l'] += fp_pred_dict[key]
        lmd_gt_dict['l'] += fp_gt_dict[key]
        lmd_ibi_pred_dict['l'] += fp_ibi_pred_dict[key]
        lmd_ibi_gt_dict['l'] += fp_ibi_gt_dict[key]
    elif key == 3 or key == 4:
        lmd_pred_dict['m'] += fp_pred_dict[key]
        lmd_gt_dict['m'] += fp_gt_dict[key]
        lmd_ibi_pred_dict['m'] += fp_ibi_pred_dict[key]
        lmd_ibi_gt_dict['m'] += fp_ibi_gt_dict[key]
    elif key == 5 or key == 6:
        lmd_pred_dict['d'] += fp_pred_dict[key]
        lmd_gt_dict['d'] += fp_gt_dict[key]
        lmd_ibi_pred_dict['d'] += fp_ibi_pred_dict[key]
        lmd_ibi_gt_dict['d'] += fp_ibi_gt_dict[key]
# Calculate the error metrics for each skin tone
print()
print("Skin Tone: MAE, RMSE, MAPE, r, IBI")
l_mae, l_rmse, l_mape, l_r = get_error_metric(np.array(lmd_pred_dict['l']), np.array(lmd_gt_dict['l']))
l_ibi = np.mean(np.abs(np.array(lmd_ibi_pred_dict['l']) - np.array(lmd_ibi_gt_dict['l']))) * 1000
print("Light:", np.round(l_mae,2),"&",np.round(l_rmse,2),"&",np.round(l_mape,2),"&",np.round(l_r,2),"&",np.round(l_ibi,2),r"\\")
m_mae, m_rmse, m_mape, m_r = get_error_metric(np.array(lmd_pred_dict['m']), np.array(lmd_gt_dict['m']))
m_ibi = np.mean(np.abs(np.array(lmd_ibi_pred_dict['m']) - np.array(lmd_ibi_gt_dict['m']))) * 1000
print("Medium:", np.round(m_mae,2),"&",np.round(m_rmse,2),"&",np.round(m_mape,2),"&",np.round(m_r,2),"&",np.round(m_ibi,2),r"\\")
d_mae, d_rmse, d_mape, d_r = get_error_metric(np.array(lmd_pred_dict['d']), np.array(lmd_gt_dict['d']))
d_ibi = np.mean(np.abs(np.array(lmd_ibi_pred_dict['d']) - np.array(lmd_ibi_gt_dict['d']))) * 1000
print("Dark:", np.round(d_mae,2),"&",np.round(d_rmse,2),"&",np.round(d_mape,2),"&",np.round(d_r,2),"&",np.round(d_ibi,2),r"\\")
print()
# Check if mae and the weighted avg of the three skin tones are equal
weighted_mae = (l_mae*len(lmd_pred_dict['l']) + m_mae*len(lmd_pred_dict['m']) 
                + d_mae*len(lmd_pred_dict['d'])) / (len(lmd_pred_dict['l']) 
                                                    + len(lmd_pred_dict['m']) + len(lmd_pred_dict['d']))
weighted_ibi = (l_ibi*len(lmd_pred_dict['l']) + m_ibi*len(lmd_pred_dict['m'])
                + d_ibi*len(lmd_pred_dict['d'])) / (len(lmd_pred_dict['l']) 
                                                    + len(lmd_pred_dict['m']) + len(lmd_pred_dict['d']))
assert np.isclose(mae, weighted_mae), f"{mae} != {weighted_mae}"
assert np.isclose(ibi_error, weighted_ibi), f"{ibi_error} != {weighted_ibi}"
print("-"*100, flush=True)