In [None]:
!pip install Cmake

In [None]:
!pip install tensorflow opencv-python albumentations matplotlib

In [None]:
!pip install tensorflow-cpu

In [None]:
import tensorflow as tf
import numpy as np
import cv2
import os
import json
from matplotlib import pyplot as plt
from tensorflow.keras.models import load_model

In [None]:
#Avoid OOM errors by setting GPU Memory Consumption Growth
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

In [None]:
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

In [None]:
images = tf.data.Dataset.list_files('data/images/*.jpg', shuffle=False)

In [None]:
def load_image(x):
    byte_img = tf.io.read_file(x)
    img = tf.io.decode_jpeg(byte_img)
    return img

In [None]:
images = images.map(load_image)

In [None]:
images.as_numpy_iterator().next()

In [None]:
image_generator = images.batch(4).as_numpy_iterator()

In [None]:
plot_images = image_generator.next()

In [None]:
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
for idx, image in enumerate(plot_images):
    ax[idx].imshow(image)
plt.show()

In [None]:
for folder in ['train', 'test', 'val']:
    for file in os.listdir(os.path.join('data', folder, 'images')):
        
        filename = file.split('.')[0]+'.json'
        existing_filepath = os.path.join('data', 'labels', filename)
        if os.path.exists(existing_filepath):
            new_filepath = os.path.join('data', folder, 'labels', filename)
            os.replace(existing_filepath, new_filepath)

In [None]:
import albumentations as alb

In [None]:
augmentor = alb.Compose([alb.RandomCrop(width=1500, height=1500),
                        alb.HorizontalFlip(p=0.5),
                        alb.RandomBrightnessContrast(p=0.2),
                        alb.RandomGamma(p=0.2),
                        alb.VerticalFlip(p=0.5),
                        alb.RGBShift(p=0.2)],
                       bbox_params=alb.BboxParams(format='albumentations', label_fields=['class_labels']))

In [None]:
img = cv2.imread(os.path.join('data','train', 'images', '1685344292297.jpg'))

In [None]:
with open(os.path.join('data','train','labels', '1685344292297.json'), 'r') as f:
    label = json.load(f)

In [None]:
label['shapes'][0]['points']

In [None]:
coords = [0,0,0,0]
coords[0] = label['shapes'][0]['points'][0][0]
coords[1] = label['shapes'][0]['points'][0][1]
coords[2] = label['shapes'][0]['points'][1][0]
coords[3] = label['shapes'][0]['points'][1][1]

In [None]:
img.shape

In [None]:
coords = list(np.divide(coords, [1840, 3264, 1840, 3264]))

In [None]:
coords

In [None]:
augmented = augmentor(image=img, bboxes=[coords], class_labels=['face'])

In [None]:
augmented['image'].shape

In [None]:
cv2.rectangle(augmented['image'],
             tuple(np.multiply(augmented['bboxes'][0][:2], [1500, 1500]).astype(int)),
                tuple(np.multiply(augmented['bboxes'][0][2:], [1500, 1500]).astype(int)),
                        (0,255,0), 3)
plt.imshow(augmented['image'])

In [None]:
for partition in ['train', 'val', 'test']:
    for image in os.listdir(os.path.join('data', partition, 'images')):
        img = cv2.imread(os.path.join('data', partition, 'images', image))
        
        coords = [0,0,0.00001,0.00001]
        label_path = os.path.join('data', partition, 'labels', f'{image.split(".")[0]}.json')
        if os.path.exists(label_path):
            with open(label_path, 'r') as f:
                label = json.load(f)
            
            coords[0] = label['shapes'][0]['points'][0][0]
            coords[1] = label['shapes'][0]['points'][0][1]
            coords[2] = label['shapes'][0]['points'][1][0]
            coords[3] = label['shapes'][0]['points'][1][1]
            coords = list(np.divide(coords, [1840, 3264, 1840, 3264]))
            
        try:
            for x in range(50):
                augmented = augmentor(image=img, bboxes=[coords], class_labels=['face'])
                cv2.imwrite(os.path.join('aug_data', partition, 'images', f'{image.split(".")[0]}.{x}.jpg'), augmented['image'])
                
                annotation = {}
                annotation['image'] = image
                
                if os.path.exists(label_path):
                    if len(augmented['bboxes']) == 0:
                        annotation['bbox'] = [0,0,0,0]
                        annotation['class'] = 0
                    else:
                        annotation['bbox'] = augmented['bboxes'][0]
                        annotation['class'] = 1
                else:
                    annotation['bbox'] = [0,0,0,0]
                    annotation['class'] = 0
                    
                with open(os.path.join('aug_data', partition, 'labels', f'{image.split(".")[0]}.{x}.json'), 'w') as f:
                    json.dump(annotation, f)
        except Exception as e:
            print(e)

