<a href="https://colab.research.google.com/github/aytekin827/TIL/blob/main/pyVHR%EB%94%A5%EB%9F%AC%EB%8B%9D_%ED%8C%8C%EC%9D%B4%ED%94%84%EB%9D%BC%EC%9D%B8.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

---
# < **pyVHR 딥러닝 파이프라인** >
---

## 환경설정



### cupy 설치  


1. cuda 버전 확인  
2. cupy-cuda11x 삭제 후 재설치  
3. `import cupy`



[코렙 cuda 버전정보 확인]
- Torch version:2.0.1+cu118  
- cuda version: 11.8  
- cudnn version:8700

In [None]:
!nvcc --version

nvcc: NVIDIA (R) Cuda compiler driver
Copyright (c) 2005-2022 NVIDIA Corporation
Built on Wed_Sep_21_10:33:58_PDT_2022
Cuda compilation tools, release 11.8, V11.8.89
Build cuda_11.8.r11.8/compiler.31833905_0


In [None]:
!pip uninstall cupy-cuda11x -Y
!pip install cupy-cuda11x

In [None]:
import cupy

--------------------------------------------------------------------------------

  CuPy may not function correctly because multiple CuPy packages are installed
  in your environment:

    cupy-cuda110, cupy-cuda11x

  Follow these steps to resolve this issue:

    1. For all packages listed above, run the following command to remove all
       existing CuPy installations:

         $ pip uninstall <package_name>

      If you previously installed CuPy via conda, also run the following:

         $ conda uninstall cupy

    2. Install the appropriate CuPy package.
       Refer to the Installation Guide for detailed instructions.

         https://docs.cupy.dev/en/stable/install.html

--------------------------------------------------------------------------------



### mediapipe 설치

In [None]:
!pip install mediapipe

