---
# **PyVHR using the CVP Dataset**
---

# Single Video Test

## Imports

In [None]:
# -- MAIN IMPORT

import pyVHR as vhr
import numpy as np
import pandas as pd
import cv2
from pyVHR.utils.errors import *
from pyVHR.BVP import *

# Plotting: set 'colab' for Google Colaboratory, 'notebook' otherwise
vhr.plot.VisualizeParams.renderer = 'notebook'

## Load Dataset

In [None]:
# -- LOAD A DATASET

dataset_name = 'CVP'                      # the name of the python class handling it 

video_DIR = "C:\\Users\\20759193\\source\\repos\\pyVHR\\data"  # dir containing videos
BVP_DIR = "C:\\Users\\20759193\\source\\repos\\pyVHR\\data"    # dir containing the ground truth signals

dataset = vhr.datasets.datasetFactory(dataset_name, videodataDIR=video_DIR, BVPdataDIR=BVP_DIR)
allvideo = dataset.videoFilenames
allsignals = dataset.sigFilenames
"""
# print the first 10 in the list of video names with the progressive index (idx)
for v in range(len(allvideo)):
  while v < 5:
    print(v, allvideo[v])
    break

# print the first 10 in the list of ground truths with the progressive index (idx)
for s in range(len(allsignals)):
  while s < 5: 
    print(s, allsignals[s])
    break
  """

## Load the Ground Truth Data

In [None]:
# -- PARAMETER SETTING
wsize = 8        # seconds of video processed (with overlapping) for each estimate 
video_idx = 4 #np.random.randint(0,len(allvideo))    # index of the video to be processed
fname = dataset.getSigFilename(video_idx) # get the filename of the signal file
videoFileName = dataset.getVideoFilename(video_idx) # get the filename of the video file

try:
    sigGT_ECG = dataset.readSigfile(fname, signalGT='ECG')
    sigGT_ECG.show_ECG = True
    bpmGT_ECG, timesGT_ECG = sigGT_ECG.getBPM(wsize)
except Exception as e:
    print("ECG not found. Error:", e)
    sigGT_ECG = bpmGT_ECG = timesGT_ECG = None

try:
    sigGT_ABP = dataset.readSigfile(fname, signalGT='ABP')
    bpmGT_ABP, timesGT_ABP = sigGT_ABP.getBPM(wsize)
except Exception as e:
    print("ABP not found. Error:", e)
    sigGT_ABP = bpmGT_ABP = timesGT_ABP = None

try:
    sigGT_CVP = dataset.readSigfile(fname, signalGT='CVP')
    bpmGT_CVP, timesGT_CVP = sigGT_CVP.getBPM(wsize)
except Exception as e:
    print("CVP not found. Error:", e)
    sigGT_CVP = bpmGT_CVP = timesGT_CVP = None



path_segments = videoFileName.split("\\")
instance, camera = path_segments[-2].split("_")

print('Video processed name: ', videoFileName)
print('Signal processed name: ', fname)
print('Instance:            ', instance)
print('Camera:              ', camera)
fps = vhr.extraction.get_fps(videoFileName)
print('Video frame rate:     ',fps)

# -- DISPLAY CVP_GT SPECTROGRAM
try:
    print("CVP Spectrum")
    sigGT_CVP.displaySpectrum()
except Exception as e:
    print("No CVP")

try:
    print("ABP Spectrum")
    sigGT_ABP.displaySpectrum()
except Exception as e:
    print("No ABP")

try:
    print("ECG BPMs")
    for t in range(len(timesGT_ECG)):
        print(round(timesGT_ECG[t],2), round(bpmGT_ECG[t],2))
except Exception as e:
    print("No ECG")

# -- DISPLAY VIDEO FRAMES

vhr.plot.display_video(videoFileName)

# Skin extraction

We've defined a custom version of skin extraction for the neck, compared with the face detection versions that use the **convex hull** or **face parsing** versions.

Once the skin is selected, select how to process it: 

* **Patches**: small facial regions of skin  centered on landmarks (provides multiple estimators)
* **Holistic**: convex hull of patches or face parsing CNN (provides a single  estimator)

***Note***: `SignalProcessing` is powered by CUDA

In [None]:
sig_extractor = vhr.extraction.SignalProcessing()
sig_extractor.display_cuda_device()
sig_extractor.choose_cuda_device(0)

Use our custom skin extraction method where we define a rectangle

In [None]:
sig_extractor.set_skin_extractor(vhr.extraction.SkinExtractionRectangle('GPU'))

Choose a specific number of frames of the video to process... 

In [None]:
# set the number of seconds (0 for all video)
seconds = 0
sig_extractor.set_total_frames(seconds*fps)

### Color-thresholding

**OPTIONAL**: Both signal extraction and skin extraction have a color-threshold filter for removing unwanted RGB colors. We can set the RGB threshold interval using theese classes:

In [None]:
vhr.extraction.SkinProcessingParams.RGB_LOW_TH =  0
vhr.extraction.SkinProcessingParams.RGB_HIGH_TH = 255

vhr.extraction.SignalProcessingParams.RGB_LOW_TH = 0
vhr.extraction.SignalProcessingParams.RGB_HIGH_TH = 255

## Visualize skin and landmarks 

* To visualize skin processing intermediate results call `set_visualize_skin_and_landmarks` method.
* To retrieve any intermediate result call the methods `get_visualize_skin` and 
`get_visualize_patches`

In [None]:
# -- SET VISUALIZATION MODE 
sig_extractor.set_visualize_skin_and_landmarks(
      visualize_skin=True, 
      visualize_landmarks=True, 
      visualize_landmarks_number=True, 
      visualize_patch=True)

# ROI processing and RGB computation

Choose how to extract the RGB signal from ROI:

* **Holistic** mean
* **Patches** mean

Patches are square (with a fixed edge for all) or rectangular (with xy_dimension for each region).

# Holistic extraction

