# **1. Installing important dependencies**


In [None]:
# Install the third-party packages this notebook imports.
# NOTE(review): versions are unpinned, and tensorflow-metal is presumably only
# relevant on Apple hardware — confirm before running on another platform.
! pip3 install opencv-python matplotlib labelme albumentations tensorflow tensorflow-metal

In [None]:
import time
import os
import uuid
import cv2
import labelme
import tensorflow as tf
from matplotlib import pyplot as plt
import json
import shutil
import numpy as np
import albumentations as alb
from tensorflow import keras
from keras.applications import VGG16
from keras.layers import Input, Conv2D, Dense, GlobalMaxPool2D
from keras.models import Model
from keras.regularizers import l1, l2, l1_l2

# **2. Gather and label images**


### **2.1 Collecting images using OpenCV**

In [None]:
# Capture `imgs_number` webcam frames and store each one as a uniquely named JPEG.
path = os.path.join("data", "images")
imgs_number = 30

# cv2.imwrite does not create missing folders, so make sure the target exists.
os.makedirs(path, exist_ok=True)

cam = cv2.VideoCapture(1)
try:
    if not cam.isOpened():
        raise RuntimeError("Could not open camera 1")

    for i in range(imgs_number):
        print("Capturing image {}".format(i))
        ret, frame = cam.read()
        if not ret:
            # A failed grab yields no frame; skip it instead of crashing in imwrite/imshow.
            print("Failed to grab frame {}, skipping".format(i))
            continue
        image_name = os.path.join(path, f"{str(uuid.uuid1())}.jpg")
        cv2.imwrite(image_name, frame)
        cv2.imshow("Frame", frame)
        # Short pause between captures.
        time.sleep(0.5)

        # Pressing "q" in the preview window stops the capture early.
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break
finally:
    # Always free the camera and close the preview window, even after an error.
    cam.release()
    cv2.destroyAllWindows()

### **2.2 Labelling images using LabelMe**

In [None]:
# Launch LabelMe to annotate the captured images.
!labelme

# **3. Review Dataset & Build Image Loading Function**

### **3.1 Loading images into TensorFlow Pipeline**

In [None]:
import tensorflow as tf
from matplotlib import pyplot as plt
import json
import numpy as np

In [None]:
def load_images(x):
    """Read the file at path `x` and return its JPEG-decoded contents as a tensor."""
    return tf.io.decode_jpeg(tf.io.read_file(x))


# List every JPEG under data/images in a fixed (unshuffled) order, then decode
# them: `map` applies `load_images` to each element and returns a new dataset.
image_paths = tf.data.Dataset.list_files('data/images/*.jpg', shuffle=False)
images = image_paths.map(load_images)

In [None]:
# Display the first decoded image as a NumPy array to check that loading works
next(images.as_numpy_iterator())

In [None]:
# Show which dataset class `images` is after the map transformation
type(images)

### **3.2 View raw images using MatPlotLib**

In [None]:
# Group the images into batches of 4 and expose them as an iterator of NumPy arrays
image_generator = images.batch(4).as_numpy_iterator()

In [None]:
# Pull the next batch (at most 4 images) from the iterator
plot_images = next(image_generator)

# Show the batch side by side, one image per subplot column
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
for idx in range(len(plot_images)):
    ax[idx].imshow(plot_images[idx])
plt.show()

# **4. Partition Unaugmented Data**

### **4.1 Split the data into Train, Test, and Validation**

In [None]:
import os
from sklearn.model_selection import train_test_split
import shutil
import random
import cv2

In [None]:
# Create the "Train", "Test" and "Val" directories under `path`, each with
# "images" and "labels" subdirectories.
Folders = ['Train', 'Test', 'Val']
path = 'path/to/data'
for i in Folders:
    folders_path = os.path.join(path, i)
    for sub_folder in ("images", "labels"):
        # makedirs also creates missing parents (including `path` itself) and,
        # with exist_ok=True, does nothing when the directory already exists,
        # so the cell can be re-run safely.
        os.makedirs(os.path.join(folders_path, sub_folder), exist_ok=True)

In [None]:
# Collect the paths of every image (*.jpg) and every label file, together with
# the fractions of the data intended for each partition.
train_perc, test_perc, val_perc = 0.7, 0.15, 0.15
image_path = 'path/to/data/images'
label_path = 'path/to/data/labels'

