# 1. Setup and Get Data

### 1.1 Install Dependencies and Setup

In [None]:
%pip install --user labelme tensorflow tensorflow-gpu opencv-python matplotlib albumentations split-folders imutils scikit-learn keras

# 2. Review Dataset and Build Image Loading Function

### 2.1 Import Tensorflow and Dependencies

In [None]:
import os
import cv2
import json

import numpy as np
import tensorflow as tf
from matplotlib import pyplot as plt
from scipy.io import loadmat



### 2.2 Limit GPU Memory Growth

In [None]:
# Switch every visible GPU to on-demand memory allocation so TensorFlow grows
# its usage as needed (helps avoid out-of-memory errors).
gpus = tf.config.experimental.list_physical_devices('GPU')
for device in gpus:
    tf.config.experimental.set_memory_growth(device, True)

In [None]:
tf.config.list_physical_devices('GPU')

### 2.3 Load Image into TF Data Pipeline

In [None]:
import os
import cv2
import numpy as np
import tensorflow as tf

# Root folder of the WIDER FACE data used by this notebook
data_path = "data/WIDERFace"
# Let tf.data pick the prefetch depth at runtime. A literal 1000 would ask
# prefetch() to buffer up to 1000 whole batches of decoded images.
AUTOTUNE = tf.data.AUTOTUNE
batch_size = 32

# Image folders of the train, test, and val sets
train_path = os.path.join(data_path, "WIDER_train/images")
test_path = os.path.join(data_path, "WIDER_test/images")
val_path = os.path.join(data_path, "WIDER_val/images")
# Folder that holds the ground-truth / file-list text files
labels_path = os.path.join(data_path, "wider_face_split")

# The ground-truth text files sit in the split folder, not inside the image
# folders, so they are resolved against labels_path.
train_labels_path = os.path.join(labels_path, "wider_face_train_bbx_gt.txt")
test_labels_path = os.path.join(labels_path, "wider_face_test_filelist.txt")
val_labels_path = os.path.join(labels_path, "wider_face_val_bbx_gt.txt")

# Define a generator function to load images
def load_images(image_paths):
    """Lazily read images from disk with OpenCV.

    Args:
        image_paths: Iterable of image file paths. Entries may be ``str`` or
            ``bytes``: tf.data.Dataset.from_generator passes its ``args`` to
            the generator as NumPy values, so string paths arrive as bytes.

    Yields:
        The result of ``cv2.imread`` for each path, in input order.
    """
    for image_path in image_paths:
        # cv2.imread expects a text file name; fsdecode leaves str untouched
        # and decodes bytes paths.
        yield cv2.imread(os.fsdecode(image_path))

# Define a generator function to load labels
def load_labels(label_file_path):
    """Yield the whitespace-separated tokens of every line in a text file.

    Args:
        label_file_path: Path of the text file to read.

    Yields:
        One list of string tokens per line, in file order. A blank line
        yields an empty list.
    """
    with open(label_file_path, "r") as label_file:
        # Read everything first, then hand the tokenised rows out one by one.
        for raw_line in label_file.readlines():
            tokens = raw_line.strip().split()
            yield tokens

def _jpg_paths(root_dir):
    """Return the sorted paths of every .jpg file found anywhere under root_dir."""
    return sorted(
        os.path.join(root, file)
        for root, dirs, files in os.walk(root_dir)
        for file in files
        if file.endswith(".jpg")
    )

# Expose each split as a tf.data.Dataset of image file paths. Later cells call
# `.map(load_image)` on train_images, which needs a Dataset whose elements are
# paths; a plain Python generator has no `.map`.
train_images = tf.data.Dataset.from_tensor_slices(tf.constant(_jpg_paths(train_path), dtype=tf.string))
test_images = tf.data.Dataset.from_tensor_slices(tf.constant(_jpg_paths(test_path), dtype=tf.string))
val_images = tf.data.Dataset.from_tensor_slices(tf.constant(_jpg_paths(val_path), dtype=tf.string))

# Create the (lazy) label generators; no file is opened until they are iterated
train_labels = load_labels(train_labels_path)
test_labels = load_labels(test_labels_path)
val_labels = load_labels(val_labels_path)

