In [None]:
# Installs Dependencies
#!pip install matplotlib
#!pip install opencv-python
#!pip install tensorflow
#!pip install tensorflow-gpu

In [None]:
# Imports Dependencies
import cv2
import numpy as np
import os
import random
import tensorflow as tf

# For generating unique image names, stands for Universally Unique Identifier (UUID)
import uuid

from matplotlib import pyplot as plt

from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Layer
from tensorflow.keras.layers import MaxPooling2D

# Imports model metric calculations, being recall and precision
from tensorflow.keras.metrics import Precision
from tensorflow.keras.metrics import Recall

from tensorflow.keras.models import Model

In [None]:
# Sets GPU Growth
#
# TensorFlow by default takes up as much GPU memory as it can when running, which can
#     lead to Out Of Memory (OOM) errors, therefore memory growth is enabled on every
#     available GPU in order to limit its memory consumption
gpus = tf.config.experimental.list_physical_devices('GPU')

for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
    print(gpu)

In [None]:
# Creates Folder Structures

# Root directory underneath which every image set is stored
data_file_path = '../../Data/FacialRecognition'

# One sub directory per image role, being the positive, negative and anchor images
POS_IMGS_PATH, NEG_IMGS_PATH, ANC_IMGS_PATH = (
    os.path.join(data_file_path, folder_name)
    for folder_name in ('Positive', 'Negative', 'Anchor')
)

In [None]:
# Generates the actual directories, including any missing parent directories
# Passing 'exist_ok' keeps this cell re-runnable, as without it 'os.makedirs' raises
#     a 'FileExistsError' as soon as a directory already exists
for directory_path in (POS_IMGS_PATH, NEG_IMGS_PATH, ANC_IMGS_PATH):
    os.makedirs(directory_path, exist_ok = True)

In [None]:
# Collects Positive & Negative Images

# Untars the Labelled Faces In The Wild (LFW) dataset

# Decompresses the downloaded archive, expecting 'lfw.tar' within the current working directory
!tar -xf lfw.tar

In [None]:
# Moves all images to the negative images data repository and folder
for directory in os.listdir('lfw'):
    person_directory = os.path.join('lfw', directory)

    # Skips every entry that is not a directory, such as the '.DS_Store' files the
    #     MacOS file system creates, as listing its contents would otherwise fail
    if not os.path.isdir(person_directory):
        continue

    for file in os.listdir(person_directory):
        EX_PATH = os.path.join(person_directory, file)
        NEW_PATH = os.path.join(NEG_IMGS_PATH, file)

        # Moves the image, overwriting any file already stored under the same name
        os.replace(EX_PATH, NEW_PATH)

In [None]:
# Collects Positive & Anchor Classes

# Establishes a connection to the computer's webcam
image_capture = cv2.VideoCapture(0)

# The 'try' / 'finally' pairing guarantees that the webcam and the preview window
#     are released even when the collection loop fails part way through
try:
    while image_capture.isOpened():

        # Returns a success flag as well as the actual captured image frame itself
        return_value, image_frame = image_capture.read()

        # Stops collecting once no frame could be read, as there is nothing to slice
        if not return_value:
            break

        # Slicing of the captured image frame to limit it to the desired 250 by 250 pixel dimensions
        #     Specifying the range of values desired from the image capture
        image_frame = image_frame[120:120 + 250, 200:200 + 250, : ]

        # Renders back and shows the captured image frame onto the screen
        cv2.imshow('Image Collection: ', image_frame)

        # Polls the keyboard only once per frame, waiting for 1 millisecond
        #     Every 'waitKey' call consumes the pending key press, so polling once per
        #     key would let a press be swallowed by the check of a different key
        pressed_key = cv2.waitKey(1) & 0XFF

        # Collects an anchor image upon hitting the 'a' key upon the keyboard
        if pressed_key == ord('a'):

            # Generates a unique file path
            image_name = os.path.join(ANC_IMGS_PATH, '{}.jpg'.format(uuid.uuid1()))

            # Writes out and saves the actual anchor image along with its given generated name
            cv2.imwrite(image_name, image_frame)

        # Collects a positive image upon hitting the 'p' key upon the keyboard
        elif pressed_key == ord('p'):
            image_name = os.path.join(POS_IMGS_PATH, '{}.jpg'.format(uuid.uuid1()))

            # Writes out and saves the actual positive image along with its given generated name
            cv2.imwrite(image_name, image_frame)

        # Breaks out of the loop gracefully upon hitting the 'q' key upon the keyboard
        elif pressed_key == ord('q'):
            break

