In [None]:
import cv2
import os
import time
import uuid

In [None]:
# Folder that receives the captured webcam frames, and how many frames to grab.
IMAGES_PATH = os.path.join("data", "images")
number_images = 30

In [None]:
# Capture `number_images` frames from the default camera (device 0) and save
# each one as a uniquely named JPEG under IMAGES_PATH. Press 'q' in the preview
# window to stop early.
os.makedirs(IMAGES_PATH, exist_ok=True)  # cv2.imwrite does not create missing folders

cap = cv2.VideoCapture(0)
try:
    for imgnum in range(number_images):
        ret, frame = cap.read()
        if not ret:
            # No frame was delivered (camera busy or unplugged); `frame` is unusable.
            print(f'Could not read frame {imgnum} from the camera; stopping capture.')
            break

        imgname = os.path.join(IMAGES_PATH, f'{str(uuid.uuid1())}.jpg')
        cv2.imwrite(imgname, frame)
        cv2.imshow('frame', frame)
        time.sleep(0.5)  # pause between shots so the subject can change pose

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Always free the camera and close the preview window, even on error.
    cap.release()
    cv2.destroyAllWindows()

In [None]:
import tensorflow as tf
import json
import numpy as np
from matplotlib import pyplot as plt

In [None]:
# Dataset of the file paths matching data/images/*.jpg; shuffle=False turns off
# list_files' random ordering.
images=tf.data.Dataset.list_files('data/images/*.jpg',shuffle=False)

In [None]:
# Show the first element of `images` (a file path at this point).
next(images.as_numpy_iterator())

In [None]:
def load_image(x):
    """Read the file at path `x` and return its JPEG-decoded contents.

    Args:
        x: path of a JPEG file, in a form accepted by `tf.io.read_file`.

    Returns:
        The tensor produced by `tf.io.decode_jpeg` for the file's bytes.
    """
    return tf.io.decode_jpeg(tf.io.read_file(x))

In [None]:
# Replace every file path in `images` with its decoded image.
# NOTE(review): `images` is reassigned in place, so re-running only this cell
# would feed decoded images back into load_image; re-run the list_files cell first.
images=images.map(load_image)

In [None]:
# Show index 0 along the first axis of the first decoded image.
next(images.as_numpy_iterator())[0]

In [None]:
# Move each label JSON from data/labels into the labels folder of the split
# (train/test/val) that already holds the matching image. Images without a
# label file are left alone.
for folder in ['train','test','val']:
    labels_dir = os.path.join('data', folder, 'labels')
    os.makedirs(labels_dir, exist_ok=True)  # os.replace needs the destination folder to exist

    for file in os.listdir(os.path.join('data', folder, 'images')):
        # splitext strips only the final extension, so dotted names
        # such as "a.b.jpg" map to "a.b.json" rather than "a.json".
        filename = os.path.splitext(file)[0] + '.json'
        existing_filepath = os.path.join('data', 'labels', filename)

        if os.path.exists(existing_filepath):
            new_filepath = os.path.join(labels_dir, filename)
            os.replace(existing_filepath, new_filepath)

In [None]:
import albumentations as alb

In [None]:
# Augmentation pipeline: a 450x450 random crop followed by random flips and
# colour/brightness/gamma jitter. Bounding boxes use the 'albumentations'
# format and their labels travel in the `class_labels` field.
augmentor = alb.Compose(
    [
        alb.RandomCrop(width=450, height=450),
        alb.HorizontalFlip(p=0.5),
        alb.RandomBrightnessContrast(p=0.2),
        alb.RandomGamma(p=0.2),
        alb.RGBShift(p=0.5),
        alb.VerticalFlip(p=0.5),
    ],
    bbox_params=alb.BboxParams(
        format='albumentations',
        label_fields=['class_labels'],
    ),
)

In [None]:
# Load one sample image from the test split with OpenCV.
# NOTE(review): the file name is hard-coded; it must exist in data/test/images.
img=cv2.imread(os.path.join('data','test','images','1c6c4762-1650-11ee-9a76-c8b29bfe0ea8.jpg'))

In [None]:
# Load the annotation JSON that belongs to the sample image above.
with open(os.path.join('data','test','labels','1c6c4762-1650-11ee-9a76-c8b29bfe0ea8.json'),'r') as f:
    label=json.load(f)

In [None]:
# Flatten the first two points of the first annotated shape into
# [p0[0], p0[1], p1[0], p1[1]].
coord = [
    label['shapes'][0]['points'][point][axis]
    for point in (0, 1)
    for axis in (0, 1)
]