# Build the training pipeline: images come from load_images, labels from load_labels
train_image_paths = [os.path.join(root, file) for root, dirs, files in os.walk(train_path) for file in files if file.endswith(".jpg")]
train_data = tf.data.Dataset.from_generator(load_images, args=[train_image_paths], output_types=tf.uint8, output_shapes=tf.TensorShape([None, None, 3]))
# Convert the uint8 pixels to float32
train_data = train_data.map(lambda x: tf.image.convert_image_dtype(x, tf.float32))
# NOTE(review): load_labels yields lists of string tokens while float32 is declared
# here — TODO confirm the label format before iterating this dataset.
train_labels = tf.data.Dataset.from_generator(load_labels, args=[train_labels_path], output_types=tf.float32)
train_dataset = tf.data.Dataset.zip((train_data, train_labels))
# NOTE(review): images are declared with unknown height/width, so plain batch()
# presumably needs a resize step first — TODO confirm.
train_dataset = train_dataset.batch(batch_size).repeat().prefetch(buffer_size=AUTOTUNE)
# Reuse the path list gathered above instead of walking the folder tree a second time
steps_per_epoch = len(train_image_paths) // batch_size

# Load and preprocess test data images
test_image_paths = [os.path.join(root, file) for root, dirs, files in os.walk(test_path) for file in files if file.endswith(".jpg")]
test_data = tf.data.Dataset.from_generator(load_images, args=[test_image_paths], output_types=tf.uint8, output_shapes=tf.TensorShape([None, None, 3]))
# Convert the uint8 pixels to float32
test_data = test_data.map(lambda x: tf.image.convert_image_dtype(x, tf.float32))

# Load test labels
test_labels = tf.data.Dataset.from_generator(load_labels, args=[test_labels_path], output_types=tf.float32)

# Pair each test image with its label entry, as the train and val pipelines do.
# concatenate() would instead append the label elements after the last image.
test_dataset = tf.data.Dataset.zip((test_data, test_labels))




# Build the validation pipeline the same way as the training one
val_image_paths = [os.path.join(root, file) for root, dirs, files in os.walk(val_path) for file in files if file.endswith(".jpg")]
val_data = tf.data.Dataset.from_generator(load_images, args=[val_image_paths], output_types=tf.uint8, output_shapes=tf.TensorShape([None, None, 3]))
# Convert the uint8 pixels to float32
val_data = val_data.map(lambda x: tf.image.convert_image_dtype(x, tf.float32))
val_labels = tf.data.Dataset.from_generator(load_labels, args=[val_labels_path], output_types=tf.float32)
val_dataset = tf.data.Dataset.zip((val_data, val_labels))
val_dataset = val_dataset.batch(batch_size).repeat().prefetch(buffer_size=AUTOTUNE)
# val_dataset repeats forever, so record how many batches make up one pass over
# the validation images. steps_per_epoch (the training count) is left as computed
# earlier in this cell.
validation_steps = len(val_image_paths) // batch_size




In [None]:
# NOTE(review): both directories look like template placeholders — point them at
# real folders before running this cell.
train_dir = 'path/to/train/directory'
test_dir = 'path/to/test/directory'
img_height = 224
img_width = 224
batch_size = 32

# Augmentation settings applied to training images only
train_augmentation = {
    'rescale': 1./255,
    'rotation_range': 20,
    'width_shift_range': 0.2,
    'height_shift_range': 0.2,
    'shear_range': 0.2,
    'zoom_range': 0.2,
    'horizontal_flip': True,
    'fill_mode': 'nearest',
}
train_datagen = tf.keras.preprocessing.image.ImageDataGenerator(**train_augmentation)

# Test images are only rescaled
test_datagen = tf.keras.preprocessing.image.ImageDataGenerator(rescale=1./255)

# Options shared by both directory iterators
flow_options = {
    'target_size': (img_height, img_width),
    'batch_size': batch_size,
    'class_mode': 'categorical',
}

train_generator = train_datagen.flow_from_directory(directory=train_dir, shuffle=True, **flow_options)

test_generator = test_datagen.flow_from_directory(directory=test_dir, shuffle=False, **flow_options)

In [None]:
def load_image(x):
    """Read the file at path ``x`` and return its contents decoded as a JPEG image tensor."""
    return tf.io.decode_jpeg(tf.io.read_file(x))

In [None]:
train_images = train_images.map(load_image)

In [None]:
train_images.as_numpy_iterator().next()

In [None]:
type(train_images)

### 2.4 View Raw Images with Matplotlib

