In [None]:
import tensorflow as tf

from IPython.display import clear_output
from matplotlib.patches import Rectangle
import matplotlib.pyplot as plt
import albumentations as alb
from sklearn import metrics
import numpy as np
import skimage
import shutil
import time
import math
import glob
import json
import cv2
import os

# Set GPU Growth

In [None]:
# Enable memory growth on every visible GPU to avoid out-of-memory errors
gpus = tf.config.experimental.list_physical_devices("GPU")
for device in gpus:
    tf.config.experimental.set_memory_growth(device, enable=True)

In [None]:
# Display the GPU devices found above (an empty list means none were detected)
gpus

# Build Data Pipeline

In [None]:
# os.listdir returns entries in arbitrary order, so sort them and pick the
# annotated images by extension instead of taking every second entry.
files = sorted(os.listdir("images/my_face/"))

# Base names (no extension) of every image that has a .json annotation
filenames = [os.path.splitext(entry)[0] for entry in files if entry.endswith(".json")]

In [None]:
def load_image(file):
    """Read a JPEG file and decode it into an image tensor.

    Args:
        file: Path of the JPEG file (a Python string or a string tensor).

    Returns:
        The decoded image tensor with three colour channels.
    """
    byte_img = tf.io.read_file(file)
    # Force three channels so grayscale JPEGs still match the models'
    # (120, 120, 3) input once resized.
    img = tf.io.decode_jpeg(byte_img, channels=3)
    return img

In [None]:
# Augmentation pipeline: random flips plus brightness/contrast, gamma and
# RGB-shift jitter. Bounding boxes are passed in the 'albumentations' format
# and their labels travel in the 'class_labels' field.
augmentation_steps = [
    alb.HorizontalFlip(p=0.5),
    alb.RandomBrightnessContrast(p=0.3),
    alb.RandomGamma(p=0.3),
    alb.RGBShift(p=0.3),
    alb.VerticalFlip(p=0.5),
]
box_settings = alb.BboxParams(format='albumentations', label_fields=['class_labels'])
augmentor = alb.Compose(augmentation_steps, bbox_params=box_settings)

In [None]:
# Detection dataset
# Every annotated image from images/detection and images/my_face is augmented
# ten times; each augmented copy gets a JSON label with its class id
# (1 = the annotation contains at least one shape, 0 = it contains none).
detection_image_dir = os.path.join('images', 'augmented', 'detection_images')
detection_label_dir = os.path.join('images', 'augmented', 'detection_labels')
# cv2.imwrite returns False instead of raising when the directory is missing
os.makedirs(detection_image_dir, exist_ok=True)
os.makedirs(detection_label_dir, exist_ok=True)

# Map each annotated base name to its folder. os.listdir order is arbitrary, so
# select by extension rather than by position; on a name clash the
# "detection" folder wins, as in the original lookup order.
folder_of = {}
for folder in ("detection", "my_face"):
    for entry in sorted(os.listdir(os.path.join("images", folder))):
        stem, extension = os.path.splitext(entry)
        if extension == ".json":
            folder_of.setdefault(stem, folder)
filenames = list(folder_of)

faces = 0
total = 0

for file in filenames:
    folder = folder_of[file]
    img = cv2.imread(os.path.join('images', folder, file + ".jpg"))
    if img is None:
        # cv2.imread signals a missing/unreadable file by returning None
        print(f"Skipping {file}: images/{folder}/{file}.jpg could not be read")
        continue

    with open(os.path.join('images', folder, file + ".json"), "r") as f:
        data = json.load(f)
    class_id = 1 if len(data["shapes"]) > 0 else 0
    faces += class_id
    total += 1

    for i in range(10):
        # The classifier needs no box, but the augmentor is configured with
        # bbox_params, so a tiny dummy box is supplied.
        augmented = augmentor(image=np.array(img), bboxes=[[0,0,0.00001,0.00001]], class_labels=["face"])

        cv2.imwrite(os.path.join(detection_image_dir, f'{file}.{i}.jpg'), augmented['image'])

        annotation = {'image': file, 'class': class_id}
        with open(os.path.join(detection_label_dir, f'{file}.{i}.json'), 'w') as f:
            json.dump(annotation, f)

folder = "none"

In [None]:
# Report how many source images were labelled class 1 (annotation has shapes)
# versus class 0 (annotation has no shapes)
print("True outputs: ", faces)
print("False outputs: ", total - faces)