In [None]:
# Display the coordinates as read from the label file.
coord

In [None]:
# Scale the coordinates element-wise by [640,480,640,480] — presumably the
# image width/height in pixels; TODO confirm against the captured frames.
# NOTE(review): `coord` is overwritten, so re-running this cell divides again.
coord=np.divide(coord,[640,480,640,480])

In [None]:
# Display the coordinates after scaling.
coord

In [None]:
# Run the augmentation pipeline on the sample image with its single box and
# the label 'face'.
augmented=augmentor(image=img,bboxes=[coord],class_labels=['face'])

In [None]:
# Display the augmentation result.
augmented

In [None]:
# Preview the augmented sample with its bounding box.
# cv2.imread returns BGR while matplotlib expects RGB, so convert first.
# cvtColor also returns a new array, which keeps augmented['image'] untouched
# and makes this cell safe to re-run.
preview = cv2.cvtColor(augmented['image'], cv2.COLOR_BGR2RGB)
preview_size = preview.shape[1::-1]  # (width, height) of the cropped image

cv2.rectangle(preview,
              tuple(np.multiply(augmented['bboxes'][0][:2], preview_size).astype(int)),
              tuple(np.multiply(augmented['bboxes'][0][2:], preview_size).astype(int)),
              (255,0,0), 2
              )

plt.imshow(preview)

In [None]:
# Build the augmented dataset: for every image in data/<partition>/images write
# 100 augmented copies to aug_data/<partition>/images plus one annotation JSON
# per copy to aug_data/<partition>/labels. Images without a label file (or
# whose box is lost by the crop) are stored as class 0 with bbox [0,0,0,0].
for partition in ['test','train','val']:
    # cv2.imwrite does not create folders, so make sure both targets exist.
    os.makedirs(f'aug_data/{partition}/images', exist_ok=True)
    os.makedirs(f'aug_data/{partition}/labels', exist_ok=True)

    for image in os.listdir(f'data/{partition}/images'):
        img = cv2.imread(f'data/{partition}/images/{image}')
        if img is None:
            # Not a readable image (e.g. a stray non-image file); nothing to augment.
            print(f'Skipping unreadable image: data/{partition}/images/{image}')
            continue

        stem = os.path.splitext(image)[0]
        label_path = f'data/{partition}/labels/{stem}.json'
        has_label = os.path.exists(label_path)

        # Tiny placeholder box so the augmentor still receives a box for unlabeled images.
        coord = [0, 0, 0.0001, .0001]

        if has_label:
            with open(label_path, 'r') as f:
                label = json.load(f)

            (xa, ya), (xb, yb) = label['shapes'][0]['points'][:2]
            height, width = img.shape[:2]
            # Normalise by the real image size, and order the corners as
            # (x_min, y_min, x_max, y_max) regardless of which corner comes first.
            coord = [min(xa, xb) / width, min(ya, yb) / height,
                     max(xa, xb) / width, max(ya, yb) / height]

        try:
            for x in range(100):
                augmented = augmentor(image=img, bboxes=[coord], class_labels=['face'])
                cv2.imwrite(f'aug_data/{partition}/images/{stem}{x}.jpg', augmented['image'])

                annotations = {'image': image}

                if has_label and len(augmented['bboxes']) > 0:
                    # Plain floats keep the box JSON-serialisable whatever
                    # numeric type the augmentor returns.
                    annotations['bbox'] = [float(v) for v in augmented['bboxes'][0]]
                    annotations['class'] = 1
                else:
                    annotations['bbox'] = [0, 0, 0, 0]
                    annotations['class'] = 0

                with open(f'aug_data/{partition}/labels/{stem}{x}.json', 'w') as f:
                    json.dump(annotations, f)

        except Exception as e:
            # Best effort: report which image failed and continue with the next one.
            print(f'{partition}/{image}: {e}')
            


In [None]:
# Training images: list files -> decode -> resize to 120x120 -> divide by 255.
train_images = (
    tf.data.Dataset.list_files('aug_data/train/images/*.jpg', shuffle=False)
    .map(load_image)
    .map(lambda x: tf.image.resize(x, (120, 120)))
    .map(lambda x: x / 255)
)

In [None]:
# Test images: list files -> decode -> resize to 120x120 -> divide by 255.
test_images = (
    tf.data.Dataset.list_files('aug_data/test/images/*.jpg', shuffle=False)
    .map(load_image)
    .map(lambda x: tf.image.resize(x, (120, 120)))
    .map(lambda x: x / 255)
)

