# 1. Import datasets

## 1.1 Import libraries and define paths

In [None]:
import os
import json
import tensorflow as tf

# Root directory of all datasets; get_dataset_paths() expects the layout
# <BASE_DATASET_PATH>/<source>/<type>/{images,labels}/ below it
BASE_DATASET_PATH = "data/datasets"

# Dataset splits (iterated by create_datasets) and the available dataset sources
DATASET_TYPES = ["train", "test", "val"]
DATASET_SOURCES = ["webcam", "seccam"]

# Function to generate dataset paths dynamically
def get_dataset_paths(dataset_source, dataset_type, base_dir=None):
    """
    Builds the glob patterns for the images and labels of one dataset split.

    Args:
        dataset_source (str): Name of the source sub-directory (e.g. "webcam").
        dataset_type (str): Name of the split sub-directory (e.g. "train").
        base_dir (str, optional): Root directory that contains the sources.
            Defaults to BASE_DATASET_PATH when omitted.

    Returns:
        dict: ``{"images": <root>/<source>/<type>/images/*.jpg,
        "labels": <root>/<source>/<type>/labels/*.json}``.
    """
    # Resolve the default at call time so a changed BASE_DATASET_PATH is picked up
    root = BASE_DATASET_PATH if base_dir is None else base_dir
    split_dir = os.path.join(root, dataset_source, dataset_type)
    return {
        "images": os.path.join(split_dir, "images", "*.jpg"),
        "labels": os.path.join(split_dir, "labels", "*.json")
    }

## 1.2 Define image and label loading functions

In [2]:
def load_image(image_path):
    """
    Loads a JPEG file from disk and returns it as a float image tensor.

    Args:
        image_path (tf.Tensor): The file path of the image.

    Returns:
        tf.Tensor: The decoded 3-channel image, converted to float32.
    """
    raw_bytes = tf.io.read_file(image_path)
    # channels=3 forces an RGB result
    decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
    # Converting an integer image to float32 rescales it to [0, 1]
    return tf.image.convert_image_dtype(decoded, tf.float32)

def load_labels(label_path):
    """
    Reads one JSON label file and returns its class and bounding box.

    Args:
        label_path (tf.Tensor | bytes | str): Path of the label file. Inside
            ``tf.py_function`` this is an eager string tensor; plain
            ``bytes``/``str`` paths are accepted as well.

    Returns:
        tuple: ``([class], bbox)`` taken from the ``'class'`` and ``'bbox'``
        keys of the JSON document. If the file cannot be loaded, a message is
        printed and ``([0], [0.0, 0.0, 0.0, 0.0])`` is returned instead.
    """
    path = label_path  # best available value for messages if decoding fails
    try:
        raw_path = label_path.numpy() if hasattr(label_path, "numpy") else label_path
        path = os.fsdecode(raw_path)
        with open(path, 'r', encoding="utf-8") as f:
            label = json.load(f)
        return [label['class']], label['bbox']
    except FileNotFoundError:
        print(f"File not found: {path}")
    except json.JSONDecodeError:
        print(f"Error decoding JSON from file {path}")
    except Exception as e:
        print(f"Error loading label file {path}: {e}")

    # Fixed-shape fallback (one class value, four box values): empty lists
    # would become shape-(0,) tensors and make batching with valid labels fail.
    # NOTE(review): this marks the sample as class 0 with an all-zero box.
    return [0], [0.0, 0.0, 0.0, 0.0]

## 1.3 Create dataset pipelines for images and labels

In [3]:
def create_datasets(dataset_source, batch_size=8, train_shuffle_buffer=5000,
                    eval_shuffle_buffer=1000):
    """
    Builds one batched tf.data pipeline per split in DATASET_TYPES.

    Args:
        dataset_source (str): Dataset source sub-directory (e.g. "webcam").
        batch_size (int): Number of samples per batch.
        train_shuffle_buffer (int): Shuffle buffer size for the "train" split.
        eval_shuffle_buffer (int): Shuffle buffer size for every other split.

    Returns:
        dict: Maps each split name to a dataset yielding
        ``(images, (classes, bboxes))`` batches.
    """
    datasets = {}

    for dataset_type in DATASET_TYPES:
        paths = get_dataset_paths(dataset_source, dataset_type)

        # Load image dataset. shuffle=False keeps the listing order stable;
        # pairing assumes image and label files list in the same order
        # (matching file names) -- TODO confirm for every split.
        images = tf.data.Dataset.list_files(paths["images"], shuffle=False)
        images = images.map(load_image, num_parallel_calls=tf.data.AUTOTUNE)

        # Load label dataset; the JSON parsing is plain Python, so it has to
        # run through tf.py_function with the output dtypes declared up front
        labels = tf.data.Dataset.list_files(paths["labels"], shuffle=False)
        labels = labels.map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]),
                            num_parallel_calls=tf.data.AUTOTUNE)

        # Zip images and labels, then shuffle, batch and prefetch
        dataset = tf.data.Dataset.zip((images, labels))
        buffer_size = train_shuffle_buffer if dataset_type == "train" else eval_shuffle_buffer
        dataset = dataset.shuffle(buffer_size).batch(batch_size).prefetch(tf.data.AUTOTUNE)

        datasets[dataset_type] = dataset

    return datasets