In [None]:
train_images = tf.data.Dataset.list_files('aug_data/train/images/*.jpg', shuffle=False)
train_images = train_images.map(load_image)
train_images = train_images.map(lambda img: tf.image.resize(img, (120,120)))
train_images = train_images.map(lambda img: img/255)

In [None]:
val_images = tf.data.Dataset.list_files('aug_data/val/images/*.jpg', shuffle=False)
val_images = val_images.map(load_image)
val_images = val_images.map(lambda img: tf.image.resize(img, (120,120)))
val_images = val_images.map(lambda img: img/255)

In [None]:
test_images = tf.data.Dataset.list_files('aug_data/test/images/*.jpg', shuffle=False)
test_images = test_images.map(load_image)
test_images = test_images.map(lambda img: tf.image.resize(img, (120,120)))
test_images = test_images.map(lambda img: img/255)

In [None]:
def load_labels(label_path):
    with open(label_path.numpy(), 'r', encoding = 'utf-8') as f:
        label = json.load(f)
    return [label['class']], label['bbox']

In [None]:
train_labels = tf.data.Dataset.list_files('aug_data/train/labels/*.json', shuffle=False)
train_labels = train_labels.map(lambda lbl: tf.py_function(load_labels, [lbl], [tf.uint8, tf.float16]))

In [None]:
val_labels = tf.data.Dataset.list_files('aug_data/val/labels/*.json', shuffle=False)
val_labels = val_labels.map(lambda lbl: tf.py_function(load_labels, [lbl], [tf.uint8, tf.float16]))

In [None]:
test_labels = tf.data.Dataset.list_files('aug_data/test/labels/*.json', shuffle=False)
test_labels = test_labels.map(lambda lbl: tf.py_function(load_labels, [lbl], [tf.uint8, tf.float16]))

#  Combine Label and Image Samples

In [None]:
len(train_images), len(val_images), len(test_images)

In [None]:
train = tf.data.Dataset.zip((train_images, train_labels)).shuffle(6000).batch(8).prefetch(4)

In [None]:
val = tf.data.Dataset.zip((val_images, val_labels)).shuffle(1000).batch(8).prefetch(4)

In [None]:
test = tf.data.Dataset.zip((test_images, test_labels)).shuffle(1000).batch(8).prefetch(4)

In [None]:
train.as_numpy_iterator().next()[0].shape

In [None]:
data_samples = train.as_numpy_iterator()

In [None]:
res = data_samples.next()

In [None]:
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
for i in range(4):
    sample_image = res[0][i]
    sample_coords = res[1][1][i]
    
    cv2.rectangle(sample_image,
                 tuple(np.multiply(sample_coords[:2], [120,120]).astype(int)),
                 tuple(np.multiply(sample_coords[2:], [120, 120]).astype(int)),
                 (0,255,0), 1)
    
    ax[i].imshow(sample_image)

# Neural Network

In [None]:
vgg = tf.keras.applications.VGG16(include_top=False)

In [None]:
vgg.summary()

In [None]:
def build_model():
    input_layer = tf.keras.layers.Input(shape=(120,120,3))
    
    vgg = tf.keras.applications.VGG16(include_top=False)(input_layer)
    
    f1 = tf.keras.layers.GlobalMaxPooling2D()(vgg)
    clas_1 = tf.keras.layers.Dense(2048, activation='relu')(f1)
    clas_2 = tf.keras.layers.Dense(1, activation='sigmoid')(clas_1)
    
    f2 = tf.keras.layers.GlobalMaxPooling2D()(vgg)
    reg_1 = tf.keras.layers.Dense(2048, activation='relu')(f2)
    reg_2 = tf.keras.layers.Dense(4, activation='sigmoid')(reg_1)
    
    facedetector = tf.keras.models.Model(inputs=input_layer, outputs=[clas_2, reg_2])
    return facedetector

In [None]:
facedetector = build_model()

In [None]:
facedetector.summary()

In [None]:
X, y = train.as_numpy_iterator().next()

In [None]:
X.shape

In [None]:
classes, coords = facedetector.predict(X)

In [None]:
classes, coords