In [None]:
# Location dataset
# Every my_face image with at least one annotated shape is augmented ten
# times; each copy is stored with the augmented bounding box of the first shape.
location_source_dir = os.path.join('images', 'my_face')
location_image_dir = os.path.join('images', 'augmented', 'location_images')
location_label_dir = os.path.join('images', 'augmented', 'location_labels')
# cv2.imwrite returns False instead of raising when the directory is missing
os.makedirs(location_image_dir, exist_ok=True)
os.makedirs(location_label_dir, exist_ok=True)

# Select annotated images by extension: os.listdir order is arbitrary, and the
# previous stride was computed from an unrelated listing (`files`).
filenames = sorted(os.path.splitext(entry)[0]
                   for entry in os.listdir(location_source_dir)
                   if entry.endswith(".json"))

for file in filenames:
    img = cv2.imread(os.path.join(location_source_dir, file + ".jpg"))
    if img is None:
        # cv2.imread signals a missing/unreadable file by returning None
        print(f"Skipping {file}: images/my_face/{file}.jpg could not be read")
        continue

    with open(os.path.join(location_source_dir, file + ".json"), "r") as f:
        data = json.load(f)
    if len(data["shapes"]) == 0:
        continue  # nothing to localise in this image

    point = data["shapes"][0]["points"]
    height, width = img.shape[:2]
    # The two corners may be stored in any order; the 'albumentations' bbox
    # format needs normalised [x_min, y_min, x_max, y_max] inside [0, 1].
    xs = sorted((point[0][0] / width, point[1][0] / width))
    ys = sorted((point[0][1] / height, point[1][1] / height))
    coordinates = [float(np.clip(value, 0.0, 1.0)) for value in (xs[0], ys[0], xs[1], ys[1])]

    for i in range(10):
        augmented = augmentor(image=np.array(img), bboxes=[coordinates], class_labels=["face"])

        # Write only to the location folders: writing into detection_images as
        # well would overwrite (or add unlabelled files to) the detection set.
        cv2.imwrite(os.path.join(location_image_dir, f'{file}.{i}.jpg'), augmented['image'])

        annotation = {
            'image': file,
            # plain floats keep the label JSON-serialisable
            'bbox': [float(value) for value in augmented['bboxes'][0]],
        }
        with open(os.path.join(location_label_dir, f'{file}.{i}.json'), 'w') as f:
            json.dump(annotation, f)

In [None]:
# Augmented detection images in deterministic (unshuffled) order, resized to
# the 120x120 model input and scaled to [0, 1].
# The glob is built with os.path.join so it also matches on non-Windows systems.
detection_images = tf.data.Dataset.list_files(
    os.path.join('images', 'augmented', 'detection_images', '*.jpg'), shuffle=False)
detection_images = detection_images.map(load_image)
detection_images = detection_images.map(lambda x: tf.image.resize(x, (120,120)))
detection_images = detection_images.map(lambda x: x/255)

In [None]:
# Augmented location images in deterministic (unshuffled) order, resized to
# the 120x120 model input and scaled to [0, 1].
# The glob is built with os.path.join so it also matches on non-Windows systems.
location_images = tf.data.Dataset.list_files(
    os.path.join('images', 'augmented', 'location_images', '*.jpg'), shuffle=False)
location_images = location_images.map(load_image)
location_images = location_images.map(lambda x: tf.image.resize(x, (120, 120)))
location_images = location_images.map(lambda x: x/255)

In [None]:
def load_detection_labels(label_path):
    """Read the class id stored in a detection label file.

    Args:
        label_path: Object whose ``.numpy()`` returns the path of the JSON
            label file (called through ``tf.py_function`` in this notebook).

    Returns:
        A one-element list holding the value of the file's ``'class'`` key.
    """
    json_path = label_path.numpy()
    with open(json_path, mode='r', encoding="utf-8") as handle:
        annotation = json.loads(handle.read())

    class_id = annotation['class']
    return [class_id]

In [None]:
def load_location_labels(label_path):
    """Read the bounding box stored in a location label file.

    Args:
        label_path: Object whose ``.numpy()`` returns the path of the JSON
            label file (called through ``tf.py_function`` in this notebook).

    Returns:
        A one-element list holding the value of the file's ``'bbox'`` key.
    """
    json_path = label_path.numpy()
    with open(json_path, mode='r', encoding="utf-8") as handle:
        annotation = json.loads(handle.read())

    bounding_box = annotation['bbox']
    return [bounding_box]

