In [2]:
import cv2
import os
import time
import uuid

In [2]:
images_path = os.path.join('data', 'images')
num_pics = 20

In [None]:
# 1 - Gathering Data (pictures of myself)
camera = cv2.VideoCapture(0)
for i in range(num_pics):
    ret_val, image = camera.read()
    imgname = os.path.join(images_path, 'o' + str(uuid.uuid1()) + '.jpg')
    cv2.imwrite(imgname, image)
    cv2.imshow('image', image)
    time.sleep(0.5)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
camera.release()
cv2.destroyAllWindows()

In [None]:
## Building Image From Dataset 

In [4]:
import tensorflow as tf
import numpy as np
from matplotlib import pyplot as plt
import json

2023-11-26 12:45:45.247686: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [56]:
# 2 - Parition Unaugmented Data - 70 train, 15 test, 15 val
# Move matching labels (images done manually)
for folder in ['train','test','val']:
    for file in os.listdir(os.path.join('data', folder, 'images')):
        
        label_file = file.split('.')[0]+'.json'
        existing_path = os.path.join('data','labels', label_file)
        if os.path.exists(existing_path): 
            new_path = os.path.join('data',folder,'labels',label_file)
            os.replace(existing_path, new_path)      

In [15]:
# 3 - Image Augmentation on Images + Labels
import albumentations as alb

# Create augmentation pipeline
transform = alb.Compose([
    alb.RandomCrop(width=450, height=450),
    alb.HorizontalFlip(p=0.5),
    alb.RandomBrightnessContrast(p=0.2),
    alb.RandomGamma(p=0.2),
    alb.RGBShift(p=0.2),
    alb.VerticalFlip(p=0.5),
], bbox_params=alb.BboxParams(format='albumentations', label_fields=['class_labels']))

data_types = ['train', 'test', 'val']

In [None]:
## Test Image Augmentation

In [None]:
img = cv2.imread(os.path.join('data', 'train', 'images','oa7b92f4e-73a4-11ee-bc9d-faffc22fdfb0.jpg'))
# print(img.shape)
h = img.shape[0]
w = img.shape[1]

with open(os.path.join('data', 'train', 'labels', 'oa7b92f4e-73a4-11ee-bc9d-faffc22fdfb0.json'), 'r') as label_file:
    label = json.load(label_file)

print(label) # label is a dictionary

# Extract and transform coordinates into albumentations format
coords = [0 for i in range(4)]
coords[0] = label['shapes'][0]['points'][0][0] # x1
coords[1] = label['shapes'][0]['points'][0][1] # y1
coords[2] = label['shapes'][0]['points'][1][0] # x2
coords[3] = label['shapes'][0]['points'][1][1] # y2
print(coords)

coords = list(np.divide(coords, [w, h, w, h]))
print(coords) # albumentations format

# Augment image
augmented = transform(image=img, bboxes=[coords], class_labels=['face'])
cv2.imwrite(os.path.join('aug_data', 'test', 'images', 'oa7b92f4e-73a4-11ee-bc9d-faffc22fdfb0.jpg'), augmented['image'])
print(augmented.keys())

# Show image
cv2.rectangle(augmented['image'],
              tuple(np.multiply(augmented['bboxes'][0][:2], [450, 450]).astype(int)),
              tuple(np.multiply(augmented['bboxes'][0][2:], [450, 450]).astype(int)),
              (255, 0, 0), 2)

# plt.imshow(augmented['image'])

In [1]:
# 3 - Create Augmented Data

In [8]:
for type in ['train','test','val']:
    for image in os.listdir(os.path.join('data', type, 'images')):
        
        img = cv2.imread(os.path.join('data', type, 'images', image))
        h = img.shape[0]
        w = img.shape[1]
        coords = [0,0,0.00001,0.00001] # deafult coords (near 0) if label does not exist
        label_path = os.path.join('data', type, 'labels', f'{image.split(".")[0]}.json')
        if os.path.exists(label_path):
            with open(label_path, 'r') as label_file:
                label = json.load(label_file)

        coords[0] = label['shapes'][0]['points'][0][0] # x1
        coords[1] = label['shapes'][0]['points'][0][1] # y1
        coords[2] = label['shapes'][0]['points'][1][0] # x2
        coords[3] = label['shapes'][0]['points'][1][1] # y2
        
        coords = list(np.divide(coords, [w, h, w, h])) # put coords in albumentations format

        try: 
            for i in range(60):
                augmented = transform(image=img, bboxes=[coords], class_labels=['face'])
                cv2.imwrite(os.path.join('aug_data', type, 'images', f'{image.split(".")[0]}.{i}.jpg'), augmented['image'])

                aug_label_data = {}
                aug_label_data['image'] = image

                if os.path.exists(label_path):
                    if len(augmented['bboxes']) == 0: # no bounding box in the augmented image
                        aug_label_data['bbox'] = [0, 0, 0, 0]
                        aug_label_data['class'] = 0
                    else:
                        aug_label_data['bbox'] = augmented['bboxes'][0]
                        aug_label_data['class'] = 1
                else:
                    aug_label_data['bbox'] = [0, 0, 0, 0]
                    aug_label_data['class'] = 0

                with open(os.path.join('aug_data', type, 'labels', f'{image.split(".")[0]}.{i}.json'), 'w') as aug_label_file:
                    json.dump(aug_label_data, aug_label_file)
                    
        except Exception as e:
            print(e)
            