# A missing folder simply leaves the corresponding list empty.
image_list = (
    [os.path.join(image_path, name) for name in os.listdir(image_path) if name.endswith('.jpg')]
    if os.path.isdir(image_path) else []
)
label_list = (
    [os.path.join(label_path, name) for name in os.listdir(label_path)]
    if os.path.isdir(label_path) else []
)



In [None]:
# Number of images / labels per partition, derived from the split fractions
# defined alongside the file lists (train_perc, test_perc, val_perc) rather than
# from repeated literals. int() truncates, so a few files may be left unassigned.
total_images, total_labels = len(image_list), len(label_list)
train_imgs_num, train_labels_num = int(total_images * train_perc), int(total_labels * train_perc)
test_imgs_num, test_labels_num = int(total_images * test_perc), int(total_labels * test_perc)
val_imgs_num, val_labels_num = int(total_images * val_perc), int(total_labels * val_perc)

In [None]:
# Seed the global RNG so the shuffles are repeatable, then shuffle both lists in place.
# NOTE: the lists are shuffled one after the other from the same RNG stream, so
# equal positions in image_list and label_list do not refer to the same sample.
random.seed(42)
for file_list in (image_list, label_list):
    random.shuffle(file_list)


In [None]:
# Carve the shuffled images into three consecutive, NON-overlapping slices.
# (Slicing every partition from index 0 would put the same files in all three.)
# Images beyond `val_end` (rounding leftovers) are not assigned to any partition.
train_end = train_imgs_num
test_end = train_end + test_imgs_num
val_end = test_end + val_imgs_num

train_imgs = image_list[:train_end]
test_imgs = image_list[train_end:test_end]
val_imgs = image_list[test_end:val_end]

# A label file belongs to the image with the same base name, so each partition's
# labels are looked up from its images instead of being sliced from the
# independently shuffled label list.
labels_by_stem = {os.path.splitext(os.path.basename(p))[0]: p for p in label_list}


def _matching_labels(images):
    """Return the label paths whose base name matches one of `images`; images without a label are skipped."""
    stems = (os.path.splitext(os.path.basename(p))[0] for p in images)
    return [labels_by_stem[stem] for stem in stems if stem in labels_by_stem]


train_labels = _matching_labels(train_imgs)
test_labels = _matching_labels(test_imgs)
val_labels = _matching_labels(val_imgs)

In [None]:
def move_images(images, source_folder, destination_folder):
    """Move image files from `source_folder` into `<path>/<destination_folder>/images`.

    Args:
        images: Iterable of image file names. Full paths are accepted too; only
            their base name is used.
        source_folder: Directory that currently holds the files.
        destination_folder: Name of the partition folder under the module-level `path`.
    """
    target_dir = os.path.join(path, destination_folder, 'images')
    for image in images:
        # Entries may be full paths; joining those onto the folders again would
        # produce paths that do not exist, so reduce each entry to its file name.
        file_name = os.path.basename(image)
        shutil.move(os.path.join(source_folder, file_name), os.path.join(target_dir, file_name))

def move_labels(labels, source_folder, destination_folder):
    """Move label files from `source_folder` into `<path>/<destination_folder>/labels`.

    Args:
        labels: Iterable of label file names. Full paths are accepted too; only
            their base name is used.
        source_folder: Directory that currently holds the files.
        destination_folder: Name of the partition folder under the module-level `path`.
    """
    target_dir = os.path.join(path, destination_folder, 'labels')
    for label in labels:
        # Entries may be full paths; joining those onto the folders again would
        # produce paths that do not exist, so reduce each entry to its file name.
        file_name = os.path.basename(label)
        shutil.move(os.path.join(source_folder, file_name), os.path.join(target_dir, file_name))

In [None]:
# Move the images of every partition first, then the labels, in Train/Test/Val order.
for split_name, split_files in (('Train', train_imgs), ('Test', test_imgs), ('Val', val_imgs)):
    move_images(split_files, image_path, split_name)

for split_name, split_files in (('Train', train_labels), ('Test', test_labels), ('Val', val_labels)):
    move_labels(split_files, label_path, split_name)

# **5. Apply Image Augmentation to Images and Labels using Albumentations**

### **5.1 Setup Albumentation Transform Pipeline**

