### Saurabh Chatterjee
### chatterjeesaurabh38@gmail.com

In [1]:
from operator import rshift
import cv2 as cv
import numpy as np
import mediapipe as mp
import matplotlib.pyplot as plt
import math
import time
import csv

In [6]:

mp_face_mesh = mp.solutions.face_mesh         # Face mesh landmarks object

LEFT_EYE = [ 362, 382, 381, 380, 374, 373, 390, 249, 263, 466, 388, 387, 386, 385,384, 398 ]
RIGHT_EYE = [ 33, 7, 163, 144, 145, 153, 154, 155, 133, 173, 157, 158, 159, 160, 161, 246 ]

LEFT_IRIS = [474,475, 476, 477]
RIGHT_IRIS = [469, 470, 471, 472]

# Eye Corners:
R_H_LEFT = [33] # right eye right most landmark
R_H_RIGHT = [133] # right eye left most landmark
L_H_LEFT = [362] # left eye right most landmark
L_H_RIGHT = [263] # left eye left most landmark

def euclidean_distance(point1, point2):
    x1, y1 = point1.ravel()
    x2, y2 = point2.ravel()
    distance = math.sqrt((x2-x1)**2 + (y2-y1)**2)
    return distance

def iris_position(iris_center, left_point, right_point):
    center_to_left_dist = euclidean_distance(iris_center, left_point)
    total_distance = euclidean_distance(right_point, left_point)
    norm_dist = center_to_left_dist/total_distance

def calculate_relative_coordinates(iris_center, eye_corner1, eye_corner2):
    # Calculate New iris center Coordinates wrt Eye Corner, with line joining the two corners as new x-axis, one of the eye corner as origin
    # Calculate the vector components of the line connecting the two eye corners
    delta_x = eye_corner2[0] - eye_corner1[0]
    delta_y = eye_corner2[1] - eye_corner1[1]
    delta = math.sqrt(delta_x**2 + delta_y**2)

    # Calculate the relative coordinates of the iris center

    x_2 = iris_center[0] - eye_corner1[0]    #
    y_2 = iris_center[1] - eye_corner1[1]    #
    relative_x_2 = ((iris_center[0] - eye_corner1[0]) * delta_x + (iris_center[1] - eye_corner1[1]) * delta_y)/delta  #
    relative_y_2 = (-(iris_center[0] - eye_corner1[0]) * delta_y + (iris_center[1] - eye_corner1[1]) * delta_x)/delta  #

    relative_x = ((iris_center[0] - eye_corner1[0]) * delta_x + (iris_center[1] - eye_corner1[1]) * delta_y)/delta**2   #  Again divided by delta to Normalise the eye x-movement between 0-1
    relative_y = (-(iris_center[0] - eye_corner1[0]) * delta_y + (iris_center[1] - eye_corner1[1]) * delta_x)/delta**2  # Again divided by delta (NOT delta/4) to Normalise the eye y-movement between 0-1 

    # Calculate the distance and angle in the new coordinate system
    #distance = math.sqrt(relative_x ** 2 + relative_y ** 2)
    #angle = math.atan2(relative_y, relative_x)

    frame_time = time.time()
    
    return relative_x, relative_y, frame_time, delta, x_2, y_2, relative_x_2, relative_y_2, iris_center[0], iris_center[1]
    

center_left = [0, 0]              # Declaring them Global Variables, so that if in a frame loop eye is not detected then they take last updated values
left_eye_left_corner = [0, 0]
left_eye_right_corner = [0, 0]

iris_relative_coords = []    # Stores Iris Center locations from each frame


prev_frame_time = 0
new_frame_time = 0


#cap = cv.VideoCapture('Dataset_Blink.mp4')
cap = cv.VideoCapture(0)
#cap = cv.VideoCapture('for_detection_error_calliberation_1080p_120fps_60cm.mp4')   # Use for Error Calliberation
#cap = cv.VideoCapture('http://192.168.42.129:8080/video')
#cap = cv.VideoCapture('60_fps_face_video.mp4')


# fourcc = cv.VideoWriter_fourcc(*'XVID')               #
# output_file = 'output_video.mp4'                       #
# video_writer = cv.VideoWriter(output_file, fourcc, 30, (640, 480))      #