In [None]:
# 4.a Load augmented Images to TF Dataset

In [16]:
def load_image(file):
    encoded = tf.io.read_file(file)
    img = tf.io.decode_jpeg(encoded)
    return img

aug_images = {}
for type in data_types:
    type_images = tf.data.Dataset.list_files(f'aug_data/{type}/images/*.jpg', shuffle=False)
    type_images = type_images.map(load_image)
    type_images = type_images.map(lambda x: tf.image.resize(x, (120,120)))
    type_images = type_images.map(lambda x: x/255)
    aug_images[type] = type_images
print(aug_images.keys())
# aug_images['test'].as_numpy_iterator().next()

dict_keys(['train', 'test', 'val'])


In [None]:
# 4.b Load augmented labels to TF Dataset 

In [17]:
def load_labels(file): # understand more
    with open(file.numpy(), 'r', encoding = "utf-8") as f:
        label = json.load(f)
        
    return [label['class']], label['bbox']


aug_labels = {}
for type in data_types:
    type_labels = tf.data.Dataset.list_files(f'aug_data/{type}/labels/*.json', shuffle=False)
    type_labels = type_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16])) 
    aug_labels[type] = type_labels
print(aug_labels.keys())
aug_labels['train'].as_numpy_iterator().next()

dict_keys(['train', 'test', 'val'])


(array([0], dtype=uint8), array([0., 0., 0., 0.], dtype=float16))

In [19]:
len(aug_images['train']), len(aug_labels['train'])

(4200, 4200)

In [20]:
# 4.c Combine the data
aug_data = {}
for type in data_types:
    data = tf.data.Dataset.zip((aug_images[type], aug_labels[type]))
    if (type == 'train'):
        data = data.shuffle(5000)
    elif (type == 'test'):
        data = data.shuffle(1000)
    else:
        data = data.shuffle(1000)
    data = data.batch(8)    
    data = data.prefetch(4)  # improve latency and throughput
    aug_data[type] = data

print(aug_data['test'].as_numpy_iterator().next()[0])    # images
aug_data['test'].as_numpy_iterator().next()[1]           # labels

