In [None]:
import tensorflow as tf
import keras
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization
from tensorflow.keras.applications import VGG16
from tensorflow.keras import mixed_precision

#* autoreload to reload modules
#* (mode 2 re-imports edited modules before each cell runs, so changes to the
#*  process module are picked up without restarting the kernel)
%load_ext autoreload
%autoreload 2

#* import custom modules
#* NOTE(review): the star import hides where names come from. Later cells call
#* prepare_images / prepare_labels / combine and use cv2, np and plt before any
#* explicit import of them, so those names are presumably provided by process —
#* confirm, and prefer explicit imports.
from process import *
#* let TensorFlow grow GPU memory on demand instead of reserving it all up front
for gpu in tf.config.list_physical_devices('GPU'):
    tf.config.experimental.set_memory_growth(gpu, True)
#* show whether a GPU is visible; tf.test.is_gpu_available() is deprecated in
#* favour of querying the physical device list
bool(tf.config.list_physical_devices('GPU'))

#* mixed precision to speed up training
# policy = mixed_precision.Policy('mixed_float16')
# mixed_precision.set_global_policy(policy)


In [None]:
#* spatial size of the network input; also passed to prepare_images as `size`
size = (120, 120)
#* batch size handed to combine() for the training split
batch_size = 8

In [None]:
#* load the image files of each split, passing the target `size`
#* (prepare_images / prepare_labels / combine presumably come from the process star import — confirm)
train_images = prepare_images("aug_data/train/images/*.jpg", size=size)
test_images = prepare_images("aug_data/test/images/*.jpg", size=size)
val_images = prepare_images("aug_data/val/images/*.jpg", size=size)

#* load the JSON label files of each split
train_labels = prepare_labels("aug_data/train/labels/*.json")
test_labels = prepare_labels("aug_data/test/labels/*.json")
val_labels = prepare_labels("aug_data/val/labels/*.json")

#* pair images with labels per split
#* NOTE(review): only the training split gets an explicit batch_size; test and val
#* rely on whatever combine() defaults to — confirm that default is intended
train = combine(train_images, train_labels, batch_size=batch_size)
test = combine(test_images, test_labels)
val = combine(val_images, val_labels)

In [None]:
def build_model(size=size):
    """Assemble the two-headed tracker network on a VGG16 backbone.

    Args:
        size: pair whose first two entries give the spatial dimensions of the
            3-channel input. The default is the module-level ``size`` as it
            was when this function was defined.

    Returns:
        A Keras ``Model`` mapping an image batch to ``[class_out, box_out]``:
        a 1-unit sigmoid output and a 4-unit sigmoid output.
    """
    image_in = Input(shape=(size[0], size[1], 3))
    #* headless VGG16 (no dense top) as the shared feature extractor
    backbone_features = VGG16(include_top=False)(image_in)

    #* classification head: global max pool -> 1024 relu -> 1 sigmoid
    pooled_max = tf.keras.layers.GlobalMaxPooling2D()(backbone_features)
    class_hidden = Dense(1024, activation='relu')(pooled_max)
    class_out = Dense(1, activation='sigmoid')(class_hidden)

    #* bounding box head: global average pool -> 1024 relu -> 4 sigmoid
    pooled_avg = tf.keras.layers.GlobalAveragePooling2D()(backbone_features)
    box_hidden = Dense(1024, activation='relu')(pooled_avg)
    box_out = Dense(4, activation='sigmoid')(box_hidden)

    return Model(inputs=image_in, outputs=[class_out, box_out])

#* build the network once here; NOTE: a later cell calls build_model() again and
#* rebinds face_tracker before wrapping it in FaceTracker
face_tracker = build_model()
#face_tracker.summary()

## Define losses and Optimizer

In [None]:
#* number of batches in one pass over the training set
batches_per_epoch = len(train)
#* decay rate chosen so the learning rate drops to 75% of its start value after one epoch:
#* lr(step) = lr0 / (1 + lr_decay * step)  ->  lr(batches_per_epoch) = 0.75 * lr0
lr_decay = (1./0.75 - 1) / (batches_per_epoch)
#* the `decay` keyword is only accepted by the legacy Keras optimizers; an
#* InverseTimeDecay schedule with decay_steps=1 applies the same per-step formula
#* and works with both old and new optimizer implementations
lr_schedule = tf.keras.optimizers.schedules.InverseTimeDecay(
    initial_learning_rate=0.0001, decay_steps=1, decay_rate=lr_decay)
opt = tf.keras.optimizers.Adam(learning_rate=lr_schedule)

