In [None]:
# Import necessary libraries
import os
import time
import uuid
import cv2
import tensorflow as tf
import json
import numpy as np
from matplotlib import pyplot as plt
import albumentations as alb
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, Dense, GlobalMaxPooling2D
from tensorflow.keras.applications import VGG16

In [None]:
# How many frames the capture loop should grab from the camera
number_images = 30

# Directory that holds the raw captured images
IMAGES_PATH = os.path.join('data', 'images')

In [None]:
# Make sure the output directory exists before any frame is written to it
os.makedirs(IMAGES_PATH, exist_ok=True)

# Open the default camera (index 0); change the index if another device is needed
cap = cv2.VideoCapture(0)

try:
    # Grab up to `number_images` frames, pausing half a second between captures
    for imgnum in range(number_images):
        print('Collecting image {}'.format(imgnum))

        # Capture a frame from the camera
        ret, frame = cap.read()
        print(f'Ret: {ret}')

        # Stop as soon as the camera fails to deliver a frame
        if not ret:
            print('Camera capture failed!')
            break

        # uuid1 gives every capture a unique file name
        imgname = os.path.join(IMAGES_PATH, f'{uuid.uuid1()}.jpg')

        # Report a failed write instead of silently losing the frame
        if not cv2.imwrite(imgname, frame):
            print(f'Could not write {imgname}')

        # Show the frame that was just captured
        cv2.imshow('frame', frame)

        # Pause so consecutive captures differ from each other
        time.sleep(0.5)

        # Pressing 'q' ends the capture session early
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Always free the camera and close the preview window, even after an error
    cap.release()
    cv2.destroyAllWindows()


In [None]:
# Annotate the captured images with the labelme tool
!labelme

In [None]:
# Let TensorFlow grow its GPU memory use on demand for every detected GPU
gpus = tf.config.list_physical_devices('GPU')
for device in gpus:
    tf.config.experimental.set_memory_growth(device, True)

In [None]:
# Query the GPUs TensorFlow can see and print the resulting list
gpu_devices = tf.config.list_physical_devices('GPU')
print('Available GPUs:', gpu_devices)

In [None]:
# Build a dataset of the captured .jpg file paths.
# os.path.join keeps the glob pattern portable across operating systems
# (a backslash-separated literal only matches on Windows).
images = tf.data.Dataset.list_files(os.path.join(IMAGES_PATH, '*.jpg'))

In [None]:
# Peek at one element of the dataset as a NumPy value
# (at this stage the dataset still holds file paths, not decoded images)
images.as_numpy_iterator().next()


In [None]:
# Helper that turns a JPEG file path into a decoded image tensor
def load_image(x):
    """Read the file at path `x` and return its JPEG-decoded contents."""
    return tf.io.decode_jpeg(tf.io.read_file(x))


In [None]:
# Replace every file path in the dataset with its decoded image via load_image.
# NOTE(review): this rebinds `images`, so re-running the cell would apply
# load_image to already-decoded images; re-run the list_files cell first.
images = images.map(load_image)

In [None]:
# Peek at one decoded image as a NumPy array
images.as_numpy_iterator().next()

In [None]:
# Check the type of the images dataset
type(images)

In [None]:
# Batch the images four at a time and iterate over the batches as NumPy arrays.
# NOTE(review): the next cell repeats this exact assignment; one of the two can be removed.
image_generator = images.batch(4).as_numpy_iterator()

In [None]:
# Batch the images in groups of four and expose the batches through a NumPy iterator
image_generator = images.batch(4).as_numpy_iterator()

In [None]:
# Pull one batch of (up to) four images from the generator to plot
plot_images = image_generator.next()

# One row of four subplots on a large canvas
fig, ax = plt.subplots(ncols=4, figsize=(20, 20))

# Draw every image of the batch on its own axis
for idx, image in enumerate(plot_images):
    ax[idx].imshow(image)

# Render the figure
plt.show()