[[[[0.82052696 0.76954657 0.703799  ]
   [0.81666666 0.7656863  0.6990196 ]
   [0.82941175 0.77843136 0.7117647 ]
   ...
   [0.69803923 0.654902   0.58431375]
   [0.69803923 0.654902   0.58431375]
   [0.7049632  0.6539828  0.58731616]]

  [[0.8091299  0.75784314 0.69601715]
   [0.82009804 0.76911765 0.7058824 ]
   [0.8196691  0.76868874 0.7020221 ]
   ...
   [0.69411767 0.6509804  0.5803922 ]
   [0.69889706 0.654902   0.58474267]
   [0.71256125 0.66158086 0.5949142 ]]

  [[0.81911767 0.7642157  0.7132353 ]
   [0.8231005  0.7721201  0.7088848 ]
   [0.82009804 0.76911765 0.702451  ]
   ...
   [0.69797796 0.6548407  0.5842525 ]
   [0.7063113  0.6553309  0.58866423]
   [0.7140319  0.6630515  0.5914828 ]]

  ...

  [[0.02892157 0.02892157 0.03676471]
   [0.01960784 0.01960784 0.02647059]
   [0.02303922 0.02303922 0.02303922]
   ...
   [0.6431373  0.5764706  0.5137255 ]
   [0.6426471  0.57598037 0.5132353 ]
   [0.6392157  0.57254905 0.50686276]]

  [[0.01795343 0.02481618 0.0213848 ]
   [0.0

(array([[0],
        [1],
        [1],
        [1],
        [1],
        [1],
        [0],
        [1]], dtype=uint8),
 array([[0.    , 0.    , 0.    , 0.    ],
        [0.977 , 0.    , 1.    , 0.5684],
        [0.    , 0.0767, 0.3533, 0.707 ],
        [0.    , 0.    , 0.1066, 0.83  ],
        [0.    , 0.1925, 0.1703, 1.    ],
        [0.    , 0.391 , 0.472 , 1.    ],
        [0.    , 0.    , 0.    , 0.    ],
        [0.717 , 0.4822, 1.    , 1.    ]], dtype=float16))

In [1]:
#5 - Building Deep Learning Model with Functional API

In [21]:
import keras
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, Dense, GlobalMaxPooling2D
from tensorflow.keras.applications import VGG16

In [22]:
vgg = VGG16(include_top=False)

In [10]:
# vgg.summary()

In [23]:
# model function
def create_model():
    input_layer = Input(shape=(120,120,3))

    vgg = VGG16(include_top=False)(input_layer)

    # Classification Model
    class0 = GlobalMaxPooling2D()(vgg)
    class1 = Dense(2048, activation='relu')(class0)
    class2 = Dense(1, activation='sigmoid')(class1)

    # Regression (Bounding Box) Model
    reg0 = GlobalMaxPooling2D()(vgg)
    reg1 = Dense(2048, activation='relu')(reg0)
    reg2 = Dense(4, activation='sigmoid')(reg1)

    facetracker = Model(inputs=input_layer, outputs=[class2, reg2])
    return facetracker

In [24]:
facetracker = create_model()
facetracker.summary()
keras.utils.plot_model(facetracker, "initial model", show_shapes=True, show_layer_names=True, show_dtype=True)

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_2 (InputLayer)        [(None, 120, 120, 3)]        0         []                            
                                                                                                  
 vgg16 (Functional)          (None, None, None, 512)      1471468   ['input_2[0][0]']             
                                                          8                                       
                                                                                                  
 global_max_pooling2d (Glob  (None, 512)                  0         ['vgg16[0][0]']               
 alMaxPooling2D)                                                                                  
                                                                                              

In [25]:
# Predicting without training
x, y = aug_data['train'].as_numpy_iterator().next()
print(x.shape)
classes, coords = facetracker.predict(x)
classes, coords

(8, 120, 120, 3)


(array([[0.5195978 ],
        [0.5889544 ],
        [0.5825449 ],
        [0.62903005],
        [0.45246664],
        [0.56652933],
        [0.5924776 ],
        [0.55499333]], dtype=float32),
 array([[0.41382602, 0.43507943, 0.35255468, 0.6840632 ],
        [0.34845304, 0.45146948, 0.3919316 , 0.6254567 ],
        [0.40200987, 0.35000685, 0.36320502, 0.6969399 ],
        [0.4727461 , 0.3934826 , 0.30914906, 0.66708475],
        [0.41071898, 0.4226405 , 0.45249328, 0.6815185 ],
        [0.4295434 , 0.42590398, 0.405505  , 0.6759449 ],
        [0.41726786, 0.39431426, 0.4065091 , 0.7552469 ],
        [0.45479757, 0.38928175, 0.41880015, 0.61845714]], dtype=float32))

In [45]:
# 6 - Losses & Optimizers

In [34]:
# Optimizer
batches_per_epoch = len(aug_data['train'])
decay = (1./0.75 -1)/batches_per_epoch
opt = tf.keras.optimizers.legacy.Adam(learning_rate=0.0001, decay=decay)

In [35]:
def localization_loss(act, pred):
    delta_coord = tf.reduce_sum(tf.square(act[:,:2] - pred[:,:2]))

    h_act = act[:,3] - act[:,1]
    w_act = act[:,2] - act[:,0]
    h_pred = pred[:,3] - pred[:,1]
    w_pred = pred[:,2] - pred[:,0]
    delta_size = tf.reduce_sum(tf.square(w_act - w_pred) + tf.square(h_act - h_pred))

    return delta_coord + delta_size

In [36]:
classloss = tf.keras.losses.BinaryCrossentropy()
regressloss = localization_loss

In [37]:
# Test Loss on a sample
localization_loss(y[1], coords)
classloss(y[0], classes)
regressloss(y[1], coords)

<tf.Tensor: shape=(), dtype=float32, numpy=4.1566067>

In [1]:
# 7 - Train Neural Network

In [38]:
# 7.a - class
class FaceTracker(Model):
    def __init__(self, model, **kwargs):
        super().__init__(**kwargs)
        self.model = model

    def compile(self, opt, classloss, localizationloss, **kwargs):
        super().compile(**kwargs)
        self.classloss = classloss
        self.lloss = localizationloss
        self.opt = opt

    def train_step(self, batch, **kwargs):
        images, labels = batch
        
        with tf.GradientTape() as tape: 
            classes, coords = self.model(images, training=True)
            
            batch_classloss = self.classloss(labels[0], classes)
            batch_lloss = self.lloss(tf.cast(labels[1], tf.float32), coords)
            
            total_loss = batch_lloss+0.5*batch_classloss
            
            grad = tape.gradient(total_loss, self.model.trainable_variables) #
        
        opt.apply_gradients(zip(grad, self.model.trainable_variables)) #
        
        return {"total_loss":total_loss, "class_loss":batch_classloss, "regress_loss":batch_lloss}

    def test_step(self, batch, **kwargs):
        images, labels = batch
        classes, coords = self.model(images, training=False)

        batch_classloss = self.classloss(labels[0], classes)
        batch_lloss = self.lloss(tf.cast(labels[1], tf.float32), coords)
        total_loss = batch_lloss+0.5*batch_classloss

        return {"total_loss":total_loss, "class_loss":batch_classloss, "regress_loss":batch_lloss}

    def call(self, images, **kwargs):
        return self.model(images, **kwargs)

In [39]:
model = FaceTracker(facetracker)

In [40]:
model.compile(opt, classloss, regressloss)

In [41]:
# 7.b Train
logdir='logs'
tensorboard_cb = tf.keras.callbacks.TensorBoard(log_dir=logdir)
hist = model.fit(aug_data['train'].take(100), epochs=10, 
                    validation_data=aug_data['val'], callbacks=[tensorboard_cb])

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [87]:
# hist.history

In [None]:
# 8 - Testing

In [47]:
test_data = aug_data['test'].as_numpy_iterator()
test_sample = test_data.next()
prediction = facetracker.predict(test_sample[0])
prediction



[array([[0.94338655],
        [0.03185209],
        [0.9999999 ],
        [0.03022125],
        [0.50843704],
        [0.99999976],
        [0.15136749],
        [0.11426912]], dtype=float32),
 array([[0.17687875, 0.05481217, 0.7464975 , 0.68787485],
        [0.02564882, 0.02330072, 0.0300895 , 0.03386927],
        [0.3542798 , 0.10361259, 0.9563853 , 0.9033525 ],
        [0.02750944, 0.01476298, 0.02891585, 0.03980681],
        [0.03675908, 0.03087154, 0.4159087 , 0.52682054],
        [0.14307618, 0.2404233 , 0.7453403 , 0.919243  ],
        [0.09227018, 0.03287106, 0.09265514, 0.10350192],
        [0.07772366, 0.04728038, 0.08805643, 0.10673418]], dtype=float32)]

In [None]:
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
for idx in range(4): 
    sample_image = test_sample[0][idx]
    sample_coords = prediction[1][idx]
    
    if prediction[0][idx] > 0.9:
        cv2.rectangle(sample_image, 
                      tuple(np.multiply(sample_coords[:2], [120,120]).astype(int)),
                      tuple(np.multiply(sample_coords[2:], [120,120]).astype(int)), 
                            (255,0,0), 2)
    
    ax[idx].imshow(sample_image)

In [49]:
# Save Model
facetracker.save('facedetection.keras')

In [50]:
# Load Model
from tensorflow.keras.models import load_model

facetracker = load_model('facedetection.keras')

In [None]:
# Live Test 

import cv2
import numpy as np
import tensorflow as tf

cap = cv2.VideoCapture(0)
while cap.isOpened():
    retval, frame = cap.read() # returns a boolean, and captured frame which is a numpy array
    frame = frame[50:500, 50:500,:] 
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    resized = tf.image.resize(frame_rgb, (120,120))

    pred = facetracker.predict(np.expand_dims(resized/255, 0))
    sample_coords = pred[1][0]

    if pred[0] > 0.5:
        cv2.rectangle(frame, 
                      tuple(np.multiply(sample_coords[:2], [450,450]).astype(int)),
                      tuple(np.multiply(sample_coords[2:], [450,450]).astype(int)), 
                            (255,0,0), 2)
        # Controls the label rectangle
        cv2.rectangle(frame, 
                      tuple(np.add(np.multiply(sample_coords[:2], [450,450]).astype(int), 
                                    [0,-30])),
                      tuple(np.add(np.multiply(sample_coords[:2], [450,450]).astype(int),
                                    [80,0])), 
                            (255,0,0), -1)
        # Text
        cv2.putText(frame, 'face', tuple(np.add(np.multiply(sample_coords[:2], [450,450]).astype(int),
                                               [0,-5])),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)
    
    cv2.imshow('EyeTrack', frame)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()