1. Albumentations changes the coordinates for the bounding box after applying the data augmentation techniques.

2. Each bounding box is described by two corner points: top-left and bottom-right (or vice versa).

3. Loss function will be binary cross entropy which is common for a classification model.

4. Localization loss has two parts: the first measures the difference between the predicted and the actual coordinates; the second compares the width and height of the actual bounding box with the predicted width and height.

5. Localization loss therefore is trying to ensure that our bounding box is extremely close to representing our object. 

6. Uses VGG16 as base model and adds in two layers for our classification and regression model to give us our bounding boxes. 

7. The model has 5 outputs: one value in [0,1] indicating whether an individual face is detected, and (x1,y1,x2,y2), the coordinates of the bounding box.

In [None]:
# !pip install labelme tensorflow opencv-python matplotlib albumentations

In [1]:
import os 
import time 
import uuid #Creates a unique uniform identifier, hence creates unique file names for each image
import cv2

In [None]:
IMAGES_PATH = os.path.join('data','images')
number_images = 30

In [None]:
# cap = cv2.VideoCapture(0) #Establishes a connection to the webcam, 0 is the camera number
# for imgnum in range(number_images): #Loop through the range of images
#     print('Collecting image {}'.format(imgnum))
#     ret, frame = cap.read()
#     imgname = os.path.join(IMAGES_PATH,f'{str(uuid.uuid1())}.jpg') #Establish your file path
#     cv2.imwrite(imgname, frame) #Save the images to the file
#     cv2.imshow('frame', frame)
#     time.sleep(0.5) #time.sleep for half a second between each frame hence can take positive and negative samples

#     if cv2.waitKey(1) & 0xFF == ord('q'):
#         break
# cap.release()
# cv2.destroyAllWindows()

In [None]:
!labelme /data/images 
#Triggers the labelme annotation library and where we can annotate the images
#Annotated the images using bounding box and then these images are stored as json files 

In [2]:
import tensorflow as tf
import json #To load the json labels into our python pipeline
import numpy as np
from matplotlib import pyplot as plt

In [None]:
#Load the images that we have obtained
#os.path.join builds the glob pattern with the separator of the current operating system
images = tf.data.Dataset.list_files(os.path.join('data', 'images', '*.jpg'), shuffle=True)

In [None]:
images.as_numpy_iterator().next() #To ensure the images have been picked up

In [None]:
# Read a JPEG file and decode it into an image tensor
def load_image(x):
    """Return the decoded JPEG image stored at file path ``x``."""
    return tf.io.decode_jpeg(tf.io.read_file(x))

In [None]:
images = images.map(load_image)

In [None]:
images.as_numpy_iterator().next()

In [None]:
type(images)

In [None]:
image_generator = images.batch(4).as_numpy_iterator() 

In [None]:
plot_images = image_generator.next()

In [None]:
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
for idx, image in enumerate(plot_images):
    ax[idx].imshow(image) 
plt.show()

In [None]:
#Move every label file from data/labels into the labels folder of the partition (train, test or val) that holds its image
#This automates the matching instead of sorting the labelme json files by hand
for folder in ['train','test','val']:
    #os.replace fails when the destination folder is missing, so make sure it exists
    os.makedirs(os.path.join('data', folder, 'labels'), exist_ok=True)
    for file in os.listdir(os.path.join('data', folder, 'images')):

        #splitext only strips the extension, so file names containing extra dots stay intact
        filename = os.path.splitext(file)[0] + '.json'
        existing_filepath = os.path.join('data','labels', filename)
        if os.path.exists(existing_filepath):
            new_filepath = os.path.join('data',folder,'labels',filename)
            os.replace(existing_filepath, new_filepath)

In [None]:
img = cv2.imread(os.path.join('data','train','images','2bc593db-d215-11ed-8079-089798bd9627.jpg')) #Read an image to determine it's shape and dimensions

In [None]:
img.shape #Check dimensions

In [None]:
import albumentations as alb  #Library responsible for data augmentation in order to essentially increase the size of our dataset

