In [None]:
!pip install opencv-python

## 1. Collect images through the webcam

In [None]:
import albumentations as albu

In [None]:
import tensorflow as tf
import json
import numpy as np
from matplotlib import pyplot as plt

In [None]:
import os 
import time
import uuid
import cv2

In [None]:
# Relative directory that the webcam capture cell writes the captured frames into.
IMAGES_PATH = os.path.join('data','images')
# Number of frames the capture loop collects.
num_img = 10

In [None]:

# Root directory under which the data/ and aug_data/ folder trees are created.
# A string literal ending in a backslash escapes its closing quote and is a
# syntax error, so no placeholder path is used here. Every other cell reads
# and writes paths relative to the working directory, so default to it;
# replace with an absolute path only if the folders must live elsewhere.
facefolder = os.getcwd()
data = "data"  # name of the dataset folder inside facefolder
img_lbl = ["images", "labels"]  # sub-folders created inside each dataset folder


In [None]:
#### Create data/images and data/labels under facefolder.
# exist_ok=True makes the cell safe to re-run and also creates the
# sub-folders when the data folder itself is already present.
for a in img_lbl:
    os.makedirs(os.path.join(facefolder, data, a), exist_ok=True)

In [None]:
# Capture num_img frames from the default webcam and save each one as a
# uniquely named JPEG inside IMAGES_PATH. Press "q" in the preview window to stop early.
os.makedirs(IMAGES_PATH, exist_ok=True)  # cv2.imwrite does not create missing folders
cap = cv2.VideoCapture(0)
try:
    if not cap.isOpened():
        raise RuntimeError('Could not open the webcam (device 0)')
    for imgnum in range(num_img):
        print('Collecting image {}'.format(imgnum))
        ret, frame = cap.read()
        if not ret:
            # No frame was delivered; skip instead of passing None to OpenCV.
            print('Could not read frame {}, skipping'.format(imgnum))
            continue
        imgname = os.path.join(IMAGES_PATH, f'{str(uuid.uuid1())}.jpg')
        if not cv2.imwrite(imgname, frame):
            print('Could not write {}'.format(imgname))
        cv2.imshow('frame', frame)
        time.sleep(0.5)

        if cv2.waitKey(1) & 0xFF == ord('q'):  ## allows us to break out of the loop
            break
finally:
    # Always free the camera and close the preview window, even on error.
    cap.release()
    cv2.destroyAllWindows()

### 1.2 Annotate the img with label

In [None]:
!labelme 

## 2. Review the Dataset 

### Limit memory growth

In [None]:
# List the GPUs visible to TensorFlow.
gpus = tf.config.experimental.list_physical_devices('GPU')

# Turn on memory growth for every GPU found.
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

In [None]:
tf.config.list_physical_devices('GPU')

#### Load images into a TensorFlow data pipeline

In [None]:
# Build the glob with os.path.join so it matches on every operating system,
# not only where "\\" is the path separator.
img = tf.data.Dataset.list_files(os.path.join('data', 'images', '*.jpg'))

In [None]:
img.as_numpy_iterator().next()

In [None]:
def load_image(x):
    """Read the file at path ``x`` and return its contents decoded as a JPEG image."""
    return tf.io.decode_jpeg(tf.io.read_file(x))
    

In [None]:
# Replace the dataset of file paths with a dataset of decoded images.
# NOTE(review): this rebinds `img`, so running the cell a second time applies load_image to images instead of paths.
img = img.map(load_image)

In [None]:
 img.as_numpy_iterator().next()

In [None]:
img_generator = img.batch(4).as_numpy_iterator()

In [None]:
plot_images = img_generator.next()

In [None]:
# One row of four panels; each image of the batch goes into its own panel.
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
for panel, image in zip(ax, plot_images):
    panel.imshow(image)
plt.show()

### Partition Unaug Data