## 1.4 Combine image and label datasets to prepare the final datasets

In [4]:
# Build the train/test/val pipelines for the webcam source
webcam_datasets = create_datasets("webcam")

# Expose each split under a short name for the cells that follow
train, test, val = (webcam_datasets[split] for split in ("train", "test", "val"))

InvalidArgumentError: Expected 'tf.Tensor(False, shape=(), dtype=bool)' to be true. Summarized data: b'No files matched pattern: datasets/augmented\\webcam\\train\\images\\*.jpg'

# 8. Build the Deep Learning Model using the Functional API

### 8.1 Import Layers and Base Network

In [2]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, Dense, GlobalMaxPooling2D
from tensorflow.keras.applications import VGG16

### 8.2 Download VGG16

In [3]:
# Instantiate VGG16 without its classification top, for inspection only;
# build_model() creates its own instance
vgg = VGG16(include_top=False)

In [5]:
# Print the layer table of the backbone
vgg.summary()

Model: "vgg16"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, None, None, 3)]   0         
                                                                 
 block1_conv1 (Conv2D)       (None, None, None, 64)    1792      
                                                                 
 block1_conv2 (Conv2D)       (None, None, None, 64)    36928     
                                                                 
 block1_pool (MaxPooling2D)  (None, None, None, 64)    0         
                                                                 
 block2_conv1 (Conv2D)       (None, None, None, 128)   73856     
                                                                 
 block2_conv2 (Conv2D)       (None, None, None, 128)   147584    
                                                                 
 block2_pool (MaxPooling2D)  (None, None, None, 128)   0     

### 8.3 Build instance of Network

In [6]:
def build_model():
    """
    Assembles the two-headed network on top of a VGG16 backbone.

    Returns:
        Model: Functional model taking a (120, 120, 3) input and producing
        two outputs: a single sigmoid class score and four sigmoid
        bounding-box values.
    """
    image_input = Input(shape=(120,120,3))

    # VGG16 without its dense top is the feature extractor shared by both heads
    features = VGG16(include_top=False)(image_input)

    # Classification head
    class_pooled = GlobalMaxPooling2D()(features)
    class_hidden = Dense(2048, activation='relu')(class_pooled)
    class_output = Dense(1, activation='sigmoid')(class_hidden)

    # Bounding-box regression head
    box_pooled = GlobalMaxPooling2D()(features)
    box_hidden = Dense(2048, activation='relu')(box_pooled)
    box_output = Dense(4, activation='sigmoid')(box_hidden)

    return Model(inputs=image_input, outputs=[class_output, box_output])

### 8.4 Test out Neural Network

In [7]:
# Build the two-output functional model (class score, box coordinates)
facetracker = build_model()

In [8]:
# Print the layer table of the assembled model
facetracker.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_2 (InputLayer)           [(None, 120, 120, 3  0           []                               
                                )]                                                                
                                                                                                  
 vgg16 (Functional)             (None, None, None,   14714688    ['input_2[0][0]']                
                                512)                                                              
                                                                                                  
 global_max_pooling2d (GlobalMa  (None, 512)         0           ['vgg16[0][0]']                  
 xPooling2D)                                                                                  

In [None]:
# Pull one batch from the training pipeline: X holds the images, y the
# (class, bbox) label pair. Requires `train` from section 1.4 to be defined.
X, y = train.as_numpy_iterator().next()

NameError: name 'train' is not defined

In [None]:
# Inspect the shape of the image batch
X.shape

NameError: name 'X' is not defined

