In [None]:
import tensorflow as tf
import os 
import numpy as np 
import cv2 
import matplotlib.pyplot as plt 
import json
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, Dense, GlobalMaxPooling2D
from tensorflow.keras.applications import VGG16

In [None]:
def load_images(img, size=(120, 120)):
    """Read a JPEG file and return it as a resized float image scaled by 1/255.

    Args:
        img: Path of the JPEG file to read (passed to ``tf.io.read_file``).
        size: Target ``(height, width)`` handed to ``tf.image.resize``.
            Defaults to ``(120, 120)``, matching the ``Input(shape=(120,120,3))``
            declared in ``build_model``.

    Returns:
        A float tensor of shape ``(size[0], size[1], 3)``.
    """
    raw_bytes = tf.io.read_file(img)
    # Force 3 channels: without `channels=3` a grayscale JPEG decodes to a
    # single channel and would not match the 3-channel model input.
    pixels = tf.io.decode_jpeg(raw_bytes, channels=3)
    # Dividing the uint8 pixels by 255 yields floats in [0, 1].
    pixels = pixels / 255
    return tf.image.resize(pixels, list(size))



In [None]:
def load_labels(label_path):
    """Parse one JSON annotation file into a (class one-hot, bbox) pair.

    Intended to be called through ``tf.py_function``: ``label_path`` must be an
    eager tensor, because its ``.numpy()`` value is used as the file path.

    Args:
        label_path: Eager string tensor holding the path of the JSON file.

    Returns:
        A tuple ``(one_hot, bbox)`` where ``one_hot`` is the ``'class'`` entry
        one-hot encoded with depth 6 and cast to ``tf.uint8``, and ``bbox`` is
        the raw ``'bbox'`` entry of the JSON document.
        # presumably 'class' is an integer index in [0, 6) and 'bbox' holds
        # four normalised coordinates - confirm against the label files.
    """
    json_path = label_path.numpy()
    with open(json_path, 'r', encoding="utf-8") as handle:
        annotation = json.load(handle)

    class_one_hot = tf.one_hot(annotation['class'], depth=6)
    return tf.cast(class_one_hot, tf.uint8), annotation['bbox']


In [None]:
# List training images and label files without shuffling (shuffle=False).
# NOTE(review): images and labels are later paired by position, so this
# presumably relies on both listings coming back in matching file-name order.
train_images_files = tf.data.Dataset.list_files(
    file_pattern=os.path.join(os.getcwd(), 'aug_data', 'train', 'images', '*.jpg'),
    shuffle=False,
)
train_labels_files = tf.data.Dataset.list_files(
    file_pattern=os.path.join(os.getcwd(), 'aug_data', 'train', 'labels', '*.json'),
    shuffle=False,
)


In [None]:
# Decode every training image path into a resized float image.
train_image_ds = train_images_files.map(load_images)

# load_labels uses Python file I/O and json, so it is wrapped in
# tf.py_function; Tout declares the dtypes of its two outputs
# (class one-hot as uint8, bbox as float16).
train_labels_ds = train_labels_files.map(
    lambda label_file: tf.py_function(
        func=load_labels,
        inp=[label_file],
        Tout=[tf.uint8, tf.float16],
    )
)


In [None]:
# List validation images and label files without shuffling (shuffle=False).
# NOTE(review): images and labels are later paired by position, so this
# presumably relies on both listings coming back in matching file-name order.
val_images_files = tf.data.Dataset.list_files(
    file_pattern=os.path.join(os.getcwd(), 'aug_data', 'val', 'images', '*.jpg'),
    shuffle=False,
)
val_labels_files = tf.data.Dataset.list_files(
    file_pattern=os.path.join(os.getcwd(), 'aug_data', 'val', 'labels', '*.json'),
    shuffle=False,
)


In [None]:
# Decode every validation image path into a resized float image.
val_image_ds = val_images_files.map(load_images)

# load_labels uses Python file I/O and json, so it is wrapped in
# tf.py_function; Tout declares the dtypes of its two outputs
# (class one-hot as uint8, bbox as float16).
val_labels_ds = val_labels_files.map(
    lambda label_file: tf.py_function(
        func=load_labels,
        inp=[label_file],
        Tout=[tf.uint8, tf.float16],
    )
)


In [None]:
# Pair each image with its label element-by-element, giving datasets of
# (image, (class_one_hot, bbox)) tuples.
# NOTE(review): the pairing is purely positional, so it is only correct if the
# image and label datasets enumerate their files in the same order.
train_data  = tf.data.Dataset.zip((train_image_ds  , train_labels_ds)) 
val_data = tf.data.Dataset.zip((val_image_ds , val_labels_ds))

In [None]:
# Sanity check: show the first training image and print its label tuple.
preview = train_data.take(1)
for image, label in preview.as_numpy_iterator():
    print(label)
    plt.imshow(image)