In [None]:
# Names of the dataset splits; the folder-creation cells make one directory per entry.
partition = ["train", "test", "val"]

In [None]:
### Create <split>/images and <split>/labels inside the data folder for every split,
### unless the folder of the first split is already there.

data_root = os.path.join(facefolder, data)
if os.path.exists(os.path.join(data_root, partition[0])):
    print("Folders are existed")
else:
    for split_name in partition:
        split_dir = os.path.join(data_root, split_name)
        os.makedirs(split_dir)
        for sub_folder in img_lbl:
            os.makedirs(os.path.join(split_dir, sub_folder))

In [None]:
### Split data 70% train, 15% test and 15% Val
import random
import shutil

random.seed(42)  # fixed seed so the shuffle below is repeatable

# Folder holding the images that have not been split yet.
images_dir = 'data/images'

# Target folder for the images of each split.
train_dir = 'data/train/images'
val_dir = 'data/val/images'
test_dir = 'data/test/images'

# Sort before shuffling so the seeded shuffle starts from a fixed order.
all_images = sorted(os.listdir(images_dir))
random.shuffle(all_images)

total_images = len(all_images)
train_split = int(0.7 * total_images)
val_split = int(0.15 * total_images)
val_end = train_split + val_split

train_files = all_images[:train_split]      # first 70%
val_files = all_images[train_split:val_end]  # next 15%
test_files = all_images[val_end:]            # whatever remains


def move_images(file_list, dest_dir):
    """Move every file named in ``file_list`` from ``images_dir`` into ``dest_dir``."""
    for file_name in file_list:
        shutil.move(os.path.join(images_dir, file_name),
                    os.path.join(dest_dir, file_name))


for split_files, split_dir in ((train_files, train_dir),
                               (val_files, val_dir),
                               (test_files, test_dir)):
    move_images(split_files, split_dir)


In [None]:
### Move each image's label file from data/labels into the labels folder of the image's split

labels_src = os.path.join('data', 'labels')
for folder in ['train', 'test', 'val']:
    labels_dst = os.path.join('data', folder, 'labels')
    for file in os.listdir(os.path.join('data', folder, 'images')):
        filename = f"{file.split('.')[0]}.json"
        existing_filepath = os.path.join(labels_src, filename)
        if not os.path.exists(existing_filepath):
            continue  # no label file for this image
        os.replace(existing_filepath, os.path.join(labels_dst, filename))

### Apply image augmentation to one image and its label as an example


In [None]:
# Augmentation pipeline that transforms an image together with its bounding boxes.
augmentor = albu.Compose([
    albu.RandomCrop(width=450, height = 450),  # random 450x450 crop
    albu.HorizontalFlip(p=0.5),                # applied with probability 0.5
    albu.RandomBrightnessContrast(p=0.2),      # applied with probability 0.2
    albu.RandomGamma(p=0.2),
    albu.RGBShift(p =0.2),
    albu.VerticalFlip(p=0.5)
    # Boxes are passed in the 'albumentations' format; their labels arrive via the class_labels argument.
], bbox_params=albu.BboxParams(format='albumentations', label_fields = ['class_labels']))

In [None]:
img = cv2.imread(os.path.join('data', 'train', 'images', 'ANY IMAGE .jpg'))

In [None]:
with open(os.path.join('data', 'train', 'labels', 'YOUR JSON FILE .json'), 'r') as f:
    label = json.load(f)

In [None]:
shape = label['shapes'][0]
len(shape['points'])

In [None]:
shape

In [None]:
# Take x and y of the first two points of the first shape as [x1, y1, x2, y2].
first_points = label['shapes'][0]['points']
coords = [first_points[0][0], first_points[0][1],
          first_points[1][0], first_points[1][1]]

In [None]:
coords ### Raw Pascal Voc format 