In [None]:
# -- HOLISTIC EXTRACTION
hol_sig = sig_extractor.extract_holistic_rectangle(videoFileName,40)
print('Size: (#frames, #landmarks, #channels) = ',hol_sig.shape)

In [None]:
# -- INTERACTIVE VISUALIZATION OF EXTRACTED SKIN
visualize_skin_coll = sig_extractor.get_visualize_skin()
print('Number of frames processed: ',len(visualize_skin_coll))
vhr.plot.interactive_image_plot(visualize_skin_coll,1.0)

## Signal windowing

Windowing means to split a video into a set of strided and overlapped windows of frames. For each window the RGB signal is estracted by averaging over pixels in holistic (all skin pixels) or local (averaging on patches) fashion. Shapes are `(rgb_channels, #frames)` and `(#landmarks, rgb_channels, #frames)` respectively. 

In [None]:
# -- WINDOWING OF RGB SIGNALS (HOLISTIC)
windowed_hol_sig, timesES = vhr.extraction.sig_windowing(hol_sig, wsize, 1, fps)
print('Num windows: ',len(windowed_hol_sig))
print('Num channels and window length: ', windowed_hol_sig[0].shape)

"""
# -- PLOT A WINDOW (randomly chosen)
w = np.random.randint(0, len(windowed_hol_sig))   # window number
vhr.plot.visualize_windowed_sig(windowed_hol_sig, w)
"""

## Pre-filtering

The implemented (standard) filters are:

* `rgb_filter_ths`: color threshold filter that filters out signals that, in at least one frame of the window, are outside the rgb colors interval `[(LOW, LOW, LOW), (HIGH, HIGH, HIGH)]` where `LOW` is the dictionary parameter `RGB_LOW_TH`, and `HIGH` is `RGB_HIGH_TH` (we suggest to always use this filter before applying a BVP method)
* `detrend`: apply detrend to the signal
* `sg_detrend`: apply detrend to the signal, i.e. remove the low-frequency components with the low-pass filter developed by Savitzky-Golay
* `zscore`: apply z-score to the signal
* `BPfilter`: apply Butterworth band-pass filter to the signal

In [None]:
# -- APPLY TRESHOLDING ON RGB COLORS (suggested)

filtered_windowed_hol_sig = vhr.BVP.apply_filter(windowed_hol_sig, vhr.BVP.rgb_filter_th, fps=fps, params={'RGB_LOW_TH': vhr.extraction.SignalProcessingParams.RGB_LOW_TH, 'RGB_HIGH_TH': vhr.extraction.SignalProcessingParams.RGB_HIGH_TH})

print('Num windows: ', len(filtered_windowed_hol_sig))
print('Win size: (#signals, #channels, #frames) = ', filtered_windowed_hol_sig[0].shape)

# -- SELECT THE FILTER CASCADE
filtered_windowed_hol_sig = vhr.BVP.apply_filter(filtered_windowed_hol_sig, vhr.BVP.BPfilter, params={'order':6,'minHz':0.65,'maxHz':4.0,'fps':fps})
#filtered_windowed_hol_sig = vhr.BVP.apply_filter(filtered_windowed_hol_sig, vhr.BVP.detrend)
#filtered_windowed_hol_sig = vhr.BVP.apply_filter(filtered_windowed_hol_sig, vhr.BVP.sg_detrend)
#filtered_windowed_hol_sig = vhr.BVP.apply_filter(filtered_windowed_hol_sig, vhr.BVP.zscore)
#filtered_windowed_hol_sig = vhr.BVP.apply_filter(filtered_windowed_hol_sig, vhr.BVP.zeromean)
print('Num windows: ', len(filtered_windowed_hol_sig))
print('Win size: (#signals, #channels, #frames) = ', filtered_windowed_hol_sig[0].shape)

## Method: BVP extraction

To extract the BVP signal call the function `RGB_sig_to_BVP` with the following parameters:


*   `filt_windowed_sig`: the list of windows
*   `fps`: frame rate
*   `device_type`: `cuda`, `cpu`, `torch`
*   `method`: method function that supports method_type device
*   `params`: dictionary of parameters needed by the method ( default is {}).

Core methods implemented within pyVHR:
* **Methods**: `cpu_CHROM`, `cupy_CHROM`, `torch_CHROM`, `cpu_LGI`, `cpu_POS`, `cupy_POS`, `cpu_PBV`, `cpu_PCA`, `cpu_GREEN`, `cpu_OMIT`, `cpu_ICA`, `cpu_SSR`


***Note***: pyVHR contains many methods, but you can also use a custom method. Remember that it must accept a numpy.ndarray with shape (num_estimators, channels, num_frames) and return a numpy.ndarray with shape (num_estimators, num_frames)


In [None]:
"""# -- PLOT A WINDOW (randomly chosen)
w = np.random.randint(0, len(windowed_hol_sig))  # window number
vhr.plot.visualize_windowed_sig(filtered_windowed_hol_sig, w)"""

*bvps* is a list of length num_windows of numpy.ndarray with shape (num_estimators,num_frames)

In [None]:
# -- APPLY A METHOD TO EXTRACT BVP




# CHROM
#hol_bvps = RGB_sig_to_BVP(windowed_hol_sig, fps, device_type='cpu', method=cpu_CHROM)
#hol_bvps = RGB_sig_to_BVP(windowed_hol_sig, fps, device_type='torch', method=torch_CHROM)
hol_bvps = RGB_sig_to_BVP(filtered_windowed_hol_sig, fps, device_type='cuda', method=cupy_CHROM)

# LGI
#bvp = hol_bvps.append(RGB_sig_to_BVP(filtered_windowed_hol_sig, fps, device_type='cpu', method=cpu_LGI))
#print("LGI: ", bvp[0].shape)
#hol_bvps.append(bvp)

# POS
#hol_bvps = RGB_sig_to_BVP(windowed_hol_sig, fps, device_type='cpu', method=cpu_POS, params={'fps':fps})
#bvp = RGB_sig_to_BVP(filtered_windowed_hol_sig, fps, device_type='cuda', method=cupy_POS, params={'fps':fps})
#print("POS: ", bvp[0].shape)
#hol_bvps.append(bvp)

