In [None]:
import numpy as np
from scipy.signal import butter, filtfilt, find_peaks, hilbert, detrend, welch
import tqdm


def estimate_hr(signal, fs):
    b1, a1 = butter(2, [0.8/(fs/2), 3.0/(fs/2)], btype='band')
    heart_sig = filtfilt(b1, a1, signal)
    peaks, _ = find_peaks(heart_sig, distance=fs*0.4)
    heart_rate = len(peaks) * (60 / 10)
    return heart_rate

def estimate_rr(signal, fs):
    analytic = hilbert(signal)
    envelope = np.abs(analytic)
    b2, a2 = butter(2, [0.1/(fs/2), 0.5/(fs/2)], btype='band')
    resp_sig = filtfilt(b2, a2, envelope)
    rpeaks, _ = find_peaks(resp_sig, distance=fs*2)
    resp_rate = len(rpeaks) * (60 / 10)
    return resp_rate

In [2]:
def calculate_mae(hr_pred_list, hr_label_list, rr_pred_list, rr_label_list):
    hr_error = np.abs(hr_pred_list - hr_label_list)
    rr_error = np.abs(rr_pred_list - rr_label_list)
    return np.mean(hr_error), np.mean(rr_error)


def calculate_rmse(hr_pred_list, hr_label_list, rr_pred_list, rr_label_list):
    hr_error = np.abs(hr_pred_list - hr_label_list)
    rr_error = np.abs(rr_pred_list - rr_label_list)
    return np.sqrt(np.mean(hr_error**2)), np.sqrt(np.mean(rr_error**2))


def calculate_std(hr_pred_list, hr_label_list, rr_pred_list, rr_label_list):
    hr_error = np.abs(hr_pred_list - hr_label_list)
    rr_error = np.abs(rr_pred_list - rr_label_list)
    return np.std(hr_error), np.std(rr_error)

def calculate_rmse(hr_pred_list, hr_label_list, rr_pred_list, rr_label_list):
    hr_error = np.abs(hr_pred_list - hr_label_list)
    rr_error = np.abs(rr_pred_list - rr_label_list)
    return np.sqrt(np.mean(hr_error**2)), np.sqrt(np.mean(rr_error**2))


def calculate_mare(hr_pred_list, hr_label_list, rr_pred_list, rr_label_list):
    # MARE = (1/n) Σ |( Tᵢ - Rᵢ) / Rᵢ|
    hr_error = np.abs(hr_pred_list - hr_label_list)
    rr_error = np.abs(rr_pred_list - rr_label_list)
    return np.mean(hr_error / hr_label_list), np.mean(rr_error / rr_label_list)



In [3]:
data = np.load("/home/ghosn/Project/csee8300_3/data/dataset_constant_ibi_constant_wa.npy")
# data = (10 seconds * 100Hz) + Time + HR + RR + SV + TPR + MAP. 
hr_pred_list = []
hr_label_list = []
rr_pred_list = []
rr_label_list = []

# for i in range(len(data)):
from tqdm import tqdm
for i in tqdm(range(len(data))):
    signal = data[i, :]
    fs = 100
    # result = estimate_hr_rr(signal, fs)
    # hr_pred = result["heart_rate_bpm"]
    # rr_pred = result["resp_rate_rpm"]
    hr_pred = estimate_hr(signal, fs)
    rr_pred = estimate_rr(signal, fs)
    hr_pred_list.append(hr_pred)
    hr_label_list.append(data[i, -5])
    rr_pred_list.append(rr_pred)
    rr_label_list.append(data[i, -4])


100%|██████████| 20000/20000 [00:05<00:00, 3689.27it/s]


In [4]:

# Convert lists to numpy arrays before calculation to avoid TypeError
hr_pred_arr = np.array(hr_pred_list)
hr_label_arr = np.array(hr_label_list)
rr_pred_arr = np.array(rr_pred_list)
rr_label_arr = np.array(rr_label_list)

mae_hr, mae_rr = calculate_mae(hr_pred_arr, hr_label_arr, rr_pred_arr, rr_label_arr)
rmse_hr, rmse_rr = calculate_rmse(hr_pred_arr, hr_label_arr, rr_pred_arr, rr_label_arr)
std_hr, std_rr = calculate_std(hr_pred_arr, hr_label_arr, rr_pred_arr, rr_label_arr)
mape_hr, mape_rr = calculate_mare(hr_pred_arr, hr_label_arr, rr_pred_arr, rr_label_arr)

print(f"MAE_HR: {mae_hr}, RMSE_HR: {rmse_hr}, STD_HR: {std_hr}, MAPE_HR: {mape_hr}")
print(f"MAE_RR: {mae_rr}, RMSE_RR: {rmse_rr}, STD_RR: {std_rr}, MAPE_RR: {mape_rr}")


MAE_HR: 15.23335, RMSE_HR: 16.890282709297676, STD_HR: 7.295662942975093, MAPE_HR: 0.2049928846012694
MAE_RR: 3.9974, RMSE_RR: 4.7586762865317915, STD_RR: 2.5818197535846688, MAPE_RR: 0.22939070552512503