with mp_face_mesh.FaceMesh(
    max_num_faces=1,
    refine_landmarks=True,      # Get 478 landmarks (instead of 468): 10 more for IRIS landmarks
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5
) as face_mesh:
    while True:
        ret, frame = cap. read()

        if not ret:
            break

        frame = cv.flip(frame, 1)     # Flip image so the frame does not appear mirror image
        rgb_frame = cv.cvtColor(frame, cv.COLOR_BGR2RGB)    # cv2 takes BGR images - Convert to RGB for mediapipe
        img_h, img_w = frame.shape[:2]


        results = face_mesh.process(rgb_frame)   # Activate face_mesh to produce face landmarks on the image # Output is Normalised bw 0-1

        if results.multi_face_landmarks:
            #print(results.multi_face_landmarks[0].landmark)

            # Multiply by frame width-height to get landmarks values in pixels:
            mesh_points = np.array([np.multiply([p.x, p.y], [img_w, img_h]).astype(int) for p in results.multi_face_landmarks[0].landmark])  
            # print(mesh_points)
            cv.polylines(frame, [mesh_points[LEFT_EYE]], True, (0,255,0), 1, cv.LINE_AA)
            cv.polylines(frame, [mesh_points[RIGHT_EYE]], True, (0,255,0), 1, cv.LINE_AA)
            (l_cx, l_cy), l_radius = cv.minEnclosingCircle(mesh_points[LEFT_IRIS])
            (r_cx, r_cy), r_radius = cv.minEnclosingCircle (mesh_points[RIGHT_IRIS])
            center_left = np.array([l_cx, l_cy], dtype=np.int32)
            center_right = np.array([r_cx, r_cy], dtype=np.int32)

            cv.circle(frame, center_left, int(l_radius), (0,0,255), 1, cv.LINE_AA)    # Draw circle on Iris
            cv.circle(frame, center_right, int(r_radius), (255,0,0), 1, cv.LINE_AA)

            left_eye_left_corner = mesh_points[L_H_LEFT][0]
            left_eye_right_corner = mesh_points[L_H_RIGHT][0]

            cv.circle(frame, left_eye_left_corner, 2, (0,255,255), -1, cv.LINE_AA)   # Draw circle on Eye Corner
            cv.circle(frame, left_eye_right_corner, 2, (255,255,255), -1, cv.LINE_AA)
        


        iris_relative_coords.append(calculate_relative_coordinates(center_left, left_eye_left_corner, left_eye_right_corner))
        

        # For FPS Display:
        new_frame_time = time.time()
        time_diff = new_frame_time-prev_frame_time
        fps = 1/(time_diff)
        prev_frame_time = new_frame_time
    
        # converting the fps into integer
        fps = int(fps)
        # converting the fps to string so that we can display it on frame by using putText function
        fps = str(fps)

        # putting the FPS count on the frame
        cv.putText(frame, fps, (7, 70), cv.FONT_HERSHEY_SIMPLEX, 2, (100, 255, 0), 3, cv.LINE_AA)


        cv.imshow('img', frame)

        #video_writer.write(frame)   #

        if cv.waitKey(1) & 0xFF == ord('q'):
            break

iris_relative_coords_array = np.array(iris_relative_coords)

csv_fields = ['relative_x', 'relative_y', 'frame_time', 'delta', 'x_2', 'y_2', 'relative_x_2', 'relative_y_2', 'iris_center_x', 'iris_center_y']
csv_filename = "eye_data.csv"  

with open(csv_filename, "w", newline="") as file:
    writer = csv.writer(file)
    writer.writerow(csv_fields)
    writer.writerows(iris_relative_coords)



#video_writer.release()
cap.release()
cv.destroyAllWindows()

In [21]:
print(iris_relative_coords)
print(len(iris_relative_coords))