In [None]:
# Shuffle, batch (8 samples per batch) and prefetch both pipelines.
# NOTE: this cell rebinds train_data / val_data, so running it twice would
# batch already-batched data; re-run the zip cell first if it must be repeated.
train_data = (
    train_data
    .shuffle(buffer_size=5000)
    .batch(batch_size=8)
    .prefetch(buffer_size=4)
)
val_data = (
    val_data
    .shuffle(buffer_size=1000)
    .batch(batch_size=8)
    .prefetch(buffer_size=4)
)

In [None]:
# Instantiate VGG16 without its classification head (include_top=False).
# NOTE(review): this module-level `vgg` is not referenced by any later cell
# shown here (build_model creates its own VGG16 and shadows the name locally),
# so this cell looks removable - confirm before deleting.
vgg = VGG16(include_top=False)

In [None]:
def build_model():
    """Build the two-headed detection network on top of a VGG16 backbone.

    The network takes a 120x120x3 image and feeds it through VGG16 (without
    its top layers). The resulting feature map goes to two heads, each made of
    global max pooling, a 2048-unit ReLU layer and an output layer:

    * classification head: 6 units with softmax activation
    * bounding-box head: 4 units with sigmoid activation

    Returns:
        A Keras ``Model`` whose outputs are ``[class_output, bbox_output]``.
    """
    def _head(features, n_outputs, activation):
        # Pool the feature map, add one hidden layer, then the output layer.
        pooled = GlobalMaxPooling2D()(features)
        hidden = Dense(2048, activation='relu')(pooled)
        return Dense(n_outputs, activation=activation)(hidden)

    image_input = Input(shape=(120,120,3))
    backbone_features = VGG16(include_top=False)(image_input)

    # Heads are created in the same order as before (classification first) so
    # auto-generated layer names and weight ordering stay unchanged.
    class_output = _head(backbone_features, 6, 'softmax')
    bbox_output = _head(backbone_features, 4, 'sigmoid')

    return Model(inputs=image_input, outputs=[class_output, bbox_output])

In [None]:

# Create a fresh two-headed network (class output + bounding-box output).
facetracker = build_model()

In [None]:
# Number of batches in one pass over the training data.
batches_per_epoch = len(train_data)
# Per-step decay chosen so that, under lr = lr0 / (1 + decay * step), the
# learning rate after one epoch is 75% of its initial value.
lr_decay = (1./0.75 - 1)/batches_per_epoch
# The `decay=` keyword is rejected by the non-legacy Keras optimizers
# (TF 2.11 and later). InverseTimeDecay with decay_steps=1 implements the same
# lr0 / (1 + decay * step) rule and also works on older TF versions.
lr_schedule = tf.keras.optimizers.schedules.InverseTimeDecay(
    initial_learning_rate=0.0001,
    decay_steps=1,
    decay_rate=lr_decay,
)
opt = tf.keras.optimizers.Adam(learning_rate=lr_schedule)

In [None]:
def localization_loss(y_true, yhat):
    """Sum-of-squares localisation loss over a batch of 4-value boxes.

    The loss is the sum of two terms, each summed over the whole batch:

    * squared error of the first two box values (columns 0 and 1)
    * squared error of the box extents, where width is ``col2 - col0`` and
      height is ``col3 - col1``

    # presumably boxes are laid out as [x1, y1, x2, y2] - confirm against the
    # label files.

    Args:
        y_true: Ground-truth boxes, indexed as ``[batch, 4]``.
        yhat: Predicted boxes, indexed as ``[batch, 4]``.

    Returns:
        A scalar tensor: corner error plus size error.
    """
    corner_error = tf.reduce_sum(tf.square(y_true[:,:2] - yhat[:,:2]))

    true_height = y_true[:,3] - y_true[:,1]
    true_width = y_true[:,2] - y_true[:,0]
    pred_height = yhat[:,3] - yhat[:,1]
    pred_width = yhat[:,2] - yhat[:,0]

    width_sq = tf.square(true_width - pred_width)
    height_sq = tf.square(true_height - pred_height)
    size_error = tf.reduce_sum(width_sq + height_sq)

    return corner_error + size_error

In [None]:
# Loss for the classification head: categorical cross-entropy on the
# one-hot class vectors.
classloss = tf.keras.losses.CategoricalCrossentropy()
# Loss for the bounding-box head: the custom sum-of-squares loss defined above.
regressloss = localization_loss