In [None]:
# Detection labels in the same deterministic order as the detection images.
# The glob is built with os.path.join so it also matches on non-Windows systems.
detection_labels = tf.data.Dataset.list_files(
    os.path.join('images', 'augmented', 'detection_labels', '*.json'), shuffle=False)
# The JSON is parsed in Python via py_function; each element is a uint8 class id
detection_labels = detection_labels.map(lambda x: tf.py_function(load_detection_labels, [x], [tf.uint8]))

In [None]:
# Location labels in the same deterministic order as the location images.
# The glob is built with os.path.join so it also matches on non-Windows systems.
location_labels = tf.data.Dataset.list_files(
    os.path.join('images', 'augmented', 'location_labels', '*.json'), shuffle=False)
# The JSON is parsed in Python via py_function; each element is a float16 box
location_labels = location_labels.map(lambda x: tf.py_function(load_location_labels, [x], [tf.float16]))

In [None]:
# Pair each detection image with its label, then shuffle, batch and prefetch.
detection_data = tf.data.Dataset.zip((detection_images, detection_labels))
# Shuffle once only: by default the dataset is reshuffled on every iteration,
# which would move samples between any take()/skip() splits derived from it
# and leak training samples into validation and test.
detection_data = detection_data.shuffle(5000, reshuffle_each_iteration=False)
detection_data = detection_data.batch(8)
detection_data = detection_data.prefetch(4)

In [None]:
# Pair each location image with its bounding box, then shuffle, batch and prefetch.
location_data = tf.data.Dataset.zip((location_images, location_labels))
# Shuffle once only: by default the dataset is reshuffled on every iteration,
# which would move samples between any take()/skip() splits derived from it
# and leak training samples into validation and test.
location_data = location_data.shuffle(5000, reshuffle_each_iteration=False)
location_data = location_data.batch(8)
location_data = location_data.prefetch(4)

In [None]:
# Split the detection batches: 80 % for training, the remainder halved into
# validation and test (test receives the odd batch, if any).
detection_batch_count = len(detection_data)
TRAIN_SIZE = int(detection_batch_count * .8)
VAL_SIZE = (detection_batch_count - TRAIN_SIZE) // 2

detection_train = detection_data.take(TRAIN_SIZE)
detection_held_out = detection_data.skip(TRAIN_SIZE)
detection_val = detection_held_out.take(VAL_SIZE)
detection_test = detection_held_out.skip(VAL_SIZE)

In [None]:
# Split the location batches: 80 % for training, the remainder halved into
# validation and test (test receives the odd batch, if any).
location_batch_count = len(location_data)
TRAIN_SIZE = int(location_batch_count * .8)
VAL_SIZE = (location_batch_count - TRAIN_SIZE) // 2

location_train = location_data.take(TRAIN_SIZE)
location_held_out = location_data.skip(TRAIN_SIZE)
location_val = location_held_out.take(VAL_SIZE)
location_test = location_held_out.skip(VAL_SIZE)

In [None]:
# Preview one location training sample together with its ground-truth box.
index = 6  # position of the previewed sample inside the batch
# Draw the batch from the real number of training batches instead of a
# hard-coded count, so the preview never skips past the end of the data.
rand_batch = np.random.randint(max(1, len(location_train)))
for image, labels in location_train.skip(rand_batch).take(1):
    index = min(index, len(image) - 1)  # stay inside a short batch
    plt.imshow(image[index])

    # labels is a 1-tuple of boxes; scale the box to the 120-pixel image
    point = np.array(labels[0][index]) * 120

    print(labels[0][index])
    print(point)
    plt.gca().add_patch(Rectangle((point[0],point[1]), point[2]-point[0], point[3]-point[1], 
                                  edgecolor="green", facecolor="none", lw=3))

In [None]:
# Preview one detection training sample together with its class id.
index = 6  # position of the previewed sample inside the batch
# Draw the batch from the real number of training batches instead of a
# hard-coded count, so the preview never skips past the end of the data.
rand_batch = np.random.randint(max(1, len(detection_train)))
for image, labels in detection_train.skip(rand_batch).take(1):
    index = min(index, len(image) - 1)  # stay inside a short batch
    plt.imshow(image[index])

    # labels is a 1-tuple whose only entry holds the batch's class ids
    class_id = labels[0][index]

    print(class_id)

