# Packages 

In [1]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import GridSearchCV,RandomizedSearchCV
import cv2
import dlib
import math
from math import cos, sin
import joblib
import mediapipe as mp

# Load Data

In [2]:
df_pose=pd.read_csv('data/head_pose.csv')
df_pose.head()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,929,930,931,932,933,934,935,yaw,pitch,roll
0,218.0,309.0,220.0,287.0,220.0,291.0,215.0,253.0,220.0,279.0,...,221.0,243.0,225.0,299.0,212.0,304.0,208.0,1.044306,-22.874239,4.908886
1,198.0,288.0,187.0,266.0,201.0,274.0,196.0,245.0,187.0,259.0,...,230.0,225.0,231.0,263.0,225.0,267.0,222.0,68.15524,26.932743,17.24367
2,143.0,359.0,144.0,349.0,143.0,351.0,139.0,335.0,144.0,345.0,...,324.0,147.0,325.0,168.0,320.0,169.0,319.0,50.485413,-10.579652,-13.570645
3,226.0,312.0,211.0,294.0,220.0,296.0,199.0,265.0,207.0,286.0,...,229.0,220.0,233.0,268.0,205.0,271.0,200.0,17.143373,-10.048455,-21.392782
4,229.0,301.0,221.0,272.0,224.0,281.0,210.0,244.0,219.0,263.0,...,219.0,227.0,221.0,271.0,203.0,275.0,201.0,0.685565,-1.536199,-12.643008


# Feature & targets

In [3]:
y=df_pose[['yaw','pitch','roll']]
x=df_pose.drop(columns=['yaw','pitch','roll'],axis=1)

# Data preprocessing

In [4]:
x_coordinate_idx=[str(i) for i in range(x.shape[1]) if i%2==0]
y_coordinate_idx=[str(i) for i in range(x.shape[1]) if i%2!=0]

In [5]:
class SubImputer(SimpleImputer):
    def __init__(self,x_num,x_corr,y_num,y_corr):
        self.x_num= x_num
        self.y_num= y_num
        self.x_corr=x_corr
        self.y_corr=y_corr
        return 

    def fit(self, X):
        
        return self

    def transform(self, X):
        #seperate x&y points
        x_data=X[self.x_corr]
        y_data=X[self.y_corr]
        #subtract the centre point
        x_data=x_data.sub(x_data[f'{self.x_num}'],axis=0)
        y_data=y_data.sub(y_data[f'{self.y_num}'],axis=0)
        #join the two data frames
        full_data=pd.concat([x_data, y_data], axis=1)
        #resort the coloumns
        data_idx=[str(i) for i in range(X.shape[1])]
        
        return full_data[data_idx].sort_index(axis = 0)
    
    def predict(self,X):
        return self.transform(X)

In [6]:
class NormaliseImputer(SimpleImputer):
    def __init__(self,x_corr,y_corr):
        self.x_corr=x_corr
        self.y_corr=y_corr
        return 

    def fit(self, X):
        
        return self

    def transform(self, X):
        #seperate x&y points
        x_data=X[self.x_corr]
        y_data=X[self.y_corr]
        #normalise the points => p-min/max-min
        x_data=x_data.sub(x_data.min(axis=1),axis=0)
        x_data=x_data.div( (x_data.max(axis=1)-x_data.min(axis=1)) ,axis=0)
        
        y_data=y_data.sub(y_data.min(axis=1),axis=0)
        y_data=y_data.div( (y_data.max(axis=1)-y_data.min(axis=1)) ,axis=0)
        #join the two data frames
        full_data=pd.concat([x_data, y_data], axis=1)
        #resort the coloumns
        data_idx=[str(i) for i in range(X.shape[1])]
        
        return full_data[data_idx].sort_index(axis = 0)
    
    def predict(self,X):
        return self.transform(X)

In [7]:
preprocessing_pip =Pipeline([
                              ("noramalise the face points",NormaliseImputer(x_coordinate_idx,y_coordinate_idx)),
                              ("centre the coordinate arround the nose",SubImputer(2,x_coordinate_idx,3,y_coordinate_idx)),
                            ])

# Build the model