In [None]:
# Divide the x values by 640 and the y values by 480.
# NOTE(review): assumes the image is 640x480 — confirm against the capture resolution.
# NOTE(review): rebinds `coords`, so running the cell twice divides twice.
coords = list(np.divide(coords, [640,480,640,480]))  ### Transform

In [None]:
coords  ### Now be Albumentations format

In [None]:
augmented_img = augmentor(image = img, bboxes = [coords], class_labels= ['face'])

In [None]:
augmented_img['bboxes']

In [None]:
# Scale the first box by 450 (the crop size used by the augmentor) and draw it on the augmented image.
first_box = augmented_img['bboxes'][0]
top_left = tuple(np.multiply(first_box[:2], [450, 450]).astype(int))
bottom_right = tuple(np.multiply(first_box[2:], [450, 450]).astype(int))
cv2.rectangle(augmented_img['image'], top_left, bottom_right, (255,0,0), 2)  # line thickness 2
plt.imshow(augmented_img['image'])

### Build Augmentation Pipeline


##### Create augment folder for augmented data

In [None]:
AUG_PATH = "aug_data"

In [None]:
# Create <split>/images and <split>/labels inside facefolder/AUG_PATH for every split.
# The check and the creation both use the same root (facefolder/AUG_PATH),
# and exist_ok=True makes the cell safe to re-run and able to complete a
# partially created tree. The split names are spelled out so the cell does
# not depend on the current value of the `partition` variable.
aug_root = os.path.join(facefolder, AUG_PATH)
if os.path.exists(os.path.join(aug_root, 'train')):
    print("Folders are existed")
for split_name in ['train', 'test', 'val']:
    for sub_folder in img_lbl:
        os.makedirs(os.path.join(aug_root, split_name, sub_folder), exist_ok=True)

##### Build Augmentation Pipeline

In [None]:
# Number of augmented copies written for every source image.
AUG_PER_IMAGE = 60

# For every image of every split: read the image and (if present) its label,
# run the augmentor AUG_PER_IMAGE times and write each augmented image plus a
# small JSON annotation ({'image', 'bbox', 'class'}) into aug_data/<split>/.
# The loop variable is named `split`, not `partition`, so the `partition`
# list used by the folder-creation cells is not overwritten by a string.
for split in ['train', 'test', 'val']:
    image_dir = os.path.join('data', split, 'images')
    for image in os.listdir(image_dir):
        img = cv2.imread(os.path.join(image_dir, image))
        if img is None:
            # cv2.imread returns None for files it cannot decode.
            print(f"Warning: could not read image {image}, skipping")
            continue
        img_h, img_w = img.shape[:2]
        stem = image.split(".")[0]

        coords = [0, 0, 0.00001, 0.00001]  #### placeholder box for images without a usable label
        has_box = False  # True only when a real box was read from the label file
        label_path = os.path.join('data', split, 'labels', f'{stem}.json')
        if os.path.exists(label_path):
            with open(label_path, 'r') as f:
                label = json.load(f)   #### Load the json file to get class and coordinates
            if 'shapes' in label and len(label['shapes']) > 0:
                shape = label['shapes'][0]
                if 'points' in shape and len(shape['points']) >= 2:
                    (x1, y1), (x2, y2) = shape['points'][0][:2], shape['points'][1][:2]
                    # Order the corners (a rectangle may be drawn in any direction),
                    # normalise by the size of this image and clip into [0, 1].
                    raw = [min(x1, x2) / img_w, min(y1, y2) / img_h,
                           max(x1, x2) / img_w, max(y1, y2) / img_h]
                    coords = [float(min(max(v, 0.0), 1.0)) for v in raw]
                    has_box = True
                else:
                    print(f"Warning: Not enough points in {label_path}")
            else:
                print(f"Warning: No shapes in {label_path}")

        try:
            for x in range(AUG_PER_IMAGE):
                augmented = augmentor(image=img, bboxes=[coords], class_labels=['face'])
                cv2.imwrite(os.path.join('aug_data', split, 'images', f'{stem}.{x}.jpg'), augmented['image'])

                annotation = {'image': image}
                if has_box and len(augmented['bboxes']) > 0:
                    # Plain floats so the box is always JSON-serialisable.
                    annotation['bbox'] = [float(v) for v in augmented['bboxes'][0][:4]]
                    annotation['class'] = 1
                else:
                    # No label, or the box was cropped away: mark as "no face".
                    annotation['bbox'] = [0, 0, 0, 0]
                    annotation['class'] = 0

                with open(os.path.join('aug_data', split, 'labels', f'{stem}.{x}.json'), 'w') as f:
                    json.dump(annotation, f)
        except Exception as e:
            # Best effort: report the failing image and carry on with the next one.
            print(f"Augmentation failed for {image}: {e}")