In [None]:
#Augmentation pipeline: a random 450x450 crop followed by randomly applied flips and brightness/gamma/colour changes
#format='albumentations' means the boxes are passed as normalized values (see the division by the image size further down)
#The class of each box travels alongside it in the 'class_labels' field

augmentor = alb.Compose([alb.RandomCrop(width=450, height=450), 
                         alb.HorizontalFlip(p=0.5), 
                         alb.RandomBrightnessContrast(p=0.2),
                         alb.RandomGamma(p=0.2), 
                         alb.RGBShift(p=0.2), 
                         alb.VerticalFlip(p=0.5)], 
                       bbox_params=alb.BboxParams(format='albumentations', 
                                                  label_fields=['class_labels']))

In [None]:
img = cv2.imread(os.path.join('data','train', 'images','2bc593db-d215-11ed-8079-089798bd9627.jpg')) #Load the image from the training set

In [None]:
with open(os.path.join('data', 'train', 'labels', '2bc593db-d215-11ed-8079-089798bd9627.json'), 'r') as f:
    label = json.load(f) #Load the annotated image

In [None]:
#Since our label consists of the annotated image attributes in the form of a nested dictionary and list we can access the coordinates in that way 
label['shapes'][0]['points'] 

In [None]:
#Flatten the two annotated corner points of the first shape into [x1, y1, x2, y2]
coords = [label['shapes'][0]['points'][corner][axis] for corner in (0, 1) for axis in (0, 1)]

In [None]:
#Transform the pixel coordinates into the albumentations format by dividing by the width and height of the loaded image
img_height, img_width = img.shape[:2]
coords = list(np.divide(coords, [img_width, img_height, img_width, img_height]))

In [None]:
augmented = augmentor(image=img, bboxes=[coords], class_labels=['face']) #Returns a dictionary back with the three keys

In [None]:
augmented['bboxes'][0][2:] #Represents x_max, y_max

In [None]:
augmented['bboxes'] #All augmented boxes; the first two values of a box are x_min, y_min and the last two are x_max, y_max

In [None]:
#Draw the augmented bounding box on the augmented image to check that the two still line up
#The box coordinates are normalized, so scale them back by the actual size of the augmented image
aug_height, aug_width = augmented['image'].shape[:2]
#Passing the corners as tuples because that is what opencv expects
cv2.rectangle(augmented['image'],
              tuple(np.multiply(augmented['bboxes'][0][:2], [aug_width, aug_height]).astype(int)),
              tuple(np.multiply(augmented['bboxes'][0][2:4], [aug_width, aug_height]).astype(int)),
                    (0,0,255), 2) #Red, given in the BGR channel order used by cv2.imread

#Visualise how our augmentations will look
#cv2.imread returns BGR while matplotlib expects RGB, so convert before displaying
plt.imshow(cv2.cvtColor(augmented['image'], cv2.COLOR_BGR2RGB))

In [None]:
#We run our data augmentation for all the images present in our train, test and val folders
for partition in ['train','test','val']:
    #Create the output folders up front; cv2.imwrite does not create missing directories
    os.makedirs(os.path.join('aug_data', partition, 'images'), exist_ok=True)
    os.makedirs(os.path.join('aug_data', partition, 'labels'), exist_ok=True)

    for image in os.listdir(os.path.join('data', partition, 'images')):
        img = cv2.imread(os.path.join('data', partition, 'images', image))
        if img is None: #cv2.imread returns None for files it cannot decode
            print(f'Skipping unreadable image {image}')
            continue
        img_height, img_width = img.shape[:2]

        #splitext only strips the extension, so file names containing extra dots stay intact
        stem = os.path.splitext(image)[0]

        coords = [0,0,0.00001,0.00001] #Default annotation for images where annotation does not exist
        label_path = os.path.join('data', partition, 'labels', f'{stem}.json')
        has_label = False
        if os.path.exists(label_path):
            with open(label_path, 'r') as f:
                label = json.load(f)

            #A label file without any shape is treated like a missing annotation
            if label['shapes']:
                has_label = True
                points = label['shapes'][0]['points']
                #The two corners can be stored in either order, so sort them into min/max
                x_min, x_max = sorted((points[0][0], points[1][0]))
                y_min, y_max = sorted((points[0][1], points[1][1]))
                #Normalize by the real image size and keep the values inside [0, 1]
                normalized = np.divide([x_min, y_min, x_max, y_max],
                                       [img_width, img_height, img_width, img_height])
                coords = list(np.clip(normalized, 0, 1))

        #Best effort: an image whose augmentation fails is reported and the loop moves on to the next image
        try:
            for x in range(60): #Every source image yields 60 augmented copies
                augmented = augmentor(image=img, bboxes=[coords], class_labels=['face'])
                cv2.imwrite(os.path.join('aug_data', partition, 'images', f'{stem}.{x}.jpg'), augmented['image'])

                annotation = {}
                annotation['image'] = image

                #Class 1 only when an annotation exists and the box survived the random crop
                if has_label and len(augmented['bboxes']) > 0:
                    #Plain floats keep the box JSON-serializable whatever sequence type is returned
                    annotation['bbox'] = [float(value) for value in augmented['bboxes'][0][:4]]
                    annotation['class'] = 1
                else:
                    annotation['bbox'] = [0,0,0,0]
                    annotation['class'] = 0

                with open(os.path.join('aug_data', partition, 'labels', f'{stem}.{x}.json'), 'w') as f:
                    json.dump(annotation, f)

        except Exception as e:
            print(e)