In [None]:
# Run the model on the batch; it returns the class scores and the box
# coordinates, in the order of the model's two outputs
classes, coords = facetracker.predict(X)

In [None]:
# Display the raw predictions
classes, coords

# 9. Define Losses and Optimizers

### 9.1 Define Optimizer and LR

In [None]:
# Number of batches in one pass over the training pipeline
batches_per_epoch = len(train)
# Per-step decay rate. With an inverse-time schedule lr0 / (1 + decay * step)
# this brings the rate to 75% of lr0 after one epoch.
# NOTE(review): confirm the optimizer applies that schedule.
lr_decay = (1./0.75 -1)/batches_per_epoch

In [None]:
# The `decay` keyword is deprecated: newer Keras optimizers reject or ignore it.
# InverseTimeDecay with decay_steps=1 gives the same schedule,
# lr = 0.0001 / (1 + lr_decay * step), through the supported API.
lr_schedule = tf.keras.optimizers.schedules.InverseTimeDecay(
    initial_learning_rate=0.0001, decay_steps=1, decay_rate=lr_decay)
opt = tf.keras.optimizers.Adam(learning_rate=lr_schedule)

### 9.2 Create Localization Loss and Classification Loss

In [None]:
def localization_loss(y_true, yhat):
    """
    Sum-of-squares box loss over a batch.

    Adds the squared error of the first two coordinates to the squared error
    of the box width (column 2 - column 0) and height (column 3 - column 1).

    Args:
        y_true: Reference boxes, indexed as [batch, 4].
        yhat: Predicted boxes, indexed the same way.

    Returns:
        Scalar tensor: the coordinate error plus the size error, each summed
        over the whole batch.
    """
    # Squared error on the first two coordinates of every box
    corner_error = tf.reduce_sum(tf.square(y_true[:,:2] - yhat[:,:2]))

    # Box extents of the reference and of the prediction
    true_width = y_true[:,2] - y_true[:,0]
    true_height = y_true[:,3] - y_true[:,1]
    pred_width = yhat[:,2] - yhat[:,0]
    pred_height = yhat[:,3] - yhat[:,1]

    size_error = tf.reduce_sum(
        tf.square(true_width - pred_width) + tf.square(true_height - pred_height)
    )

    return corner_error + size_error

In [None]:
# Binary cross-entropy for the class output
classloss = tf.keras.losses.BinaryCrossentropy()
# The box output uses the custom localization_loss defined above
regressloss = localization_loss

### 9.3 Test out Loss Metrics

In [None]:
# Box loss of the predictions against the batch's bbox labels (y[1])
localization_loss(y[1], coords)

In [None]:
# Classification loss of the predictions against the batch's class labels (y[0])
classloss(y[0], classes)

In [None]:
# Same computation through the `regressloss` alias of localization_loss
regressloss(y[1], coords)

# 10. Train Neural Network

### 10.1 Create Custom Model Class

In [None]:
class FaceTracker(Model):
    """
    Keras model wrapper that trains a two-output network (class score and
    bounding box) with a weighted sum of a classification and a localization
    loss.
    """

    def __init__(self, eyetracker, **kwargs):
        """
        Args:
            eyetracker: The underlying model; called as ``model(X, training=...)``
                and expected to return ``(classes, coords)``.
            **kwargs: Forwarded to ``Model.__init__``.
        """
        super().__init__(**kwargs)
        self.model = eyetracker

    def compile(self, opt, classloss, localizationloss, **kwargs):
        """
        Stores the optimizer and the two loss callables used by the custom steps.

        Args:
            opt: Optimizer used to apply gradients in ``train_step``.
            classloss: Callable ``(y_true, y_pred)`` for the class output.
            localizationloss: Callable ``(y_true, y_pred)`` for the box output.
            **kwargs: Forwarded to ``Model.compile``.
        """
        super().compile(**kwargs)
        self.closs = classloss
        self.lloss = localizationloss
        self.opt = opt

    def _compute_losses(self, batch, training):
        """Runs the wrapped model on one batch and returns the loss dictionary."""
        X, y = batch

        classes, coords = self.model(X, training=training)

        batch_classloss = self.closs(y[0], classes)
        # Box labels are cast to float32 before being compared with the predictions
        batch_localizationloss = self.lloss(tf.cast(y[1], tf.float32), coords)

        # The classification term is weighted at half the localization term
        total_loss = batch_localizationloss+0.5*batch_classloss

        return {"total_loss":total_loss, "class_loss":batch_classloss, "regress_loss":batch_localizationloss}

    def train_step(self, batch, **kwargs):
        """
        Performs one optimization step on ``batch`` (``(X, (classes, boxes))``).

        Returns:
            dict: ``total_loss``, ``class_loss`` and ``regress_loss`` for the batch.
        """
        with tf.GradientTape() as tape:
            losses = self._compute_losses(batch, training=True)

        # Gradients are taken after leaving the tape context so the backward
        # pass itself is not recorded
        grad = tape.gradient(losses["total_loss"], self.model.trainable_variables)

        # Use the optimizer handed to compile(), not a module-level global
        self.opt.apply_gradients(zip(grad, self.model.trainable_variables))

        return losses

    def test_step(self, batch, **kwargs):
        """
        Evaluates the losses on ``batch`` without updating any weights.

        Returns:
            dict: ``total_loss``, ``class_loss`` and ``regress_loss`` for the batch.
        """
        return self._compute_losses(batch, training=False)

    def call(self, X, **kwargs):
        """Forwards ``X`` (and any keyword arguments) to the wrapped model."""
        return self.model(X, **kwargs)

