In [1]:
from dash_canvas import DashCanvas
from dash_canvas.utils import array_to_data_url, image_string_to_PILImage
from dash import html, dcc, Dash
from dash.dependencies import Input, Output, State
import dash_bootstrap_components as dbc
import datetime
import numpy as np
import time

# Dashboard

In [2]:
app = Dash(external_stylesheets=[dbc.themes.BOOTSTRAP], title="Head Pose Estimation")

## Navbar

In [3]:
navbar = dbc.Navbar(
    dbc.Container(
        [
            dbc.Row(
                [
                    dbc.Col(html.Img(src=app.get_asset_url('head.png'), height="80px")),
                    dbc.Col(dbc.NavbarBrand("Head Pose Estimation", className="ms-2", style={"font-family": "serif", "font-size": "45px"})),
                ],
                align="center",
                className="g-0",
            ),
        ]
    ),
    color="light"
)


## Image Upload

In [4]:
canvas_width = 500

image_upload = html.Div([
    dcc.Upload(
        id='upload-image',
        children=html.Div([
            'Drag and Drop or ',
            html.A('Select Files', href="#")
        ]),
        style={
            'width': '100%',
            'height': '60px',
            'lineHeight': '60px',
            'borderWidth': '1px',
            'borderStyle': 'dashed',
            'borderRadius': '5px',
            'textAlign': 'center',
            'margin': '10px'
        },
        # Allow multiple files to be uploaded
        multiple=True
    ),
    dcc.Loading(id="ls-loading-1", children=[html.Div(id="ls-loading-output-1")], type="default", style={'margin-top':'50px'}),
    html.Div(id='output-image-upload',className='m-auto', style={'display': 'table'}),
])


def parse_contents(contents):
    img = image_string_to_PILImage(contents)
    pix = np.array(img)
    img_content = array_to_data_url(pix)

    img_predicted = predict_image(pix)
    img_content_predicted = array_to_data_url(img_predicted)

    return dbc.Row([
            dbc.Col([    
                DashCanvas(id='canvaas_image2',
                                image_content=img_content_predicted,
                                width=canvas_width,
                                hide_buttons=["zoom", "pan", "line", "pencil","rectangle", "undo", "select", "Save"]),
            ],className='m-auto')
    ])

In [5]:
@app.callback(Output('output-image-upload', 'children'), Input('upload-image', 'contents'))
def update_output(list_of_contents):
    if list_of_contents is not None:
        children = [parse_contents(c) for c in list_of_contents]
        return children

In [6]:
@app.callback(Output("ls-loading-output-1", "children"), Input('upload-image', 'contents'))
def input_triggers_spinner(value):
    if value:
      time.sleep(5)

## Video Upload

In [7]:
canvas_width = 500

video_upload = html.Div([
    dcc.Upload(
        id='upload-video',
        children=html.Div([
            'Drag and Drop or ',
            html.A('Select Files', href="#")
        ]),
        style={
            'width': '100%',
            'height': '60px',
            'lineHeight': '60px',
            'borderWidth': '1px',
            'borderStyle': 'dashed',
            'borderRadius': '5px',
            'textAlign': 'center',
            'margin': '10px'
        },
        # Allow multiple files to be uploaded
        multiple=True
    ),
    dcc.Loading(id="ls-loading-2", children=[html.Div(id="ls-loading-output-2")], type="default", style={'margin-top':'50px'}),
    html.Div(id='output-video-upload',className='m-auto', style={'display': 'table'}),
])


def parse_vid_contents(contents):
  vid_path = predict_video(contents)
  return dbc.Row([
      dbc.Col([
          html.Video(src=vid_path)
      ], className='m-auto')
  ])


In [8]:
@app.callback(Output('output-video-upload', 'children'),
              Input('upload-video', 'contents'),)
def update_vid_output(list_of_contents):
    if list_of_contents is not None:
        children = [parse_vid_contents(str(c).replace('data:video/mp4;base64,', '')) for c in list_of_contents]
        return children