In [None]:
image_generator = train_images.batch(4).as_numpy_iterator()

In [None]:
plot_images = next(image_generator)

In [None]:
# Show the sampled batch side by side, one image per column
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
for column, picture in enumerate(plot_images):
    ax[column].imshow(picture)
plt.show()

# 3. Partition Unaugmented Data

### 3.1 Split data into training and testing and valid data

In [None]:
# Split the images under ./data/img into ./data/ttvimg with ratios 0.8 / 0.1 / 0.1
# and a fixed seed (presumably train / val / test in that order — confirm against
# the split-folders documentation).
# NOTE(review): the next cell reads data/<split>/images rather than data/ttvimg —
# TODO confirm which layout is intended.
import splitfolders
splitfolders.ratio('./data/img', output="./data/ttvimg", seed=1337, ratio=(.8, 0.1,0.1))

### 3.2 Move the Matching Labels

In [None]:
# For every image in each split, move its JSON label (if one exists in
# data/labels) into that split's labels folder.
for folder in ['train','test','val']:
    labels_dir = os.path.join('data', folder, 'labels')
    # os.replace fails when the destination folder is missing, so create it up front
    os.makedirs(labels_dir, exist_ok=True)

    for file in os.listdir(os.path.join('data', folder, 'images')):
        # splitext removes only the final extension, so file names that contain
        # extra dots still map to the right label file
        filename = os.path.splitext(file)[0] + '.json'
        existing_filepath = os.path.join('data','labels', filename)
        if os.path.exists(existing_filepath):
            new_filepath = os.path.join(labels_dir, filename)
            os.replace(existing_filepath, new_filepath)

# 4. Setup Albumentations

### 4.1 Setup Albumentations Transform Pipeline

In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Augmentation options for the training images
train_augmentation = dict(
    rescale=1./255,
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest',
)
train_datagen = ImageDataGenerator(**train_augmentation)

# Settings shared by the train and validation directory iterators
flow_options = dict(target_size=(224, 224), batch_size=batch_size, class_mode='binary')

# Augmented batches read from the train directory
train_generator = train_datagen.flow_from_directory(train_path, **flow_options)

# Validation batches are only rescaled, never augmented
val_datagen = ImageDataGenerator(rescale=1./255)
val_generator = val_datagen.flow_from_directory(val_path, **flow_options)


### 4.2 Load a Test Image and Annotation with OpenCV and JSON

In [None]:
# Read one sample image from the training images folder (same layout as train_path:
# data/WIDERFace/WIDER_train/images/<event>/<file>).
test_image_path = os.path.join('data', 'WIDERFace', 'WIDER_train', 'images', '0--Parade', '0_Parade_marchingband_1_5.jpg')
testImage = cv2.imread(test_image_path)
# cv2.imread returns None instead of raising when the file cannot be read
if testImage is None:
    raise FileNotFoundError(f'Could not read test image: {test_image_path}')

In [None]:
with open(os.path.join('data', 'train', 'labels', '1.json'), 'r') as f:
    label = json.load(f)

In [None]:
label['shapes'][1]['points']

### 4.3 Extract Coordinates and Rescale to Match Image Resolution

In [None]:
# Flatten the first two points of the second annotated shape into [x1, y1, x2, y2]
first_point = label['shapes'][1]['points'][0]
second_point = label['shapes'][1]['points'][1]
coords = [first_point[0], first_point[1], second_point[0], second_point[1]]

In [None]:
coords

In [None]:
# Turn the coordinates into fractions by dividing x values by 1920 and y values by 1080.
# NOTE(review): the 1920x1080 resolution is hard-coded — confirm it matches the image the
# label was drawn on. Re-running this cell divides `coords` again.
coords = list(np.divide(coords, [1920,1080,1920,1080]))

In [None]:
coords

### 4.4 Apply Augmentations and View Results

In [None]:
# Run the augmentation on the sample image together with its single box, labelled 'face'.
# NOTE(review): `augmentor` is not defined in any cell visible in this notebook — it has to
# be created before this cell runs (presumably an Albumentations pipeline with bounding-box
# support; TODO confirm).
augmented = augmentor(image=testImage, bboxes=[coords], class_labels=['face'])

In [None]:
augmented['bboxes'][0][2:]

In [None]:
augmented['bboxes']

