In [1]:
from multiprocessing import Process, Queue

import numpy as np

import cv2
from mark_detector import MarkDetector
from os_detector import detect_os
from pose_estimator import PoseEstimator
from stabilizer import Stabilizer

In [2]:
# multiprocessing may not work on Windows and macOS, check OS for safety.
detect_os()

Linux is fine! Python multiprocessing works.


In [3]:
CNN_INPUT_SIZE = 128

In [None]:
def get_face(detector, img_queue, box_queue):
    """Get face from image queue. This function is used for multiprocessing"""
    while True:
        image = img_queue.get()
        box = detector.extract_cnn_facebox(image)
        box_queue.put(box)

In [None]:
# Video source from webcam or video file.
video_src = 'fbl3.mp4'
cam = cv2.VideoCapture(video_src)
_, sample_frame = cam.read()

In [None]:
# Introduce mark_detector to detect landmarks.
mark_detector = MarkDetector()

In [None]:
# Setup process and queues for multiprocessing.
img_queue = Queue()
box_queue = Queue()
img_queue.put(sample_frame)
box_process = Process(target=get_face, args=(
    mark_detector, img_queue, box_queue,))
box_process.start()

In [None]:
# Introduce pose estimator to solve pose. Get one frame to setup the
# estimator according to the image size.
height, width = sample_frame.shape[:2]
pose_estimator = PoseEstimator(img_size=(height, width))

In [None]:
# Introduce scalar stabilizers for pose.
pose_stabilizers = [Stabilizer(
    state_num=2,
    measure_num=1,
    cov_process=0.1,
    cov_measure=0.1) for _ in range(6)]

In [None]:
while True:
    # Read frame, crop it, flip it, suits your needs.
    frame_got, frame = cam.read()
    if frame_got is False:
        break

    # Crop it if frame is larger than expected.
    # frame = frame[0:480, 300:940]

    # If frame comes from webcam, flip it so it looks like a mirror.
    if video_src == 0:
        frame = cv2.flip(frame, 2)

    # Pose estimation by 3 steps:
    # 1. detect face;
    # 2. detect landmarks;
    # 3. estimate pose

    # Feed frame to image queue.
    img_queue.put(frame)

    # Get face from box queue.
    facebox = box_queue.get()

    if facebox is not None:
        # Detect landmarks from image of 128x128.
        face_img = frame[facebox[1]: facebox[3],
                         facebox[0]: facebox[2]]
        face_img = cv2.resize(face_img, (CNN_INPUT_SIZE, CNN_INPUT_SIZE))
        face_img = cv2.cvtColor(face_img, cv2.COLOR_BGR2RGB)
        marks = mark_detector.detect_marks(face_img)

        # Convert the marks locations from local CNN to global image.
        marks *= (facebox[2] - facebox[0])
        marks[:, 0] += facebox[0]
        marks[:, 1] += facebox[1]

        # Uncomment following line to show raw marks.
        # mark_detector.draw_marks(
        #     frame, marks, color=(0, 255, 0))

        # Try pose estimation with 68 points.
        pose = pose_estimator.solve_pose_by_68_points(marks)

        # Stabilize the pose.
        stabile_pose = []
        pose_np = np.array(pose).flatten()
        for value, ps_stb in zip(pose_np, pose_stabilizers):
            ps_stb.update([value])
            stabile_pose.append(ps_stb.state[0])
        stabile_pose = np.reshape(stabile_pose, (-1, 3))

        # Uncomment following line to draw pose annotaion on frame.
        # pose_estimator.draw_annotation_box(
        #     frame, pose[0], pose[1], color=(255, 128, 128))

        # Uncomment following line to draw stabile pose annotaion on frame.
        pose_estimator.draw_annotation_box(
            frame, stabile_pose[0], stabile_pose[1], color=(128, 255, 128))

    # Show preview.
    cv2.imshow("Preview", frame)
    if cv2.waitKey(10) == 27:
        break

In [None]:
# Clean up the multiprocessing process.
box_process.terminate()
box_process.join()