finally:

    # Releases the webcam
    image_capture.release()

    # Closes the frame displaying the captured image
    cv2.destroyAllWindows()

In [None]:
# Loads & Preprocesses Images


# Obtains Image Directories
#
# Creates multiple segregated datasets from the previously established image directories
# Act as pipelines for all of the different images within the various directories
# Utilizes wild card searches for any files that end with the '.jpg' extension, taking
#     at most 300 file paths per directory
# The search patterns are built with 'os.path.join' so that the path separator is correct
#     on every operating system, rather than hard coding the Windows only backslash
positive_dataset = tf.data.Dataset.list_files(os.path.join(POS_IMGS_PATH, '*.jpg')).take(300)
negative_dataset = tf.data.Dataset.list_files(os.path.join(NEG_IMGS_PATH, '*.jpg')).take(300)
anchor_dataset = tf.data.Dataset.list_files(os.path.join(ANC_IMGS_PATH, '*.jpg')).take(300)

In [None]:
# Scales & Resizes Images

def preprocess(file_path):
    '''
    Loads the JPEG image stored at the given file path and returns it resized
        to 100 by 100 pixels, with its values divided by 255.0
    '''

    # Raw, still encoded contents of the image file
    encoded_image = tf.io.read_file(file_path)

    # Decodes the JPEG bytes and resizes the decoded image to 100 by 100 pixels
    resized_image = tf.image.resize(tf.io.decode_jpeg(encoded_image), (100, 100))

    # Dividing by 255.0 scales 8 bit pixel values to be between 0 and 1
    return resized_image / 255.0

In [None]:
# Sanity check of the preprocessing upon a single image, expects an 'image.jpg' file
#     within the current working directory
test_image = preprocess('image.jpg')

In [None]:
# Displays the preprocessed test image
plt.imshow(test_image)

In [None]:
# Creates A Labelled Dataset

# Zips up the datasets with a generated dataset of 1 values, being as long as the
#     anchor dataset, in order to mark said images as positives and categorized,
#     creating a tuple of anchor and positive image file paths, along with the
#     value labelling them
labelled_positive_dataset = tf.data.Dataset.zip((anchor_dataset,
                                                 positive_dataset,
                                                 tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor_dataset)))))

# Zips up the datasets with a generated dataset of 0 values, being as long as the
#     anchor dataset, in order to mark said images as negatives and uncategorized
labelled_negative_dataset = tf.data.Dataset.zip((anchor_dataset,
                                                 negative_dataset,
                                                 tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor_dataset)))))

# Joins both together, with all positive tuples coming before all negative tuples
labelled_dataset = labelled_positive_dataset.concatenate(labelled_negative_dataset)

In [None]:
# Creates an iterator yielding the labelled tuples as numpy values, for inspection purposes
sample_data = labelled_dataset.as_numpy_iterator()

In [None]:
# Pulls a single labelled tuple out of the iterator
sample_data_point = next(sample_data)

In [None]:
# Displays the pulled tuple, being two image file paths along with their label
sample_data_point

In [None]:
# Build Training & Testing Partitions

def preprocess_twin(input_image, validation_image, label):
    '''
    Preprocesses both images of a labelled dataset tuple, passing the label through untouched
    Returns a tuple of the preprocessed input image, the preprocessed validation image and the label
    '''
    processed_input_image = preprocess(input_image)
    processed_validation_image = preprocess(validation_image)

    return (processed_input_image, processed_validation_image, label)

In [None]:
# Runs the twin preprocessing upon the previously pulled sample tuple
sample_preprocessing_result = preprocess_twin(*sample_data_point)

