In [None]:
import cv2
import uuid
import os
import time

In [None]:
!pip install labelme albumentations

### collecting images

In [None]:
# Folder that the captured webcam frames are written to.
IMAGES_PATH=os.path.join("data","images")
# Number of frames the collection loop tries to capture.
num_of_images=10

In [None]:
# Capture up to `num_of_images` frames from camera 0, roughly one per second, and
# save each one under IMAGES_PATH with a unique name. Press 'q' in the preview
# window to stop early.
os.makedirs(IMAGES_PATH,exist_ok=True)  # imwrite does not create missing folders
cap=cv2.VideoCapture(0)
try:
    for img_num in range(num_of_images):
        print("Collecting image ",img_num)
        ret,frame=cap.read()
        if not ret:
            # No frame was delivered; stop instead of passing an empty frame on.
            print("Could not read a frame from the camera, stopping")
            break
        imgpath=os.path.join(IMAGES_PATH,f'{uuid.uuid1()}.jpg')
        cv2.imwrite(imgpath,frame)
        cv2.imshow("frame",frame)
        time.sleep(1)
        if cv2.waitKey(1) & 0xFF==ord('q'):
            break
finally:
    # Free the camera and close the preview even if the loop raised.
    cap.release()
    cv2.destroyAllWindows()

### annotating images with labelMe

In [None]:
# Launch the labelme annotation tool; later cells read its JSON files from data/labels.
!labelme

### Review dataset and build image loading function

In [None]:
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
import json

#### load image into tf data pipeline

In [None]:
# Dataset of the JPEG file paths under data/images, listed without shuffling.
images=tf.data.Dataset.list_files("data/images/*.jpg",shuffle=False)

In [None]:
# Peek at the first element of the dataset (a file path at this stage).
next(images.as_numpy_iterator())

In [None]:
def load_image(x):
    """Read the file at path `x` and return its content decoded as a JPEG image."""
    return tf.io.decode_jpeg(tf.io.read_file(x))

In [None]:
# Replace every file path by its decoded image.
# NOTE: this rebinds `images`; re-running the cell would apply load_image to
# already-decoded images, so re-run the list_files cell first.
images=images.map(load_image)

In [None]:
# Peek at the first element again (now a decoded image array).
next(images.as_numpy_iterator())

In [None]:
# Iterator that yields the images in batches of four as numpy arrays.
image_generator=images.batch(4).as_numpy_iterator()

In [None]:
# Pull the next batch from the iterator; each run of this cell advances it.
plot_images=next(image_generator)

In [None]:
# Show the images of the current batch side by side, one per subplot column.
fig,ax=plt.subplots(ncols=4,figsize=(20,20))
for idx in range(len(plot_images)):
    ax[idx].imshow(plot_images[idx])
plt.show()

In [None]:
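#### train/test/val split is done manually: place the images in data/train/images, data/test/images and data/val/images before running the cells below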

#### moving matching labels 

In [None]:
# For every image placed in a split folder, move the annotation with the same
# base name (.json) from data/labels into that split's labels folder.
for folders in ['train','test','val']:
    labels_dir=os.path.join('data',folders,'labels')
    os.makedirs(labels_dir,exist_ok=True)  # os.replace needs the target folder to exist
    for file in os.listdir(os.path.join('data',folders,'images')):
        # splitext drops only the extension, so dots inside the base name survive
        filename=os.path.splitext(file)[0]+'.json'
        existing_filepath=os.path.join('data','labels',filename)
        if os.path.exists(existing_filepath):
            new_filepath=os.path.join(labels_dir,filename)
            os.replace(existing_filepath,new_filepath)

### apply image augmentation

In [None]:
import albumentations as alb

In [None]:
# Augmentation pipeline applied to an image together with its bounding boxes.
transform = alb.Compose([
    # Random 450x450 crop of the input image.
    alb.RandomCrop(width=450, height=450),
    alb.HorizontalFlip(p=0.5),
    alb.RandomBrightnessContrast(p=0.2),
    alb.RandomGamma(p=0.2),
    alb.RGBShift(p=0.2),
    alb.VerticalFlip(p=0.5)],
    # Boxes are passed in the 'albumentations' format (the notebook rescales pixel
    # coordinates by the image size before calling the transform); the labels of the
    # boxes are supplied through the `class_labels` argument.
    bbox_params=alb.BboxParams(format='albumentations',label_fields=['class_labels']))

In [None]:
# Load one specific training image to try the augmentation on.
img=cv2.imread(os.path.join('data','train','images','941bed60-199a-11ee-baac-900f0c7ffb34.jpg'))

In [None]:
# Display the value returned by cv2.imread in the previous cell.
img

In [None]:
# Load the annotation JSON that belongs to the sample image.
with open(os.path.join('data','train','labels','941bed60-199a-11ee-baac-900f0c7ffb34.json'),'r') as f:
    label=json.load(f)

In [None]:
# Label text of the first annotated shape.
label['shapes'][0]['label']

#### extract coordinates and rescale to match image resolution

In [None]:
# Build [x_min, y_min, x_max, y_max] from the first two points of the first
# shape. min/max makes the result independent of which corner was drawn first.
(x_a,y_a),(x_b,y_b)=label['shapes'][0]['points'][:2]
coords=[min(x_a,x_b),min(y_a,y_b),max(x_a,x_b),max(y_a,y_b)]

In [None]:
# Show the extracted coordinates before rescaling.
coords

In [None]:
# Divide x values by the image width and y values by the image height. The size
# is read from `img` instead of assuming a 640x480 capture.
# NOTE: this rebinds `coords`; re-run the extraction cell before re-running this one.
img_height,img_width=img.shape[:2]
coords=list(np.divide(coords,[img_width,img_height,img_width,img_height]))

In [None]:
# Show the coordinates after rescaling.
coords

#### apply augmentation

In [None]:
# Run the augmentation pipeline once on the sample image and its single 'face' box.
augmented=transform(image=img,bboxes=[coords],class_labels=['face'])

In [None]:
# Inspect the result returned by the transform (its 'image' and 'bboxes' entries are used below).
augmented

In [None]:
# Preview the augmented box. The image comes from cv2.imread (BGR order), so it is
# converted to RGB for matplotlib; cvtColor returns a new array, which keeps
# augmented['image'] free of the drawn rectangle.
preview=cv2.cvtColor(augmented['image'],cv2.COLOR_BGR2RGB)
preview_height,preview_width=preview.shape[:2]
box=augmented['bboxes'][0]
cv2.rectangle(preview,
              tuple(np.multiply(box[0:2],[preview_width,preview_height]).astype(int)),
              tuple(np.multiply(box[2:],[preview_width,preview_height]).astype(int)),
              (255,0,0),2)
plt.imshow(preview)

### augmentation pipeline

In [None]:
# Offline augmentation: for every image of every split, write AUGMENTATIONS_PER_IMAGE
# transformed copies to aug_data/<split>/images and one JSON annotation per copy
# ({'image', 'bboxes', 'class'}) to aug_data/<split>/labels.
AUGMENTATIONS_PER_IMAGE=60

for folder in ['train','test','val']:
    aug_images_dir=os.path.join('aug_data',folder,'images')
    aug_labels_dir=os.path.join('aug_data',folder,'labels')
    # Neither cv2.imwrite nor open() creates missing folders.
    os.makedirs(aug_images_dir,exist_ok=True)
    os.makedirs(aug_labels_dir,exist_ok=True)

    for image in os.listdir(os.path.join('data',folder,'images')):
        img=cv2.imread(os.path.join('data',folder,'images',image))
        if img is None:
            # Not a readable image; skip it instead of failing inside the transform.
            print("Skipping unreadable file ",image)
            continue
        img_height,img_width=img.shape[:2]
        image_stem=os.path.splitext(image)[0]

        # Tiny placeholder box used when the image has no annotation file.
        coords=[0,0,0.00001,0.00001]
        label_path=os.path.join('data',folder,'labels',f'{image_stem}.json')
        has_label=os.path.exists(label_path)
        if has_label:
            with open(label_path,'r') as f:
                label=json.load(f)

            # Order the corners on both axes so the box is valid whichever
            # corner was drawn first, then rescale by the real image size.
            (x_a,y_a),(x_b,y_b)=label['shapes'][0]['points'][:2]
            coords=[min(x_a,x_b),min(y_a,y_b),max(x_a,x_b),max(y_a,y_b)]
            coords=list(np.divide(coords,[img_width,img_height,img_width,img_height]))

        for i in range(AUGMENTATIONS_PER_IMAGE):
            augmented=transform(image=img,bboxes=[coords],class_labels=['face'])
            cv2.imwrite(os.path.join(aug_images_dir,f'{image_stem}.{i}.jpg'),augmented['image'])

            annotations={'image':image}
            if has_label and len(augmented['bboxes'])>0:
                # Plain floats keep the annotation JSON-serialisable.
                annotations['bboxes']=[float(v) for v in augmented['bboxes'][0][:4]]
                annotations['class']=1
            else:
                # No annotation, or the box was lost by the crop: mark as "no face".
                annotations['bboxes']=[0,0,0,0]
                annotations['class']=0

            with open(os.path.join(aug_labels_dir,f'{image_stem}.{i}.json'),'w') as f:
                json.dump(annotations,f)

#### load augmented images into TensorFlow datasets

In [None]:
# Training images: list files in a fixed order, decode, resize to 120x120, divide by 255.
train_images = (
    tf.data.Dataset.list_files('aug_data/train/images/*.jpg', shuffle=False)
    .map(load_image)
    .map(lambda img_tensor: tf.image.resize(img_tensor, (120,120)))
    .map(lambda img_tensor: img_tensor/255)
)

In [None]:
# Test images: list files in a fixed order, decode, resize to 120x120, divide by 255.
test_images = (
    tf.data.Dataset.list_files('aug_data/test/images/*.jpg', shuffle=False)
    .map(load_image)
    .map(lambda img_tensor: tf.image.resize(img_tensor, (120,120)))
    .map(lambda img_tensor: img_tensor/255)
)

In [None]:
# Validation images: list files in a fixed order, decode, resize to 120x120, divide by 255.
val_images = (
    tf.data.Dataset.list_files('aug_data/val/images/*.jpg', shuffle=False)
    .map(load_image)
    .map(lambda img_tensor: tf.image.resize(img_tensor, (120,120)))
    .map(lambda img_tensor: img_tensor/255)
)

### preparing labels

In [None]:
def load_labels(label_path):
    """Read one annotation JSON and return `[class, bboxes]`.

    `label_path` must expose `.numpy()` returning the file path (the function
    is called through `tf.py_function` in this notebook).
    """
    with open(label_path.numpy(),'r',encoding='utf-8') as label_file:
        annotation=json.load(label_file)
    return [annotation['class'],annotation['bboxes']]

In [None]:
# Training labels: one (class, bbox) pair per annotation file, listed in a fixed order.
train_labels = (
    tf.data.Dataset.list_files('aug_data/train/labels/*.json', shuffle=False)
    .map(lambda path: tf.py_function(load_labels, [path], [tf.uint8, tf.float16]))
)

In [None]:
# Test labels: one (class, bbox) pair per annotation file, listed in a fixed order.
test_labels = (
    tf.data.Dataset.list_files('aug_data/test/labels/*.json', shuffle=False)
    .map(lambda path: tf.py_function(load_labels, [path], [tf.uint8, tf.float16]))
)

In [None]:
# Validation labels: one (class, bbox) pair per annotation file, listed in a fixed order.
val_labels = (
    tf.data.Dataset.list_files('aug_data/val/labels/*.json', shuffle=False)
    .map(lambda path: tf.py_function(load_labels, [path], [tf.uint8, tf.float16]))
)

In [None]:
# Peek at the first (class, bbox) pair of the training labels.
next(train_labels.as_numpy_iterator())

### combine image and labels

In [None]:
# Sizes of the image and label datasets per split; each image/label pair of a
# split should match because the two are zipped element by element below.
len(train_images),len(train_labels),len(test_images),len(test_labels),len(val_images),len(val_labels)

#### create final datasets

In [None]:
# Pair training images with labels, then shuffle, batch by 8 and prefetch.
train=(
    tf.data.Dataset.zip((train_images,train_labels))
    .shuffle(3000)
    .batch(8)
    .prefetch(4)
)

In [None]:
# Pair test images with labels, then shuffle, batch by 8 and prefetch.
test=(
    tf.data.Dataset.zip((test_images,test_labels))
    .shuffle(3000)
    .batch(8)
    .prefetch(4)
)

In [None]:
# Pair validation images with labels, then shuffle, batch by 8 and prefetch.
val=(
    tf.data.Dataset.zip((val_images,val_labels))
    .shuffle(3000)
    .batch(8)
    .prefetch(4)
)

In [None]:
# Shape of the image part of one training batch.
next(train.as_numpy_iterator())[0].shape

#### view images

In [None]:
# Iterator over training batches as numpy arrays, used for the preview below.
data_samples=train.as_numpy_iterator()

In [None]:
# Fetch the next training batch; each run of this cell advances the iterator.
res=next(data_samples)

In [None]:
# Show the first four images of the batch with their ground-truth boxes.
# figsize is in inches, so (20,20) matches the other previews in this notebook.
fig,ax=plt.subplots(ncols=4,figsize=(20,20))
for idx in range(4):
    # Draw on a copy so re-running the cell does not stack rectangles on `res`.
    sample_image=res[0][idx].copy()
    sample_coords=res[1][1][idx]

    # Box coordinates are fractions of the image size; scale to the 120x120 image.
    cv2.rectangle(sample_image,
                  tuple(np.multiply(sample_coords[:2],[120,120]).astype(int)),
                  tuple(np.multiply(sample_coords[2:],[120,120]).astype(int)),
                 (255,0,0),2)
    ax[idx].imshow(sample_image)

### Build deep learning model

In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, GlobalMaxPooling2D, Dense
from tensorflow.keras.applications import VGG16

#### download vgg16

In [None]:
# Instantiate VGG16 without its top layers, only to inspect it in the next cell.
vgg=VGG16(include_top=False)

In [None]:
# Print the layer overview of the VGG16 base.
vgg.summary()

In [None]:
def build_model():
    """Build the two-headed network on top of a VGG16 base.

    Input: 120x120x3 images. Outputs, in order: a single sigmoid class score
    and four sigmoid box values.
    """
    input_layer=Input(shape=(120,120,3))
    features=VGG16(include_top=False)(input_layer)

    # classification head
    class_pooled=GlobalMaxPooling2D()(features)
    class_hidden=Dense(units=2048,activation='relu')(class_pooled)
    class_output=Dense(units=1,activation='sigmoid')(class_hidden)

    # bounding box head
    bbox_pooled=GlobalMaxPooling2D()(features)
    bbox_hidden=Dense(units=2048,activation='relu')(bbox_pooled)
    bbox_output=Dense(4,activation='sigmoid')(bbox_hidden)

    return Model(inputs=input_layer,outputs=[class_output,bbox_output])

#### test neural network

In [None]:
# Create the (still untrained) network.
facetracker=build_model()

In [None]:
# Print the layer overview of the full network.
facetracker.summary()

In [None]:
# Take one training batch: X holds the images, y the (class, bbox) labels.
X,y=next(train.as_numpy_iterator())

In [None]:
# Shape of the image batch.
X.shape

In [None]:
# Bounding-box part of the labels for this batch.
y[1]

In [None]:
# Forward pass through the untrained network; note that this rebinds `coords`.
classes,coords=facetracker.predict(X)

In [None]:
# Shown only to check the two outputs; the values are not meaningful before training.
classes,coords ##not correct as nn is not trained

### Define losses and optimizers

In [None]:
# Number of batches in the training dataset.
len(train)

In [None]:
# One epoch is one pass over all training batches.
batches_per_epoch=len(train)
# NOTE(review): presumably chosen so that an inverse-time decay of the form
# lr/(1+decay*step) reaches 75% of the initial rate after one epoch — confirm
# against the optimizer documentation.
lr_decay=(1./0.75-1)/batches_per_epoch

In [None]:
# Adam optimizer with the decay computed above.
# NOTE(review): relies on the `legacy` optimizer namespace — confirm it exists in the installed TensorFlow/Keras version.
opt=tf.keras.optimizers.legacy.Adam(learning_rate=0.0001,decay=lr_decay)

#### create localisation loss and classification loss

In [None]:
def localizaton_loss(y_true,yhat):
    """Sum of squared errors on the first corner plus on the box width and height.

    Both arguments are indexed as [batch, 4] with columns (x1, y1, x2, y2).
    Returns a single scalar summed over the whole batch.
    """
    # Squared distance between the true and predicted first corner (x1, y1).
    corner_error=tf.reduce_sum(tf.square(y_true[:,:2]-yhat[:,:2]))

    true_height=y_true[:,3]-y_true[:,1]
    true_width=y_true[:,2]-y_true[:,0]
    pred_height=yhat[:,3]-yhat[:,1]
    pred_width=yhat[:,2]-yhat[:,0]

    # Squared difference of the box sizes.
    size_error=tf.reduce_sum(
        tf.square(true_height-pred_height)+tf.square(true_width-pred_width)
    )

    return size_error+corner_error

In [None]:
# Loss for the class output.
classloss=tf.keras.losses.BinaryCrossentropy()
# Loss for the box output (the function defined above).
regressloss=localizaton_loss

#### test loss metrics

In [None]:
# Try the box loss on the sample batch labels and the untrained predictions.
localizaton_loss(y[1],coords)

In [None]:
# Try the class loss on the sample batch labels and the untrained predictions.
classloss(y[0],classes)

### Train the model

#### create custom model class

In [None]:
class FaceTracker(Model):
    """Wrap a two-output network (class score, box values) with custom train/test steps.

    Each step combines the localization loss with half of the classification
    loss. The metric key 'regess_loss' is spelled this way because the
    plotting code reads the history under that name.
    """

    def __init__(self,facetracker,**kwargs):
        """Store the network to train; extra keyword arguments go to `Model.__init__`."""
        super().__init__(**kwargs)
        self.model=facetracker

    def compile(self,opt,classloss,localizatonloss,**kwargs):
        """Register the optimizer and the two loss callables used by the steps."""
        super().compile(**kwargs)
        self.closs=classloss
        self.lloss=localizatonloss
        self.opt=opt

    def _compute_losses(self,batch,training):
        """Run the network on one batch and return (total, class, localization) losses."""
        X,y=batch

        classes,coords=self.model(X,training=training)

        batch_classloss=self.closs(y[0],classes)
        # Box targets are cast to float32 before being compared with the predictions.
        batch_localizationloss=self.lloss(tf.cast(y[1],tf.float32),coords)

        total_loss=batch_localizationloss+0.5*batch_classloss
        return total_loss,batch_classloss,batch_localizationloss

    def train_step(self,batch,**kwargs):
        """Run one optimisation step on `batch` and return the three loss values."""
        with tf.GradientTape() as tape:
            total_loss,batch_classloss,batch_localizationloss=self._compute_losses(batch,training=True)

        # Gradients are taken after leaving the tape context so the tape does
        # not record the gradient computation itself.
        grad=tape.gradient(total_loss,self.model.trainable_variables)

        # Use the optimizer passed to compile(), not a notebook-level global.
        self.opt.apply_gradients(zip(grad,self.model.trainable_variables))

        return {"total_loss":total_loss, "class_loss":batch_classloss, "regess_loss":batch_localizationloss}

    def test_step(self,batch,**kwargs):
        """Evaluate `batch` without updating weights and return the three loss values."""
        total_loss,batch_classloss,batch_localizationloss=self._compute_losses(batch,training=False)

        return {"total_loss":total_loss, "class_loss":batch_classloss, "regess_loss":batch_localizationloss}

    def call(self,X,**kwargs):
        """Forward `X` to the wrapped network."""
        return self.model(X,**kwargs)

In [None]:
# Wrap the network built earlier in the custom training class.
model=FaceTracker(facetracker)

In [None]:
# Register the optimizer, the class loss and the box loss (in that order).
model.compile(opt,classloss,regressloss)

#### Train

In [None]:
# Folder passed to the TensorBoard callback.
logdir='logs'

In [None]:
# Callback that writes TensorBoard logs to `logdir`.
tensorboard_callbacks=tf.keras.callbacks.TensorBoard(log_dir=logdir)

In [None]:
# Train for 13 epochs with validation on `val`; `hist.history` feeds the plots below.
hist=model.fit(train,epochs=13,validation_data=val,callbacks=[tensorboard_callbacks])

#### plot performance

In [None]:
# One panel per loss: (train key, train legend, validation key, validation legend, title).
loss_panels=[
    ('total_loss','loss','val_total_loss','val_loss','loss'),
    ('class_loss','class_loss','val_class_loss','val_class_loss','class loss'),
    ('regess_loss','regress_loss','val_regess_loss','val_regress_loss','regress loss'),
]

fig,ax=plt.subplots(ncols=3,figsize=(20,5))

for panel_axis,(train_key,train_label,val_key,val_label,panel_title) in zip(ax,loss_panels):
    panel_axis.plot(hist.history[train_key],color='teal',label=train_label)
    panel_axis.plot(hist.history[val_key],color='orange',label=val_label)
    panel_axis.title.set_text(panel_title)
    panel_axis.legend()

plt.show()

### Make predictions

In [None]:
# Iterator over test batches as numpy arrays.
test_data=test.as_numpy_iterator()

In [None]:
# Fetch the next test batch; each run of this cell advances the iterator.
test_sample=next(test_data)

In [None]:
# Predict on the images of the test batch; yhat[0] holds class scores, yhat[1] box values.
yhat=facetracker.predict(test_sample[0])

In [None]:
# Inspect the raw predictions.
yhat

In [None]:
# Show the first four test images; a box is drawn only when the class score exceeds 0.5.
fig,ax=plt.subplots(ncols=4,figsize=(20,20))

for idx,panel_axis in enumerate(ax):
    sample_image=test_sample[0][idx]
    sample_coords=yhat[1][idx]

    if yhat[0][idx]>0.5:
        # Box values are fractions of the image size; scale to the 120x120 image.
        top_left=tuple(np.multiply(sample_coords[:2],[120,120]).astype(int))
        bottom_right=tuple(np.multiply(sample_coords[2:],[120,120]).astype(int))
        cv2.rectangle(sample_image,top_left,bottom_right,(255,0,0),2)

    panel_axis.imshow(sample_image)

#### save the model

In [None]:
from tensorflow.keras.models import load_model

In [None]:
# Save the inner network (not the FaceTracker wrapper) to an HDF5 file.
facetracker.save('facetracker.h5')

In [None]:
# Load the saved network back from disk; the realtime loop below uses this copy.
facetracker_loaded=load_model('facetracker.h5')

#### realtime detection

In [None]:
# Real-time demo: read frames from camera 0, crop them, run the loaded model on
# a 120x120 RGB copy and draw the predicted box plus a 'face' tag when the class
# score exceeds 0.5. Press 'q' in the window to quit.
cap=cv2.VideoCapture(0)
try:
    while cap.isOpened():
        ret,frame=cap.read()
        if not ret:
            # No frame delivered; slicing a missing frame would raise.
            break
        frame=frame[50:500,50:500,:]
        # The crop can be smaller than 450x450 when the camera frame is small,
        # so the box is scaled by the actual crop size.
        frame_height,frame_width=frame.shape[:2]
        scale=[frame_width,frame_height]

        rgb=cv2.cvtColor(frame,cv2.COLOR_BGR2RGB)
        resized=tf.image.resize(rgb,(120,120))
        yhat=facetracker_loaded.predict(np.expand_dims(resized/255,0))
        coords=yhat[1][0]

        if yhat[0][0]>0.5:
            print(yhat)
            top_left=np.multiply(coords[:2],scale).astype(int)
            bottom_right=np.multiply(coords[2:],scale).astype(int)

            ### main rectangle
            cv2.rectangle(frame,tuple(top_left),tuple(bottom_right),(255,0,0),2)

            ### text rectangle (filled, sits just above the top-left corner)
            cv2.rectangle(frame,
                          tuple(np.add(top_left,[0,-30])),
                          tuple(np.add(top_left,[80,0])),
                          (255,0,0),-1)

            ### text
            cv2.putText(frame,'face',tuple(np.add(top_left,[0,-5])),
                        cv2.FONT_HERSHEY_SIMPLEX,1,(255,255,255),2,cv2.LINE_AA)

        cv2.imshow("facetracker",frame)

        if cv2.waitKey(1) & 0xFF==ord('q'):
            break
finally:
    # Free the camera and close the window even if the loop raised.
    cap.release()
    cv2.destroyAllWindows()