In [None]:
#Importing necessary libraries
import os
import random
import uuid
import cv2
import time
import numpy as np
import json
import tensorflow as tf
import matplotlib.pyplot as plt
import albumentations as alb 

In [None]:
#create data/images and data labels to save image and labels
images_path=os.path.join('data','images')
labels_path=os.path.join('data','labels')
num_images=80
try:
    os.makedirs(os.path.join(images_path))
    os.makedirs(os.path.join(labels_path))
except Exception as e:
    print(e)

In [None]:
cap=cv2.VideoCapture(0)
for image_num in range(num_images):
    print(f'Collecting image {image_num}')
    ret,frame=cap.read()
    image_name=os.path.join(images_path,f'{str(uuid.uuid1())}.jpg')
    cv2.imwrite(image_name,frame)
    cv2.imshow('frame',frame)
    time.sleep(0.5)
    if cv2.waitKey(1)& 0xFF ==ord('q'):
        break
cap.release()
cv2.destroyAllWindows()

In [None]:
!labelme

In [None]:
try:
    for folder in ['train','val','test']:
        os.makedirs(os.path.join('data',folder,'images'))
        os.makedirs(os.path.join('data',folder,'labels'))
except Exception as e:
    print(e)
        

In [None]:
try:
    for folder in ['train','val','test']:
        os.makedirs(os.path.join('aug_data',folder,'images'))
        os.makedirs(os.path.join('aug_data',folder,'labels'))
except Exception as e:
    print(e)
       

In [None]:
#splitting a dataset of images and corresponding JSON label files into three sets: train, validation, and test. 
images_list=os.listdir(images_path)
data_len=len(images_list)
random.shuffle(images_list)
ratio=(0.7,0.2,0.1)
train_list=images_list[:int(ratio[0]*data_len)]
val_list=images_list[int(ratio[0]*data_len):int(ratio[0]*data_len)+int(ratio[1]*data_len)]
test_list=images_list[int(ratio[0]*data_len)+int(ratio[1]*data_len):]
for file in train_list:
    filename=file.split('.')[0]
    old_image_dir=os.path.join(images_path,filename+'.jpg')
    old_label_dir=os.path.join(labels_path,filename+'.json')
    new_image_dir=os.path.join('data','train','images',filename+'.jpg')
    new_label_dir=os.path.join('data','train','labels',filename+'.json')
    try:
        os.replace(old_image_dir,new_image_dir)
        os.replace(old_label_dir,new_label_dir)
    except Exception as e:
        print(e)
for file in val_list:
    filename=file.split('.')[0]
    old_image_dir=os.path.join(images_path,filename+'.jpg')
    old_label_dir=os.path.join(labels_path,filename+'.json')
    new_image_dir=os.path.join('data','val','images',filename+'.jpg')
    new_label_dir=os.path.join('data','val','labels',filename+'.json')
    try:
        os.replace(old_image_dir,new_image_dir)
        os.replace(old_label_dir,new_label_dir)
    except Exception as e:
        print(e)
for file in test_list:
    filename=file.split('.')[0]
    old_image_dir=os.path.join(images_path,filename+'.jpg')
    old_label_dir=os.path.join(labels_path,filename+'.json')
    new_image_dir=os.path.join('data','test','images',filename+'.jpg')
    new_label_dir=os.path.join('data','test','labels',filename+'.json')
    try:
        os.replace(old_image_dir,new_image_dir)
        os.replace(old_label_dir,new_label_dir)
    except Exception as e:
        print(e)

In [None]:
len(train_list),len(val_list),len(test_list)

In [None]:
#Creates an augmentation pipeline using Albumentations
augmentor=alb.Compose([alb.RandomCrop(width=450,height=450),
                      alb.HorizontalFlip(p=0.5),
                      alb.RandomBrightnessContrast(p=0.2),
                      alb.RandomGamma(p=0.2),
                      alb.RGBShift(0.2),
                      alb.VerticalFlip(p=0.5)],
                     keypoint_params=alb.KeypointParams(format='xy',label_fields=['class_labels']))