In [9]:
@app.callback(Output("ls-loading-output-2", "children"), Input('upload-video', 'contents'))
def input_triggers_spinner2(value):
    if value:
      time.sleep(40)

## Tabs

In [10]:
tab1_content = dbc.Card(
    dbc.CardBody(
        [
            image_upload
        ]
    ),
    className="mt-3",
)

tab2_content = dbc.Card(
    dbc.CardBody(
        [
            video_upload
        ]
    ),
    className="mt-3",
)

tabs = dbc.Tabs(
    [
        dbc.Tab(tab1_content, label="Image Prediction"),
        dbc.Tab(tab2_content, label="Video Prediction")
    ]
)

In [11]:
app.layout = html.Div(children=[
    navbar,
    html.Div(tabs, style={'margin': '30px'})
])

# Model Prediction

In [12]:
import cv2
import numpy as np
import mediapipe as mp
import os
import joblib
from math import cos, sin
import joblib
import base64



In [13]:
model = joblib.load('content/svr_model_norm.pkl')

## Image Prediction

In [14]:
def get_2d_features_predict(image):
  mp_face_mesh = mp.solutions.face_mesh
  with mp_face_mesh.FaceMesh(
    static_image_mode=True,
    max_num_faces=1,
    refine_landmarks=True,
    min_detection_confidence=0.3) as face_mesh:
    results = face_mesh.process(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    if results.multi_face_landmarks:
      face_landmarks = list(results.multi_face_landmarks[0].landmark)
      face_landmarks_2d = list(map(lambda landmark: [int(landmark.x * image.shape[0]), int(landmark.y * image.shape[1])], face_landmarks))
      return np.array(face_landmarks_2d, dtype=int).flatten()
    
    return None
  
def get_dist(x1, y1, x2, y2):
  return np.linalg.norm([x1-x2, y1-y2])

def draw_axis(img, pitch,yaw,roll, tdx=None, tdy=None, size = 100):

    yaw = -yaw
    if tdx != None and tdy != None:
        tdx = tdx
        tdy = tdy
    else:
        height, width = img.shape[:2]
        tdx = width / 2
        tdy = height / 2

    # X-Axis pointing to right. drawn in red
    x1 = size * (cos(yaw) * cos(roll)) + tdx
    y1 = size * (cos(pitch) * sin(roll) + cos(roll) * sin(pitch) * sin(yaw)) + tdy

    # Y-Axis | drawn in green
    #        v
    x2 = size * (-cos(yaw) * sin(roll)) + tdx
    y2 = size * (cos(pitch) * cos(roll) - sin(pitch) * sin(yaw) * sin(roll)) + tdy

    # Z-Axis (out of the screen) drawn in blue
    x3 = size * (sin(yaw)) + tdx
    y3 = size * (-cos(yaw) * sin(pitch)) + tdy

    cv2.line(img, (int(tdx), int(tdy)), (int(x1),int(y1)),(0,0,255),3)
    cv2.line(img, (int(tdx), int(tdy)), (int(x2),int(y2)),(0,255,0),3)
    cv2.line(img, (int(tdx), int(tdy)), (int(x3),int(y3)),(255,0,0),2)

    return img
  
def pipeline(frame):
    landmarks = get_2d_features_predict(frame)
    if landmarks is not None:
        tdx, tdy = landmarks[2].copy(), landmarks[3].copy()
        # subtract from the nose
        landmarks[::2] = landmarks[::2] - tdx
        landmarks[1::2] = landmarks[1::2] - tdy

        # divide by the distance
        distance = get_dist(landmarks[10], landmarks[11], landmarks[152], landmarks[153])
        landmarks = landmarks / distance
        
        predicted_pose = model.predict(landmarks.reshape(1, -1))
        annotated_image = draw_axis(frame, predicted_pose[0][0], predicted_pose[0][1], predicted_pose[0][2], tdx, tdy, size = 100)
        return annotated_image
    
    return frame

In [15]:
def predict_image(img):
    img = pipeline(img)
    return img

## Video Prediction

In [16]:
tmp_path = 'assets/'
tmp_vid_in = "assets/tmp_in.mp4"
tmp_vid_out = "assets/output.mp4"

In [17]:
def check_tmp_dir():
    if not os.path.exists(tmp_path):
        os.mkdir(tmp_path)
    else:
      if os.path.exists(tmp_vid_in):
        os.remove(tmp_vid_in)

      if os.path.exists(tmp_vid_out):
        os.remove(tmp_vid_out)

def save_vid(vid_encode):
  with open(tmp_vid_in, "wb") as videoFile:
    videoFile.write(base64.b64decode(vid_encode))

In [18]:
def output_predicted_vid():
  # Create a VideoCapture object
  cap = cv2.VideoCapture(tmp_vid_in)
  
  # Check if camera opened successfully
  if (cap.isOpened() == False): 
    print("Error opening video stream or file")
  
  # Default resolutions of the frame are obtained.The default resolutions are system dependent.
  # We convert the resolutions from float to integer.
  frame_width = int(cap.get(3))
  frame_height = int(cap.get(4))
  video_fps = cap.get(cv2.CAP_PROP_FPS),

  fourcc = cv2.VideoWriter_fourcc(*'X264')

  # Define the codec and create VideoWriter object.The output is stored in 'outpy.avi' file.
  out = cv2.VideoWriter(tmp_vid_out, apiPreference=0, fourcc=fourcc,
                      fps=video_fps[0], frameSize=(450, 450))

  while(True):
    ret, frame = cap.read()
  
    if ret == True: 
      # Write the frame into the file 'output.avi'
      frame = cv2.resize(frame, (450, 450), interpolation = cv2.INTER_AREA)
      frame = predict_image(frame)
      out.write(frame)
  
      # Display the resulting frame    
      # cv2.imshow('frame',frame)
  
      # Press Q on keyboard to stop recording
      if cv2.waitKey(1) & 0xFF == ord('q'):
        break
  
    # Break the loop
    else:
      break 
  
  # When everything done, release the video capture and video write objects
  cap.release()
  out.release()
  
  # Closes all the frames
  cv2.destroyAllWindows()

In [19]:
def predict_video(vid_encode):
    check_tmp_dir()
    save_vid(vid_encode)
    output_predicted_vid()
    return tmp_vid_out

# Run Server

In [21]:
app.run_server()

Dash is running on http://127.0.0.1:8050/

 * Serving Flask app "__main__" (lazy loading)
 * Environment: production
[2m   Use a production WSGI server instead.[0m
 * Debug mode: off


 * Running on http://127.0.0.1:8050/ (Press CTRL+C to quit)
127.0.0.1 - - [23/Feb/2023 21:27:31] "[37mGET / HTTP/1.1[0m" 200 -
127.0.0.1 - - [23/Feb/2023 21:27:32] "[37mGET /_dash-layout HTTP/1.1[0m" 200 -
127.0.0.1 - - [23/Feb/2023 21:27:32] "[37mGET /_dash-dependencies HTTP/1.1[0m" 200 -
127.0.0.1 - - [23/Feb/2023 21:27:32] "[37mGET /_dash-component-suites/dash/dcc/async-upload.js HTTP/1.1[0m" 200 -
127.0.0.1 - - [23/Feb/2023 21:27:32] "[37mPOST /_dash-update-component HTTP/1.1[0m" 200 -
127.0.0.1 - - [23/Feb/2023 21:27:32] "[37mPOST /_dash-update-component HTTP/1.1[0m" 200 -
127.0.0.1 - - [23/Feb/2023 21:27:32] "[37mPOST /_dash-update-component HTTP/1.1[0m" 200 -
127.0.0.1 - - [23/Feb/2023 21:27:32] "[37mPOST /_dash-update-component HTTP/1.1[0m" 200 -

X does not have valid feature names, but SVR was fitted with feature names


X does not have valid feature names, but SVR was fitted with feature names


X does not have valid feature names, but SVR was fitted with fea