---
# **PyVHR (rppg to bvp)**
---

In [None]:
# -- MAIN IMPORT

import pyVHR as vhr
import numpy as np
import os

# Plotting: set 'colab' for Google Colaboratory, 'notebook' otherwise
vhr.plot.VisualizeParams.renderer = 'notebook'  # or 'notebook'

In [None]:
# -- LOAD A DATASET

dataset_name = 'PURE'                   # the name of the python class handling it
video_DIR = 'C:/Users/Eddie/rppg/downloaded_PURE'  # dir containing videos
BVP_DIR = 'C:/Users/Eddie/rppg/downloaded_PURE'    # dir containing BVPs GT

dataset = vhr.datasets.datasetFactory(dataset_name, videodataDIR=video_DIR, BVPdataDIR=BVP_DIR)
allvideo = dataset.videoFilenames

# print the list of video names with the progressive index (idx)
for v in range(len(allvideo)):
  print(v, allvideo[v])

In [None]:
# -- PARAMETER SETTING

wsize = 6           # seconds of video processed (with overlapping) for each estimate
video_idx = 0      # index of the video to be processed
fname = dataset.getSigFilename(video_idx)
sigGT = dataset.readSigfile(fname)
bpmGT, timesGT = sigGT.getBPM(wsize)
videoFileName = dataset.getVideoFilename(video_idx)
print('Video processed name: ', videoFileName)
fps = vhr.extraction.get_fps(videoFileName)
print('Video frame rate:     ',fps)

In [None]:
# -- DISPLAY VIDEO FRAMES

vhr.plot.display_video(videoFileName)

In [None]:
# extract raw frames
sp = vhr.extraction.sig_processing.SignalProcessing()
frames = sp.extract_raw(videoFileName)
print('Frames shape:', frames.shape)

# MTTS-CAN

**DeepPhys: Video-Based Physiological Measurement Using Convolutional Attention Networks**<br>
*利用MTTS-CAN卷積神經網路將rppg的訊號轉換成bvp等*<br>
*Weixuan Chen and Daniel McDuff*

