In [None]:
import cv2
import numpy as np
from sklearn.model_selection import train_test_split
import json
import dlib
detector = dlib.get_frontal_face_detector()
predictor = dlib.shape_predictor('shape_predictor_68_face_landmarks.dat')

In [None]:
def preprocess_eye_region(frame, eye_coords, target_size=(40, 48)):
    """
    Preprocesses the eye region for the CNN model.
    Args:
        frame: The input image frame (in BGR format).
        eye_coords: Coordinates of the eye region.
        target_size: The target size for each eye region.
    Returns:
        The preprocessed eye region.
    """
    x_min = min(x for x, y in eye_coords)
    x_max = max(x for x, y in eye_coords)
    y_min = min(y for x, y in eye_coords)
    y_max = max(y for x, y in eye_coords)

    # Cropping the eye region based on the extremities of the landmarks
    cropped_eye = frame[y_min:y_max, x_min:x_max]

    # Resizing the cropped eye region to the target size
    resized_eye = cv2.resize(cropped_eye, target_size)

    return resized_eye.astype(np.float32) / 255.0

In [None]:
def get_combined_eyes(frame):
    """
    Detects and combines the eye regions from the frame.
    Args:
        frame: The input image frame.
    Returns:
        The combined eye regions, or None if not detected.
        
        
    """
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    faces = detector(gray)

    for face in faces:
        landmarks = predictor(gray, face)

        # Extract the coordinates for each eye
        left_eye = [(landmarks.part(n).x, landmarks.part(n).y) for n in range(36, 42)]
        right_eye = [(landmarks.part(n).x, landmarks.part(n).y) for n in range(42, 48)]

        # Preprocess each eye region
        left_eye_region = preprocess_eye_region(frame, left_eye)

        right_eye_region = preprocess_eye_region(frame, right_eye)

        # Combine the eyes side by side
        combined_eyes = np.hstack([left_eye_region, right_eye_region])

        # Ensure the combined eyes image has the correct shape
        if combined_eyes.shape[1] != 80:
            raise ValueError("Combined eyes region does not match the expected width.")
        return combined_eyes

    return None

In [None]:

def normalize_head_pose(head_pose_data, rotation_scale=180, translation_max_displacement=None):
    """
    Normalizes the head pose data.
    Args:
        head_pose_data: List containing the head pose data (rotation and translation vectors).
        rotation_scale: Maximum value for the rotation vector components (180 for degrees, np.pi for radians).
        translation_max_displacement: A tuple (max_x, max_y, max_z) representing the maximum displacement in each axis. If None, standard deviation normalization will be used.

    Returns:
        Normalized head pose data.
    """
    # Normalize rotation vectors
    normalized_rotation = np.array(head_pose_data[:3]) / rotation_scale

    # Normalize translation vectors
    if translation_max_displacement:
        max_x, max_y, max_z = translation_max_displacement
        normalized_translation = np.array(head_pose_data[3:]) / np.array([max_x, max_y, max_z])
    else:
        # Standard deviation normalization
        translation_vector = np.array(head_pose_data[3:])
        std_dev = np.std(translation_vector)
        mean_val = np.mean(translation_vector)
        normalized_translation = (translation_vector - mean_val) / std_dev

    return np.concatenate([normalized_rotation, normalized_translation]).tolist()


In [None]:
import os
from glob import glob
import pandas as pd
# Assuming normalize_head_pose and get_combined_eyes are defined as before
def get_screen_size(metadata_file_path):
    with open(metadata_file_path, 'r') as f:
        metadata = json.load(f)

        # Check if 'screenData' is a key in the metadata
        if 'screenData' in metadata:
            metadata = metadata['screenData']
        # Otherwise, assume the metadata is already at the top level

        screen_width = metadata.get('screenWidth')
        screen_height = metadata.get('screenHeight')

        if screen_width is None or screen_height is None:
            raise ValueError("Screen size not found in metadata")

        return screen_width, screen_height

def parse_head_pose_data(row):
    # Split the strings and convert to float
    rotation_str, translation_str = row['head_pose'], row['head_translation']
    rotation = [float(x) for x in rotation_str.strip('"').split(',')]
    translation = [float(x) for x in translation_str.strip('"').split(',')]
    return rotation + translation  # Combine into a single list

