In [1]:
import cv2
import numpy as np
import scipy.fftpack as fftpack
from scipy import signal

In [2]:
freq_min = 1 # range for Fast-Fourier Transform
freq_max = 1.8
faceCascade = cv2.CascadeClassifier("haarcascades/haarcascade_frontalface_alt0.xml")

# Support function

In [3]:
def build_video_pyramid(frames):
    lap_video = []
    for i, frame in enumerate(frames):
        pyramid = generate_laplacian_pyramid(frame, 3)
        for j in range(3):
            if i == 0:
                lap_video.append(np.zeros((len(frames), pyramid[j].shape[0], pyramid[j].shape[1], 3)))
            lap_video[j][i] = pyramid[j]

    return lap_video



def generate_laplacian_pyramid(img, levels):
    gaussian_pyramid = generate_gaussian_pyramid(img, levels)
    laplacian_pyramid = []

    for i in range(levels-1):
        upsampled = cv2.pyrUp(gaussian_pyramid[i+1])
        (height, width, depth) = upsampled.shape
        gaussian_pyramid[i] = cv2.resize(gaussian_pyramid[i], (height, width))
        diff = cv2.subtract(gaussian_pyramid[i],upsampled)
        laplacian_pyramid.append(diff)

    laplacian_pyramid.append(gaussian_pyramid[-1])

    return laplacian_pyramid


def generate_gaussian_pyramid(img, levels):
    float_img = np.ndarray(shape=img.shape, dtype="float")
    float_img[:] = img
    pyramid = [float_img]

    for i in range(levels-1):
        float_img = cv2.pyrDown(float_img)
        pyramid.append(float_img)

    return pyramid


def collapse_laplacian_pyramid(video, frame_ct):
    collapsed_video = []

    for i in range(frame_ct):
        prev_frame = video[-1][i]

        for level in range(len(video) - 1, 0, -1):
            pyr_up_frame = cv2.pyrUp(prev_frame)
            (height, width, depth) = pyr_up_frame.shape
            prev_level_frame = video[level - 1][i]
            prev_level_frame = cv2.resize(prev_level_frame, (height, width))
            prev_frame = pyr_up_frame + prev_level_frame

        # Normalize pixel values
        min_val = min(0.0, prev_frame.min())
        prev_frame = prev_frame + min_val
        max_val = max(1.0, prev_frame.max())
        prev_frame = prev_frame / max_val
        prev_frame = prev_frame * 255

        prev_frame = cv2.convertScaleAbs(prev_frame)
        collapsed_video.append(prev_frame)

    return collapsed_video

def fft_filter(video, freq_min, freq_max, fps):
    fft = fftpack.fft(video, axis=0)
    frequencies = fftpack.fftfreq(video.shape[0], d=1.0 / fps)
    bound_low = (np.abs(frequencies - freq_min)).argmin()
    bound_high = (np.abs(frequencies - freq_max)).argmin()
    fft[:bound_low] = 0
    fft[bound_high:-bound_high] = 0
    fft[-bound_low:] = 0
    iff = fftpack.ifft(fft, axis=0)
    result = np.abs(iff)
    result *= 100  # Amplification factor

    return result, fft, frequencies

def read_video(path):
    cap = cv2.VideoCapture(path)
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    video_frames = []
    face_rects = ()

    while cap.isOpened():
        ret, img = cap.read()
        if not ret:
            break
        gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        roi_frame = img

        # Detect face
        if len(video_frames) == 0:
            face_rects = faceCascade.detectMultiScale(gray, 1.3, 5)

        # Select ROI
        if len(face_rects) > 0:
            for (x, y, w, h) in face_rects:
                roi_frame = img[y:y + h, x:x + w]
            if roi_frame.size != img.size:
                roi_frame = cv2.resize(roi_frame, (500, 500))
                frame = np.ndarray(shape=roi_frame.shape, dtype="float")
                frame[:] = roi_frame * (1. / 255)
                video_frames.append(frame)

    frame_ct = len(video_frames)
    cap.release()

    return video_frames, frame_ct, fps

def find_heart_rate(fft, freqs, freq_min, freq_max):
    fft_maximums = []

    for i in range(fft.shape[0]):
        if freq_min <= freqs[i] <= freq_max:
            fftMap = abs(fft[i])
            fft_maximums.append(fftMap.max())
        else:
            fft_maximums.append(0)

    peaks, properties = signal.find_peaks(fft_maximums)
    max_peak = -1
    max_freq = 0

    # Find frequency with max amplitude in peaks
    for peak in peaks:
        if fft_maximums[peak] > max_freq:
            max_freq = fft_maximums[peak]
            max_peak = peak

    return freqs[max_peak] * 60


# Main

In [4]:
from tqdm import tqdm
video_frames, frame_ct, fps = read_video("./data/face.mp4")

print("budiling video pyramid")
lap_video = build_video_pyramid(video_frames)

amplified_video_pyramid = []

for i, video in enumerate(lap_video):
    if i == 0 or i == len(lap_video)-1:
        continue
    print("calculating fft")
    result, fft, frequencies = fft_filter(video, freq_min, freq_max, fps)
    lap_video[i] += result

    # Calculate heart rate
    print("Calculating heart rate...")
    heart_rate = find_heart_rate(fft, frequencies, freq_min, freq_max)
print("Heart rate: ", heart_rate, "bpm")


budiling video pyramid
calculating fft
Calculating heart rate...
Heart rate:  65.78073089700997 bpm