In [None]:
#Preparing the augmented images for a tensorflow dataset
#os.path.join builds the glob pattern with the separator of the current operating system
train_images = tf.data.Dataset.list_files(os.path.join('aug_data', 'train', 'images', '*.jpg'), shuffle=False)
train_images = train_images.map(load_image)
train_images = train_images.map(lambda x: tf.image.resize(x, (120,120))) #Resizing the images for easier computation in our neural net
train_images = train_images.map(lambda x: x/255) #Scaling the pixel values by 255 so that they lie between 0 and 1

In [None]:
#Same preparation for the test partition: list, decode, resize to 120x120 and scale to [0, 1]
test_images = tf.data.Dataset.list_files(os.path.join('aug_data', 'test', 'images', '*.jpg'), shuffle=False)
test_images = test_images.map(load_image)
test_images = test_images.map(lambda x: tf.image.resize(x, (120,120)))
test_images = test_images.map(lambda x: x/255)

In [None]:
#Same preparation for the validation partition: list, decode, resize to 120x120 and scale to [0, 1]
val_images = tf.data.Dataset.list_files(os.path.join('aug_data', 'val', 'images', '*.jpg'), shuffle=False)
val_images = val_images.map(load_image)
val_images = val_images.map(lambda x: tf.image.resize(x, (120,120)))
val_images = val_images.map(lambda x: x/255)

In [None]:
train_images.as_numpy_iterator().next()

In [None]:
#Load the labels
def load_labels(label_path):
    """Read one annotation JSON file and return its class and bounding box.

    ``label_path`` must expose a ``numpy()`` method returning the file path.
    Returns a tuple ``([class], bbox)`` taken from the 'class' and 'bbox' keys.
    """
    path = label_path.numpy()
    with open(path, 'r', encoding = "utf-8") as handle:
        annotation = json.load(handle)

    class_value = [annotation['class']] #Wrapped in a list so the class comes back as a one-element vector
    bbox_value = annotation['bbox']
    return class_value, bbox_value

In [None]:
#Loading our labels, which are in json format, to a tensorflow dataset
#os.path.join builds the glob pattern with the separator of the current operating system
train_labels = tf.data.Dataset.list_files(os.path.join('aug_data', 'train', 'labels', '*.json'), shuffle=False)
#load_labels is plain Python, so tf.py_function wraps it and declares the dtypes of the class and the box
train_labels = train_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))

In [None]:
#Labels of the test partition, loaded the same way as the training labels
test_labels = tf.data.Dataset.list_files(os.path.join('aug_data', 'test', 'labels', '*.json'), shuffle=False)
test_labels = test_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))

In [None]:
#Labels of the validation partition, loaded the same way as the training labels
val_labels = tf.data.Dataset.list_files(os.path.join('aug_data', 'val', 'labels', '*.json'), shuffle=False)
val_labels = val_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))