In [None]:
# Displays the 2nd element of the result, being the preprocessed validation image
plt.imshow(sample_preprocessing_result[1])

In [None]:
# Builds the data loader pipeline

# Preprocesses the images of every labelled tuple
final_dataset = labelled_dataset.map(preprocess_twin)

# Caches the preprocessed images so that they are only loaded and decoded once
final_dataset = final_dataset.cache()

# Shuffles the positive and negative tuples together
#     The shuffled order is kept fixed across iterations, as by default the dataset gets
#     reshuffled every time it is iterated over, meaning partitions built from it with
#     'take' and 'skip' would hold different elements on every pass, letting testing
#     samples leak into training
final_dataset = final_dataset.shuffle(buffer_size = 1024, reshuffle_each_iteration = False)

In [None]:
# Creates an iterator yielding the preprocessed tuples as numpy values, for inspection purposes
final_dataset_samples = final_dataset.as_numpy_iterator()

In [None]:
# Pulls and displays a single preprocessed tuple
next(final_dataset_samples)

In [None]:
# Displays the number of elements held by the next preprocessed tuple
len(next(final_dataset_samples))

In [None]:
# Establishes the training partition, being the first 70% of the dataset
training_dataset = final_dataset.take(round(len(final_dataset) * 0.7))

# Groups the tuples into batches of 16
training_dataset = training_dataset.batch(16)

# Prepares up to 8 batches ahead of time while the current one is being consumed
training_dataset = training_dataset.prefetch(8)

In [None]:
# Creates an iterator yielding the training batches as numpy values, for inspection purposes
training_dataset_samples = training_dataset.as_numpy_iterator()

In [None]:
# Pulls a single batch out of the training partition
sample_training_datapoint = next(training_dataset_samples)

In [None]:
# Establishes the testing partition
#     Skips past the first 70% of the dataset, then takes the following 30% of it,
#     grouping the tuples into batches of 16 and preparing up to 8 batches ahead of time
testing_dataset = (
    final_dataset
    .skip(round(len(final_dataset) * 0.7))
    .take(round(len(final_dataset) * 0.3))
    .batch(16)
    .prefetch(8)
)

In [None]:
# Model Engineering

def make_embedding_layer():
    '''
    Builds the model embedding layer
        Translating the input images of faces into an embedded layer feature vector
        2 Rivers of data flowing through the neural network, the anchor and then positive or negative image
            Forms the basis of the 1 shot classification outcome
            Each river will output a feature vector of 4096 units
    Returns a Keras model named 'embedding', taking in images of 100 by 100 pixels with 3 color channels
    '''
    input1 = Input(shape = (100, 100, 3), name = 'input_image')

    # 1st Block

    # Passes 64 filters with a 10 by 10 pixel shape
    convolution1 = Conv2D(64, (10, 10), activation = 'relu')(input1)

    # Condenses down the amount of data into a single value within a 2 x 2 pixel area
    #     The window is passed by keyword, as the 1st positional argument of 'MaxPooling2D'
    #     is the pool size, not a filter count, and passing 64 there pools over a
    #     64 x 64 pixel area instead
    max_pooling1 = MaxPooling2D(pool_size = (2, 2), strides = (2, 2), padding = 'same')(convolution1)

    # 2nd Block
    convolution2 = Conv2D(128, (7, 7), activation = 'relu')(max_pooling1)
    max_pooling2 = MaxPooling2D(pool_size = (2, 2), strides = (2, 2), padding = 'same')(convolution2)

    # 3rd Block
    convolution3 = Conv2D(128, (4, 4), activation = 'relu')(max_pooling2)
    max_pooling3 = MaxPooling2D(pool_size = (2, 2), strides = (2, 2), padding = 'same')(convolution3)

    # 4th Block
    convolution4 = Conv2D(256, (4, 4), activation = 'relu')(max_pooling3)

    # Reduces everything down to a single flat dimension
    flatten1 = Flatten()(convolution4)

    # Produces the final feature vector of 4096 units
    dense1 = Dense(4096, activation = 'sigmoid')(flatten1)

    # Wraps the layers up into a model, which is not compiled at this point
    return Model(inputs = [input1], outputs = [dense1], name = 'embedding')

