In [1]:
!pip install mediapipe



You should consider upgrading via the 'C:\Users\User\AppData\Local\Programs\Python\Python310\python.exe -m pip install --upgrade pip' command.


In [2]:
import numpy as np
import matplotlib.pyplot as plt
import os,cv2,math,glob,random
import scipy.io as sio
from math import cos, sin
from pathlib import Path
import pandas as pd
import mediapipe
import warnings
warnings.filterwarnings('ignore')
#from google.colab.patches import cv2_imshow

In [3]:
#%%capture
#if os.path.isfile(r'C:\Users\User\Downloads\ml1proj\AFLW2000-3D.zip') == False:
#  !gdown --id  1fP3zvSCYjll_o_m7S12nvQLZ9MnsEoap
#  !unzip r"C:\Users\User\Downloads\ml1proj\AFLW2000-3D.zip"


Getting features and labels

In [23]:
# Raw string: the Windows path contains backslashes that must not be parsed as escape sequences.
folder_path = r"D:\ITIContent\ml1proj\AFLW2000-3D\AFLW2000"
landmark_points = []   # one row per usable image: [x0, y0, x1, y1, ...]
pose_parameters = []   # row-aligned labels: first three entries of Pose_Para (pitch, yaw, roll)

faceModule = mediapipe.solutions.face_mesh
# Build the FaceMesh detector once and reuse it for every image instead of
# re-creating it per file.
with faceModule.FaceMesh(static_image_mode=True) as faces:
    # sorted() makes the sample order deterministic across platforms.
    for filename in sorted(os.listdir(folder_path)):
        # Drive the loop from the images only; the label file is looked up by name.
        if not filename.endswith(".jpg"):
            continue

        image_path = os.path.join(folder_path, filename)
        # The annotation for an image shares its base name with a .mat extension.
        mat_path = os.path.splitext(image_path)[0] + ".mat"
        if not os.path.isfile(mat_path):
            # No label for this image: skip it so features and labels stay aligned.
            continue

        # Loading the image; cv2.imread returns None for unreadable files.
        image = cv2.imread(image_path)
        if image is None:
            continue

        # Processing the face to extract the landmark points (x and y kept, z dropped).
        results = faces.process(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        if not results.multi_face_landmarks:
            # No face detected: neither features nor label are recorded.
            continue

        # Use the first detected face only so every feature row has the same length.
        face = results.multi_face_landmarks[0]
        row = []
        for landmark in face.landmark:
            row.extend([landmark.x, landmark.y])
        landmark_points.append(row)

        # Loading the mat file to extract the 3 angles of the face (pitch, yaw, roll).
        mat_file = sio.loadmat(mat_path)
        pose_parameters.append(mat_file["Pose_Para"][0][:3])

Preprocessing

In [15]:
def pre_process(matrix):
    """Center and scale interleaved landmark coordinates, row by row.

    Each row of ``matrix`` is read as ``[x0, y0, x1, y1, ...]``. For every row
    the x values (even columns) and the y values (odd columns) are handled
    independently: the row mean is subtracted, then the result is divided by
    the largest centered value of that row.

    Returns a 2-D array whose rows hold all normalized x values followed by
    all normalized y values (the columns are no longer interleaved).
    """
    def _center_and_scale(values):
        # Subtract the per-row mean, then divide by the per-row maximum of the centered values.
        centered = values - values.mean(axis=1, keepdims=True)
        return centered / centered.max(axis=1, keepdims=True)

    coords = np.array(matrix)
    xs_scaled = _center_and_scale(coords[:, 0::2])
    ys_scaled = _center_and_scale(coords[:, 1::2])
    return np.hstack([xs_scaled, ys_scaled])

In [26]:
# Turn the raw interleaved landmark rows into the normalized feature matrix used by every model below.
features = pre_process(landmark_points)

In [27]:
# Sanity check of (n_samples, n_features); the recorded run shows (1853, 936).
features.shape

(1853, 936)

In [28]:
# Wrap the feature matrix in a DataFrame purely for a readable preview of the first rows.
featuresdf = pd.DataFrame(features)
featuresdf.head()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,926,927,928,929,930,931,932,933,934,935
0,-0.054009,-0.04037,-0.038028,-0.076798,-0.035119,-0.023282,0.013516,-0.5235,0.034438,0.041904,...,0.247213,0.224374,0.229533,0.260067,0.260768,-0.382411,-0.351421,-0.320908,-0.438123,-0.467719
1,-0.215842,-0.311608,-0.186841,-0.237612,-0.3176,-0.27403,-0.126127,-0.12529,-0.086885,-0.095023,...,0.110457,0.086568,0.16554,0.113408,0.138896,-0.413988,-0.387064,-0.369057,-0.451886,-0.479313
2,0.060703,0.099913,0.066462,-0.014083,0.101372,0.089088,0.044666,-0.723454,0.02602,0.024092,...,0.142616,0.116409,0.123669,0.153813,0.161707,-0.38957,-0.362216,-0.340198,-0.431603,-0.460729
3,0.018921,-0.119623,-0.039146,-0.220663,-0.150898,-0.16959,-0.192763,-0.554353,-0.247986,-0.28664,...,0.24569,0.207056,0.172181,0.264689,0.26653,-0.452726,-0.406874,-0.368635,-0.668241,-0.720458
4,0.052235,-0.02631,0.005212,-0.140076,-0.046625,-0.067283,-0.10912,-0.630865,-0.134262,-0.155106,...,0.086542,0.053946,0.087525,0.097064,0.114359,-0.421979,-0.394538,-0.378136,-0.54091,-0.557696


Splitting dataset

In [32]:
from sklearn.model_selection import train_test_split

In [33]:
# Single source of truth for the split seed (previously defined but not used).
seed = 42
# 80/20 hold-out split; features and pose labels are split with the same row order.
features_train, features_test, labels_train, labels_test = train_test_split(
    features, pose_parameters, test_size=0.2, random_state=seed
)
print(features_train.shape,'\n',features_test.shape)

(1482, 936) 
 (371, 936)


Modelling

In [34]:
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import Ridge
from sklearn import linear_model
from sklearn.linear_model import ElasticNet
from sklearn.ensemble import RandomForestRegressor
from sklearn.tree import DecisionTreeRegressor
from sklearn.svm import SVR
from sklearn.ensemble import AdaBoostRegressor
import xgboost as xg
from sklearn.multioutput import MultiOutputRegressor
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import mean_squared_error, make_scorer

In [35]:
import joblib

In [36]:
#Linear Regression
reg = LinearRegression()
reg.fit(features_train, labels_train)
print('Train : ', reg.score(features_train, labels_train))
print('Test : ', reg.score(features_test, labels_test))

Train :  0.9846539869111393
Test :  -3.264839781541092


##########################

In [37]:
#Lasso
lasso_model = linear_model.Lasso()
param_grid = {
    'alpha': [0.0001, 0.0005, 0.001, 0.01, 0.1, 1.0]}
mse_scorer = make_scorer(mean_squared_error, greater_is_better=False)
grid_search = GridSearchCV(estimator=lasso_model, param_grid=param_grid, cv=5, scoring=mse_scorer)
grid_search.fit(features_train, labels_train)
print("Best Hyperparameters:", grid_search.best_params_)

best_lasso_norm_nopca = grid_search.best_estimator_
print('Train : ', best_lasso_norm_nopca.score(features_train, labels_train))
print('Test : ', best_lasso_norm_nopca.score(features_test, labels_test))

Best Hyperparameters: {'alpha': 0.01}
Train :  0.3895526105356888
Test :  0.7463863984892932


In [94]:
# Persist the tuned Lasso model; the relative file name resolves against the current working directory.
joblib.dump(best_lasso_norm_nopca,"best_lasso_norm_nopca.pkl")

['best_lasso_norm_nopca.pkl']

###########################

In [38]:
#Ridge
ridge_model = Ridge()
param_grid = {
    'alpha': [0.0001, 0.001, 0.01, 0.1, 0.5,1.0] }
mse_scorer = make_scorer(mean_squared_error, greater_is_better=False)
grid_search = GridSearchCV(estimator=ridge_model, param_grid=param_grid, cv=5, scoring=mse_scorer)
grid_search.fit(features_train, labels_train)

print("Best Hyperparameters:", grid_search.best_params_)
best_ridge_norm_nopca = grid_search.best_estimator_
print('Train : ', best_ridge_norm_nopca.score(features_train, labels_train))
print('Test : ', best_ridge_norm_nopca.score(features_test, labels_test))

Best Hyperparameters: {'alpha': 1.0}
Train :  0.4872541189368162
Test :  0.6190794447441481


In [95]:
# Persist the tuned Ridge model; the relative file name resolves against the current working directory.
joblib.dump(best_ridge_norm_nopca,"best_ridge_norm_nopca.pkl")

['best_ridge_norm_nopca.pkl']

###############################

In [39]:
#Elastic net
elastic_net = ElasticNet()
param_grid = {
    'alpha': [0.0001, 0.001, 0.01, 0.1, 1.0],
    'l1_ratio': [0.1, 0.3, 0.5, 0.7, 0.9]}
mse_scorer = make_scorer(mean_squared_error, greater_is_better=False)
grid_search = GridSearchCV(estimator=elastic_net, param_grid=param_grid, cv=5, scoring=mse_scorer)

grid_search.fit(features_train, labels_train)
print("Best Hyperparameters:", grid_search.best_params_)
best_elasticnet_norm_nopca = grid_search.best_estimator_
print('Train : ', best_elasticnet_norm_nopca.score(features_train, labels_train))
print('Test : ', best_elasticnet_norm_nopca.score(features_test, labels_test))


Best Hyperparameters: {'alpha': 0.01, 'l1_ratio': 0.7}
Train :  0.3968584588041409
Test :  0.7496882123462729


In [96]:
# Persist the tuned ElasticNet model; the relative file name resolves against the current working directory.
joblib.dump(best_elasticnet_norm_nopca,"best_elasticnet_norm_nopca.pkl") 

['best_elasticnet_norm_nopca.pkl']

#############################

In [40]:
#Random forest regressor
rfg_norm_nopca= RandomForestRegressor(n_estimators = 10, max_depth = 5, random_state = 77)
rfg_norm_nopca.fit(features_train, labels_train)
print('Train : ', rfg_norm_nopca.score(features_train, labels_train))
print('Test : ', rfg_norm_nopca.score(features_test, labels_test))

Train :  0.7910764852356836
Test :  0.396031656513211


In [93]:
# Persist the random forest model; the relative file name resolves against the current working directory.
joblib.dump(rfg_norm_nopca,"rfg_norm_nopca.pkl") 

['rfg_norm_nopca.pkl']

#######################################

In [43]:
#Decision tree regressor
dt_reg = DecisionTreeRegressor(max_depth=20, min_samples_split=20,random_state=42)
dt_reg.fit(features_train, labels_train)
print('Train : ', dt_reg.score(features_train, labels_train))
print('Test : ', dt_reg.score(features_test, labels_test))

Train :  0.6978691447799069
Test :  -1.3598724535758013


###########################

In [49]:
#Support vector regressor  
svr_default = SVR()
svr_norm_nopca_grid = MultiOutputRegressor(svr_default)

param_grid = {
    'estimator__kernel': ['rbf'],
    'estimator__C': [ 0.1, 1, 5, 10],
    'estimator__epsilon': [0.01, 0.1,1],
    'estimator__gamma': ['scale', 'auto', 0.1, 0.01]
}

grid_search = GridSearchCV(svr_norm_nopca_grid, param_grid, cv=5, n_jobs=-1, scoring = mse_scorer)
grid_search.fit(features_train, labels_train)

print("Best parameters found:")
print(grid_search.best_params_)
best_model_svr_norm_nopca = grid_search.best_estimator_

print('Train Score:',  best_model_svr_norm_nopca.score(features_train, labels_train))
print('Test Score:', best_model_svr_norm_nopca.score(features_test, labels_test))

Best parameters found:
{'estimator__C': 10, 'estimator__epsilon': 0.01, 'estimator__gamma': 'scale', 'estimator__kernel': 'rbf'}
Train Score: 0.44825533290257064
Test Score: 0.8611375008440966


In [55]:
# Persist the tuned SVR; this is the file the video section loads later. The relative name resolves against the current working directory.
joblib.dump(best_model_svr_norm_nopca,"best_model_svr_final.pkl") 

['best_model_svr_final.pkl']

##############################

In [56]:
#Adaboost
ada_norm_nopca = MultiOutputRegressor(AdaBoostRegressor(n_estimators=50,learning_rate=0.01))
ada_norm_nopca.fit(features_train, labels_train)
print('Train : ', ada_norm_nopca.score(features_train, labels_train))
print('Test : ', ada_norm_nopca.score(features_test, labels_test))

Train :  0.9020809804205904
Test :  0.73384485597618


In [137]:
# Persist the AdaBoost model; the relative file name resolves against the current working directory.
joblib.dump(ada_norm_nopca,"ada_norm_nopca.pkl")  

['ada_norm_nopca.pkl']

##########################

In [57]:
#Xgboost 
xgb_norm_nopca = xg.XGBRegressor(learning_rate=0.3, max_depth=30 , n_estimators=30, gamma=0.5, reg_lambda= 100, reg_alpha = 10)
xgb_norm_nopca.fit(features_train, labels_train)
print('Train : ', xgb_norm_nopca.score(features_train, labels_train))
print('Test : ', xgb_norm_nopca.score(features_test, labels_test))

Train :  0.48381976660857945
Test :  0.7855533920048933


In [132]:
# Persist the XGBoost model; the relative file name resolves against the current working directory.
joblib.dump(xgb_norm_nopca,"xgb_norm_nopca.pkl") 

['xgb_norm_nopca.pkl']

Let's try SVR on a video

In [58]:
# Reload the tuned SVR from the same relative file name the joblib.dump cell
# wrote it to, instead of a machine-specific absolute path.
# NOTE: joblib.load unpickles arbitrary objects; only load files you produced yourself.
loaded_model = joblib.load("best_model_svr_final.pkl")

Preparing for face trials

In [59]:
def draw_axis(img, pitch, yaw, roll, tdx=None, tdy=None, size=100):
    """Draw the three head-pose axes on ``img`` and return it.

    Parameters
    ----------
    img : image array; it is drawn on in place via ``cv2.line``.
    pitch, yaw, roll : rotation angles in radians (passed straight to
        ``math.cos`` / ``math.sin``). The sign of ``yaw`` is flipped before use.
    tdx, tdy : pixel coordinates of the axes' origin. If either one is missing,
        the image center is used for both.
    size : length of each axis in pixels before projection.

    Returns
    -------
    The same ``img`` object that was passed in.
    """
    yaw = -yaw

    # Fall back to the image center unless both origin coordinates are supplied.
    if tdx is None or tdy is None:
        height, width = img.shape[:2]
        tdx = width / 2
        tdy = height / 2

    # X-axis, pointing to the right; drawn in red.
    x1 = size * (cos(yaw) * cos(roll)) + tdx
    y1 = size * (cos(pitch) * sin(roll) + cos(roll) * sin(pitch) * sin(yaw)) + tdy

    # Y-axis, pointing down; drawn in green.
    x2 = size * (-cos(yaw) * sin(roll)) + tdx
    y2 = size * (cos(pitch) * cos(roll) - sin(pitch) * sin(yaw) * sin(roll)) + tdy

    # Z-axis, out of the screen; drawn in blue.
    x3 = size * (sin(yaw)) + tdx
    y3 = size * (-cos(yaw) * sin(pitch)) + tdy

    origin = (int(tdx), int(tdy))
    # (end point, BGR color, thickness) for each axis, in drawing order.
    axes = (
        ((int(x1), int(y1)), (0, 0, 255), 3),
        ((int(x2), int(y2)), (0, 255, 0), 3),
        ((int(x3), int(y3)), (255, 0, 0), 2),
    )
    for tip, color, thickness in axes:
        cv2.line(img, origin, tip, color, thickness)

    return img

In [60]:
def get_prediction(image , features ,model , flag=1):
    """Predict the head pose for one face and draw its axes on ``image``.

    Parameters
    ----------
    image : BGR image array (as produced by cv2); it is annotated in place.
    features : sequence holding one row of raw, interleaved landmark
        coordinates ``[x0, y0, x1, y1, ...]`` in image-relative units.
    model : fitted regressor whose ``predict`` returns rows of
        ``[pitch, yaw, roll]``.
    flag : when 1, additionally show the annotated image in an OpenCV window
        (blocks until a key is pressed) and in a matplotlib figure.

    Returns
    -------
    The annotated BGR image. The axes are drawn exactly once and the color
    order of the returned image is the same for every ``flag`` value.
    """
    features2 = pre_process(features)
    pose_para = model.predict(features2)
    pitch = pose_para[0][0]
    yaw = pose_para[0][1]
    roll = pose_para[0][2]

    mat_arr = np.array(features)

    # Columns 10 and 11 are the x and y of landmark 5 in the interleaved row;
    # scale them to pixels to anchor the axes on the face.
    tx = mat_arr[0][10]*image.shape[1]
    ty = mat_arr[0][11]*image.shape[0]

    # draw_axis draws on the image in place, so call it a single time.
    annotated = draw_axis(image, pitch, yaw, roll, tx, ty)

    if flag == 1:
        cv2.imshow('image', annotated)
        cv2.waitKey(0)
        plt.figure(figsize=(8,8))
        # matplotlib expects RGB; convert a copy for display only.
        plt.imshow(cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB))

    return annotated

