In [None]:
import os
import cv2
import time
import numpy as np
from scipy import signal
import scipy.fftpack as fftpack
from scipy.signal import detrend 
from scipy.interpolate import interp1d
from sklearn.decomposition import FastICA

In [None]:
def build_laplacian_pyramid(img, levels):
    gaussian_pyramid = cv2.pyrDown(img)

    upsampled = cv2.pyrUp(gaussian_pyramid)
    (height, width, depth) = upsampled.shape
    gaussian_pyramid = cv2.resize(gaussian_pyramid, (height, width))

    return cv2.subtract(gaussian_pyramid, upsampled)

In [None]:
def read_video(path):
    cap = cv2.VideoCapture(path)
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    video_frames = []
    check = True

    while cap.isOpened():
        ret, img = cap.read()
        if not ret:
            break
        
        channel_avg = list(img.mean(axis=(0, 1)))

        # Detect face
        if check:
            face_detector = faceCascade.detectMultiScale(cv2.cvtColor(img, cv2.COLOR_RGB2GRAY), 1.3, 5)
            if len(face_detector) > 0:
                print(face_detector)
                (x, y, w, h) = face_detector[0]
                check = False
        
        if not check:
            img = cv2.resize(img[y:y + h, x:x + w], (500, 500)) * (1. / 255)
            video_frames.append(build_laplacian_pyramid(img, 3))
    
    cap.release()
    
    return np.array(video_frames), fps

In [None]:
def moving_average(a, n=3) :
    '''simple moving average, n=kernel size'''
    ret = np.cumsum(a, dtype=float)
    ret[n:] = ret[n:] - ret[:-n]
    return ret[n - 1:] / n

def preprocess(arr):
    '''arg = 1D array - smooth, remove trend, normalize array'''
    ma = moving_average(arr, 4) #smooth raw color signals 
    detrended = detrend(ma)     #scipy detrend 
    normalized = (detrended-np.mean(detrended))/np.std(detrended)
    return normalized 

def ICA(arr):
    ica = FastICA(n_components=3, max_iter=1000)
    ica_transformed = ica.fit_transform(arr)
    return ica_transformed

def full_process(mixed_signal): 
    '''full signal processing pipeline'''
    mixed_signal = np.array(mixed_signal) #prep (len, color)
    normalized = np.apply_along_axis(preprocess, 0, mixed_signal) #detrend and normalize 
    ICA_transformed = ICA(normalized) #ICA 
    return ICA_transformed

def get_pulse_signal(frames):
    pulse_signals = []
    for frame in frames:
        pulse_signals.append(list(frame.mean(axis=(0, 1))))
        # lap_video.append(sum(channel_avg) / len(channel_avg))
    
    return pulse_signals

def bandpass(x, y, low_pass=3, high_pass=0):
    '''bandpass filter for fourier'''
    y = y[np.where((x<low_pass) & (x>high_pass))]
    x = x[np.where((x<low_pass) & (x>high_pass))]
    return x, y

def fourier_transform(x, y, channel=1):
    '''perform fourier transform'''
    y = y[:, channel] #pick ICA color channel to process on 
    
    N = len(y)                            # Number of data points
    T = 1./fps                            # delta between frames (s)
    yf = fftpack.fft(y)                           # perform scipy fourier 
    xf = np.linspace(0.0, 1/(T*2), N//2)  # replot complex data over freq domain

    y_transform = 2.0/N * np.abs(yf[0:N//2])
    freq_spec, power_spec = bandpass(xf, y_transform, 4., 0.8)
    bpm = freq_spec*60    #convert Hz to bpm 
    
    heart_rate = bpm[np.argmax(power_spec)] #highest peak == HR
    
    return heart_rate, bpm, power_spec

In [None]:
faceCascade = cv2.CascadeClassifier("haarcascade_frontalface_alt0.xml")

freq_min = 1
freq_max = 1.8

base = '../data'
# file_name : fileNumber_tureHR.avi

In [None]:
for file in os.listdir(base):
    video, fps = read_video(os.path.join(base, file))

    singals = get_pulse_signal(video)
    y_in = full_process(singals)
    heart_rate, bpm, power_spec = fourier_transform(fps, y_in)

    file_name = file.split('.')[0]
    print(f"{file_name.split('_')[0]}: {file_name.split('_')[1]} {heart_rate},")

# mixed ica and eulerian

In [None]:
def fft_filter_ica(y, fps, freq_min, freq_max):
    fft = fftpack.fft(y)
    frequencies = fftpack.fftfreq(y.shape[0], d=1.0 / fps)
    
    bound_low = (np.abs(frequencies - freq_min)).argmin()
    bound_high = (np.abs(frequencies - freq_max)).argmin()
    fft[:bound_low] = 0
    fft[bound_high:-bound_high] = 0
    fft[-bound_low:] = 0
    
    iff = fftpack.ifft(fft, axis=0)

    return fft, frequencies

def find_heart_rate(fft, freqs, freq_min, freq_max):
    fft_maximums = []

    for i in range(fft.shape[0]):
        if freq_min <= freqs[i] <= freq_max:
            fftMap = abs(fft[i])
            fft_maximums.append(fftMap.max())
        else:
            fft_maximums.append(0)

    peaks, properties = signal.find_peaks(fft_maximums)
    max_peak = -1
    max_freq = 0

    for peak in peaks:
        if fft_maximums[peak] > max_freq:
            max_freq = fft_maximums[peak]
            max_peak = peak

    return freqs[max_peak] * 60

In [None]:
for file in os.listdir(base):
    video, fps = read_video(os.path.join(base, file))

    singals = get_pulse_signal(video)
    y_in = full_process(singals)
    
    fft, frequencies = fft_filter_ica(y_in, fps, freq_min, freq_max)
    heart_rate = find_heart_rate(fft, frequencies, freq_min, freq_max)

    file_name = file.split('.')[0]
    print(f"{file_name.split('_')[0]}: {file_name.split('_')[1]} {heart_rate},")