In [None]:
# Builds the embedding model, being the instance that 'make_siamese_model' applies to both of its inputs
embedding_layer_model = make_embedding_layer()

In [None]:
# Prints the layers, output shapes and parameter counts of the embedding model
embedding_layer_model.summary()

In [None]:
# Builds The Distance Layer

class L1Distance(Layer):
    '''
    Creates a custom neural network layer
    Have to combine and join the 2 rivers of data together, in this case by subtracting them from
        each other to obtain an L1 Siamese distance layer, telling us how similar the 2 images are,
        allowing for image recognition
    Defining characteristic in a siamese neural network
    '''

    def __init__(self, **kwargs):
        '''
        Forwards every keyword argument, such as 'name', 'trainable' or 'dtype', to the base
            'Layer' class, as those are the arguments passed back in when the layer gets
            rebuilt from a saved model, and dropping them would silently discard them
        '''
        super().__init__(**kwargs)

    def call(self, input_embedding, validation_embedding):
        '''
        Core function tells the layer what actions to carry out when data is passed to it
            The 1st data river representing the anchor image is the 'input embedding'
            The 2nd data river representing the positive or negative image is the 'validation embedding'
        Performs a similarity calculation, returning the element wise absolute value difference
            between the two embeddings
        '''
        return tf.math.abs(input_embedding - validation_embedding)

In [None]:
# Instantiates a stand alone distance layer, 'make_siamese_model' creates its own instance
l1_distance_layer = L1Distance()

In [None]:
# Builds A Siamese Neural Network Model
def make_siamese_model():
    '''
    Joins the 2 data streams together into the full siamese neural network
        Both raw input images are passed through the very same embedding model
        The distance layer compares the 2 resulting embeddings with each other
        A single sigmoid unit classifies the distances as either a match or a non match
    Returns a Keras model named 'SiameseNeuralNetwork', taking in the anchor and validation images
    '''

    # Anchor image input within the network, matching the shape of the raw input images
    anchor_input = Input(name = 'input_image', shape = (100, 100, 3))

    # Validation image input within the network
    comparison_input = Input(name = 'validation_image', shape = (100, 100, 3))

    # Siamese distance component, renamed to 'distance'
    distance_layer = L1Distance()
    distance_layer._name = 'distance'

    # Runs both inputs through the shared embedding model before comparing them
    anchor_embedding = embedding_layer_model(anchor_input)
    comparison_embedding = embedding_layer_model(comparison_input)
    embedding_distances = distance_layer(anchor_embedding, comparison_embedding)

    # Classification layer, condensing the distances down into 1 sigmoid activated unit
    match_probability = Dense(1, activation = 'sigmoid')(embedding_distances)

    return Model(inputs = [anchor_input, comparison_input], outputs = match_probability, name = 'SiameseNeuralNetwork')

In [None]:
# Builds the siamese model, being the instance that gets trained, evaluated and saved further below
siamese_neural_network = make_siamese_model()

In [None]:
# Prints the layers, output shapes and parameter counts of the siamese model
siamese_neural_network.summary()

In [None]:
# Siamese Neural Network Model Training

# Sets Up Loss & Optimizer

# Binary cross entropy loss, comparing the labels against the predicted values
binary_cross_loss = tf.losses.BinaryCrossentropy()

# Adam optimizer with a learning rate of 0.0001
optimizer = tf.keras.optimizers.Adam(learning_rate = 1e-4)

In [None]:
# Establishes Checkpoint Callbacks

# Directory within which the training checkpoints are stored
checkpoint_file_path_directory = '../../Data/Models/Model_Training_Checkpoints'

# Every saved checkpoint file name starts with the 'ckpt' prefix
checkpoint_prefix = os.path.join(checkpoint_file_path_directory, 'ckpt')

# Tracks both the optimizer and the siamese model
checkpoint = tf.train.Checkpoint(
    optimizer = optimizer,
    siamese_neural_network = siamese_neural_network,
)