Reference for the localization loss defined below: [YOLO object-detection loss function](https://stats.stackexchange.com/questions/319243/object-detection-loss-function-yolo)

In [None]:
def localization_loss(y_true, y_pred):
    """Sum-of-squares bounding-box loss over a batch.

    Both arguments are indexed as ``[:, k]`` for k in 0..3, i.e. one row of
    four box values per sample. Columns 0/1 are treated as the first corner;
    width is column 2 minus column 0 and height is column 3 minus column 1.

    Args:
        y_true: ground-truth boxes.
        y_pred: predicted boxes, same layout as ``y_true``.

    Returns:
        Scalar: squared error of the first corner plus 0.5 times the squared
        error of width and height, each summed over the whole batch.
    """
    #* squared error of the first corner (columns 0 and 1)
    delta_coord = tf.reduce_sum(tf.square(y_true[:, :2] - y_pred[:, :2]))

    #* box extents derived from the two corners
    h_true = y_true[:, 3] - y_true[:, 1]
    w_true = y_true[:, 2] - y_true[:, 0]

    h_pred = y_pred[:, 3] - y_pred[:, 1]
    w_pred = y_pred[:, 2] - y_pred[:, 0]

    #* plain squared differences of the extents (no square roots are taken)
    delta_size = tf.reduce_sum(tf.square(w_true - w_pred) + tf.square(h_true - h_pred))

    return delta_coord + 0.5*delta_size

In [None]:
#* loss callables passed to FaceTracker.compile in the training cell
class_loss = tf.keras.losses.BinaryCrossentropy()
regression_loss = localization_loss

def total_loss (y_true, y_pred):
    """Binary cross-entropy on element 0 plus localization loss on element 1.

    Args:
        y_true: indexable whose item 0 is the class target and item 1 the box target.
        y_pred: indexable with the matching predictions.

    Returns:
        The sum of the two loss terms.
    """
    #* a fresh BinaryCrossentropy is built per call; the module-level objects are not touched
    bce_term = tf.keras.losses.BinaryCrossentropy()(y_true[0], y_pred[0])
    box_term = localization_loss(y_true[1], y_pred[1])
    return bce_term + box_term

In [None]:
#* optional sanity pass over the training set; set `checking` to True to run it
checking = False
if checking:
    train_iter = train.as_numpy_iterator()
    #* NOTE: only the two *_shape_tracker lists are filled below; the other lists stay empty
    real_coords_list = []
    pred_coords_list = []
    class_loss_hist = []
    regression_loss_hist = []
    class_shape_tracker = []
    coords_shape_tracker = []

    #* one iteration per batch of the training set
    for i in range(len(train)):
        data_sample = train_iter.next()
        
        #* each batch is (images, labels); labels is (class targets, box targets)
        img = data_sample[0]
        labels = data_sample[1]
        
        real_class = labels[0]
        real_coords = labels[1]
        
        #* model 
        #* (the tape is opened but no gradient is taken from it in this cell)
        with tf.GradientTape() as tape:
            pred_class, pred_coords = face_tracker(img, training=False)
            try :
                # if len(real_coords.shape) > 2:
                #     real_coords = tf.squeeze(real_coords, axis=0)
                #     real_coords.shape.as_list() == pred_coords.shape.as_list()
                # class_loss_value = class_loss(real_class, pred_class)
                # regression_loss_value = regression_loss(real_coords, pred_coords)
                # class_loss_hist.append(class_loss_value)
                # regression_loss_hist.append(regression_loss_value)
                #* record the rank of each label array so irregular batches can be spotted
                class_shape_tracker.append(len(real_class.shape))
                coords_shape_tracker.append(len(real_coords.shape))
            except Exception as e:
                #* dump the offending batch before re-raising
                print(e)
                print(real_class)
                print(pred_class)
                print(real_coords)
                print(pred_coords)
                raise e


## Define the model

In [None]:
class FaceTracker(Model):
    """Keras ``Model`` wrapper that trains a two-output network with separate losses.

    The wrapped network must return ``(classes, coords)`` when called. Batches
    are ``(x, y)`` pairs where ``y[0]`` holds the class targets and ``y[1]``
    the box targets.

    The combined loss is ``REGRESSION_WEIGHT * regression_loss + class_loss``.
    The same weighting is used in ``train_step`` and ``test_step`` so that the
    reported ``loss`` and ``val_loss`` measure the same objective.
    """

    #* weight of the regression term in the combined loss (training and evaluation)
    REGRESSION_WEIGHT = 2

    def __init__(self, model, **kwargs):
        """Store the wrapped network; ``kwargs`` are forwarded to ``Model.__init__``."""
        super().__init__(**kwargs)
        self.model = model
        self.coord_track = []

    def compile(self, optimizer, class_loss, regression_loss, **kwargs):
        """Attach the optimizer and the two loss callables.

        Args:
            optimizer: optimizer used by ``train_step`` to apply gradients.
            class_loss: callable ``(y_true, y_pred)`` for the class output.
            regression_loss: callable ``(y_true, y_pred)`` for the box output.
            **kwargs: forwarded to ``Model.compile``.
        """
        super().compile(**kwargs)
        self.optimizer = optimizer
        self.class_loss = class_loss
        self.regression_loss = regression_loss

    def _compute_losses(self, x, y, training):
        """Run the wrapped network and return ``(total, class, regression)`` losses."""
        classes, coords = self.model(x, training=training)
        batch_class_loss = self.class_loss(y[0], classes)
        batch_regression_loss = self.regression_loss(y[1], coords)
        total_loss = self.REGRESSION_WEIGHT*batch_regression_loss + batch_class_loss
        return total_loss, batch_class_loss, batch_regression_loss

    def train_step(self, batch, **kwargs):
        """Run one optimisation step on ``batch`` and return the loss dict."""
        x, y = batch

        #* only the forward pass and loss need to be recorded on the tape
        with tf.GradientTape() as tape:
            total_loss, batch_class_loss, batch_regression_loss = self._compute_losses(
                x, y, training=True)

        #* gradients are taken after leaving the tape context
        grad = tape.gradient(total_loss, self.model.trainable_variables)

        #* update weights
        self.optimizer.apply_gradients(zip(grad, self.model.trainable_variables))
        return {'loss': total_loss, 'class_loss': batch_class_loss, 'regression_loss': batch_regression_loss}

    def test_step(self, batch, **kwargs):
        """Evaluate ``batch`` without updating weights and return the loss dict."""
        x, y = batch

        total_loss, batch_class_loss, batch_regression_loss = self._compute_losses(
            x, y, training=False)
        return {'loss': total_loss, 'class_loss': batch_class_loss, 'regression_loss': batch_regression_loss}

    def call(self, x, **kwargs):
        """Forward ``x`` (and any keyword arguments) to the wrapped network."""
        return self.model(x, **kwargs)

In [None]:
#* build a fresh network, wrap it in the custom training loop and attach optimizer + losses
face_tracker = build_model()
model = FaceTracker(face_tracker)
model.compile(optimizer=opt, class_loss=class_loss, regression_loss=regression_loss)

In [None]:
#* save best model
import os
#* creates the folder when missing and is a no-op when it already exists
os.makedirs('models', exist_ok=True)
checkpoint_path = "models/best_model.h5"

#* keep only the weights of the epoch with the lowest val_loss
model_checkpoint = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path,
    save_weights_only=True,
    monitor='val_loss',
    mode='min',
    save_best_only=True)