In [None]:
# Boxes are kept as fractions of the image size in this notebook, so scale them by the
# augmented image's own width/height instead of a fixed 1280x720.
aug_height, aug_width = augmented['image'].shape[:2]
aug_box = augmented['bboxes'][0]
box_top_left = tuple(int(v) for v in np.multiply(aug_box[:2], [aug_width, aug_height]))
box_bottom_right = tuple(int(v) for v in np.multiply(aug_box[2:4], [aug_width, aug_height]))

# cv2.rectangle draws directly onto the augmented image
cv2.rectangle(augmented['image'], box_top_left, box_bottom_right, (255,0,0), 2)

plt.imshow(augmented['image'])

# 5. Build and Run Augmentation Pipeline

### 5.1 Run Augmentation Pipeline

In [None]:
# Number of augmented variants written for every source image
AUGMENTATIONS_PER_IMAGE = 60

# All three partitions are augmented: later cells read augmented-data/{train,test,val}
for partition in ['train', 'test', 'val']:
    source_image_dir = os.path.join('data', partition, 'images')
    augmented_image_dir = os.path.join('augmented-data', partition, 'images')
    augmented_label_dir = os.path.join('augmented-data', partition, 'labels')
    # cv2.imwrite does not create missing folders, so build the output tree first
    os.makedirs(augmented_image_dir, exist_ok=True)
    os.makedirs(augmented_label_dir, exist_ok=True)

    for image in os.listdir(source_image_dir):
        img = cv2.imread(os.path.join(source_image_dir, image))
        if img is None:
            # cv2.imread returns None for files it cannot decode
            print(f'Skipping {image}: could not be read as an image')
            continue

        # splitext drops only the final extension, so dotted file names keep their stem
        stem = os.path.splitext(image)[0]

        # Tiny placeholder box used when the image has no label file
        coords = [0,0,0.00001,0.00001]
        label_path = os.path.join('data', partition, 'labels', f'{stem}.json')
        has_label = os.path.exists(label_path)
        if has_label:
            with open(label_path, 'r') as f:
                label = json.load(f)

            points = label['shapes'][0]['points']
            coords = [points[0][0], points[0][1], points[1][0], points[1][1]]
            # Normalise by this image's own size rather than assuming 1920x1080
            src_height, src_width = img.shape[:2]
            coords = list(np.divide(coords, [src_width, src_height, src_width, src_height]))

        try:
            for x in range(AUGMENTATIONS_PER_IMAGE):
                augmented = augmentor(image=img, bboxes=[coords], class_labels=['face'])
                cv2.imwrite(os.path.join(augmented_image_dir, f'{stem}.{x}.jpg'), augmented['image'])

                annotation = {}
                annotation['image'] = image

                if has_label and len(augmented['bboxes']) > 0:
                    # Plain floats keep the box JSON-serialisable whatever numeric type comes back
                    annotation['bbox'] = [float(value) for value in augmented['bboxes'][0][:4]]
                    annotation['class'] = 1
                else:
                    # No label file, or the augmentation returned no box
                    annotation['bbox'] = [0,0,0,0]
                    annotation['class'] = 0

                with open(os.path.join(augmented_label_dir, f'{stem}.{x}.json'), 'w') as f:
                    json.dump(annotation, f)

        except Exception as e:
            # Best effort: report which image failed and carry on with the rest
            print(f'Augmentation failed for {image}: {e}')

### 5.2 Load Augmented Images to Tensorflow Dataset

In [None]:
# Build the glob with os.path.join so it also matches on non-Windows systems
train_images = tf.data.Dataset.list_files(os.path.join('augmented-data', 'train', 'images', '*.jpg'), shuffle=False)
# Decode each file, resize it to 240x240 and scale the pixel values by 1/255
train_images = train_images.map(load_image)
train_images = train_images.map(lambda x: tf.image.resize(x, (240,240)))
train_images = train_images.map(lambda x: x/255)

In [None]:
# Build the glob with os.path.join so it also matches on non-Windows systems
test_images = tf.data.Dataset.list_files(os.path.join('augmented-data', 'test', 'images', '*.jpg'), shuffle=False)
# Decode each file, resize it to 240x240 and scale the pixel values by 1/255
test_images = test_images.map(load_image)
test_images = test_images.map(lambda x: tf.image.resize(x, (240,240)))
test_images = test_images.map(lambda x: x/255)