In [None]:
# Builds Training Step Function

# Wraps the function with the decorator in order to compile said function into a callable TensorFlow graph
@tf.function
def train_step(batch):
    '''
    What is used to effectively train upon 1 batch of data
        Makes a prediction
        Calculates loss
        Calculates gradients
        Applies backpropagation throughout the neural network in order to obtain the best possible model
    Same steps with whatever sort of neural network

    Takes in a batch holding the input images, the validation images and the labels, in that order
    Returns the training loss calculated upon said batch
    '''

    # Allows for the capturing of gradient values produced by the neural network model
    with tf.GradientTape() as tape:

        # Retrieves the features, the anchor and positive / negative images
        x = batch[:2]

        # Retrieves the label
        y = batch[2]

        # Passes data into the siamese model for prediction making
        y_hat = siamese_neural_network(x, training = True)

        # Calculates the training loss
        loss = binary_cross_loss(y, y_hat)

    # Calculates gradients for the loss with respect to the trainable variables of the given model
    gradient = tape.gradient(loss, siamese_neural_network.trainable_variables)

    # Calculates updated weights and applies them to the model
    #     The optimizer expects pairs of a gradient and the variable it belongs to
    optimizer.apply_gradients(zip(gradient, siamese_neural_network.trainable_variables))

    return loss

In [None]:
# Builds The Training Loop

def train(data, EPOCHS):
    '''
    Iterates and trains the siamese model over every batch made available from the dataset

    Takes in the batched training data, along with the number of epochs to train for
    Returns a list holding the mean training loss of every epoch, in order
    '''
    history = []

    # Loops through the epochs
    for epoch in range(1, EPOCHS + 1):
        print(f'\n Epoch {epoch}/{EPOCHS}')
        progress_bar = tf.keras.utils.Progbar(len(data))

        # Losses of every batch trained upon within the current epoch
        epoch_losses = []

        # Loops through each batch
        for index, batch in enumerate(data):
            loss = train_step(batch)
            epoch_losses.append(float(loss))
            progress_bar.update(index + 1)

        # An epoch without any batches has no mean loss, recorded as 'nan'
        if epoch_losses:
            history.append(sum(epoch_losses) / len(epoch_losses))
        else:
            history.append(float('nan'))

        # Optionally saves checkpoints after every 10 epochs of training
        #if epoch % 10 == 0:
            #checkpoint.save(file_prefix = checkpoint_prefix)

    return history

In [None]:
# Trains The Model

# Number of full passes made over the training data
EPOCHS = 50

In [None]:
# Runs the training loop over the training partition, keeping hold of whatever 'train' returns
history = train(training_dataset, EPOCHS)

In [None]:
# Evaluates The Model

# Pulls a single batch of testing data as numpy values, unpacked into the input
#     images, the validation images and the labels
test_input, test_validation, y_true = next(testing_dataset.as_numpy_iterator())

In [None]:
# Carries out predictions upon the pulled batch of input and validation images
y_hat = siamese_neural_network.predict([test_input, test_validation])

In [None]:
# Post processing result for easier interpretability
#     Every prediction above 0.5 is turned into a 1, any other prediction into a 0
[1 if prediction > 0.5 else 0 for prediction in y_hat]

In [None]:
# Displays the labels of the pulled batch, for comparison against the predictions
y_true

In [None]:
# Creates one metric object for recall and one for precision
metric1 = Recall()
metric2 = Precision()

# Feeds the labels and the predictions of the pulled batch into both metrics
for metric in (metric1, metric2):
    metric.update_state(y_true, y_hat)

# Prints the recall value first, followed by the precision value
for metric in (metric1, metric2):
    print(metric.result().numpy())

In [None]:
# Visualizes Results

# Creates the overall plot with its set size, holding 2 subplots laid out side by side
fig, (input_axis, validation_axis) = plt.subplots(1, 2, figsize = (18, 8))

# 1st subplot shows the first input image of the pulled batch
input_axis.imshow(test_input[0])

# 2nd subplot shows the first validation image of the pulled batch
validation_axis.imshow(test_validation[0])

