# NOTE :
### 1. Models trained/evaluated in final_ptge.ipynb havce been saved and then loaded. Pretrained models have been used here, hence please change the path to the pretrained models accordingly. We can also merge this code with the final_ptge.ipynb if we dont want to use pretrained models.
### 2. One can download my pretrained models as well. Details are provided in README.md

### 3. Due to limited compute and storage resources, we have considered to process and work with only 3 subjects [p00,p01,p02]. For the leave-out strategy we have used only p00
### 4. Make sure you have already executed "python3 prepare_and_process_data.py". This will created "processed" folder need to run this notebook


In [1]:
!pip install tensorflow==2.15.1

Collecting tensorflow==2.15.1
  Downloading tensorflow-2.15.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.2 kB)
Collecting ml-dtypes~=0.3.1 (from tensorflow==2.15.1)
  Downloading ml_dtypes-0.3.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (20 kB)
Collecting keras<2.16,>=2.15.0 (from tensorflow==2.15.1)
  Downloading keras-2.15.0-py3-none-any.whl.metadata (2.4 kB)
Downloading tensorflow-2.15.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (475.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m475.2/475.2 MB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[?25hDownloading keras-2.15.0-py3-none-any.whl (1.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.7/1.7 MB[0m [31m57.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ml_dtypes-0.3.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.2/2

In [2]:
!pip install tf-models-official==2.15

Collecting tf-models-official==2.15
  Downloading tf_models_official-2.15.0-py2.py3-none-any.whl.metadata (1.4 kB)
Collecting gin-config (from tf-models-official==2.15)
  Downloading gin_config-0.5.0-py3-none-any.whl.metadata (2.9 kB)
Collecting immutabledict (from tf-models-official==2.15)
  Downloading immutabledict-4.2.0-py3-none-any.whl.metadata (3.4 kB)
Collecting pycocotools (from tf-models-official==2.15)
  Downloading pycocotools-2.0.8-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (1.1 kB)
Collecting sacrebleu (from tf-models-official==2.15)
  Downloading sacrebleu-2.4.2-py3-none-any.whl.metadata (58 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m58.0/58.0 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
Collecting seqeval (from tf-models-official==2.15)
  Downloading seqeval-1.2.2.tar.gz (43 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.6/43.6 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metada

In [3]:
import tensorflow as tf
tf.version.VERSION

2024-07-16 00:58:35.671468: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-07-16 00:58:35.671527: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-07-16 00:58:35.673062: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


'2.15.1'

In [4]:
import tensorflow as tf
from sklearn.metrics import mean_absolute_error
import random
import numpy as np
import os, glob
from tensorflow.keras.layers import Flatten, Dense, BatchNormalization, Input
from tensorflow.keras.models import Model
from PIL import Image

gpus=tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu,True)
print(gpus)
import matplotlib.pyplot as plt

# Ensure the same seed for reproducibility
random.seed(12)
np.random.seed(12)
tf.random.set_seed(12)

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


In [6]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

In [7]:
import tensorflow as tf
import numpy as np
import os
from PIL import Image


# Define custom Huber loss function
def custom_huber_loss(y_true, y_pred, delta=1.5):
    error = y_true - y_pred
    is_small_error = tf.abs(error) <= delta

    small_error_loss = tf.square(error) / 2
    big_error_loss = delta * (tf.abs(error) - delta / 2)

    return tf.where(is_small_error, small_error_loss, big_error_loss)


# Calculate gaze loss as the average of Huber losses
def gaze_loss(y_true, y_pred, delta=1.5):
    huber_losses = custom_huber_loss(y_true, y_pred, delta)
    return tf.reduce_mean(huber_losses)

# Function to preprocess the images
def preprocess_image(image, target_size):
    image = tf.image.resize(image, target_size)
    image = tf.cast(image, tf.float32) / 255.0
    return image

# Define the GazeDataset class
class GazeDataset(tf.data.Dataset):
    def __new__(cls, subject_to_leave_out=None, batch_size=8, validation=False):
        def _generator():
            root_dir = 'processed_data'
            subjects = ['p00', 'p01', 'p02']
            transform_face = lambda img: preprocess_image(img, (224, 224))
            transform_eye = lambda img: preprocess_image(img, (112, 112))

            for subject in subjects:
                if validation:
                    if subject != subject_to_leave_out:
                        continue
                else:
                    if subject == subject_to_leave_out:
                        continue
                
                person_dir = os.path.join(root_dir, 'Image', subject)
                for image_name in os.listdir(os.path.join(person_dir, 'face')):
                    face_image_path = os.path.join(person_dir, 'face', image_name)
                    left_eye_image_path = os.path.join(person_dir, 'lefteye', image_name)
                    right_eye_image_path = os.path.join(person_dir, 'righteye', image_name)
                    rotation_matrix_path = os.path.join(person_dir, 'rotation_matrix', image_name.replace('.jpg', '.npy'))
                    rotation_matrix_flipped_path = os.path.join(person_dir, 'rotation_matrix_flipped', image_name.replace('.jpg', '.npy'))
                    gaze_2d_path = os.path.join(person_dir, '2d_gaze', image_name.replace('.jpg', '.npy'))
                    gaze_3d_path = os.path.join(person_dir, '3d_gaze', image_name.replace('.jpg', '.npy'))
                    gaze_3d_flipped_path = os.path.join(person_dir, '3d_gaze_flipped', image_name.replace('.jpg', '.npy'))
                    eye_coords_path = os.path.join(person_dir, 'eye_coords', image_name.replace('.jpg', '.npy'))

                    face_image = Image.open(face_image_path).convert('RGB')
                    left_eye_image = Image.open(left_eye_image_path).convert('RGB')
                    right_eye_image = Image.open(right_eye_image_path).convert('RGB')

                    face_image = transform_face(np.array(face_image))
                    left_eye_image = transform_eye(np.array(left_eye_image))
                    right_eye_image = transform_eye(np.array(right_eye_image))

                    rotation_matrix = np.load(rotation_matrix_path).astype(np.float32)
                    rotation_matrix_flipped = np.load(rotation_matrix_flipped_path).astype(np.float32)
                    gaze_2d = np.load(gaze_2d_path).astype(np.float32)
                    gaze_3d = np.load(gaze_3d_path).astype(np.float32)
                    gaze_3d_flipped = np.load(gaze_3d_flipped_path).astype(np.float32)
                    eye_coords = np.load(eye_coords_path).astype(np.float32)

                    yield face_image, left_eye_image, right_eye_image, rotation_matrix, rotation_matrix_flipped, gaze_2d, gaze_3d, gaze_3d_flipped, eye_coords, subject

        return tf.data.Dataset.from_generator(
            _generator,
            output_signature=(
                tf.TensorSpec(shape=(224, 224, 3), dtype=tf.float32),
                tf.TensorSpec(shape=(112, 112, 3), dtype=tf.float32),
                tf.TensorSpec(shape=(112, 112, 3), dtype=tf.float32),
                tf.TensorSpec(shape=(3, 3), dtype=tf.float32),
                tf.TensorSpec(shape=(3, 3), dtype=tf.float32),
                tf.TensorSpec(shape=(2,), dtype=tf.float32),
                tf.TensorSpec(shape=(3,), dtype=tf.float32),
                tf.TensorSpec(shape=(3,), dtype=tf.float32),
                tf.TensorSpec(shape=(6,), dtype=tf.float32),
                tf.TensorSpec(shape=(), dtype=tf.string)
            )
        ).batch(batch_size)

# Testing the dataset
val_dataloader = GazeDataset(subject_to_leave_out='p00', batch_size=8)

# Analysis of Angular Errors

In [8]:
gaze_model = tf.keras.models.load_model('/kaggle/input/pretrained-models/gaze_model_with_leaveout_p00/kaggle/working/gaze_model_with_leaveout_p00', compile=False)
calibration_model = tf.keras.models.load_model('/kaggle/input/pretrained-models/calibration_model_with_leaveout_p00/kaggle/working/calibration_model_with_leaveout_p00', compile=False)
spaze_model = tf.keras.models.load_model('/kaggle/input/pretrained-models/spaze_model_with_leaveout_p00/kaggle/working/spaze_model_with_leaveout_p00', compile=False)

In [None]:
# Uncomment this cell, if you dont havent run prepare_and_process_data.py, and no 'processed_data' directory exists

# """
# src file for preparing data from MPIIFaceGaze and MPIIGaze directories
# """
# import importlib.util
# import sys

# # Path to preprocessing.py
# file_path = '/kaggle/input/preprocess/preprocessing.py'

# # Load the module
# spec = importlib.util.spec_from_file_location("dpc", file_path)
# dpc = importlib.util.module_from_spec(spec)
# sys.modules["dpc"] = dpc
# spec.loader.exec_module(dpc)

# # Now you can import preprocessing.py
# #import preprocessing as dpc
# import numpy as np
# import scipy.io as sio
# #import content as dpy
# import cv2
# import os
# import sys

# #import preprocessing as dpc
# import linecache
# root = "/kaggle/input/mpiigaze/MPIIFaceGaze/MPIIFaceGaze"
# sample_root = "/kaggle/input/mpiigaze/MPIIGaze/MPIIGaze/Evaluation Subset/sample list for eye image"
# out_root = "processed_data"
# scale = True
# MPII_path="/kaggle/input/mpiigaze/MPIIGaze/MPIIGaze/Data/Original"

# def ImageProcessing_MPII():
#     persons = os.listdir(sample_root)
#     persons.sort()
#     for person in persons[:3]:
#         sample_list = os.path.join(sample_root, person)

#         person = person.split(".")[0]
#         im_root = os.path.join(root, person)
#         anno_path = os.path.join(root, person, f"{person}.txt")

#         im_outpath = os.path.join(out_root, "Image", person)
#         label_outpath = os.path.join(out_root, "Label", f"{person}.label")

#         if not os.path.exists(im_outpath):
#             os.makedirs(im_outpath)
#         if not os.path.exists(os.path.join(out_root, "Label")):
#             os.makedirs(os.path.join(out_root, "Label"))

#         print(f"Start Processing {person}")
#         ImageProcessing_Person(im_root, anno_path, sample_list, im_outpath, label_outpath, person)



# def ImageProcessing_Person(im_root, anno_path, sample_list, im_outpath, label_outpath, person):
#     # Read camera matrix
#     camera = sio.loadmat(os.path.join(f"{im_root}", "Calibration", "Camera.mat"))
#     camera = camera["cameraMatrix"]

#     # Read gaze annotation
#     annotation = os.path.join(anno_path)
#     with open(annotation) as infile:
#         anno_info = infile.readlines()
#     anno_dict = {line.split(" ")[0]: line.strip().split(" ")[1:-1] for line in anno_info}

#     # Create the handle of label
#     outfile = open(label_outpath, 'w')
#     outfile.write("Face Left Right Origin WhichEye 3DGaze 3DHead 2DGaze 2DHead Rmat Smat GazeOrigin\n")
#     subdirs=['face',
#              'lefteye',
#              'righteye',
#              'rotation_matrix',
#              'rotation_matrix_flipped',
#              '3d_gaze',
#              '3d_gaze_flipped',
#              '2d_gaze',
#              'eye_coords']
#     for directory in subdirs:
#         if not os.path.exists(os.path.join(im_outpath,directory)):
#             os.makedirs(os.path.join(im_outpath,directory))

#     with open(sample_list) as infile:
#         im_list = infile.readlines()
#         total = len(im_list)

#     for count, info in enumerate(im_list):

#         progressbar = "".join(["\033[41m%s\033[0m" % '   '] * int(count/total * 20))
#         progressbar = "\r" + progressbar + f" {count}|{total}"
#         print(progressbar, end = "", flush=True)

#         # Read image info
#         im_info, which_eye = info.strip().split(" ")
#         day, im_name = im_info.split("/")
#         im_number = int(im_name.split(".")[0])

#         # Read image annotation and image
#         im_path = os.path.join(im_root, day, im_name)
#         im = cv2.imread(im_path)
#         annotation = anno_dict[im_info]
#         annotation = AnnoDecode(annotation)
#         origin = annotation["facecenter"]

#         # Normalize the image
#         norm = dpc.norm(center = annotation["facecenter"],
#                         gazetarget = annotation["target"],
#                         headrotvec = annotation["headrotvectors"],
#                         imsize = (224, 224),
#                         camparams = camera)

#         im_face = norm.GetImage(im)

#         # Crop left eye images
#         llc = norm.GetNewPos(annotation["left_left_corner"])
#         lrc = norm.GetNewPos(annotation["left_right_corner"])
#         im_left = norm.CropEye(llc, lrc)
#         im_left = dpc.EqualizeHist(im_left)

#         # Crop Right eye images
#         rlc = norm.GetNewPos(annotation["right_left_corner"])
#         rrc = norm.GetNewPos(annotation["right_right_corner"])
#         im_right = norm.CropEye(rlc, rrc)
#         im_right = dpc.EqualizeHist(im_right)

#         # Acquire essential info
#         gaze = norm.GetGaze(scale=scale)
#         head = norm.GetHeadRot(vector=True)
#         origin = norm.GetCoordinate(annotation["facecenter"])
#         rvec, svec = norm.GetParams()

#         # flip the images when it is right eyes
#         if which_eye == "left":
#             pass
#         elif which_eye == "right":
#             im_face = cv2.flip(im_face, 1)
#             im_left = cv2.flip(im_left, 1)
#             im_right = cv2.flip(im_right, 1)

#             temp = im_left
#             im_left = im_right
#             im_right = temp

#             gaze = dpc.GazeFlip(gaze)
#             head = dpc.HeadFlip(head)
#             origin[0] = -origin[0]

#         gaze_2d = dpc.GazeTo2d(gaze)
#         head_2d = dpc.HeadTo2d(head)

#         # Save the acquired info
#         cv2.imwrite(os.path.join(im_outpath, "face", str(count+1)+".jpg"), im_face)
#         cv2.imwrite(os.path.join(im_outpath, "lefteye", str(count+1)+".jpg"), im_left)
#         cv2.imwrite(os.path.join(im_outpath, "righteye", str(count+1)+".jpg"), im_right)
#         rotation_matrix=norm.GetHeadRot(vector=False)
#         rotation_matrix_flipped=dpc.FlipRot(head)
#         _3d_gaze_flipped=dpc.GazeFlip(gaze)

#         np.save(os.path.join(im_outpath,'rotation_matrix',str(count+1)+'.npy'), rotation_matrix)
#         np.save(os.path.join(im_outpath,'rotation_matrix_flipped',str(count+1)+'.npy'), rotation_matrix_flipped)
#         np.save(os.path.join(im_outpath,'2d_gaze',str(count+1)+'.npy'), gaze_2d)
#         np.save(os.path.join(im_outpath,'3d_gaze',str(count+1)+'.npy'), gaze)
#         np.save(os.path.join(im_outpath,'3d_gaze_flipped',str(count+1)+'.npy'), _3d_gaze_flipped)
#         save_name_face = os.path.join(person, "face", str(count+1) + ".jpg")
#         save_name_left = os.path.join(person, "lefteye", str(count+1) + ".jpg")
#         save_name_right = os.path.join(person, "righteye", str(count+1) + ".jpg")

#         save_origin = im_info
#         save_flag = which_eye
#         save_gaze = ",".join(gaze.astype("str"))
#         save_head = ",".join(head.astype("str"))
#         save_gaze2d = ",".join(gaze_2d.astype("str"))
#         save_head2d = ",".join(head_2d.astype("str"))
#         save_rvec = ",".join(rvec.astype("str"))
#         save_svec = ",".join(svec.astype("str"))
#         origin = ",".join(origin.astype("str"))

#         save_str = " ".join([save_name_face, save_name_left, save_name_right, save_origin, save_flag, save_gaze, save_head, save_gaze2d, save_head2d, save_rvec, save_svec, origin])

#         outfile.write(save_str + "\n")
#         # print(im_root,im_number,im_name,day)
#         eye_coord_file=f'{MPII_path}/{person}/{day}/annotation.txt'
#         # print(eye_coord_file)
#         coords=linecache.getline(eye_coord_file,im_number).split(' ')[-6:]
#         coords[-1]=coords[-1][:-1]
#         coords=np.array(list(map(float,coords)))
#         np.save(os.path.join(im_outpath,'eye_coords',str(count+1)+'.npy'), coords)

#     print("")
#     outfile.close()


# def AnnoDecode(anno_info):
# 	annotation = np.array(anno_info).astype("float32")
# 	out = {}
# 	out["left_left_corner"] = annotation[2:4]
# 	out["left_right_corner"] = annotation[4:6]
# 	out["right_left_corner"] = annotation[6:8]
# 	out["right_right_corner"] = annotation[8:10]
# 	out["headrotvectors"] = annotation[14:17]
# 	out["headtransvectors"] = annotation[17:20]
# 	out["facecenter"] = annotation[20:23]
# 	out["target"] = annotation[23:26]
# 	return out


# if __name__ == "__main__":
#     ImageProcessing_MPII()

# Analyzing Model's robustness in handling variations in calibration parameters

In [9]:
# Calculate angular error
def compute_angular_error(y_true, y_pred):
    y_true = tf.nn.l2_normalize(y_true, axis=-1)
    y_pred = tf.nn.l2_normalize(y_pred, axis=-1)
    dot_product = tf.reduce_sum(tf.multiply(y_true, y_pred), axis=-1)
    angular_error = tf.acos(tf.clip_by_value(dot_product, -1.0, 1.0))
    return tf.reduce_mean(angular_error) * (180.0 / np.pi)

# Define the evaluation function with calibration variations for angular error
def evaluate_model_with_calibration_variations_angular_error(model, dataloader, model_type='ptge', variation_range=(-0.1, 0.1)):
    total_error = 0.0
    num_batches = 0
    results = []

    for i, (face_image, left_eye_image, right_eye_image, rotation_matrix, rotation_matrix_flipped, gaze_2d, gaze_3d, gaze_3d_flipped, eye_coords, subject_id) in enumerate(dataloader):
        # Apply variations to calibration parameters (rotation_matrix)
        variation = np.random.uniform(variation_range[0], variation_range[1], rotation_matrix.shape)
        varied_rotation_matrix = rotation_matrix + variation

        subject_indices = [int(s.decode().split('p')[1]) for s in subject_id.numpy()]

        input_dict = {
            'eye_coords': tf.convert_to_tensor(eye_coords, dtype=tf.float32),
            'face': tf.convert_to_tensor(face_image, dtype=tf.float32),
            'flipped_face': tf.convert_to_tensor(tf.image.flip_left_right(face_image), dtype=tf.float32),
            'id': tf.convert_to_tensor(subject_indices, dtype=tf.int32),
            'lefteye': tf.convert_to_tensor(left_eye_image, dtype=tf.float32),
            'righteye': tf.convert_to_tensor(right_eye_image, dtype=tf.float32),
            'rotation_matrix': tf.convert_to_tensor(varied_rotation_matrix, dtype=tf.float32)
        }

        calibration_input_dict = input_dict.copy()
        calibration_input_dict['rotation_matrix_flipped'] = tf.convert_to_tensor(rotation_matrix_flipped, dtype=tf.float32)

        try:
            if model_type == 'ptge':
                # First, get the initial gaze estimation from the Gaze Model
                initial_gaze_estimation = model['gaze_model'](input_dict)

                # Now, use the Calibration Model to refine the gaze estimation
                calibration_input_dict['gaze'] = initial_gaze_estimation
                calibration_input_dict['gaze_flipped'] = initial_gaze_estimation  # No flipping, use as is

                refined_gaze_estimation = model['calibration_model'](calibration_input_dict)

                # Ensure the refined_gaze_estimation shape matches the gaze_3d shape
                refined_gaze_estimation = refined_gaze_estimation[:, :3]  # Only take the first 3 columns

                # Calculate angular error
                angular_error = compute_angular_error(gaze_3d, refined_gaze_estimation)
                results.append((gaze_3d.numpy(), refined_gaze_estimation.numpy()))

            elif model_type == 'spaze':
                # Get the gaze estimation from the SPAZE Model
                spaze_gaze_estimation = model['spaze_model'](face_image, training=False)

                # Calculate angular error
                angular_error = compute_angular_error(gaze_2d, spaze_gaze_estimation)
                results.append((gaze_2d.numpy(), spaze_gaze_estimation.numpy()))

            total_error += angular_error.numpy()
            num_batches += 1

        except Exception as e:
            print(f"Error during gaze model prediction: {str(e)}")
            continue

    average_error = total_error / num_batches if num_batches > 0 else float('inf')
    return average_error, results

# Define the function to evaluate both models and save results
def evaluate_and_save_angular_error_results(subjects, variation_ranges):
    # Initialize the results dictionary
    results = {
        'variation_range': [],
        'ptge_angular_error': [],
        'spaze_angular_error': [],
    }

    for subject in subjects:
        print(f"Evaluating for subject: {subject}")
        val_dataloader = GazeDataset(subject_to_leave_out=subject, batch_size=8)

        for variation_range in variation_ranges:
            print(f"Evaluating with variation range: {variation_range}")

            # Evaluate PTGE model with calibration variations for angular error
            ptge_angular_error, _ = evaluate_model_with_calibration_variations_angular_error(
                {'gaze_model': gaze_model, 'calibration_model': calibration_model}, 
                val_dataloader, 
                model_type='ptge', 
                variation_range=variation_range
            )
            # Print PTGE angular error
            print(f"PTGE Angular Error for variation range {variation_range}: {ptge_angular_error}")
            
            # Evaluate SPAZE model with calibration variations for angular error
            spaze_angular_error, _ = evaluate_model_with_calibration_variations_angular_error(
                {'spaze_model': spaze_model}, 
                val_dataloader, 
                model_type='spaze', 
                variation_range=variation_range
            )
            # Print SPAZE angular error
            print(f"SPAZE Angular Error for variation range {variation_range}: {spaze_angular_error}")
            
            results['variation_range'].append(variation_range)
            results['ptge_angular_error'].append(ptge_angular_error)
            results['spaze_angular_error'].append(spaze_angular_error)

    # Convert the results to a DataFrame and save
    df_results = pd.DataFrame(results)
    df_results.to_csv('ptge_spaze_angular_error_results-variations-calibration-params.csv', index=False)
    print("Results saved to 'ptge_spaze_angular_error_results-variations-calibration-params.csv'.")

# Define subjects and variation ranges
subjects = ['p00']
variation_ranges = [(-0.1, 0.1), (-0.2, 0.2), (-0.3, 0.3)]

# Evaluate and save results
evaluate_and_save_angular_error_results(subjects, variation_ranges)


Evaluating for subject: p00
Evaluating with variation range: (-0.1, 0.1)
PTGE Angular Error for variation range (-0.1, 0.1): 96.10783111572266
SPAZE Angular Error for variation range (-0.1, 0.1): 79.62430246988933
Evaluating with variation range: (-0.2, 0.2)
PTGE Angular Error for variation range (-0.2, 0.2): 96.10786414591472
SPAZE Angular Error for variation range (-0.2, 0.2): 79.62430246988933
Evaluating with variation range: (-0.3, 0.3)
PTGE Angular Error for variation range (-0.3, 0.3): 96.10500227864583
SPAZE Angular Error for variation range (-0.3, 0.3): 79.62430246988933
Results saved to 'ptge_spaze_angular_error_results-variations-calibration-params.csv'.


# Analyzing Model's robustness with clean data and in handling corrupted data

In [11]:

# Define the evaluation function
def evaluate_model_angular_error(model, dataloader, corruption=None, model_type='ptge'):
    total_angular_error = 0.0
    num_batches = 0
    results = []

    for i, (face_image, left_eye_image, right_eye_image, rotation_matrix, rotation_matrix_flipped, gaze_2d, gaze_3d, gaze_3d_flipped, eye_coords, subject_id) in enumerate(dataloader):
        if corruption == 'noise':
            face_image += tf.random.normal(face_image.shape, mean=0.0, stddev=0.1)
            left_eye_image += tf.random.normal(left_eye_image.shape, mean=0.0, stddev=0.1)
            right_eye_image += tf.random.normal(right_eye_image.shape, mean=0.0, stddev=0.1)
        elif corruption == 'blur':
            face_image = tf.nn.conv2d(face_image, tf.random.normal((3, 3, 3, 3), mean=0.0, stddev=1.0), strides=[1, 1, 1, 1], padding='SAME')
            left_eye_image = tf.nn.conv2d(left_eye_image, tf.random.normal((3, 3, 3, 3), mean=0.0, stddev=1.0), strides=[1, 1, 1, 1], padding='SAME')
            right_eye_image = tf.nn.conv2d(right_eye_image, tf.random.normal((3, 3, 3, 3), mean=0.0, stddev=1.0), strides=[1, 1, 1, 1], padding='SAME')

        subject_indices = [int(s.decode().split('p')[1]) for s in subject_id.numpy()]

        input_dict = {
            'eye_coords': tf.convert_to_tensor(eye_coords, dtype=tf.float32),
            'face': tf.convert_to_tensor(face_image, dtype=tf.float32),
            'flipped_face': tf.convert_to_tensor(tf.image.flip_left_right(face_image), dtype=tf.float32),
            'id': tf.convert_to_tensor(subject_indices, dtype=tf.int32),
            'lefteye': tf.convert_to_tensor(left_eye_image, dtype=tf.float32),
            'righteye': tf.convert_to_tensor(right_eye_image, dtype=tf.float32),
            'rotation_matrix': tf.convert_to_tensor(rotation_matrix, dtype=tf.float32),
        }

        if model_type == 'ptge':
            input_tuple = (input_dict,)
            initial_gaze_estimation = model['gaze_model'](*input_tuple, training=False)

            calibration_input_dict = input_dict.copy()
            calibration_input_dict.update({
                'gaze': initial_gaze_estimation,
                'gaze_flipped': initial_gaze_estimation,
                'rotation_matrix_flipped': tf.convert_to_tensor(rotation_matrix_flipped, dtype=tf.float32)
            })

            calibration_input_tuple = (calibration_input_dict,)
            refined_gaze_estimation = model['calibration_model'](*calibration_input_tuple, training=False)
            refined_gaze_estimation = refined_gaze_estimation[:, :3]

            angular_error_value = compute_angular_error(gaze_3d, refined_gaze_estimation)
            results.append((gaze_3d.numpy(), refined_gaze_estimation.numpy()))

        elif model_type == 'spaze':
            spaze_gaze_estimation = model['spaze_model'](face_image, training=False)
            angular_error_value = compute_angular_error(gaze_2d, spaze_gaze_estimation)
            results.append((gaze_2d.numpy(), spaze_gaze_estimation.numpy()))

        total_angular_error += angular_error_value.numpy()
        num_batches += 1

    average_angular_error = total_angular_error / num_batches if num_batches > 0 else float('inf')
    return average_angular_error, results

# Define the function to evaluate both models and save results
def evaluate_and_save_angular_error_results(subjects):
    results = {
        'subject': [],
        'ptge_clean_angular_error': [],
        'ptge_corrupted_angular_error_noise': [],
        'ptge_corrupted_angular_error_blur': [],
        'spaze_clean_angular_error': [],
        'spaze_corrupted_angular_error_noise': [],
        'spaze_corrupted_angular_error_blur': [],
    }

    for subject in subjects:
        print(f"Evaluating for subject: {subject}")
        val_dataloader = GazeDataset(subject_to_leave_out=subject, batch_size=8)

        # Evaluate PTGE model on clean data
        ptge_clean_angular_error, _ = evaluate_model_angular_error({'gaze_model': gaze_model, 'calibration_model': calibration_model}, val_dataloader, model_type='ptge')
        print(f"PTGE Clean Angular Error: {ptge_clean_angular_error}")

        # Evaluate PTGE model on corrupted data (noise)
        ptge_corrupted_angular_error_noise, _ = evaluate_model_angular_error({'gaze_model': gaze_model, 'calibration_model': calibration_model}, val_dataloader, corruption='noise', model_type='ptge')
        print(f"PTGE Corrupted Angular Error (Noise): {ptge_corrupted_angular_error_noise}")

        # Evaluate PTGE model on corrupted data (blur)
        ptge_corrupted_angular_error_blur, _ = evaluate_model_angular_error({'gaze_model': gaze_model, 'calibration_model': calibration_model}, val_dataloader, corruption='blur', model_type='ptge')
        print(f"PTGE Corrupted Angular Error (Blur): {ptge_corrupted_angular_error_blur}")

        # Evaluate SPAZE model on clean data
        spaze_clean_angular_error, _ = evaluate_model_angular_error({'spaze_model': spaze_model}, val_dataloader, model_type='spaze')
        print(f"SPAZE Clean Angular Error: {spaze_clean_angular_error}")

        # Evaluate SPAZE model on corrupted data (noise)
        spaze_corrupted_angular_error_noise, _ = evaluate_model_angular_error({'spaze_model': spaze_model}, val_dataloader, corruption='noise', model_type='spaze')
        print(f"SPAZE Corrupted Angular Error (Noise): {spaze_corrupted_angular_error_noise}")

        # Evaluate SPAZE model on corrupted data (blur)
        spaze_corrupted_angular_error_blur, _ = evaluate_model_angular_error({'spaze_model': spaze_model}, val_dataloader, corruption='blur', model_type='spaze')
        print(f"SPAZE Corrupted Angular Error (Blur): {spaze_corrupted_angular_error_blur}")

        results['subject'].append(subject)
        results['ptge_clean_angular_error'].append(ptge_clean_angular_error)
        results['ptge_corrupted_angular_error_noise'].append(ptge_corrupted_angular_error_noise)
        results['ptge_corrupted_angular_error_blur'].append(ptge_corrupted_angular_error_blur)
        results['spaze_clean_angular_error'].append(spaze_clean_angular_error)
        results['spaze_corrupted_angular_error_noise'].append(spaze_corrupted_angular_error_noise)
        results['spaze_corrupted_angular_error_blur'].append(spaze_corrupted_angular_error_blur)

    # Convert the results to a DataFrame and save
    df_results = pd.DataFrame(results)
    df_results.to_csv('ptge_spaze_angular_error_results-clean-corrupted-data.csv', index=False)
    print("Results saved to 'ptge_spaze_angular_error_results-clean-corrupted-data.csv'.")

# Define subjects
subjects = ['p00']

# Evaluate and save angular error results
evaluate_and_save_angular_error_results(subjects)


Evaluating for subject: p00
PTGE Clean Angular Error: 96.10797342936198
PTGE Corrupted Angular Error (Noise): 96.10800799560546
PTGE Corrupted Angular Error (Blur): 96.1067521870931
SPAZE Clean Angular Error: 79.62430246988933
SPAZE Corrupted Angular Error (Noise): 74.22585896046957
SPAZE Corrupted Angular Error (Blur): 80.8032314453125
Results saved to 'ptge_spaze_angular_error_results-clean-corrupted-data.csv'.