In [None]:
import albumentations as alb
import cv2
import json
import numpy as np

In [None]:
# Augmentation pipeline: a random 450x450 crop followed by random flips and
# brightness/contrast, gamma and RGB-shift changes.
augmentation_steps = [
    alb.RandomCrop(width=450, height=450),
    alb.HorizontalFlip(p=0.5),
    alb.RandomBrightnessContrast(p=0.2),
    alb.RandomGamma(p=0.2),
    alb.RGBShift(p=0.2),
    alb.VerticalFlip(p=0.5),
]

# Bounding boxes are passed in 'albumentations' format; their class names are
# supplied through the `class_labels` argument.
box_settings = alb.BboxParams(format='albumentations', label_fields=['class_labels'])

augmentor = alb.Compose(augmentation_steps, bbox_params=box_settings)

### **5.2 Testing pipeline with loaded image using OpenCV and JSON**

In [None]:
# Load one training image with OpenCV and display the shape of the resulting array
img = cv2.imread('path/to/data/Train/images/image.jpg')
img.shape

In [None]:
# Parse one training label file (JSON) into a Python object
# (presumably the annotation of the sample image loaded for this test — confirm the paths match)
with open('path/to/data/Train/labels/label.json', 'r') as f:
    label = json.load(f)

In [None]:
# Inspect the annotated shapes; the 'points' of the first shape are used as the bounding box
label['shapes']

### **5.3 Extract the coordinates and Rescale to match image resolution**

In [None]:
# Flatten the first two points of the first shape into [x1, y1, x2, y2]
first_shape_points = label['shapes'][0]['points']
coords = [first_shape_points[point][axis] for point in (0, 1) for axis in (0, 1)]
coords

In [None]:
# Convert from Pascal VOC (absolute pixel coordinates) to Albumentations format
# (fractions of the image width/height). The divisors come from the loaded
# image itself so that images of any resolution are normalised correctly.
img_height, img_width = img.shape[:2]
coords = list(np.divide(coords, [img_width, img_height, img_width, img_height]))
coords

### **5.4 Apply Augmentations and See Results**

In [None]:
# Run the pipeline once on the sample image with its single bounding box, labelled 'face'
augmented = augmentor(image=img, bboxes=[coords], class_labels=['face'])

In [None]:
# Inspect the bounding boxes returned by the augmentation
augmented['bboxes']

In [None]:
# Display the augmented sample with its bounding box.
# cv2.imread yields BGR channel order while plt.imshow expects RGB, so the box
# is drawn on an RGB copy; the augmented image itself is left untouched.
preview = cv2.cvtColor(augmented['image'], cv2.COLOR_BGR2RGB)
if len(augmented['bboxes']) > 0:
    # Box values are fractions of the 450x450 crop; scale them back to pixels.
    box = augmented['bboxes'][0]
    cv2.rectangle(preview,
                  tuple(int(v) for v in np.multiply(box[:2], [450,450])),
                  tuple(int(v) for v in np.multiply(box[2:4], [450,450])),
                  (255, 0, 0), 2)
else:
    # The random crop can remove the box entirely, leaving nothing to draw.
    print("No bounding box left after augmentation")
plt.imshow(preview)

# **6. Build and Run Augmentation Pipeline**

### **6.1 Running the pipeline**