# Losses and Optimizers

In [None]:
batch = len(train)
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=0.0001,
    decay_steps=10000,
    decay_rate=(1./0.75-1)/batch)

In [None]:
optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)

In [None]:
def localization_loss(y_true, yhat):
    delta_coord = tf.reduce_sum(tf.square(y_true[:,:2] - yhat[:,:2]))
    
    h_true = y_true[:,3] - y_true[:,1]
    w_true = y_true[:,2] - y_true[:,0]
    
    h_pred = yhat[:,3] - yhat[:,1]
    w_pred = yhat[:,2] - yhat[:,0]
    
    delta_size = tf.reduce_sum(tf.square(w_true-w_pred) + tf.square(h_true-h_pred))
    
    return delta_coord + delta_size

In [None]:
clasLoss = tf.keras.losses.BinaryCrossentropy()
regLoss = localization_loss

In [None]:
regLoss(y[1], coords)

In [None]:
clasLoss(y[0], classes)

# Train Neural Network

In [None]:
class FaceDetector(tf.keras.models.Model):
    def __init__(self, eyedetector, **kwargs):
        super().__init__(**kwargs)
        self.model = eyedetector
    
    def compile(self, optimizer, clasLoss, localizationloss, **kwargs):
        super().compile(**kwargs)
        self.closs = clasLoss
        self.lloss = localizationloss
        self.optimizer = optimizer
        
    def train_step(self, batch, **kwargs):
        
        X, y = batch
        
        with tf.GradientTape() as tape:
            classes, coords = self.model(X, training=True)
            
            batch_classloss = self.closs(y[0], classes)
            batch_localloss = self.lloss(tf.cast(y[1], tf.float32), coords)
            
            total_loss = batch_localloss+0.5*batch_classloss
            
            grad = tape.gradient(total_loss, self.model.trainable_variables)
            
        optimizer.apply_gradients(zip(grad, self.model.trainable_variables))
        
        return {"total_loss": total_loss, "class_loss": batch_classloss, "regress_loss": batch_localloss}
    
    def test_step(self, batch, **kwargs):
        X, y = batch
        
        classes, coords = self. model(X, training=False)
        
        batch_classloss = self.closs(y[0], classes)
        batch_localloss = self.lloss(tf.cast(y[1], tf.float32), coords)    
        total_loss = batch_localloss+0.5*batch_classloss
        
        return {"total_loss": total_loss, "class_loss": batch_classloss, "regress_loss": batch_localloss}
    
    def call(self, X, **kwargs):
        return self.model(X, **kwargs)

In [None]:
model = FaceDetector(facedetector)

In [None]:
model.compile(optimizer, clasLoss, regLoss)

In [None]:
history = model.fit(train.take(25), epochs=10, validation_data=val, callbacks=[tf.keras.callbacks.TensorBoard(log_dir='logdir')])

In [None]:
fig, ax = plt.subplots(ncols=3, figsize=(20,5))

ax[0].plot(history.history['total_loss'], color='teal', label='loss')
ax[0].plot(history.history['val_total_loss'], color='orange', label='val loss')
ax[0].title.set_text('Loss')
ax[0].legend()

ax[1].plot(history.history['class_loss'], color='teal', label='class loss')
ax[1].plot(history.history['val_class_loss'], color='orange', label='val class loss')
ax[1].title.set_text('Classification Loss')
ax[1].legend()

ax[2].plot(history.history['regress_loss'], color='teal', label='regressloss')
ax[2].plot(history.history['val_regress_loss'], color='orange', label='val regress loss')
ax[2].title.set_text('Regression Loss')
ax[2].legend()

plt.show()

In [None]:
test_data = test.as_numpy_iterator()

In [None]:
test_sample = test_data.next()

In [None]:
yhat = facedetector.predict(test_sample[0])

In [None]:
fig, ax= plt.subplots(ncols=4, figsize=(20,20))
for i in range(4):
    sample_image = test_sample[0][i]
    sample_coords = yhat[1][i]
    
    if yhat[0][i] > 0.5:
        cv2. rectangle(sample_image,
                      tuple(np.multiply(sample_coords[:2], [120,120]).astype(int)),
                       tuple(np.multiply(sample_coords[2:], [120,120]).astype(int)),
                      (0,255,0), 1)
        
    ax[i].imshow(sample_image)

In [None]:
facedetector.save('facedetector.h5')

In [None]:
facedetector = load_model('facedetector.h5')