# 1. Setup

## 1.1 Install Dependencies

In [1]:
# !pip install tensorflow opencv-python

## 1.2 Import Dependencies

In [2]:
# Import standard dependencies
import cv2
import os
import shutil
import random
import numpy as np
from matplotlib import pyplot as plt
from sklearn.model_selection import train_test_split

In [47]:
# Import metric calculations
from tensorflow.keras.metrics import Precision, Recall

In [3]:
# Import uuid library to generate unique image names
import uuid

In [8]:
# Import tensorflow dependencies - Functional API
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf

In [9]:
# Ask TensorFlow to grow GPU memory on demand for every GPU it reports.
gpus = tf.config.experimental.list_physical_devices('GPU')
for device in gpus:
    tf.config.experimental.set_memory_growth(device, True)

In [10]:
# Display the GPU devices found above (an empty list means none were found).
gpus

[]

In [11]:
# Query the GPU list again, this time through the non-experimental API.
tf.config.list_physical_devices('GPU')

[]

## 1.3 Create Folder Structures

In [6]:
"""
Create paths to directories for negative, positive, and anchor samples.

The code block uses the `os.path.join()` function to create paths to directories 
for negative, positive, and anchor samples in a machine learning or computer vision project. 

Args:
-----
NEG_PATH : str
    The path to the directory containing negative samples.

POS_PATH : dict
    A dictionary containing paths to directories for positive samples, with keys 
    of the form "POS{i}_PATH", where `{i}` is an integer between 1 and 5.

ANC_PATH : dict
    A dictionary containing paths to directories for anchor samples, with keys 
    of the form "ANC{i}_PATH", where `{i}` is an integer between 1 and 5.

The values for the keys in `POS_PATH` and `ANC_PATH` are created using the `os.path.join()` 
function to join the path components `'data'`, `'positive{i}'` or `'anchor{i}'`, where `{i}` 
is an integer between 1 and 5.

Returns:
-------
None

""" 

# Single directory that holds every negative sample.
NEG_PATH = os.path.join('data', 'negative')

# One positive and one anchor directory per identity (1..5), keyed as
# "POS{i}_PATH" / "ANC{i}_PATH".
POS_PATH = {
    f"POS{idx}_PATH": os.path.join('data', f'positive{idx}')
    for idx in range(1, 6)
}
ANC_PATH = {
    f"ANC{idx}_PATH": os.path.join('data', f'anchor{idx}')
    for idx in range(1, 6)
}

In [None]:
"""
Create directories for negative, positive, and anchor samples.

The code block creates directories for negative, positive, and anchor samples in a machine 
learning or computer vision project using the `os.makedirs()` function. 

Args:
-----
NEG_PATH : str
    The path to the directory containing negative samples.

POS_PATH : dict
    A dictionary containing paths to directories for positive samples, with keys 
    of the form "POS{i}_PATH", where `{i}` is an integer between 1 and 5.

ANC_PATH : dict
    A dictionary containing paths to directories for anchor samples, with keys 
    of the form "ANC{i}_PATH", where `{i}` is an integer between 1 and 5.

`os.makedirs()` is used to create the directories specified by `NEG_PATH`, `POS_PATH`, 
and `ANC_PATH`.

Returns:
-------
None

"""
# exist_ok=True keeps this cell re-runnable: directories that already exist
# are left untouched instead of raising FileExistsError.
os.makedirs(NEG_PATH, exist_ok=True)
paths = [POS_PATH, ANC_PATH]
for path_group in paths:
    for directory in path_group.values():
        os.makedirs(directory, exist_ok=True)

# 2. Collect Negatives, Positives and Anchors images

In order to create a diverse and comprehensive dataset for training and testing the model, I am using three different sources. 

1- The first source is the "Labeled Faces in the Wild" (LFW) dataset, which contains thousands of images of celebrities in various poses and lighting conditions. 

2- The DrivFace dataset, which contains images of drivers taken from inside a car, with varying expressions and lighting conditions. 

3- I am taking a set of images using my laptop's camera and the OpenCV library. 
This allows me to capture images of myself under various lighting conditions and facial expressions, and add them to the dataset for increased diversity.

By combining these three sources, I am creating a dataset that is robust and diverse enough to effectively train and test the model on a wide range of real-world scenarios.

To ensure that the dataset is balanced and contains both positive and negative examples, I will use the LFW dataset as the source for negative images, while the other sources will be used to create positive and anchor images. 

Positive images will be those that contain a face that matches the anchor image, while anchor images are those that are randomly selected from the dataset and used as a reference for comparison. 

By using the LFW dataset as the source for negative images, I can ensure that the model is trained to accurately distinguish between faces that are present in the dataset and those that are not, which is crucial for effective face recognition in real-world scenarios.

## 2.1 Negative Images: Untar LFW Dataset

The link http://vis-www.cs.umass.edu/lfw/ is the homepage of the "Labeled Faces in the Wild" (LFW) dataset, which is a popular benchmark dataset in computer vision. The dataset contains over 13,000 images of faces collected from the internet, and is widely used for tasks such as face recognition and verification. The LFW dataset is notable for its large size, diversity of subjects, and real-world conditions, such as variability in pose, lighting, and facial expressions. The dataset is freely available for download, and has been used in numerous research papers and competitions in computer vision.

In [None]:
"""
Extract files from a compressed tar archive.

The code block uses the `tar` command to extract files from a compressed tar 
archive file named `lfw.tgz`. The `-x` option extracts the contents of the
archive and the `-f` option names the archive file to read; the following
cell expects the extracted files under an `lfw` directory.

""" 
# !tar -xf lfw.tgz

In [None]:
"""
Copy files from the LFW dataset to the negative sample directory.

The code block loops over each directory and file in the `lfw` directory, which contains 
the Labeled Faces in the Wild (LFW) dataset. It uses the `shutil.copy()` function to copy 
each file to the `NEG_PATH` directory, which is a directory for storing negative samples.

Args:
-----
NEG_PATH : str
    The path to the directory containing negative samples.

Returns:
-------
None

"""
# Make sure the destination exists so the copy does not fail on a fresh checkout.
os.makedirs(NEG_PATH, exist_ok=True)

# `lfw` holds one sub-directory per entry; every file inside is copied flat
# into NEG_PATH under its own file name (same-named files overwrite each other).
for directory in os.listdir('lfw'):
    person_dir = os.path.join('lfw', directory)
    # Skip stray files at the top level; os.listdir() on them would raise.
    if not os.path.isdir(person_dir):
        continue
    for file in os.listdir(person_dir):
        EX_PATH = os.path.join(person_dir, file)
        NEW_PATH = os.path.join(NEG_PATH, file)
        shutil.copy(EX_PATH, NEW_PATH)

## 2.2 Collect Positive and Anchor Classes

In [None]:
"""
Crop images in a folder to a specified size.

The `crop_img()` function takes a folder path, height shift, and width shift as input 
arguments, and crops each image in the folder to a specified size. The function loops 
through each file in the folder, opens the image, calculates the crop box coordinates 
based on the current image size and the specified size, and crops the image. The cropped 
image is then saved with the original filename.

Args:
-----
folder_path : str
    The path to the folder containing the images to be cropped.
height_shift : int
    The amount to shift the height of the crop box.
width_shift : int
    The amount to shift the width of the crop box.

Returns:
-------
None

"""
def crop_img(folder_path,height_shift, width_shift):
    """
    Crop every readable image in a folder to (at most) 250x250 pixels, in place.

    The crop window is centred on the image centre displaced by half of the
    given shifts, and each cropped image overwrites its source file.

    Args:
    -----
    folder_path : str
        The path to the folder containing the images to be cropped.
    height_shift : int
        Added to the image height before centring; moves the window
        vertically by height_shift / 2 pixels.
    width_shift : int
        Added to the image width before centring; moves the window
        horizontally by width_shift / 2 pixels.

    Returns:
    -------
    None

    """
    # (width, height) of the crop window
    crop_size = (250, 250)

    for filename in os.listdir(folder_path):
        image_path = os.path.join(folder_path, filename)
        img = cv2.imread(image_path)
        # cv2.imread returns None for files it cannot decode; skip those
        # rather than failing on `.shape`.
        if img is None:
            continue

        height, width = img.shape[:2]

        # Crop-box edges; int() truncates toward zero.
        left = int(((width + width_shift) - crop_size[0]) / 2)
        top = int(((height + height_shift) - crop_size[1]) / 2)
        right = int(((width + width_shift) + crop_size[0]) / 2)
        bottom = int(((height + height_shift) + crop_size[1]) / 2)

        # A negative start index would wrap around to the far edge when
        # slicing, so clamp the window to the image instead.
        left = max(left, 0)
        top = max(top, 0)

        cropped_img = img[top:bottom, left:right]
        # Leave the file untouched if the window fell completely outside the image.
        if cropped_img.size == 0:
            continue

        # Overwrite the original file with the cropped image.
        cv2.imwrite(image_path, cropped_img)

Next, we perform the following operations:

1- Iterates through files in the DrivImages* directory that contain a substring '0{number}' in their name.

2- Creates a new subdirectory in the data directory with the name anchor{number} if it does not exist.

3- Copies the selected files from DrivImages to the new subdirectory.

4- Calls the crop_img() function to crop the images in the new subdirectory to a size of 250x250.

5- Selects the first half of the files in the new subdirectory.

6- Moves the selected files to a new directory with the name positive{number} in the data directory.

\* The DrivFace dataset is a collection of images designed for use in computer vision and machine learning applications. 

The dataset was collected using a camera mounted inside a car and includes images of drivers with varying head poses and lighting conditions. The dataset contains approximately 606 images of 4 individuals, with each individual's face captured under a range of different driving conditions. 

Along with the images, the dataset also includes metadata such as the driver's age, gender, and driving experience. 

The DrivFace dataset has been widely used in research on facial recognition, head pose estimation, and other computer vision tasks, and is available for download from the UCI Machine Learning Repository https://archive.ics.uci.edu/ml/datasets/DrivFace.

In [None]:
"""

Args:
-----
folder_path : str
    A string representing the path to the directory where the data
    will be stored.

src_path : str
    A string representing the path to the directory containing the source
    images.

number : int
    An integer representing the number of the anchor image being processed.

Returns:
-------

None. The function performs file I/O and image processing operations on the
specified directories.

"""
# Root of the project data and location of the raw DrivFace images
folder_path = 'data'
src_path = 'DrivImages'

# (height_shift, width_shift) handed to crop_img() for each driver number
crop_shifts = {
    1: (10, 50),
    2: (-50, 50),
    3: (150, 50),
    4: (-50, 50),
}

for number in range (1,5):
    subfolder_path = os.path.join(folder_path, f'anchor{number}')
    positive_path = os.path.join(folder_path, f'positive{number}')

    # Create both target folders up front so the copy/move below cannot fail
    # on a missing directory, even when no source file matches.
    os.makedirs(subfolder_path, exist_ok=True)
    os.makedirs(positive_path, exist_ok=True)

    # Copy every source image whose name contains '_0{number}_' into the anchor folder
    for filename in os.listdir(src_path):
        if f'_0{number}_' in filename:
            src_file_path = os.path.join(src_path, filename)
            dst_file_path = os.path.join(subfolder_path, filename)
            shutil.copy(src_file_path, dst_file_path)

    # Crop images to 250x250
    height_shift, width_shift = crop_shifts[number]
    crop_img(subfolder_path, height_shift, width_shift)

    # Move half of the images to the positive folder. The directory is listed
    # once and sorted so the selected half is deterministic; os.listdir()
    # itself returns names in arbitrary order.
    anchor_files = sorted(os.listdir(subfolder_path))
    half_index = len(anchor_files) // 2
    files_to_move = anchor_files[:half_index]

    for file_name in files_to_move:
        source_path = os.path.join(subfolder_path, file_name)
        destination_path = os.path.join(positive_path, file_name)
        shutil.move(source_path, destination_path)

Now, to create the third set of images used in training, we establish a connection to the webcam and capture frames. 

It crops the frame to a size of 250x250 pixels and waits for the user to press the 'a' key to save the current frame as an anchor image or the 'p' key to save it as a positive image. 

It displays the frame to the screen and breaks the loop when the user presses the 'q' key. 

Finally, it releases the webcam and closes the window displaying the captured images.

In [None]:
"""

Establish a connection to the webcam and capture frames. The captured frame is 
cropped to a size of 250x250 pixels. The user can press the 'a' key to save the current 
frame as an anchor image or the 'p' key to save it as a positive image. The captured 
image is displayed to the screen. The loop ends when the user presses the 'q' key. 
The webcam is then released and the window displaying the captured images is closed.

"""

cap = cv2.VideoCapture(0)
try:
    while cap.isOpened():
        ret, frame = cap.read()
        # Stop if the camera did not deliver a frame; slicing it would fail.
        if not ret:
            break

        # Cut down frame to 250x250px
        frame = frame[120:120+250,200:200+250, :]

        # Show image back to screen
        cv2.imshow('Image Collection', frame)

        # Poll the keyboard exactly once per frame. Calling cv2.waitKey()
        # separately for each key lets one call swallow a key press meant
        # for another check.
        key = cv2.waitKey(1) & 0XFF

        if key == ord('a'):
            # Collect anchors: write the frame under a unique file name
            imgname = os.path.join(ANC_PATH['ANC5_PATH'], '{}.jpg'.format(uuid.uuid1()))
            cv2.imwrite(imgname, frame)
        elif key == ord('p'):
            # Collect positives: write the frame under a unique file name
            imgname = os.path.join(POS_PATH['POS5_PATH'], '{}.jpg'.format(uuid.uuid1()))
            cv2.imwrite(imgname, frame)
        elif key == ord('q'):
            # Breaking gracefully
            break
finally:
    # Always release the webcam and close the preview window, even on error.
    cap.release()
    cv2.destroyAllWindows()

### 2.3 Data Augmentation

Perform data augmentation on an input image by taking a single input parameter "img", which is the image to be augmented. 

Data augmentation can be useful for generating a larger and more diverse dataset for training deep learning models.

The data augmentation process involves applying a set of random transformations to the input image. Specifically, the function applies:

1- A stateless random brightness adjustment.

2- A stateless random contrast adjustment.

3- A stateless random flip left-right.

4- A stateless random JPEG quality adjustment.

5- A stateless random saturation adjustment. 

The function iterates over the image nine times, applying a new set of random transformations to the image at each iteration. 

The resulting augmented images are stored in a list called "data" and are returned by the function. 

In [None]:
"""

The data_aug function applies several data augmentation techniques to an input 
image and returns a list of the augmented images.

Args:
-----
img: 
    a tensor representing an image, with shape (height, width, channels).

Returns:
-------
data: list
    a list of 9 tensors representing the augmented images. 
    Each tensor has the same shape as the input image. The augmentation techniques 
    applied include random brightness adjustment, random contrast adjustment,
    random horizontal flipping, random JPEG quality reduction, and random 
    saturation adjustment.
    The random seed used for each augmentation is determined by a combination of
    fixed and randomly generated seeds.

"""

def data_aug(img, num_augmented=9):
    """
    Produce a list of randomly augmented versions of an image.

    Every pass applies, in order: brightness, contrast, horizontal flip,
    JPEG quality and saturation adjustments. The passes are cumulative: each
    one starts from the output of the previous pass, not from the original
    input. Brightness and contrast use the fixed seeds (1, 2) and (1, 3); the
    other three operations draw a fresh seed pair from `np.random.randint(100)`
    on every pass.

    Args:
    -----
    img:
        The image to augment (a tensor or array accepted by `tf.image`).
    num_augmented : int, optional
        Number of augmented images to generate. Defaults to 9.

    Returns:
    -------
    data : list
        `num_augmented` augmented image tensors, in generation order.

    """
    data = []

    for _ in range(num_augmented):
        # Brightness shift of at most +/-0.02 (fixed seed)
        img = tf.image.stateless_random_brightness(img, max_delta=0.02, seed=(1,2))

        # Contrast factor between 0.6 and 1 (fixed seed)
        img = tf.image.stateless_random_contrast(img, lower=0.6, upper=1, seed=(1,3))

        # Random left-right flip
        img = tf.image.stateless_random_flip_left_right(img, seed=(np.random.randint(100),np.random.randint(100)))

        # JPEG re-encoding at a quality between 90 and 100
        img = tf.image.stateless_random_jpeg_quality(img, min_jpeg_quality=90, max_jpeg_quality=100, seed=(np.random.randint(100),np.random.randint(100)))

        # Saturation factor between 0.9 and 1
        img = tf.image.stateless_random_saturation(img, lower=0.9,upper=1, seed=(np.random.randint(100),np.random.randint(100)))

        data.append(img)

    return data


In [None]:
"""

This code applies data augmentation to a set of positive and anchor images using
the TensorFlow library. The augmented images are saved to the same directory as
the original images with a unique file name generated using uuid.

"""
# Dictionaries of positive and anchor directories to augment
paths=[POS_PATH,ANC_PATH]

for p in paths:
    for value in p.values():
        # os.listdir() returns a snapshot, so the augmented files written
        # below are not picked up again during this run. Re-running the cell
        # will augment them too.
        for file_name in os.listdir(value):
            img_path = os.path.join(value, file_name)

            # Read the image using OpenCV; None means the file could not be
            # decoded as an image, so skip it instead of crashing in data_aug.
            img = cv2.imread(img_path)
            if img is None:
                continue

            augmented_images = data_aug(img)

            # Save each augmented image next to its source under a unique uuid name
            for image in augmented_images:
                cv2.imwrite(os.path.join(value, '{}.jpg'.format(uuid.uuid1())), image.numpy())


# 3. Load and Preprocess Images

## 3.1 Get Image Directories

In [7]:
"""

The code creates TensorFlow datasets of image paths for negative, anchor,
and positive images, each containing 450 images. 
The anchor and positive datasets are created for five different paths, 
identified by the loop variable i, with the paths determined by the variables 
ANC_PATH and POS_PATH that contain the path names for each anchor and positive
dataset. 
Each dataset is stored in a separate global variable, with the variable name
constructed using the string concatenation of "anchor" or "positive" with the
loop variable i.

Args:
-----
    NEG_PATH (str): 
        The path to the directory containing negative images.
    ANC_PATH (dict): 
        A dictionary containing the paths to directories containing anchor images.
    POS_PATH (dict): 
        A dictionary containing the paths to directories containing positive images.
    
Returns:
-------
    None

"""
# Build the glob patterns with os.path.join so the separator matches the
# platform; the previous '\*.jpg' literal only worked on Windows and relied
# on an invalid escape sequence.

# Dataset of negative image paths, limited to 450 entries
negative = tf.data.Dataset.list_files(os.path.join(NEG_PATH, '*.jpg')).take(450)

# Identities 1 to 5 (inclusive)
for i in range(1,6):

    # Anchor image paths (at most 450), exposed as the global `anchor{i}`
    globals()["anchor"+str(i)]=tf.data.Dataset.list_files(os.path.join(ANC_PATH[f'ANC{i}_PATH'], '*.jpg')).take(450)

    # Positive image paths (at most 450), exposed as the global `positive{i}`
    globals()["positive"+str(i)]=tf.data.Dataset.list_files(os.path.join(POS_PATH[f'POS{i}_PATH'], '*.jpg')).take(450)

In [None]:
"""
Print each element in the `negative` dataset.
It is important to note that if negative was created using the list_files()
method, the order of the elements may be shuffled each time the dataset is 
called due to the shuffling behavior of list_files().

Args:
-----
    negative: 
        A TensorFlow dataset object containing a list of file paths to negative images.

Returns:
-------
    None

"""
# Walk the dataset and print each negative image path tensor.
for path_tensor in negative:
    print(path_tensor)

## 3.2 Image Preprocessing - Scale and Resize

Image preprocessing is an essential step in many computer vision applications, including object detection and recognition. 

One of the critical preprocessing steps is **Scaling and Resizing** the input images to a fixed size suitable for the model's input.

Resizing and scaling images help in reducing the complexity of the model and improving its performance. Typically, image scaling and resizing are performed using a library like OpenCV or TensorFlow, which provide efficient and straightforward functions for these operations. 

Scaling and resizing images can also help in reducing the amount of memory required for storing the images, making it easier to process large datasets. 

In [8]:
 """
 
Preprocesses an image file given its path, including reading it in, decoding it,
resizing it to a fixed size, and scaling it to be between 0 and 1.

Args:
-----
 file_path: str
        A string representing the file path of the image.

Returns:
-------
    A tensor representing the preprocessed image with dimensions (105, 105, 3).

"""
def preprocess(file_path):
    """
    Load a JPEG file and turn it into a 105x105x3 float image scaled by 1/255.

    Args:
    -----
    file_path:
        Path of the JPEG image to load.

    Returns:
    -------
        The decoded image, resized to 105x105 with 3 channels and divided by 255.0.

    """
    # Read in image from file path
    byte_img = tf.io.read_file(file_path)

    # Force 3 output channels so grayscale JPEGs also match the 105x105x3
    # input the model expects.
    img = tf.io.decode_jpeg(byte_img, channels=3)

    # Preprocessing steps - resizing the image to be 105x105x3
    img = tf.image.resize(img, (105,105))

    # Scale image to be between 0 and 1
    img = img / 255.0

    return img

## 3.3 Create Labelled Dataset

Once these images have been preprocessed and augmented, the next step is to combine them into a single dataset that can be used for training and testing. 

This is done by zipping the anchor, positive, and negative examples together, and then concatenating them into a single dataset. By doing this, we can ensure that the model is trained on a balanced set of anchor-positive-negative triplets, which are necessary for learning effective face recognition. 

Once the dataset is constructed, it can be used to train and test the model, and further evaluated using various performance metrics such as accuracy, precision, and recall.

In [9]:
"""

Construct a dataset by zipping anchor, positive, and negative examples and
concatenating them. 

Args:
-----
    positives: A list of positive samples.
    anchors: A list of anchor samples.
    negative: A negative sample to be paired with anchor samples.

Returns:
-------
    data: tf.data.Dataset
        A dataset constructed by zipping anchor, positive, and negative examples and concatenating them.

"""

# Collect the per-identity datasets created earlier as globals
positives = [globals()[f"positive{idx}"] for idx in range(1, 6)]
anchors = [globals()[f"anchor{idx}"] for idx in range(1, 6)]

# For every identity build two labelled datasets:
#   (anchor, negative, 0.0) and (anchor, positive, 1.0)
datasets = []
for anchor_ds, positive_ds in zip(anchors, positives):
    zero_labels = tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor_ds)))
    neg_dataset = tf.data.Dataset.zip((anchor_ds, negative, zero_labels))

    one_labels = tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor_ds)))
    pos_dataset = tf.data.Dataset.zip((anchor_ds, positive_ds, one_labels))

    datasets.append(neg_dataset)
    datasets.append(pos_dataset)

# Chain all labelled datasets, in order, into one dataset
data, *remaining = datasets
for labelled_ds in remaining:
    data = data.concatenate(labelled_ds)
    

In [11]:
# Print the first element in the concatenated dataset: an
# (anchor path, comparison path, label) triple.
print(next(iter(data)))

(<tf.Tensor: shape=(), dtype=string, numpy=b'data\\anchor1\\f1b67e89-de4b-11ed-b422-14abc5de9ec7.jpg'>, <tf.Tensor: shape=(), dtype=string, numpy=b'data\\negative\\Kim_Hong-up_0001.jpg'>, <tf.Tensor: shape=(), dtype=float32, numpy=0.0>)


In [12]:
# Inspect the concrete dataset class produced by the concatenation.
type(data)

tensorflow.python.data.ops.concatenate_op._ConcatenateDataset

In [13]:
# Length of the concatenated dataset (number of labelled rows)
print(f'Length of the concatenated dataset: {len(data)} rows')

Length of the concatenated dataset: 4500 rows


## 3.4 Build Train and Test datasets

In [14]:
"""

Preprocesses a pair of input and validation images, and their corresponding
label.
    
Args:
-----
    - input_img (tf.Tensor): A tensor representing the input image.
    - validation_img (tf.Tensor): A tensor representing the validation image.
    - label (tf.Tensor): A tensor representing the label of the input image.

Returns:
-------
    - A tuple containing three preprocessed tensors:
        - The preprocessed input image tensor.
        - The preprocessed validation image tensor.
        - The original label tensor.
        
"""
def preprocess_twin(input_img, validation_img, label):
    """
    Run `preprocess` on both images of a pair and pass the label through.

    Args:
    -----
        input_img: first image of the pair, in the form `preprocess` accepts.
        validation_img: second image of the pair, in the same form.
        label: label of the pair; returned unchanged.

    Returns:
    -------
        A tuple (preprocessed input image, preprocessed validation image, label).

    """
    processed_input = preprocess(input_img)
    processed_validation = preprocess(validation_img)
    return processed_input, processed_validation, label


In [15]:
"""

Preprocesses the input data using the `preprocess_twin` function, then caches 
the preprocessed data, and finally shuffles it.

Args:
-----
    data: A dataset containing the input data to be preprocessed, cached, and shuffled.

Returns:
-------
    The preprocessed, cached, and shuffled dataset.
    
"""

# Run every (path, path, label) row through preprocess_twin so both paths
# become preprocessed images.
data = data.map(preprocess_twin)

# Cache the preprocessed dataset in memory to speed up access time for subsequent operations.
data = data.cache()

# Shuffle once with a buffer of 10000 rows. reshuffle_each_iteration=False
# freezes that order, so a later take()/skip() split always selects the same
# rows. With the default (reshuffle on every iteration) the train and test
# slices would be re-drawn each epoch and test rows would leak into training.
# The order is therefore identical in every epoch; shuffle the training split
# separately if per-epoch shuffling is wanted.
data = data.shuffle(buffer_size=10000, reshuffle_each_iteration=False)


In [29]:
"""

Split the input data into training and testing datasets using the `take` and `skip` methods of `tf.data.Dataset`.

args:
-----
    data (tf.data.Dataset): 
        Input dataset to be split into training and testing datasets.
        
    The split ratio is fixed in the code: 
        The first 70% of the rows form the training split and the remaining 30% form the test split.

Returns:
--------
    tuple: 
        A tuple containing the training and testing datasets.

"""

# Split the data into training and testing sets with 70-30 ratio.
# The boundary is computed once so both slices agree on it.
train_size = round(len(data)*.7)
train_data = data.take(train_size)

# Everything after the training slice is test data. Rounding the 30% share
# separately could drop a row when the two roundings do not add up to len(data).
test_data = data.skip(train_size)

# NOTE: take()/skip() slice whatever order `data` yields. If `data` reshuffles
# on every iteration, the two splits can overlap across epochs.

# Group the samples in the training and testing datasets into batches of size 16
# for more efficient processing, and prefetch up to 8 batches so the next batch
# is loaded while the current one is being processed.
train_data = train_data.batch(16).prefetch(8)
test_data = test_data.batch(16).prefetch(8)

In [30]:
# train_data.save("train_data")
# test_data.save("test_data")

In [31]:
# loaded_train_data = tf.data.Dataset.load("train_data")
# loaded_test_data = tf.data.Dataset.load("test_data")

# 4. Model Engineering

The paper "Siamese Neural Networks for One-shot Image Recognition" by Koch et al. provided the inspiration for building a model for one-shot image recognition. 

The paper proposes a method for learning a similarity metric between two images using a Siamese neural network architecture. 

The Siamese network consists of two identical sub-networks that share the same set of weights, and the input images are passed through these sub-networks to produce feature vectors. The distance between the feature vectors of two images is then used as a measure of similarity between those images. 

This approach allows the model to learn to recognize objects even with very few training examples. The paper's experimental results showed that their method outperformed previous methods for one-shot image recognition, demonstrating the potential of Siamese networks in this domain. 

By adapting the Siamese network architecture and training process described in this paper to my specific use case, I was able to build a model that achieved impressive accuracy in recognizing similar faces in images with only a few examples.

## 4.1 Build Embedding Layer

The function make_embedding() defines a sequential model to generate the embeddings for the Siamese network. 

The model has four convolutional layers with max pooling in between each layer. 

The first convolutional layer has 64 filters with a kernel size of (10,10) and an activation function of ReLU.

The second and third convolutional layers have 128 filters each, with kernel sizes of (7,7) and (4,4), respectively, and both have ReLU activation functions.

The final convolutional layer has 256 filters with a kernel size of (4,4) and also uses ReLU activation.

The output of this layer is flattened and passed through a dense layer with 4096 units and a sigmoid activation function.

This function returns the embedding model.

In [32]:
def make_embedding(): 
    """
    Build the convolutional embedding network shared by both Siamese branches.

    Three Conv2D + max-pooling blocks are followed by a final Conv2D, a
    Flatten and a 4096-unit sigmoid Dense layer. The input is a 105x105x3 image.

    Returns:
    -------
        A Keras `Sequential` model named 'embedding'.

    """
    # The first positional argument of MaxPooling2D is pool_size, so the
    # previous `MaxPooling2D(64, (2,2), ...)` pooled over 64x64 windows with
    # stride 2. The pooling below uses 2x2 windows with the same stride and
    # padding, which leaves every output shape unchanged.
    return Sequential([

        # First block
        Conv2D(64, (10,10), activation='relu', input_shape=(105,105,3)),
        MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same'),

        # Second block
        Conv2D(128, (7,7), activation='relu'),
        MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same'),

        # Third block
        Conv2D(128, (4,4), activation='relu'),
        MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same'),

        # Final embedding block
        Conv2D(256, (4,4), activation='relu'),
        Flatten(),
        Dense(4096, activation='sigmoid')
    ], name='embedding')

In [35]:
"""

A function to create a sequential neural network model that consists of several
convolutional layers and a dense layer with a sigmoid activation function.
The resulting model is then assigned to the variable embedding. 
This model will be used for facial recognition in the subsequent steps of the 
code.

"""

# Build the embedding network and keep the instance in `embedding`.
embedding = make_embedding()

In [36]:
"""

This will print a summary of the embedding model's architecture, including the 
shape of the input and output tensors of each layer, the number of trainable
parameters, and the activation functions used.

"""

# Print the layer-by-layer summary of the embedding model.
embedding.summary()

Model: "embedding"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 96, 96, 64)        19264     
                                                                 
 max_pooling2d (MaxPooling2D  (None, 48, 48, 64)       0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 42, 42, 128)       401536    
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 21, 21, 128)      0         
 2D)                                                             
                                                                 
 conv2d_2 (Conv2D)           (None, 18, 18, 128)       262272    
                                                                 
 max_pooling2d_2 (MaxPooling  (None, 9, 9, 128)        0 

## 4.2 Build Distance Layer

L1 distance, also known as Manhattan distance, is a measure of distance between two points in a space. 

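In the context of embeddings, the L1 distance between two embeddings is calculated by taking the absolute difference between the values in each corresponding position of the two embeddings, and summing them up. Note that the `L1Dist` layer defined below returns the element-wise absolute differences without summing them, so its output is a vector rather than a single number. 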

It is a common distance metric used in image recognition and other related fields, where it is used to compare the similarity between two images represented as embeddings.

The smaller the L1 distance between two embeddings, the more similar the corresponding images are considered to be.

In [38]:
class L1Dist(Layer):
    """
    Siamese distance layer: element-wise absolute difference of two embeddings.

    The layer defines no weights of its own; it only combines the two tensors
    it is called with.

    Args:
    -----
        **kwargs: forwarded unchanged to the parent ``Layer`` constructor
            (for example ``name``).

    Methods:
        call(self, input_embedding, validation_embedding): returns
            ``|input_embedding - validation_embedding|`` position by position.

    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, input_embedding, validation_embedding):
        """
        Compare two embeddings position by position.

        Args:
        -----
            input_embedding (tf.Tensor): embedding of the input (anchor) image.
            validation_embedding (tf.Tensor): embedding of the validation image.

        Returns:
        -------
            A tensor holding the absolute difference of the two arguments at
            every position. The differences are NOT summed into a single
            scalar here.

        """
        difference = input_embedding - validation_embedding
        return tf.math.abs(difference)


## 4.3 Make Siamese Model

Create a function that creates a Siamese Network model that takes in two images (anchor and validation) and returns the probability that they belong to the same class. 

Both images are passed through the same embedding model (the one created earlier with make_embedding()), so the two branches share their weights, and the L1 distance layer then compares the two resulting embeddings.

The L1 distance is then passed through a classification layer to predict whether the two images belong to the same class. 

The function returns the Siamese Network model.

In [39]:
def make_siamese_model():
    """
    Create the Siamese Network model.

    The model takes two images (anchor and validation), runs both through the
    module-level ``embedding`` model (so both branches share the same weights),
    compares the two embeddings with an ``L1Dist`` layer named 'distance', and
    feeds the result to a single sigmoid unit.

    Returns:
    -------
        Model: Keras model named 'SiameseNetwork' with inputs
        ``[input_img, validation_img]`` (each 105x105x3) and one sigmoid
        output per image pair.

    """
    # Anchor image input in the network
    input_image = Input(name='input_img', shape=(105, 105, 3))

    # Validation image in the network
    validation_image = Input(name='validation_img', shape=(105, 105, 3))

    # Combine siamese distance components.
    # The layer is named through the public ``name`` constructor argument
    # (L1Dist forwards **kwargs to Layer) instead of assigning the private
    # ``_name`` attribute after construction.
    siamese_layer = L1Dist(name='distance')
    distances = siamese_layer(embedding(input_image), embedding(validation_image))

    # Classification layer: one sigmoid unit on top of the distance vector
    classifier = Dense(1, activation='sigmoid')(distances)

    return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

In [40]:
# Build the Siamese network with make_siamese_model() and keep it in the
# notebook-global `siamese_model`, which the training, evaluation and saving
# cells below all refer to.
siamese_model = make_siamese_model()

# Print the model summary (layers, output shapes, parameter counts, connections).
siamese_model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_img (InputLayer)         [(None, 105, 105, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_img (InputLayer)    [(None, 105, 105, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Sequential)         (None, 4096)         38960448    ['input_img[0][0]',              
                                                                  'validation_img[0][

# 5. Training

## 5.1 Setup Loss and Optimizer

In [41]:
"""

Create an instance of the BinaryCrossentropy class from the TensorFlow losses
module and assign it to the variable binary_cross_loss.

This loss function is used in binary classification tasks, where the goal is
to predict a binary label (e.g. positive or negative). It measures the
cross-entropy between the predicted probabilities and the true labels.

The loss is constructed with no arguments, so the library defaults apply.

binary_cross_loss is called in train_step() as binary_cross_loss(y, yhat) to
compute the loss between the ground truth and the model output.

"""
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [42]:
# Optimizer used by train_step() to update the model's weights.
#
# Adam is an adaptive learning-rate optimization algorithm: it computes
# individual learning rates for different parameters from estimates of the
# first and second moments of the gradients.
#
# The learning rate is set to 1e-4 (0.0001).
opt = tf.keras.optimizers.Adam(learning_rate=1e-4)

## 5.2 Establish Checkpoints

Set up a TensorFlow checkpointing mechanism for saving and restoring the state of a model during training.

Once this tf.train.Checkpoint object is created, it can be used to save and restore the state of the optimizer and the model during training.

For example, you can call:
```python
checkpoint.save(file_prefix=checkpoint_prefix) 
```
to save the current state of the model and optimizer to a checkpoint file.

Later, you can restore the model and optimizer to the state saved in the most recent checkpoint by calling:

```python
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))
```

In [43]:
# Directory (relative to the current working directory) under which the
# checkpoint files are written. This line only defines the path; it does not
# create the directory itself.
checkpoint_dir = './training_checkpoints'

# Prefix for the checkpoint files: the checkpoint directory joined with the
# string "ckpt".
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')

# Checkpoint object tracking two things under the keys `opt` and
# `siamese_model`: the Adam optimizer defined earlier and the Siamese network.
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

## 5.3 Build Train Step Function

Define a training step function for a Siamese neural network using TensorFlow. 

In [44]:
@tf.function
def train_step(batch):
    """
    Perform a single training step for the Siamese neural network.

    The function is decorated with ``tf.function``, so TensorFlow traces it
    into a graph instead of running it eagerly line by line.

    Args:
    -----
        batch: A batch of training data. Its first two elements are the two
               image batches passed to ``siamese_model`` and its third element
               holds the corresponding labels.

    Returns:
    -------
        The binary cross-entropy loss value for this batch.
    """
    # The first two elements are the model inputs, the third is the label.
    X = batch[:2]
    y = batch[2]

    # Record the forward pass so gradients can be computed afterwards.
    with tf.GradientTape() as tape:
        # training=True runs the model in training mode.
        yhat = siamese_model(X, training=True)

        # Binary cross-entropy between the true labels and the predictions.
        loss = binary_cross_loss(y, yhat)

    # NOTE: there is deliberately no Python print(loss) here. Inside a
    # tf.function a Python print runs only while the function is being traced
    # and shows a symbolic tensor, not the per-batch loss value. The caller
    # receives the loss as the return value and can log it from there.

    # Gradients of the loss with respect to the model's trainable variables.
    grad = tape.gradient(loss, siamese_model.trainable_variables)

    # Apply the gradients with the Adam optimizer.
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))

    return loss

## 5.4 Build Training Loop

Precision and Recall are commonly used to evaluate the performance of classification models.

**Precision** measures the proportion of true positives (i.e., correctly classified positive examples) among all examples classified as positive.

It is calculated as:
```
true_positives / (true_positives + false_positives).
```
**Recall** measures the proportion of true positives among all actual positive examples. 

It is calculated as:
```
true_positives / (true_positives + false_negatives).
```
Both metrics range from 0 to 1, with higher values indicating better performance.

They can be used to evaluate binary classification models or multiclass models that have been converted to a set of binary classification problems.

In [46]:
def train(data, EPOCHS):
    """
    Train the Siamese network for a fixed number of epochs.

    For every epoch the function runs train_step() on each batch, tracks
    Recall and Precision over the epoch, prints the loss of the last batch
    together with both metrics, and saves a checkpoint every 10th epoch.

    Args:
    -----
        data: the training dataset. It must support ``len()`` (used to size
              the progress bar) and yield batches whose first two elements
              are the model inputs and whose third element is the labels.
        EPOCHS (int): number of epochs to train for.

    Returns:
    -------
        None
    """
    # Loop through epochs
    for epoch in range(1, EPOCHS + 1):

        print(f'\n Epoch {epoch}/{EPOCHS}')

        # Progress bar sized to the number of batches in the dataset.
        progbar = tf.keras.utils.Progbar(len(data))

        # Fresh metric objects so each epoch is measured on its own.
        r = Recall()
        p = Precision()

        # Stays None when the dataset yields no batches at all.
        loss = None

        # Loop through each batch
        for idx, batch in enumerate(data):

            # One optimisation step on the current batch.
            loss = train_step(batch)

            # Call the model directly instead of Model.predict(): predict() is
            # meant for large inputs, builds its own batching loop and prints
            # its own progress bar on every call, which is slow and noisy
            # inside a training loop.
            yhat = siamese_model(batch[:2], training=False)
            r.update_state(batch[2], yhat)
            p.update_state(batch[2], yhat)

            progbar.update(idx + 1)

        if loss is None:
            # Without this guard the print below would fail on an unbound name.
            print('No batches in data; nothing was trained this epoch.')
        else:
            # Loss of the LAST batch of the epoch, then recall and precision.
            print(loss.numpy(), r.result().numpy(), p.result().numpy())

        # Save a checkpoint every 10 epochs using the prefix defined earlier.
        if epoch % 10 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)

## 5.5 Train the model

Set the number of epochs to 50 and calls the train function with the train_data and EPOCHS as arguments to train the Siamese neural network model for 50 epochs.

The train function iterates over the training data for each epoch and updates the weights of the model to minimize the loss between the anchor, positive, and negative images. 

After each epoch, the function prints the loss of the last batch together with the recall and precision measured over that epoch's training batches.

By training for more epochs, the model will have more opportunities to learn the features and patterns of the input images and potentially improve its performance on the task of one-shot image recognition.

In [None]:
# Number of full passes over the training data.
EPOCHS = 50
# Run the training loop; train() saves a checkpoint on every 10th epoch.
train(train_data, EPOCHS)

# 6. Evaluate Model

## 6.1 Calculate Metrics

In [None]:
"""
Evaluate the trained siamese model on the test data and print recall and
precision.

This cell is a script, not a function. It reads two notebook globals:

- test_data: iterated with as_numpy_iterator(), yielding
  (test_input, test_val, y_true) tuples
- siamese_model: the model used for prediction

It prints the recall followed by the precision accumulated over all batches.

"""

# Initialize precision and recall metric objects
r = Recall()
p = Precision()

# Loop through each batch in the test data and update the metrics.
# NOTE: the loop variables test_input and test_val stay defined after the loop
# (holding the last batch) and are reused by the plotting and prediction cells
# further down, so they must not be renamed.
for test_input, test_val, y_true in test_data.as_numpy_iterator():
    # Make predictions on the current batch
    yhat = siamese_model.predict([test_input, test_val])
    
    # Update the precision and recall metrics with the true labels and predicted labels
    r.update_state(y_true, yhat)
    p.update_state(y_true,yhat) 

# Print the final values of recall and precision (in that order)
print(r.result().numpy(), p.result().numpy())

## 6.2 Show the results

In [None]:
"""
Show the first image of the `test_input` batch and the first image of the
`test_val` batch side by side.
"""

# One 10x8 figure holding a 1x2 grid of subplots
plt.figure(figsize=(10,8))

# Left cell (position 1) gets test_input[0], right cell (position 2) test_val[0]
for position, image_batch in enumerate((test_input, test_val), start=1):
    plt.subplot(1, 2, position)
    plt.imshow(image_batch[0])

# Render the figure
plt.show()

# 7. Save Model

## 7.1 Save the model as a regular TensorFlow model.

In [None]:
# Save siamese_model to a file named "siamesemodel_v1.h5" using Model.save()
# (not save_weights()); the file is reloaded in section 8.1 with load_model().
siamese_model.save('siamesemodel_v1.h5')

## 7.2 Save the model as a TFLite model.

A TFLite model is designed to be more efficient and lightweight compared to a regular TensorFlow model. It is optimized for mobile and edge devices, making it suitable for running models on mobile phones or other devices with limited resources. Running a TFLite model in the cloud can also be more efficient as it reduces the amount of data that needs to be transferred, improving the overall performance of the model.

The cell below converts the trained Keras model to the TFLite format with the converter's default optimizations enabled and writes the result to `tflite_model.tflite`.

Unfortunately, creating a TFLite model can be challenging, especially with limited resources. The Google Colab environment has some limitations in the free phase, including RAM limitations, which may make it difficult to train a model and convert it to TFLite format. However, it is possible to use other cloud services or dedicated hardware to train and convert TFLite models.





In [None]:
# Build a TFLite converter directly from the in-memory Keras model.
converter = tf.lite.TFLiteConverter.from_keras_model(siamese_model)
# Enable the converter's default optimizations.
converter.optimizations = [tf.lite.Optimize.DEFAULT]

# NOTE(review): despite the "qaware" in the variable name, this cell converts
# an already-trained model; no quantization-aware training happens here.
tflite_qaware_model = converter.convert()
# convert() returns the serialized model, which is written to disk in binary mode.
with open('tflite_model.tflite', 'wb') as f:
    f.write(tflite_qaware_model)

# 8. Real Time Test

## 8.1 Load a saved model

In [None]:
"""

Load the model previously saved to 'siamesemodel_v1.h5' and rebind the
notebook-global `siamese_model` to the loaded model.

Arguments passed to tf.keras.models.load_model():
--------------------------------------------------

'siamesemodel_v1.h5':
    Name of the file the model was saved to in section 7.1.

custom_objects:
    Maps names stored in the saved file to the Python objects that implement
    them, for anything that is not a built-in Keras object.

    'L1Dist':
        The custom distance LAYER class defined in section 4.2 and used inside
        the Siamese model. It is not a loss or a metric.

    'BinaryCrossentropy':
        The tf.losses.BinaryCrossentropy class (a standard TensorFlow loss,
        passed here as the class itself).

Result:
-------

`siamese_model` refers to the loaded model, whose summary is then printed.

"""
siamese_model = tf.keras.models.load_model(
    'siamesemodel_v1.h5', custom_objects={
        'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy
    }
)

# View model summary
siamese_model.summary()

## 8.2 Use the model to make predictions

In [None]:
"""
Use the Siamese model to make predictions on the input pairs test_input and
test_val (the variables left over from the evaluation loop in section 6.1).

The model's last layer is a single sigmoid unit, so each prediction is a value
between 0 and 1 per image pair, not a raw distance. The verification code
below treats values above a threshold as a positive match.
"""

siamese_model.predict([test_input, test_val])

## 8.3 Verification Function

In [None]:
def verify(model, detection_threshold, verification_threshold):
    """
    Verify the stored input image against a set of verification images.

    The input image 'application_test/input_image/input_image.jpg' is compared
    with every file in 'application_test/verification_images'.

    Args:
    -----
        model (keras.Model):
            A trained model whose predict() accepts a list of two image
            batches.

        detection_threshold (float):
            The threshold above which a prediction is considered positive.

        verification_threshold (float):
            The proportion of positive predictions required to pass
            verification.

    Returns:
    -------
        Tuple: A tuple containing:
            - results:
                A list with the prediction for each verification image.
            - verified:
                boolean indicating whether the verification threshold has
                been met. When there are no verification images the result
                is ([], False).
    """
    verification_dir = os.path.join('application_test', 'verification_images')

    # List the directory once so the loop and the denominator below are
    # guaranteed to agree.
    image_names = os.listdir(verification_dir)

    # Nothing to compare against: report "not verified" explicitly instead of
    # relying on a 0/0 division.
    if not image_names:
        return [], False

    # The input image is the same for every comparison, so preprocess it once
    # rather than on each loop iteration.
    input_img = preprocess(os.path.join('application_test', 'input_image', 'input_image.jpg'))

    results = []
    for image in image_names:
        # Preprocess the current verification image
        validation_img = preprocess(os.path.join(verification_dir, image))

        # expand_dims adds a batch axis to each image; list() splits the
        # stacked pair back into the two model inputs.
        result = model.predict(list(np.expand_dims([input_img, validation_img], axis=1)))
        results.append(result)

    # Detection Threshold: Metric above which a prediction is considered positive
    detection = np.sum(np.array(results) > detection_threshold)

    # Verification Threshold: Proportion of positive predictions / total samples
    verification = detection / len(image_names)
    verified = verification > verification_threshold

    return results, verified


## 8.4  Real Time Verification

In [None]:
"""
Captures video from the default camera, crops each frame to a fixed 250x250
region, and displays it on screen.

Pressing 'v' saves the current cropped frame as the input image and runs
verify() with the Siamese model, printing the verification result.

Pressing 'q' (or losing the camera feed) ends the loop. The camera and the
OpenCV windows are always released, even if an error occurs.
"""

# Set up the video capture object to read from the default camera (index 0)
cap = cv2.VideoCapture(0)

try:
    # Loop over the video frames as long as the capture object is open
    while cap.isOpened():
        # Read a frame from the video capture object
        ret, frame = cap.read()
        if not ret:
            # No frame was delivered; slicing `frame` would fail, so stop.
            break

        # Crop the frame to a fixed 250x250 region
        frame = frame[120:120+250, 200:200+250, :]

        # Display the cropped frame on screen
        cv2.imshow('Verification', frame)

        # Poll the keyboard ONCE per frame. Calling waitKey() separately for
        # each key would let the first call swallow a key press meant for the
        # second check.
        key = cv2.waitKey(10) & 0xFF

        if key == ord('v'):
            # Save the frame where verify() reads it from: verify() loads
            # 'application_test/input_image/input_image.jpg'.
            cv2.imwrite(os.path.join('application_test', 'input_image', 'input_image.jpg'), frame)
            results, verified = verify(siamese_model, 0.5, 0.5)
            # Print the verification result (True or False) to the console
            print(verified)
        elif key == ord('q'):
            break
finally:
    # Release the video capture resources and destroy the OpenCV windows
    cap.release()
    cv2.destroyAllWindows()


## 8.5 Load a saved TFLite model

In [None]:
"""
Load the TFLite model, preprocess two images, and run inference on the pair.

This cell is a script, not a function. It uses these fixed values:

    "tflite_model.tflite":        path to the TFLite model file.
    "application_test/9/aaa.jpg": path to the input image.
    "application_test/9/bbb.jpg": path to the validation image.
    input_shape = (105, 105, 3):  shape of one image fed to the model.

The result is stored in `output_data` (a numpy array) and its first row is
printed.
"""
# Load TFLite model
interpreter = tf.lite.Interpreter(model_path="tflite_model.tflite")
interpreter.allocate_tensors()

# Get input and output tensor details
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

# Define input shape
input_shape = (105, 105, 3)

# Load input images
# NOTE(review): cv2.imread returns pixels in BGR channel order; confirm this
# matches the preprocessing the model was trained with.
input_img = cv2.imread("application_test/9/aaa.jpg")
validation_img = cv2.imread("application_test/9/bbb.jpg")

# cv2.imread signals a missing/unreadable file by returning None rather than
# raising, so fail here with a clear message instead of inside cv2.resize.
if input_img is None or validation_img is None:
    raise FileNotFoundError(
        "Could not read application_test/9/aaa.jpg and/or application_test/9/bbb.jpg"
    )

# Resize input and validation images to (105, 105)
input_img = cv2.resize(input_img, input_shape[:2])
validation_img = cv2.resize(validation_img, input_shape[:2])

# Rescale input and validation images to [0, 1]
input_img = input_img.astype(np.float32) / 255.0
validation_img = validation_img.astype(np.float32) / 255.0

# Add a batch dimension to each image: (105, 105, 3) -> (1, 105, 105, 3)
input_data = np.expand_dims(input_img, axis=0)
validation_data = np.expand_dims(validation_img, axis=0)

# The Siamese model has TWO image inputs, so both tensors must be set before
# invoking; setting only the first would leave the second without image data.
# Which of the two slots receives which image does not affect the result,
# because the model compares the embeddings with an absolute difference.
interpreter.set_tensor(input_details[0]['index'], input_data)
interpreter.set_tensor(input_details[1]['index'], validation_data)

# Run inference
interpreter.invoke()

# Get output tensor data
output_data = interpreter.get_tensor(output_details[0]['index'])

# Print predicted values
print(output_data[0])