In [None]:
import os
import random
import shutil

# Source directory and the three partition directories
images_dir = 'data/images'
train_dir = 'data/train/images'
test_dir = 'data/test/images'
val_dir = 'data/val/images'

# Only regular .jpg files are partitioned; anything else found in the folder
# (sub-directories, stray non-image files) is left where it is
image_files = [
    name for name in os.listdir(images_dir)
    if name.lower().endswith('.jpg') and os.path.isfile(os.path.join(images_dir, name))
]

# Shuffle so that the partitions are random samples of the captures
random.shuffle(image_files)

# 70 % train, 15 % test, and whatever remains goes to validation
total_images = len(image_files)
train_count = int(total_images * 0.7)
test_count = int(total_images * 0.15)
val_count = total_images - train_count - test_count

# Create directories if they don't exist
for dir_path in [train_dir, test_dir, val_dir]:
    os.makedirs(dir_path, exist_ok=True)

# Names of the files that were moved, in the order they were moved.
# os.listdir never yields the same name twice, so no duplicate check is needed.
moved_images = []

for idx, image_file in enumerate(image_files):
    # The position in the shuffled list decides the partition
    if idx < train_count:
        dst_dir = train_dir
    elif idx < train_count + test_count:
        dst_dir = test_dir
    else:
        dst_dir = val_dir

    shutil.move(os.path.join(images_dir, image_file), os.path.join(dst_dir, image_file))
    moved_images.append(image_file)

print("Images have been randomly partitioned and moved without replication.")


In [None]:
# Move every annotation from data/labels next to the partition that holds its image
for folder in ['train', 'test', 'val']:
    # The destination folder must exist before os.replace can move a file into it
    labels_dir = os.path.join('data', folder, 'labels')
    os.makedirs(labels_dir, exist_ok=True)

    # Iterate through the files in the 'images' folder of each partition
    for file in os.listdir(os.path.join('data', folder, 'images')):
        # The annotation shares the image's base name; splitext strips only the
        # final extension, so names containing extra dots keep their full stem
        filename = os.path.splitext(file)[0] + '.json'
        existing_filepath = os.path.join('data', 'labels', filename)

        # Images without an annotation file are simply skipped
        if os.path.exists(existing_filepath):
            new_filepath = os.path.join(labels_dir, filename)
            os.replace(existing_filepath, new_filepath)


In [None]:
# Augmentation pipeline: a random 450x450 crop followed by random flips and
# brightness / gamma / colour jitter. Bounding boxes are passed in the
# 'albumentations' format and are tied to the 'class_labels' field.
augmentor = alb.Compose(
    [
        alb.RandomCrop(width=450, height=450),
        alb.HorizontalFlip(p=0.5),
        alb.RandomBrightnessContrast(p=0.2),
        alb.RandomGamma(p=0.2),
        alb.RGBShift(p=0.2),
        alb.VerticalFlip(p=0.5),
    ],
    bbox_params=alb.BboxParams(
        format='albumentations',
        label_fields=['class_labels'],
    ),
)

In [None]:
# Read one training image with OpenCV.
# NOTE(review): the file name is hard-coded to one specific capture; point it
# at an image that exists in your own data/train/images folder.
img = cv2.imread(os.path.join('data', 'train', 'images', '3a38cf81-4549-11ee-99a6-00e93a59cab9.jpg'))

In [None]:
# Display the raw pixel array of the image that was just read
img

In [None]:
# Load the JSON annotation that carries the same base name as the image read above
with open(os.path.join('data', 'train', 'labels', '3a38cf81-4549-11ee-99a6-00e93a59cab9.json'), 'r') as f:
    label = json.load(f)


In [None]:
# Show the list of points stored for the first shape in the label's 'shapes' field
label['shapes'][0]['points']