In [8]:
para_grid_r=[
            {'bootstrap':[True]},
            {'max_leaf_nodes':[i for i in range(90,101)] },
            {'n_estimators':[i for i in range(75,81)]},
            {'max_samples':[i for i in range(1300,1350)]},
            { 'n_jobs':[-1]}
]

para_grid_y=[
            {'bootstrap':[True]},
            {'max_leaf_nodes':[i for i in range(100,121)] },
            {'n_estimators':[i for i in range(90,100)]},
            {'max_samples':[i for i in range(1350,1400)]},
            { 'n_jobs':[-1]}
]

para_grid_p=[
            {'bootstrap':[True]},
            {'max_leaf_nodes':[i for i in range(100,111)] },
            {'n_estimators':[i for i in range(90,100)]},
            {'max_samples':[i for i in range(1230,1260)]},
            { 'n_jobs':[-1]}
]

In [8]:
x_p= preprocessing_pip.fit_transform(x)

In [19]:
#choose best parameters using grid search

In [84]:
rand_search_yaw=RandomizedSearchCV(RandomForestRegressor(),para_grid_y,cv=5,scoring="neg_mean_squared_error")
rand_search_pitch=RandomizedSearchCV(RandomForestRegressor(),para_grid_p,cv=5,scoring="neg_mean_squared_error")
rand_search_roll=RandomizedSearchCV(RandomForestRegressor(),para_grid_r,cv=5,scoring="neg_mean_squared_error")

In [85]:
_=rand_search_yaw.fit(x_p,y.yaw)

In [88]:
_=rand_search_roll.fit(x_p,y.roll)

In [91]:
_=rand_search_pitch.fit(x_p,y.pitch)

In [None]:
#test the choosen models

In [86]:
yaw_model=rand_search_yaw.estimator
_=yaw_model.fit(x_p,y.yaw)
score = yaw_model.score(x_p, y.yaw)
print("R-squared:", score) 

R-squared: 0.9753221423337376


In [89]:
roll_model=rand_search_roll.estimator
_=roll_model.fit(x_p,y.roll)
score = roll_model.score(x_p, y.roll)
print("R-squared:", score) 

R-squared: 0.8765854961001549


In [92]:
pitch_model=rand_search_pitch.estimator
_=pitch_model.fit(x_p,y.pitch)
score = pitch_model.score(x_p, y.pitch)
print("R-squared:", score) 

R-squared: 0.8175894898994657


In [17]:
#save the modles

In [87]:
_=joblib.dump(yaw_model,"yaw_model.pkl")

In [90]:
_=joblib.dump(roll_model,"roll_model.pkl")

In [93]:
_=joblib.dump(pitch_model,"pitch_model.pkl")

In [8]:
#load the models

In [9]:
yaw_model =joblib.load("yaw_model.pkl")
roll_model =joblib.load("roll_model.pkl")
pitch_model =joblib.load("pitch_model.pkl")

# Full pipeline

In [10]:
yaw_pip=Pipeline([
                    ('preprocessing',preprocessing_pip),
                    ('yaw model',yaw_model)
                 ])

In [11]:
pitch_pip=Pipeline([
                    ('preprocessing',preprocessing_pip),
                    ('yaw model',pitch_model)
                 ])

In [12]:
roll_pip=Pipeline([
                    ('preprocessing',preprocessing_pip),
                    ('yaw model',roll_model)
                 ])

# Generate & draw landmarks

In [42]:
def generate_landmarks(landmarks,w,h):
    #array to hold  the points for the image 
    points=np.zeros((1,468*2))

    j=0 #col (x,y)
    for landmark in landmarks.landmark:
        x = landmark.x
        y = landmark.y
    
        relative_x = int(w * x)
        relative_y = int(h * y)
            
        points[0,j]=relative_x
        points[0,j+1]=relative_y
        j+=2
    

    data=pd.DataFrame(points,columns=x_p.columns)
    
    return data
        