# PBV
#bvp = hol_bvps.append(RGB_sig_to_BVP(filtered_windowed_hol_sig, fps, device_type='cpu', method=cpu_PBV))
#print("PBV: ", bvp[0].shape)
#hol_bvps.append(bvp)

# PCA
#hol_bvps = RGB_sig_to_BVP(filtered_windowed_hol_sig, fps, device_type='cpu', method=cpu_PCA, params={'component':'all_comp'})
#print("PCA: ", bvp[0].shape)
#hol_bvps.append(bvp)

# GREEN
#bvp = hol_bvps.append(RGB_sig_to_BVP(filtered_windowed_hol_sig, fps, device_type='cpu', method=cpu_GREEN))
#print("GREEN: ", bvp[0].shape)
#hol_bvps.append(bvp)

# OMIT
#bvp = hol_bvps.append(RGB_sig_to_BVP(filtered_windowed_hol_sig, fps, device_type='cpu', method=cpu_OMIT))
#print("OMIT: ", bvp[0].shape)
#hol_bvps.append(bvp)

# ICA
#hol_bvps = RGB_sig_to_BVP(filtered_windowed_hol_sig, fps, device_type='cpu', method=cpu_ICA, params={'component':'all_comp'})
#print("ICA: ", bvp[0].shape)
#hol_bvps.append(bvp)

# SSR
#hol_bvps.append(RGB_sig_to_BVP(filtered_windowed_hol_sig, fps, device_type='cpu', method=cpu_SSR, params={'fps':fps}))

#print('Number of windows: ', len(hol_bvps))
#print('Number of estimators and number of number of frames in a windows: ', hol_bvps[0].shape)

In [None]:
"""# -- PLOT A WINDOW (randomly chosen)

w = np.random.randint(0, len(filtered_windowed_hol_sig))  # window number
vhr.plot.visualize_BVPs(hol_bvps, w)"""

## Post Filtering

As with prefiltering, we can apply all the filters showed before also to the *BVP*. 

In [None]:
# -- APPLY BPFILTER TO BVP WINDOWED 

hol_bvps = vhr.BVP.apply_filter(hol_bvps, BPfilter, params={'order':6,'minHz':0.75,'maxHz':4.0,'fps':fps})

#print('Num windows: ', len(hol_bvps))

## BVP spectrum

BVP spectrum analysis via PSD for holistic and patches approaches.

In [None]:
"""w = np.random.randint(0, len(windowed_hol_sig))  # window number
vhr.plot.visualize_BVPs_PSD(hol_bvps, w, fps)"""

## BMP estimation 

This function process all the windows and all the estimators (one for holistic and many for patches), and returns a list of numpy ndarray with shape (num_estimators,).

## BPM vs GT ANALYSIS

Error computation and visualization 

In [None]:
# -- BPM ESTIMATION 

#hol_bpmES = vhr.BPM.BVP_to_BPM(hol_bvps, fps)       # CPU version

hol_bpmES = vhr.BPM.BVP_to_BPM_cuda(hol_bvps, fps)  # CUDA version

In [None]:
# -- PRINT ERRORS USING METRICS: RMSE, MAE, MAX, PCC, CCC, SNR
# Get errors RMSE, MAE, MAX, PCC, CCC, SNR
RMSE, MAE, MAX, PCC, CCC, SNR = getErrors(hol_bvps, fps, hol_bpmES, bpmGT_ECG, timesES, timesGT_ECG)
#printErrors(RMSE, MAE, MAX, PCC, CCC, SNR)
#displayErrors(hol_bpmES, bpmGT_ECG, timesES, timesGT_ECG)

# Patch Extraction

## Get the patch_sig

### Defining fixed landmarks

In [None]:
# Given the size of the video, we set our shape of each patch, and how much overlap we want for the signals, to define
# the landmarks (think coordinates) of the patches.
sig_extractor.set_square_patches_side(50.)
sig_extractor.set_fixed_patches(videoFileName,region_type="squares",overlap=0)
print("Number of patches is: ", len(sig_extractor.ldmks))
wind = 0

In [None]:
sig_extraction_method = "mean" # or "median"
sig_extractor.thickness = 1
sig_extractor.font_size = 0.1
patch_sig = sig_extractor.extract_fixed_patches(sig_extraction_method,segmented_frames = False)

In [None]:
# -- INTERACTIVE VISUALIZATION OF PATCHES
visualize_patches_coll = sig_extractor.get_visualize_patches()
print('Number of frames processed: ',len(visualize_patches_coll))
vhr.plot.interactive_image_plot(visualize_patches_coll)

## Signal Windowing

In [None]:
# -- WINDOWING OF RGB SIGNALS ON PATCHES 
windowed_patch_sig, timesES = vhr.extraction.sig_windowing(patch_sig, wsize, 1, fps)
print('Num windows: ',len(windowed_patch_sig))
print('Num patches, Num channels, and window length: ', windowed_patch_sig[wind].shape)

In [None]:
"""# -- PLOT A WINDOW (randomly chosen)
w = np.random.randint(0, len(windowed_patch_sig))  # window number
vhr.plot.visualize_windowed_sig(windowed_patch_sig, w)"""

## Filtering

In [None]:
# -- APPLY TRESHOLDING ON RGB COLORS (suggested)
wind = 0
filtered_windowed_patch_sig, patch_ids = vhr.BVP.apply_custom_filter(windowed_patch_sig, vhr.BVP.rgb_filter_th_with_ids, params={'RGB_LOW_TH': vhr.extraction.SignalProcessingParams.RGB_LOW_TH, 'RGB_HIGH_TH': vhr.extraction.SignalProcessingParams.RGB_HIGH_TH})
print('Num windows: ', len(filtered_windowed_patch_sig))
print('Win size: (#landmarks, #channels, #frames) = ', filtered_windowed_patch_sig[wind].shape)

In [None]:
patch_ids[wind].shape