In [None]:
# Flatten the first two points of the first shape into [x1, y1, x2, y2]:
# the outer index walks the points, the inner index picks x (0) then y (1)
coords = [label['shapes'][0]['points'][point][axis]
          for point in (0, 1)
          for axis in (0, 1)]


In [None]:
# Coordinates as [x1, y1, x2, y2], taken from the first two label points
coords

In [None]:
# Normalize the coordinates by the real image size rather than a hard-coded
# 640x480; img.shape is (height, width, channels)
img_height, img_width = img.shape[:2]
coords = list(np.divide(coords, [img_width, img_height, img_width, img_height]))

In [None]:
# Coordinates after the normalization step
coords

In [None]:
# Run the augmentation pipeline on the image together with its single
# bounding box, which is labelled 'face'
augmented = augmentor(image=img, bboxes=[coords], class_labels=['face'])

In [None]:
# Check the type of the augmentation result
type(augmented)

In [None]:
# List the keys available in the augmentation result
augmented.keys()

In [None]:
# Shape of the augmented image array
augmented['image'].shape


In [None]:
# Bounding boxes returned for the augmented image
augmented['bboxes']

In [None]:
# First two values of the first augmented box (used below as the rectangle's first corner)
augmented['bboxes'][0][:2]

In [None]:
# Draw on an RGB copy: the image was loaded with cv2.imread (BGR channel order)
# while matplotlib displays RGB, and the copy leaves the augmentation result untouched
preview = cv2.cvtColor(augmented['image'], cv2.COLOR_BGR2RGB)

# Scale the normalized box by the actual size of the augmented image
crop_height, crop_width = preview.shape[:2]
cv2.rectangle(preview,
              tuple(np.multiply(augmented['bboxes'][0][:2], [crop_width, crop_height]).astype(int)),
              tuple(np.multiply(augmented['bboxes'][0][2:4], [crop_width, crop_height]).astype(int)),
              (255, 0, 0), 2)

plt.imshow(preview)

In [None]:
# Layout of the augmented dataset: aug_data / partition / images|labels
base_dir = 'aug_data'
partitions = ['train', 'test', 'val']
sub_dirs = ['images', 'labels']

# os.makedirs creates missing parent directories and, with exist_ok=True,
# does nothing for directories that already exist, so the cell is re-runnable
for partition in partitions:
    for sub_dir in sub_dirs:
        os.makedirs(os.path.join(base_dir, partition, sub_dir), exist_ok=True)


In [None]:
# Write 60 augmented copies of every image in each partition, each with a JSON
# annotation holding the source image name, a bounding box and a class flag
for partition in ['train', 'test', 'val']:
    for image in os.listdir(os.path.join('data', partition, 'images')):
        image_path = os.path.join('data', partition, 'images', image)
        img = cv2.imread(image_path)

        # Skip entries OpenCV could not decode instead of failing later on None
        if img is None:
            print(f'Skipping unreadable image: {image_path}')
            continue

        # Base name shared by the image, its label and all augmented outputs
        stem = os.path.splitext(image)[0]
        img_height, img_width = img.shape[:2]

        # Tiny placeholder box, used when the image has no annotation
        coords = [0, 0, 0.00001, 0.00001]

        label_path = os.path.join('data', partition, 'labels', f'{stem}.json')
        has_label = os.path.exists(label_path)
        if has_label:
            with open(label_path, 'r') as f:
                label = json.load(f)

            # Order the two corner points so the box is always (min, min, max, max),
            # whichever corner was stored first
            points = label['shapes'][0]['points']
            x_low, x_high = sorted((points[0][0], points[1][0]))
            y_low, y_high = sorted((points[0][1], points[1][1]))

            # Normalize by the real image size instead of a hard-coded 640x480
            coords = list(np.divide([x_low, y_low, x_high, y_high],
                                    [img_width, img_height, img_width, img_height]))

        try:
            for x in range(60):
                augmented = augmentor(image=img, bboxes=[coords], class_labels=['face'])

                cv2.imwrite(os.path.join('aug_data', partition, 'images', f'{stem}.{x}.jpg'),
                            augmented['image'])

                annotation = {'image': image}

                # Class 1 only when the image was annotated AND the box survived the crop
                if has_label and len(augmented['bboxes']) > 0:
                    # Plain Python floats keep json.dump working even if the
                    # augmentation library hands back NumPy values
                    annotation['bbox'] = [float(value) for value in augmented['bboxes'][0][:4]]
                    annotation['class'] = 1
                else:
                    annotation['bbox'] = [0, 0, 0, 0]
                    annotation['class'] = 0

                with open(os.path.join('aug_data', partition, 'labels', f'{stem}.{x}.json'), 'w') as f:
                    json.dump(annotation, f)

        except Exception as e:
            # Best effort: report which image failed and carry on with the next one
            print(f'Augmentation failed for {image_path}: {e}')