In [None]:
# Wrap the functional model in the custom training class
model = FaceTracker(facetracker)

In [None]:
# Positional order follows FaceTracker.compile: optimizer, class loss, localization loss
model.compile(opt, classloss, regressloss)

### 10.2 Train

In [None]:
# Directory passed to the TensorBoard callback as log_dir
logdir='logs'

In [None]:
# TensorBoard callback writing to `logdir`
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=logdir)

In [None]:
# Train for 10 epochs, evaluating on `val` as validation data; `hist` is the
# object returned by fit() and is used for plotting below
hist = model.fit(train, epochs=10, validation_data=val, callbacks=[tensorboard_callback])

### 10.3 Plot Performance

In [None]:
# Display the recorded loss values
hist.history

In [None]:
from matplotlib import pyplot as plt

fig, ax = plt.subplots(ncols=3, figsize=(20,5))

# One panel per loss: (history key, legend label, panel title)
loss_panels = [
    ('total_loss', 'loss', 'Loss'),
    ('class_loss', 'class loss', 'Classification Loss'),
    ('regress_loss', 'regress loss', 'Regression Loss'),
]

for panel_ax, (history_key, legend_label, panel_title) in zip(ax, loss_panels):
    # Training curve in teal, validation curve ("val_" prefixed key) in orange
    panel_ax.plot(hist.history[history_key], color='teal', label=legend_label)
    panel_ax.plot(hist.history['val_' + history_key], color='orange', label='val ' + legend_label)
    panel_ax.title.set_text(panel_title)
    panel_ax.legend()

plt.show()

# 11. Make Predictions

### 11.1 Make Predictions on Test Set

In [None]:
# Iterator over the test pipeline that yields NumPy batches
test_data = test.as_numpy_iterator()

In [None]:
# Fetch the next batch: test_sample[0] holds the images, test_sample[1] the labels
test_sample = test_data.next()

In [None]:
# Predict on the batch images; yhat[0] holds the class scores and yhat[1]
# the box coordinates
yhat = facetracker.predict(test_sample[0])

In [None]:
import cv2
import numpy as np
import matplotlib.pyplot as plt

fig, ax = plt.subplots(ncols=4, figsize=(20,20))
for idx in range(4):
    # Draw on a copy: cv2.rectangle writes in place and needs a writable,
    # contiguous array, which a slice of the batch is not guaranteed to be
    sample_image = test_sample[0][idx].copy()
    sample_coords = yhat[1][idx]

    # Only draw a box when the class score is confident enough
    if yhat[0][idx] > 0.9:
        # Scale the box coordinates by the 120x120 model input size and pass
        # plain Python ints, which every OpenCV version accepts as points
        top_left = tuple(np.multiply(sample_coords[:2], [120,120]).astype(int).tolist())
        bottom_right = tuple(np.multiply(sample_coords[2:], [120,120]).astype(int).tolist())
        cv2.rectangle(sample_image, top_left, bottom_right, (255,0,0), 2)

    ax[idx].imshow(sample_image)

### 11.2 Save the Model

In [None]:
# Save the underlying functional model (not the FaceTracker wrapper);
# the .h5 suffix selects Keras' HDF5 format
facetracker.save('custom_model.h5')