In [None]:
# Build the augmented dataset: every source image yields 60 randomly augmented
# copies under aug_data/<partition>/images, each with a JSON annotation
# ({'image', 'bbox', 'class'}) under aug_data/<partition>/labels.
folders = ['Train', 'Test', 'Val']
for partition in folders:
    # cv2.imwrite does not create missing folders, so prepare the output tree first.
    for sub_folder in ('images', 'labels'):
        os.makedirs(os.path.join('aug_data', partition, sub_folder), exist_ok=True)

    for image in os.listdir(os.path.join('data', partition, 'images')):
        img = cv2.imread(os.path.join('data', partition,'images', image))
        if img is None:
            # Not a readable image (e.g. a stray non-image file in the folder).
            print(f"Skipping {image}: could not be read as an image")
            continue
        img_height, img_width = img.shape[:2]
        image_stem = image.split(".")[0]

        # Images without a label file are kept as negative samples (no box, class 0).
        label_path = os.path.join('data', partition, 'labels', f'{image_stem}.json')
        has_label = os.path.exists(label_path)
        bboxes, class_labels = [], []
        if has_label:
            with open(label_path, 'r') as f:
                label = json.load(f)

            points = label['shapes'][0]['points']
            # The two corners may be stored in either order; sort each axis so the
            # box is always (x_min, y_min, x_max, y_max).
            x_min, x_max = sorted((points[0][0], points[1][0]))
            y_min, y_max = sorted((points[0][1], points[1][1]))

            # Pixel coordinates -> fractions of the actual image size.
            coords = list(np.divide([x_min, y_min, x_max, y_max],
                                    [img_width, img_height, img_width, img_height]))
            bboxes, class_labels = [coords], ['face']

            print(image)
            print(f"Original coordinates: {points}")
            print(f"Adjusted coordinates: {coords}")

        try:
            for x in range(60):
                augmented = augmentor(image=img, bboxes=bboxes, class_labels=class_labels)
                cv2.imwrite(os.path.join('aug_data', partition, 'images', f'{image_stem}.{x}.jpg'), augmented['image'])

                annotation = {}
                annotation['image'] = image

                if has_label and len(augmented['bboxes']) > 0:
                    # Plain floats keep the annotation JSON-serialisable.
                    annotation['bbox'] = [float(v) for v in augmented['bboxes'][0][:4]]
                    annotation['class'] = 1
                else:
                    # No label file, or the random crop removed the box entirely.
                    annotation['bbox'] = [0,0,0,0]
                    annotation['class'] = 0

                with open(os.path.join('aug_data', partition, 'labels', f'{image_stem}.{x}.json'), 'w') as f:
                    json.dump(annotation, f)

        except Exception as e:
            # Best effort: report the failing image and carry on with the next one.
            print(f"Augmentation failed for {image}: {e}")

In [None]:
# Display the most recent augmented image with its bounding box.
# cv2.imread yields BGR channel order while plt.imshow expects RGB, so the box
# is drawn on an RGB copy; the augmented image itself is left untouched.
preview = cv2.cvtColor(augmented['image'], cv2.COLOR_BGR2RGB)
if len(augmented['bboxes']) > 0:
    # Box values are fractions of the 450x450 crop; scale them back to pixels.
    box = augmented['bboxes'][0]
    cv2.rectangle(preview,
                  tuple(int(v) for v in np.multiply(box[:2], [450,450])),
                  tuple(int(v) for v in np.multiply(box[2:4], [450,450])),
                  (255, 0, 0), 2)
else:
    # The random crop can remove the box entirely, leaving nothing to draw.
    print("No bounding box left after augmentation")
plt.imshow(preview)

### **6.2 Load augmented images into TensorFlow Dataset**

In [None]:
def load_images(x):
    """Load the JPEG stored at path `x` and return the decoded image tensor."""
    raw_bytes = tf.io.read_file(x)
    return tf.io.decode_jpeg(raw_bytes)

In [None]:
# Training images: list the augmented JPEGs in a fixed order, decode them,
# resize to 120x120 and divide the pixel values by 255.
train_images = (
    tf.data.Dataset.list_files('aug_data/Train/images/*.jpg', shuffle=False)
    .map(load_images)
    .map(lambda x: tf.image.resize(x, (120, 120)))
    .map(lambda x: x/255)
)

In [None]:
# Test images: list the augmented JPEGs in a fixed order, decode them,
# resize to 120x120 and divide the pixel values by 255.
test_images = (
    tf.data.Dataset.list_files('aug_data/Test/images/*.jpg', shuffle=False)
    .map(load_images)
    .map(lambda x: tf.image.resize(x, (120, 120)))
    .map(lambda x: x/255)
)

In [None]:
# Validation images: list the augmented JPEGs in a fixed order, decode them,
# resize to 120x120 and divide the pixel values by 255.
val_images = (
    tf.data.Dataset.list_files('aug_data/Val/images/*.jpg', shuffle=False)
    .map(load_images)
    .map(lambda x: tf.image.resize(x, (120, 120)))
    .map(lambda x: x/255)
)

# **7. Prepare Labels**

### **7.1 Build Label Loading Function**