In [None]:
# Training image pipeline: list files -> decode -> resize -> scale.
# shuffle=False keeps a fixed file order so images stay aligned with the label
# files when both datasets are zipped; os.path.join keeps the pattern portable.
train_images = tf.data.Dataset.list_files(os.path.join('aug_data', 'train', 'images', '*.jpg'), shuffle=False)

# Decode each JPEG file into an image tensor
train_images = train_images.map(load_image)

# Resize images to a common 120x120 size
train_images = train_images.map(lambda x: tf.image.resize(x, (120, 120)))

# Scale pixel values by 1/255
train_images = train_images.map(lambda x: x / 255)


In [None]:
# Test image pipeline: list files -> decode -> resize -> scale.
# shuffle=False keeps a fixed file order so images stay aligned with the label
# files when both datasets are zipped; os.path.join keeps the pattern portable.
test_images = tf.data.Dataset.list_files(os.path.join('aug_data', 'test', 'images', '*.jpg'), shuffle=False)

# Decode each JPEG file into an image tensor
test_images = test_images.map(load_image)

# Resize images to a common 120x120 size
test_images = test_images.map(lambda x: tf.image.resize(x, (120, 120)))

# Scale pixel values by 1/255
test_images = test_images.map(lambda x: x / 255)


In [None]:
# Validation image pipeline: list files -> decode -> resize -> scale.
# shuffle=False keeps a fixed file order so images stay aligned with the label
# files when both datasets are zipped; os.path.join keeps the pattern portable.
val_images = tf.data.Dataset.list_files(os.path.join('aug_data', 'val', 'images', '*.jpg'), shuffle=False)

# Decode each JPEG file into an image tensor
val_images = val_images.map(load_image)

# Resize images to a common 120x120 size
val_images = val_images.map(lambda x: tf.image.resize(x, (120, 120)))

# Scale pixel values by 1/255
val_images = val_images.map(lambda x: x / 255)


In [None]:
# Peek at one preprocessed training image (resized to 120x120 and divided by 255)
train_images.as_numpy_iterator().next()

In [None]:
def load_labels(label_path):
    """Read one annotation file and return its class and bounding box.

    `label_path` must expose a ``.numpy()`` method yielding the path of a
    UTF-8 JSON file with 'class' and 'bbox' entries. The class is returned
    wrapped in a one-element list, the bounding box as stored in the file.
    """
    json_file = label_path.numpy()
    with open(json_file, 'r', encoding="utf-8") as handle:
        annotation = json.loads(handle.read())

    class_id = [annotation['class']]
    bbox = annotation['bbox']
    return class_id, bbox

In [None]:
# List the training label files in a fixed order (shuffle=False) using a
# portable glob pattern
train_labels = tf.data.Dataset.list_files(os.path.join('aug_data', 'train', 'labels', '*.json'), shuffle=False)
# Parse each file with load_labels; py_function lets the plain-Python JSON
# reader run inside the dataset and declares the class / box output dtypes
train_labels = train_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))