In [None]:
# -- SELECT THE FILTER CASCADE

filtered_windowed_patch_sig = vhr.BVP.apply_filter(filtered_windowed_patch_sig, vhr.BVP.BPfilter, params={'order':6,'minHz':0.75,'maxHz':4.0,'fps':fps})
#filtered_windowed_patch_sig = vhr.BVP.apply_filter(filtered_windowed_patch_sig, vhr.BVP.sg_detrend)
#filtered_windowed_patch_sig = vhr.BVP.apply_filter(filtered_windowed_patch_sig, vhr.BVP.detrend, params={'detLambda':100}) 
#filtered_windowed_patch_sig = vhr.BVP.apply_filter(filtered_windowed_patch_sig, vhr.BVP.zscore)
#filtered_windowed_patch_sig = vhr.BVP.apply_filter(filtered_windowed_patch_sig, vhr.BVP.zeromean)
print('Num windows: ', len(filtered_windowed_patch_sig))
print('Win size: (#landmarks, #channels, #frames) = ', filtered_windowed_patch_sig[wind].shape)

In [None]:
"""# -- PLOT A WINDOW (randomly chosen)

w = np.random.randint(0, len(windowed_patch_sig))  # window number
vhr.plot.visualize_windowed_sig(filtered_windowed_patch_sig, w)"""


## BVP

In [None]:
# -- APPLY A METHOD TO EXTRACT BVP

from pyVHR.BVP import *

#patch_bvps = RGB_sig_to_BVP(filtered_windowed_patch_sig, fps, device_type='cpu', method=cpu_CHROM)
patch_bvps = RGB_sig_to_BVP(filtered_windowed_patch_sig, fps, device_type='cuda', method=cupy_CHROM)
#patch_bvps = RGB_sig_to_BVP(filtered_windowed_patch_sig, fps, device_type='torch', method=torch_CHROM)
#patch_bvps = RGB_sig_to_BVP(filtered_windowed_patch_sig, fps, device_type='cuda', method=cupy_POS, params={'fps':fps})
#patch_bvps = RGB_sig_to_BVP(filtered_windowed_patch_sig, fps, device_type='cpu', method=cpu_POS, params={'fps':fps})
#patch_bvps = RGB_sig_to_BVP(filtered_windowed_patch_sig, fps, device_type='cpu', method=cpu_LGI)
#patch_bvps = RGB_sig_to_BVP(filtered_windowed_patch_sig, fps, device_type='cpu', method=cpu_GREEN)
#patch_bvps = RGB_sig_to_BVP(filtered_windowed_patch_sig, fps, device_type='cpu', method=cpu_ICA, params={'component':'second_comp'})

print('Number of windows: ', len(patch_bvps))
print('Number of estimators and number of number of frames in a windows: ', patch_bvps[wind].shape)

In [None]:
"""# -- PLOT A WINDOW (randomly chosen)
w = np.random.randint(0, len(windowed_patch_sig))  # window number
vhr.plot.visualize_BVPs(patch_bvps, w)"""

"""# -- PLOT A WINDOW (randomly chosen)
wind = np.random.randint(0, len(windowed_hol_sig))  # window number
vhr.plot.visualize_BVPs(hol_bvps, wind)"""

## Filter BVP


In [None]:
# -- APPLY BPFILTER TO BVP WINDOWED PATCHES

patch_bvps = vhr.BVP.apply_filter(patch_bvps, BPfilter, params={'order':6,'minHz':0.75,'maxHz':4.0,'fps':fps})
print('Num windows: ', len(patch_bvps))
print('Win size: (#landmarks, #frames) = ', patch_bvps[wind].shape)

In [None]:
processed_patch_bvps = vhr.plot.visualize.process_patch_BVPs(sig_extractor.fixed_patches, patch_ids, patch_bvps, fps, minHz = 0.65, maxHz = 4.0)

In [None]:
print('Win size: (#landmarks, #frames) = ', patch_bvps[wind].shape)

In [None]:
"""# -- PLOT A WINDOW (randomly chosen)

w = np.random.randint(0, len(windowed_patch_sig))  # window number
vhr.plot.visualize_BVPs(patch_bvps, w)"""

In [None]:
#wind = np.random.randint(0,len(patch_bvps))  # window number
#vhr.plot.visualize_BVPs_PSD_with_IDs(patch_bvps,patch_ids, wind, fps, maxHz=10)

### BPM by median

In [None]:
# -- BPM ESTIMATION BY PATCHES
#patch_bpmES = vhr.BPM.BVP_to_BPM(patch_bvps, fps)          # CPU version

patch_bpmES = vhr.BPM.BVP_to_BPM_cuda(patch_bvps, fps)    # CUDA version

In [None]:
def add_fixed_patch_info(fixed_patches,patch_ids,patch_bvps,patch_bpmES,timesES):
    for fixed_patch in fixed_patches:
        for window in range(len(patch_ids)):
            # loop through each patch, check if it's the one we're after, and append if it is, including the 
            for patch in range(len(patch_ids[window])):
                if patch_ids[window][patch] == fixed_patch.ID:
                    fixed_patch.times.append(timesES[window])
                    fixed_patch.bvps.append(np.array([patch_bvps[window][patch]]))
                    fixed_patch.bpms.append(np.array(patch_bpmES[window][patch]))
    return fixed_patches

In [None]:
def update_patch_snrs(fixed_patches,gt_bpms,fps):
    for fixed_patch in fixed_patches:
        if len(fixed_patch.times) > 0:
            fixed_patch.snr = []
            for i, bvp in enumerate(fixed_patch.bvps):
                fixed_patch.snr.append(get_SNR([bvp],fps,gt_bpms,[fixed_patch.times[i]]))
            fixed_patch.snr = np.array(fixed_patch.snr)
    return fixed_patches