In [None]:
# Validation images: list files -> decode -> resize to 120x120 -> divide by 255.
val_images = (
    tf.data.Dataset.list_files('aug_data/val/images/*.jpg', shuffle=False)
    .map(load_image)
    .map(lambda x: tf.image.resize(x, (120, 120)))
    .map(lambda x: x / 255)
)

In [None]:
# Shape of the first preprocessed training image.
next(train_images.as_numpy_iterator()).shape

In [None]:
def load_labels(label_path):
    """Read one annotation JSON and return its class and bounding box.

    Args:
        label_path: object whose `.numpy()` returns the path of the JSON file
            (the label pipelines call this through `tf.py_function`).

    Returns:
        A pair `([class], bbox)` taken from the file's 'class' and 'bbox' keys.
    """
    with open(label_path.numpy(), 'r', encoding='utf-8') as label_file:
        annotation = json.load(label_file)

    class_id = annotation['class']
    bbox = annotation['bbox']
    return [class_id], bbox

In [None]:
# Training labels: every JSON path is parsed by load_labels inside a
# tf.py_function that yields (class as uint8, bbox as float16).
train_labels = (
    tf.data.Dataset.list_files('aug_data/train/labels/*.json', shuffle=False)
    .map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))
)
    

In [None]:
# Show the first (class, bbox) pair of the training labels.
next(train_labels.as_numpy_iterator())

In [None]:
# Test labels: every JSON path is parsed by load_labels inside a
# tf.py_function that yields (class as uint8, bbox as float16).
test_labels = (
    tf.data.Dataset.list_files('aug_data/test/labels/*.json', shuffle=False)
    .map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))
)

In [None]:
# Validation labels: every JSON path is parsed by load_labels inside a
# tf.py_function that yields (class as uint8, bbox as float16).
val_labels = (
    tf.data.Dataset.list_files('aug_data/val/labels/*.json', shuffle=False)
    .map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))
)

In [None]:
# Pair training images with their labels, then shuffle (buffer 200),
# batch by 8 and prefetch 4 batches.
train = (
    tf.data.Dataset.zip((train_images, train_labels))
    .shuffle(200)
    .batch(8)
    .prefetch(4)
)

In [None]:
# Pair test images with their labels, then shuffle (buffer 50),
# batch by 8 and prefetch 4 batches.
test = (
    tf.data.Dataset.zip((test_images, test_labels))
    .shuffle(50)
    .batch(8)
    .prefetch(4)
)

In [None]:
# Pair validation images with their labels, then shuffle (buffer 50),
# batch by 8 and prefetch 4 batches.
val = (
    tf.data.Dataset.zip((val_images, val_labels))
    .shuffle(50)
    .batch(8)
    .prefetch(4)
)

In [None]:
# Labels (element 1) of one training batch.
next(train.as_numpy_iterator())[1]

In [None]:
# Keep one training batch around for the preview plot.
res = next(train.as_numpy_iterator())

In [None]:
# Plot the first four images of the fetched batch with their boxes drawn on.
# Box values are multiplied by 120 to match the 120x120 resized images.
fig, ax = plt.subplots(ncols=4, figsize=(20, 20))

for i in range(4):
    sample_image = res[0][i]
    sample_coords = res[1][1][i]

    start_point = tuple(np.multiply(sample_coords[:2], [120, 120]).astype(int))
    end_point = tuple(np.multiply(sample_coords[2:], [120, 120]).astype(int))
    cv2.rectangle(sample_image, start_point, end_point, (250, 0, 0), 1)

    ax[i].imshow(sample_image)

In [None]:
from tensorflow.keras.models import Model
from keras.applications import ResNet50
from keras.layers import Input,Conv2D,MaxPooling2D,Dense,Flatten,Add,GlobalAveragePooling2D

In [None]:
def build_model():
    """Build the two-headed detection network on top of a ResNet50 backbone.

    The input is a 120x120x3 image. The backbone output is pooled separately
    for each head:
      * classification head: Dense(2048, relu) -> Dense(1, sigmoid)
      * regression head:     Dense(2048, relu) -> Dense(4, sigmoid)

    Returns:
        A Keras `Model` mapping the image to `[class_output, bbox_output]`.
    """
    image_input = Input(shape=(120, 120, 3))
    backbone_features = ResNet50(include_top=False)(image_input)

    # Classification head.
    class_pooled = GlobalAveragePooling2D()(backbone_features)
    class_hidden = Dense(2048, activation='relu')(class_pooled)
    class_output = Dense(1, activation='sigmoid')(class_hidden)

    # Bounding-box regression head.
    bbox_pooled = GlobalAveragePooling2D()(backbone_features)
    bbox_hidden = Dense(2048, activation='relu')(bbox_pooled)
    bbox_output = Dense(4, activation='sigmoid')(bbox_hidden)

    return Model(inputs=image_input, outputs=[class_output, bbox_output])