In [None]:
# List the test label files in a fixed order (shuffle=False) using a
# portable glob pattern
test_labels = tf.data.Dataset.list_files(os.path.join('aug_data', 'test', 'labels', '*.json'), shuffle=False)
# Parse each file with load_labels; py_function lets the plain-Python JSON
# reader run inside the dataset and declares the class / box output dtypes
test_labels = test_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))

In [None]:
# List the validation label files in a fixed order (shuffle=False) using a
# portable glob pattern
val_labels = tf.data.Dataset.list_files(os.path.join('aug_data', 'val', 'labels', '*.json'), shuffle=False)
# Parse each file with load_labels; py_function lets the plain-Python JSON
# reader run inside the dataset and declares the class / box output dtypes
val_labels = val_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))

In [None]:
# Peek at one validation label as NumPy values: a (class, bounding box) pair
val_labels.as_numpy_iterator().next()

In [None]:
# Number of elements in every image / label dataset; within a partition the
# image and label counts are expected to match before the two are zipped
len(train_images), len(train_labels), len(test_images), len(test_labels), len(val_images), len(val_labels)

In [None]:
# Training dataset: pair every image with its label, shuffle with a 5000-element
# buffer, group into batches of 8 and keep 4 batches prefetched
train = (
    tf.data.Dataset.zip((train_images, train_labels))
    .shuffle(5000)
    .batch(8)
    .prefetch(4)
)


In [None]:
# Test dataset: pair every image with its label, shuffle with a 1300-element
# buffer, group into batches of 8 and keep 4 batches prefetched
test = (
    tf.data.Dataset.zip((test_images, test_labels))
    .shuffle(1300)
    .batch(8)
    .prefetch(4)
)


In [None]:
# Validation dataset: pair every image with its label, shuffle with a
# 1000-element buffer, group into batches of 8 and keep 4 batches prefetched
val = (
    tf.data.Dataset.zip((val_images, val_labels))
    .shuffle(1000)
    .batch(8)
    .prefetch(4)
)


In [None]:
# Shape of the image part (element 0) of one training batch
train.as_numpy_iterator().next()[0].shape

In [None]:
# Label part (element 1) of one training batch: classes and bounding boxes
train.as_numpy_iterator().next()[1]

In [None]:
# Keep a NumPy iterator over the training dataset so batches can be pulled one by one
data_samples = train.as_numpy_iterator()

In [None]:
# Pull the next (images, labels) batch from the iterator
res = data_samples.next()

In [None]:
# Create a figure with 4 subplots
fig, ax = plt.subplots(ncols=4, figsize=(20,20))

# Show the first 4 samples of the batch with their labelled boxes
for idx in range(4):
    # cv2.rectangle draws in place, so work on a writable copy and leave the
    # batch held in `res` unmodified
    sample_image = res[0][idx].copy()
    sample_coords = res[1][1][idx]

    # Boxes are normalized; scale them to the 120x120 image before drawing
    cv2.rectangle(sample_image,
                  tuple(np.multiply(sample_coords[:2], [120,120]).astype(int)),
                  tuple(np.multiply(sample_coords[2:], [120,120]).astype(int)),
                  (255,0,0), 2)

    # Display the sample image in the corresponding subplot
    ax[idx].imshow(sample_image)


In [None]:
# Instantiate VGG16 without its top (fully connected) layers so it can be inspected
vgg = VGG16(include_top=False)


In [None]:
# Print the layer-by-layer summary of the VGG16 base
vgg.summary()