In [None]:
def load_labels(label_path):
    """Read one annotation file and return its class and bounding box.

    Args:
        label_path: Object whose `.numpy()` method yields the path of a JSON
            annotation file.

    Returns:
        A `(class, bbox)` tuple taken from the 'class' and 'bbox' keys of the file.
    """
    json_path = label_path.numpy()
    with open(json_path, 'r', encoding='utf-8') as handle:
        annotation = json.load(handle)

    return annotation['class'], annotation['bbox']

### **7.2 Load labels into TensorFlow Function**

In [None]:
# Training labels: list the annotation files in a fixed order and parse each one
# with load_labels (wrapped in py_function because it runs plain Python).
train_labels = (
    tf.data.Dataset.list_files('aug_data/Train/labels/*.json', shuffle=False)
    .map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))
)

In [None]:
# Test labels: list the annotation files in a fixed order and parse each one
# with load_labels (wrapped in py_function because it runs plain Python).
test_labels = (
    tf.data.Dataset.list_files('aug_data/Test/labels/*.json', shuffle=False)
    .map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))
)

In [None]:
# Validation labels: list the annotation files in a fixed order and parse each one
# with load_labels (wrapped in py_function because it runs plain Python).
val_labels = (
    tf.data.Dataset.list_files('aug_data/Val/labels/*.json', shuffle=False)
    .map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))
)

# **8. Combine Labels & Images**

### **8.1 Check Partition lengths**

In [None]:
import tensorflow as tf

# Images and labels are zipped pairwise afterwards, so each partition should show
# the same count for its images and its labels.
len(train_images), len(train_labels), len(test_images), len(test_labels), len(val_images), len(val_labels)

### **8.2 Create Final Datasets (Images + Labels)**

In [None]:
# Pair every training image with its label, then shuffle (buffer of 3000),
# batch by 10 and prefetch 4 batches.
train_dataset = (
    tf.data.Dataset.zip((train_images, train_labels))
    .shuffle(3000)
    .batch(10)
    .prefetch(4)
)

In [None]:
# Pair every test image with its label, then shuffle (buffer of 3000),
# batch by 10 and prefetch 4 batches.
test_dataset = (
    tf.data.Dataset.zip((test_images, test_labels))
    .shuffle(3000)
    .batch(10)
    .prefetch(4)
)

In [None]:
# Pair every validation image with its label, then shuffle (buffer of 3000),
# batch by 10 and prefetch 4 batches.
val_dataset = (
    tf.data.Dataset.zip((val_images, val_labels))
    .shuffle(3000)
    .batch(10)
    .prefetch(4)
)

In [None]:
# Display the label part (classes, bounding boxes) of one training batch
next(train_dataset.as_numpy_iterator())[1]

### **8.3 View Images and Annotations**

In [None]:
# Keep the iterator so more batches can be drawn later, and fetch the first batch
data_samples = train_dataset.as_numpy_iterator()
res = next(data_samples)

In [None]:
# Show the first four images of the batch with their annotated bounding boxes
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
batch_images, batch_boxes = res[0], res[1][1]
for idx in range(4):
    sample_image = batch_images[idx]
    sample_coords = batch_boxes[idx]

    # Box values are fractions of the 120x120 image; scale them to pixels
    top_left = tuple(np.multiply(sample_coords[:2], [120,120]).astype(int))
    bottom_right = tuple(np.multiply(sample_coords[2:], [120, 120]).astype(int))

    cv2.rectangle(sample_image, top_left, bottom_right, (255,0,0), 2)
    ax[idx].imshow(sample_image)

# **9. Building Deep Learning model using the Functional API**

### **9.1 Import Layers and Base Network**

In [None]:
from tensorflow import keras

# This is the base of what all tensorflow models are built from
from keras.models import Model

# These are several layers used
from keras.layers import Input, Conv2D, Dense, GlobalMaxPool2D, Dropout

# Neural network built for image classification
from keras.applications import VGG16

from keras.optimizers import SGD
from keras.models import load_model
from keras.callbacks import LearningRateScheduler

### **9.2 Download VGG16**

In [None]:
# VGG16 is a classification model; include_top=False drops its top layers so
# that only the feature extractor is kept. The weights are read from a local file.
# NOTE(review): build_model() creates its own VGG16 instance, so this `vgg`
# object is not used by any other cell visible in this notebook.
weights = 'path/to/vgg16_weights_tf_dim_ordering_tf_kernels_notop.h5'
vgg = VGG16(include_top=False, weights=weights)