#* early stopping: stop after 25 epochs without val_loss improvement and
#* roll the model back to its best weights
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    min_delta=0,
    patience=25,
    verbose=0,
    mode='auto',
    baseline=None,
    restore_best_weights=True)

In [None]:
#* TensorBoard callback pointed at the ./logs directory
logdir = 'logs'
tensorboard_callback = tf.keras.callbacks.TensorBoard(logdir)

In [None]:
#* epochs=500 is an upper bound: the early_stopping callback ends training once
#* val_loss has not improved for its patience window
hist = model.fit(
    train, epochs=500, validation_data=val, callbacks=[tensorboard_callback, model_checkpoint, early_stopping])

In [None]:
#* pull one batch from the training set as numpy arrays (x: images, y: labels)
#* NOTE: despite its name, data_sample holds the iterator, not a sample
data_sample = train.as_numpy_iterator()
x, y = data_sample.next()

In [None]:
#* index of the sample inside the batch; must be smaller than the number of images in x
i = 4
img = x[i]
#* predict on the whole batch, take output 1 (the 4-value box head) and keep sample i
pred_coord_img = model.predict(x)[1][i]

In [None]:
#* the predicted box is four sigmoid outputs; localization_loss treats columns 0/2
#* as x and 1/3 as y. OpenCV points are (x, y), so x scales with the image width
#* (shape[1]) and y with the image height (shape[0]).
pt1 = int(pred_coord_img[0] * img.shape[1]), int(pred_coord_img[1] * img.shape[0])
pt2 = int(pred_coord_img[2] * img.shape[1]), int(pred_coord_img[3] * img.shape[0])

#* draws onto img in place
cv2.rectangle(
    img=img,
    pt1=pt1,
    pt2=pt2,
    color=(0, 255, 0),
    thickness=1
)
plt.imshow(img)