In [None]:
def build_model():
    """Build the two-headed face tracker on top of a VGG16 base.

    The network takes 120x120x3 images and returns two outputs: a single
    sigmoid class score and four sigmoid box coordinates.
    """
    # 120x120 RGB input
    image_input = Input(shape=(120, 120, 3))

    # VGG16 without its top (fully connected) layers acts as feature extractor
    features = VGG16(include_top=False)(image_input)

    # Classification head: global max pooling, a hidden layer, one sigmoid unit
    class_pool = GlobalMaxPooling2D()(features)
    class_hidden = Dense(2048, activation='relu')(class_pool)
    class_output = Dense(1, activation='sigmoid')(class_hidden)

    # Regression head: global max pooling, a hidden layer, four sigmoid units
    box_pool = GlobalMaxPooling2D()(features)
    box_hidden = Dense(2048, activation='relu')(box_pool)
    box_output = Dense(4, activation='sigmoid')(box_hidden)

    # One model exposing both heads as outputs
    return Model(inputs=image_input, outputs=[class_output, box_output])


In [None]:
# Instantiate the two-output network defined by build_model
facetracker = build_model()

In [None]:
# Print the layer-by-layer summary of the facetracker network
facetracker.summary()

In [None]:
# Take one batch from the training dataset: X holds the images, y the labels
X, y = train.as_numpy_iterator().next()

In [None]:
# Shape of the image batch
X.shape


In [None]:
# Run the facetracker on the batch; it returns class scores and box coordinates
classes, coords = facetracker.predict(X)

In [None]:
# Inspect the predicted class scores and box coordinates
classes, coords

In [None]:
# Number of batches in one pass over the training dataset
batches_per_epoch = len(train)

# Per-step decay rate handed to the optimizer.
# NOTE(review): presumably chosen so that, under inverse-time decay, the
# learning rate falls to 75% of its start value after one epoch - confirm
# against the optimizer documentation.
lr_decay = (1./0.75 - 1) / batches_per_epoch

In [None]:
# # Create an Adam optimizer with the specified learning rate and decay
# opt = tf.keras.optimizers.Adam(learning_rate=0.0001, decay=lr_decay)

In [None]:
# Adam optimizer with a 1e-4 learning rate and the decay rate computed above.
# NOTE(review): the legacy optimizer namespace is presumably used because it
# still accepts the `decay` argument - confirm for your TensorFlow version.
opt = tf.keras.optimizers.legacy.Adam(learning_rate=0.0001, decay=lr_decay)


In [None]:
def localization_loss(y_true, yhat):
    """Sum of squared errors on box position and box size.

    Both arguments hold one box per row as four values; columns 0-1 are
    treated as the first corner and columns 2-3 as the opposite corner.
    Returns the summed squared error of the first corner plus the summed
    squared error of the box widths and heights.
    """
    # Squared error of the first corner, summed over the whole batch
    corner_error = tf.reduce_sum(tf.square(y_true[:, :2] - yhat[:, :2]))

    # Width and height of the true and the predicted boxes
    true_w, true_h = y_true[:, 2] - y_true[:, 0], y_true[:, 3] - y_true[:, 1]
    pred_w, pred_h = yhat[:, 2] - yhat[:, 0], yhat[:, 3] - yhat[:, 1]

    # Squared error of the box dimensions, summed over the whole batch
    size_error = tf.reduce_sum(tf.square(true_w - pred_w) + tf.square(true_h - pred_h))

    return corner_error + size_error

In [None]:
# Classification loss: binary cross-entropy on the class score
classloss = tf.keras.losses.BinaryCrossentropy()

# Regression loss: the localization_loss function defined in this notebook
regressloss = localization_loss

In [None]:
# Localization loss of the sample predictions against the batch's bounding-box labels
localization_loss(y[1], coords)


In [None]:
# Classification loss of the sample predictions against the batch's class labels
classloss(y[0], classes)


In [None]:
# Same computation through the regressloss alias
regressloss(y[1], coords)