def prepare_dataset(base_dir):
    X, Y = [], []
    processed_files = set()
    column_names = ['image_path', 'cursor_x', 'cursor_y', 'left_pup', 'eye_y1', 'eye_x2', 'eye_y2', 'eye_x3', 'eye_y3', 'eye_x4', 'eye_y4', 'eye_x5', 'eye_y5', 'eye_x6', 'eye_y6', 'head_pose', 'head_translation']

    for subdir in glob(os.path.join(base_dir, '*/')):
        print(f"Processing directory: {subdir}")
        metadata_file_path = os.path.join(subdir, 'metadata.json')
        screen_width, screen_height = get_screen_size(metadata_file_path)
        print(f"Screen size: {screen_width}x{screen_height}")

        # Find any CSV files in the directory
        csv_files = glob(os.path.join(subdir, '*.csv'))
        #skip calibration files
        # csv_files = [f for f in csv_files if 'calibration' not in f]

        for data_file_path in csv_files:
            if data_file_path in processed_files:
                # Skip this file since it has already been processed
                continue
            processed_files.add(data_file_path)  # Mark this file as processed

            print(f"Processing data CSV file: {data_file_path}")
            data = pd.read_csv(data_file_path, header=None, names=column_names)

            if not csv_files:
                print(f"No data CSV file found in directory: {subdir}")
                continue
            # Find any directory that contains image files (assuming JPEG for example)
            img_folders = [d for d in os.listdir(subdir) if os.path.isdir(os.path.join(subdir, d)) and glob(os.path.join(subdir, d, '*.png'))]
            if not img_folders:
                print(f"No image folder found that contains images in directory: {subdir}")
                continue
            data = pd.read_csv(data_file_path, header=None, names=column_names)
            # print how many columns 
            print(data.shape)

            for index, row in data.iterrows():
                # Directly use the image path from the dataframe
                img_path = os.path.join(row['image_path'])
                cursor_x, cursor_y = row['cursor_x'], row['cursor_y']
                eye_box_pupil_data = row[3:15].tolist()
                head_pose_data = parse_head_pose_data(row)

                normalized_eye_box_pupil_data = [float(coord) / screen_width if i % 2 == 0 else float(coord) / screen_height for i, coord in enumerate(eye_box_pupil_data)]
                normalized_head_pose_data = normalize_head_pose(head_pose_data)

                # Load the image
                img = cv2.imread(img_path)
                if img is None:
                    print(f"Image not found: {img_path}")
                    continue

                combined_eyes = get_combined_eyes(img)

                # Append to datasets
                Y.append([cursor_x / screen_width, cursor_y / screen_height] + normalized_eye_box_pupil_data + normalized_head_pose_data)
                X.append(combined_eyes)
    return X, Y

In [None]:
# Example usage:
base_dir = './data'
X, Y = prepare_dataset(base_dir)

In [None]:

X_filtered = [img for img in X if img is not None and isinstance(img, np.ndarray)]
Y_filtered = [Y[i] for i in range(len(Y)) if X[i] is not None and isinstance(X[i], np.ndarray)]

X_filtered = np.array(X_filtered)

Y_filtered = np.array(Y_filtered)
Y_filtered = Y_filtered[:, :2]

In [None]:
len(X_filtered), len(Y_filtered)

In [None]:
X_train, X_test, Y_train, Y_test = train_test_split(X_filtered, Y_filtered, test_size=0.2, random_state=42)
#Val data
X_train, X_val, Y_train, Y_val = train_test_split(X_train, Y_train, test_size=0.2, random_state=42)

In [None]:
from keras.models import Sequential
from keras.layers import Conv2D, Flatten, Dense, MaxPool2D
from keras.metrics import MeanSquaredError, MeanAbsoluteError

model = Sequential([
    Conv2D(32, (3, 3), activation='relu', input_shape=(48, 80, 3)), 
    MaxPool2D(),
    Conv2D(64, (3, 3), activation='relu'),
    MaxPool2D(),
    Conv2D(128, (3, 3), activation='relu'),
    MaxPool2D(),
    Flatten(),
    Dense(64, activation='relu'),
    Dense(14) 
])

# Compile the model
model.compile(optimizer='adam', loss='mse', metrics=[MeanSquaredError(), MeanAbsoluteError()])
model.fit(X_train, Y_train, epochs=100, validation_split=0.1, batch_size=32)

In [None]:
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization
from keras.regularizers import l2
from keras.preprocessing.image import ImageDataGenerator
from keras.callbacks import EarlyStopping
from keras.optimizers import Adam

train_datagen = ImageDataGenerator(
    # rotation_range=20,
    # width_shift_range=0.1,
    # height_shift_range=0.1,
    # shear_range=0.2,
    # zoom_range=[0.9, 1.1],
    # horizontal_flip=True,
    # fill_mode='nearest'
)

# Assuming you have your training data in train_data and train_labels
train_generator = train_datagen.flow(X_train, Y_train, batch_size=32)


model = Sequential()

# Adding L2 Regularization to Convolutional Layers
l2_reg = 0.001

# First Conv Block
model.add(Conv2D(32, (3, 3), activation='relu', input_shape=(48, 80, 3), kernel_regularizer=l2(l2_reg)))
model.add(MaxPooling2D())
model.add(Dropout(0.1))

# Second Conv Block
model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPooling2D())
model.add(Dropout(0.15))

# Third Conv Block
model.add(Conv2D(128, (3, 3), activation='relu', kernel_regularizer=l2(l2_reg)))
model.add(BatchNormalization())
model.add(MaxPooling2D())