# Create Model

In [None]:
def detection_model():
    """Build the binary classifier used for face detection.

    The network feeds a 120x120x3 input through an ImageNet-pretrained VGG19
    without its top, global max pooling, a 256-unit ReLU layer and a single
    sigmoid output unit.

    Returns:
        The uncompiled ``tf.keras.Model``.
    """
    image_input = tf.keras.Input(shape=(120, 120, 3))

    # MobileNetV2 (same arguments) is an alternative backbone that was tried here
    backbone = tf.keras.applications.VGG19(include_top=False, weights="imagenet", input_shape=(120, 120, 3))
    features = backbone(image_input)

    pooled = tf.keras.layers.GlobalMaxPooling2D()(features)
    hidden = tf.keras.layers.Dense(256, activation="relu")(pooled)
    probability = tf.keras.layers.Dense(1, activation="sigmoid")(hidden)

    return tf.keras.Model(inputs=image_input, outputs=probability)

In [None]:
def location_model():
    """Build the regressor that predicts a bounding box.

    The network feeds a 120x120x3 input through an ImageNet-pretrained VGG19
    without its top, global max pooling, a 512-unit ReLU layer and four
    sigmoid output units (one per box coordinate).

    Returns:
        The uncompiled ``tf.keras.Model``.
    """
    image_input = tf.keras.Input(shape=(120, 120, 3))

    # MobileNetV2 (same arguments) is an alternative backbone that was tried here
    backbone = tf.keras.applications.VGG19(include_top=False, weights="imagenet", input_shape=(120, 120, 3))
    features = backbone(image_input)

    pooled = tf.keras.layers.GlobalMaxPooling2D()(features)
    hidden = tf.keras.layers.Dense(512, activation="relu")(pooled)
    box = tf.keras.layers.Dense(4, activation="sigmoid")(hidden)

    return tf.keras.Model(inputs=image_input, outputs=box)

In [None]:
# Instantiate the two networks: the face/no-face classifier and the box regressor
detection = detection_model()
location = location_model()

In [None]:
# Print the layer-by-layer summary of the detection classifier
detection.summary()

In [None]:
# Print the layer-by-layer summary of the location regressor
location.summary()

In [None]:
def localization_loss(y_true, yhat):
    """Sum-of-squares loss on box position and box size.

    Both inputs hold one ``[x_min, y_min, x_max, y_max]`` row per sample.

    Args:
        y_true: Ground-truth boxes, shape (batch, 4).
        yhat: Predicted boxes, shape (batch, 4).

    Returns:
        Scalar tensor: squared error of the top-left corner plus squared
        error of width and height, summed over the batch.
    """
    # The labels are produced as float16 while the model predicts float32;
    # align the dtypes so the loss also works when called directly.
    yhat = tf.convert_to_tensor(yhat)
    y_true = tf.cast(y_true, yhat.dtype)

    # Position term: squared error of the top-left corner (x_min, y_min)
    delta_coord = tf.reduce_sum(tf.square(y_true[:,:2] - yhat[:,:2]))

    h_true = y_true[:,3] - y_true[:,1]
    w_true = y_true[:,2] - y_true[:,0]

    h_pred = yhat[:,3] - yhat[:,1]
    w_pred = yhat[:,2] - yhat[:,0]

    # Size term: squared error of width and height
    delta_size = tf.reduce_sum(tf.square(w_true - w_pred) + tf.square(h_true-h_pred))

    return delta_coord + delta_size

In [None]:
# Both models train with Adam at a learning rate of 1e-4; the classifier uses
# binary cross-entropy, the regressor the custom localization loss.
detection_optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
detection.compile(optimizer=detection_optimizer, loss=tf.keras.losses.BinaryCrossentropy())

location_optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
location.compile(optimizer=location_optimizer, loss=localization_loss)

In [None]:
# Train the detection classifier for 50 epochs, validating on detection_val
# after each epoch; the returned history is plotted further below
detection_history = detection.fit(detection_train, epochs=50, validation_data=detection_val)

In [None]:
# Evaluate the detection classifier on the held-out test batches and plot the
# confusion matrix.
actual = []
predicted = []
true_positive = 0
true_negative = 0
false_positive = 0
false_negative = 0