In [None]:
def update_patch_errors(fixed_patches,gt_times,gt_bpms):
    for fixed_patch in fixed_patches:
        if len(fixed_patch.times) > 0:
            temp_bpms = np.expand_dims(np.array(fixed_patch.bpms),axis=0)
            fixed_patch.rmse = RMSEerror(temp_bpms,gt_bpms,fixed_patch.times,gt_times)
            fixed_patch.mae = MAEerror(temp_bpms,gt_bpms,fixed_patch.times,gt_times)
            fixed_patch.max = MAXError(temp_bpms,gt_bpms,fixed_patch.times,gt_times)
    return fixed_patches

In [None]:
def update_patch_metrics(fixed_patches,gt_times,gt_bpms,fps):
    fixed_patches = update_patch_snrs(fixed_patches,gt_bpms,fps)
    fixed_patches = update_patch_errors(fixed_patches,gt_times,gt_bpms)
    return fixed_patches

In [None]:
sig_extractor.fixed_patches = add_fixed_patch_info(sig_extractor.fixed_patches,patch_ids,patch_bvps,patch_bpmES,timesES)

In [None]:
sig_extractor.fixed_patches = update_patch_metrics(sig_extractor.fixed_patches,timesGT_ECG,bpmGT_ECG,fps)

In [None]:
# Read a json dataframe
df = pd.read_json("C:\\Users\\20759193\\source\\repos\\pyVHR\\results\\patch_dataframes\\2\\CHROM_K1_patch_df.json")

In [None]:
df.head()

In [None]:
def make_patch_dataframe(fixed_patches):    
    patch_data = []
    for fixed_patch in fixed_patches:
        patch_data.append([fixed_patch.ID,
                        fixed_patch.x_min,
                        fixed_patch.x_max,
                        fixed_patch.y_min,
                        fixed_patch.y_max,
                        np.array(fixed_patch.times),
                        np.array(fixed_patch.bvps),
                        np.array(fixed_patch.bpms),
                        fixed_patch.rmse[0],
                        fixed_patch.mae[0],
                        fixed_patch.max[0],
                        np.array(fixed_patch.snr)])
    df = pd.DataFrame(patch_data,columns = ["ID","x_min","x_max","y_min","y_max","times","bvps","bpms","RMSE","MAE","MAX","SNRs"])
    return df

In [None]:
patch_df = make_patch_dataframe(sig_extractor.fixed_patches)

In [None]:
# -- MEDIANS OF BPMS

patch_median_bpmES, MAD = vhr.BPM.BPM_median(patch_bpmES)

# -- VISUALIZE ALL BPMs AND MEDIANS
vhr.plot.visualize_multi_est_BPM_vs_BPMs_list([patch_bpmES, timesES], [[patch_median_bpmES, timesES, "medianES"],[bpmGT_ECG, timesGT_ECG, "GT"]])

In [None]:
patch_shape = (int(sig_extractor.square),int(sig_extractor.square))
patch_overlap = sig_extractor.overlap
image = sig_extractor.display_frame
overlap = sig_extractor.overlap
estimated_figs = []
error_figs = []

for wind in range(len(patch_bvps)):
    ldmks, fig1 = vhr.plot.visualize_BVPs_heatmap(image, patch_bvps, patch_ids, wind, patch_shape, overlap, fps, minHz=0.65, maxHz=4)
    fig2 = vhr.plot.visualize_BPM_Errors_heatmap(image, ldmks, timesES, wind, timesGT_ECG, bpmGT_ECG, patch_shape, overlap,vmin=-20,vmax=20)
    estimated_figs.append(fig1)
    error_figs.append(fig2)

In [None]:
vhr.plot.interactive_image_plot(estimated_figs,1.0)

In [None]:
vhr.plot.interactive_image_plot(error_figs,1.0)

### Patches - medians


In [None]:
# -- PRINT ERRORS USING METRICS: RMSE, MAE, MAX, PCC, CCC, SNR

from pyVHR.utils.errors import getErrors, printErrors, displayErrors


RMSE, MAE, MAX, PCC, CCC, SNR = getErrors(patch_bvps, fps, patch_median_bpmES, bpmGT_ECG, timesES, timesGT_ECG)
printErrors(RMSE, MAE, MAX, PCC, CCC, SNR)
displayErrors(patch_median_bpmES, bpmGT_ECG, timesES, timesGT_ECG)

## BPM by PSD


In [None]:
# -- BPM ESTIMATION BY PSD CUMUL
ma = vhr.extraction.MotionAnalysis(sig_extractor, wsize, fps)


In [None]:
psd_bpmES = vhr.BPM.BPM_clustering(ma, patch_bvps, fps, wsize, movement_thrs=None, opt_factor=0.5)

In [None]:

# -- PRINT ERRORS USING METRICS: RMSE, MAE, MAX, PCC, SNR
from pyVHR.utils.errors import getErrors, printErrors, displayErrors

RMSE, MAE, MAX, PCC, CCC, SNR = getErrors(patch_bvps, fps, psd_bpmES, bpmGT_ECG, timesES, timesGT_ECG)
printErrors(RMSE, MAE, MAX, PCC, CCC, SNR)
displayErrors(psd_bpmES, bpmGT_ECG, timesES, timesGT_ECG)

# Deep Models - Holistic

In [None]:
from pyVHR.utils.errors import getErrors, printErrors, displayErrors, BVP_windowing
frames = sig_extractor.extract_raw(videoFileName)
skin_frames = np.array(sig_extractor.visualize_skin_collection)

In [None]:
# apply MTTS_CAN model
bvp_pred = vhr.deepRPPG.MTTS_CAN_deep(frames, fps, verb=1)
bvps = vhr.BPM.BVPsignal(bvp_pred, fps) # BVP object
vhr.plot.visualize_BVPs([bvps.data], 0)
# BVP windowing & BPM estimate
bvp_win, timesES = BVP_windowing(bvp_pred, wsize, fps, stride=1)
bpmES = vhr.BPM.BVP_to_BPM_cuda(bvp_win, fps) 
# compute and print errors
RMSE, MAE, MAX, PCC, CCC, SNR = vhr.utils.getErrors(bvp_win, fps, bpmES, bpmGT_ECG, timesES, timesGT_ECG)
vhr.utils.printErrors(RMSE, MAE, MAX, PCC, CCC, SNR)
displayErrors(bpmES, bpmGT_ECG, timesES, timesGT_ECG)