In [None]:
# Build the two-headed network and print its layer summary.
facetracker=build_model()
facetracker.summary()

In [None]:
def localization_loss(y_true, yhat):
    """Sum-of-squares box loss over a batch.

    Adds two terms, each summed over the whole batch:
      * squared error of the first two columns (the first corner), and
      * squared error of the box width (col 2 - col 0) and height (col 3 - col 1).

    Args:
        y_true: 2-D tensor of target boxes, one row of four values per sample.
        yhat: 2-D tensor of predicted boxes in the same layout.

    Returns:
        Scalar tensor: corner error plus size error.
    """
    corner_error = tf.reduce_sum(tf.square(y_true[:, :2] - yhat[:, :2]))

    true_height = y_true[:, 3] - y_true[:, 1]
    true_width = y_true[:, 2] - y_true[:, 0]

    pred_height = yhat[:, 3] - yhat[:, 1]
    pred_width = yhat[:, 2] - yhat[:, 0]

    size_error = tf.reduce_sum(
        tf.square(true_width - pred_width) + tf.square(true_height - pred_height)
    )

    return corner_error + size_error

In [None]:
# Adam optimizer with learning rate 1e-4; passed to model.compile further down.
# NOTE(review): relies on the `tf.keras.optimizers.legacy` namespace — confirm
# it exists in the installed TensorFlow version.
opt=tf.keras.optimizers.legacy.Adam(learning_rate=.0001)

In [None]:
# Loss for the classification head (binary cross-entropy) and for the box
# regression head (the custom localization_loss defined above).
classloss=tf.keras.losses.BinaryCrossentropy()
regressloss=localization_loss

In [None]:
# Compile the functional network returned by build_model(): one loss per output
# (class, box), weighted 0.5 : 1. The name `model` is only assigned in a later
# cell, so this cell targets `facetracker` to keep Restart & Run All working.
facetracker.compile(loss=[classloss,regressloss], optimizer='adam',loss_weights=[.5,1])

In [None]:
class FaceTracker(Model):
    """Training wrapper around a two-output (class, box) network.

    Overrides `train_step`/`test_step` so that `fit`/`evaluate` optimise
    `localization_loss + 0.5 * classification_loss`, using the losses and
    optimizer handed to `compile`.
    """

    def __init__(self, eyetracker, **kwargs):
        """Store the wrapped network.

        Args:
            eyetracker: model returning `(classes, coords)` when called on a batch.
            **kwargs: forwarded to `Model.__init__`.
        """
        super().__init__(**kwargs)
        self.model = eyetracker

    def compile(self, opt, classloss, localizationloss, **kwargs):
        """Register the optimizer and the two loss callables.

        Args:
            opt: optimizer exposing `apply_gradients`.
            classloss: callable `(y_true, y_pred)` for the class output.
            localizationloss: callable `(y_true, y_pred)` for the box output.
            **kwargs: forwarded to `Model.compile`.
        """
        super().compile(**kwargs)
        self.closs = classloss
        self.lloss = localizationloss
        self.opt = opt

    def _compute_losses(self, batch, training):
        """Run the wrapped model on `batch` and return the three loss values."""
        X, y = batch

        classes, coords = self.model(X, training=training)

        batch_classloss = self.closs(y[0], classes)
        # Cast the box targets to float32 before comparing with the predictions.
        batch_localizationloss = self.lloss(tf.cast(y[1], tf.float32), coords)
        total_loss = batch_localizationloss+0.5*batch_classloss

        return {"total_loss":total_loss, "class_loss":batch_classloss, "regress_loss":batch_localizationloss}

    def train_step(self, batch, **kwargs):
        """One optimisation step on `batch`; returns the loss dictionary."""
        with tf.GradientTape() as tape:
            losses = self._compute_losses(batch, training=True)

        # Differentiate outside the tape context so the gradient computation
        # itself is not recorded.
        grad = tape.gradient(losses["total_loss"], self.model.trainable_variables)

        # Use the optimizer given to compile(), not a notebook-level global.
        self.opt.apply_gradients(zip(grad, self.model.trainable_variables))

        return losses

    def test_step(self, batch, **kwargs):
        """Evaluate `batch` without updating weights; returns the loss dictionary."""
        return self._compute_losses(batch, training=False)

    def call(self, X, **kwargs):
        """Forward pass: delegate to the wrapped network."""
        return self.model(X, **kwargs)