In [None]:
# Build the glob with os.path.join so it also matches on non-Windows systems
val_images = tf.data.Dataset.list_files(os.path.join('augmented-data', 'val', 'images', '*.jpg'), shuffle=False)
# Decode each file, resize it to 240x240 and scale the pixel values by 1/255
val_images = val_images.map(load_image)
val_images = val_images.map(lambda x: tf.image.resize(x, (240,240)))
val_images = val_images.map(lambda x: x/255)

In [None]:
train_images.as_numpy_iterator().next()

# 6. Prepare Labels

### 6.1 Build Label Loading Function

In [None]:
def load_labels(label_path):
    """Read one JSON annotation file and return its class and bounding box.

    Args:
        label_path: Object whose ``.numpy()`` gives the path of the JSON file
            (a string tensor when called through ``tf.py_function``).

    Returns:
        A pair ``([class], bbox)`` built from the file's ``'class'`` and
        ``'bbox'`` entries.
    """
    json_path = label_path.numpy()
    with open(json_path, 'r', encoding="utf-8") as handle:
        record = json.load(handle)

    return [record['class']], record['bbox']

### 6.2 Load Labels to Tensorflow Dataset

In [None]:
def _augmented_label_dataset(partition):
    """Return a dataset of (class, bbox) pairs read from augmented-data/<partition>/labels/*.json."""
    # os.path.join keeps the glob portable across operating systems
    pattern = os.path.join('augmented-data', partition, 'labels', '*.json')
    label_files = tf.data.Dataset.list_files(pattern, shuffle=False)
    return label_files.map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))

# One label dataset per split, so each can be zipped with the matching augmented
# image dataset later in the notebook.
train_labels = _augmented_label_dataset('train')
test_labels = _augmented_label_dataset('test')
val_labels = _augmented_label_dataset('val')

# The raw annotation text file is read in the cell that follows; the identical
# copy of that code is not repeated here.

In [None]:
# Load the annotations file.
# Each record in the file is: an image path line, a face-count line, then one row of
# integers per face. Parsing every line with int() fails on the image path lines,
# so the file is read record by record into (image_name, boxes) tuples.
train_annotations_path = os.path.join('data', 'wider_face_split', 'wider_face_train_bbx_gt.txt')
train_annotations = []
with open(train_annotations_path, 'r') as file:
    lines = iter(file)
    for name_line in lines:
        image_name = name_line.strip()
        if not image_name:
            continue
        face_count = int(next(lines))
        # An image with zero faces is still followed by one all-zero placeholder row
        rows = [next(lines) for _ in range(max(face_count, 1))]
        boxes = [list(map(int, row.split())) for row in rows[:face_count]]
        train_annotations.append((image_name, boxes))

In [None]:
# Load the validation annotations file (the validation file, not the training one).
# Each record in the file is: an image path line, a face-count line, then one row of
# integers per face. Parsing every line with int() fails on the image path lines,
# so the file is read record by record into (image_name, boxes) tuples.
val_annotations_path = os.path.join('data', 'wider_face_split', 'wider_face_val_bbx_gt.txt')
val_annotations = []
with open(val_annotations_path, 'r') as file:
    lines = iter(file)
    for name_line in lines:
        image_name = name_line.strip()
        if not image_name:
            continue
        face_count = int(next(lines))
        # An image with zero faces is still followed by one all-zero placeholder row
        rows = [next(lines) for _ in range(max(face_count, 1))]
        boxes = [list(map(int, row.split())) for row in rows[:face_count]]
        val_annotations.append((image_name, boxes))

In [None]:
# Peek at the first label. train_annotations is a plain Python list with no
# as_numpy_iterator(); the tf.data label dataset is the object that has one.
train_labels.as_numpy_iterator().next()

# 6. Combine Labels and Image Samples

### 6.1 Check Partition Length

In [None]:
# Show the number of elements in each image dataset and each annotation collection.
# NOTE(review): `test_annotations` is not assigned in any cell visible in this notebook,
# so this line is expected to raise NameError as written — TODO confirm its source.
len(train_images), len(test_images), len(val_images), len(train_annotations), len(test_annotations), len(val_annotations)

### 6.2 Final Datasets (Images/Labels)

In [None]:
# Pair images with labels, then shuffle (buffer 5000), batch by 8 and prefetch 4 batches
train = (
    tf.data.Dataset.zip((train_images, train_labels))
    .shuffle(5000)
    .batch(8)
    .prefetch(4)
)