In [None]:
for patch in sig_extractor.fixed_patches:
    bvp_pred = vhr.deepRPPG.MTTS_CAN_deep(patch.raw_frames, fps, verb=0)
    bvps = vhr.BPM.BVPsignal(bvp_pred, fps) # BVP object
    bvp_win, timesES = BVP_windowing(bvp_pred, wsize, fps, stride=1)
    bpmES = vhr.BPM.BVP_to_BPM_cuda(bvp_win, fps)
    patch.bvps = bvp_win
    patch.bpms = bpmES
    patch.times = timesES

In [None]:
# apply MTTS_CAN model
bvp_pred = vhr.deepRPPG.MTTS_CAN_deep(skin_frames, fps, verb=1)
bvps = vhr.BPM.BVPsignal(bvp_pred, fps) # BVP object
vhr.plot.visualize_BVPs([bvps.data], 0)
# BVP windowing & BPM estimate
bvp_win, timesES = BVP_windowing(bvp_pred, wsize, fps, stride=1)
bpmES = vhr.BPM.BVP_to_BPM_cuda(bvp_win, fps) 
# compute and print errors
RMSE, MAE, MAX, PCC, CCC, SNR = vhr.utils.getErrors(bvp_win, fps, bpmES, bpmGT_ECG, timesES, timesGT_ECG)
vhr.utils.printErrors(RMSE, MAE, MAX, PCC, CCC, SNR)
displayErrors(bpmES, bpmGT_ECG, timesES, timesGT_ECG)

In [None]:
# apply HR_CNN model
bvp_pred = vhr.deepRPPG.HR_CNN_bvp_pred(frames)
bvps = vhr.BPM.BVPsignal(bvp_pred, fps) # BVP object
vhr.plot.visualize_BVPs([bvps.data], 0)

# BVP windowing & BPM estimate
bvp_win, timesES = BVP_windowing(bvp_pred, wsize, fps, stride=1)
bpmES = vhr.BPM.BVP_to_BPM_cuda(bvp_win, fps) 

# compute and print errors
RMSE, MAE, MAX, PCC, CCC, SNR = vhr.utils.getErrors(bvp_win, fps, bpmES, bpmGT_ECG, timesES, timesGT_ECG)
vhr.utils.printErrors(RMSE, MAE, MAX, PCC, CCC, SNR)
displayErrors(bpmES, bpmGT_ECG, timesES, timesGT_ECG)

In [None]:
# apply HR_CNN model
bvp_pred = vhr.deepRPPG.HR_CNN_bvp_pred(skin_frames)
bvps = vhr.BPM.BVPsignal(bvp_pred, fps) # BVP object
vhr.plot.visualize_BVPs([bvps.data], 0)

# BVP windowing & BPM estimate
bvp_win, timesES = BVP_windowing(bvp_pred, wsize, fps, stride=1)
bpmES = vhr.BPM.BVP_to_BPM_cuda(bvp_win, fps) 

# compute and print errors
RMSE, MAE, MAX, PCC, CCC, SNR = vhr.utils.getErrors(bvp_win, fps, bpmES, bpmGT_ECG, timesES, timesGT_ECG)
vhr.utils.printErrors(RMSE, MAE, MAX, PCC, CCC, SNR)
displayErrors(bpmES, bpmGT_ECG, timesES, timesGT_ECG)

## Deep Processing - Patches

In [None]:
from tqdm import tqdm

In [None]:
for patch in tqdm(sig_extractor.fixed_patches):
    patch.bvps = []
    patch.bpms = []
    patch.times = []
    patch.snr = []
    patch.rmse = []
    patch.mae = []
    patch.max = []
    # apply MTTS_CAN model
    bvp_pred = vhr.deepRPPG.HR_CNN_bvp_pred(patch.raw_frames)
    bvps = vhr.BPM.BVPsignal(bvp_pred, fps) # BVP object
    # BVP windowing & BPM estimate
    bvp_win, timesES = BVP_windowing(bvp_pred, wsize, fps, stride=1)
    bpmES = vhr.BPM.BVP_to_BPM_cuda(bvp_win, fps)
    patch.bvps = bvp_win
    patch.bpms = bpmES
    patch.times = timesES
    # compute and print errors
    RMSE, MAE, MAX, PCC, CCC, SNR = vhr.utils.getErrors(bvp_win, fps, bpmES, bpmGT_ECG, timesES, timesGT_ECG)
    patch.rmse = RMSE
    patch.mae = MAE
    patch.max = MAX
    patch.snr = SNR

In [None]:
sig_extractor.fixed_patches[0].snr

## Process the whole dataset