print("Calculating...")
for image, label in detection_test:
    # `label` is a 1-tuple, so len(label) is always 1; the per-sample class
    # ids of the batch live in label[0].
    true_classes = np.asarray(label[0]).astype(int).ravel()
    # One predict call per batch; rounding the sigmoid score gives the class
    scores = detection.predict(image, verbose=0)
    predicted_classes = np.round(scores[:, 0]).astype(int)

    for true_class, predicted_class in zip(true_classes, predicted_classes):
        actual.append(int(true_class))
        predicted.append(int(predicted_class))

        # Count on the rounded prediction; the raw score is almost never
        # exactly 1, so comparing it with 1 misfiled positives as negatives.
        if predicted_class == true_class:
            if predicted_class == 1:
                true_positive += 1
            else:
                true_negative += 1
        else:
            if predicted_class == 1:
                false_positive += 1
            else:
                false_negative += 1
print("Done")

# Fix the label order so the matrix stays 2x2 even if one class is absent,
# and name the classes in that order (0 = no face, 1 = face).
confusion_matrix = metrics.confusion_matrix(actual, predicted, labels=[0, 1])
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix=confusion_matrix, display_labels=["No face", "Face"])

cm_display.plot()
plt.show()

In [None]:
# Train the box regressor for 400 epochs on the location split. The previous
# names `true_train` / `true_val` are not defined anywhere in this notebook;
# the location dataset already contains only images with an annotated face.
location_history = location.fit(location_train, epochs=400, validation_data=location_val)

In [None]:
# Plot training and validation loss of both models side by side.
fig, ax = plt.subplots(ncols=2, figsize=(20,5))

loss_panels = (
    (ax[0], detection_history, 'Detection loss'),
    (ax[1], location_history, 'Location loss'),
)
# Style every panel explicitly: the pyplot state functions only affect the
# most recently created axes, which left the first panel without a legend.
for axis, history, title in loss_panels:
    axis.plot(history.history['loss'], label='loss')
    axis.plot(history.history['val_loss'], label='val_loss')
    axis.set_ylim(bottom=0)
    axis.set_xlabel('Epoch #')
    axis.set_ylabel('Loss')
    axis.set_title(title)
    axis.legend()

plt.show()

In [None]:
# Run both models on the first image of the second test batch and draw the
# predicted box when the detection score exceeds 0.1.
for image, label in detection_test.skip(1).take(1):
    image = image[0]
    plt.imshow(np.array(image))

    single_image_batch = np.array([image])
    class_id = detection.predict(single_image_batch)
    # Predicted box coordinates are scaled to the 120-pixel image
    point = location.predict(single_image_batch)[0] * 120.0

    print(class_id[0][0])
    print(point)

    if class_id[0][0] > .1:
        x_min, y_min, x_max, y_max = point
        predicted_box = Rectangle((x_min, y_min), x_max - x_min, y_max - y_min,
                                  edgecolor="green", facecolor="none", lw=3)
        plt.gca().add_patch(predicted_box)

In [None]:
# Run both Keras models on a full-size photo and draw the predicted box.
image = load_image("images/Nathan1.jpeg")
shape = image.shape
plt.imshow(np.array(image)/255.0)

# Resize to the 120x120 model input and scale to [0, 1]
image = np.array([tf.image.resize(image, (120, 120))])/255.0

start = time.time()
class_id = detection.predict(image)
point = location.predict(image)
print(time.time() - start)
# Scale the normalised box back to the original width (shape[1]) and height (shape[0])
point = point[0] * [shape[1], shape[0], shape[1], shape[0]]

print(class_id[0][0])
print(point)

# A sigmoid score is practically always > 0, so that test drew a box on every
# image; use the same 0.5 threshold as the webcam loop.
if class_id[0][0] > 0.5:
    plt.gca().add_patch(Rectangle((point[0],point[1]), point[2]-point[0], point[3]-point[1], 
                                  edgecolor="green", facecolor="none", lw=3))

# Implement

In [None]:
# Persist both trained models; the same paths are read back by load_model
# and by the TFLite converter further below
detection.save("face_detection")
location.save("face_location")

In [None]:
# Reload the saved models from disk.
# NOTE(review): location is loaded with compile=False, presumably because its
# custom localization_loss cannot be restored without custom_objects - confirm.
detection = tf.keras.models.load_model("face_detection")
location = tf.keras.models.load_model("face_location", compile=False)

