# Deep Learning Eye Gaze Pipeline Experiment 1

Max Tran | Originally Published 6/13/2024

**What does this experiment do?**

This experiment creates a real-time, end-to-end pipeline that uses a raw webcam frame to predict the (x, y) coordinate a user is looking at on their screen.

**How does this experiment work?**

To create a coordinate prediction pipeline, three pre-trained OpenVINO models and two HAAR Cascade Classifiers are used in stages to progressively extract facial data from a raw webcam frame. The results of these networks are fed into a basic Dense DNN (Deep Neural Network) that estimates the on-screen coordinate a user is looking at. This final network is created and trained in this experiment. 

Data for the final Dense network is collected using an OpenCV window. When this window is clicked, it turns white and enters data collection mode. In this mode, a user moves their mouse pointer around the screen and follows it with their eyes. Simultaneously, the coordinates of the mouse are recorded and the pre-trained networks are used to extract and save facial data. This collected data can then be used to train the coordinate estimation model.
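
As a rough illustration of what one recorded sample looks like, the sketch below pairs the six facial features with a mouse position scaled to the 0 to 1 range. The `make_sample` helper and its parameter names are hypothetical and exist only for this example; the 1920x1080 default mirrors the screen size assumed later in this notebook.

```python
def make_sample(gaze, head_pose, mouse_xy, screen_size=(1920, 1080)):
    """Pair facial features with a normalized mouse position (hypothetical helper)."""
    gx, gy, gz = gaze
    yaw, pitch, roll = head_pose
    mouse_x, mouse_y = mouse_xy
    width, height = screen_size

    # Six inputs for the Dense network.
    features = [gx, gy, gz, yaw, pitch, roll]
    # Two outputs, each scaled to the 0-1 range.
    target = [mouse_x / width, mouse_y / height]
    return features, target


features, target = make_sample(
    gaze=(0.03, -0.28, -0.94),
    head_pose=(5.0, -2.0, 0.5),
    mouse_xy=(960, 540),
)
print(features)
print(target)
```

Scaling the targets this way matches the sigmoid output layer used by the final network, which can only produce values between 0 and 1.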

**Here is a diagram of how the pipeline works:**

![](https://mermaid.ink/img/pako:eNqtVF1v2jAU_StXfgJENgraCw-T-Gwr8SU6odFlaq-Sm2DNiSPHGaXAf5-T8JGm7cam-cW27rk-x773eMsc6RJrM0_ItbNCpWE0t0Mwo_PNZo_5Mh212oj_JFhwlyQMidxarRDsXs_hNkCfYKgwoEPo0Wbf81WsN4KgAx4Xoi0Vhj7ZYR7qVko8Q3To4w2hC33S5Gguwxdc04jCxe1kCtPxPYyNfFGIeibZco95liKNXFiNRuPTSVI1Xx35e1dlARn3TMYEg1jzAP9OwcpkW5HJtuiUbaGLcariqqjiwN8s84_I0zDY0G_u31vATaczh57AOOYeJ1VUgKgcjB106UGYs2hDD804ElzHH54C8YaEVlnCnPur_6ZBpYf9UUS_rOEanw09Vzn9P9XCN0e8VYbma_rBtkTfk1K5PERdbIMDQ1FAL4m1DKwvCnlIac-GcfpqFMGEEo3CTHot1Q-o9CeT6tkZ-yPzsGy0aWjdOYoohIKGmSKXvy5E5WsdltV3_DZ86bdi13fAsj5D92DBbLPLrAs9JaPIXEQbmxsz7YxBLkI1L0K1zrbLgZnZlriuw4xrZ1WHuRRiB4NDU-SgrBXMTbN5eZjvT6hBhhpeeHT_6LscdWpw6Hbl0znceifM6iwgFSB3zbe5TcE20ysKyGZts3TJw0Rom9nh3kAx0fJuEzqsrVVCdaZk4q9Y20MRm10Suaa2fY6-ea4jxNRZSzXO_-Xse97_AruOp28?type=png)

<!-- (https://mermaid.live/edit#pako:eNqtVF1v2jAU_StXfgJENgraCw-T-Gwr8SU6odFlaq-Sm2DNiSPHGaXAf5-T8JGm7cam-cW27rk-x773eMsc6RJrM0_ItbNCpWE0t0Mwo_PNZo_5Mh212oj_JFhwlyQMidxarRDsXs_hNkCfYKgwoEPo0Wbf81WsN4KgAx4Xoi0Vhj7ZYR7qVko8Q3To4w2hC33S5Gguwxdc04jCxe1kCtPxPYyNfFGIeibZco95liKNXFiNRuPTSVI1Xx35e1dlARn3TMYEg1jzAP9OwcpkW5HJtuiUbaGLcariqqjiwN8s84_I0zDY0G_u31vATaczh57AOOYeJ1VUgKgcjB106UGYs2hDD804ElzHH54C8YaEVlnCnPur_6ZBpYf9UUS_rOEanw09Vzn9P9XCN0e8VYbma_rBtkTfk1K5PERdbIMDQ1FAL4m1DKwvCnlIac-GcfpqFMGEEo3CTHot1Q-o9CeT6tkZ-yPzsGy0aWjdOYoohIKGmSKXvy5E5WsdltV3_DZ86bdi13fAsj5D92DBbLPLrAs9JaPIXEQbmxsz7YxBLkI1L0K1zrbLgZnZlriuw4xrZ1WHuRRiB4NDU-SgrBXMTbN5eZjvT6hBhhpeeHT_6LscdWpw6Hbl0znceifM6iwgFSB3zbe5TcE20ysKyGZts3TJw0Rom9nh3kAx0fJuEzqsrVVCdaZk4q9Y20MRm10Suaa2fY6-ea4jxNRZSzXO_-Xse97_AruOp28) -->

**A few steps are required to run this experiment:**

1. I recommend creating a virtual environment (venv) to isolate this experiment from your system installation of Python. First, navigate in a terminal to the location where you would like to create the virtual environment, then enter:

        python -m venv .venv


2. Follow Python's directions on how to [activate the venv for your platform](https://docs.python.org/3/library/venv.html#how-venvs-work). Then re-open your terminal and navigate back to your selected folder.

3. Install the package dependencies:

        pip install -r ./requirements.txt


4. Download the pre-trained OpenVINO models using the now-installed Open Model Zoo (OMZ) downloader:

        omz_downloader --name face-detection-retail-0005


        omz_downloader --name head-pose-estimation-adas-0001


        omz_downloader --name gaze-estimation-adas-0002


5. Download a copy of the *haarcascade_lefteye_2splits.xml* and *haarcascade_righteye_2splits.xml* HAAR classifiers, which are used to detect eyes. Copies can be found in the list of [OpenCV HAAR Classifiers](https://github.com/opencv/opencv/tree/4.x/data/haarcascades). Place these files in the same working directory as this notebook.

6. To run the experiment, run all of the cells once from top to bottom. When you need to change modes, stop and start **the last cell only** to avoid erasing training data and trained models from your earlier runs. When in *data collection and test* mode, press "q" on any window to exit and save your data.
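
Before running the notebook, it can help to confirm that the downloads from the steps above landed where the code expects them. The sketch below is one possible check, not part of the original experiment. The `find_missing_files` helper is a hypothetical name, and the `./intel/<model>/<precision>/` layout is taken from the `get_pretrained_xml_and_bin` function defined later in this notebook.

```python
from pathlib import Path

MODEL_NAMES = [
    "face-detection-retail-0005",
    "head-pose-estimation-adas-0001",
    "gaze-estimation-adas-0002",
]
HAAR_FILES = [
    "haarcascade_lefteye_2splits.xml",
    "haarcascade_righteye_2splits.xml",
]


def find_missing_files(working_dir=".", precision="FP32"):
    """Return the expected model and classifier files that are not on disk."""
    root = Path(working_dir)
    expected = [root / name for name in HAAR_FILES]
    for model_name in MODEL_NAMES:
        # Assumed layout: ./intel/<model_name>/<precision>/<model_name>.xml|.bin
        model_dir = root / "intel" / model_name / precision
        expected.append(model_dir / f"{model_name}.xml")
        expected.append(model_dir / f"{model_name}.bin")
    return [path for path in expected if not path.exists()]


missing = find_missing_files()
if missing:
    print("Missing files:")
    for path in missing:
        print(f"  {path}")
else:
    print("All model and classifier files were found.")
```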

**Because this code is an experiment, it has not been fully formatted or cleaned up. It is also not yet production-ready.**

**Additional work is required.**

### Import Dependency Packages

In [None]:
# Python Standard Library Imports
from dataclasses import dataclass
import os

# Basic Data Handling and Display
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Computer Vision
import cv2 as cv

# Machine Learning
from tensorflow import keras
import tensorflow as tf

# Machine Learning Inference Acceleration
import openvino as ov

### Global Variables

In [None]:
# Model Variable
# Stores a pre-trained model that predicts an (x, y)
# coordinate on the screen based on a user's gaze.

# In this experiment, the model is trained to accept the following parameters:
# gx, gy, gz, yaw, pitch, roll

# It then outputs a tensor with two normalized values (between 0 and 1) that can be
# de-normalized and used as a coordinate on the screen.

# In this current iteration of the experiment:
# - The default screen size is assumed to be 1920x1080
model = None


# OpenCV window callbacks are used to record the user's cursor location for training 
# the coordinate prediction model.
# Global variables are used by these callbacks to share this information.
most_recent_x = None
most_recent_y = None
tracking = False

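# Screen resolution as (width, height).
screen_size = (1920, 1080)
# NumPy arrays are indexed (rows, columns), so swap to (height, width)
# before creating the full-screen frames below.
screen_size = (screen_size[1], screen_size[0])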

black_frame = np.zeros(screen_size)
white_frame = np.ones(screen_size) * 255.0
current_frame = black_frame


# Global Variables are also used to store data collected for training.
gx_collected_data = []
gy_collected_data = []
gz_collected_data = []

yaw_collected_data = []
pitch_collected_data = []
roll_collected_data = []

x_collected_data = []
y_collected_data = []

frames_collected_data = []

# A path at which to save collected data
DATA_SAVE_PATH = "Data.csv"

# A Tunable Training Configuration
@dataclass(frozen=True)
class Training_Configuration():
    batch_size : int = 256
    epochs : int = 75

    save_path = "./Models"

    optimizer = tf.keras.optimizers.Adam
    
    loss_function = tf.keras.losses.MeanSquaredError

    proportion_of_data_for_validation = 0.2
    shuffle = True

### Define Setup and Helper Functions

In [None]:
def get_pretrained_xml_and_bin(model_name, precision):
    """Retrieve the .xml and .bin files (OpenVINO IR Format) from pretrained OpenVINO
    models downloaded from the official model downloader.
    """
    
    base_path = "./intel"

    xml_file = os.path.join(base_path, model_name, precision, model_name + ".xml")
    bin_file = os.path.join(base_path, model_name, precision, model_name + ".bin")
    
    return xml_file, bin_file

#### Image Processing

In [None]:
def bgr_to_rgb(image) -> np.ndarray:
    """Convert a BGR image to an RGB image."""
    
    processed_image = cv.cvtColor(image, cv.COLOR_BGR2RGB)
    return processed_image

def show_image_inline(image):
    """Prepare and display a BGR image inside a Jupyter Notebook."""

    image = bgr_to_rgb(image)
    image = image.astype(np.uint8)

    plt.imshow(image)

In [None]:
def add_square_to_center(image, square_size, color=(255, 255, 255)):
    """Returns a copy of an image with an arbitrarily sized square drawn over its center."""

    y, x, channels = image.shape
    half = square_size / 2

    cord1 = int(x/2-half), int(y/2-half)
    cord2 = int(x/2+half), int(y/2+half)

    # cv.rectangle draws in place, so draw on a copy to leave the caller's frame untouched.
    return cv.rectangle(image.copy(), cord1, cord2, color=color)
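
The preview square appears to mark the same centered 300x300 region that the face detection class crops later in this notebook. The sketch below shows the arithmetic for such a centered crop using plain integers. The `center_crop_bounds` helper is a hypothetical name introduced for illustration only.

```python
def center_crop_bounds(frame_width, frame_height, size):
    """Return (x1, y1, x2, y2) for a size-by-size square centered in the frame."""
    if size > min(frame_width, frame_height):
        raise ValueError("The crop does not fit inside the frame.")

    x1 = (frame_width - size) // 2
    y1 = (frame_height - size) // 2
    return x1, y1, x1 + size, y1 + size


# A 640x480 webcam frame with a 300x300 crop.
x1, y1, x2, y2 = center_crop_bounds(640, 480, 300)
print((x1, y1), (x2, y2))
```

With a NumPy image, the crop itself would then be `image[y1:y2, x1:x2]`, since rows (y) are indexed before columns (x).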

In [None]:
def predict_from_model(gx, gy, gz, yaw, pitch, roll, integer_response=False):
    """Run inference on a pretrained model and convert its prediction to an (x, y) coordinate pair.

    Currently, this function is hard-coded for the format of 
    """

    prediction = model.predict(np.array([
        [gx, gy, gz, yaw, pitch, roll]
    ]))[0]

    x, y = prediction

    x = x * 1920
    y = y * 1080

    if integer_response:
        x = int(x)
        y = int(y)

    return x, y

In [None]:
number_of_predictions_to_average = 5
current_prediction_number = 0
# This tensor stores accumulated predictions.
predictions = np.zeros(shape=(20, 2))

def average_predictions(gx, gy, gz, yaw, pitch, roll):
    """Performs a given number of model inference predictions and averages them before returning the result.

    Each time this function is called it performs a single model inference and saves the result.
    If a threshold number of predictions "number_of_predictions_to_average" is reached, all of the previously
    stored predictions are averaged and the average is returned as an (x, y) coordinate pair.

    Otherwise, the tuple (None, None) is returned.
    """

    global number_of_predictions_to_average
    global current_prediction_number
    global predictions

    x, y = predict_from_model(gx, gy, gz, yaw, pitch, roll)
    predictions[current_prediction_number][0] = x
    predictions[current_prediction_number][1] = y

    # Accounting for zero-based indexing, check to see if the current prediction
    # has filled the prediction buffer.
    # If so, average the saved predictions and clear the buffer.
    if current_prediction_number == number_of_predictions_to_average - 1:
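        # Only average the rows that were actually filled. The buffer has more rows
        # than predictions, and the unused zero rows would otherwise skew the mean.
        x_mean = predictions[:number_of_predictions_to_average, 0].mean()
        y_mean = predictions[:number_of_predictions_to_average, 1].mean()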

        current_prediction_number = 0
        predictions = np.zeros(shape=(20, 2))

        return int(x_mean), int(y_mean)

    else:
        current_prediction_number += 1
        return None, None
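
The function above averages predictions in blocks and returns a result only once per block. As a point of comparison, the sketch below uses a sliding-window average, which is a different technique: it returns a smoothed coordinate on every call. The `SlidingAverage` class is a hypothetical helper and is not used elsewhere in this notebook.

```python
from collections import deque


class SlidingAverage:
    """Average the most recent `window` (x, y) predictions (hypothetical helper)."""

    def __init__(self, window=5):
        # A deque with maxlen drops the oldest point automatically.
        self._points = deque(maxlen=window)

    def update(self, x, y):
        """Add a prediction and return the current average as integers."""
        self._points.append((x, y))
        count = len(self._points)
        x_mean = sum(point[0] for point in self._points) / count
        y_mean = sum(point[1] for point in self._points) / count
        return int(x_mean), int(y_mean)


smoother = SlidingAverage(window=3)
for point in [(100, 100), (110, 90), (120, 80), (400, 400)]:
    print(smoother.update(*point))
```

The trade-off is responsiveness against stability: a larger window gives a steadier on-screen dot, but it lags further behind quick eye movements.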

### Classes to Interact with Pre-trained Models

#### A General and Extensible Class to Interact with OpenVINO Models

In [None]:
class basic_model:

    _MODEL_NAME = ""
    _ANNOTATE_WITH_BBOXES = True

    _compiled_model = None
    _precision = "FP32"

    _threshold = 0.5
    _annotation_color = (255, 255, 255)
    

    def __init__(self, core, precision="FP32", device="AUTO", threshold=0.5, annotation_color=(255, 255, 255), return_annotated_frame=False):
        
        xml_file, _bin_file = get_pretrained_xml_and_bin(self._MODEL_NAME, precision)
        self._compiled_model = core.compile_model(xml_file, device)
        self._precision = precision

        self._threshold = threshold
        self._annotation_color = annotation_color
        self._ANNOTATE_WITH_BBOXES = return_annotated_frame

    def _preprocess_frame(self, frame):

        # Make sure to take into account the precision here!
        
        return frame
    
    def inference(self, frame):
        # Returns bboxes and an annotated preview frame

        # OpenVino Inference Code
        inference_request = self._compiled_model.create_infer_request()

        processed_frame = self._preprocess_frame(frame)

        input_tensor = ov.Tensor(processed_frame)
        inference_request.set_input_tensor(input_tensor)

        inference_request.start_async()
        inference_request.wait()

        output = inference_request.get_output_tensor()

        # For models that return bboxes, this will be bboxes.
        processed_output_data = self._retrieve_data_from_output(output)


        if self._ANNOTATE_WITH_BBOXES:
            return processed_output_data, self._annotate_frame_with_bboxes(frame, processed_output_data)
        
        else:
            return processed_output_data
    
    def _retrieve_data_from_output(self, raw_output, scale_value=300):

        # Returns bboxes if the model does so.
        
        # Use _threshold here
        bboxes = []

        return bboxes
    
    def _annotate_frame_with_bboxes(self, frame, bboxes):

        preview_frame = frame
        
        for bbox in bboxes:
            cv.rectangle(preview_frame, bbox[0], bbox[1], color=self._annotation_color)

        return preview_frame

### Specific Model Classes Used in this Experiment

In [None]:
# TODO: Refactor the face_detection_retail Class Code
# The code for this model class was originally written in a previous experiment
# and is not as cleanly organized as the other model classes.

class face_detection_retail:

    _compiled_model = None

    # A cropped-only frame that can still be used for previews.
    cropped_frame = None

    def __init__(self, core, precision, device="AUTO"):

        # Locate model files, compile the model, and save a reference inside the class.
        xml_file, _bin_file = get_pretrained_xml_and_bin("face-detection-retail-0005", precision)
        self._compiled_model = core.compile_model(xml_file, device)

    def _preprocess_frame(self, image):
        """Returns a frame ready for inference, and a preview-ready frame too."""

        # Original Shape is (480, 640, 3)
        # Target input shape is (1,3,300,300)

        # Crop a 300x300 Square from the center of the picture
        y, x, channels = image.shape
        image = image[int(y/2-150):int(y/2+150), int(x/2-150):int(x/2+150)]

        # TODO: Figure out a cleaner way to allow this cropped frame to exit
        self.cropped_frame = image

        # During testing, OpenCV returns a UINT8 frame.
        # Because OpenVINO is looking for FP32 frames, convert the frame here.
        rearranged_image = image.astype(np.float32)

        # Convert the dimension arrangement of the frame to the one the OpenVINO model is expecting.
        # Move the channel dimension up and create a wrapper dimension at the front.
        rearranged_image = np.moveaxis(rearranged_image, 2, 0)
        rearranged_image = np.expand_dims(rearranged_image, 0)

        return rearranged_image, image
        

    def inference(self, frame):
        """Returns bboxes and an annotated preview frame"""

        # OpenVino Inference Code
        inference_request = self._compiled_model.create_infer_request()

        processed_frame, preview_frame = self._preprocess_frame(frame)

        input_tensor = ov.Tensor(processed_frame)
        inference_request.set_input_tensor(input_tensor)

        inference_request.start_async()
        inference_request.wait()

        output = inference_request.get_output_tensor()
        bboxes = self._retrieve_boxes_from_output(output)

        return bboxes, self._annotate_frame(preview_frame, bboxes)
        

    def _retrieve_boxes_from_output(self, raw_output, threshold=0.5, scale_value=300):
        """Returns a list of tuples, each representing two coordinates for bounding boxes."""
        
        output_buffer = raw_output.data
        condensed_output = np.reshape(output_buffer, (200, 7)) 

        # Item 2 is the confidence level
        thresholded_predictions = [pred for pred in condensed_output if pred[2] > threshold]

        boxes = []

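        def prepare_cord(x, y, scale_value):
            # Convert to plain Python ints. Casting to uint8 would wrap around
            # for any coordinate above 255, which is possible in a 300x300 crop.
            # Values are clamped so they stay inside the cropped frame.
            x = int(np.clip(x * scale_value, 0, scale_value))
            y = int(np.clip(y * scale_value, 0, scale_value))
            return x, y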

        for prediction in thresholded_predictions:
            cord1 = prepare_cord(prediction[3], prediction[4], scale_value)
            cord2 = prepare_cord(prediction[5], prediction[6], scale_value)
            coordinate_tuple = (cord1, cord2)

            boxes.append(coordinate_tuple)
        return boxes
    
    def _annotate_frame(self, frame, bboxes):
        """Annotate a frame with bounding box detections."""

        preview_frame = frame
        
        for bbox in bboxes:
            cv.rectangle(preview_frame, bbox[0], bbox[1], color=(255, 255, 255))

        return preview_frame
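
To make the decoding step easier to follow outside the class, the sketch below applies the same idea to plain Python lists. It assumes each detection row has the form `[image_id, label, confidence, x_min, y_min, x_max, y_max]` with coordinates in the 0 to 1 range, which matches how the class above indexes the rows. The `decode_detections` helper is a hypothetical name used only here.

```python
def decode_detections(rows, threshold=0.5, scale=300):
    """Turn detection rows into ((x1, y1), (x2, y2)) pixel boxes."""

    def to_pixel(value):
        # Clamp first so slightly out-of-range outputs stay inside the crop,
        # then convert to a plain Python int (no 8-bit wraparound).
        return int(min(max(value, 0.0), 1.0) * scale)

    boxes = []
    for row in rows:
        confidence = row[2]
        if confidence > threshold:
            top_left = (to_pixel(row[3]), to_pixel(row[4]))
            bottom_right = (to_pixel(row[5]), to_pixel(row[6]))
            boxes.append((top_left, bottom_right))
    return boxes


rows = [
    [0, 1, 0.98, 0.25, 0.125, 0.75, 0.875],  # confident detection
    [0, 1, 0.12, 0.0, 0.0, 0.5, 0.5],        # below the threshold
]
print(decode_detections(rows))
```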

In [None]:
class head_pose_estimation(basic_model):

    # This model requires an input tensor of shape (1, 3, 60, 60).

    _MODEL_NAME = "head-pose-estimation-adas-0001"

    def _preprocess_frame(self, frame):
        """Preprocess incoming frames for inference."""

        # Resize the frame
        frame = cv.resize(frame, (60, 60))
        
        # Rearrange dimensions
        rearranged_frame = np.moveaxis(frame, 2, 0)
        rearranged_frame = np.expand_dims(rearranged_frame, 0)

        # Cast to a 32-bit floating point type
        # TODO: Take into account user-selected model precision.
        rearranged_frame = rearranged_frame.astype(np.float32)

        return rearranged_frame
    
    def inference(self, frame):
        """Returns the estimated head pose as a (yaw, pitch, roll) tuple."""

        # OpenVino Inference Code
        inference_request = self._compiled_model.create_infer_request()

        processed_frame = self._preprocess_frame(frame)

        input_tensor = ov.Tensor(processed_frame)
        inference_request.set_input_tensor(input_tensor)

        inference_request.start_async()
        inference_request.wait()

        # Retrieve inference results without wrapping dimensions.
        yaw = inference_request.get_output_tensor(0).data[0][0]
        pitch = inference_request.get_output_tensor(1).data[0][0]
        roll = inference_request.get_output_tensor(2).data[0][0]

        return yaw, pitch, roll
    

In [None]:
def locate_eyes(image, x1_y1_x2_y2_face_bounding_box=None):
    """Returns a tuple of tuples of tuples representing bounding box coordinates for the left and right eyes.
     
     This takes the format of (((x1, y1), (x2, y2)), ((x1, y1), (x2, y2)))
     with the left eye bounding box coordinates listed followed by the right eye bounding box coordinates.
     """
    
    left_eye_cascade_filepath = r"./haarcascade_lefteye_2splits.xml"
    right_eye_cascade_filepath = r"./haarcascade_righteye_2splits.xml"

    left_eye_cascade = cv.CascadeClassifier()
    right_eye_cascade = cv.CascadeClassifier()

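    # Load the classifiers from the OpenCV sample files.
    # CascadeClassifier.load() returns False on failure instead of raising an exception.
    left_loaded = left_eye_cascade.load(left_eye_cascade_filepath)
    right_loaded = right_eye_cascade.load(right_eye_cascade_filepath)

    if not (left_loaded and right_loaded):
        print("There was an error locating the HAAR classifiers.")
        return (None, None)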
    
    # If a face bounding box was provided, crop the image to it.
    if x1_y1_x2_y2_face_bounding_box is not None:
        (cord1, cord2) = x1_y1_x2_y2_face_bounding_box
        x1, y1 = cord1
        x2, y2 = cord2
    
        image = image[min(y1, y2):max(y1, y2), min(x1, x2):max(x1, x2)]
    
    # Detect eyes using the cascade classifiers.
    left_eye_result = left_eye_cascade.detectMultiScale(image)
    right_eye_result = right_eye_cascade.detectMultiScale(image)

    # Extract more traditional coordinates to return
    # ((x1, y1), (x2, y2))
    left_eye_bbox_coordinates = None
    right_eye_bbox_coordinates = None

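    # detectMultiScale returns an empty result (never None) when nothing is found,
    # so check the length before indexing the first detection.
    if len(left_eye_result) > 0:
        x1, y1, w, h = left_eye_result[0]
        left_eye_bbox_coordinates = ((x1, y1), (x1 + w, y1 + h))

    if len(right_eye_result) > 0:
        x1, y1, w, h = right_eye_result[0]
        right_eye_bbox_coordinates = ((x1, y1), (x1 + w, y1 + h))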

    return (left_eye_bbox_coordinates, right_eye_bbox_coordinates)
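
The cascade classifiers report each detection as an `(x, y, w, h)` rectangle relative to the image they were given. The sketch below shows the conversion to corner pairs used above, plus an optional shift back into the coordinates of a larger frame. The `rect_to_corners` helper and its `offset` parameter are hypothetical and only illustrate the arithmetic.

```python
def rect_to_corners(rect, offset=(0, 0)):
    """Convert (x, y, w, h) to ((x1, y1), (x2, y2)), shifted by `offset`."""
    x, y, w, h = rect
    dx, dy = offset
    return (x + dx, y + dy), (x + w + dx, y + h + dy)


# An eye found at (40, 55) inside a face crop whose top-left corner
# sits at (120, 80) in the full frame.
eye_in_face = rect_to_corners((40, 55, 30, 20))
eye_in_frame = rect_to_corners((40, 55, 30, 20), offset=(120, 80))
print(eye_in_face)
print(eye_in_frame)
```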

In [None]:
class gaze_estimation(basic_model):

    # TODO: Describe required model inputs.

    _MODEL_NAME = "gaze-estimation-adas-0002"

    def _preprocess_frame(self, frame):

        # Resize the frame
        frame = cv.resize(frame, (60, 60))
        
        # Rearrange dimensions
        rearranged_frame = np.moveaxis(frame, 2, 0)
        rearranged_frame = np.expand_dims(rearranged_frame, 0)

        # TODO: Take into account the user-selected model precision.
        # Cast the NumPy array to a 32-bit floating point type
        rearranged_frame = rearranged_frame.astype(np.float32)

        return rearranged_frame
    
    def _preprocess_eye(self, eye_image):
        eye_image = cv.resize(eye_image, (60, 60))

        # Rearrange the array's dimensions into the format expected by the OpenVINO model.
        rearranged_frame = np.moveaxis(eye_image, 2, 0)
        rearranged_frame = np.expand_dims(rearranged_frame, 0)

        rearranged_frame = rearranged_frame.astype(np.float32)

        # Convert the numpy frame to an OpenVINO tensor.
        tensor = ov.Tensor(rearranged_frame)

        return tensor
    
    def inference(self, left_eye, right_eye, head_pose_data):
        """Returns the estimated gaze direction as a 3-element (gx, gy, gz) vector."""

        # OpenVino Inference Code
        inference_request = self._compiled_model.create_infer_request()

        left_eye = self._preprocess_eye(left_eye)
        right_eye = self._preprocess_eye(right_eye)

        head_pose_data = np.array(head_pose_data)
        head_pose_data = np.expand_dims(head_pose_data, 0)
        head_pose_data = ov.Tensor(head_pose_data)

        inference_request.set_input_tensor(0, left_eye)
        inference_request.set_input_tensor(1, right_eye)
        inference_request.set_input_tensor(2, head_pose_data)

        inference_request.start_async()
        inference_request.wait()

        # Obtain the inference result.
        raw_output = inference_request.get_output_tensor()

        # Example output of data:
        # array([[ 0.02891541, -0.28149414, -0.9379883 ]], dtype=float32)
        return raw_output.data[0]
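
The example output quoted in the code above has a length slightly below 1. If a unit-length direction is ever needed downstream, the vector can be rescaled as in the sketch below. This step is an optional illustration and is not part of the original experiment, whose Dense network normalizes its own inputs. The `normalize_gaze` helper is a hypothetical name.

```python
import math


def normalize_gaze(vector):
    """Scale a 3D gaze vector to unit length (hypothetical helper)."""
    gx, gy, gz = vector
    length = math.sqrt(gx * gx + gy * gy + gz * gz)
    if length == 0:
        raise ValueError("Cannot normalize a zero-length gaze vector.")
    return gx / length, gy / length, gz / length


raw = (0.02891541, -0.28149414, -0.9379883)  # example output quoted above
unit = normalize_gaze(raw)
print(math.sqrt(sum(c * c for c in raw)))  # length before normalizing
print(unit)
```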

### A Coordinate Prediction Pipeline

This pipeline uses specified pre-trained models and the user-trained (x, y) coordinate model to create an end-to-end gaze coordinate prediction pipeline.

This pipeline is under heavy development; the version below was written as an experiment and is not yet production-ready.

As pieces have been added and removed over the course of the experiment, comments and code pieces have not yet been cleaned up or formatted for helpful viewing.

In [None]:
class coordinate_prediction_pipeline:

    # TODO: Clean up try and except block handling.

    core = None

    face_detection_model = None
    head_pose_estimation_model = None
    gaze_estimation_model = None

    def __init__(self, core):
        self.core = core

        self.face_detection_model = face_detection_retail(core, "FP32", device="AUTO")
        self.head_pose_estimation_model = head_pose_estimation(core, "FP32", device="AUTO")
        self.gaze_estimation_model = gaze_estimation(core, "FP32", device="GPU")

    def inference(self, frame):

        # 
        # Stage 1: Locate Faces
        # 
        bboxes, annotated_frame = self.face_detection_model.inference(frame)

        # Continue early if no faces were detected
        if len(bboxes) <= 0:
            return (None, None, None, None)

        # Display an annotated frame with the first located face
        # TODO: Optimize the face detection model class
        cv.imshow("Face Detections", annotated_frame)

        unannotated_cropped_frame = self.face_detection_model.cropped_frame

        # Crop the unannotated frame to the first face.
        # TODO: Handle multiple faces
        # Images are stored y, x.

        # Decompose the class's bbox format.
        (cord1, cord2) = bboxes[0]
        x1, y1 = cord1
        x2, y2 = cord2

        frame_cropped_to_face = unannotated_cropped_frame[min(y1, y2):max(y1, y2), min(x1, x2):max(x1, x2)]

        try:
            yaw, pitch, roll = self.head_pose_estimation_model.inference(frame_cropped_to_face)
        except:
            return (None, None, None, None)

        try:
            left_bbox_coordinates, right_bbox_coordinates = locate_eyes(frame_cropped_to_face)
            # Both eyes are required for gaze estimation, so stop if either is missing.
            if (left_bbox_coordinates is None) or (right_bbox_coordinates is None):
                return (None, None, None, None)
            
        except:
            return (None, None, None, None)

        # Try to display the image cropped to the active user's face.
        # TODO: Determine why this sometimes fails.
        try:
            annotation = cv.rectangle(frame_cropped_to_face, left_bbox_coordinates[0], left_bbox_coordinates[1], color=(255, 255, 255))
            annotation = cv.rectangle(frame_cropped_to_face, right_bbox_coordinates[0], right_bbox_coordinates[1], color=(255, 255, 255))
            cv.imshow("Active Face", annotation)
        except:
            return (None, None, None, None)

        (x1, y1), (x2, y2) = left_bbox_coordinates
        eye1_image = frame_cropped_to_face[min(y1, y2):max(y1, y2), min(x1, x2):max(x1, x2)]
        
        (x1, y1), (x2, y2) = right_bbox_coordinates
        eye2_image = frame_cropped_to_face[min(y1, y2):max(y1, y2), min(x1, x2):max(x1, x2)]

        gaze_direction = self.gaze_estimation_model.inference(eye1_image, eye2_image, (yaw, pitch, roll))

        return (gaze_direction, yaw, pitch, roll)

### Data Collection and Testing/Training Modes

In [None]:
def window_event_handler(event, x, y, flags, param):
    """An OpenCV Window Callback to Collect Mouse Data"""

    global most_recent_x
    global most_recent_y
    global tracking

    if event == cv.EVENT_LBUTTONDOWN:

        # Invert the tracking variable
        tracking = not tracking
    
    if tracking:
        most_recent_x = x
        most_recent_y = y

def collect_data_and_test_current_model():

    global model
    global tracking

    pipeline = coordinate_prediction_pipeline(core)

    WINDOW_NAME = "Frame"

    tracking = False

    current_x_mean = 0
    current_y_mean = 0

    while True:
        # WND_PROP_FULLSCREEN names the property; WINDOW_FULLSCREEN is the value that enables it.
        cv.namedWindow(WINDOW_NAME, cv.WINDOW_NORMAL)
        cv.setWindowProperty(WINDOW_NAME, cv.WND_PROP_FULLSCREEN, cv.WINDOW_FULLSCREEN)

        cv.setMouseCallback(WINDOW_NAME, window_event_handler)

        ret, frame = cap.read()

        if not ret:
            print("There was an error accessing your camera. Additional investigation is required.")
            break

        data_present = False
        # Check for None first; calling .any() on None would raise an AttributeError.
        if frame is None or not frame.any():
            cv.waitKey(1)

        else:
            # At this point, a frame should be ready.
            frame_with_rectangle = add_square_to_center(frame, 300)
            cv.imshow("Live Preview", frame_with_rectangle)


            gaze_direction, yaw, pitch, roll = pipeline.inference(frame)
            if gaze_direction is not None:
                gx = gaze_direction[0]
                gy = gaze_direction[1]
                gz = gaze_direction[2]

            if((gaze_direction is None) or (yaw is None) or (pitch is None) or (roll is None)):
                cv.waitKey(1)
            else:
                data_present = True

        # If all data is present and a model is available, try to inference now.
        if data_present and (model is not None):
            # x_mean, y_mean = average_predictions(gx, gy, gz, yaw, pitch, roll)
            x_mean, y_mean = predict_from_model(gx, gy, gz, yaw, pitch, roll, integer_response=True)

            if (x_mean is not None) and (y_mean is not None):
                current_x_mean = x_mean
                current_y_mean = y_mean

        # Tracking is active: show the white data collection screen.
        if tracking:
            
            white_frame_copy = white_frame.copy()
            
            starting_offset = (30, 80)
            other_options = (cv.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 4, cv.LINE_AA)
            cv.putText(white_frame_copy, f"Current Mouse Location: ({most_recent_x}, {most_recent_y})", starting_offset, *other_options)

            starting_offset = (30, 200)
            other_options = (cv.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 4, cv.LINE_AA)
            cv.putText(white_frame_copy, f"Number of data points: {len(x_collected_data)}", starting_offset, *other_options)

            if data_present and model is not None:
                cv.circle(white_frame_copy, (current_x_mean, current_y_mean), radius=15, color=(0, 0, 0), thickness=-1)

            cv.imshow(WINDOW_NAME, white_frame_copy)

            # If all data is present, record it all.
            if data_present:

                gx_collected_data.append(gx)
                gy_collected_data.append(gy)
                gz_collected_data.append(gz)
                
                yaw_collected_data.append(yaw)
                pitch_collected_data.append(pitch)
                roll_collected_data.append(roll)

                x_collected_data.append(most_recent_x)
                y_collected_data.append(most_recent_y)

        else:
            black_frame_copy = black_frame.copy()

            starting_offset = (30, 200)
            other_options = (cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 4, cv.LINE_AA)
            cv.putText(black_frame_copy, f"Number of data points: {len(x_collected_data)}", starting_offset, *other_options)
            if data_present and model is not None:
                cv.circle(black_frame_copy, (current_x_mean, current_y_mean), radius=15, color=(255, 255, 255), thickness=-1)
            cv.imshow(WINDOW_NAME, black_frame_copy)
            

        if cv.waitKey(1) == ord("q"):
            cv.destroyAllWindows()
            break

In [None]:
def save_data():
    new_df = pd.DataFrame(
        {
            "gx_data" : gx_collected_data,
            "gy_data" : gy_collected_data,
            "gz_data" : gz_collected_data,

            "yaw_data" : yaw_collected_data,
            "pitch_data" : pitch_collected_data,
            "roll_data" : roll_collected_data,

            "x_data" : x_collected_data,
            "y_data" : y_collected_data
        }
    )

    # If no data has been saved before, create and save a new data frame.
    if not os.path.exists(DATA_SAVE_PATH):

        new_df.to_csv(DATA_SAVE_PATH)
        
        df = new_df

        print(f"Your data has been saved to a new data frame at {DATA_SAVE_PATH}!")

    # Otherwise, load the existing frame, append data to it, and save it.
    else:

        df = pd.read_csv(DATA_SAVE_PATH, index_col=0)
        df = pd.concat([df, new_df], ignore_index=True)

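        df.to_csv(DATA_SAVE_PATH)

        print(f"Your data has been appended to an existing data frame at {DATA_SAVE_PATH}!")

    # Clear the in-memory buffers so the same rows are not written again
    # the next time save_data() is called.
    for buffer in (gx_collected_data, gy_collected_data, gz_collected_data,
                   yaw_collected_data, pitch_collected_data, roll_collected_data,
                   x_collected_data, y_collected_data):
        buffer.clear()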

In [None]:
def train_model():

    global model
    
    df = pd.read_csv(DATA_SAVE_PATH, index_col=0)

    # Convert each column of the training data to NumPy arrays.
    gx_data = df["gx_data"].to_numpy()
    gy_data = df["gy_data"].to_numpy()
    gz_data = df["gz_data"].to_numpy()
    yaw_data = df["yaw_data"].to_numpy()
    pitch_data = df["pitch_data"].to_numpy()
    roll_data = df["roll_data"].to_numpy()

    x_data = df["x_data"].to_numpy()
    y_data = df["y_data"].to_numpy()

    # Create x and y training datasets.
    input_data = np.array([np.array([gx, gy, gz, yaw, pitch, roll]) for gx, gy, gz, yaw, pitch, roll in zip(gx_data, gy_data, gz_data, yaw_data, pitch_data, roll_data)])
    output_data = np.array([np.array([x/1920., y/1080.]) for x, y in zip(x_data, y_data)])

    assert len(input_data) == len(output_data), f"The length of your x_train data ({len(input_data)}) does not match the length of your y_train data ({len(output_data)}). Make sure these are the same length, or delete your data (stored in the Data.csv file) and try collecting it again."

    input = keras.Input(shape=(6,), name="Inputs")

    # Define a basic Keras Model to Predict Screen Coordinates.

    # Normalize each of the model's inputs.
    norm_layer = keras.layers.Normalization(name="Normalization-Layer")
    norm_layer.adapt(input_data)
    x = norm_layer(input)

    x = keras.layers.Flatten()(x)

    x = keras.layers.Dense(512, activation="relu")(x)
    x = keras.layers.Dense(256, activation="relu")(x)
    x = keras.layers.Dense(128, activation="relu")(x)
    x = keras.layers.Dense(64, activation="relu")(x)

    output = keras.layers.Dense(2, activation="sigmoid")(x)

    model = keras.Model(inputs=input, outputs=output)

    # Print out a model summary.
    model.summary()

    # Compile the model for training.
    training_config = Training_Configuration()

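    # Accuracy is a classification metric. Mean absolute error is reported instead
    # because this model performs regression on normalized coordinates.
    model.compile(training_config.optimizer(),
                  loss=training_config.loss_function(),
                  metrics=["mean_absolute_error"])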
    
    # Train the Model
    model.fit(x=input_data,
          y=output_data,
          epochs=training_config.epochs,
          batch_size=training_config.batch_size,
          validation_split=training_config.proportion_of_data_for_validation)
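
A loss computed on normalized coordinates can be hard to interpret. One way to judge a trained model is to convert its error back into pixels, as in the sketch below. The `mean_pixel_error` helper is a hypothetical name and is not part of the original experiment; it assumes predictions and targets are (x, y) pairs in the 0 to 1 range and uses the same 1920x1080 screen size as the rest of this notebook.

```python
import math


def mean_pixel_error(predicted, actual, screen_size=(1920, 1080)):
    """Average Euclidean distance, in pixels, between normalized (x, y) pairs."""
    if len(predicted) != len(actual):
        raise ValueError("predicted and actual must have the same length.")
    if not predicted:
        raise ValueError("At least one coordinate pair is required.")

    width, height = screen_size
    total = 0.0
    for (px, py), (ax, ay) in zip(predicted, actual):
        # De-normalize each axis before measuring the distance.
        total += math.hypot((px - ax) * width, (py - ay) * height)
    return total / len(predicted)


predicted = [(0.5, 0.5), (0.25, 0.75)]
actual = [(0.5, 0.5), (0.25, 0.5)]
print(mean_pixel_error(predicted, actual))
```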

### Setup

In [None]:
core = ov.Core()

In [None]:
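# Open an OpenCV VideoCapture instance.
cap = cv.VideoCapture(0)

if not cap.isOpened():
    print("The camera could not be opened. Check that a webcam is connected and not in use by another application.")
else:
    # Prime the camera by reading a few initial frames.
    for _ in range(60):
        cap.read()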

### Main Running Loop

In [None]:
while True:

    # A separate user input loop
    user_response = None
    while True:
        try:
            user_response = int(input("What would you like to do?\n\t1. Collect Training Data and Test Your Currently Trained Model\n\t2. Train a Model from Collected Training Data\n\t3. Exit\n"))
        except ValueError:
            # Non-numeric input falls through to the retry message below.
            user_response = None

        if user_response not in range(1, 4):
            print("Please try again and select a valid option.\n")
            continue
        else:
            break

    if user_response == 1:
        collect_data_and_test_current_model()
        save_data()

    elif user_response == 2:
        train_model()
    
    else:
        save_data()
        cap.release()
        
        print("The camera has been turned off and any remaining data has been saved. You're good to go!")
        
        break