In [None]:
# Pair images with labels, then shuffle (buffer 1300), batch by 8 and prefetch 4 batches
test = (
    tf.data.Dataset.zip((test_images, test_labels))
    .shuffle(1300)
    .batch(8)
    .prefetch(4)
)

In [None]:
# Pair images with labels, then shuffle (buffer 1000), batch by 8 and prefetch 4 batches
val = (
    tf.data.Dataset.zip((val_images, val_labels))
    .shuffle(1000)
    .batch(8)
    .prefetch(4)
)

In [None]:
train.as_numpy_iterator().next()

### 6.3 View Images and Annotations

In [None]:
data_samples = train.as_numpy_iterator()

In [None]:
res = data_samples.next()

In [None]:
# Draw the labelled box on the first four images of the batch
fig, ax = plt.subplots(ncols=4, figsize=(40,40))
for idx in range(4):
    # Work on a copy: cv2.rectangle draws in place, and arrays handed out by
    # as_numpy_iterator() may be read-only.
    sample_image = res[0][idx].copy()
    sample_coords = res[1][1][idx]

    # Boxes are fractions of the 240x240 resized image; OpenCV wants integer pixels
    top_left = tuple(int(v) for v in np.multiply(sample_coords[:2], [240,240]))
    bottom_right = tuple(int(v) for v in np.multiply(sample_coords[2:], [240,240]))
    cv2.rectangle(sample_image, top_left, bottom_right, (255,0,0), 2)

    ax[idx].imshow(sample_image)

# 7 Build Deep Learning Model using the Functional API

### 7.1 Import Layers and Base Network

In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, Dense, GlobalMaxPooling2D
from tensorflow.keras.applications import VGG16

### 7.2 Download VGG16

In [None]:
vgg = VGG16(include_top=False)

In [None]:
vgg.summary()

### 7.3 Build Instance of Network

In [None]:
def build_model():
    """Assemble the two-headed face detector on top of a VGG16 feature extractor.

    The network takes 240x240x3 inputs and returns a Keras ``Model`` with two
    outputs, in this order: one sigmoid unit (classification head) and four
    sigmoid units (bounding-box head).
    """
    inputs = Input(shape=(240,240,3))

    # Shared convolutional features (VGG16 without its dense top)
    backbone_features = VGG16(include_top=False)(inputs)

    # Classification head
    class_branch = GlobalMaxPooling2D()(backbone_features)
    class_branch = Dense(2048, activation='relu')(class_branch)
    class_output = Dense(1, activation='sigmoid')(class_branch)

    # Bounding-box regression head
    box_branch = GlobalMaxPooling2D()(backbone_features)
    box_branch = Dense(2048, activation='relu')(box_branch)
    box_output = Dense(4, activation='sigmoid')(box_branch)

    return Model(inputs=inputs, outputs=[class_output, box_output])

### 7.4 Test out Neural Network

In [None]:
facedetector = build_model()

In [None]:
facedetector.summary()

In [None]:
X, y = train.as_numpy_iterator().next()

In [None]:
X.shape

In [None]:
classes, coords = facedetector.predict(X)

In [None]:
classes, coords

# 8 Define Losses and Optimisers

### 8.1 Define Optimiser and LR

In [None]:
batches_per_epoch = len(train)
lr_decay = (1./0.75-1)/batches_per_epoch

In [None]:
# The `decay=` keyword is rejected by current Keras optimizers. InverseTimeDecay with
# decay_steps=1 gives the same lr / (1 + lr_decay * step) schedule explicitly.
lr_schedule = tf.keras.optimizers.schedules.InverseTimeDecay(
    initial_learning_rate=0.0001,
    decay_steps=1,
    decay_rate=lr_decay,
)
opt = tf.keras.optimizers.Adam(learning_rate=lr_schedule)

### 8.2 Create Localisation Loss and Classification Loss

In [None]:
def localisation_loss(y_true, y_pred):
    """Sum of squared errors on the first box corner plus squared errors on box width and height.

    Args:
        y_true: 2-D tensor indexed as [row, 0..3]; columns 0-1 are the first
            corner, columns 2-3 the second corner.
        y_pred: Predicted boxes, indexed the same way.

    Returns:
        A scalar tensor: corner error plus size error, summed over all rows.
    """
    def _width_height(boxes):
        # Width is column 2 minus column 0; height is column 3 minus column 1
        return boxes[:,2] - boxes[:,0], boxes[:,3] - boxes[:,1]

    corner_error = tf.reduce_sum(tf.square(y_true[:,:2] - y_pred[:,:2]))

    true_width, true_height = _width_height(y_true)
    pred_width, pred_height = _width_height(y_pred)

    size_error = tf.reduce_sum(tf.square(true_width - pred_width) + tf.square(true_height - pred_height))

    return corner_error + size_error

