In [2]:
import imutils
from imutils.video import VideoStream, FileVideoStream
from imutils import face_utils
import cv2
import time
import numpy as np
import dlib
from collections import OrderedDict
from pose_estimator import PoseEstimator
from stabilizer import Stabilizer
import matplotlib.pyplot as plt
from scipy.fft import fft, ifft
from shapely.geometry import Point
from shapely.geometry.polygon import Polygon

In [9]:
class face_streamer:
    def __init__(self, predictor_path, filename = None):
        self.filename = filename
        
        self.detector = dlib.get_frontal_face_detector()
        self.predictor = dlib.shape_predictor(predictor_path)

        self.facial_landmarks_idxs = OrderedDict([
            ("face", (0, 26)),
            ("left_eye", (37, 42)),
            ("right_eye", (43, 48)),
        ])
        
        
        self.height, self.width = 300, 400
        self.pose_estimator = PoseEstimator(img_size=(self.height, self.width))
        self.pose_stabilizers = [Stabilizer(state_num=2, measure_num=1, cov_process=0.1, cov_measure=0.1) 
                                 for _ in range(6)]
        
        self.red = []
        self.green = []
        self.blue = []
        self.roll = []
        self.pitch = []
        self.yaw = []
        
        self.frames_to_collect = 1000
        
        
    def start_stream(self):
        if self.filename:
            self.vs = FileVideoStream(self.filename).start()
        else:
            self.vs = VideoStream(src=0).start()
        print("[INFO] camera sensor warming up...")
        time.sleep(2.0)
        
    def end_stream(self):
        print("[INFO] stream ended, cleaning up...")
        self.vs.stop()
        cv2.destroyAllWindows()
        self.num_frames = len(self.red)
        self.frame_vector = range(self.num_frames)
        
    
    def stream(self):
        
        self.start_stream()

        for i in range(self.frames_to_collect):
            
            frame = self.process_frame()
            cv2.imshow("Frame", frame)
            key = cv2.waitKey(1) & 0xFF
            
            if key == ord("q"):
                break
        
        self.end_stream()
                
    def process_frame(self):
        frame, gray, rects = self.find_face()
        frame = self.process_faces(frame, gray, rects)
        return frame
        
    def find_face(self):
        frame = self.vs.read()
        frame = imutils.resize(frame, width=400)

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        rects = self.detector(gray, 0)
        return frame, gray, rects
    
    def process_faces(self, frame, gray, rects):
        for rect in rects:
            (bX, bY, bW, bH) = face_utils.rect_to_bb(rect)
            shape = face_utils.shape_to_np(self.predictor(gray, rect))
            face_points,left_eye_points,right_eye_points,custom_points = self.get_face_points(shape)
            
            self.update_rgb_fast(frame[bY:bH+bY, bX:bW+bX,:]) 

            pose = self.pose_estimator.solve_pose_by_68_points(shape.astype('float'))
            steady_pose = self.stablize_pose(pose)
            self.update_rpy(steady_pose)

            if shape.any():
                self.shape = shape
                frame = self.draw_overlay(frame, face_points,left_eye_points,right_eye_points,custom_points)
                self.frame = frame
                
        return frame
    
    def update_rgb_fast(self, mask):
        self.red.append(np.average(mask[:,:,0]) / 255)
        self.green.append(np.average(mask[:,:,1]) / 255)
        self.blue.append(np.average(mask[:,:,2]) / 255)
        
    def update_rpy(self, steady_pose):
        self.roll.append(steady_pose[0][0])
        self.pitch.append(steady_pose[0][1])
        self.yaw.append(steady_pose[0][2])
        
    def draw_overlay(self, frame, face_points,left_eye_points,right_eye_points,custom_points, colors=None, alpha=0.75):

        out_face = np.zeros_like(frame)

        feature_mask = np.zeros((frame.shape[0], frame.shape[1]))  
        l_eye_mask = np.zeros((frame.shape[0], frame.shape[1]))  
        r_eye_mask = np.zeros((frame.shape[0], frame.shape[1]))  
        custom_mask = np.zeros((frame.shape[0], frame.shape[1]))  


        hull = cv2.convexHull(face_points)
        hull_left_eye = cv2.convexHull(left_eye_points)
        hull_right_eye = cv2.convexHull(right_eye_points)
        custom_hull = cv2.convexHull(custom_points)
        
        cv2.fillConvexPoly(feature_mask, hull, 1)
        cv2.fillConvexPoly(l_eye_mask, hull_left_eye, 1)
        cv2.fillConvexPoly(r_eye_mask, hull_right_eye, 1)
        cv2.fillConvexPoly(custom_mask, custom_hull, 1)
        
        feature_mask = feature_mask.astype(np.bool)
        l_eye_mask = l_eye_mask.astype(np.bool)
        r_eye_mask = r_eye_mask.astype(np.bool)
        custom_mask = custom_mask.astype(np.bool)
        

        final_mask = np.logical_xor(feature_mask, l_eye_mask)
        final_mask = np.logical_xor(final_mask, r_eye_mask)
        
        custom_final_mask = np.logical_xor(custom_mask, l_eye_mask)
        custom_final_mask = np.logical_xor(custom_final_mask, r_eye_mask)

        
        out_face[custom_final_mask] = frame[custom_final_mask]    
        indices = final_mask.astype(float)
        
        f_1 = frame.copy()
        frame[custom_final_mask] = (0, 0, 255)
        added_image = cv2.addWeighted(frame,.75,f_1,.25,0)
        
        #out_face has the cutout face
        #frame has the overlay drawn
        return out_face

    
    def stablize_pose(self, pose):
        steady_pose = []
        pose_np = np.array(pose).flatten()
        for value, ps_stb in zip(pose_np, self.pose_stabilizers):
            ps_stb.update([value])
            steady_pose.append(ps_stb.state[0])
        steady_pose = np.reshape(steady_pose, (-1, 3))
        return steady_pose 
    
    def get_face_points(self, shape):
        face_points = shape[self.facial_landmarks_idxs['face'][0]:self.facial_landmarks_idxs['face'][1]]
        left_eye_points = shape[self.facial_landmarks_idxs['left_eye'][0]:self.facial_landmarks_idxs['left_eye'][1]]
        right_eye_points = shape[self.facial_landmarks_idxs['right_eye'][0]:self.facial_landmarks_idxs['right_eye'][1]]
        
    
        custom_roi = [0,1,2,3,13,14,15,16,17,18,19,20,21,22,23,24,25,26,33]
        avg_1 = np.asarray([np.mean([shape[3][0], shape[48][0]],dtype=np.int64), np.mean([shape[3][1],shape[48][1]],dtype=np.int64)])
        avg_2 = np.asarray([np.mean([shape[13][0], shape[54][0]],dtype=np.int64), np.mean([shape[13][1],shape[54][1]])],dtype=np.int64)
        points = [shape[i] for i in custom_roi]
        points.extend([avg_1, avg_2])
        custom_points = np.asarray(points)
                        
        return face_points,left_eye_points,right_eye_points,custom_points

In [10]:
predictor_path = "../models/shape_predictor_68_face_landmarks.dat"
fs = face_streamer(predictor_path)

fs.stream()

successful init
[INFO] camera sensor warming up...
[INFO] stream ended, cleaning up...