In [65]:
input_video = cv2.VideoCapture("pexels_videos_2675511 (1080p).mp4")

# Get the video properties so the output matches the input's frame rate and size
fps = input_video.get(cv2.CAP_PROP_FPS)
width = int(input_video.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(input_video.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Create VideoWriter object to write the processed frames
output_video = cv2.VideoWriter('svr_video_2.mp4',
                                cv2.VideoWriter_fourcc(*'mp4v'),
                                fps,
                                (width, height))

faceModule = mediapipe.solutions.face_mesh
try:
    # Build the detector once for the whole video instead of once per frame.
    # static_image_mode=True is kept so every frame is detected independently,
    # exactly as before.
    with faceModule.FaceMesh(static_image_mode=True) as faces:
        # Process each frame
        while input_video.isOpened():
            ret, frame = input_video.read()
            if not ret:
                break

            results = faces.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

            if results.multi_face_landmarks:
                # Use the first detected face only, so the feature row always
                # has the length the model was trained on.
                face = results.multi_face_landmarks[0]
                row = []
                for landmark in face.landmark:
                    row.extend([landmark.x, landmark.y])
                matrix = [row]

                # flag=0: annotate only, no interactive display per frame.
                processed_frame = get_prediction(frame , matrix ,loaded_model , 0)
            else:
                # No face found: pass the frame through unchanged.
                processed_frame = frame

            # Write the processed frame to the output video
            output_video.write(processed_frame)
finally:
    # Release video objects even if processing fails part-way through,
    # otherwise the output file can be left unfinalized.
    input_video.release()
    output_video.release()
    cv2.destroyAllWindows()

________________________________________________________________________________________________________________________________________________