papers:
* [DeepPhys: Video-Based Physiological Measurement Using Convolutional Attention Networks](https://web.media.mit.edu/~cvx/docs/18.Chen-etal-ECCV.pdf),
* [Multi-Task Temporal Shift Attention Networks for On-Device Contactless Vitals Measurement
](https://papers.nips.cc/paper/2020/file/e1228be46de6a0234ac22ded31417bc7-Paper.pdf)


In [None]:
# apply MTTS_CAN model
bvp_pred = vhr.deepRPPG.MTTS_CAN_deep(frames, fps, verb=1)
bvps = vhr.BPM.BVPsignal(bvp_pred, fps) # BVP object
vhr.plot.visualize_BVPs([bvps.data], 0)

In [None]:
## -- analysis
from pyVHR.utils.errors import getErrors, printErrors, displayErrors, BVP_windowing

# BVP windowing & BPM estimate
bvp_win, timesES = BVP_windowing(bvp_pred, wsize, fps, stride=1)
bpmES = vhr.BPM.BVP_to_BPM_cuda(bvp_win, fps)  #一開始用非cuda版本會出錯

# compute and print errors
RMSE, MAE, MAX, PCC, CCC, SNR = vhr.utils.getErrors(bvp_win, fps, bpmES, bpmGT, timesES, timesGT)
vhr.utils.printErrors(RMSE, MAE, MAX, PCC, CCC, SNR)
displayErrors(bpmES, bpmGT, timesES, timesGT)


In [None]:
print(bpmES)
print(bpmGT)


In [None]:
print(bvps.data)
print(bvps.data[0])
print(len(bvps.data[0])) #2020筆資料(data point)
print(timesGT) #影片長度共67s
#sample rate計算方法為data_point/time(每秒所採集的數據點數，頻率Hz)

In [None]:
%pip install pyhrv

## 用biosppy將bvp轉成rr 再用pyhrv轉成nn-interval(nn區間資料)

In [None]:
# 導入所需的模組
from biosppy.signals import bvp

# 計算採樣率
num_data_points = len(bvps.data[0])
video_duration = 67  # 視頻持續時間為 67 秒
sampling_rate = num_data_points / video_duration

# 調用 bvp 函數來處理信號
result = bvp.bvp(signal=bvps.data[0], sampling_rate=sampling_rate, show=True)
"""
Process a raw BVP signal and extract relevant signal features using
    default parameters.
    Parameters
    ----------
    signal : array
        Raw BVP signal.
    sampling_rate : int, float, optional
        Sampling frequency (Hz).
    path : str, optional
        If provided, the plot will be saved to the specified file.
    show : bool, optional
        If True, show a summary plot.
    Returns
    -------
    ts : array
        Signal time axis reference (seconds).
    filtered : array
        Filtered BVP signal.
    onsets : array
        Indices of BVP pulse onsets.
    heart_rate_ts : array
        Heart rate time axis reference (seconds).
    heart_rate : array
        Instantaneous heart rate (bpm).
"""

In [None]:
# 從結果對象中提取心跳的時間戳
heart_beat_ts = result['heart_rate_ts']

# 計算相鄰心跳之間的時間間隔，即 RR 間期
rr_intervals = np.diff(heart_beat_ts)

# 打印 RR 間期
print(rr_intervals)

In [None]:
import pyhrv.tools as tools
# pyvhr的時域、頻域分析的input為nn-interval，因此要先將rr轉為nn
# Compute NNI
nni = tools.nn_intervals(rr_intervals)

#時域分析

In [None]:
import matplotlib.pyplot as plt
from pyhrv import time_domain

# 提取時域 HRV 參數
sdnn = time_domain.sdnn(nni)['sdnn']  # 標準差
rmssd = time_domain.rmssd(nni)['rmssd']  # RMSSD
print(sdnn)
print(rmssd)

#頻域分析

In [None]:
import pyhrv.frequency_domain as fd

# Compute the PSD and frequency domain parameters
result = fd.lomb_psd(nni=nni)

# Access peak frequencies using the key 'lomb_peak'
print(result['lomb_peak'])

# HR-CNN

**Visual Heart Rate Estimation with Convolutional Neural Network**

Spetlik, R., Franc, V., Cech, J. and Matas, J. (2018)

See http://cmp.felk.cvut.cz/~spetlrad/ecg-fitness/ for the original paper and the ECG-Fitness dataset.



In [None]:
# apply HR_CNN model
bvp_pred = vhr.deepRPPG.HR_CNN_bvp_pred(frames)
bvps = vhr.BPM.BVPsignal(bvp_pred, fps) # BVP object
vhr.plot.visualize_BVPs([bvps.data], 0)

In [None]:
## -- analysis
from pyVHR.utils.errors import getErrors, printErrors, displayErrors, BVP_windowing

# BVP windowing & BPM estimate
bvp_win, timesES = BVP_windowing(bvp_pred, wsize, fps, stride=1)
bpmES = vhr.BPM.BVP_to_BPM_cuda(bvp_win, fps)

# compute and print errors
RMSE, MAE, MAX, PCC, CCC, SNR = vhr.utils.getErrors(bvp_win, fps, bpmES, bpmGT, timesES, timesGT)
vhr.utils.printErrors(RMSE, MAE, MAX, PCC, CCC, SNR)
displayErrors(bpmES, bpmGT, timesES, timesGT)

# rPPG-Transformer

**Improving CHROM rPPG Estimation Through Multi-patch Analysis**

In [None]:
# import for CHROM pipeline
from pyVHR.plot.visualize import *
from pyVHR.utils.errors import getErrors, printErrors, displayErrors, BVP_windowing

In [None]:
# Test rPPG-Transformer against VIPL-HR dataset

dataset_name = 'PURE'                   # the name of the python class handling it
video_DIR = 'C:/Users/Eddie/rppg/downloaded_PURE'  # dir containing videos
BVP_DIR = 'C:/Users/Eddie/rppg/downloaded_PURE'    # dir containing BVPs GT

dataset = vhr.datasets.datasetFactory(dataset_name, videodataDIR=video_DIR, BVPdataDIR=BVP_DIR)

In [None]:

# -- PARAMETER SETTING

wsize = 10           # seconds of video processed (with overlapping) for each estimate
video_idx = 0     # index of the video to be processed
fname = dataset.getSigFilename(video_idx)
sigGT = dataset.readSigfile(fname)
bpmGT, timesGT = sigGT.getBPM(wsize)
videoFileName = dataset.getVideoFilename(video_idx)
print('Video processed name: ', videoFileName)
fps = vhr.extraction.get_fps(videoFileName)
print('Video frame rate:     ',fps)

In [None]:
sig_extractor = vhr.extraction.SignalProcessing()
sig_extractor.display_cuda_device()
sig_extractor.choose_cuda_device(0)

In [None]:
sig_extractor.set_skin_extractor(vhr.extraction.SkinExtractionConvexHull('GPU'))

In [None]:
# set the number of seconds (0 for all video)
seconds = 0
sig_extractor.set_total_frames(seconds*fps)

In [None]:
vhr.extraction.SkinProcessingParams.RGB_LOW_TH =  0
vhr.extraction.SkinProcessingParams.RGB_HIGH_TH = 240

vhr.extraction.SignalProcessingParams.RGB_LOW_TH = 0
vhr.extraction.SignalProcessingParams.RGB_HIGH_TH = 240

In [None]:
# -- PATCHES EXTRACTION
sig_extractor.set_square_patches_side(80.0)
patch_sig = sig_extractor.extract_patches(videoFileName, "squares", "mean")
print('Size: (#frames, #landmarks, #channels) = ', patch_sig.shape)

In [None]:
# -- INTERACTIVE VISUALIZATION OF PATCHES
visualize_patches_coll = sig_extractor.get_visualize_patches()
print('Number of frames processed: ',len(visualize_patches_coll))
vhr.plot.interactive_image_plot(visualize_patches_coll,1.0)

In [None]:
# WING OF RGB SIGNALS ON PATCHES
windowed_patch_sig, timesES = vhr.extraction.sig_windowing(patch_sig, wsize, 1, fps)
print('Num windows: ',len(windowed_patch_sig))
print('Num channels and window length: ', windowed_patch_sig[0].shape)

In [None]:
# -- PLOT A WINDOW (randomly chosen)
w = np.random.randint(0, len(windowed_patch_sig))  # window number
vhr.plot.visualize_windowed_sig(windowed_patch_sig, w)

In [None]:
# -- APPLY TRESHOLDING ON RGB COLORS (suggested)

filtered_windowed_patch_sig = vhr.BVP.apply_filter(windowed_patch_sig, vhr.BVP.rgb_filter_th, params={'RGB_LOW_TH': 0, 'RGB_HIGH_TH': 230})
print('Num windows: ', len(filtered_windowed_patch_sig))
print('Win size: (#landmarks, #channels, #frames) = ', filtered_windowed_patch_sig[1].shape)

In [None]:
# -- SELECT THE FILTER CASCADE

filtered_windowed_patch_sig = vhr.BVP.apply_filter(filtered_windowed_patch_sig, vhr.BVP.BPfilter, params={'order':6,'minHz':0.75,'maxHz':4.0,'fps':fps})
#filtered_windowed_patch_sig = vhr.BVP.apply_filter(filtered_windowed_patch_sig, vhr.BVP.sg_detrend)
#filtered_windowed_patch_sig = vhr.BVP.apply_filter(filtered_windowed_patch_sig, vhr.BVP.detrend, params={'detLambda':100})
#filtered_windowed_patch_sig = vhr.BVP.apply_filter(filtered_windowed_patch_sig, vhr.BVP.zscore)
#filtered_windowed_patch_sig = vhr.BVP.apply_filter(filtered_windowed_patch_sig, vhr.BVP.zeromean)
print('Num windows: ', len(filtered_windowed_patch_sig))
print('Win size: (#landmarks, #channels, #frames) = ', filtered_windowed_patch_sig[0].shape)

In [None]:
# -- PLOT A WINDOW (randomly chosen)

w = np.random.randint(0, len(windowed_patch_sig))  # window number
vhr.plot.visualize_windowed_sig(filtered_windowed_patch_sig, w)

In [None]:
# -- APPLY A METHOD TO EXTRACT BVP

from pyVHR.BVP import *

#patch_bvps = RGB_sig_to_BVP(filtered_windowed_patch_sig, fps, device_type='cpu', method=cpu_CHROM)
patch_bvps = RGB_sig_to_BVP(filtered_windowed_patch_sig, fps, device_type='cuda', method=cupy_CHROM)
#patch_bvps = RGB_sig_to_BVP(filtered_windowed_patch_sig, fps, device_type='torch', method=torch_CHROM)
#patch_bvps = RGB_sig_to_BVP(filtered_windowed_patch_sig, fps, device_type='cuda', method=cupy_POS, params={'fps':fps})
#patch_bvps = RGB_sig_to_BVP(filtered_windowed_patch_sig, fps, device_type='cpu', method=cpu_POS, params={'fps':fps})
#patch_bvps = RGB_sig_to_BVP(filtered_windowed_patch_sig, fps, device_type='cpu', method=cpu_LGI)
#patch_bvps = RGB_sig_to_BVP(filtered_windowed_patch_sig, fps, device_type='cpu', method=cpu_GREEN)
#patch_bvps = RGB_sig_to_BVP(filtered_windowed_patch_sig, fps, device_type='cpu', method=cpu_ICA, params={'component':'second_comp'})

print('Number of windows: ', len(patch_bvps))
print('Number of estimators and number of number of frames in a windows: ', patch_bvps[0].shape)

In [None]:
# -- PLOT A WINDOW (randomly chosen)
w = np.random.randint(0, len(windowed_patch_sig))  # window number
vhr.plot.visualize_BVPs(patch_bvps, w)

In [None]:
# -- APPLY BPFILTER TO BVP WINDOWED PATCHES

patch_bvps = vhr.BVP.apply_filter(patch_bvps, BPfilter, params={'order':6,'minHz':0.75,'maxHz':4.0,'fps':fps})
print('Num windows: ', len(patch_bvps))
print('Win size: (#landmarks, #frames) = ', patch_bvps[0].shape)

In [None]:
# -- PLOT A WINDOW (randomly chosen)

w = np.random.randint(0, len(windowed_patch_sig))  # window number
vhr.plot.visualize_BVPs(patch_bvps, w)

In [None]:
wind = np.random.randint(0,len(patch_bvps))  # window number
vhr.plot.visualize_BVPs_PSD(patch_bvps, wind, fps, maxHz=10)

In [None]:
# -- BPM ESTIMATION BY PATCHES
#patch_bpmES = vhr.BPM.BVP_to_BPM(patch_bvps, fps)          # CPU version

patch_bpmES = vhr.BPM.BVP_to_BPM_cuda(patch_bvps, fps)    # CUDA version

In [None]:
# -- MEDIANS OF BPMS

patch_median_bpmES, MAD = vhr.BPM.BPM_median(patch_bpmES)

In [None]:
# -- VISUALIZE ALL BPMs AND MEDIANS
vhr.plot.visualize_multi_est_BPM_vs_BPMs_list([patch_bpmES, timesES], [[patch_median_bpmES, timesES, "medianES"],[bpmGT, timesGT, "GT"]])

In [None]:
# -- PRINT ERRORS USING METRICS: RMSE, MAE, MAX, PCC, CCC, SNR

from pyVHR.utils.errors import getErrors, printErrors, displayErrors

RMSE, MAE, MAX, PCC, CCC, SNR = getErrors(patch_bvps, fps, patch_median_bpmES, bpmGT, timesES, timesGT)
printErrors(RMSE, MAE, MAX, PCC, CCC, SNR)
displayErrors(patch_median_bpmES, bpmGT, timesES, timesGT)

In [None]:
from pyVHR.deepRPPG import transformer

patch_bvps = np.array(patch_bvps)
#bvp_pred = vhr.deepRPPG.RPPG_TRANSFORMER_bvp_pred(patch_bvps[:,0:104,:])
bvp_pred = transformer.RPPG_TRANSFORMER_bvp_pred(patch_bvps[:,0:104,:])

In [None]:
bvps = vhr.BPM.BVPsignal(bvp_pred, fps)
vhr.plot.visualize_BVPs([bvps.data], 0)

In [None]:
## -- analysis
from pyVHR.utils.errors import getErrors, printErrors, displayErrors, BVP_windowing



# BVP windowing & BPM estimate
bvp_win, timesES = BVP_windowing(bvp_pred, wsize, fps, stride=1)

bpmES = vhr.BPM.BVP_to_BPM(bvp_win, fps)

In [None]:
# resampling

bvp_win = [bvp for bvp,t in zip(bvp_win, timesES) if t in set(list(timesGT)) & set(list(timesES))]
bpmGT = np.array([bpm for bpm,t in zip(bpmGT, timesGT) if t in set(list(timesGT)) & set(list(timesES))])
bpmES = np.array([bpm for bpm,t in zip(bpmES, timesES) if t in set(list(timesGT)) & set(list(timesES))])
timesGT = np.array([t for t in timesGT if t in set(timesGT) & set(list(timesES))])
timesES = np.array([t for t in timesES if t in set(timesGT) & set(list(timesES))])

In [None]:
# filter out bad GT frames

bvp_win_filt = [bvp for bvp, bpm in zip(bvp_win, bpmGT) if bpm <= 150]
bpmGT_filt = [bpm for bpm in bpmGT if bpm <= 150]
bpmES_filt = [bpm for bpm_gt, bpm  in zip(bpmGT, bpmES) if bpm_gt <= 150]
timesGT_filt =  [t for t, bpm_gt in zip(timesGT, bpmGT) if bpm_gt <= 150]
timesES_filt = [t for t, bpm_gt  in zip(timesES, bpmGT) if bpm_gt <= 150]

In [None]:
# compute and print errors
RMSE, MAE, MAX, PCC, CCC, SNR = vhr.utils.getErrors(bvp_win_filt, fps, bpmES_filt, bpmGT_filt, timesES_filt, timesGT_filt)
vhr.utils.printErrors(RMSE, MAE, MAX, PCC, CCC, SNR)
displayErrors(bpmES_filt, bpmGT_filt, timesES_filt, timesGT_filt)