[(0.5156081808396125, -0.22820236813778252, 1688633788.685549, 43.104524124504614, 22, -4, 22.22504526979301, -2.4591386206660326, 309, 209), (0.5490084985835694, -0.23342776203966006, 1688633788.720557, 42.01190307520001, 23, -3, 23.064891829954, -2.4516861284677627, 310, 209), (0.5497737556561085, -0.18099547511312214, 1688633788.7405617, 42.04759208325728, 23, -3, 23.11666261590841, -1.9026059766179761, 310, 209), (0.5486425339366515, -0.08597285067873302, 1688633788.7685673, 42.04759208325728, 23, -2, 23.069097466492963, -0.9037378388935388, 310, 210), (0.5260180995475112, -0.1855203619909502, 1688633788.800574, 42.04759208325728, 22, -3, 22.117794478183974, -1.9501711260334256, 309, 209), (0.5497737556561085, -0.18099547511312214, 1688633788.8315823, 42.04759208325728, 23, -3, 23.11666261590841, -1.9026059766179761, 310, 209), (0.5248868778280542, -0.09049773755656107, 1688633788.8635938, 42.04759208325728, 22, -2, 22.070229328768523, -0.9513029883089881, 310, 210), (0.53567567567

In [25]:
# **RUN THIS ONCE ONLY - Not at every full run of the Code**
# For calculating Saccades THRESHOLD: [Keep looking at a single point (DON'T BLINK) for few seconds and stop the code]

def estimate_threshold(x_coords, y_coords):     # [First Keep looking at a single point (DON'T BLINK) for few seconds and stop the code]
    sq_sum_x = 0
    sq_sum_y = 0

    for i in range(1, len(x_coords)):
        sq_sum_x += np.square(x_coords[i]-x_coords[i-1])
        sq_sum_y += np.square(y_coords[i]-y_coords[i-1])

    threshold_x = np.sqrt(sq_sum_x/(len(x_coords)-1))   #RMSE
    threshold_y = np.sqrt(sq_sum_y/(len(x_coords)-1))

    return threshold_x, threshold_y


threshold_x, threshold_y = estimate_threshold(iris_relative_coords_array[:,0], iris_relative_coords_array[:,1])

print(f'Threshold X: {threshold_x} , Threshold Y: {threshold_y}')

Threshold X: 0.010380128112277578 , Threshold Y: 0.04755125559437479


In [43]:
def calculate_distance(point1, point2):
    """Calculate the Euclidean distance between two points."""
    x1, y1 = point1
    x2, y2 = point2

    delta_x = x2-x1
    delta_y = y2-y1

    return math.sqrt((delta_x)**2 + (delta_x)**2), delta_x, delta_y

def calculate_centroid(fixation_group):
    add_x = 0
    add_y = 0
    for j in range(len(fixation_group)):
        add_x += fixation_group[j][0]
        add_y += fixation_group[j][1]
    centroid_x = add_x/len(fixation_group)
    centroid_y = add_y/len(fixation_group)

    return [centroid_x, centroid_y]


def estimate_saccades_fixations(iris_coordinates, frame_times, threshold_x, threshold_y, speed_threshold, max_dispersion_threshold):
    iris_coordinates = iris_coordinates.tolist()

    saccades = []
    fixations = []

    temp_saccades_list = []
    temp_fix_list = []
    
    is_saccades = False      # Flag to track saccades group

    prev_coords = iris_coordinates[0]
    prev_time = frame_times[0]
    
    speed_list = []
    elliptic_distance_list = []

    for i in range(1, len(iris_coordinates)):
        curr_coords = iris_coordinates[i]
        curr_time = frame_times[i]
        
        distance, delta_x, delta_y = calculate_distance(prev_coords, curr_coords)
        time_diff = curr_time - prev_time
        speed = distance / time_diff
        speed = np.absolute(speed)

        speed_list.append(speed)

        coords_and_time = curr_coords.copy()
        coords_and_time.append(curr_time)       # To keep current coords and its time together 
        
        elliptic_distance = (delta_x/threshold_x)**2 + (delta_y/threshold_y)**2
        elliptic_distance_list.append(elliptic_distance)


        if (elliptic_distance)>1:
            if speed > speed_threshold:
                # It's a saccades group
        
                # Check if it's a new saccades group
                if not is_saccades:
                    # Transfer previous fixations data to fixations_list
                    if temp_fix_list:
                        fixations.append(temp_fix_list)
                        temp_fix_list = []

                    # Reset temporary saccades list
                    temp_saccades_list = []
        
                # Add current saccades group to temporary saccades list
                temp_saccades_list.append(coords_and_time)
                
                is_saccades = True


        else:
                # It's a fixation group
        
                # Check if it's the end of a saccades group
                if is_saccades:
                    # Transfer previous saccades data to saccades_list
                    if temp_saccades_list:
                        saccades.append(temp_saccades_list)
                        temp_saccades_list = []

                    # Reset is_saccades flag
                    is_saccades = False
                
                # Add current fixations group to temporary fixations list
                temp_fix_list.append(coords_and_time)
        
        prev_coords = curr_coords
        prev_time = curr_time


        merged_fixations = []
        previous_centroid = None
        max_dispersion_distance = max_dispersion_threshold

        for i in range(len(fixations)):
            fixation = fixations[i]
            centroid = calculate_centroid(fixation)

            if previous_centroid is not None:
                dispersion_distance = math.sqrt((centroid[0] - previous_centroid[0]) ** 2 + (centroid[1] - previous_centroid[1]) ** 2)

                if dispersion_distance <= max_dispersion_distance:
                    merged_fixations[-1].extend(fixation)
                else:
                    merged_fixations.append(fixation)
            else:
                merged_fixations.append(fixation)

            previous_centroid = centroid

    
    return saccades, fixations, merged_fixations, speed_list, elliptic_distance_list

In [44]:
speed_threshold = np.sqrt(threshold_x**2 + threshold_y**2)*30
max_dispersion_threshold = 0.01

saccades, fixations, merged_fixations, speed_list, elliptic_distance_list = estimate_saccades_fixations(iris_relative_coords_array[:,:2], iris_relative_coords_array[:,2], threshold_x, threshold_y, speed_threshold, max_dispersion_threshold)


# centroid_diff_list = []         #
# for i in range(1, len(fixations)):     #
#     centroid_diff = list(np.subtract(np.array(calculate_centroid(fixations[i])), np.array(calculate_centroid(fixations[i-1]))))
#     centroid_diff_list.append(centroid_diff)


# plt.figure(figsize=(10,6))
# plt.ylim(0,10)
# plt.stem(speed_list, markerfmt=" ", basefmt="0 ")
# plt.xlabel('time or frame')
# plt.ylabel('speed')
# plt.plot([0, len(speed_list)], [speed_threshold, speed_threshold], color='orange')
# plt.plot(elliptic_distance_list, color='brown' )
# plt.legend(['Speed Threshold', 'Elliptical Distance', 'Speed'])

# print('Saccades: ', saccades)
# print(f'No. of Fixations: {len(fixations)}, Fixations: {fixations}')
# print(f'No. of Merged Fixatioms: {len(merged_fixations)}, Merged Fixations: {merged_fixations}')

# print('Adjacent Centroid Differences in Fixations: ', centroid_diff_list)    #




saccades_csv_filename = "saccades.csv"  
with open(saccades_csv_filename, "w", newline="") as file:
    writer = csv.writer(file)
    writer.writerows(saccades)

fixations_csv_filename = "fixations.csv"  
with open(fixations_csv_filename, "w", newline="") as file:
    writer = csv.writer(file)
    writer.writerows(fixations)

KeyboardInterrupt: 

In [41]:
# Saccades & Fixation Info: Store Summaries and Features

# SACCADES:
num_saccades = len(saccades)
print('Number of Saccades:', num_saccades)

total_sac_time = saccades[num_saccades-1][-1][2] - saccades[0][-1][2]
saccade_rate = num_saccades/total_sac_time
print(f'Saccade Rate: {saccade_rate} Hz')

saccade_info = []

for group in saccades:
    group_size = len(group)
    group_time_len = group[group_size-1][2] - group[0][2]      # This Saccades Group's Time Duration
    print('Saccade Group Time Duration: ', group_time_len)

    sacc_frame_speed_list = []

    if group_size>1:
        for i in range(1, group_size):
            frame_time_diff = group[i][2] - group[i-1][2]
            distance_cover,_ , _ = calculate_distance(group[i][:2], group[i-1][:2])
            sacc_frame_speed = distance_cover/frame_time_diff
            sacc_frame_speed_list.append(sacc_frame_speed)
            print('group speeds:', sacc_frame_speed)  
    else:
        sacc_frame_speed_list.append(0)
        print('group speeds:', 0)
    
    group_saccade_max_speed = max(sacc_frame_speed_list)
    print('Group Saccade Speed: ', group_saccade_max_speed)

    saccade_group_info = [group_time_len, group_saccade_max_speed]

    saccade_info.append(saccade_group_info)

print('saccade info: ', saccade_info)

max_saccade_duration = np.max([i[0] for i in saccade_info])
mean_saccade_duration = np.mean([i[0] for i in saccade_info])
sd_saccade_duration = np.std([i[0] for i in saccade_info])

max_saccades_speed = np.max([i[1] for i in saccade_info])

saccades_csv_fields = ['Saccades Time Lengths:', 'Max Saccades Speed:']
saccades_csv_filename = "saccades_info.csv"  

with open(saccades_csv_filename, "w", newline="") as file:
    writer = csv.writer(file)
    writer.writerow(('Number of Saccades: ', num_saccades))
    writer.writerow(('Saccade Rate (Hz): ', saccade_rate))
    writer.writerow(('Peak Saccadic Velocity: ', max_saccades_speed))
    writer.writerow(('Max Saccade Duration: ', max_saccade_duration))
    writer.writerow(('Mean Saccade Duration: ', mean_saccade_duration))
    writer.writerow(('Std Saccade Duration: ', sd_saccade_duration))
    
    writer.writerow(saccades_csv_fields)
    writer.writerows(saccade_info)


#-------------------------------------------------------------------------------------------------------

# FIXATIONS:
num_fixations = len(merged_fixations)
print('Number of Fixations:', num_fixations)

total_fix_time = merged_fixations[num_fixations-1][-1][2] - merged_fixations[0][-1][2]
fixation_rate = num_fixations/total_fix_time
print(f'Fixation Rate: {fixation_rate} Hz')

fixation_info = []

for fix_group in merged_fixations:
    fix_group_size = len(fix_group)
    fix_group_time_len = fix_group[fix_group_size-1][2] - fix_group[0][2]      # This Fixations Group's Time Duration
    print('Fixation Group Time Duration: ', fix_group_time_len)

    fixation_group_info = [fix_group_time_len]

    fixation_info.append(fixation_group_info)

print('Fixation info: ', fixation_info)


max_fixation_duration = max([i[0] for i in fixation_info])
mean_fixation_duration = np.mean([i[0] for i in fixation_info])
sd_fixation_duration = np.std([i[0] for i in fixation_info])

fixation_csv_fields = ['Fixation Time Lengths:']
fixation_csv_filename = "fixation_info.csv"  

with open(fixation_csv_filename, "w", newline="") as file:
    writer = csv.writer(file)
    writer.writerow(('Number of Fixations: ', num_fixations))
    writer.writerow(('Fixation Rate (Hz): ', fixation_rate))
    writer.writerow(('Max Fixation Duration: ', max_fixation_duration))
    writer.writerow(('Mean Fixation Duration: ', mean_fixation_duration))
    writer.writerow(('Std Fixation Duration: ', sd_fixation_duration))
    
    writer.writerow(fixation_csv_fields)
    writer.writerows(fixation_info)


Number of Saccades: 328
Saccade Rate: 3.114097435882032 Hz
Saccade Group Time Duration:  0.0
group speeds: 0
Group Saccade Speed:  0
Saccade Group Time Duration:  0.0
group speeds: 0
Group Saccade Speed:  0
Saccade Group Time Duration:  0.0
group speeds: 0
Group Saccade Speed:  0
Saccade Group Time Duration:  0.0
group speeds: 0
Group Saccade Speed:  0
Saccade Group Time Duration:  0.04901123046875
group speeds: 0.0
Group Saccade Speed:  0.0
Saccade Group Time Duration:  0.0
group speeds: 0
Group Saccade Speed:  0
Saccade Group Time Duration:  0.015003442764282227
group speeds: 1.5705515689253908
Group Saccade Speed:  1.5705515689253908
Saccade Group Time Duration:  0.0
group speeds: 0
Group Saccade Speed:  0
Saccade Group Time Duration:  0.0
group speeds: 0
Group Saccade Speed:  0
Saccade Group Time Duration:  0.01600337028503418
group speeds: 1.5855432615751408
Group Saccade Speed:  1.5855432615751408
Saccade Group Time Duration:  0.0
group speeds: 0
Group Saccade Speed:  0
Saccade G

### Saurabh Chatterjee
### chatterjeesaurabh38@gmail.com