In [None]:
train_labels.as_numpy_iterator().next() #Will return the array of the class and the array for the bbox coordinates

In [None]:
#Now we need to combine the images and labels that we have and create our final dataset 
len(train_images), len(train_labels), len(test_images), len(test_labels), len(val_images), len(val_labels)

In [None]:
#Pair every training image with its label, then shuffle (buffer of 4000), batch by 8 and prefetch 4 batches
train = (
    tf.data.Dataset.zip((train_images, train_labels))
    .shuffle(4000)
    .batch(8)
    .prefetch(4)
)

In [None]:
#Pair every test image with its label, then shuffle (buffer of 1300), batch by 8 and prefetch 4 batches
test = (
    tf.data.Dataset.zip((test_images, test_labels))
    .shuffle(1300)
    .batch(8)
    .prefetch(4)
)

In [None]:
#Pair every validation image with its label, then shuffle (buffer of 1000), batch by 8 and prefetch 4 batches
val = (
    tf.data.Dataset.zip((val_images, val_labels))
    .shuffle(1000)
    .batch(8)
    .prefetch(4)
)

In [None]:
train.as_numpy_iterator().next()[1] 

In [None]:
#View the images and annotations using numpy_iterator which essentially allows us to loop through it 
data_samples = train.as_numpy_iterator()

In [None]:
res = data_samples.next()

In [None]:
#Plot the first four images of the batch with their labelled bounding boxes drawn on top
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
for idx in range(4): 
    sample_image = res[0][idx] #res[0] is the image part of the batch
    sample_coords = res[1][1][idx] #res[1] is the label part; its second entry holds the bounding boxes
    
    #The images were resized to 120x120 above, so the box coordinates are multiplied by 120 to get pixel positions
    cv2.rectangle(sample_image, 
                  tuple(np.multiply(sample_coords[:2], [120,120]).astype(int)),
                  tuple(np.multiply(sample_coords[2:], [120,120]).astype(int)), 
                        (255,0,0), 2)

    ax[idx].imshow(sample_image)

1. For our model we used VGG16 as our base neural net architecture and added on our final prediction layers for classification and regression.

2. Binary classification for determining whether the individual is at their desk or not. 

3. Regression for determining the coordinates of the bounding box. 

4. We use the functional API so that we can combine the loss functions for classification and regression; the latter is the localization loss, which we write ourselves.

In [3]:
#Import relevant libraries and classes
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, Dense, GlobalMaxPooling2D
from tensorflow.keras.applications import VGG16

In [None]:
vgg  =VGG16(include_top=False) #include_top gets rid of the final layers where we will add our classification and regression layers

In [None]:
vgg.summary()

In [None]:
#Build an instance of the model
def build_model():
    """Assemble the two-headed network on top of a VGG16 backbone.

    Returns a Keras ``Model`` that takes a 120x120x3 input and outputs
    ``[class_output, box_output]``: one sigmoid unit for the class and
    four sigmoid units for the bounding box.
    """
    inputs = Input(shape=(120,120,3))

    #VGG16 without its top layers is the feature extractor shared by both heads
    backbone_features = VGG16(include_top=False)(inputs)

    #Classification head: keep the max of every feature map, then reduce to a single sigmoid unit
    class_pooled = GlobalMaxPooling2D()(backbone_features)
    class_hidden = Dense(2048, activation='relu')(class_pooled)
    class_output = Dense(1, activation='sigmoid')(class_hidden)

    #Bounding box head: same structure, ending in four sigmoid units
    box_pooled = GlobalMaxPooling2D()(backbone_features)
    box_hidden = Dense(2048, activation='relu')(box_pooled)
    box_output = Dense(4, activation='sigmoid')(box_hidden)

    return Model(inputs=inputs, outputs=[class_output, box_output])

In [None]:
facetracker = build_model()

In [None]:
facetracker.summary()

In [None]:
X, y = train.as_numpy_iterator().next()

In [None]:
X.shape

In [None]:
classes, coords = facetracker.predict(X)

In [None]:
classes, coords