In [43]:
def draw_axis(landmark,img,tdx=None, tdy=None, size = 100):
    
    #predict the angles
    landmarks=generate_landmarks(landmark,img.shape[1],img.shape[0])
    
    ##########
    tdx=landmarks['2']
    tdy=landmarks['3']
    #########
    
    yaw=yaw_pip.predict(landmarks)
    pitch=pitch_pip.predict(landmarks)
    roll=roll_pip.predict(landmarks)
    
    pitch = pitch * np.pi / 180
    yaw = -(yaw * np.pi / 180)
    roll = roll * np.pi / 180
    

    # X-Axis pointing to right. drawn in red
    x1 = size * (cos(yaw) * cos(roll)) + tdx
    y1 = size * (cos(pitch) * sin(roll) + cos(roll) * sin(pitch) * sin(yaw)) + tdy

    # Y-Axis | drawn in green
    #        v
    x2 = size * (-cos(yaw) * sin(roll)) + tdx
    y2 = size * (cos(pitch) * cos(roll) - sin(pitch) * sin(yaw) * sin(roll)) + tdy

    # Z-Axis (out of the screen) drawn in blue
    x3 = size * (sin(yaw)) + tdx
    y3 = size * (-cos(yaw) * sin(pitch)) + tdy

    
    cv2.line(img, (int(tdx), int(tdy)), (int(x1),int(y1)),(0,0,255),3)
    cv2.line(img, (int(tdx), int(tdy)), (int(x2),int(y2)),(0,255,0),3)
    cv2.line(img, (int(tdx), int(tdy)), (int(x3),int(y3)),(255,0,0),2)
    

    return img

In [44]:
def draw_axis_cam():
    #face mesh model
    mp_face_mesh = mp.solutions.face_mesh
    face_mesh = mp_face_mesh.FaceMesh(static_image_mode = False)
    
    vid = cv2.VideoCapture(0)
    while(True):
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            vid.release()
            cv2.destroyAllWindows()
            return
        
        ret, frame = vid.read()
        results = face_mesh.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

        #check if we detect any faces
        if not results.multi_face_landmarks:
            print("no face found")
            cv2.imshow('webCam', frame)
      
        else:
            #get only first face
            landmarks = results.multi_face_landmarks[0]
            nframe=draw_axis(landmarks,frame)
            cv2.imshow('webCam', nframe)
 
        
        

In [52]:
def draw_axis_video(path):
    cap = cv2.VideoCapture(path)
    # Check if camera opened successfully
    if (cap.isOpened()== False):
        print("Error opening video stream or file")

    #face mesh model
    mp_face_mesh = mp.solutions.face_mesh
    face_mesh = mp_face_mesh.FaceMesh(static_image_mode = False)
    
    frame_width = int(cap.get(3))
    frame_height = int(cap.get(4))
    
    out = cv2.VideoWriter('outpy2.avi',cv2.VideoWriter_fourcc(*'MJPG'), 60, (frame_width,frame_height))
                          
    while (cap.isOpened()):                                          
        ret, frame = cap.read()
        if ret == True:
            #get landmarks
            results = face_mesh.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            #check if we detect any faces
            if not results.multi_face_landmarks:
                print("no face found")
                out.write(frame)
                
            else:
                #get only first face
                landmarks = results.multi_face_landmarks[0]
                nframe=draw_axis(landmarks,frame)
                out.write(nframe)
                
            
            if cv2.waitKey(25) & 0xFF == ord('q'):
                  break

            
        else:
            break

    
    out.release()
    cap.release()
    cv2.destroyAllWindows()
    return

In [53]:
def draw_axis_img (img):
    #face mesh model
    mp_face_mesh = mp.solutions.face_mesh
    face_mesh = mp_face_mesh.FaceMesh(static_image_mode = True)
    #get landmarks
    results = face_mesh.process(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    #check if we detect any faces
    if not results.multi_face_landmarks:
        print("no face found")
        return
    
    #get only first face
    landmarks = results.multi_face_landmarks[0]
    img=draw_axis(landmarks,img)
    cv2.imshow('img',img)
    cv2.waitKey(0)
    cv2.destroyAllWindows()
    cv2.imwrite('outimg.jpg', img)
    
    

In [49]:
###################### test the result ###################################

In [50]:
#image
img=cv2.imread('testimg.jpg')
draw_axis_img(img)

In [51]:
draw_axis_cam()

In [54]:
draw_axis_video('video1.mp4')

In [17]:
##########################################