### Load Augmented Images into a TensorFlow Dataset

In [None]:
# Augmented training images in deterministic order (shuffle=False). The glob is
# built with os.path.join so it matches on every operating system.
train_images = tf.data.Dataset.list_files(os.path.join('aug_data', 'train', 'images', '*.jpg'), shuffle=False)
train_images = train_images.map(load_image)
train_images = train_images.map(lambda x: tf.image.resize(x, (120, 120)))  # resize to 120x120
train_images = train_images.map(lambda x: x/255)  # divide pixel values by 255

In [None]:
# Augmented test images in deterministic order (shuffle=False). The glob is
# built with os.path.join so it matches on every operating system.
test_images = tf.data.Dataset.list_files(os.path.join('aug_data', 'test', 'images', '*.jpg'), shuffle=False)
test_images = test_images.map(load_image)
test_images = test_images.map(lambda x: tf.image.resize(x, (120, 120)))  # resize to 120x120
test_images = test_images.map(lambda x: x/255)  # divide pixel values by 255

In [None]:
# Augmented validation images in deterministic order (shuffle=False). The glob is
# built with os.path.join so it matches on every operating system.
val_images = tf.data.Dataset.list_files(os.path.join('aug_data', 'val', 'images', '*.jpg'), shuffle=False)
val_images = val_images.map(load_image)
val_images = val_images.map(lambda x: tf.image.resize(x, (120, 120)))  # resize to 120x120
val_images = val_images.map(lambda x: x/255)  # divide pixel values by 255

In [None]:
train_images.as_numpy_iterator().next()

## Get Labels

#### Build label loading function

In [None]:
def load_labels(label_path):
    """Load one annotation JSON file.

    ``label_path`` must provide a ``.numpy()`` method returning the path of
    the file to open. The file is read as UTF-8 JSON.

    Returns:
        A pair ``([class], bbox)`` built from the file's ``class`` and
        ``bbox`` entries.
    """
    path = label_path.numpy()
    with open(path, 'r', encoding = "utf-8") as handle:
        annotation = json.load(handle)
    return [annotation['class']], annotation['bbox']

#### Load labels to tensorflow Dataset 

In [None]:
# Training label files in deterministic order (shuffle=False), each loaded through load_labels.
# The glob is built with os.path.join so it matches on every operating system.
train_labels = tf.data.Dataset.list_files(os.path.join('aug_data', 'train', 'labels', '*.json'), shuffle=False)
train_labels = train_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))

In [None]:
# Test label files in deterministic order (shuffle=False), each loaded through load_labels.
# The glob is built with os.path.join so it matches on every operating system.
test_labels = tf.data.Dataset.list_files(os.path.join('aug_data', 'test', 'labels', '*.json'), shuffle=False)
test_labels = test_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))

In [None]:
# Validation label files in deterministic order (shuffle=False), each loaded through load_labels.
# The glob is built with os.path.join so it matches on every operating system.
val_labels = tf.data.Dataset.list_files(os.path.join('aug_data', 'val', 'labels', '*.json'), shuffle=False)
val_labels = val_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))

