In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import tensorflow as tf
import json
import numpy as np
from matplotlib import pyplot as plt
from pathlib import Path
import cv2
import math
import wandb
from utils import extractors, transformers, modelers
from utils.extractors import Pathways

# 1. Weights & Biases Configuration

In [None]:
# Hyperparameters and run metadata tracked by Weights & Biases for this run.
hyperparameters = {
    'learning_rate': 0.0001,        # base learning rate handed to the Adam optimizer
    'image_cnt': len(list(Pathways.img_path.glob('*'))),  # number of entries found under Pathways.img_path
    'epochs': 75,                   # number of epochs passed to model.fit
    'batch_size': 128,              # not read in this notebook — presumably consumed by the utils helpers; TODO confirm
    'lr_decay_divider': 0.75,       # used below to derive the per-step learning-rate decay
    'dropout_rate': 0.2,            # dropout rate of both heads in build_model()
    'classloss_weight': 0.1,        # weight of the classification loss inside the total loss
    'loss_delta_coord_multiplier': 1,  # not read in this notebook — presumably used by modelers.localization_loss; TODO confirm
}

# Start the run in the W&B project where it will be logged.
run = wandb.init(project='raspi-facial-recognition', config=hyperparameters)

class LRLogger(tf.keras.callbacks.Callback):
    """Keras callback that logs the optimizer's current (decayed) learning rate to W&B.

    Args:
        optimizer: the optimizer whose learning rate is reported. It must expose
            `_decayed_lr`, a private Keras API.
            NOTE(review): `_decayed_lr` appears to exist only on the legacy Keras
            optimizers — confirm against the installed TensorFlow version.
    """

    def __init__(self, optimizer):
        super().__init__()
        self.optimizer = optimizer

    def on_epoch_end(self, epoch, logs=None):
        """Record the learning rate at the end of each epoch.

        `logs` defaults to None to match the Keras callback signature.
        """
        # Read from the optimizer handed to the constructor rather than from a
        # notebook-level global, so the callback reports the optimizer it was given.
        lr = self.optimizer._decayed_lr(tf.float32).numpy()
        # commit=False attaches the value to the current W&B step instead of
        # advancing the step counter.
        wandb.log({"lr": lr}, commit=False)

# Shorthand for the run's tracked hyperparameters; later cells read them as config.<name>.
config = wandb.config

### 2.2 Limit GPU Memory Growth

In [None]:
# Enable memory growth on every visible GPU to avoid out-of-memory errors.
gpus = tf.config.list_physical_devices('GPU')
for device in gpus:
    tf.config.experimental.set_memory_growth(device, True)

In [None]:
tf.config.list_physical_devices('GPU')

# Create Final Datasets (Images/Labels)

In [None]:
# Build the three dataset splits with the project helper, one create_ds call per split.
train, test, val = (transformers.create_ds(split) for split in ('train', 'test', 'val'))

# 7.3 View Images and Annotations

In [None]:
data_samples = train.as_numpy_iterator()

In [None]:
# Pull the next batch and preview its first four images with their annotated boxes.
# res[0] holds the images, res[1][0] the class labels and res[1][1] the box coordinates.
res = data_samples.next()
fig, ax = plt.subplots(ncols=4, figsize=(20,20))

# One colour per box; box k occupies coordinates [4k, 4k+4) of the 8-value vector.
box_colors = ((255,0,0), (0,255,0))

for idx, axis in enumerate(ax):
    sample_image = res[0][idx]
    sample_coords = res[1][1][idx]
    sample_labels = ', '.join(res[1][0][idx].astype(str).tolist())

    for box_no, color in enumerate(box_colors):
        start = 4 * box_no
        # Scale the coordinates by 120 (the image side used throughout this notebook)
        # to obtain integer pixel positions for the two rectangle corners.
        pt1 = tuple(np.multiply(sample_coords[start:start + 2], [120,120]).astype(int))
        pt2 = tuple(np.multiply(sample_coords[start + 2:start + 4], [120,120]).astype(int))
        # cv2.rectangle draws in place on the image array taken from the batch.
        cv2.rectangle(sample_image, pt1, pt2, color, 2)

    axis.imshow(sample_image)
    axis.set_title(sample_labels)

# 8. Build Deep Learning Model using the Functional API

### 8.1 Import Layers and Base Network

In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, Dense, GlobalMaxPooling2D, Dropout
from tensorflow.keras.applications import VGG16

### 8.2 Download VGG16

In [None]:
# VGG16 base without its top classifier layers; in the visible cells this instance
# is only inspected with summary() — build_model() creates its own instance.
vgg = VGG16(include_top=False)

In [None]:
vgg.summary()

### 8.3 Build instance of Network