In [None]:
# loops over the three folders: 'train', 'val', and 'test' to augmentation
for folder in ['train','val','test']:
    for image in os.listdir(os.path.join('data',folder,'images')):
        image_path=os.path.join('data',folder,'images',image)
        img=cv2.imread(image_path)
        label_path=os.path.join('data',folder,'labels',f'{image.split(".")[0]}.json')
        classes=[0,0]
        coords=[0,0,0.0000001,0.0000001]
        
        if os.path.exists(label_path):
            with open(label_path,'r') as f:
                label=json.load(f)
            if label['shapes'][0]['label']=='left_eye':
                classes[0]=1
                coords[0] = np.squeeze(label['shapes'][0]['points'])[0]
                coords[1] = np.squeeze(label['shapes'][0]['points'])[1]
            if label['shapes'][0]['label']=='right_eye':
                classes[1]=1
                coords[2] = np.squeeze(label['shapes'][0]['points'])[0]
                coords[3] = np.squeeze(label['shapes'][0]['points'])[1]
            if len(label['shapes']) > 1:
                if label['shapes'][1]['label']=='left_eye':
                    classes[0]=1
                    coords[0] = np.squeeze(label['shapes'][1]['points'])[0]
                    coords[1] = np.squeeze(label['shapes'][1]['points'])[1]
                if label['shapes'][1]['label']=='right_eye':
                    classes[1]=1
                    coords[2] = np.squeeze(label['shapes'][1]['points'])[0]
                    coords[3] = np.squeeze(label['shapes'][1]['points'])[1]
            
            np.divide(coords, [640,480,640,480])
            
        try: 
            for x in range(40):
                keypoints = [(coords[:2]), (coords[2:])]
                augmented = augmentor(image=img, keypoints=keypoints, class_labels=['left_eye','right_eye'])
                cv2.imwrite(os.path.join('aug_data', folder, 'images', f'{image.split(".")[0]}.{x}.jpg'), augmented['image'])

                annotation = {}
                annotation['image'] = image
                annotation['class'] = [0,0]
                annotation['keypoints'] = [0,0,0,0]

                if os.path.exists(label_path):
                    if len(augmented['keypoints']) > 0: 
                        for idx, cl in enumerate(augmented['class_labels']):
                            if cl == 'left_eye': 
                                annotation['class'][0] = 1 
                                annotation['keypoints'][0] = augmented['keypoints'][idx][0]
                                annotation['keypoints'][1] = augmented['keypoints'][idx][1]
                            if cl == 'right_eye': 
                                annotation['class'][1] = 1 
                                annotation['keypoints'][2] = augmented['keypoints'][idx][0]
                                annotation['keypoints'][3] = augmented['keypoints'][idx][1]
                                
                annotation['keypoints'] = list(np.divide(annotation['keypoints'], [450,450,450,450]))


                with open(os.path.join('aug_data', folder, 'labels', f'{image.split(".")[0]}.{x}.json'), 'w') as f:
                    json.dump(annotation, f)

        except Exception as e:
            print(e)

In [None]:
#read an image file given its file path 
def load_image(x):
    byte_img=tf.io.read_file(x)
    img=tf.io.decode_jpeg(byte_img)
    return img

In [None]:
#load and preprocess image data for the train, validation, and test sets
train_images=tf.data.Dataset.list_files(os.path.join('aug_data','train','images','*'),shuffle=False)
train_images=train_images.map(load_image)
train_images=train_images.map(lambda x: tf.image.resize(x,(250,250))/255.0) #resize and normalize images
val_images=tf.data.Dataset.list_files(os.path.join('aug_data','val','images','*'),shuffle=False)
val_images=val_images.map(load_image)
val_images=val_images.map(lambda x:tf.image.resize(x,(250,250))/255.0)
test_images=tf.data.Dataset.list_files(os.path.join('aug_data','test','images','*'),shuffle=False)
test_images=test_images.map(load_image)
test_images=test_images.map(lambda x: tf.image.resize(x,(250,250))/255.0)

In [None]:
def load_labels(label_path):
    with open(label_path.numpy(), 'r', encoding = "utf-8") as f:
        label = json.load(f)
    return [label['keypoints']]

In [None]:
train_labels = tf.data.Dataset.list_files(os.path.join('aug_data','train','labels','*'), shuffle=False)
train_labels = train_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.float16]))
val_labels = tf.data.Dataset.list_files(os.path.join('aug_data','val','labels','*'), shuffle=False)
val_labels = val_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.float16]))
test_labels = tf.data.Dataset.list_files(os.path.join('aug_data','test','labels','*'), shuffle=False)
test_labels = test_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.float16]))