In [None]:
class FaceTracker(Model):
    """Keras ``Model`` wrapper that trains a two-headed detector with a custom loop.

    The wrapped network must return ``(classes, coords)``. Each batch is
    expected as ``(X, y)`` where ``y[0]`` is the class target and ``y[1]`` the
    bounding-box target. The total loss is
    ``localization_loss + 0.5 * class_loss``.
    """

    def __init__(self, facetracker, **kwargs):
        """Store the network to train.

        Args:
            facetracker: Model returning ``(classes, coords)`` for an image batch.
            **kwargs: Forwarded to ``tf.keras.Model.__init__``.
        """
        super().__init__(**kwargs)
        self.model = facetracker

    def compile(self, opt, classloss, localizationloss, **kwargs):
        """Register the optimizer and the two loss functions.

        Args:
            opt: Optimizer used to apply gradients in ``train_step``.
            classloss: Callable ``(y_true, y_pred)`` for the class head.
            localizationloss: Callable ``(y_true, y_pred)`` for the box head.
            **kwargs: Forwarded to ``tf.keras.Model.compile``.
        """
        super().compile(**kwargs)
        self.closs = classloss
        self.lloss = localizationloss
        self.opt = opt

    def _compute_losses(self, X, y, training):
        """Run the network on ``X`` and return (total, class, localization) losses."""
        classes, coords = self.model(X, training=training)

        batch_classloss = self.closs(y[0], classes)
        # Box targets are cast to float32 before being compared with the
        # network's predictions.
        batch_localizationloss = self.lloss(tf.cast(y[1], tf.float32), coords)

        total_loss = batch_localizationloss+0.5*batch_classloss
        return total_loss, batch_classloss, batch_localizationloss

    def train_step(self, batch, **kwargs):
        """Run one optimisation step on ``batch`` and return the loss values."""
        X, y = batch

        with tf.GradientTape() as tape:
            total_loss, batch_classloss, batch_localizationloss = self._compute_losses(
                X, y, training=True
            )

        # Gradients are computed after leaving the tape context so the
        # backward pass itself is not recorded.
        grad = tape.gradient(total_loss, self.model.trainable_variables)

        # Use the optimizer passed to compile(), not a module-level global.
        self.opt.apply_gradients(zip(grad, self.model.trainable_variables))

        return {"total_loss":total_loss, "class_loss":batch_classloss, "regress_loss":batch_localizationloss}

    def test_step(self, batch, **kwargs):
        """Evaluate ``batch`` without updating weights and return the loss values."""
        X, y = batch

        total_loss, batch_classloss, batch_localizationloss = self._compute_losses(
            X, y, training=False
        )

        return {"total_loss":total_loss, "class_loss":batch_classloss, "regress_loss":batch_localizationloss}

    def call(self, X, **kwargs):
        """Forward ``X`` (and any keyword arguments) to the wrapped network."""
        return self.model(X, **kwargs)

In [None]:
# Resume from a previously saved model when one exists at 'facetracker'.
# Otherwise keep the network created by build_model(), so the notebook still
# runs top to bottom on a machine without a saved checkpoint.
if os.path.exists('facetracker'):
    facetracker = tf.keras.models.load_model('facetracker')
else:
    print("No saved model found at 'facetracker'; using the freshly built model.")

In [None]:
# Wrap the network in the custom training class; `facetracker` remains the
# underlying model and is trained in place through this wrapper.
model = FaceTracker(facetracker)

In [None]:
# Attach the optimizer and the two head-specific losses to the wrapper.
model.compile(
    opt=opt,
    classloss=classloss,
    localizationloss=regressloss,
)

In [None]:
# Train for 5 epochs with val_data as validation data; `hist` holds the
# object returned by fit().
hist = model.fit(train_data, epochs=5, validation_data=val_data)

In [None]:
# Save the underlying network (not the FaceTracker wrapper) to 'facetracker_1'.
# NOTE(review): the load cell reads from 'facetracker', a different path, so
# this save is not what a later run will resume from - confirm this is intended.
facetracker.save('facetracker_1')

In [None]:
# Pull one batch from the validation pipeline as numpy arrays;
# test_sample[0] holds the images, test_sample[1] the labels.
test_sample = next(val_data.as_numpy_iterator())
# yhat is [class predictions, box predictions] for that batch.
yhat = facetracker.predict(x=test_sample[0])

In [None]:

fig, ax = plt.subplots(ncols=4, figsize=(20,20))
# Never index past the end of the batch (a final batch can hold fewer than 4).
n_shown = min(4, len(test_sample[0]))
for idx in range(n_shown):
    # Draw on a writable copy: arrays coming out of a tf.data numpy iterator
    # may be read-only, which makes cv2.rectangle fail, and drawing in place
    # would also alter the batch itself.
    sample_image = np.array(test_sample[0][idx])
    sample_coords = yhat[1][idx]

    # Scale the predicted box values by the actual image size instead of a
    # hard-coded 120.
    height, width = sample_image.shape[:2]
    # .tolist() yields plain Python ints; some OpenCV builds reject numpy
    # integer scalars as point coordinates.
    pt1 = tuple(np.multiply(sample_coords[:2], [width, height]).astype(int).tolist())
    pt2 = tuple(np.multiply(sample_coords[2:], [width, height]).astype(int).tolist())

    # load_images scales pixels to [0, 1], so the box colour uses 1.0 rather
    # than 255 to stay inside the range imshow expects for float images.
    cv2.rectangle(sample_image, pt1, pt2, (1.0, 0.0, 0.0), 2)

    ax[idx].imshow(sample_image)