In [None]:
def build_model():
    """Assemble the two-headed face tracker on top of a VGG16 feature extractor.

    A 120x120x3 input feeds a VGG16 base (without its top layers). Two heads
    branch from the VGG16 output, each made of global max pooling, a
    2048-unit ReLU layer, dropout at `config.dropout_rate` and a sigmoid output:

    * classification head: 1 output unit
    * bounding-box head: 8 output units

    NOTE(review): the class head emits a single score per image, while the
    dataset preview joins a vector of labels per image and the prediction cell
    indexes a second class score — confirm the intended width of this head.

    Returns:
        A Keras `Model` mapping the image input to `[class_output, box_output]`.
    """
    image_in = Input(shape=(120,120,3))

    features = VGG16(include_top=False)(image_in)

    # Classification head
    class_pooled = GlobalMaxPooling2D()(features)
    class_hidden = Dense(2048, activation='relu')(class_pooled)
    class_hidden = Dropout(config.dropout_rate)(class_hidden)
    class_out = Dense(1, activation='sigmoid')(class_hidden)

    # Bounding-box head
    box_pooled = GlobalMaxPooling2D()(features)
    box_hidden = Dense(2048, activation='relu')(box_pooled)
    box_hidden = Dropout(config.dropout_rate)(box_hidden)
    box_out = Dense(8, activation='sigmoid')(box_hidden)

    return Model(inputs=image_in, outputs=[class_out, box_out])

### 8.4 Test out Neural Network

In [None]:
facetracker = build_model()

In [None]:
facetracker.summary()

In [None]:
# Pull a single batch from the training set; the notebook-level X and y are not
# referenced again in the visible cells.
X, y = train.as_numpy_iterator().next()

# 9. Define Losses and Optimizers

### 9.1 Define Optimizer and LR

In [None]:
# Number of optimizer steps in one epoch.
batches_per_epoch = len(train)

# Per-step decay rate derived from lr_decay_divider. Under the legacy Keras rule
# lr = lr0 / (1 + decay * step), the learning rate after the first epoch equals
# lr0 * lr_decay_divider — TODO confirm the optimizer in use applies that rule.
lr_decay = (1.0 / config.lr_decay_divider - 1.0) / batches_per_epoch

# Show the computed value as the cell output.
f'{lr_decay:,.4}'

In [None]:
# Adam with the configured base learning rate and the per-step decay computed above.
# NOTE(review): `decay` is a legacy-optimizer keyword; on TensorFlow >= 2.11 it is
# reportedly only accepted by tf.keras.optimizers.legacy.Adam — confirm against the
# installed TensorFlow version.
opt = tf.keras.optimizers.Adam(learning_rate=config.learning_rate, decay=lr_decay)

### 9.2 Create Localization Loss and Classification Loss

In [None]:
# Binary cross-entropy scores the class head; the box head uses the project's
# localization loss from utils.modelers.
classloss = tf.keras.losses.BinaryCrossentropy()
regressloss = modelers.localization_loss

# 10. Train Neural Network

### 10.1 Create Custom Model Class

In [None]:
class FaceTracker(Model):
    """Keras model wrapper with a custom training and evaluation step.

    Wraps a two-output network (class scores, box coordinates) and combines a
    classification loss with a localization loss into a single training objective.

    Args:
        eyetracker: the underlying Keras model; called as `model(X, training=...)`
            and expected to return `(yhat_classes, yhat_coords)`.
        **kwargs: forwarded to `tf.keras.Model.__init__`.
    """

    def __init__(self, eyetracker, **kwargs):
        super().__init__(**kwargs)
        self.model = eyetracker

    def compile(self, opt, classloss, localizationloss, **kwargs):
        """Store the optimizer and the two loss callables used by the custom steps.

        Args:
            opt: optimizer used by `train_step` to apply gradients.
            classloss: callable `(y_classes, yhat_classes) -> loss`.
            localizationloss: callable
                `(y_classes, y_coords, yhat_coords) -> loss`.
            **kwargs: forwarded to `tf.keras.Model.compile`.
        """
        super().compile(**kwargs)
        self.closs = classloss
        self.lloss = localizationloss
        self.opt = opt

    def lossfn(self, y_classes, y_coords, yhat_classes, yhat_coords):
        """Compute the weighted total loss and its two components for one batch.

        Returns:
            `(total_loss, class_loss, localization_loss)` where
            `total_loss = localization_loss + config.classloss_weight * class_loss`.
        """
        batch_classloss = self.closs(y_classes, yhat_classes)
        # The localization loss receives the true classes and coordinates cast to float32.
        batch_localizationloss = self.lloss(
            tf.cast(y_classes, tf.float32),
            tf.cast(y_coords, tf.float32),
            yhat_coords,
        )

        # The class-loss weight is read from the notebook-level W&B config.
        total_loss = batch_localizationloss + config.classloss_weight * batch_classloss
        return total_loss, batch_classloss, batch_localizationloss

    def train_step(self, batch, **kwargs):
        """Run one optimization step on a batch shaped `X, (y_classes, y_coords)`."""
        X, (y_classes, y_coords) = batch

        with tf.GradientTape() as tape:
            yhat_classes, yhat_coords = self.model(X, training=True)
            total_loss, batch_classloss, batch_localizationloss = self.lossfn(
                y_classes, y_coords, yhat_classes, yhat_coords
            )

        # Compute gradients after leaving the tape context so the gradient
        # computation itself is not recorded on the tape.
        grad = tape.gradient(total_loss, self.model.trainable_variables)

        # Use the optimizer stored by compile(), not a notebook-level global.
        self.opt.apply_gradients(zip(grad, self.model.trainable_variables))

        return {"total_loss": total_loss, "class_loss": batch_classloss, "regress_loss": batch_localizationloss}

    def test_step(self, batch, **kwargs):
        """Evaluate one batch shaped `X, (y_classes, y_coords)` without updating weights."""
        X, (y_classes, y_coords) = batch

        yhat_classes, yhat_coords = self.model(X, training=False)
        total_loss, batch_classloss, batch_localizationloss = self.lossfn(
            y_classes, y_coords, yhat_classes, yhat_coords
        )

        return {"total_loss": total_loss, "class_loss": batch_classloss, "regress_loss": batch_localizationloss}

    def call(self, X, **kwargs):
        """Forward pass: delegate to the wrapped model."""
        return self.model(X, **kwargs)