### **9.3 Build Instance of Network**

####  *The network is built inside a function that returns a fresh model instance. With the Functional API a model can take one or more inputs and produce one or more outputs; this one takes a single image input and produces two outputs (the class and the bounding box).*

In [None]:
def build_model():
    """Assemble the two-headed face tracker on top of a VGG16 feature extractor.

    Returns:
        A Keras `Model` taking images of shape (120, 120, 3) and producing two
        outputs: one sigmoid unit (classification head) and four sigmoid units
        (bounding-box regression head).
    """

    def _head(features, n_outputs):
        # Global max pooling -> Dense(2048, ReLU, L2) -> Dropout(0.5) -> sigmoid output
        pooled = GlobalMaxPool2D()(features)
        hidden = Dense(2048, activation="relu", kernel_regularizer=l2(0.001))(pooled)
        hidden = Dropout(0.5)(hidden)
        return Dense(n_outputs, activation="sigmoid")(hidden)

    # Input images: 120x120 pixels with 3 channels
    input_layer = Input(shape=(120,120,3))

    # VGG16 without its top (fully connected) layers, loaded from `weights`
    backbone_features = VGG16(include_top=False, weights=weights)(input_layer)

    # Classification head (1 output) first, then the regression head (4 outputs)
    class_output = _head(backbone_features, 1)
    box_output = _head(backbone_features, 4)

    # Combine the shared input with both heads into a single model
    return Model(inputs=input_layer, outputs=[class_output, box_output])

### **9.4 Test out Neural Network**

In [None]:
# Build the two-output network defined by build_model()
facetracker = build_model()

In [None]:
# Take one training batch: x holds the images, y the (classes, boxes) labels
x, y = next(train_dataset.as_numpy_iterator())

In [None]:
# Run the model on the batch; the two outputs are the class scores and the box coordinates
classes, coords = facetracker.predict(x)

In [None]:
# Show the labels of the batch
y

# **10. Building Loss Function & Optimizer**

#### **The loss function** *(or objective function or cost function) quantifies how well the model is performing on a particular task. It measures the difference between the predicted output and the true target values. During training, the goal is to minimize this loss. In other words, the model aims to find the set of parameters (weights and biases) that result in the smallest possible loss.*

#### **The optimizer** *is an algorithm that adjusts the model's parameters during training to minimize the loss function. It's responsible for updating the weights and biases based on the gradients of the loss with respect to those parameters. Optimizers play a crucial role in the training process. They determine how quickly the model learns, how it responds to gradients, and how it avoids getting stuck in local minima.*

### **10.1 Define Optimizer and LR**

In [None]:
# Number of batches in one pass over the training dataset
batches_per_epoch = len(train_dataset)
# NOTE(review): LR_decay is not referenced by any other cell visible in this
# notebook; presumably a per-batch decay rate intended for the optimizer —
# confirm or remove.
LR_decay = (1/0.75 - 1)/batches_per_epoch

In [None]:
# Adam optimizer (legacy Keras implementation) with a learning rate of 1e-5
opt = keras.optimizers.legacy.Adam(learning_rate=0.00001)

### **10.2 Creating Localization Loss & Classification Loss**

In [None]:
from keras.losses import MeanSquaredError
def localization_loss(y_true, y_pred):
    """Return the mean squared error between the true and the predicted box coordinates."""
    return MeanSquaredError()(y_true, y_pred)

In [None]:
# Define the loss functions for the two outputs: binary cross-entropy for the
# classification output and localization_loss for the box regression output

class_loss = tf.keras.losses.BinaryCrossentropy()
regressloss = localization_loss

In [None]:
# Sanity check: regression loss between the batch's true boxes and the predicted coords
localization_loss(y[1],coords).numpy()

In [None]:
# Sanity check: classification loss between the batch's true classes and the predicted classes
class_loss(y[0], classes)

# **11. Train Neural Network**

### **11.1 Create Custom Model Class**