In [None]:
# Define the FaceTracker model as a subclass of tf.keras.Model
class FaceTracker(Model):
    """Keras model that trains a two-output network with a custom step.

    The wrapped network is called on a batch of images and must return a
    ``(classes, coords)`` pair. Each batch is an ``(X, y)`` pair in which
    ``y[0]`` holds the class labels and ``y[1]`` the bounding boxes.
    """

    def __init__(self, eyetracker, **kwargs):
        """Store the network to train.

        Args:
            eyetracker: the network that is called on the inputs and returns
                ``(classes, coords)``.
            **kwargs: forwarded to the Keras ``Model`` constructor.
        """
        super().__init__(**kwargs)
        self.model = eyetracker

    def compile(self, opt, classloss, localizationloss, **kwargs):
        """Register the optimizer and the two loss functions.

        Args:
            opt: optimizer whose ``apply_gradients`` is used in ``train_step``.
            classloss: callable ``(y_true, y_pred)`` for the class output.
            localizationloss: callable ``(y_true, y_pred)`` for the box output.
            **kwargs: forwarded to the Keras ``Model.compile``.
        """
        super().compile(**kwargs)
        self.closs = classloss
        self.lloss = localizationloss
        self.opt = opt

    def train_step(self, batch, **kwargs):
        """Run one optimization step on ``batch`` and return the three losses."""
        X, y = batch

        # Only the forward pass and the loss are recorded on the tape
        with tf.GradientTape() as tape:
            classes, coords = self.model(X, training=True)

            batch_classloss = self.closs(y[0], classes)
            # Box labels are cast to float32 before they reach the loss
            batch_localizationloss = self.lloss(tf.cast(y[1], tf.float32), coords)

            # The classification term is weighted by one half
            total_loss = batch_localizationloss + 0.5 * batch_classloss

        # Computed after leaving the tape context so the gradient operations
        # themselves are not recorded
        grad = tape.gradient(total_loss, self.model.trainable_variables)

        # Apply gradients using the optimizer
        self.opt.apply_gradients(zip(grad, self.model.trainable_variables))

        # Return loss values for monitoring
        return {"total_loss": total_loss, "class_loss": batch_classloss, "regress_loss": batch_localizationloss}

    def test_step(self, batch, **kwargs):
        """Evaluate ``batch`` without updating weights and return the three losses."""
        X, y = batch

        classes, coords = self.model(X, training=False)

        batch_classloss = self.closs(y[0], classes)
        batch_localizationloss = self.lloss(tf.cast(y[1], tf.float32), coords)
        total_loss = batch_localizationloss + 0.5 * batch_classloss

        # Return loss values for monitoring
        return {"total_loss": total_loss, "class_loss": batch_classloss, "regress_loss": batch_localizationloss}

    def call(self, X, **kwargs):
        """Forward ``X`` (and any keyword arguments) to the wrapped network."""
        return self.model(X, **kwargs)


In [None]:
# Wrap the facetracker network in the custom-training FaceTracker model
model = FaceTracker(facetracker)

In [None]:
# Register the optimizer, the classification loss and the localization loss (in that order)
model.compile(opt, classloss, regressloss)

In [None]:
# Directory that receives the TensorBoard logs
logdir = 'logs'

In [None]:
# TensorBoard callback that writes its logs to `logdir`
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=logdir)

In [None]:
# Number of passes over the training data.
# NOTE(review): no epoch count was defined anywhere in the notebook; 10 is a
# placeholder starting point - tune as needed.
num_epochs = 10

# Keep the returned History object in `hist` and evaluate on `val` after every
# epoch: the cells that follow read hist.history, including its 'val_*' entries
hist = model.fit(train, epochs=num_epochs, validation_data=val, callbacks=[tensorboard_callback])

In [None]:
# Loss values recorded during training.
# NOTE(review): `hist` has to hold the object returned by model.fit - make sure
# the training cell assigns it.
hist.history

In [None]:
fig, ax = plt.subplots(ncols=3, figsize=(20,5))

# One panel per loss: (training key, validation key, title, training label, validation label)
panels = [
    ('total_loss', 'val_total_loss', 'Loss', 'loss', 'val loss'),
    ('class_loss', 'val_class_loss', 'Classification Loss', 'class loss', 'val class loss'),
    ('regress_loss', 'val_regress_loss', 'Regression Loss', 'regress loss', 'val regress loss'),
]