In [None]:
train_labels.as_numpy_iterator().next()

## Combine Label and Image Samples

In [None]:
len(train_images), len(train_labels), len(test_images), len(test_labels), len(val_images), len(val_labels)

In [None]:
# Pair every training image with its label, then shuffle, batch and prefetch.
train = (
    tf.data.Dataset.zip((train_images, train_labels))
    .shuffle(5000)
    .batch(8)
    .prefetch(4)
)

In [None]:
# Pair the test images with the test labels. This must use the test split:
# zipping the training tensors here would make "test" a copy of the training data.
test = tf.data.Dataset.zip((test_images, test_labels))
test = test.shuffle(5000)
test = test.batch(8)
test = test.prefetch(4)

In [None]:
# Pair the validation images with the validation labels. This must use the
# validation split: zipping the training tensors here would make the
# validation loss a second measurement of the training loss.
val = tf.data.Dataset.zip((val_images, val_labels))
val = val.shuffle(5000)
val = val.batch(8)
val = val.prefetch(4)

In [None]:
train.as_numpy_iterator().next()

##### View Img

In [None]:
data_samples = train.as_numpy_iterator()


In [None]:
res = data_samples.next()

In [None]:
# Show the first four images of the batch with their labelled boxes drawn on top.
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
batch_images = res[0]
batch_boxes = res[1][1]
for idx, panel in enumerate(ax):
    sample_image = batch_images[idx].copy()  # draw on a copy, not on the batch
    sample_coords = batch_boxes[idx]

    # Scale the box corners by 120, the size the images were resized to.
    top_left = tuple(np.multiply(sample_coords[:2], [120,120]).astype(int))
    bottom_right = tuple(np.multiply(sample_coords[2:], [120,120]).astype(int))
    cv2.rectangle(sample_image, top_left, bottom_right, (255,0,0), 2)

    panel.imshow(sample_image)
## Build the Deep Learning Model

In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, Dense, GlobalMaxPooling2D
from tensorflow.keras.applications import VGG16

### Download VGG16

In [None]:
vgg = VGG16(include_top=False)

In [None]:
vgg.summary()

#### Build instance of Network

In [None]:
def build_model():
    """Build the two-headed face tracker network.

    A 120x120x3 input is fed through VGG16 created with ``include_top=False``.
    Its output goes into two heads of identical shape
    (GlobalMaxPooling2D -> Dense(2048, relu) -> Dense(n, sigmoid)):
    a 1-unit classification head and a 4-unit bounding-box head.

    Returns:
        A Keras ``Model`` whose outputs are ``[class, bbox]``.
    """
    input_layer = Input(shape=(120,120, 3))
    features = VGG16(include_top=False)(input_layer)

    def make_head(units):
        # Pool the feature maps, then a hidden layer, then the sigmoid output.
        pooled = GlobalMaxPooling2D()(features)
        hidden = Dense(2048, activation='relu')(pooled)
        return Dense(units, activation='sigmoid')(hidden)

    class_output = make_head(1)  # classification head
    bbox_output = make_head(4)   # box regression head

    return Model(inputs=input_layer, outputs=[class_output, bbox_output])

In [None]:
facetracker = build_model()

In [None]:
facetracker.summary()

In [None]:
X, y =train.as_numpy_iterator().next()

In [None]:
X.shape

In [None]:
classes, coords =facetracker.predict(X)

In [None]:
classes

In [None]:
coords

## Create Losses and Optimizers

###### Define how much the learning rate drops after each epoch

In [None]:
# Number of batches in the training dataset.
batches_per_epoch = len(train)
# Chosen so that 1 / (1 + lr_decay * batches_per_epoch) equals 0.75.
# NOTE(review): presumably this makes the learning rate 75% of its starting value after one epoch — confirm against the optimizer's decay formula.
lr_decay = (1./0.75 -1)/batches_per_epoch