# Fourth Conv Block
model.add(Conv2D(256, (3, 3), activation='relu'))
model.add(MaxPooling2D())
model.add(Dropout(0.2))


# Flatten and Dense Layers
model.add(Flatten())
model.add(Dense(64, activation='relu'))
model.add(Dropout(0.2))
model.add(Dense(2))  # Adjust the number of outputs as needed

# Compile the model
model.compile(optimizer=Adam(learning_rate=0.00005), loss='mse', metrics=['mean_squared_error', 'mean_absolute_error'])

early_stopping = EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=True)

history = model.fit(
    train_generator,
    epochs=100,  # Adjust number of epochs
    validation_data=(X_val, Y_val),  # Assuming validation data is available
    callbacks=[early_stopping],
    batch_size=32
)

In [None]:
from keras.models import Sequential
from keras.layers import Conv2D, MaxPool2D, Flatten, Dense, Dropout

# Define the model
model = Sequential()

# Add convolutional layers with dropout
model.add(Conv2D(32, (3, 3), activation='relu', input_shape=(48, 80, 3)))
model.add(MaxPool2D())
model.add(Dropout(0.25))  # Dropout layer after pooling

model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPool2D())
model.add(Dropout(0.25))  # Another dropout layer

model.add(Conv2D(128, (3, 3), activation='relu'))
model.add(MaxPool2D())
model.add(Dropout(0.4))  # Higher dropout rate for deeper layers

# Flatten the output from convolutional layers before passing it to the dense layers
model.add(Flatten())

# Add dense layers with dropout
model.add(Dense(64, activation='relu'))
model.add(Dropout(0.5))  # Dropout layer before the output layer
model.add(Dense(14, activation='sigmoid'))  # Adjust the number of outputs as needed

# Compile the model
model.compile(optimizer='adam', loss='mse', metrics=['mean_squared_error', 'mean_absolute_error'])
model.fit(X_train, Y_train, epochs=100, validation_split=0.1, batch_size=32)

In [None]:
#evaluate the model
model.evaluate(X_test, Y_test)

In [19]:
from keras.models import load_model
model = load_model('./models/eye_gaze_v13.h5')

In [21]:
#plot predicted vs actual on test data on a canvas using opencv 

import cv2
import numpy as np 
predictions = model.predict(X_test)
screen_width, screen_height = 2650, 1440
canvas = np.zeros((screen_height, screen_width, 3), dtype=np.uint8)
cv2.namedWindow('Gaze Tracking on Canvas', cv2.WINDOW_NORMAL)
cv2.setWindowProperty('Gaze Tracking on Canvas', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
# plot the first 10 images one by one
for i in range(50,75):
    # get the predicted x,y coordinates
    x, y = predictions[i][0] * screen_width, predictions[i][1] * screen_height
    print(x,y)
    # get the actual x,y coordinates
    x_actual, y_actual = Y_test[i][0] * screen_width, Y_test[i][1] * screen_height
    print(x_actual, y_actual)
    # plot the predicted x,y coordinates
    cv2.circle(canvas, (int(x), int(y)), 10, (0, 0, 255), -1)
    # plot the actual x,y coordinates
    cv2.circle(canvas, (int(x_actual), int(y_actual)), 10, (0, 255, 0), -1)
    # show the canvas
    cv2.imshow('Gaze Tracking on Canvas', canvas)
    cv2.waitKey(0)
    # clear the canvas
    canvas = np.zeros((screen_height, screen_width, 3), dtype=np.uint8)

cv2.waitKey(0)
cv2.destroyAllWindows()

2419.257739186287 1341.7916679382324
2026.8359375000002 1315.0
657.1261465549469 947.7400588989258
504.12109375000006 857.0
2747.9534924030304 702.7704763412476
2468.84765625 844.0
2262.391984462738 1020.4450035095215
2112.75390625 968.0000000000001
1167.4029260873795 1139.7003936767578
1281.5234375 1132.0
818.1490480899811 603.8757562637329
1004.1015625 380.0
1989.0138149261475 991.2393951416016
1859.8125366139427 1270.5
1265.8489927649498 1077.641887664795
1443.0078125 561.0
2576.560863852501 933.3009338378906
2087.91015625 911.0
2059.6619337797165 487.7213430404663
1984.0070298769772 153.0
1527.9271751642227 414.2601442337036
1516.50390625 180.0
334.1575860977173 500.01577377319336
91.09375000000001 68.0
1586.6878032684326 934.3475532531738
1602.1089630931458 1201.5
1342.8165465593338 780.8297538757324
1325.776215582894 547.5
152.4806333705783 1120.3733825683594
40.36321031048623 1410.0
1991.881439089775 1106.953411102295
1772.1874999999998 1078.0
2166.5651619434357 901.842098236084

In [None]:
model.save('./models/eye_gaze_v13.h5')