In [None]:
classloss = tf.keras.losses.BinaryCrossentropy()
regressloss = localisation_loss

### 8.3 Test out Loss Metrics

In [None]:
localisation_loss(y[1], coords)

In [None]:
classloss(y[0], classes)

In [None]:
regressloss(y[1], coords)

# 9 Train Neural Network

### 9.1 Create Custom Model Class

In [None]:
class FaceTracker(Model):
    """Keras ``Model`` wrapper that trains a two-output detector with a combined loss.

    The wrapped network must return ``(classes, coords)``. The total loss is
    half the classification loss plus the localisation loss.
    """

    def __init__(self, facecctv, **kwargs):
        """Store the network to train; remaining kwargs go to ``Model.__init__``."""
        super().__init__(**kwargs)
        self.model = facecctv

    def compile(self, classloss, localisation_loss, opt, **kwargs):
        """Register the two loss callables and the optimizer used by the custom steps."""
        super().compile(**kwargs)
        self.clsloss = classloss
        self.localloss = localisation_loss
        self.opt = opt

    def _facetracker_losses(self, batch, training):
        """Run the wrapped network on one batch and return (total, class, localisation) losses."""
        X, y = batch
        classes, coords = self.model(X, training=training)

        class_loss = self.clsloss(y[0], classes)
        # Box targets are cast to float32 before being compared with the predictions
        box_loss = self.localloss(tf.cast(y[1], tf.float32), coords)

        return (0.5*class_loss) + box_loss, class_loss, box_loss

    def train_step(self, batch, **kwargs):
        """Do one optimisation step on ``batch`` and report the three losses."""
        with tf.GradientTape() as tape:
            total_loss, class_loss, box_loss = self._facetracker_losses(batch, training=True)

        weights = self.model.trainable_variables
        self.opt.apply_gradients(zip(tape.gradient(total_loss, weights), weights))

        return {"total_loss":total_loss, "class_loss":class_loss, "regress_loss":box_loss}

    def test_step(self, batch, **kwargs):
        """Evaluate ``batch`` without updating weights and report the three losses."""
        total_loss, class_loss, box_loss = self._facetracker_losses(batch, training=False)

        return {"total_loss":total_loss, "class_loss":class_loss, "regress_loss":box_loss}

    def call(self, X, **kwargs):
        """Forward ``X`` (and any kwargs) straight to the wrapped network."""
        return self.model(X, **kwargs)

In [None]:
model = FaceTracker(facedetector)

In [None]:
model.compile(classloss, localisation_loss, opt)

### 9.2 Train

In [None]:
logdir='logs'

In [None]:
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=logdir)

In [None]:
hist = model.fit(train, epochs=10, validation_data=val, callbacks=[tensorboard_callback])

### 9.3 Plot Performance

In [None]:
hist.history

In [None]:
# One panel per loss: training curve in teal, validation curve in orange
fig, ax = plt.subplots(ncols=3, figsize=(20,5))

loss_panels = [
    ('total_loss', 'val_total_loss', 'loss', 'val loss', 'Loss'),
    ('class_loss', 'val_class_loss', 'class loss', 'val class loss', 'Classification Loss'),
    ('regress_loss', 'val_regress_loss', 'regress loss', 'val regress loss', 'Regression Loss'),
]

for panel, (train_key, val_key, train_label, val_label, panel_title) in zip(ax, loss_panels):
    panel.plot(hist.history[train_key], color='teal', label=train_label)
    panel.plot(hist.history[val_key], color='orange', label=val_label)
    panel.title.set_text(panel_title)
    panel.legend()

plt.show()

# 10. Make Predictions

### 10.1 Make Predictions 

In [None]:
test_data = test.as_numpy_iterator()

In [None]:
test_sample = test_data.next()

In [None]:
yhat = facedetector.predict(test_sample[0])