In [None]:
"""# Single Block Patches

# Initialise everything
path = df.loc[df['INSTANCE'] == instance, camera + '_DATA_PATH'].values[0]
videoFileName = path + camera + "_Cropped_Colour.mkv"
sigFileName = path + "data.csv"
sig_extractor = vhr.extraction.SignalProcessing()   # Set the class
sig_extractor.set_visualize_skin_and_landmarks(
    visualize_skin=True, 
    visualize_landmarks=True, 
    visualize_landmarks_number=True, 
    visualize_patch=True)
sig_extractor.choose_cuda_device(0)                 # Set the GPU
sig_extractor.set_skin_extractor(vhr.extraction.SkinExtractionRectangle('GPU')) # Set the skin extractor
sig_extractor.set_total_frames(seconds*fps) # Set the number of frames
pixel_threshold = 30

# Get the ground truth data
try:
    sigGT_ECG = dataset.readSigfile(fname, signalGT='ECG')
    sigGT_ECG.show_ECG = True
    bpmGT_ECG, timesGT_ECG = sigGT_ECG.getBPM(wsize)
    # Add these to the DataFrame
    ecg_bpms = bpmGT_ECG.tolist()
    ecg_times = timesGT_ECG.tolist()
    ECG = [ecg_times, ecg_bpms]
    df.at[df[df['INSTANCE'] == int(instance)].index[0], 'ECG'] = ECG
except Exception as e:
    print("ECG not found. Error:", e)
    sigGT_ECG = bpmGT_ECG = timesGT_ECG = None

try:
    sigGT_ABP = dataset.readSigfile(fname, signalGT='ABP')
    bpmGT_ABP, timesGT_ABP = sigGT_ABP.getBPM(wsize)
    # Add these to the DataFrame
    abp_bpms = bpmGT_ABP.tolist()
    abp_times = timesGT_ABP.tolist()
    ABP = [abp_times, abp_bpms]
    df.at[df[df['INSTANCE'] == int(instance)].index[0], 'ABP'] = ABP
except Exception as e:
    print("ABP not found. Error:", e)
    sigGT_ABP = bpmGT_ABP = timesGT_ABP = None

try:
    sigGT_CVP = dataset.readSigfile(fname, signalGT='CVP')
    bpmGT_CVP, timesGT_CVP = sigGT_CVP.getBPM(wsize)
    # Add these to the DataFrame
    cvp_bpms = bpmGT_CVP.tolist()
    cvp_times = timesGT_CVP.tolist()
    CVP = [cvp_times, cvp_bpms]
    df.at[df[df['INSTANCE'] == int(instance)].index[0], 'CVP'] = CVP
except Exception as e:
    print("CVP not found. Error:", e)
    sigGT_CVP = bpmGT_CVP = timesGT_CVP = None

# Prefilter before we use the specific method
hol_sig = sig_extractor.extract_holistic_rectangle(videoFileName,pixel_threshold)    # Extract the signal
# TODO: Add patch signal

# Save the 10th frame just to check filtering.
frames = sig_extractor.extract_raw(videoFileName)
print(frames.shape)
skin_frames = np.array(sig_extractor.visualize_skin_collection)
print(skin_frames.shape)
# TODO: Add patch frames

cv2.imwrite(path + 'frame.png', cv2.cvtColor(frames[9], cv2.COLOR_RGB2BGR))
cv2.imwrite(path + 'skin_frame.png', cv2.cvtColor(skin_frames[9], cv2.COLOR_RGB2BGR))


windowed_hol_sig, timesES = vhr.extraction.sig_windowing(hol_sig, wsize, 1, fps) # Window the signal
filtered_windowed_hol_sig = vhr.BVP.apply_filter(windowed_hol_sig, vhr.BVP.rgb_filter_th, fps=fps, params={'RGB_LOW_TH': 5, 'RGB_HIGH_TH': 230}) # Apply the threshold filter
filtered_windowed_hol_sig = vhr.BVP.apply_filter(filtered_windowed_hol_sig, vhr.BVP.BPfilter, params={'order':6,'minHz':0.65,'maxHz':4.0,'fps':fps}) # Apply the other filter

# TODO: add the patched version of all of this

methods = ['CHROM', 'LGI', 'POS', 'PBV', 'GREEN', 'OMIT','ICA','PCA']
for method in methods:
    if method == "CHROM":
        print("Processing CHROM - Holistic")
        hol_bvps = RGB_sig_to_BVP(filtered_windowed_hol_sig, fps, device_type='cuda', method=cupy_CHROM)
        print("PROCESSING CHROM - Patches")
    elif method == "LGI":
        print("PROCESSING LGI - Holistic")
        hol_bvps = RGB_sig_to_BVP(filtered_windowed_hol_sig, fps, device_type='cpu', method=cpu_LGI)
        print("Processing LGI - Patches")
    elif method == "POS":
        print("PROCESSING POS - Holistic")
        hol_bvps = RGB_sig_to_BVP(filtered_windowed_hol_sig, fps, device_type='cuda', method=cupy_POS, params={'fps':fps})
        print("PROCESSING POS - Patches")
    elif method == "PBV":
        print("PROCESSING PBV - Holistic")
        hol_bvps = RGB_sig_to_BVP(filtered_windowed_hol_sig, fps, device_type='cpu', method=cpu_PBV)
        print("PROCESSING PBV - Patches")
    elif method == "PCA":
        print("PROCESSING PCA - Holistic")
        hol_bvps = RGB_sig_to_BVP(filtered_windowed_hol_sig, fps, device_type='cpu', method=cpu_PCA, params={'component':'all_comp'})
        print("PROCESSING PCA - Patches"")
    elif method == "GREEN":
        print("PROCESSING GREEN - Holistic")
        hol_bvps = RGB_sig_to_BVP(filtered_windowed_hol_sig, fps, device_type='cpu', method=cpu_GREEN)
        print("PROCESSING GREEN - Patches")
    elif method == "OMIT":
        print("PROCESSING OMIT - Holistic")
        hol_bvps = RGB_sig_to_BVP(filtered_windowed_hol_sig, fps, device_type='cpu', method=cpu_OMIT)
        print("PROCESSING OMIT - Patches"")
    elif method == "ICA":
        print("PROCESSING ICA - Holistic")
        hol_bvps = RGB_sig_to_BVP(filtered_windowed_hol_sig, fps, device_type='cpu', method=cpu_ICA, params={'component':'all_comp'})
        print("PROCESSING ICA - Patches")
    else:
        print("Method not found")
        continue

    # Apply the filter            
    hol_bvps = vhr.BVP.apply_filter(hol_bvps, BPfilter, params={'order':6,'minHz':0.75,'maxHz':4.0,'fps':fps})
    # Get BPM

    hol_bpmES = vhr.BPM.BVP_to_BPM_cuda(hol_bvps, fps)
    if method == "PCA":
        hol_bpmES = [x[0] for x in hol_bpmES]
    if method == "ICA":
        hol_bpmES = [x[0] for x in hol_bpmES]
        
    # Get the errors
    if sigGT_ECG is not None:
        RMSE, MAE, MAX, PCC, CCC, SNR = getErrors(hol_bvps, fps, hol_bpmES, bpmGT_ECG, timesES, timesGT_ECG)
    elif sigGT_ABP is not None:
        RMSE, MAE, MAX, PCC, CCC, SNR = getErrors(hol_bvps, fps, hol_bpmES, bpmGT_ABP, timesES, timesGT_ABP)
    elif sigGT_CVP is not None:
        RMSE, MAE, MAX, PCC, CCC, SNR = getErrors(hol_bvps, fps, hol_bpmES, bpmGT_CVP, timesES, timesGT_CVP)

    # Update the DataFrame
    ERRORS = [RMSE, MAE, MAX, PCC, CCC, SNR]
    OUTPUT = [timesES, hol_bpmES, ERRORS]
    df.at[df[df['INSTANCE'] == int(instance)].index[0], 'HOL_' + method + '_' + camera] = OUTPUT"""