In [None]:
class FaceTracker(Model):
    """Keras model that trains the two-output face tracker with custom train/test steps.

    The wrapped network returns `(classes, coords)`. Each step combines both
    losses as `localization_loss + 0.5 * class_loss`.
    """

    def __init__(self, facetracker, **kwargs):
        """Wrap `facetracker`, the network returned by build_model()."""
        super().__init__(**kwargs)
        self.model = facetracker

    def compile(self, opt, class_loss, localization_loss, **kwargs):
        """Store the optimizer and both loss functions; other kwargs go to `Model.compile`."""
        super().compile(**kwargs)
        self.closs = class_loss
        self.lloss = localization_loss
        self.opt = opt

    def train_step(self, batch, **kwargs):
        """Run one optimisation step on `batch` and return the three loss values."""
        X, y = batch

        with tf.GradientTape() as tape:
            classes, coords = self.model(X, training=True)

            batch_classloss = self.closs(y[0], classes)
            batch_localizationloss = self.lloss(tf.cast(y[1], tf.float32), coords)

            # NOTE: only these two terms are optimised; the layers' kernel
            # regularisation losses are not part of total_loss.
            total_loss = batch_localizationloss + 0.5 * batch_classloss

        # The gradient is computed after leaving the tape context so that the
        # gradient computation itself is not recorded.
        grad = tape.gradient(total_loss, self.model.trainable_variables)

        # Use the optimizer passed to compile(), not a global variable.
        self.opt.apply_gradients(zip(grad, self.model.trainable_variables))

        return{"total_loss": total_loss, "class_loss": batch_classloss, "regress_loss": batch_localizationloss}

    def test_step(self, batch, **kwargs):
        """Evaluate `batch` without updating weights and return the three loss values."""
        X, y = batch

        # training=False keeps Dropout inactive while validating.
        classes, coords = self.model(X, training=False)

        batch_classloss = self.closs(y[0], classes)
        batch_localizationloss = self.lloss(tf.cast(y[1], tf.float32), coords)
        total_loss = batch_localizationloss + 0.5 * batch_classloss

        return{"total_loss": total_loss, "class_loss": batch_classloss, "regress_loss": batch_localizationloss}

    def call(self, X, **kwargs):
        """Forward `X` to the wrapped network."""
        return self.model(X, **kwargs)

In [None]:
# Wrap the network in the custom training class
model = FaceTracker(facetracker)

In [None]:
# Register the optimizer and both losses; metrics=['accuracy'] is forwarded to Model.compile
model.compile(opt, class_loss, regressloss, metrics=['accuracy'])

### **11.2 Train**

In [None]:
# Train for 30 epochs with val_dataset as validation data; `hist` keeps the loss history
hist = model.fit(train_dataset, epochs=30, validation_data=val_dataset)

In [None]:
# Plot training vs. validation curves for the total, classification and regression losses
fig, ax = plt.subplots(ncols=3, figsize=(20,5))

# (training key, validation key, panel title, training legend, validation legend)
panels = [
    ('total_loss', 'val_total_loss', 'Loss', 'loss', 'val loss'),
    ('class_loss', 'val_class_loss', 'Classification Loss', 'class loss', 'val class loss'),
    ('regress_loss', 'val_regress_loss', 'Regression Loss', 'regress loss', 'val regress loss'),
]
for axis, (train_key, val_key, title, train_legend, val_legend) in zip(ax, panels):
    axis.plot(hist.history[train_key], color='teal', label=train_legend)
    axis.plot(hist.history[val_key], color='orange', label=val_legend)
    axis.title.set_text(title)
    axis.legend()

plt.show()

# **12. Make Predictions**

### **12.1 Make Predictions on Test Set**

In [None]:
# facetracker = tf.keras.models.load_model('path/to/facetracker.h5')

In [None]:
# Iterator over the test batches as NumPy arrays
test_data = test_dataset.as_numpy_iterator()

In [None]:
# Fetch the next test batch (images, labels)
test_sample = next(test_data)

In [None]:
# Predict on the images of the test batch; yhat[0] holds the class scores, yhat[1] the boxes
yhat = facetracker.predict(test_sample[0])

In [None]:
# Show the first four test images; a box is drawn only where the class score exceeds 0.5
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
predicted_scores, predicted_boxes = yhat[0], yhat[1]
for idx in range(4):
    sample_image = test_sample[0][idx]
    sample_coords = predicted_boxes[idx]

    face_found = predicted_scores[idx] > 0.5
    if face_found:
        # Box values are fractions of the 120x120 image; scale them to pixels
        top_left = tuple(np.multiply(sample_coords[:2], [120,120]).astype(int))
        bottom_right = tuple(np.multiply(sample_coords[2:], [120,120]).astype(int))
        cv2.rectangle(sample_image, top_left, bottom_right, (255, 0, 0), 2)

    ax[idx].imshow(sample_image)