In [None]:
# Draw the predicted box on the first four test images when the class score exceeds 0.9
fig, ax = plt.subplots(ncols=4, figsize=(40,40))
for idx in range(4):
    # Work on a copy: cv2.rectangle draws in place, and arrays handed out by
    # as_numpy_iterator() may be read-only.
    sample_image = test_sample[0][idx].copy()
    sample_coords = yhat[1][idx]

    if yhat[0][idx] > 0.9:
        # Boxes are fractions of the 240x240 resized image; OpenCV wants integer pixels
        top_left = tuple(int(v) for v in np.multiply(sample_coords[:2], [240,240]))
        bottom_right = tuple(int(v) for v in np.multiply(sample_coords[2:], [240,240]))
        cv2.rectangle(sample_image, top_left, bottom_right, (255,0,0), 2)

    ax[idx].imshow(sample_image)

### 10.2 Save the Model

In [None]:
from keras.models import load_model

In [None]:
facedetector.save('facecctv.h5')

# 12 Testing

### 12.1 Loading Models

In [None]:
# NOTE(review): this rebinds `facedetector` to a model read from ./models/facetracker.h5.
# No cell visible in this notebook writes that file, so it is presumably a separately
# trained model with its own input size — TODO confirm.
facedetector = load_model('./models/facetracker.h5')
# Reload the detector that an earlier cell saved as facecctv.h5
facecctv = load_model('facecctv.h5')

### 12.3 Testing using dataset static images

In [None]:
# os.path.join keeps the path portable across operating systems
image = cv2.imread(os.path.join('data', 'footage', '0.jpg'))
plt.imshow(image)

# The model is fed RGB pixels scaled by 1/255 at 120x120
rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
resized = tf.image.resize(rgb, (120, 120))

yhat = facedetector.predict(np.expand_dims(resized/255,0))
sample_coords = yhat[1][0]

if yhat[0] > 0.5:
    # Predicted boxes are fractions of the frame, so scale by this image's real
    # width/height instead of assuming 1280x720.
    frame_height, frame_width = image.shape[:2]
    top_left = np.multiply(sample_coords[:2], [frame_width, frame_height]).astype(int)
    bottom_right = np.multiply(sample_coords[2:], [frame_width, frame_height]).astype(int)

    # Controls the main rectangle
    cv2.rectangle(image,
                  tuple(int(v) for v in top_left),
                  tuple(int(v) for v in bottom_right),
                  (255,0,0), 2)
    # Controls the label rectangle
    cv2.rectangle(image,
                  tuple(int(v) for v in np.add(top_left, [0,-30])),
                  tuple(int(v) for v in np.add(top_left, [80,0])),
                  (255,0,0), -1)

    # Controls the text rendered
    cv2.putText(image, 'face', tuple(int(v) for v in np.add(top_left, [0,-5])),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)

cv2.imshow('EyeTrack', image)
# An OpenCV window is only drawn while waitKey runs; wait for a key press, then close it
cv2.waitKey(0)
cv2.destroyAllWindows()



In [None]:
import os
import tensorflow as tf
from tensorflow import keras
import numpy as np
import cv2

# load the saved model
model = keras.models.load_model("facecctv.h5")

# load an image into memory (os.path.join keeps the path portable)
image = cv2.imread(os.path.join("data", "WIDERFace", "WIDER_test", "images", "19--Couple", "19_Couple_Couple_19_2.jpg"))

# The model is fed RGB pixels scaled by 1/255 at 240x240
rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
resized = tf.image.resize(rgb, (240, 240))

prediction = model.predict(np.expand_dims(resized/255,0))
# The detector has two outputs: prediction[0] holds the class score and
# prediction[1] the four box values. Index 0 selects the single image in the batch.
face_score = float(prediction[0][0][0])
coords = prediction[1][0]

# interpret the prediction
if face_score > 0.5:
    # extract the bounding box coordinates
    x1, y1, x2, y2 = coords

    # scale the coordinates to the original image size
    h, w, _ = image.shape
    x1, y1, x2, y2 = x1 * w, y1 * h, x2 * w, y2 * h

    # draw the bounding box
    cv2.rectangle(image, (int(x1), int(y1)), (int(x2), int(y2)), (255, 0, 0), 2)

    # display the image with the bounding box
    cv2.imshow('Image', image)
    cv2.waitKey(0)
    cv2.destroyAllWindows()

    print("Face detected")
else:
    print("No face detected")