In [None]:
len(train_images),len(train_labels),len(val_images),len(val_labels),len(test_images),len(test_labels)

In [None]:
train=tf.data.Dataset.zip((train_images,train_labels))
train=train.shuffle(train.cardinality())
train=train.batch(8)
train=train.prefetch(4)
val=tf.data.Dataset.zip((val_images,val_labels))
val=val.shuffle(val.cardinality())
val=val.batch(8)
val=val.prefetch(4)
test=tf.data.Dataset.zip((test_images,test_labels))
test=test.shuffle(test.cardinality())
test=test.batch(8)
test=test.prefetch(4)

In [None]:
len(train),len(val),len(test)

In [None]:
data_samples=train.as_numpy_iterator()

In [None]:
res=data_samples.next()

In [None]:
fig,ax=plt.subplots(ncols=4,figsize=(20,20))
for idx in range(4):
    sample_image=res[0][idx]
    sample_coords=res[1][0][idx]
    cv2.circle(sample_image, tuple(np.multiply(sample_coords[:2], [250,250]).astype(int)), 2, (255,0,0), -1)
    cv2.circle(sample_image, tuple(np.multiply(sample_coords[2:], [250,250]).astype(int)), 2, (0,255,0), -1)
    ax[idx].imshow(sample_image)
    

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D,Dropout,Input,Reshape
from tensorflow.keras.applications import ResNet152V2

In [None]:
model =Sequential([Input(shape=(250,250,3)),
                  ResNet152V2(include_top=False,input_shape=(250,250,3)),
                  Conv2D(512,3,padding='same',activation='relu'),
                  Conv2D(512,3,padding='same',activation='relu'),
                  Conv2D(256,3,2,padding='same',activation='relu'),
                  Conv2D(256,2,2,activation='relu'),
                  Dropout(0.05),
                  Conv2D(4,2,2),
                  Reshape((4,))])
model.summary()

In [None]:
optimaizer=tf.keras.optimizers.Adam(learning_rate=0.001)
loss=tf.keras.losses.MeanSquaredError()

In [None]:
model.compile(optimaizer,loss)

In [None]:
X, y = train.as_numpy_iterator().next()
X.shape

In [None]:
scores = model.predict(X)
scores

In [None]:
histroy=model.fit(train,epochs=100,validation_data=val)

In [None]:
histroy.history

In [None]:
plt.plot(histroy.history['loss'], color='teal', label='loss')
plt.plot(histroy.history['val_loss'], color='orange', label='val loss')
plt.suptitle('Loss')
plt.legend()
plt.show()

In [None]:
test_data = test.as_numpy_iterator()

In [None]:
test_sample = test_data.next()

In [None]:
yhat = model.predict(test_sample[0])

In [None]:
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
for idx in range(4): 
    sample_image = test_sample[0][idx]
    sample_coords = yhat[idx]
    
    cv2.circle(sample_image, tuple(np.multiply(sample_coords[:2], [250,250]).astype(int)), 2, (255,0,0), -1)
    cv2.circle(sample_image, tuple(np.multiply(sample_coords[2:], [250,250]).astype(int)), 2, (0,255,0), -1)
    
    ax[idx].imshow(sample_image)


In [None]:
from tensorflow.keras.models import load_model

In [None]:
model.save('eyetrackerresnet.h5')

In [None]:
model = load_model('eyetrackerresnet.h5')

In [None]:
model.predict(test_sample[0])

In [None]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    _ , frame = cap.read()
    
    frame = frame[50:500,50:500,:] 
    rgb_img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    resized = cv2.resize(rgb_img, (250,250))
    
    yhat = model.predict(np.expand_dims(resized/255,0))
    sample_coords = yhat[0,:4]
    
    cv2.circle(frame, tuple(np.multiply(sample_coords[:2], [450,450]).astype(int)), 2, (255,0,0), -1)
    cv2.circle(frame, tuple(np.multiply(sample_coords[2:], [450,450]).astype(int)), 2, (0,255,0), -1)
    
    cv2.imshow('EyeTrack', frame)
    
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()