In [None]:
#* persist the trained FaceTracker under models/ (the folder is created in the checkpoint cell)
model.save('models/face_tracker.tf')

In [None]:
#* reload the saved model, replacing the in-memory `model`
#* NOTE(review): FaceTracker is a subclassed Model with a custom compile() signature —
#* confirm load_model restores it as expected without custom_objects
model = tf.keras.models.load_model('models/face_tracker.tf')

In [None]:
#* live webcam inference: crop each frame, run the tracker, draw the predicted box.
#* Press 'q' in the window to quit.
cap = cv2.VideoCapture(1)
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            #* no frame delivered (camera unplugged / stream ended): stop instead of crashing on None
            break
        frame = frame[50:500, 50:500,:]

        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        resized = tf.image.resize(rgb, (120,120))

        #* output 0 is the class score, output 1 the box; [0] selects the single image of the batch
        yhat = model.predict(np.expand_dims(resized/255,0))
        sample_coords = yhat[1][0]

        if yhat[0] > 0.5:
            #* scale (x, y) by the actual crop (width, height); the crop is smaller than
            #* 450x450 whenever the camera frame is under 500 px in a dimension
            scale = [frame.shape[1], frame.shape[0]]
            top_left = np.multiply(sample_coords[:2], scale).astype(int)
            bottom_right = np.multiply(sample_coords[2:], scale).astype(int)

            # Controls the main rectangle (.tolist() yields plain Python ints for OpenCV)
            cv2.rectangle(frame,
                          tuple(top_left.tolist()),
                          tuple(bottom_right.tolist()),
                          (255,0,0), 2)
            # Controls the label rectangle
            cv2.rectangle(frame,
                          tuple(np.add(top_left, [0,-30]).tolist()),
                          tuple(np.add(top_left, [80,0]).tolist()),
                          (255,0,0), -1)

            # Controls the text rendered
            cv2.putText(frame, 'face', tuple(np.add(top_left, [0,-5]).tolist()),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)

        cv2.imshow('EyeTrack', frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    #* always free the camera and close the window, even after an error or interrupt
    cap.release()
    cv2.destroyAllWindows()

In [None]:
import cv2
import numpy as np
import tensorflow as tf

#* standalone variant of the webcam demo: loads the saved model itself.
#* Press 'q' in the window to quit.
model = tf.keras.models.load_model('models/face_tracker.tf')

#* VideoCapture() without a source is never opened, so the loop below would not run;
#* index 0 selects the default camera — change it if another device is wanted
cap = cv2.VideoCapture(0)
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            #* no frame delivered (camera unplugged / stream ended): stop instead of crashing on None
            break
        frame = frame[50:500, 50:500,:]

        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        resized = tf.image.resize(rgb, (120,120))

        #* output 0 is the class score, output 1 the box; [0] selects the single image of the batch
        yhat = model.predict(np.expand_dims(resized/255,0))
        sample_coords = yhat[1][0]

        if yhat[0] > 0.5:
            #* scale (x, y) by the actual crop (width, height); the crop is smaller than
            #* 450x450 whenever the camera frame is under 500 px in a dimension
            scale = [frame.shape[1], frame.shape[0]]
            top_left = np.multiply(sample_coords[:2], scale).astype(int)
            bottom_right = np.multiply(sample_coords[2:], scale).astype(int)

            # Controls the main rectangle (.tolist() yields plain Python ints for OpenCV)
            cv2.rectangle(frame,
                          tuple(top_left.tolist()),
                          tuple(bottom_right.tolist()),
                          (255,0,0), 2)
            # Controls the label rectangle
            cv2.rectangle(frame,
                          tuple(np.add(top_left, [0,-30]).tolist()),
                          tuple(np.add(top_left, [80,0]).tolist()),
                          (255,0,0), -1)

            # Controls the text rendered
            cv2.putText(frame, 'face', tuple(np.add(top_left, [0,-5]).tolist()),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)

        cv2.imshow('frame', frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    #* always free the camera and close the window, even after an error or interrupt
    cap.release()
    cv2.destroyAllWindows()

In [None]:
import cv2

#* camera smoke test: show the raw feed of device 0 (DirectShow backend) until 'q' is pressed
cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            #* no frame delivered: imshow would fail on None, so stop here
            break
        cv2.imshow('frame', frame)
        #* mask to the low byte, as the other capture loops in this notebook do
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    #* always free the camera and close the window, even after an error or interrupt
    cap.release()
    cv2.destroyAllWindows()

In [None]:
!pip uninstall opencv-python 

In [None]:
!pip install opencv-python