# Wrap the functional network in the custom training loop, then attach the
# optimizer and the two loss functions.
model = FaceTracker(eyetracker=facetracker)
model.compile(opt=opt, classloss=classloss, localizationloss=regressloss)

### 10.2 Train

In [None]:
logdir='logs'

In [None]:
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=logdir)

In [None]:
# Callbacks: TensorBoard logs, W&B metric logging and the learning-rate logger.
training_callbacks = [tensorboard_callback, wandb.keras.WandbMetricsLogger(), LRLogger(opt)]

# Train on the training split, validating on the validation split each epoch.
hist = model.fit(
    train,
    validation_data=val,
    epochs=config.epochs,
    callbacks=training_callbacks,
)

# Close the W&B run once training has completed.
run.finish()

### 10.3 Plot Performance

In [None]:
# One panel per tracked loss: (train key, validation key, train label, validation label, title).
loss_panels = (
    ('total_loss', 'val_total_loss', 'loss', 'val loss', 'Loss'),
    ('class_loss', 'val_class_loss', 'class loss', 'val class loss', 'Classification Loss'),
    ('regress_loss', 'val_regress_loss', 'regress loss', 'val regress loss', 'Regression Loss'),
)

fig, ax = plt.subplots(ncols=3, figsize=(20,5))

# Plot the training curve in teal and the validation curve in orange on each panel.
for axis, (train_key, val_key, train_label, val_label, title) in zip(ax, loss_panels):
    axis.plot(hist.history[train_key], color='teal', label=train_label)
    axis.plot(hist.history[val_key], color='orange', label=val_label)
    axis.title.set_text(title)
    axis.legend()

plt.show()

# 11. Make Predictions

### 11.1 Make Predictions on Test Set

In [None]:
# Iterate the held-out test split: this section reports test-set predictions, and the
# validation split has already been used as validation_data during training.
test_data = test.as_numpy_iterator()
test_sample = test_data.next()

In [None]:
# Fetch a fresh batch, so re-running this cell previews different samples.
test_sample = test_data.next()

# yhat[0] holds the per-image class scores, yhat[1] the predicted box coordinates.
yhat = facetracker.predict(test_sample[0])

# One colour per predicted box; box k occupies coordinates [4k, 4k+4) and is drawn
# only when class score k exceeds the 0.9 confidence threshold.
box_colors = [(255,0,0), (0,255,0)]

fig, ax = plt.subplots(ncols=4, figsize=(20,20))
# Guard against a batch holding fewer than four images.
for idx in range(min(4, len(test_sample[0]))):
    sample_image = test_sample[0][idx]
    sample_coords = yhat[1][idx]

    # zip() stops at the shorter sequence, so a class head that emits fewer scores
    # than there are boxes no longer raises IndexError on the missing score.
    for box_no, (score, color) in enumerate(zip(yhat[0][idx], box_colors)):
        if score > 0.9:
            start = 4 * box_no
            # cv2.rectangle draws in place on the image array taken from the batch.
            cv2.rectangle(sample_image,
                          tuple(np.multiply(sample_coords[start:start + 2], [120,120]).astype(int)),
                          tuple(np.multiply(sample_coords[start + 2:start + 4], [120,120]).astype(int)), color, 2)

    ax[idx].imshow(sample_image)
    ax[idx].set_title(', '.join([f'{x:,.0%}' for x in yhat[0][idx]]))

### 11.2 Save the Model

In [None]:
# Save the underlying functional network (not the FaceTracker training wrapper) to an .h5 file.
facetracker.save('facetracker.h5')