In [None]:
# Wrap the functional network in the custom training wrapper.
model=FaceTracker(facetracker)

In [None]:
# Positional arguments follow FaceTracker.compile(opt, classloss, localizationloss).
model.compile(opt,classloss,regressloss)

In [None]:
# Train for 10 epochs on `train`, using `val` as validation data.
model.fit(train,validation_data=val,epochs=10)

In [None]:
# Load one augmented test image for a manual prediction check.
sample_path = 'aug_data/test/images/1c6c4762-1650-11ee-9a76-c8b29bfe0ea80.jpg'
img = cv2.imread(sample_path)
if img is None:
    # cv2.imread signals a missing or unreadable file by returning None.
    raise FileNotFoundError(f'Could not read image: {sample_path}')

# OpenCV loads BGR; the training pipeline decodes with tf.io.decode_jpeg (RGB)
# and matplotlib also expects RGB, so convert once here.
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
plt.imshow(img)

In [None]:
# Apply the training preprocessing (resize to 120x120, divide by 255), add a
# batch axis and predict.
# NOTE(review): `img` is overwritten, so re-running this cell divides by 255 again.
img=cv2.resize(img,(120,120))
img=img/255
pred=model.predict(np.expand_dims(img,0))

In [None]:
# First two values of the second model output for the single input image
# (presumably the box's first corner — the second output is the 4-value head).
pred[1][0][:2]

In [None]:
# Draw the predicted box on the 120x120 image (box values are multiplied by
# 120 to get pixel positions) and display it.
cv2.rectangle(img,
                tuple(np.multiply(pred[1][0][:2],[120,120]).astype(int)),
                tuple(np.multiply(pred[1][0][2:],[120,120]).astype(int)),
                (250,0,0),2)

plt.imshow(img)

In [None]:
# Train in five rounds of two epochs; after each round, predict on one test
# image and show the predicted box so progress can be followed visually.
for i in range(5):
    model.fit(train, validation_data=val, epochs=2)

    img = cv2.imread('aug_data/test/images/1c6c4762-1650-11ee-9a76-c8b29bfe0ea80.jpg')
    # OpenCV loads BGR; the network is trained on RGB (tf.io.decode_jpeg) and
    # matplotlib displays RGB.
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, (120,120))
    img = img/255
    pred = model.predict(np.expand_dims(img, 0))
    cv2.rectangle(img,
                  tuple(np.multiply(pred[1][0][:2], [120,120]).astype(int)),
                  tuple(np.multiply(pred[1][0][2:], [120,120]).astype(int)),
                  (250,0,0), 2)
    plt.imshow(img)
    plt.show()  # render one figure per round instead of only the last one
In [None]:
import joblib

In [None]:
# Replace the in-memory `model` with one loaded from faceDetect.joblib.
# NOTE(review): no visible cell writes this file, so it must already exist on disk.
# NOTE(review): joblib.load is presumably pickle-based and can execute code
# from the file — only load files you trust; confirm against the joblib docs.
model=joblib.load('faceDetect.joblib')

In [None]:
# Live demo: predict a box for every webcam frame and show it in a 480x480
# preview window until 'q' is pressed.
cap = cv2.VideoCapture(0)
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            # No frame was delivered (camera busy or unplugged); `frame` is unusable.
            print('Could not read a frame from the camera; stopping.')
            break

        # Model input: same preprocessing as training (120x120, scaled by 1/255),
        # converted from OpenCV's BGR to the RGB order produced by tf.io.decode_jpeg.
        model_input = cv2.cvtColor(cv2.resize(frame, (120,120)), cv2.COLOR_BGR2RGB)
        model_input = model_input/255
        pred = model.predict(np.expand_dims(model_input, 0))

        # Draw on the camera frame itself (still BGR, as cv2.imshow expects);
        # box values are multiplied by the 480x480 display size.
        frame = cv2.resize(frame, (480,480))
        cv2.rectangle(frame,
                      tuple(np.multiply(pred[1][0][:2], [480,480]).astype(int)),
                      tuple(np.multiply(pred[1][0][2:], [480,480]).astype(int)),
                      (0,0,250), 1)

        cv2.imshow('frame', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Always free the camera and close the window, even if prediction fails.
    cap.release()
    cv2.destroyAllWindows()