In [None]:
# Live demo: classify every webcam frame and draw the predicted face box.
# Press "q" in the preview window to stop.
cap = cv2.VideoCapture(0)
if not cap.isOpened():
    raise RuntimeError("Could not open the default camera (index 0)")

try:
    while True:
        grabbed, frame = cap.read()
        if not grabbed:
            # Camera unplugged or stream ended: frame is None, stop cleanly
            break

        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        img = tf.image.resize(rgb, (120, 120))
        batch = np.array([img])/255.0  # built once, shared by both models
        class_id = detection.predict(batch, verbose=0)
        point = location.predict(batch, verbose=0)

        # Scale the normalised box to the real frame size rather than
        # assuming a 640x480 camera
        frame_height, frame_width = frame.shape[:2]
        point = point[0] * [frame_width, frame_height, frame_width, frame_height]

        if class_id[0][0] > 0.5:
            frame = cv2.putText(frame, f"Score: {class_id[0][0]}", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
            frame = cv2.rectangle(frame, (int(point[0]), int(point[1])), 
                                (int(point[2]), int(point[3])), color=(0, 255, 0))

        cv2.imshow("Window", frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Always free the camera and close the window, even after an error
    cap.release()
    cv2.destroyAllWindows()

# Convert to TFLite

In [None]:
def representative_dataset():
    """Yield 100 random calibration samples for the TFLite converter.

    Yields:
        A one-element list holding a (1, 120, 120, 3) float32 array with
        values drawn uniformly from [0, 1).
    """
    for _ in range(100):
        data = np.random.rand(1, 120, 120, 3)
        # The converted models take float32 input (the inference cell casts to
        # tf.float32), so calibration samples must be float32, not float16.
        yield [data.astype(np.float32)]

In [None]:
# Convert the saved detection model to TFLite with default optimisations and
# float16 as the supported type, then write the result to disk.
converter = tf.lite.TFLiteConverter.from_saved_model("face_detection")
converter.target_spec.supported_types = [tf.float16]
converter.representative_dataset = representative_dataset
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()

with open("face_detection.tflite", mode="wb") as tflite_file:
    tflite_file.write(tflite_model)

In [None]:
# Convert the saved location model to TFLite with default optimisations and
# float16 as the supported type, then write the result to disk.
converter = tf.lite.TFLiteConverter.from_saved_model("face_location")
converter.target_spec.supported_types = [tf.float16]
converter.representative_dataset = representative_dataset
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()

with open("face_location.tflite", mode="wb") as tflite_file:
    tflite_file.write(tflite_model)

In [None]:
# Load both converted models and obtain a callable signature runner for each
detection_interpreter = tf.lite.Interpreter(model_path="face_detection.tflite")
location_interpreter = tf.lite.Interpreter(model_path="face_location.tflite")

detection_signature = detection_interpreter.get_signature_runner()
location_signature = location_interpreter.get_signature_runner()

In [None]:
# Run both TFLite models on a full-size photo and draw the predicted box.
image = load_image("images/Nathan1.jpeg")
shape = image.shape
plt.imshow(np.array(image) / 255.0)

# Resize to the 120x120 model input, scale to [0, 1] and add the batch axis
image = tf.image.resize(image, (120, 120)) / 255.0
image = np.array([image], dtype=np.float32)

# Look the signature's input name up instead of hard-coding Keras auto-names
# such as "input_43" / "dense_41", which change on every fresh kernel.
detection_input_name = next(iter(detection_signature.get_input_details()))
location_input_name = next(iter(location_signature.get_input_details()))

start = time.time()
# Each model has a single output, so take the only value of the result dict
class_id = detection_signature(**{detection_input_name: image})
class_id = next(iter(class_id.values()))
coordinates = location_signature(**{location_input_name: image})
coordinates = next(iter(coordinates.values()))
print("Time taken: ", time.time() - start)

# Scale the normalised box back to the original width (shape[1]) and height (shape[0])
point = coordinates[0] * [shape[1], shape[0], shape[1], shape[0]]

print(class_id[0][0])
print(point)

# A sigmoid score is practically always > 0, so that test drew a box on every
# image; use the same 0.5 threshold as the webcam loop.
if class_id[0][0] > 0.5:
    plt.gca().add_patch(Rectangle((point[0],point[1]), point[2]-point[0], point[3]-point[1], 
                                  edgecolor="green", facecolor="none", lw=3))