In [None]:
# NOTE(review): the `decay` argument may only be accepted by older/legacy Keras optimizers — confirm against the installed TensorFlow version.
opt = tf.keras.optimizers.Adam(learning_rate=0.0001, decay=lr_decay)  ##### Optimizer for backpropagation

In [None]:
def localization_loss(y_true, yhat):
    """Localization loss for boxes indexed per row as columns 0..3.

    The result is the sum over the batch of the squared error of the first
    two columns, plus the squared error of the width (column 2 - column 0)
    and of the height (column 3 - column 1).
    """
    corner_error = tf.reduce_sum(tf.square(y_true[:, :2] - yhat[:, :2]))

    # Width and height of the true boxes.
    true_w = y_true[:, 2] - y_true[:, 0]
    true_h = y_true[:, 3] - y_true[:, 1]

    # Width and height of the predicted boxes.
    pred_w = yhat[:, 2] - yhat[:, 0]
    pred_h = yhat[:, 3] - yhat[:, 1]

    size_error = tf.reduce_sum(tf.square(true_w - pred_w) + tf.square(true_h - pred_h))

    return corner_error + size_error

In [None]:
classloss = tf.keras.losses.BinaryCrossentropy()
regressloss = localization_loss

#### Test Loss Metrics

In [None]:
localization_loss(y[1], coords)


In [None]:
classloss(y[0], classes).numpy()

In [None]:
regressloss(y[1], coords)

# Train Neural Network

In [None]:
#### When creating model class from keras, always define __init__, compile, train_step, and call
class FaceTracker(Model):
    """Keras model wrapper that trains the face tracker with a combined loss.

    The wrapped network returns ``(classes, coords)``. The total loss is
    ``localization_loss + 0.5 * class_loss``.
    """

    def __init__(self, facetracker,  **kwargs):
        """Store the network to train; extra keyword arguments go to ``Model``."""
        super().__init__(**kwargs)
        self.model = facetracker

    def compile(self, opt, classloss, localizationloss, **kwargs):
        """Register the optimizer and the two loss functions used by the train/test steps."""
        super().compile(**kwargs)
        self.closs = classloss
        self.lloss = localizationloss
        self.opt = opt

    def _compute_losses(self, batch, training):
        """Run the network on one batch and return (total, class, localization) losses."""
        X, y = batch

        classes, coords = self.model(X, training=training)  ### Make prediction

        batch_classloss = self.closs(y[0], classes)
        # Labels are cast to float32 before they are compared with the predictions.
        batch_localizationloss = self.lloss(tf.cast(y[1], tf.float32), coords)
        total_loss = batch_localizationloss + 0.5 * batch_classloss

        return total_loss, batch_classloss, batch_localizationloss

    def train_step(self, batch, **kwargs):
        """Run one optimisation step on ``batch`` and return the three loss values."""
        with tf.GradientTape() as tape:
            total_loss, batch_classloss, batch_localizationloss = self._compute_losses(batch, training=True)

        # Compute the gradient after leaving the tape context so the gradient
        # computation itself is not recorded.
        grad = tape.gradient(total_loss, self.model.trainable_variables)

        # Use the optimizer passed to compile(), not a notebook-level global.
        self.opt.apply_gradients(zip(grad, self.model.trainable_variables))

        return {"total_loss": total_loss, "class_loss": batch_classloss, "regress_loss": batch_localizationloss}

    def test_step(self, batch, **kwargs):
        """Evaluate ``batch`` without updating weights and return the three loss values."""
        total_loss, batch_classloss, batch_localizationloss = self._compute_losses(batch, training=False)

        return {"total_loss": total_loss, "class_loss": batch_classloss, "regress_loss": batch_localizationloss}

    def call(self, X, **kwargs):
        """Forward ``X`` to the wrapped network."""
        return self.model(X, **kwargs)

In [None]:
model =  FaceTracker(facetracker)