In [None]:
#Number of batches in one epoch and a decay rate derived from it
#NOTE(review): lr_decay is not passed to the Adam optimizer created below - confirm whether a learning-rate decay is still intended
batches_per_epoch = len(train)
lr_decay = (1./0.75 -1)/batches_per_epoch

In [None]:
opt = tf.keras.optimizers.Adam(learning_rate=0.0001)

In [None]:
#Our localization loss
def localization_loss(y_true, yhat):
    """Return the squared error of the first box corner plus the squared error of the box width and height.

    Both arguments are indexed as ``[:, 0..3]``, i.e. one row per sample with
    the four box values in the order x1, y1, x2, y2. The result is summed over
    all rows.
    """
    #Width and height implied by the two corners of each box
    pred_width = yhat[:,2] - yhat[:,0]
    pred_height = yhat[:,3] - yhat[:,1]
    true_width = y_true[:,2] - y_true[:,0]
    true_height = y_true[:,3] - y_true[:,1]

    size_error = tf.reduce_sum(tf.square(true_width - pred_width) + tf.square(true_height - pred_height))

    #Distance between the first corner (x1, y1) of the true and the predicted box
    corner_error = tf.reduce_sum(tf.square(y_true[:,:2] - yhat[:,:2]))

    return corner_error + size_error

In [None]:
classloss = tf.keras.losses.BinaryCrossentropy()
regressloss = localization_loss

In [None]:
classloss(y[0], classes) #testing out the classification

In [None]:
regressloss(y[1], coords) #Testing out the regression loss

In [None]:
class FaceTracker(Model):
    """Keras model wrapper that trains a two-output network with a combined loss.

    The wrapped network must return ``(classes, coords)``. The total loss is
    the localization loss plus half of the classification loss.
    """

    def __init__(self, eyetracker,  **kwargs):
        """Store the network to train; ``kwargs`` are forwarded to ``Model``."""
        super().__init__(**kwargs)
        self.model = eyetracker

    def compile(self, opt, classloss, localizationloss, **kwargs):
        """Register the optimizer and the two loss functions used by the custom steps."""
        super().compile(**kwargs)
        self.closs = classloss
        self.lloss = localizationloss
        self.opt = opt

    def _compute_losses(self, batch, training):
        """Run the network on one batch and return (total, class, localization) losses."""
        X, y = batch

        classes, coords = self.model(X, training=training)

        batch_classloss = self.closs(y[0], classes)
        #The boxes are cast to float32 before being compared with the network output
        batch_localizationloss = self.lloss(tf.cast(y[1], tf.float32), coords)

        total_loss = batch_localizationloss+0.5*batch_classloss
        return total_loss, batch_classloss, batch_localizationloss

    def train_step(self, batch, **kwargs):
        """Perform one optimisation step on ``batch`` and return the three losses."""
        with tf.GradientTape() as tape:
            total_loss, batch_classloss, batch_localizationloss = self._compute_losses(batch, training=True)

        #Gradients are taken once the forward pass has been recorded
        grad = tape.gradient(total_loss, self.model.trainable_variables)

        #Use the optimizer handed to compile() instead of relying on a global variable named opt
        self.opt.apply_gradients(zip(grad, self.model.trainable_variables))

        return {"total_loss":total_loss, "class_loss":batch_classloss, "regress_loss":batch_localizationloss}

    def test_step(self, batch, **kwargs):
        """Evaluate one batch without updating the weights and return the three losses."""
        total_loss, batch_classloss, batch_localizationloss = self._compute_losses(batch, training=False)

        return {"total_loss":total_loss, "class_loss":batch_classloss, "regress_loss":batch_localizationloss}

    def call(self, X, **kwargs):
        """Forward ``X`` to the wrapped network."""
        return self.model(X, **kwargs)

In [None]:
model = FaceTracker(facetracker)

In [None]:
model.compile(opt, classloss, regressloss)

In [None]:
logdir='logs'

In [None]:
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=logdir)

In [None]:
hist = model.fit(train, epochs=10, validation_data=val, callbacks=[tensorboard_callback])

In [None]:
hist.history

In [None]:
#Plot the training (teal) and validation (orange) curves of each loss side by side
fig, ax = plt.subplots(ncols=3, figsize=(20,5))

