In [2]:
import cv2 as cv
import numpy as np
import cupy as cp
import cusignal
import cupyx
import time
import csv
import random
import cupyx.scipy.ndimage as cuimg
import matplotlib.pyplot as plt
import matplotlib
import os


import argparse

In [4]:
chroma_similarity = 10**2

video_width = 1920
video_height = 1080

capture_device = 0

buffer_size = 1024
display_mode = 'normal'
fps_display = True

processing_mode = 'average_ppg'

In [5]:
parser = argparse.ArgumentParser()
parser.add_argument("-f", "--file", dest="filename",
                    help="Process a video file rather than a capture device.", metavar="FILE")
# This argument currently doesn't do anything, I wanted to make this user-friendly but never got around to it!
parser.add_argument('-w','--welch', dest='welch_flag', action=argparse.BooleanOptionalAction,
                    help='Compute heart rate using the Welch estimator.')
args = parser.parse_args()

In [6]:
def csv_xyz(filename, data, names):
    csv_begin(filename, names)
    csv_append(filename, data)


In [7]:
def mouseRGB(event, x, y, flags, params):
    global skin_chroma
    if event == cv.EVENT_LBUTTONDOWN:
        skin_chroma = cp.array(cv.cvtColor(np.array([[frame[y,x]]]), cv.COLOR_BGR2YUV), dtype=cp.float32)[0,0,1:3]
        print('RGB = ', frame[y,x], 'chroma = ', skin_chroma)

In [8]:
def chroma_key(frame, chroma):
    key = frame[:,:,1:3] - chroma
    key = cp.less(cp.sum(cp.square(key), axis=2), chroma_similarity)
    return key

In [9]:
def chroma_key_display(frame, chroma):
    """
    Convenience function to display the chroma key
    """
    key = chroma_key(frame, chroma)
    return cp.asnumpy(key*255).astype(np.uint8)


In [10]:
def moving_average(a, n=3, axis=None):
    # If it's not None, we're not gonna flatten the array...
    if axis is not None:
        # ...so temporarily swap the axis to the end
        ret = np.swapaxes(a, 0, axis)
    else:
        ret = a
    # take the cumulative sum of the input vector
    ret = cp.cumsum(ret, axis=axis)
    # subtract the cumsum, offset by n, to get the moving average via kludge
    ret[n:,...] = ret[n:,...] - ret[:-n,...]
    # Concatenate together 0 ..the numbers... 0 0 to pad it to the original length
    ret = cp.concatenate((
        # Following what R does, return fewer 0s at the start if n is even...
        cp.zeros((int(np.floor((n-1)/2)), *ret.shape[1:])),
        # ...then some numbers...
        ret[(n - 1):,...] / n,
        # ...then more 0s at the end if n is even (both equal if odd!)
        cp.zeros((int(np.ceil((n-1)/2)), *ret.shape[1:]))
    ))
    # Swap the axis back if we swapped it at the start
    if axis is not None:
        ret = np.swapaxes(ret, 0, axis)
    return ret

In [11]:
def average_keyed(frame, key):
    """
    Return the average YUV of the pixels which are True in key.
    Args:
        frame: a cupy array containing the frame
        key: a cupy array of booleans
    Returns:
        A cupy array of [Y, U, V]
    """
    output = cp.mean(frame[key], axis=0)
    return output

In [12]:
def csv_begin(filename, data):
    with open(filename, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(data)

In [13]:
def csv_append(filename, data):
    with open(filename, 'a', newline='') as f:
        writer = csv.writer(f)
        writer.writerows(data)

In [14]:
def magnify_colour_ma(ppg, delta=50, n_bg_ma=60, n_smooth_ma=3):
    # Remove slow-moving background component
    ppg = ppg - moving_average(ppg, n_bg_ma, 0)
    # Smooth the resulting PPG
    ppg = moving_average(ppg, n_smooth_ma, 0)
    # Remove the NaNs or cp.max won't like it
    ppg = cp.nan_to_num(ppg)
    # Make it have a max delta of delta by normalising by the biggest deviation
    return delta*ppg/cp.max(cp.abs(ppg))


In [15]:
def magnify_colour_ma_masked(ppg, mask, delta=50, n_bg_ma=60, n_smooth_ma=3):
    # Remove slow-moving background component
    ppg = ppg - moving_average(ppg, n_bg_ma, 0)
    mask = moving_average(mask, n_bg_ma, 0)
    # Smooth the resulting PPG
    ppg = moving_average(ppg, n_smooth_ma, 0)
    mask = moving_average(mask, n_smooth_ma, 0)
    # Expand the mask to allow it to be used to, er, mask the ppg
    # Remove any pixels in ppg that go to zero at any point in the windows, found because the mask which has been
    # equivalently moving-averaged above drops below 1
    ppg = np.where(mask[:,:,:, cp.newaxis] == 1., ppg, cp.zeros_like(ppg))
    # Remove the NaNs or cp.max won't like it
    ppg = cp.nan_to_num(ppg)
    # Set the Y component to 0 (could've just done calcs on only U and V earlier, but I'm lazy)
    ppg[:,:,:,0] = 0
    # Make it have a max delta of delta by normalising by the biggest deviation
    return delta*ppg/cp.max(cp.abs(ppg))