In [None]:
model.compile(opt, classloss, regressloss)

#### Log Dir

In [None]:
logdir = "logs"

In [None]:
tensorboard_callback  = tf.keras.callbacks.TensorBoard(log_dir = logdir)

In [None]:
hist  =model.fit(train, epochs = 40, validation_data=val, callbacks=[tensorboard_callback])

In [None]:
hist.history

In [None]:
# One panel per loss; each panel shows the training curve and the validation curve.
fig, ax = plt.subplots(ncols=3, figsize=(20,5))

# (history key, panel title, training legend label, validation legend label)
panels = [
    ('total_loss', 'Loss', 'loss', 'val loss'),
    ('class_loss', 'Classification Loss', 'class loss', 'val class loss'),
    ('regress_loss', 'Regression Loss', 'regress loss', 'val regress loss'),
]

for panel, (key, title, train_label, val_label) in zip(ax, panels):
    panel.plot(hist.history[key], color='teal', label=train_label)
    panel.plot(hist.history[f'val_{key}'], color='orange', label=val_label)
    panel.title.set_text(title)
    panel.legend()

plt.show()

# Predicting using a test set

In [None]:
test_dat = test.as_numpy_iterator()

In [None]:
test_sample = test_dat.next()

In [None]:
yhat = facetracker.predict(test_sample[0])

In [None]:
# Show the first four test images; a box is drawn only when the predicted class score is above 0.9.
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
pred_classes = yhat[0]
pred_boxes = yhat[1]
for idx, panel in enumerate(ax):
    sample_image = test_sample[0][idx].copy()  # draw on a copy, not on the batch
    sample_coords = pred_boxes[idx]

    if pred_classes[idx] > 0.9:
        # Scale the box corners by 120, the size the images were resized to.
        top_left = tuple(np.multiply(sample_coords[:2], [120,120]).astype(int))
        bottom_right = tuple(np.multiply(sample_coords[2:], [120,120]).astype(int))
        cv2.rectangle(sample_image, top_left, bottom_right, (255,0,0), 2)

    panel.imshow(sample_image)

### Save the model

In [None]:
from tensorflow.keras.models import load_model

In [None]:
facetracker.save('facetracker2.h5')

In [None]:
facetracker = load_model('facetracker2.h5')

## Testing real time

In [None]:
# Run the face tracker on live webcam frames and draw the predicted box.
# Press "q" in the preview window to stop.
cap = cv2.VideoCapture(0)
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            # No frame was delivered (camera unplugged or busy): stop instead of crashing on None.
            break
        frame = frame[50:500, 50:500,:]
        # The crop can be smaller than 450x450 when the camera frame is small,
        # so scale the boxes by the real crop size.
        frame_h, frame_w = frame.shape[:2]

        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        resized = tf.image.resize(rgb, (120,120))

        yhat = facetracker.predict(np.expand_dims(resized/255,0), verbose=0)
        sample_coords = yhat[1][0]

        if np.squeeze(yhat[0]) > 0.5:
            # Box corners in pixels of the cropped frame, as plain Python ints for OpenCV.
            scale = [frame_w, frame_h]
            top_left = [int(v) for v in np.multiply(sample_coords[:2], scale)]
            bottom_right = [int(v) for v in np.multiply(sample_coords[2:], scale)]

            # Controls the main rectangle
            cv2.rectangle(frame, tuple(top_left), tuple(bottom_right), (255,0,0), 2)
            # Controls the label rectangle
            cv2.rectangle(frame,
                          (top_left[0], top_left[1] - 30),
                          (top_left[0] + 80, top_left[1]),
                          (255,0,0), -1)

            # Controls the text rendered
            cv2.putText(frame, 'face', (top_left[0], top_left[1] - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)

        cv2.imshow('EyeTrack', frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Always free the camera and close the preview window, even on error.
    cap.release()
    cv2.destroyAllWindows()