In [None]:
"""# Single running block

# Set our parameters

seconds = 0     # seconds of video to be processed (0 for all video)
wsize = 8       # seconds of video processed (with overlapping) for each estimate
vhr.extraction.SkinProcessingParams.RGB_LOW_TH =  5     # threshold for skin extraction
vhr.extraction.SkinProcessingParams.RGB_HIGH_TH = 230   # threshold for skin extraction
vhr.extraction.SignalProcessingParams.RGB_LOW_TH = 5    # threshold for signal extraction
vhr.extraction.SignalProcessingParams.RGB_HIGH_TH = 230 # threshold for signal extraction
sig_extractor.set_visualize_skin_and_landmarks(
    visualize_skin=True, 
    visualize_landmarks=True, 
    visualize_landmarks_number=True, 
    visualize_patch=True)

# Select the video, to be used in loops later
video_idx = 0 #np.random.randint(0,len(allvideo))    # index of the video to be processed

# Initialise everything
sig_extractor = vhr.extraction.SignalProcessing()   # Set the class
sig_extractor.choose_cuda_device(0)                 # Set the GPU
sig_extractor.set_skin_extractor(vhr.extraction.SkinExtractionRectangle('GPU')) # Set the skin extractor
sig_extractor.set_total_frames(seconds*fps) # Set the number of frames
fname = dataset.getSigFilename(video_idx) # get the filename of the signal file
videoFileName = dataset.getVideoFilename(video_idx) # get the filename of the video file


# Get the ground truth data
try:
    sigGT_ECG = dataset.readSigfile(fname, signalGT='ECG')
    sigGT_ECG.show_ECG = True
    bpmGT_ECG, timesGT_ECG = sigGT_ECG.getBPM(wsize)
except Exception as e:
    print("ECG not found. Error:", e)
    sigGT_ECG = bpmGT_ECG = timesGT_ECG = None

try:
    sigGT_ABP = dataset.readSigfile(fname, signalGT='ABP')
    bpmGT_ABP, timesGT_ABP = sigGT_ABP.getBPM(wsize)
except Exception as e:
    print("ABP not found. Error:", e)
    sigGT_ABP = bpmGT_ABP = timesGT_ABP = None

try:
    sigGT_CVP = dataset.readSigfile(fname, signalGT='CVP')
    bpmGT_CVP, timesGT_CVP = sigGT_CVP.getBPM(wsize)
except Exception as e:
    print("CVP not found. Error:", e)
    sigGT_CVP = bpmGT_CVP = timesGT_CVP = None

# Prefilter before we use the specific method
hol_sig = sig_extractor.extract_holistic_rectangle(videoFileName,40)    # Extract the signal
sig_extractor.skin_collection
# TODO: Add a save of the 10th frame, normal and segmented
windowed_hol_sig, timesES = vhr.extraction.sig_windowing(hol_sig, wsize, 1, fps) # Window the signal
filtered_windowed_hol_sig = vhr.BVP.apply_filter(windowed_hol_sig, vhr.BVP.rgb_filter_th, fps=fps, params={'RGB_LOW_TH': 5, 'RGB_HIGH_TH': 230}) # Apply the threshold filter
filtered_windowed_hol_sig = vhr.BVP.apply_filter(filtered_windowed_hol_sig, vhr.BVP.BPfilter, params={'order':6,'minHz':0.65,'maxHz':4.0,'fps':fps}) # Apply the other filter
# TODO: add the patched version of all of this

methods = ['CHROM', 'LGI', 'POS', 'PBV', 'PCA', 'GREEN', 'OMIT', 'ICA']

# Use a method to extract the BVP
hol_bvps = RGB_sig_to_BVP(filtered_windowed_hol_sig, fps, device_type='cuda', method=cupy_CHROM) # Extract the BVP

# Apply a filter to the BVP
hol_bvps = vhr.BVP.apply_filter(hol_bvps, BPfilter, params={'order':6,'minHz':0.75,'maxHz':4.0,'fps':fps})

# Get the BPM
hol_bpmES = vhr.BPM.BVP_to_BPM_cuda(hol_bvps, fps)  # CUDA version

# Get the errors
RMSE, MAE, MAX, PCC, CCC, SNR = getErrors(hol_bvps, fps, hol_bpmES, bpmGT_ECG, timesES, timesGT_ECG)
printErrors(RMSE, MAE, MAX, PCC, CCC, SNR)
displayErrors(hol_bpmES, bpmGT_ECG, timesES, timesGT_ECG)
"""

In [None]:
"""for instance in range(len(df['INSTANCE'])):

    if df['PROCESSED'][instance] == True:
        continue

    videoFileNames = []
    sigFileNames = []

    if df['K1_DATA_PATH'][instance] == None:
        continue
    videoFileNames.append(df['K1_DATA_PATH'][instance] + "K1_Cropped_Colour.mkv")
    sigFileNames.append(df['K1_DATA_PATH'][instance] + "data.csv")

    if df['K2_DATA_PATH'][instance] == None:
        continue
    videoFileNames.append(df['K1_DATA_PATH'][instance] + "K1_Cropped_Colour.mkv")
    sigFileNames.append(df['K1_DATA_PATH'][instance] + "data.csv")"""