# Training curve in teal, validation curve in orange, then title and legend
for axis, (train_key, val_key, title, train_label, val_label) in zip(ax, panels):
    axis.plot(hist.history[train_key], color='teal', label=train_label)
    axis.plot(hist.history[val_key], color='orange', label=val_label)
    axis.title.set_text(title)
    axis.legend()

plt.show()  # Display the subplots


In [None]:
# NumPy iterator over the batched test dataset
test_data = test.as_numpy_iterator()

In [None]:
# Pull one (images, labels) batch from the test iterator
test_sample = test_data.next()

In [None]:
# Predict on the images of the test batch; yhat[0] holds the class scores and
# yhat[1] the box coordinates
yhat = facetracker.predict(test_sample[0])

In [None]:
# Display the first four test samples with their predicted face boxes
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
for idx in range(4):
    # cv2.rectangle draws in place, so work on a writable copy and leave the
    # batch held in `test_sample` unmodified
    sample_image = test_sample[0][idx].copy()
    sample_coords = yhat[1][idx]

    # Only draw a box when the class score exceeds 0.9
    if yhat[0][idx] > 0.9:
        # Predicted boxes are normalized; scale them to the 120x120 image
        cv2.rectangle(sample_image,
                      tuple(np.multiply(sample_coords[:2], [120,120]).astype(int)),
                      tuple(np.multiply(sample_coords[2:], [120,120]).astype(int)),
                      (255,0,0), 2)

    ax[idx].imshow(sample_image)

In [None]:
# Save the facetracker network to 'facetracker.h5' in the working directory
facetracker.save('facetracker.h5')


In [None]:
# Load the saved facetracker model through the fully qualified Keras loader
# (the bare name `load_model` is never imported in this notebook)
facetracker = tf.keras.models.load_model('facetracker.h5')

In [None]:
import cv2
import numpy as np
import tensorflow as tf

# Load the face detection model
facetracker = tf.keras.models.load_model('facetracker.h5')

# Open the webcam
cap = cv2.VideoCapture(0)

try:
    while cap.isOpened():
        # Read a frame from the webcam and stop if none could be delivered
        ret, frame = cap.read()
        if not ret:
            print('Camera capture failed!')
            break

        # Crop to the region of interest. Slicing clips at the frame border, so
        # the real crop size is read back instead of assuming 450x450.
        frame = frame[50:500, 50:500, :]
        crop_height, crop_width = frame.shape[:2]

        # Convert the frame to RGB and resize it to the model's 120x120 input
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        resized = tf.image.resize(rgb, (120, 120))

        # Predict on a one-image batch scaled by 1/255
        yhat = facetracker.predict(np.expand_dims(resized / 255, 0))
        sample_coords = yhat[1][0]

        # Only annotate the frame when the class score exceeds 0.5
        if yhat[0] > 0.5:
            # Scale the normalized box corners to pixel positions in the crop
            top_left = np.multiply(sample_coords[:2], [crop_width, crop_height]).astype(int)
            bottom_right = np.multiply(sample_coords[2:], [crop_width, crop_height]).astype(int)

            # Controls the main rectangle
            cv2.rectangle(frame, tuple(top_left), tuple(bottom_right), (255, 0, 0), 2)

            # Controls the label rectangle: a filled 80x30 strip above the box corner
            cv2.rectangle(frame,
                          tuple(np.add(top_left, [0, -30])),
                          tuple(np.add(top_left, [80, 0])),
                          (255, 0, 0), -1)

            # Controls the text rendered inside the label strip
            cv2.putText(frame, 'face', tuple(np.add(top_left, [0, -5])),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

        # Display the modified frame
        cv2.imshow('EyeTrack', frame)

        # Exit loop if 'q' is pressed
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Release the webcam and close windows, even if the loop raised
    cap.release()
    cv2.destroyAllWindows()