# Renders both the test and validation images cleanly
plt.show()

In [None]:
# Saves The Model

# Directory within which the saved model file is stored
model_weights_file_path = '../../Data/Models'

# Saves the siamese model to the 'siamese_model.h5' file within said directory
siamese_neural_network.save(os.path.join(model_weights_file_path, 'siamese_model.h5'))

In [None]:
# Loads The Model
#     The custom 'L1Distance' layer class and the loss class are passed in as custom
#     objects, making them available by name while the model is being rebuilt
model = tf.keras.models.load_model(os.path.join(model_weights_file_path, 'siamese_model.h5'),
                                   custom_objects = {'L1Distance':L1Distance, 
                                                     'BinaryCrossentropy':tf.losses.BinaryCrossentropy})

In [None]:
# Real Time Verification

# Verification Function
def verify(detection_threshold, verification_threshold):
    '''
    Compares the saved input image against all available positive images

    Detection Threshold: Metric above which a prediction is considered positive
    Verification Threshold: Proportion of positive predictions over total positive samples
        above which the input image is considered verified

    Returns the list of raw predictions, along with whether or not the input image is verified
        Without any positive images there is nothing to verify against, in which case
        an empty list and False are returned
    '''

    # Only considers the '.jpg' images, skipping any other files such as '.DS_Store'
    positive_image_names = [image for image in os.listdir(POS_IMGS_PATH) if image.lower().endswith('.jpg')]

    # Guards against a division by zero further below
    if not positive_image_names:
        return [], False

    # The input image is the same for every comparison, therefore it is preprocessed only once
    input_image = preprocess(os.path.join('application_data', 'input_image', 'input_image.jpg'))

    # Instantiates a results array
    results = []

    # Loops through all available positive images as validation against the single image being
    #     compared in order to maximize the chance of a correct prediction being made when
    #     given a live feed
    for image in positive_image_names:
        validation_image = preprocess(os.path.join(POS_IMGS_PATH, image))

        # Makes a prediction upon a batch of 1 image pair and appends it to the results
        result = model.predict(list(np.expand_dims([input_image, validation_image], axis = 1)))
        results.append(result)

    # Number of predictions above the detection threshold
    detection = np.sum(np.array(results) > detection_threshold)

    # Proportion of positive predictions over total positive samples
    verification = detection / len(results)
    verified = verification > verification_threshold

    return results, verified

In [None]:
# OpenCV Real Time Verification

# Establishes a connection to the computer's webcam
image_capture = cv2.VideoCapture(0)

# Makes sure the folder the input image gets saved to exists, as writing the image
#     into a missing folder would not save anything
input_image_directory = os.path.join('application_data', 'input_image')
os.makedirs(input_image_directory, exist_ok = True)

# The 'try' / 'finally' pairing guarantees that the webcam and the preview window
#     are released even when the verification loop fails part way through
try:
    while image_capture.isOpened():
        return_value, image_frame = image_capture.read()

        # Stops once no frame could be read, as there is nothing to slice
        if not return_value:
            break

        # Limits the captured image frame to the desired 250 by 250 pixel dimensions
        image_frame = image_frame[120:120 + 250, 200:200 + 250, : ]

        cv2.imshow('Verification: ', image_frame)

        # Polls the keyboard only once per frame, waiting for 10 milliseconds
        #     Every 'waitKey' call consumes the pending key press, so polling once per
        #     key would let a press be swallowed by the check of a different key
        pressed_key = cv2.waitKey(10) & 0XFF

        # Verification trigger
        if pressed_key == ord('v'):

            # Saves the input image to the application data folder
            cv2.imwrite(os.path.join(input_image_directory, 'input_image.jpg'), image_frame)

            # Runs verification with the detection and verification thresholds, 'verify'
            #     takes in exactly those 2 arguments and uses the loaded model on its own
            results, verfied = verify(0.9, 0.7)
            print(verfied)

        # Breaks out of the loop gracefully upon hitting the 'q' key upon the keyboard
        elif pressed_key == ord('q'):
            break

finally:
    image_capture.release()
    cv2.destroyAllWindows()