### **12.2 Save Model**

In [None]:
# Save the trained network to an HDF5 file in the working directory
facetracker.save('facetracker.h5')

### **12.3 Real Time Prediction**

In [None]:
# Real-time face tracking: predict on a cropped region of every webcam frame and
# draw the result until "q" is pressed.
cam = cv2.VideoCapture(1)
try:
    while cam.isOpened():
        ret, frame = cam.read()
        if not ret:
            # No frame could be read; stop instead of crashing on a None frame.
            break

        # Region of interest. It is a view into `frame`, so everything drawn on it
        # shows up in the displayed frame.
        frame2 = frame[50:500, 50:500,:]

        # Predict on the SAME region the box is drawn on, so that the relative
        # coordinates returned by the model refer to `frame2`.
        rgb = cv2.cvtColor(frame2, cv2.COLOR_BGR2RGB)
        resize = tf.image.resize(rgb, (120,120))

        yhat = facetracker.predict(np.expand_dims(resize/255, 0))
        sample_coords = yhat[1][0]

        if yhat[0] > 0.5:
            # Scale the relative coordinates to the pixel size of the region
            # (450x450 when the camera frame is large enough for the full crop).
            box_scale = [frame2.shape[1], frame2.shape[0]]
            top_left = np.multiply(sample_coords[:2], box_scale).astype(int)
            bottom_right = np.multiply(sample_coords[2:], box_scale).astype(int)

            # Controls the main rectangle
            cv2.rectangle(frame2, tuple(top_left), tuple(bottom_right), (255, 0, 0), 2)
            # Controls the label rectangle
            cv2.rectangle(frame2,
                          tuple(np.add(top_left, [0,-30])),
                          tuple(np.add(top_left, [80, 0])),
                          (255, 0, 0), -1)
            # Controls the text rendered
            cv2.putText(frame2, 'face', tuple(np.add(top_left, [0, -5])),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

        cv2.imshow("Face Tracker", frame)

        key = cv2.waitKey(1) & 0xFF
        if key == ord('q'):
            break
finally:
    # Always free the camera and close the window, even after an error.
    cam.release()
    cv2.destroyAllWindows()


## THIS IS FOR TESTING

In [None]:
import cv2
import numpy as np
import tensorflow as tf

# Assuming 'facetracker' is your loaded model
# facetracker = load_model('/path/to/saved_model_directory')

# Real-time face tracking on the full webcam frame until "q" is pressed.
cam = cv2.VideoCapture(1)
try:
    while cam.isOpened():
        ret, frame = cam.read()
        if not ret:
            # No frame could be read; stop instead of crashing on a None frame.
            break
        frame_height, frame_width = frame.shape[:2]

        # Resize the frame for prediction
        resized_frame = cv2.resize(frame, (120, 120))

        # Preprocess the frame for prediction
        rgb = cv2.cvtColor(resized_frame, cv2.COLOR_BGR2RGB)
        resize = tf.image.resize(rgb, (120, 120))

        # Make predictions
        yhat = facetracker.predict(np.expand_dims(resize / 255, 0))
        sample_coords = yhat[1][0]

        if yhat[0] > 0.5:
            # Scale coordinates to the original frame size: x values by the
            # frame width, y values by the frame height.
            scale = [frame_width, frame_height, frame_width, frame_height]
            x1, y1, x2, y2 = (int(v) for v in np.round(np.multiply(sample_coords, scale)))

            # Ensure the rectangle stays within the frame bounds
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(frame_width, x2), min(frame_height, y2)

            # Draw the main rectangle
            cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 0, 0), 2)

            # Controls the label rectangle
            label_rect_height = 30  # Adjust as needed
            cv2.rectangle(frame, (x1, y1 - label_rect_height), (x1 + 80, y1), (255, 0, 0), -1)

            # Controls the text rendered
            cv2.putText(frame, 'face', (x1, y1 - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

        cv2.imshow("Face Tracker", frame)

        key = cv2.waitKey(1) & 0xFF
        if key == ord('q'):
            break
finally:
    # Always free the camera and close the window, even after an error.
    cam.release()
    cv2.destroyAllWindows()