Collecting mediapipe
  Downloading mediapipe-0.10.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (33.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m33.5/33.5 MB[0m [31m38.1 MB/s[0m eta [36m0:00:00[0m
Collecting sounddevice>=0.4.4 (from mediapipe)
  Downloading sounddevice-0.4.6-py3-none-any.whl (31 kB)
Installing collected packages: sounddevice, mediapipe
Successfully installed mediapipe-0.10.3 sounddevice-0.4.6


## 필요 클래스 정의

### pyVHR/extraction/utils

In [None]:
from numba import prange, njit
import numpy as np
import cv2
import numpy as np
from scipy.signal import welch, butter, filtfilt, iirnotch, freqz
from sklearn.decomposition import PCA
from scipy.stats import iqr, median_abs_deviation

class MotionAnalysis():
  """
    Extraction MediaPipe landmarks for motion analysis
  """

  def __init__(self, sig_extractor, winsize, fps, stride=1, landmks=None, Q_notch=10):
    self.winsize = winsize
    self.fps = fps
    # Q factor of notch filter
    self.Q_notch = Q_notch
    self.stride = stride
    # selected MediaPipe landmarks
    self.select_landmarks(landmks)
    # all MediaPipe landmarks
    self.landmarks = sig_extractor.get_landmarks()
    # lanmk IDs effectively used within filter
    self.landmarks_idx = sig_extractor.ldmks
    # shapes of cropped frames
    self.shapes = sig_extractor.get_cropped_skin_im_shapes()
    # win_landmks: lanmks in a win, win_x: x components, win_y: y componets, times: win times
    self.movements_windowing()

  def select_landmarks(self, landmks=None):
    """
    Landmark selection from MediaPipe for motion analysis

      Args:
        landmarks (float32 ndarray): ndarray with shape [num_frames, num_estimators, 2-coords].
        shapes (float32 ndarray)   : ndarray with shape [2-coords, num_frames]
        wsize (float)              : window size in seconds.
        stride (float)             : stride between overlapping windows in seconds.
        fps (float)                : frames per seconds.

    """
    # default landmks selected from front to nose
    if landmks is None:
      self.landmks_selected = [10, 151, 6, 197, 5, 4]
    else:
       self.landmks_selected = landmks

  def movements_windowing(self):
    """
    Calculation of overlapping windows of landmarks coordinates.

    Uses:
        landmarks (float32 ndarray): ndarray with shape [num_frames, num_estimators, 2-coords].
        shapes (float32 ndarray)   : ndarray with shape [2-coords, num_frames]
        wsize (float)              : window size in seconds.
        stride (float)             : stride between overlapping windows in seconds.
        fps (float)                : frames per seconds.

    Provides:
        A list of ndarray (float32) with shape [wsize, num_estimators, 2-coords]
        A list of ndarray (float32) with shape [wsize]
        A list of ndarray (float32) with shape [wsize]
        and an array (float32) of times in seconds (win centers)
    """
    N = self.landmarks.shape[0]
    block_idx, timesLmks = sliding_straded_win_idx(N, self.winsize, self.stride, self.fps)
    lmks_xy = []
    win_x = []
    win_y = []
    for e in block_idx:
      st_frame = int(e[0])
      end_frame = int(e[-1])
      coords = np.copy(self.landmarks[st_frame: end_frame+1])
      x_coords = np.copy(self.shapes[1][st_frame: end_frame+1])
      y_coords = np.copy(self.shapes[0][st_frame: end_frame+1])
      lmks_xy.append(coords)
      win_x.append(x_coords)
      win_y.append(y_coords)
    self.win_landmks = np.array(lmks_xy)
    self.win_x = np.array(win_x)
    self.win_y = np.array(win_y)
    self.timesLmks = timesLmks

  def get_win_motion_filter_old(self, win):
    # loop on landmarks
    pos = [self.landmarks_idx.index(i) for i in self.landmks_selected if i in self.landmarks_idx]
    X_cords = self.win_landmks[win][:,pos,0]
    Y_cords = self.win_landmks[win][:,pos,1]
    X_cords = X_cords.T     # shape: (#lanmks, winsize)
    Y_cords = Y_cords.T     # shape: (#lanmks, winsize)
    Z_cords = np.expand_dims(self.win_x[win], axis=0)

    Wx, Hx, Ex = self.mov_notch(X_cords)
    Wy, Hy, Ey = self.mov_notch(Y_cords)
    Wz, Hz, Ez = self.mov_notch(Z_cords)

    return Hx, Hy, Hz, Ex, Ey, Ez

  def mov_notch(self, coords):

    # coords filtering
    b, a = butter(6, Wn=[.65,4.0], fs=self.fps, btype='bandpass')
    coords = filtfilt(b, a, coords, axis=1)
    F, P = Welch(coords, self.fps)  # shape: (#lanmks, winsize)
    P = P.T
    pca = PCA(n_components=1)   # PCA
    pca.fit(P)
    P = pca.fit_transform(P).T
    P = P.squeeze()

    # energy
    P_min = np.min([np.min(P), 0])
    En = np.sum(P-P_min)/(P.shape[0])

    # movement notch
    idx = np.argmax(P)
    P_max = P[idx]
    F_max = F[idx]
    b, a = iirnotch(F_max/60.0, self.Q_notch, self.fps)
    W, H = freqz(b, a, worN=1025,  fs=self.fps)
    band = np.argwhere((W > 0.65) & (W < 4.0)).flatten()
    W_notch = 60.0*W[band]
    H_notch = np.abs(H[band])

    #plt.plot(F, P)
    #plt.show()
    #plt.plot(W_notch, H_notch)
    #plt.show()

    return W_notch, H_notch, En


  def get_win_motion_filter(self, win):
    # loop on landmarks
    pos = [self.landmarks_idx.index(i) for i in self.landmks_selected if i in self.landmarks_idx]
    X_cords = self.win_landmks[win][:,pos,0]
    Y_cords = self.win_landmks[win][:,pos,1]
    X_cords = X_cords.T     # shape: (#lanmks, winsize)
    Y_cords = Y_cords.T     # shape: (#lanmks, winsize)
    Z_cords = np.expand_dims(self.win_x[win], axis=0)

    # filtering
    b, a = butter(6, Wn=[.65,4.0], fs=self.fps, btype='bandpass')

    #----- X -----
    X_cords = filtfilt(b, a, X_cords, axis=1)
    _, XPow = Welch(X_cords, self.fps)  # shape: (#lanmks, winsize)
    XPow = XPow.T
    pcax = PCA(n_components=1)   # PCA
    pcax.fit(XPow)
    XPow = pcax.fit_transform(XPow).T
    XPmov = XPow.squeeze()
    # energy
    XPmov_min = np.min([np.min(XPmov), 0])
    XEnergy = np.sum(XPmov-XPmov_min)/(XPmov.shape[0])
    XFmov = 1 - XPmov/np.max(XPmov) # X filter

    #-----Y-----
    Y_cords = filtfilt(b, a, Y_cords, axis=1)
    _, YPow = Welch(Y_cords, self.fps)  # shape: (#lanmks, winsize)
    YPow = YPow.T
    pcay = PCA(n_components=1)  #PCA
    pcay.fit(YPow)
    YPow = pcay.fit_transform(YPow).T
    YPmov = YPow.squeeze()
    # energy
    YPmov_min = np.min([np.min(YPmov), 0])
    YEnergy = np.sum(YPmov - YPmov_min) / (YPmov.shape[0])
    YFmov = 1 - YPmov/np.max(YPmov)  # Y filter

    #-----Z-----
    Z_cords = filtfilt(b, a, Z_cords, axis=1)
    _, ZPow = Welch(Z_cords, self.fps)
    ZPow = ZPow.T
    ZPmov = ZPow.squeeze()
    ZEnergy = np.sum(ZPmov) / (ZPmov.shape[0])
    ZFmov = 1 - ZPmov/np.max(ZPmov) # Z filter

    return XFmov, YFmov, ZFmov, XEnergy, YEnergy, ZEnergy

class MagicLandmarks():
    """
    This class contains usefull lists of landmarks identification numbers.
    """
    high_prio_forehead = [10, 67, 69, 104, 108, 109, 151, 299, 337, 338]
    high_prio_nose = [3, 4, 5, 6, 45, 51, 115, 122, 131, 134, 142, 174, 195, 196, 197, 198,
                      209, 217, 220, 236, 248, 275, 277, 281, 360, 363, 399, 419, 420, 429, 437, 440, 456]
    high_prio_left_cheek = [36, 47, 50, 100, 101, 116, 117,
                            118, 119, 123, 126, 147, 187, 203, 205, 206, 207, 216]
    high_prio_right_cheek = [266, 280, 329, 330, 346, 347,
                             347, 348, 355, 371, 411, 423, 425, 426, 427, 436]

    mid_prio_forehead = [8, 9, 21, 68, 103, 251,
                         284, 297, 298, 301, 332, 333, 372, 383]
    mid_prio_nose = [1, 44, 49, 114, 120, 121, 128, 168, 188, 351, 358, 412]
    mid_prio_left_cheek = [34, 111, 137, 156, 177, 192, 213, 227, 234]
    mid_prio_right_cheek = [340, 345, 352, 361, 454]
    mid_prio_chin = [135, 138, 169, 170, 199, 208, 210, 211,
                     214, 262, 288, 416, 428, 430, 431, 432, 433, 434]
    mid_prio_mouth = [92, 164, 165, 167, 186, 212, 322, 391, 393, 410]
    # more specific areas
    forehead_left = [21, 71, 68, 54, 103, 104, 63, 70,
                     53, 52, 65, 107, 66, 108, 69, 67, 109, 105]
    forehead_center = [10, 151, 9, 8, 107, 336, 285, 55, 8]
    forehoead_right = [338, 337, 336, 296, 285, 295, 282,
                       334, 293, 301, 251, 298, 333, 299, 297, 332, 284]
    eye_right = [283, 300, 368, 353, 264, 372, 454, 340, 448,
                 450, 452, 464, 417, 441, 444, 282, 276, 446, 368]
    eye_left = [127, 234, 34, 139, 70, 53, 124,
                35, 111, 228, 230, 121, 244, 189, 222, 143]
    nose = [193, 417, 168, 188, 6, 412, 197, 174, 399, 456,
            195, 236, 131, 51, 281, 360, 440, 4, 220, 219, 305]
    mounth_up = [186, 92, 167, 393, 322, 410, 287, 39, 269, 61, 164]
    mounth_down = [43, 106, 83, 18, 406, 335, 273, 424, 313, 194, 204]
    chin = [204, 170, 140, 194, 201, 171, 175,
            200, 418, 396, 369, 421, 431, 379, 424]
    cheek_left_bottom = [215, 138, 135, 210, 212, 57, 216, 207, 192]
    cheek_right_bottom = [435, 427, 416, 364,
                          394, 422, 287, 410, 434, 436]
    cheek_left_top = [116, 111, 117, 118, 119, 100, 47, 126, 101, 123,
                      137, 177, 50, 36, 209, 129, 205, 147, 177, 215, 187, 207, 206, 203]
    cheek_right_top = [349, 348, 347, 346, 345, 447, 323,
                       280, 352, 330, 371, 358, 423, 426, 425, 427, 411, 376]
    # dense zones used for convex hull masks
    left_eye = [157,144, 145, 22, 23, 25, 154, 31, 160, 33, 46, 52, 53, 55, 56, 189, 190, 63, 65, 66, 70, 221, 222, 223, 225, 226, 228, 229, 230, 231, 232, 105, 233, 107, 243, 124]
    right_eye = [384, 385, 386, 259, 388, 261, 265, 398, 276, 282, 283, 285, 413, 293, 296, 300, 441, 442, 445, 446, 449, 451, 334, 463, 336, 464, 467, 339, 341, 342, 353, 381, 373, 249, 253, 255]
    mounth = [391, 393, 11, 269, 270, 271, 287, 164, 165, 37, 167, 40, 43, 181, 313, 314, 186, 57, 315, 61, 321, 73, 76, 335, 83, 85, 90, 106]
    # equispaced facial points - mouth and eyes are excluded.
    equispaced_facial_points = [2, 3, 4, 5, 6, 8, 9, 10, 18, 21, 32, 35, 36, 43, 46, 47, 48, 50, 54, \
             58, 67, 68, 69, 71, 92, 93, 101, 103, 104, 108, 109, 116, 117, \
             118, 123, 132, 134, 135, 138, 139, 142, 148, 149, 150, 151, 152, 182, 187, 188, 193, 197, 201, 205, 206, 207, \
             210, 211, 212, 216, 234, 248, 251, 262, 265, 266, 273, 277, 278, 280, \
             284, 288, 297, 299, 322, 323, 330, 332, 333, 337, 338, 345, \
             346, 361, 363, 364, 367, 368, 371, 377, 379, 411, 412, 417, 421, 425, 426, 427, 430, 432, 436]

def get_magic_landmarks():
    """ returns high_priority and mid_priority list of landmarks identification number """
    return [*MagicLandmarks.forehead_center, *MagicLandmarks.cheek_left_bottom, *MagicLandmarks.cheek_right_bottom], [*MagicLandmarks.forehoead_right, *MagicLandmarks.forehead_left, *MagicLandmarks.cheek_left_top, *MagicLandmarks.cheek_right_top]

@njit(parallel=True)
def draw_rects(image, xcenters, ycenters, xsides, ysides, color):
    """
    This method is used to draw N rectangles on a image.
    """
    for idx in prange(len(xcenters)):
        leftx = int(xcenters[idx] - xsides[idx]/2)
        rightx = int(xcenters[idx] + xsides[idx]/2)
        topy = int(ycenters[idx] - ysides[idx]/2)
        bottomy = int(ycenters[idx] + ysides[idx]/2)
        for x in prange(leftx, rightx):
            if topy >= 0 and x >= 0 and x < image.shape[1]:
                image[topy, x, 0] = color[0]
                image[topy, x, 1] = color[1]
                image[topy, x, 2] = color[2]
            if bottomy < image.shape[0] and x >= 0 and x < image.shape[1]:
                image[bottomy, x, 0] = color[0]
                image[bottomy, x, 1] = color[1]
                image[bottomy, x, 2] = color[2]
        for y in prange(topy, bottomy):
            if leftx >= 0 and y >= 0 and y < image.shape[0]:
                image[y, leftx, 0] = color[0]
                image[y, leftx, 1] = color[1]
                image[y, leftx, 2] = color[2]
            if rightx < image.shape[1] and y >= 0 and y < image.shape[0]:
                image[y, rightx, 0] = color[0]
                image[y, rightx, 1] = color[1]
                image[y, rightx, 2] = color[2]
    return image

def sig_windowing(sig, wsize, stride, fps):
    """
    This method is used to divide a RGB signal into overlapping windows.

    Args:
        sig (float32 ndarray): ndarray with shape [num_frames, num_estimators, rgb_channels].
        wsize (float): window size in seconds.
        stride (float): stride between overlapping windows in seconds.
        fps (float): frames per seconds.

    Returns:
        A list of ndarray (float32) with shape [num_estimators, rgb_channels, window_frames],
        an array (float32) of times in seconds (win centers)
    """
    N = sig.shape[0]
    block_idx, timesES = sliding_straded_win_idx(N, wsize, stride, fps)
    block_signals = []
    for e in block_idx:
        st_frame = int(e[0])
        end_frame = int(e[-1])
        wind_signal = np.copy(sig[st_frame: end_frame+1])
        wind_signal = np.swapaxes(wind_signal, 0, 1)
        wind_signal = np.swapaxes(wind_signal, 1, 2)
        block_signals.append(wind_signal)
    return block_signals, timesES

    """
    This method is used to divide a Raw signal into overlapping windows.

    Args:
        sig (float32 ndarray): ndarray of images with shape [num_frames, rows, columns, rgb_channels].
        wsize (float): window size in seconds.
        stride (float): stride between overlapping windows in seconds.
        fps (float): frames per seconds.

    Returns:
        windowed signal as a list of length num_windows of float32 ndarray with shape [num_frames, rows, columns, rgb_channels],
        and a 1D ndarray of times in seconds,where each one is the center of a window.
    """
    N = raw_signal.shape[0]
    block_idx, timesES = sliding_straded_win_idx(N, wsize, stride, fps)
    block_signals = []
    for e in block_idx:
        st_frame = int(e[0])
        end_frame = int(e[-1])
        wind_signal = np.copy(raw_signal[st_frame: end_frame+1])
        # check for zero traces
        sum_wind = np.sum(wind_signal, axis=(1,2))
        zero_idx = np.argwhere(sum_wind == 0).squeeze()
        est_idx = np.ones(wind_signal.shape[0], dtype=bool)
        est_idx[zero_idx] = False
        # append traces
        block_signals.append(wind_signal[est_idx])
    return block_signals, timesES

def sliding_straded_win_idx(N, wsize, stride, fps):
    """
    This method is used to compute the indices for creating an overlapping windows signal.

    Args:
        N (int): length of the signal.
        wsize (float): window size in seconds.
        stride (float): stride between overlapping windows in seconds.
        fps (float): frames per seconds.

    Returns:
        List of ranges, each one contains the indices of a window, and a 1D ndarray of times in seconds, where each one is the center of a window.
    """
    wsize_fr = wsize*fps
    stride_fr = stride*fps
    idx = []
    timesES = []
    num_win = int((N-wsize_fr)/stride_fr)+1
    s = 0
    for i in range(num_win):
        idx.append(np.arange(s, s+wsize_fr))
        s += stride_fr
        timesES.append(wsize/2+stride*i)
    return idx, np.array(timesES, dtype=np.float32)

def get_fps(videoFileName):
    """
    This method returns the fps of a video file name or path.
    """
    vidcap = cv2.VideoCapture(videoFileName)
    fps = vidcap.get(cv2.CAP_PROP_FPS)
    vidcap.release()
    return fps

def Welch(bvps, fs):
  _, n = bvps.shape
  if n < 256:
      seglength = n
      overlap = int(0.6*n)  # fixed overlapping
  else:
      seglength = 256
      overlap = 200
  # -- periodogram by Welch

  if np.isnan(bvps).any():
    print('OK bvp')
  F, P = welch(bvps, nperseg=seglength, noverlap=overlap, fs=fs, nfft=2048)
  F = F.astype(np.float32)
  P = P.astype(np.float32)
  # -- freq subband (0.65 Hz - 4.0 Hz)
  band = np.argwhere((F > 0.65) & (F < 4.0)).flatten()
  Pfreqs = 60*F[band]
  Power = P[:, band]
  return Pfreqs, Power

def extract_frames_yield(videoFileName):
    """
    This method yield the frames of a video file name or path.
    """
    vidcap = cv2.VideoCapture(videoFileName)
    success, image = vidcap.read()
    while success:
        yield image
        success, image = vidcap.read()
    vidcap.release()

def med_mad(x):
  MED = np.median(x)
  MAD = median_abs_deviation(x)
  return MED, MAD

def adjust_BMPs(bpmES, bpmES0, bpmES1, wsize, thr=10):
  """
    adjust the final estimate by evaluating whether to replace the chosen value
    with the discarded one based on the distance between the current value and the median
    Args:
        bpmES: the estimates chosen.
        bpmES0: the estimates given by the first cluster.
        bpmES1: the estimates given by the second cluster.
        wsize: thr size of the windows used (in seconds)
        thr: a multiplier factor for the MAD.
    Returns:
        bpmES: possibly redefined bpm estimates.
    """
  T = thr
  N = len(bpmES)
  bpmES = np.array(bpmES)
  bpmES0 = np.array(bpmES0)
  bpmES1 = np.array(bpmES1)
  for i in range(N):
    L = max(0,i-wsize)
    R = min(N,i+wsize)
    MED, MAD = med_mad(bpmES[L:R])
    diff = np.abs(bpmES[i]-MED)      # all diffs with MED
    if diff > T+MAD:
      if abs(bpmES[i]-bpmES0[i]) < 10e-9 and abs(MED-bpmES1[i]) < T+MAD:
        bpmES[i] = bpmES1[i]
      elif abs(bpmES[i]-bpmES1[i]) < 10e-9 and abs(MED-bpmES0[i]) < T+MAD:
        bpmES[i] = bpmES0[i]
      else:
        bpmES[i] = MED
  return bpmES


### pyVHR/extraction/sig_extraction_methods

In [None]:
from numba import njit, prange, float32
import math
import time
import numpy as np
import PIL.Image
import torchvision.transforms as transforms
import os
import matplotlib.pyplot as plt

"""
This module defines classes or methods used for signal extraction.
"""

class SignalProcessingParams():
    """
        This class contains usefull parameters used by this module.

        RGB_LOW_TH (numpy.int32): RGB low-threshold value.

        RGB_HIGH_TH (numpy.int32): RGB high-threshold value.
    """
    RGB_LOW_TH = np.int32(55)
    RGB_HIGH_TH = np.int32(200)


@njit(['float32[:,:](uint8[:,:,:], int32, int32)', ], parallel=True, fastmath=True, nogil=True)
def holistic_mean(im, RGB_LOW_TH, RGB_HIGH_TH):
    """
    This method computes the RGB-Mean Signal excluding 'im' pixels
    that are outside the RGB range [RGB_LOW_TH, RGB_HIGH_TH] (extremes are included).

    Args:
        im (uint8 ndarray): ndarray with shape [rows, columns, rgb_channels].
        RGB_LOW_TH (numpy.int32): RGB low threshold value.
        RGB_HIGH_TH (numpy.int32): RGB high threshold value.

    Returns:
        RGB-Mean Signal as float32 ndarray with shape [1,3], where 1 is the single estimator,
        and 3 are r-mean, g-mean and b-mean.
    """
    mean = np.zeros((1, 3), dtype=np.float32)
    mean_r = np.float32(0.0)
    mean_g = np.float32(0.0)
    mean_b = np.float32(0.0)
    num_elems = np.float32(0.0)
    for x in prange(im.shape[0]):
        for y in prange(im.shape[1]):
            if not((im[x, y, 0] <= RGB_LOW_TH and im[x, y, 1] <= RGB_LOW_TH and im[x, y, 2] <= RGB_LOW_TH)
                    or (im[x, y, 0] >= RGB_HIGH_TH and im[x, y, 1] >= RGB_HIGH_TH and im[x, y, 2] >= RGB_HIGH_TH)):
                mean_r += im[x, y, 0]
                mean_g += im[x, y, 1]
                mean_b += im[x, y, 2]
                num_elems += 1.0
    if num_elems > 1.0:
        mean[0, 0] = mean_r / num_elems
        mean[0, 1] = mean_g / num_elems
        mean[0, 2] = mean_b / num_elems
    else:
        mean[0, 0] = mean_r
        mean[0, 1] = mean_g
        mean[0, 2] = mean_b
    return mean


@njit(['float32[:,:](float32[:,:],uint8[:,:,:],float32, int32, int32)', ], parallel=True, fastmath=True, nogil=True)
def landmarks_mean(ldmks, im, square, RGB_LOW_TH, RGB_HIGH_TH):
    """
    This method computes the RGB-Mean Signal excluding 'im' pixels
    that are outside the RGB range [RGB_LOW_TH, RGB_HIGH_TH] (extremes are included).

    Args:
        ldmks (float32 ndarray): landmakrs as ndarray with shape [num_landmarks, 5],
             where the second dimension contains y-coord, x-coord, r-mean (value is not important), g-mean (value is not important), b-mean (value is not important).
        im (uint8 ndarray): ndarray with shape [rows, columns, rgb_channels].
        square (numpy.float32): side size of square patches.
        RGB_LOW_TH (numpy.int32): RGB low threshold value.
        RGB_HIGH_TH (numpy.int32): RGB high threshold value.

    Returns:
        RGB-Mean Signal as float32 ndarray with shape [num_landmarks, 5], where the second dimension contains y-coord, x-coord, r-mean, g-mean, b-mean.
    """
    r_ldmks = ldmks.astype(np.float32)
    width = im.shape[1]
    height = im.shape[0]
    S = math.floor(square/2)
    lds_mean = np.zeros((ldmks.shape[0], 3), dtype=np.float32)
    num_elems = np.zeros((ldmks.shape[0], ), dtype=np.float32)
    for ld_id in prange(0, r_ldmks.shape[0]):
        if r_ldmks[ld_id, 0] >= 0.0:
            for x in prange(int(r_ldmks[ld_id, 0] - S), int(r_ldmks[ld_id, 0] + S + 1)):
                for y in prange(int(r_ldmks[ld_id, 1] - S), int(r_ldmks[ld_id, 1] + S + 1)):
                    if x >= 0 and x < height and y >= 0 and y < width:
                        if not((im[x, y, 0] <= RGB_LOW_TH and im[x, y, 1] <= RGB_LOW_TH and im[x, y, 2] <= RGB_LOW_TH) or
                               (im[x, y, 0] >= RGB_HIGH_TH and im[x, y, 1] >= RGB_HIGH_TH and im[x, y, 2] >= RGB_HIGH_TH)):
                            lds_mean[ld_id, 0] += np.float32(im[x, y, 0])
                            lds_mean[ld_id, 1] += np.float32(im[x, y, 1])
                            lds_mean[ld_id, 2] += np.float32(im[x, y, 2])
                            num_elems[ld_id] += 1.0
            if num_elems[ld_id] > 1.0:
                r_ldmks[ld_id, 2] = lds_mean[ld_id, 0] / num_elems[ld_id]
                r_ldmks[ld_id, 3] = lds_mean[ld_id, 1] / num_elems[ld_id]
                r_ldmks[ld_id, 4] = lds_mean[ld_id, 2] / num_elems[ld_id]
    return r_ldmks


@njit(['float32[:,:](float32[:,:],uint8[:,:,:],float32, int32, int32)', ], parallel=True, fastmath=True, nogil=True)
def landmarks_median(ldmks, im, square, RGB_LOW_TH, RGB_HIGH_TH):
    """
    This method computes the RGB-Median Signal excluding 'im' pixels
    that are outside the RGB range [RGB_LOW_TH, RGB_HIGH_TH] (extremes are included).

    Args:
        ldmks (float32 ndarray): landmakrs as ndarray with shape [num_landmarks, 5],
             where the second dimension contains y-coord, x-coord, r-mean (value is not important), g-mean (value is not important), b-mean (value is not important).
        im (uint8 ndarray): ndarray with shape [rows, columns, rgb_channels].
        square (numpy.float32): side size of square patches.
        RGB_LOW_TH (numpy.int32): RGB low threshold value.
        RGB_HIGH_TH (numpy.int32): RGB high threshold value.

    Returns:
        RGB-Median Signal as float32 ndarray with shape [num_landmarks, 5], where the second dimension contains y-coord, x-coord, r-mean, g-mean, b-mean.
    """
    r_ldmks = ldmks.astype(np.float32)
    width = float32(im.shape[1])
    height = float32(im.shape[0])
    S = math.floor(square/2.0)
    for ld_id in prange(r_ldmks.shape[0]):
        if r_ldmks[ld_id, 0] >= 0.0:
            x_s = r_ldmks[ld_id, 0] - S
            if x_s < 0.0:
                x_s = 0
            x_e = r_ldmks[ld_id, 0] + S
            if x_e >= height:
                x_e = height
            y_s = r_ldmks[ld_id, 1] - S
            if y_s < 0.0:
                y_s = 0
            y_e = r_ldmks[ld_id, 1] + S
            if y_e >= width:
                y_e = width
            ar = np.copy(im[int(x_s):int(x_e), int(y_s):int(y_e), :])
            f_ar = ar.flatten()
            r = ar[:, :, 0].flatten()
            g = ar[:, :, 1].flatten()
            b = ar[:, :, 2].flatten()
            goodidx = np.ones((ar.shape[0]*ar.shape[1],), dtype=np.int32)
            targets = np.arange(3, f_ar.shape[0], 3)
            for idx in prange(targets.shape[0]):
                i = targets[idx]
                if ((f_ar[i-2] <= RGB_LOW_TH and f_ar[i-1] <= RGB_LOW_TH and f_ar[i] <= RGB_LOW_TH) or
                        (f_ar[i-2] >= RGB_HIGH_TH and f_ar[i-1] >= RGB_HIGH_TH and f_ar[i] >= RGB_HIGH_TH)):
                    goodidx[i % 3] = 0
            goodidx = np.argwhere(goodidx).flatten()
            if goodidx.size < 1 or r.size < 1:
                r_ldmks[ld_id, 2] = np.float32(0.0)
                r_ldmks[ld_id, 3] = np.float32(0.0)
                r_ldmks[ld_id, 4] = np.float32(0.0)
            else:
                r_ldmks[ld_id, 2] = np.float32(np.median(r[goodidx]))
                r_ldmks[ld_id, 3] = np.float32(np.median(g[goodidx]))
                r_ldmks[ld_id, 4] = np.float32(np.median(b[goodidx]))
    return r_ldmks


@njit(['float32[:,:](float32[:,:],uint8[:,:,:],float32[:,:], int32, int32)', ], parallel=True, fastmath=True, nogil=True)
def landmarks_mean_custom_rect(ldmks, im, rects, RGB_LOW_TH, RGB_HIGH_TH):
    """
    This method computes the RGB-Mean Signal excluding 'im' pixels
    that are outside the RGB range [RGB_LOW_TH, RGB_HIGH_TH] (extremes are included).

    Args:
        ldmks (float32 ndarray): landmakrs as ndarray with shape [num_landmarks, 5],
             where the second dimension contains y-coord, x-coord, r-mean (value is not important), g-mean (value is not important), b-mean (value is not important).
        im (uint8 ndarray): ndarray with shape [rows, columns, rgb_channels].
        rects (float32 ndarray): positive float32 np.ndarray of shape [num_landmarks, 2]. If the list of used landmarks is [1,2,3]
            and rects_dim is [[10,20],[12,13],[40,40]] then the landmark number 2 will have a rectangular patch of xy-dimension 12x13.
        RGB_LOW_TH (numpy.int32): RGB low threshold value.
        RGB_HIGH_TH (numpy.int32): RGB high threshold value.

    Returns:
        RGB-Mean Signal as float32 ndarray with shape [num_landmarks, 5], where the second dimension contains y-coord, x-coord, r-mean, g-mean, b-mean.
    """
    r_ldmks = ldmks.astype(np.float32)
    width = im.shape[1]
    height = im.shape[0]
    lds_mean = np.zeros((ldmks.shape[0], 3), dtype=np.float32)
    num_elems = np.zeros((ldmks.shape[0], ), dtype=np.float32)
    for ld_id in prange(0, r_ldmks.shape[0]):
        if r_ldmks[ld_id, 0] >= 0:
            Sx = math.floor(rects[ld_id, 1]/2)
            Sy = math.floor(rects[ld_id, 0]/2)
            for x in prange(r_ldmks[ld_id, 0] - Sx, r_ldmks[ld_id, 0] + Sx + 1):
                for y in prange(r_ldmks[ld_id, 1] - Sy, r_ldmks[ld_id, 1] + Sy + 1):
                    if x >= 0 and x < height and y >= 0 and y < width:
                        if not((im[x, y, 0] <= RGB_LOW_TH and im[x, y, 1] <= RGB_LOW_TH and im[x, y, 2] <= RGB_LOW_TH) or
                               (im[x, y, 0] >= RGB_HIGH_TH and im[x, y, 1] >= RGB_HIGH_TH and im[x, y, 2] >= RGB_HIGH_TH)):
                            lds_mean[ld_id, 0] += im[x, y, 0]
                            lds_mean[ld_id, 1] += im[x, y, 1]
                            lds_mean[ld_id, 2] += im[x, y, 2]
                            num_elems[ld_id] += 1.0
            if num_elems[ld_id] > 1.0:
                r_ldmks[ld_id, 2] = lds_mean[ld_id, 0] / num_elems[ld_id]
                r_ldmks[ld_id, 3] = lds_mean[ld_id, 1] / num_elems[ld_id]
                r_ldmks[ld_id, 4] = lds_mean[ld_id, 2] / num_elems[ld_id]
    return r_ldmks


@njit(['float32[:,:](float32[:,:],uint8[:,:,:],float32[:,:], int32, int32)', ], parallel=True, fastmath=True, nogil=True)
def landmarks_median_custom_rect(ldmks, im, rects, RGB_LOW_TH, RGB_HIGH_TH):
    """
    This method computes the RGB-Median Signal excluding 'im' pixels
    that are outside the RGB range [RGB_LOW_TH, RGB_HIGH_TH] (extremes are included).

    Args:
        ldmks (float32 ndarray): landmakrs as ndarray with shape [num_landmarks, 5],
             where the second dimension contains y-coord, x-coord, r-mean (value is not important), g-mean (value is not important), b-mean (value is not important).
        im (uint8 ndarray): ndarray with shape [rows, columns, rgb_channels].
        rects (float32 ndarray): positive float32 np.ndarray of shape [num_landmarks, 2]. If the list of used landmarks is [1,2,3]
            and rects_dim is [[10,20],[12,13],[40,40]] then the landmark number 2 will have a rectangular patch of xy-dimension 12x13.
        RGB_LOW_TH (numpy.int32): RGB low threshold value.
        RGB_HIGH_TH (numpy.int32): RGB high threshold value.

    Returns:
        RGB-Median Signal as float32 ndarray with shape [num_landmarks, 5], where the second dimension contains y-coord, x-coord, r-mean, g-mean, b-mean.
    """
    r_ldmks = ldmks.astype(np.float32)
    width = float32(im.shape[1])
    height = float32(im.shape[0])
    for ld_id in prange(ldmks.shape[0]):
        if r_ldmks[ld_id, 0] >= 0.0:
            Sx = math.floor(rects[ld_id, 1]/2)
            Sy = math.floor(rects[ld_id, 0]/2)
            x_s = r_ldmks[ld_id, 0] - Sx
            if x_s < 0.0:
                x_s = 0
            x_e = r_ldmks[ld_id, 0] + Sx
            if x_e >= height:
                x_e = height
            y_s = r_ldmks[ld_id, 1] - Sy
            if y_s < 0.0:
                y_s = 0
            y_e = r_ldmks[ld_id, 1] + Sy
            if y_e >= width:
                y_e = width
            ar = np.copy(im[int(x_s):int(x_e), int(y_s):int(y_e), :])
            f_ar = ar.flatten()
            r = ar[:, :, 0].flatten()
            g = ar[:, :, 1].flatten()
            b = ar[:, :, 2].flatten()
            goodidx = np.ones((ar.shape[0]*ar.shape[1],), dtype=np.int32)
            targets = np.arange(3, f_ar.shape[0], 3)
            for idx in prange(targets.shape[0]):
                i = targets[idx]
                if ((f_ar[i-2] <= RGB_LOW_TH and f_ar[i-1] <= RGB_LOW_TH and f_ar[i] <= RGB_LOW_TH) or
                        (f_ar[i-2] >= RGB_HIGH_TH and f_ar[i-1] >= RGB_HIGH_TH and f_ar[i] >= RGB_HIGH_TH)):
                    goodidx[i % 3] = 0
            goodidx = np.argwhere(goodidx).flatten()
            if goodidx.size < 1 or r.size < 1:
                r_ldmks[ld_id, 2] = np.int32(0)
                r_ldmks[ld_id, 3] = np.int32(0)
                r_ldmks[ld_id, 4] = np.int32(0)
            else:
                r_ldmks[ld_id, 2] = np.int32(np.median(r[goodidx]))
                r_ldmks[ld_id, 3] = np.int32(np.median(g[goodidx]))
                r_ldmks[ld_id, 4] = np.int32(np.median(b[goodidx]))
    return r_ldmks

### pyVHR/extraction/skin_extraction_methods

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class BiSeNet(nn.Module):
    def __init__(self, n_classes, *args, **kwargs):
        super(BiSeNet, self).__init__()
        self.cp = ContextPath()
        ## here self.sp is deleted
        self.ffm = FeatureFusionModule(256, 256)
        self.conv_out = BiSeNetOutput(256, 256, n_classes)
        self.conv_out16 = BiSeNetOutput(128, 64, n_classes)
        self.conv_out32 = BiSeNetOutput(128, 64, n_classes)
        self.init_weight()

    def forward(self, x):
        H, W = x.size()[2:]
        feat_res8, feat_cp8, feat_cp16 = self.cp(x)  # here return res3b1 feature
        feat_sp = feat_res8  # use res3b1 feature to replace spatial path feature
        feat_fuse = self.ffm(feat_sp, feat_cp8)

        feat_out = self.conv_out(feat_fuse)
        feat_out16 = self.conv_out16(feat_cp8)
        feat_out32 = self.conv_out32(feat_cp16)

        feat_out = F.interpolate(feat_out, (H, W), mode='bilinear', align_corners=True)
        feat_out16 = F.interpolate(feat_out16, (H, W), mode='bilinear', align_corners=True)
        feat_out32 = F.interpolate(feat_out32, (H, W), mode='bilinear', align_corners=True)
        return feat_out, feat_out16, feat_out32

    def init_weight(self):
        for ly in self.children():
            if isinstance(ly, nn.Conv2d):
                nn.init.kaiming_normal_(ly.weight, a=1)
                if not ly.bias is None: nn.init.constant_(ly.bias, 0)

    def get_params(self):
        wd_params, nowd_params, lr_mul_wd_params, lr_mul_nowd_params = [], [], [], []
        for name, child in self.named_children():
            child_wd_params, child_nowd_params = child.get_params()
            if isinstance(child, FeatureFusionModule) or isinstance(child, BiSeNetOutput):
                lr_mul_wd_params += child_wd_params
                lr_mul_nowd_params += child_nowd_params
            else:
                wd_params += child_wd_params
                nowd_params += child_nowd_params
        return wd_params, nowd_params, lr_mul_wd_params, lr_mul_nowd_params

In [None]:
from numba import cuda, njit, prange
import cupy
import math
import numpy as np
import torchvision.transforms as transforms
import torch
from numba import prange, njit, cuda
# from pyVHR.resources.faceparsing.model import BiSeNet
import os
# import pyVHR
from scipy.spatial import ConvexHull
from PIL import Image, ImageDraw
import requests

"""
This module defines classes or methods used for skin extraction.
"""

### functions and parameters ###

class SkinProcessingParams():
    """
        This class contains usefull parameters used by this module.

        RGB_LOW_TH (numpy.int32): RGB low-threshold value.

        RGB_HIGH_TH (numpy.int32): RGB high-threshold value.
    """
    RGB_LOW_TH = np.int32(55)
    RGB_HIGH_TH = np.int32(200)


def bbox2_CPU(img):
    """
    Args:
        img (ndarray): ndarray with shape [rows, columns, rgb_channels].

    Returns:
        Four cropping coordinates (row, row, column, column) for removing black borders (RGB [O,O,O]) from img.
    """
    rows = np.any(img, axis=1)
    cols = np.any(img, axis=0)
    nzrows = np.nonzero(rows)
    nzcols = np.nonzero(cols)
    if nzrows[0].size == 0 or nzcols[0].size == 0:
        return -1, -1, -1, -1
    rmin, rmax = np.nonzero(rows)[0][[0, -1]]
    cmin, cmax = np.nonzero(cols)[0][[0, -1]]
    return rmin, rmax, cmin, cmax


def bbox2_GPU(img):
    """
    Args:
        img (cupy.ndarray): cupy.ndarray with shape [rows, columns, rgb_channels].

    Returns:
        Four cropping coordinates (row, row, column, column) for removing black borders (RGB [O,O,O]) from img.
        The returned variables are on GPU.
    """
    rows = cupy.any(img, axis=1)
    cols = cupy.any(img, axis=0)
    nzrows = cupy.nonzero(rows)
    nzcols = cupy.nonzero(cols)
    if nzrows[0].size == 0 or nzcols[0].size == 0:
        return -1, -1, -1, -1
    rmin, rmax = cupy.nonzero(rows)[0][[0, -1]]
    cmin, cmax = cupy.nonzero(cols)[0][[0, -1]]
    return rmin, rmax, cmin, cmax


### SKIN EXTRACTION CLASSES ###

class SkinExtractionFaceParsing():
    """
        This class performs skin extraction on CPU/GPU using Face Parsing.
        https://github.com/zllrunning/face-parsing.PyTorch
    """
    def __init__(self, device='CPU'):
        """
        Args:
            device (str): This class can execute code on 'CPU' or 'GPU'.
        """
        self.device = device
        n_classes = 19
        self.net = BiSeNet(n_classes=n_classes)
        if self.device == 'GPU':
            self.net.cuda()
            self.kernel_cuda_skin_copy_and_filter = kernel_cuda_skin_copy_and_filter()
        # save_pth = os.path.dirname(pyVHR.resources.faceparsing.model.__file__) + "/79999_iter.pth"
        save_pth = '/content/drive/MyDrive/rPPG/pyVHR/resources/faceparsing/79999_iter.pth'
        if not os.path.isfile(save_pth):
            url = "https://github.com/phuselab/pyVHR/raw/master/resources/faceparsing/79999_iter.pth"
            print('Downloading faceparsing model...')
            r = requests.get(url, allow_redirects=True)
            open(save_pth, 'wb').write(r.content)
        self.net.load_state_dict(torch.load(save_pth))
        self.net.eval()
        self.to_tensor = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
        ])

    def extract_skin(self, image, ldmks):
        """
        This method extract the skin from an image using Face Parsing.
        Landmarks (ldmks) are used to create a facial bounding box for cropping the face; this way
        the network used in Face Parsing is more accurate.

        Args:
            image (uint8 ndarray): ndarray with shape [rows, columns, rgb_channels].
            ldmks (float32 ndarray): ndarray with shape [num_landmarks, xy_coordinates].

        Returns:
            Cropped skin-image and non-cropped skin-image; both are uint8 ndarray with shape [rows, columns, rgb_channels].
        """
        # crop with bounding box of ldmks; the network works better if the bounding box is bigger
        aviable_ldmks = ldmks[ldmks[:,0] >= 0][:,:2]
        min_y, min_x = np.min(aviable_ldmks, axis=0)
        max_y, max_x = np.max(aviable_ldmks, axis=0)
        min_y *= 0.90
        min_x *= 0.90
        max_y = max_y * 1.10 if max_y * 1.10 < image.shape[0] else image.shape[0]
        max_x = max_x * 1.10 if max_x * 1.10 < image.shape[1] else image.shape[1]
        cropped_image = np.copy(image[int(min_y):int(max_y),int(min_x):int(max_x) ,:])
        nda_im = np.array(cropped_image)
        # prepare the image for the bisenet network
        cropped_image = self.to_tensor(cropped_image)
        cropped_image = torch.unsqueeze(cropped_image, 0)
        cropped_skin_img = self.extraction(cropped_image, nda_im)
        # recreate full image using cropped_skin_img
        full_skin_image = np.zeros_like(image)
        full_skin_image[int(min_y):int(max_y),int(min_x):int(max_x) ,:] = cropped_skin_img
        return cropped_skin_img, full_skin_image

    def extraction(self, im, nda_im):
        """
        This method performs skin extraction using Face Parsing.

        Args:
            im (torch.Tensor): torch.Tensor with size [rows, columns, rgb_channels]
            nda_im (uint8 ndarray): ndarray with shape [rows, columns, rgb_channels].

        Returns:
            skin-image as uint8 ndarray with shape [rows, columns, rgb_channels].
        """
        with torch.no_grad():
            if self.device == 'CPU':
                ### bisenet skin detection ###
                out = self.net(im)[0]
                ### gpu cuda skin copy ###
                parsing = out.squeeze(0).argmax(0).numpy()
                parsing = parsing.astype(np.int32)
                nda_im = nda_im.astype(np.uint8)
                # kernel skin copy
                return kernel_skin_copy_and_filter(nda_im, parsing, np.int32(SkinProcessingParams.RGB_LOW_TH), np.int32(SkinProcessingParams.RGB_HIGH_TH))
            else:
                ### bisenet skin detection ###
                im = im.cuda()
                out = self.net(im)[0]
                ### gpu cuda skin copy ###
                dev_parsing = out.squeeze(0).argmax(0).type(torch.int32)
                dev_nda_im = cuda.to_device(nda_im)
                dev_new_im = cuda.to_device(np.zeros_like(nda_im))
                low_high_f = cuda.to_device(
                    np.array([SkinProcessingParams.RGB_LOW_TH, SkinProcessingParams.RGB_HIGH_TH], dtype=np.int32))
                # define number of blocks and threads per block
                threadsperblock = (16, 16)
                blockspergrid_x = math.ceil(nda_im.shape[0] / threadsperblock[0])
                blockspergrid_y = math.ceil(nda_im.shape[1] / threadsperblock[1])
                blockspergrid = (blockspergrid_x, blockspergrid_y)
                # kernel invoke
                self.kernel_cuda_skin_copy_and_filter[blockspergrid, threadsperblock](
                    dev_nda_im, dev_parsing, dev_new_im, low_high_f)
                # copy to CPU result
                newimg = dev_new_im.copy_to_host()
                # free memory
                im = None
                out = None
                dev_nda_im = None
                dev_parsing = None
                dev_new_im = None
                return newimg

# SkinExtractionFaceParsing class kernels #
def kernel_cuda_skin_copy_and_filter():
    """
    Return a Numba cuda.jit kernel defined as:

    @cuda.jit('void(uint8[:,:,:], int32[:,:], uint8[:,:,:], int32[:])')
    def __kernel_cuda_skin_copy_and_filter(orig, pars, new, low_high_filter):
        ''
        This method removes pixels from the image 'orig' that are not skin, or
        that are outside the RGB range [low_high_filter[0], low_high_filter[1]] (extremes are included).
        ''

    This method is important for users who do not use a GPU, beacause they can't compile @cuda.jit.
    """

    @cuda.jit('void(uint8[:,:,:], int32[:,:], uint8[:,:,:], int32[:])')
    def __kernel_cuda_skin_copy_and_filter(orig, pars, new, low_high_filter):
        """
        This method removes pixels from the image 'orig' that are not skin, or
        that are outside the RGB range [low_high_filter[0], low_high_filter[1]] (extremes are included).
        """
        x, y = cuda.grid(2)
        if x < orig.shape[0] and y < orig.shape[1]:
            # skin class = 1, nose = 10
            if pars[x, y] == 1 or pars[x, y] == 10:
                if not ((orig[x, y, 0] <= low_high_filter[0] and orig[x, y, 1] <= low_high_filter[0] and orig[x, y, 2] <= low_high_filter[0]) or
                        (orig[x, y, 0] >= low_high_filter[1] and orig[x, y, 1] >= low_high_filter[1] and orig[x, y, 2] >= low_high_filter[1])):
                    new[x, y, 0] = orig[x, y, 0]
                    new[x, y, 1] = orig[x, y, 1]
                    new[x, y, 2] = orig[x, y, 2]

    return __kernel_cuda_skin_copy_and_filter


@njit('uint8[:,:,:](uint8[:,:,:], int32[:,:], int32, int32)', parallel=True, nogil=True)
def kernel_skin_copy_and_filter(orig, pars, RGB_LOW_TH, RGB_HIGH_TH):
    """
    This method removes pixels from the image 'orig' that are not skin, or
    that are outside the RGB range [RGB_LOW_TH, RGB_HIGH_TH] (extremes are included).
    """
    new = np.zeros_like(orig)
    for x in prange(orig.shape[0]):
        for y in prange(orig.shape[1]):
            # skin class = 1, nose = 10
            if pars[x, y] == 1 or pars[x, y] == 10:
                if not ((orig[x, y, 0] <= RGB_LOW_TH and orig[x, y, 1] <= RGB_LOW_TH and orig[x, y, 2] <= RGB_LOW_TH) or
                        (orig[x, y, 0] >= RGB_HIGH_TH and orig[x, y, 1] >= RGB_HIGH_TH and orig[x, y, 2] >= RGB_HIGH_TH)):
                    new[x, y, 0] = orig[x, y, 0]
                    new[x, y, 1] = orig[x, y, 1]
                    new[x, y, 2] = orig[x, y, 2]
    return new

class SkinExtractionConvexHull:
    """
        This class performs skin extraction on CPU/GPU using a Convex Hull segmentation obtained from facial landmarks.
    """
    def __init__(self,device='CPU'):
        """
        Args:
            device (str): This class can execute code on 'CPU' or 'GPU'.
        """
        self.device = device

    def extract_skin(self,image, ldmks):
        """
        This method extract the skin from an image using Convex Hull segmentation.

        Args:
            image (uint8 ndarray): ndarray with shape [rows, columns, rgb_channels].
            ldmks (float32 ndarray): landmarks used to create the Convex Hull; ldmks is a ndarray with shape [num_landmarks, xy_coordinates].

        Returns:
            Cropped skin-image and non-cropped skin-image; both are uint8 ndarray with shape [rows, columns, rgb_channels].
        """
        from pyVHR.extraction.sig_processing import MagicLandmarks
        aviable_ldmks = ldmks[ldmks[:,0] >= 0][:,:2]
        # face_mask convex hull
        hull = ConvexHull(aviable_ldmks)
        verts = [(aviable_ldmks[v,0], aviable_ldmks[v,1]) for v in hull.vertices]
        img = Image.new('L', image.shape[:2], 0)
        ImageDraw.Draw(img).polygon(verts, outline=1, fill=1)
        mask = np.array(img)
        mask = np.expand_dims(mask,axis=0).T

        # left eye convex hull
        left_eye_ldmks = ldmks[MagicLandmarks.left_eye]
        aviable_ldmks = left_eye_ldmks[left_eye_ldmks[:,0] >= 0][:,:2]
        if len(aviable_ldmks) > 3:
            hull = ConvexHull(aviable_ldmks)
            verts = [(aviable_ldmks[v,0], aviable_ldmks[v,1]) for v in hull.vertices]
            img = Image.new('L', image.shape[:2], 0)
            ImageDraw.Draw(img).polygon(verts, outline=1, fill=1)
            left_eye_mask = np.array(img)
            left_eye_mask = np.expand_dims(left_eye_mask,axis=0).T
        else:
            left_eye_mask = np.ones((image.shape[0], image.shape[1],1),dtype=np.uint8)

        # right eye convex hull
        right_eye_ldmks = ldmks[MagicLandmarks.right_eye]
        aviable_ldmks = right_eye_ldmks[right_eye_ldmks[:,0] >= 0][:,:2]
        if len(aviable_ldmks) > 3:
            hull = ConvexHull(aviable_ldmks)
            verts = [(aviable_ldmks[v,0], aviable_ldmks[v,1]) for v in hull.vertices]
            img = Image.new('L', image.shape[:2], 0)
            ImageDraw.Draw(img).polygon(verts, outline=1, fill=1)
            right_eye_mask = np.array(img)
            right_eye_mask = np.expand_dims(right_eye_mask,axis=0).T
        else:
            right_eye_mask = np.ones((image.shape[0], image.shape[1],1),dtype=np.uint8)

        # mounth convex hull
        mounth_ldmks = ldmks[MagicLandmarks.mounth]
        aviable_ldmks = mounth_ldmks[mounth_ldmks[:,0] >= 0][:,:2]
        if len(aviable_ldmks) > 3:
            hull = ConvexHull(aviable_ldmks)
            verts = [(aviable_ldmks[v,0], aviable_ldmks[v,1]) for v in hull.vertices]
            img = Image.new('L', image.shape[:2], 0)
            ImageDraw.Draw(img).polygon(verts, outline=1, fill=1)
            mounth_mask = np.array(img)
            mounth_mask = np.expand_dims(mounth_mask,axis=0).T
        else:
            mounth_mask = np.ones((image.shape[0], image.shape[1],1),dtype=np.uint8)

        # apply masks and crop
        if self.device == 'GPU':
            image = cupy.asarray(image)
            mask = cupy.asarray(mask)
            left_eye_mask = cupy.asarray(left_eye_mask)
            right_eye_mask = cupy.asarray(right_eye_mask)
            mounth_mask = cupy.asarray(mounth_mask)
        skin_image = image * mask * (1-left_eye_mask) * (1-right_eye_mask) * (1-mounth_mask)

        if self.device == 'GPU':
            rmin, rmax, cmin, cmax = bbox2_GPU(skin_image)
        else:
            rmin, rmax, cmin, cmax = bbox2_CPU(skin_image)

        cropped_skin_im = skin_image
        if rmin >= 0 and rmax >= 0 and cmin >= 0 and cmax >= 0 and rmax-rmin >= 0 and cmax-cmin >= 0:
            cropped_skin_im = skin_image[int(rmin):int(rmax), int(cmin):int(cmax)]

        if self.device == 'GPU':
            cropped_skin_im = cupy.asnumpy(cropped_skin_im)
            skin_image = cupy.asnumpy(skin_image)

        return cropped_skin_im, skin_image

### pyVHR/extraction/sig_processing

In [None]:
import cv2
import mediapipe as mp
import numpy as np
# from pyVHR.extraction.utils import *
# from pyVHR.extraction.skin_extraction_methods import *
# from pyVHR.extraction.sig_extraction_methods import *
# from pyVHR.utils.cuda_utils import *

"""
This module defines classes or methods used for Signal extraction and processing.
"""

class SignalProcessing():
    """
        This class performs offline signal extraction with different methods:

        - holistic.

        - squared / rectangular patches.
    """

    def __init__(self):
        # Common parameters #
        self.tot_frames = None
        self.visualize_skin_collection = []
        self.skin_extractor = SkinExtractionConvexHull('CPU')
        # Patches parameters #
        high_prio_ldmk_id, mid_prio_ldmk_id = get_magic_landmarks()
        self.ldmks = high_prio_ldmk_id + mid_prio_ldmk_id
        self.square = None
        self.rects = None
        self.visualize_skin = False
        self.visualize_landmarks = False
        self.visualize_landmarks_number = False
        self.visualize_patch = False
        self.font_size = 0.3
        self.font_color = (255, 0, 0, 255)
        self.visualize_skin_collection = []
        self.visualize_landmarks_collection = []

    def choose_cuda_device(self, n):
        """
        Choose a CUDA device.

        Args:
            n (int): number of a CUDA device.

        """
        select_cuda_device(n)

    def display_cuda_device(self):
        """
        Display your CUDA devices.
        """
        cuda_info()

    def set_total_frames(self, n):
        """
        Set the total frames to be processed; if you want to process all the possible frames use n = 0.

        Args:
            n (int): number of frames to be processed.

        """
        if n < 0:
            print("[ERROR] n must be a positive number!")
        self.tot_frames = int(n)

    def set_skin_extractor(self, extractor):
        """
        Set the skin extractor that will be used for skin extraction.

        Args:
            extractor: instance of a skin_extraction class (see :py:mod:`pyVHR.extraction.skin_extraction_methods`).

        """
        self.skin_extractor = extractor

    def set_visualize_skin_and_landmarks(self, visualize_skin=False, visualize_landmarks=False, visualize_landmarks_number=False, visualize_patch=False):
        """
        Set visualization parameters. You can retrieve visualization output with the
        methods :py:meth:`pyVHR.extraction.sig_processing.SignalProcessing.get_visualize_skin`
        and :py:meth:`pyVHR.extraction.sig_processing.SignalProcessing.get_visualize_patches`

        Args:
            visualize_skin (bool): The skin and the patches will be visualized.
            visualize_landmarks (bool): The landmarks (centers of patches) will be visualized.
            visualize_landmarks_number (bool): The landmarks number will be visualized.
            visualize_patch (bool): The patches outline will be visualized.

        """
        self.visualize_skin = visualize_skin
        self.visualize_landmarks = visualize_landmarks
        self.visualize_landmarks_number = visualize_landmarks_number
        self.visualize_patch = visualize_patch

    def get_visualize_skin(self):
        """
        Get the skin images produced by the last processing. Remember to
        set :py:meth:`pyVHR.extraction.sig_processing.SignalProcessing.set_visualize_skin_and_landmarks`
        correctly.

        Returns:
            list of ndarray: list of cv2 images; each image is a ndarray with shape [rows, columns, rgb_channels].
        """
        return self.visualize_skin_collection

    def get_visualize_patches(self):
        """
        Get the 'skin+patches' images produced by the last processing. Remember to
        set :py:meth:`pyVHR.extraction.sig_processing.SignalProcessing.set_visualize_skin_and_landmarks`
        correctly.

        Returns:
            list of ndarray: list of cv2 images; each image is a ndarray with shape [rows, columns, rgb_channels].
        """
        return self.visualize_landmarks_collection

    def extract_raw(self, videoFileName):
        """
        Extracts raw frames from video.

        Args:
            videoFileName (str): video file name or path.

        Returns:
            ndarray: raw frames with shape [num_frames, height, width, rgb_channels].
        """

        frames = []
        for frame in extract_frames_yield(videoFileName):
                frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))   # convert to RGB

        return np.array(frames)

    ### HOLISTIC METHODS ###

    def extract_raw_holistic(self, videoFileName):
        """
        Locates the skin pixels in each frame. This method is intended for rPPG methods that use raw video signal.

        Args:
            videoFileName (str): video file name or path.

        Returns:
            float32 ndarray: raw signal as float32 ndarray with shape [num_frames, rows, columns, rgb_channels].
        """

        skin_ex = self.skin_extractor

        mp_drawing = mp.solutions.drawing_utils
        mp_face_mesh = mp.solutions.face_mesh
        PRESENCE_THRESHOLD = 0.5
        VISIBILITY_THRESHOLD = 0.5

        sig = []
        processed_frames_count = 0

        with mp_face_mesh.FaceMesh(
                max_num_faces=1,
                min_detection_confidence=0.5,
                min_tracking_confidence=0.5) as face_mesh:
            for frame in extract_frames_yield(videoFileName):
                # convert the BGR image to RGB.
                image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                processed_frames_count += 1
                width = image.shape[1]
                height = image.shape[0]
                # [landmarks, info], with info->x_center ,y_center, r, g, b
                ldmks = np.zeros((468, 5), dtype=np.float32)
                ldmks[:, 0] = -1.0
                ldmks[:, 1] = -1.0
                ### face landmarks ###
                results = face_mesh.process(image)
                if results.multi_face_landmarks:
                    face_landmarks = results.multi_face_landmarks[0]
                    landmarks = [l for l in face_landmarks.landmark]
                    for idx in range(len(landmarks)):
                        landmark = landmarks[idx]
                        if not ((landmark.HasField('visibility') and landmark.visibility < VISIBILITY_THRESHOLD)
                                or (landmark.HasField('presence') and landmark.presence < PRESENCE_THRESHOLD)):
                            coords = mp_drawing._normalized_to_pixel_coordinates(
                                landmark.x, landmark.y, width, height)
                            if coords:
                                ldmks[idx, 0] = coords[1]
                                ldmks[idx, 1] = coords[0]
                    ### skin extraction ###
                    cropped_skin_im, full_skin_im = skin_ex.extract_skin(
                        image, ldmks)
                else:
                    cropped_skin_im = np.zeros_like(image)
                    full_skin_im = np.zeros_like(image)
                if self.visualize_skin == True:
                    self.visualize_skin_collection.append(full_skin_im)
                ### sig computing ###
                sig.append(full_skin_im)
                ### loop break ###
                if self.tot_frames is not None and self.tot_frames > 0 and processed_frames_count >= self.tot_frames:
                    break
        sig = np.array(sig, dtype=np.float32)
        return sig

    def extract_holistic(self, videoFileName):
        """
        This method compute the RGB-mean signal using the whole skin (holistic);

        Args:
            videoFileName (str): video file name or path.

        Returns:
            float32 ndarray: RGB signal as ndarray with shape [num_frames, 1, rgb_channels]. The second dimension is 1 because
            the whole skin is considered as one estimators.
        """
        self.visualize_skin_collection = []

        skin_ex = self.skin_extractor

        mp_drawing = mp.solutions.drawing_utils
        mp_face_mesh = mp.solutions.face_mesh
        PRESENCE_THRESHOLD = 0.5
        VISIBILITY_THRESHOLD = 0.5

        sig = []
        processed_frames_count = 0

        with mp_face_mesh.FaceMesh(
                max_num_faces=1,
                min_detection_confidence=0.5,
                min_tracking_confidence=0.5) as face_mesh:
            for frame in extract_frames_yield(videoFileName):
                # convert the BGR image to RGB.
                image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                processed_frames_count += 1
                width = image.shape[1]
                height = image.shape[0]
                # [landmarks, info], with info->x_center ,y_center, r, g, b
                ldmks = np.zeros((468, 5), dtype=np.float32)
                ldmks[:, 0] = -1.0
                ldmks[:, 1] = -1.0
                ### face landmarks ###
                results = face_mesh.process(image)
                if results.multi_face_landmarks:
                    face_landmarks = results.multi_face_landmarks[0]
                    landmarks = [l for l in face_landmarks.landmark]
                    for idx in range(len(landmarks)):
                        landmark = landmarks[idx]
                        if not ((landmark.HasField('visibility') and landmark.visibility < VISIBILITY_THRESHOLD)
                                or (landmark.HasField('presence') and landmark.presence < PRESENCE_THRESHOLD)):
                            coords = mp_drawing._normalized_to_pixel_coordinates(
                                landmark.x, landmark.y, width, height)
                            if coords:
                                ldmks[idx, 0] = coords[1]
                                ldmks[idx, 1] = coords[0]
                    ### skin extraction ###
                    cropped_skin_im, full_skin_im = skin_ex.extract_skin(
                        image, ldmks)
                else:
                    cropped_skin_im = np.zeros_like(image)
                    full_skin_im = np.zeros_like(image)
                if self.visualize_skin == True:
                    self.visualize_skin_collection.append(full_skin_im)
                ### sig computing ###
                sig.append(holistic_mean(
                    cropped_skin_im, np.int32(SignalProcessingParams.RGB_LOW_TH), np.int32(SignalProcessingParams.RGB_HIGH_TH)))
                ### loop break ###
                if self.tot_frames is not None and self.tot_frames > 0 and processed_frames_count >= self.tot_frames:
                    break
        sig = np.array(sig, dtype=np.float32)
        return sig

    ### PATCHES METHODS ###

    def set_landmarks(self, landmarks_list):
        """
        Set the patches centers (landmarks) that will be used for signal processing. There are 468 facial points you can
        choose; for visualizing their identification number please use :py:meth:`pyVHR.plot.visualize.visualize_landmarks_list`.

        Args:
            landmarks_list (list): list of positive integers between 0 and 467 that identify patches centers (landmarks).
        """
        if not isinstance(landmarks_list, list):
            print("[ERROR] landmarks_set must be a list!")
            return
        self.ldmks = landmarks_list

    def set_square_patches_side(self, square_side):
        """
        Set the dimension of the square patches that will be used for signal processing. There are 468 facial points you can
        choose; for visualizing their identification number please use :py:meth:`pyVHR.plot.visualize.visualize_landmarks_list`.

        Args:
            square_side (float): positive float that defines the length of the square patches.
        """
        if not isinstance(square_side, float) or square_side <= 0.0:
            print("[ERROR] square_side must be a positive float!")
            return
        self.square = square_side

    def set_rect_patches_sides(self, rects_dim):
        """
        Set the dimension of each rectangular patch. There are 468 facial points you can
        choose; for visualizing their identification number please use :py:meth:`pyVHR.plot.visualize.visualize_landmarks_list`.

        Args:
            rects_dim (float32 ndarray): positive float32 np.ndarray of shape [num_landmarks, 2]. If the list of used landmarks is [1,2,3]
                and rects_dim is [[10,20],[12,13],[40,40]] then the landmark number 2 will have a rectangular patch of xy-dimension 12x13.
        """
        if type(rects_dim) != type(np.array([])):
            print("[ERROR] rects_dim must be an np.ndarray!")
            return
        if rects_dim.shape[0] != len(self.ldmks) and rects_dim.shape[1] != 2:
            print("[ERROR] incorrect rects_dim shape!")
            return
        self.rects = rects_dim

    def extract_patches(self, videoFileName, region_type, sig_extraction_method):
        """
        This method compute the RGB-mean signal using specific skin regions (patches).

        Args:
            videoFileName (str): video file name or path.
            region_type (str): patches types can be  "squares" or "rects".
            sig_extraction_method (str): RGB signal can be computed with "mean" or "median". We recommend to use mean.

        Returns:
            float32 ndarray: RGB signal as ndarray with shape [num_frames, num_patches, rgb_channels].
        """
        if self.square is None and self.rects is None:
            print(
                "[ERROR] Use set_landmarks_squares or set_landmarkds_rects before calling this function!")
            return None
        if region_type != "squares" and region_type != "rects":
            print("[ERROR] Invalid landmarks region type!")
            return None
        if sig_extraction_method != "mean" and sig_extraction_method != "median":
            print("[ERROR] Invalid signal extraction method!")
            return None

        ldmks_regions = None
        if region_type == "squares":
            ldmks_regions = np.float32(self.square)
        elif region_type == "rects":
            ldmks_regions = np.float32(self.rects)

        sig_ext_met = None
        if sig_extraction_method == "mean":
            if region_type == "squares":
                sig_ext_met = landmarks_mean
            elif region_type == "rects":
                sig_ext_met = landmarks_mean_custom_rect
        elif sig_extraction_method == "median":
            if region_type == "squares":
                sig_ext_met = landmarks_median
            elif region_type == "rects":
                sig_ext_met = landmarks_median_custom_rect

        self.visualize_skin_collection = []
        self.visualize_landmarks_collection = []

        skin_ex = self.skin_extractor

        mp_drawing = mp.solutions.drawing_utils
        mp_face_mesh = mp.solutions.face_mesh
        PRESENCE_THRESHOLD = 0.5
        VISIBILITY_THRESHOLD = 0.5

        sig = []
        processed_frames_count = 0
        self.patch_landmarks = []
        self.cropped_skin_im_shapes = [[], []]
        with mp_face_mesh.FaceMesh(
                max_num_faces=1,
                min_detection_confidence=0.5,
                min_tracking_confidence=0.5) as face_mesh:
            for frame in extract_frames_yield(videoFileName):
                # convert the BGR image to RGB.
                image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                processed_frames_count += 1
                width = image.shape[1]
                height = image.shape[0]
                # [landmarks, info], with info->x_center ,y_center, r, g, b
                ldmks = np.zeros((468, 5), dtype=np.float32)
                ldmks[:, 0] = -1.0
                ldmks[:, 1] = -1.0
                magic_ldmks = []
                ### face landmarks ###
                results = face_mesh.process(image)
                if results.multi_face_landmarks:
                    face_landmarks = results.multi_face_landmarks[0]
                    landmarks = [l for l in face_landmarks.landmark]
                    for idx in range(len(landmarks)):
                        landmark = landmarks[idx]
                        if not ((landmark.HasField('visibility') and landmark.visibility < VISIBILITY_THRESHOLD)
                                or (landmark.HasField('presence') and landmark.presence < PRESENCE_THRESHOLD)):
                            coords = mp_drawing._normalized_to_pixel_coordinates(
                                landmark.x, landmark.y, width, height)
                            if coords:
                                ldmks[idx, 0] = coords[1]
                                ldmks[idx, 1] = coords[0]

                    ### skin extraction ###
                    cropped_skin_im, full_skin_im = skin_ex.extract_skin(image, ldmks)

                    self.cropped_skin_im_shapes[0].append(cropped_skin_im.shape[0])
                    self.cropped_skin_im_shapes[1].append(cropped_skin_im.shape[1])

                else:
                    cropped_skin_im = np.zeros_like(image)
                    full_skin_im = np.zeros_like(image)
                    self.cropped_skin_im_shapes[0].append(cropped_skin_im.shape[0])
                    self.cropped_skin_im_shapes[1].append(cropped_skin_im.shape[1])

                ### sig computing ###
                for idx in self.ldmks:
                    magic_ldmks.append(ldmks[idx])
                magic_ldmks = np.array(magic_ldmks, dtype=np.float32)
                temp = sig_ext_met(magic_ldmks, full_skin_im, ldmks_regions,
                                   np.int32(SignalProcessingParams.RGB_LOW_TH),
                                   np.int32(SignalProcessingParams.RGB_HIGH_TH))
                sig.append(temp)

                # save landmarks coordinates
                self.patch_landmarks.append(magic_ldmks[:,0:3])

                # visualize patches and skin
                if self.visualize_skin == True:
                    self.visualize_skin_collection.append(full_skin_im)
                if self.visualize_landmarks == True:
                    annotated_image = full_skin_im.copy()
                    color = np.array([self.font_color[0],
                                      self.font_color[1], self.font_color[2]], dtype=np.uint8)
                    for idx in self.ldmks:
                        cv2.circle(
                            annotated_image, (int(ldmks[idx, 1]), int(ldmks[idx, 0])), radius=0, color=self.font_color, thickness=-1)
                        if self.visualize_landmarks_number == True:
                            cv2.putText(annotated_image, str(idx),
                                        (int(ldmks[idx, 1]), int(ldmks[idx, 0])), cv2.FONT_HERSHEY_SIMPLEX, self.font_size,  self.font_color,  1)
                    if self.visualize_patch == True:
                        if region_type == "squares":
                            sides = np.array([self.square] * len(magic_ldmks))
                            annotated_image = draw_rects(
                                annotated_image, np.array(magic_ldmks[:, 1]), np.array(magic_ldmks[:, 0]), sides, sides, color)
                        elif region_type == "rects":
                            annotated_image = draw_rects(
                                annotated_image, np.array(magic_ldmks[:, 1]), np.array(magic_ldmks[:, 0]), np.array(self.rects[:, 0]), np.array(self.rects[:, 1]), color)
                    self.visualize_landmarks_collection.append(
                        annotated_image)
                ### loop break ###
                if self.tot_frames is not None and self.tot_frames > 0 and processed_frames_count >= self.tot_frames:
                    break
        sig = np.array(sig, dtype=np.float32)
        return np.copy(sig[:, :, 2:])

    def get_landmarks(self):
        """
        Returns landmarks ndarray with shape [num_frames, num_estimators, 2-coords] or empty array
        """
        if hasattr(self, 'patch_landmarks'):
            return np.array(self.patch_landmarks)
        else:
            return np.empty(0)

    def get_cropped_skin_im_shapes(self):
        """
        Returns cropped skin shapes with shape [height, width, rgb] or empty array
        """
        if hasattr(self, "cropped_skin_im_shapes"):
            return np.array(self.cropped_skin_im_shapes)
        else:
            return np.empty((0, 0, 0))



### pyVHR/utils/cuda_utils

In [None]:
from numba import cuda
import torch
import os

def cuda_info():
    if torch.cuda.is_available():
        print("# CUDA devices: ", torch.cuda.device_count())
        for e in range(torch.cuda.device_count()):
            print("# device number ", e, ": ", torch.cuda.get_device_name(e))


def select_cuda_device(n):
    torch.cuda.device(n)
    cuda.select_device(n)

### 딥러닝 파이프라인 클래스 선언

In [None]:
import configparser
import ast
from numpy.lib.arraysetops import isin
import pandas as pd
import numpy as np
from importlib import import_module, util
# from pyVHR.datasets.dataset import datasetFactory
# from pyVHR.utils.errors import getErrors, printErrors, displayErrors, BVP_windowing
# from pyVHR.extraction.sig_processing import *
# from pyVHR.extraction.sig_extraction_methods import *
# from pyVHR.extraction.skin_extraction_methods import *
# from pyVHR.BVP.BVP import *
# from pyVHR.BPM.BPM import *
# from pyVHR.BVP.methods import *
# from pyVHR.BVP.filters import *
from inspect import getmembers, isfunction
import os.path
# from pyVHR.deepRPPG.mtts_can import *
# from pyVHR.deepRPPG.hr_cnn import *
# from pyVHR.extraction.utils import *

In [None]:
class Pipeline():
    """
    This class runs the pyVHR pipeline on a single video or dataset
    """

    minHz = 0.65 # min heart frequency in Hz
    maxHz = 4.0  # max heart frequency in Hz

    def __init__(self):
        pass


    def run_on_video_multimethods(self, videoFileName,
                    winsize,
                    ldmks_list=None,
                    cuda=True,
                    roi_method='convexhull',
                    roi_approach='holistic',
                    methods=['cpu_CHROM, cpu_POS, cpu_LGI'],
                    estimate='holistic',
                    movement_thrs=[10, 5, 2],
                    patch_size=30,
                    RGB_LOW_HIGH_TH = (75,230),
                    Skin_LOW_HIGH_TH = (75, 230),
                    pre_filt=False,
                    post_filt=True,
                    verb=True):
        """
        Runs the pipeline on a specific video file.

        Args:
            videoFileName:
                - The video filenane to analyse
            winsize:
                - The size of the window in frame
            ldmks_list:
                - (default None) a list of MediaPipe's landmarks to use, the range is: [0:467]
            cuda:
                - True - Enable computations on GPU
            roi_method:
                - 'convexhull', uses MediaPipe's lanmarks to compute the convex hull on the face skin
                - 'faceparsing', uses BiseNet to parse face components and segment the skin
            roi_approach:
                - 'holistic', uses the holistic approach, i.e. the whole face skin
                - 'patches', uses multiple patches as Regions of Interest
            methods:
                - A collection of rPPG methods defined in pyVHR
            estimate:
                - if patches: 'medians', 'clustering', the method for BPM estimate on each window
            movement_thrs:
                - Thresholds for movements filtering (eg.:[10, 5, 2])
            patch_size:
                - the size of the square patch, in pixels
            RGB_LOW_HIGH_TH:
                - default (75,230), thresholds for RGB channels
            Skin_LOW_HIGH_TH:
                - default (75,230), thresholds for skin pixel values
            pre_filt:
                - True, uses bandpass filter on the windowed RGB signal
            post_filt:
                - True, uses bandpass filter on the estimated BVP signal
            verb:
                - True, shows the main steps
        """

        # set landmark list
        if not ldmks_list:
            ldmks_list = [2, 3, 4, 5, 6, 8, 9, 10, 18, 21, 32, 35, 36, 43, 46, 47, 48, 50, 54, 58, 67, 68, 69, 71, 92, 93, 101, 103, 104, 108, 109, 116, 117, 118, 123, 132, 134, 135, 138, 139, 142, 148, 149, 150, 151, 152, 182, 187, 188, 193, 197, 201, 205, 206, 207, 210, 211, 212, 216, 234, 248, 251, 262, 265, 266, 273, 277, 278, 280, 284, 288, 297, 299, 322, 323, 330, 332, 333, 337, 338, 345, 346, 361, 363, 364, 367, 368, 371, 377, 379, 411, 412, 417, 421, 425, 426, 427, 430, 432, 436]

        # test video filename
        assert os.path.isfile(videoFileName), "The video file does not exists!"

        sig_processing = SignalProcessing()
        av_meths = getmembers(pyVHR.BVP.methods, isfunction)
        available_methods = [am[0] for am in av_meths]

        for m in methods:
            assert m in available_methods, "\nrPPG method not recognized!!"

        if cuda:
            sig_processing.display_cuda_device()
            sig_processing.choose_cuda_device(0)

        ## 1. set skin extractor
        target_device = 'GPU' if cuda else 'CPU'
        if roi_method == 'convexhull':
            sig_processing.set_skin_extractor(SkinExtractionConvexHull(target_device))
        elif roi_method == 'faceparsing':
            sig_processing.set_skin_extractor(SkinExtractionFaceParsing(target_device))
        else:
            raise ValueError("Unknown 'roi_method'")

        ## 2. set patches
        if roi_approach == 'patches':
            sig_processing.set_landmarks(ldmks_list)
            sig_processing.set_square_patches_side(np.float(patch_size))

        # set sig-processing and skin-processing params
        SignalProcessingParams.RGB_LOW_TH = RGB_LOW_HIGH_TH[0]
        SignalProcessingParams.RGB_HIGH_TH = RGB_LOW_HIGH_TH[1]
        SkinProcessingParams.RGB_LOW_TH = Skin_LOW_HIGH_TH[0]
        SkinProcessingParams.RGB_HIGH_TH = Skin_LOW_HIGH_TH[1]

        if verb:
            print('\nProcessing Video ' + videoFileName)
        fps = get_fps(videoFileName)
        sig_processing.set_total_frames(0)

        ## 3. ROI selection
        if verb:
            print('\nRoi processing...')
        sig = []
        if roi_approach == 'holistic':
            # SIG extraction with holistic
            sig = sig_processing.extract_holistic(videoFileName)
        elif roi_approach == 'patches':
            # SIG extraction with patches
            sig = sig_processing.extract_patches(videoFileName, 'squares', 'mean')
        if verb:
            print(' - Extraction approach: ' + roi_approach)

        ## 4. sig windowing
        windowed_sig, timesES = sig_windowing(sig, winsize, 1, fps)
        if verb:
            print(f' - Number of windows: {len(windowed_sig)}')
            print(' - Win size: (#ROI, #landmarks, #frames) = ', windowed_sig[0].shape)


        ## 5. PRE FILTERING
        if verb:
            print('\nPre filtering...')
        filtered_windowed_sig = windowed_sig

        # -- color threshold - applied only with patches
        #if roi_approach == 'patches':
        #    filtered_windowed_sig = apply_filter(windowed_sig,
        #                                        rgb_filter_th,
        #                                        params={'RGB_LOW_TH': RGB_LOW_HIGH_TH[0],
        #                                                'RGB_HIGH_TH': RGB_LOW_HIGH_TH[1]})

        if pre_filt:
            module = import_module('pyVHR.BVP.filters')
            method_to_call = getattr(module, 'BPfilter')
            filtered_windowed_sig = apply_filter(filtered_windowed_sig,
                                                    method_to_call,
                                                    fps=fps,
                                                    params={'minHz':Pipeline.minHz,
                                                            'maxHz':Pipeline.maxHz,
                                                            'fps':'adaptive',
                                                            'order':6})
        if verb:
            print(f' - Pre-filter applied: {method_to_call.__name__}')

        ## 6. BVP extraction multimethods
        bvps_win = []
        for method in methods:
            if verb:
                print("\nBVP extraction...")
                print(" - Extraction method: " + method)
            module = import_module('pyVHR.BVP.methods')
            method_to_call = getattr(module, method)

            if 'cpu' in method:
                method_device = 'cpu'
            elif 'torch' in method:
                method_device = 'torch'
            elif 'cupy' in method:
                method_device = 'cuda'

            if 'POS' in method:
                pars = {'fps':'adaptive'}
            elif 'PCA' in method or 'ICA' in method:
                pars = {'component': 'all_comp'}
            else:
                pars = {}

            bvps_win_m = RGB_sig_to_BVP(filtered_windowed_sig,
                                    fps, device_type=method_device,
                                    method=method_to_call, params=pars)

            ## 7. POST FILTERING
            if post_filt:
                module = import_module('pyVHR.BVP.filters')
                method_to_call = getattr(module, 'BPfilter')
                bvps_win_m = apply_filter(bvps_win_m,
                                    method_to_call,
                                    fps=fps,
                                    params={'minHz':Pipeline.minHz, 'maxHz':Pipeline.maxHz, 'fps':'adaptive', 'order':6})
            if verb:
                print(f' - Post-filter applied: {method_to_call.__name__}')
            # collect
            if len(bvps_win) == 0:
                bvps_win = bvps_win_m
            else:
                for i in range(len(bvps_win_m)):
                    bvps_win[i] = np.concatenate((bvps_win[i], bvps_win_m[i]))
                    if i == 0: print(bvps_win[i].shape)


        ## 8. BPM extraction
        if verb:
            print("\nBPM estimation...")
            print(f" - roi appproach: {roi_approach}")

        if roi_approach == 'holistic':
            if cuda:
                bpmES = BVP_to_BPM_cuda(bvps_win, fps, minHz=Pipeline.minHz, maxHz=Pipeline.maxHz)
            else:
                bpmES = BVP_to_BPM(bvps_win, fps, minHz=Pipeline.minHz, maxHz=Pipeline.maxHz)

        elif roi_approach == 'patches':
            if estimate == 'clustering':
                #if cuda and False:
                #    bpmES = BVP_to_BPM_PSD_clustering_cuda(bvps_win, fps, minHz=Pipeline.minHz, maxHz=Pipeline.maxHz)
                #else:
                #bpmES = BPM_clustering(sig_processing, bvps_win, winsize, movement_thrs=[15, 15, 15], fps=fps, opt_factor=0.5)
                ma = MotionAnalysis(sig_processing, winsize, fps)
                bpmES = BPM_clustering(ma, bvps_win, fps, winsize, movement_thrs=movement_thrs, opt_factor=0.5)



            elif estimate == 'median':
                if cuda:
                    bpmES = BVP_to_BPM_cuda(bvps_win, fps, minHz=Pipeline.minHz, maxHz=Pipeline.maxHz)
                else:
                    bpmES = BVP_to_BPM(bvps_win, fps, minHz=Pipeline.minHz, maxHz=Pipeline.maxHz)
                bpmES,_ = BPM_median(bpmES)
            if verb:
                print(f" - BPM estimation with: {estimate}")
        else:
            raise ValueError("Estimation approach unknown!")

        if verb:
            print('\n...done!\n')

        return bvps_win, timesES, bpmES

In [None]:
class DeepPipeline(Pipeline):
    """
    This class runs the pyVHR Deep pipeline on a single video or dataset
    """

    def __init__(self):
        pass

    def run_on_video(self, videoFileName, cuda=True, method='MTTS_CAN', bpm_type='welch', post_filt=False, verb=True, crop_face=False):
        """
        Runs the pipeline on a specific video file.

        Args:
            videoFileName:
                - The path to the video file to analyse
            cuda:
                - True - Enable computations on GPU
                - False - Use CPU only
            method:
                - One of the rPPG methods defined in pyVHR
            bpm_type:
                - the method for computing the BPM estimate on a time window
            post_filt:
                - True - Use Band pass filtering on the estimated BVP signal
                - False - No post-filtering
            verb:
               - False - not verbose
               - True - show the main steps
        """

        if verb:
            print('\nProcessing Video: ' + videoFileName)
        fps = get_fps(videoFileName)
        wsize = 6

        sp = SignalProcessing()
        frames = sp.extract_raw(videoFileName)
        print('Frames shape:', frames.shape)

        # -- BVP extraction
        if verb:
            print("\nBVP extraction with method: %s" % (method))
        if method == 'MTTS_CAN':
            bvps_pred = MTTS_CAN_deep(frames, fps, verb=1, filter_pred=True)
            bvps, timesES = BVP_windowing(bvps_pred, wsize, fps, stride=1)
        elif method == 'HR_CNN':
            bvps_pred = HR_CNN_bvp_pred(frames)
            bvps, timesES = BVP_windowing(bvps_pred, wsize, fps, stride=1)
        else:
            print("Deep Method unsupported!")
            return

        if post_filt:
            module = import_module('pyVHR.BVP.filters')
            method_to_call = getattr(module, 'BPfilter')
            bvps = apply_filter(bvps,
                                method_to_call,
                                fps=fps,
                                params={'minHz':0.65, 'maxHz':4.0, 'fps':'adaptive', 'order':6})

        # -- BPM extraction
        if verb:
            print("\nBPM estimation with: %s" % (bpm_type))
        if bpm_type == 'welch':
            if cuda:
                bpmES = BVP_to_BPM_cuda(bvps, fps, minHz=minHz, maxHz=maxHz)
            else:
                bpmES = BVP_to_BPM(bvps, fps, minHz=minHz, maxHz=maxHz)
        else:
            raise ValueError("The only 'bpm_type' supported for deep models is 'welch'")

        # median BPM from multiple estimators BPM
        median_bpmES, mad_bpmES = BPM_median(bpmES)

        if verb:
            print('\n...done!\n')

        return timesES, median_bpmES

    def run_on_dataset(self, configFilename, verb=True):
        """
        Runs the tests as specified in the loaded config file.

        Args:
            configFilename:
                - The path to the configuration file
            verb:
                - False - not verbose
                - True - show the main steps

               (use also combinations)
        """
        self.configFilename = configFilename
        self.parse_cfg(self.configFilename)
        # -- cfg parser
        parser = configparser.ConfigParser(
            inline_comment_prefixes=('#', ';'))
        parser.optionxform = str
        if not parser.read(self.configFilename):
            raise FileNotFoundError(self.configFilename)

        # -- verbose prints
        if verb:
            self.__verbose('a')

        # -- dataset & cfg params
        if 'path' in self.datasetdict and self.datasetdict['path'] != 'None':
            dataset = datasetFactory(
                self.datasetdict['dataset'], videodataDIR=self.datasetdict['videodataDIR'], BVPdataDIR=self.datasetdict['BVPdataDIR'], path=self.datasetdict['path'])
        else:
            dataset = datasetFactory(
                self.datasetdict['dataset'], videodataDIR=self.datasetdict['videodataDIR'], BVPdataDIR=self.datasetdict['BVPdataDIR'])

        # -- catch data (object)
        res = TestResult()

        # load all the videos
        if self.videoIdx == []:
            self.videoIdx = [int(v)
                             for v in range(len(dataset.videoFilenames))]

        # -- loop on videos
        for v in self.videoIdx:
            # multi-method -> list []

            # -- verbose prints
            if verb:
                print("\n## videoID: %d" % (v))

            # -- ground-truth signal
            try:
                fname = dataset.getSigFilename(v)
                sigGT = dataset.readSigfile(fname)
            except:
                continue
            winSizeGT = int(self.bvpdict['winSize'])
            bpmGT, timesGT = sigGT.getBPM(winSizeGT)

            # -- video file name
            videoFileName = dataset.getVideoFilename(v)
            print(videoFileName)
            fps = get_fps(videoFileName)

            sp = SignalProcessing()
            frames = sp.extract_raw(videoFileName)

            # -- loop on methods
            for m in self.methods:
                if verb:
                    print("## method: %s" % (str(m)))

                # -- BVP extraction
                if str(m) == 'MTTS_CAN':
                    bvps_pred = MTTS_CAN_deep(frames, fps, verb=1, filter_pred=True)
                    bvps, timesES = BVP_windowing(bvps_pred, winSizeGT, fps, stride=1)
                elif str(m) == 'HR_CNN':
                    bvps_pred = HR_CNN_bvp_pred(frames)
                    bvps, timesES = BVP_windowing(bvps_pred, wsize, fps, stride=1)
                else:
                    print("Deep Method unsupported!")
                    return

                # POST FILTERING
                postfilter_list = ast.literal_eval(
                    self.bvpdict['post_filtering'])
                if len(postfilter_list) > 0:
                    for f in postfilter_list:
                        if verb:
                            print("  post-filter: %s" % f)
                        fdict = dict(parser[f].items())
                        if fdict['path'] != 'None':
                            # custom path
                            spec = util.spec_from_file_location(
                                fdict['name'], fdict['path'])
                            mod = util.module_from_spec(spec)
                            spec.loader.exec_module(mod)
                            method_to_call = getattr(
                                mod, fdict['name'])
                        else:
                            # package path
                            module = import_module(
                                'pyVHR.BVP.filters')
                            method_to_call = getattr(
                                module, fdict['name'])
                        bvps_win = apply_filter(bvps_win, method_to_call, fps=fps, params=ast.literal_eval(fdict['params']))

                # -- BPM extraction
                if self.bpmdict['type'] == 'welch':
                    bpmES = BVP_to_BPM_cuda(bvps_win, fps, minHz=float(
                        self.bpmdict['minHz']), maxHz=float(self.bpmdict['maxHz']))
                elif self.bpmdict['type'] == 'clustering':
                    bpmES = BVP_to_BPM_PSD_clustering_cuda(bvps_win, fps, minHz=float(
                        self.bpmdict['minHz']), maxHz=float(self.bpmdict['maxHz']))

                # median BPM from multiple estimators BPM
                median_bpmES, mad_bpmES = BPM_median(bpmES)

                # -- error metrics
                RMSE, MAE, MAX, PCC, CCC, SNR = getErrors(bvps_win, fps, median_bpmES, bpmGT, timesES, timesGT)

                # -- save results
                res.newDataSerie()
                res.addData('dataset', str(self.datasetdict['dataset']))
                res.addData('method', str(m))
                res.addData('videoIdx', v)
                res.addData('RMSE', RMSE)
                res.addData('MAE', MAE)
                res.addData('MAX', MAX)
                res.addData('PCC', PCC)
                res.addData('CCC', CCC)
                res.addData('SNR', SNR)
                res.addData('bpmGT', bpmGT)
                res.addData('bpmES', median_bpmES)
                res.addData('bpmES_mad', mad_bpmES)
                res.addData('timeGT', timesGT)
                res.addData('timeES', timesES)
                res.addData('videoFilename', videoFileName)
                res.addDataSerie()

                if verb:
                    printErrors(RMSE, MAE, MAX, PCC, CCC, SNR)
        return res

    def parse_cfg(self, configFilename):
        """ parses the given configuration file for loading the test's parameters.

        Args:
            configFilename: configuation file (.cfg) name of path .

        """

        self.parser = configparser.ConfigParser(
            inline_comment_prefixes=('#', ';'))
        self.parser.optionxform = str
        if not self.parser.read(configFilename):
            raise FileNotFoundError(configFilename)

        # load paramas
        self.datasetdict = dict(self.parser['DATASET'].items())
        self.bvpdict = dict(self.parser['BVP'].items())
        self.bpmdict = dict(self.parser['BPM'].items())

        # video idx list extraction
        if isinstance(ast.literal_eval(self.datasetdict['videoIdx']), list):
            self.videoIdx = [int(v) for v in ast.literal_eval(
                self.datasetdict['videoIdx'])]

        # method list extraction
        if isinstance(ast.literal_eval(self.bvpdict['methods']), list):
            self.methods = ast.literal_eval(
                    self.bvpdict['methods'])

    def __merge(self, dict1, dict2):
        for key in dict2:
            if key not in dict1:
                dict1[key] = dict2[key]

    def __verbose(self, verb):
        if verb == 'a':
            print("** Run the test with the following config:")
            print("      dataset: " + self.datasetdict['dataset'].upper())
            print("      methods: " + str(self.methods))