loss_panels = [
    ('total_loss', 'val_total_loss', 'loss', 'val loss', 'Loss'),
    ('class_loss', 'val_class_loss', 'class loss', 'val class loss', 'Classification Loss'),
    ('regress_loss', 'val_regress_loss', 'regress loss', 'val regress loss', 'Regression Loss'),
]
for panel_idx, (train_key, val_key, train_label, val_label, panel_title) in enumerate(loss_panels):
    ax[panel_idx].plot(hist.history[train_key], color='teal', label=train_label)
    ax[panel_idx].plot(hist.history[val_key], color='orange', label=val_label)
    ax[panel_idx].title.set_text(panel_title)
    ax[panel_idx].legend()

plt.show()

In [None]:
test_data = test.as_numpy_iterator()

In [None]:
test_sample = test_data.next()

In [None]:
yhat = facetracker.predict(test_sample[0])

In [None]:
#Plot the first four test images and draw the predicted box where the model is confident
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
for idx in range(4): 
    sample_image = test_sample[0][idx]
    sample_coords = yhat[1][idx] #yhat[1] holds the predicted boxes, yhat[0] the predicted class scores
    
    #Only draw the box when the class score is above 0.9
    if yhat[0][idx] > 0.9:
        #The images are 120x120, so the box coordinates are multiplied by 120 to get pixel positions
        cv2.rectangle(sample_image, 
                      tuple(np.multiply(sample_coords[:2], [120,120]).astype(int)),
                      tuple(np.multiply(sample_coords[2:], [120,120]).astype(int)), 
                            (255,0,0), 2)
    
    ax[idx].imshow(sample_image)

In [4]:
from tensorflow.keras.models import load_model
from keras.models import load_model

In [None]:
facetracker.save('facetracker.h5')

In [6]:
facetracker = load_model('face.h5')



In [7]:
import numpy as np


# Define text and font properties
text = "Away from Desk"
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 1
font_thickness = 2

# Get size of text


In [8]:
#Live detection: read webcam frames, run the model on each and draw either the detected box or an "away" banner
cap = cv2.VideoCapture(0)
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret: #No frame was delivered, so there is nothing to process
            break
        frame = frame[50:500, 50:500,:]
        #The crop can be smaller than 450x450 on low-resolution cameras, so read its real size
        frame_height, frame_width = frame.shape[:2]

        #The model input is an RGB image resized to 120x120 and scaled to [0, 1]
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        resized = tf.image.resize(rgb, (120,120))

        yhat = facetracker.predict(np.expand_dims(resized/255,0))
        sample_coords = yhat[1][0]

        if yhat[0][0][0] > 0.5:
            #Scale the predicted box back to pixel positions of the cropped frame
            left, top = (int(value) for value in np.multiply(sample_coords[:2], [frame_width, frame_height]))
            right, bottom = (int(value) for value in np.multiply(sample_coords[2:], [frame_width, frame_height]))

            # Controls the main rectangle
            cv2.rectangle(frame, (left, top), (right, bottom), (124,252,0), 2)
            # Controls the label rectangle
            cv2.rectangle(frame, (left, top - 30), (left + 80, top), (124,252,0), -1)

            # Controls the text rendered
            cv2.putText(frame, 'At Desk', (left, top - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255,255,255), 2, cv2.LINE_AA)

        else:
            text_size, _ = cv2.getTextSize(text, font, font_scale, font_thickness)

            # Calculate position of text so that it is centred in the frame
            text_x = int((frame.shape[1] - text_size[0]) / 2)
            text_y = int((frame.shape[0] + text_size[1]) / 2)

            # Draw text on image
            cv2.putText(frame, text, (text_x, text_y), font, font_scale, (255, 255, 255), font_thickness)

            # Draw rectangle around text
            rect_color = (0, 0, 255)
            rect_thickness = 2
            cv2.rectangle(frame, (text_x - 10, text_y - text_size[1] - 10), (text_x + text_size[0] + 10, text_y + 10), rect_color, rect_thickness)

        cv2.imshow('EyeTrack', frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    #Release the camera and close the window even when the loop stops because of an error
    cap.release()
    cv2.destroyAllWindows()



