# *Facial Recognition with Siamese Neural Network*

https://www.cs.cmu.edu/~rsalakhu/papers/oneshot1.pdf

# Import dependencies

### Import standard dependencies

In [None]:
import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt

### Import Tensorflow dependencies

In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf

### Set GPU growth

In [None]:
# # Avoid out-of-memory error by limiting GPU consumption
# gpus = tf.config.experimental.list_physical_devices('GPU')
# for gpu in gpus:
#     print(gpu)
#     tf.config.experimental.set_memory_growth(gpu, True)

### Create folder structures

In [None]:
# Setup paths
# Root folders for the three image classes, relative to the working directory.

POS_PATH = os.path.join('data', 'positive') # ./data/positive
NEG_PATH = os.path.join('data', 'negative') # ./data/negative
ANC_PATH = os.path.join('data', 'anchor') # ./data/anchor


In [None]:
# Create those folders

# exist_ok=True makes the call idempotent and removes the check-then-create race
for folder in (POS_PATH, NEG_PATH, ANC_PATH):
    os.makedirs(folder, exist_ok=True)

# Collect positives and anchors

### Untar labelled faces in the wild dataset
### http://vis-www.cs.umass.edu/lfw/

### Put all those images into negative folder, as they are all negatives

In [None]:
# Relocate every file found under FaceID/<sub-folder>/ into data/negative

for person_dir in os.listdir('FaceID'):
    person_path = os.path.join('FaceID', person_dir)
    for file_name in os.listdir(person_path):
        # os.replace moves the file, overwriting any existing file of the same name
        os.replace(os.path.join(person_path, file_name),
                   os.path.join(NEG_PATH, file_name))

### Collect positive and anchor classes
* Anchor = input
* Positive = Correct
* Negative = Wrong

In [None]:
# Import uuid library to generate unique image names
import uuid

In [None]:
# Access webcam

cap = cv2.VideoCapture(2)

try:
    while cap.isOpened():
        ret, frame = cap.read() # ret = success flag; frame = the actual image captured on webcam

        # Stop cleanly when no frame was delivered instead of crashing on a None frame
        if not ret:
            break

        # Crop the frame to a 250x250 region
        frame = frame[130:250+130, 200:200+250, :]

        cv2.imshow('Image Collection', frame) # render/show the captured image onto the screen

        # Poll the keyboard ONCE per frame. Each waitKey() call consumes the pending key,
        # so polling separately for every shortcut silently drops most key presses.
        key = cv2.waitKey(1) & 0xFF

        if key == ord('a'):
            # collect anchors when hit 'A'
            imgName = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1())) # unique file name
            cv2.imwrite(imgName, frame)
        elif key == ord('p'):
            # collect positives when hit 'P'
            imgName = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1())) # unique file name
            cv2.imwrite(imgName, frame)
        elif key == ord('q'):
            # quit when hit 'Q'
            break
finally:
    # disconnect webcam and close the window even if the loop raised
    cap.release()
    cv2.destroyAllWindows()

# Load and preprocess images

### Get image directories

In [None]:
# Take 300 samples/images from each (must have matching number of samples)

# Build the glob pattern with os.path.join: the previous f'{PATH}\*.jpg' relied on '\*',
# an invalid escape sequence, and on the Windows-only path separator.
anchor = tf.data.Dataset.list_files(os.path.join(ANC_PATH, '*.jpg')).take(300)
negative = tf.data.Dataset.list_files(os.path.join(NEG_PATH, '*.jpg')).take(300)
positive = tf.data.Dataset.list_files(os.path.join(POS_PATH, '*.jpg')).take(300)

In [None]:
# To show what's contained inside those three variables
# showAnchor = anchor.as_numpy_iterator()
# showAnchor.next()

### Preprocessing - Scale and Resize

1. Read JPEG picture file as bytes
2. Decode the bytes in as JPEG
3. Resize the image/data into 100x100
4. Scale the data into between 0 and 1.

In [None]:
def preprocess(file_path):
    """Load a JPEG file and return it as a 100x100 RGB image scaled to [0, 1].

    Args:
        file_path: path of the JPEG file to read.

    Returns:
        The decoded image, resized to 100x100 and divided by 255.
    """
    byte_img = tf.io.read_file(file_path) # read the file's data as bytes
    # channels=3 forces RGB output, so a grayscale JPEG still matches the
    # (100, 100, 3) input that the model declares.
    img = tf.io.decode_jpeg(byte_img, channels=3)
    img = tf.image.resize(img, (100,100)) # resize the img into 100x100
    # Scale pixel values from 0..255 down to 0..1
    return img / 255.0


In [None]:
# Demonstrate the 'preprocess' function, and show the resultant image
# img = preprocess('data\\anchor\\35fe2f84-a2fd-11ec-922d-ace2d36277c6.jpg')
# plt.imshow(img)

### Create labelled dataset

In [None]:
# Label every pair: (anchor, positive) => 1.0 and (anchor, negative) => 0.0

match_labels = tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor))) # all 1's: anchor matches the positive
mismatch_labels = tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor))) # all 0's: anchor does NOT match the negative
positives = tf.data.Dataset.zip((anchor, positive, match_labels))
negatives = tf.data.Dataset.zip((anchor, negative, mismatch_labels))
data = positives.concatenate(negatives) # positives first, followed by negatives

* The `tf.data.Dataset.zip()` function creates a dataset by zipping together a tuple, dict, or nested structure of datasets.
* Each element of `positives`, `negatives` and `data` is a tuple with 3 entries, namely (input image path, comparison image path, label - 1/0)

In [None]:
# To show what's contained inside those three variables
# Positives
# showPositives = positives.as_numpy_iterator()
# showPositives.next()

# Negatives
# showNegatives = negatives.as_numpy_iterator()
# showNegatives.next()

# Combined data (positives followed by negatives)
showData = data.as_numpy_iterator()
example = showData.next() # first element; reused by a later cell as `example`
showData.next() # second element, shown as the cell output

### Build train-test partition

In [None]:
def preprocess_twin(input_img, validation_img, label):
    """Preprocess both image paths of one labelled pair; the label is passed through unchanged."""
    anchor_img = preprocess(input_img)
    compare_img = preprocess(validation_img)
    return (anchor_img, compare_img, label)

# Sample input tuple for the function:
# (b'data\\anchor\\3c539b2a-a2fd-11ec-9a85-ace2d36277c6.jpg',   <- Input
#  b'data\\positive\\d56b4449-a2fd-11ec-b5f5-ace2d36277c6.jpg', <- Comparison
#  1.0)                                                         <- Result

In [None]:
# The * unpacks the example tuple into the three positional arguments
result = preprocess_twin(*example)

# result = (preprocessed input image, preprocessed comparison image, label - 1 as correct, 0 as wrong)
print(type(result))

# Show the two images stacked vertically, then print their label
f, axarr = plt.subplots(2,1)
for axis, picture in zip(axarr, result[:2]):
    axis.imshow(picture)
print(result[2])

### Build dataloader pipeline

1. pass all the stuff in 'data' into the 'preprocess_twin' function conveniently using the '.map' method.
2. store the consequent outputs in cache
3. shuffle the positives and negatives for effective training
4. done preparing training data

In [None]:
data = data.map(preprocess_twin)
data = data.cache()
# reshuffle_each_iteration=False freezes the shuffled order after it is first drawn.
# With the default (True) the order changes on every pass over the dataset, so the
# take()/skip() train/test split below would select different, overlapping samples
# each epoch and leak test pairs into training. Shuffle the training partition
# separately if a fresh order per epoch is wanted.
data = data.shuffle(buffer_size=1024, reshuffle_each_iteration=False)


* Now the length of 'data' is 600, as it contains both positives and negatives, each at 300. 

In [None]:
# Pull one preprocessed sample out of the pipeline and display it

print(type(data))

sample_iter = data.as_numpy_iterator()
print(type(sample_iter))

sample = next(sample_iter)

f, axarr = plt.subplots(2,1)

# Input image on top, comparison image below, then the pair's label
for axis, picture in zip(axarr, sample[:2]):
    axis.imshow(picture)
print(sample[2])

* `Dataset.map()` applies a transformation function (here `preprocess_twin`) to every element of the dataset and returns a new dataset of the results, much like Python's built-in `map()` does for iterables, without an explicit for loop.
* `Dataset.cache()` stores the dataset's elements the first time they are fully iterated, so later passes (e.g. subsequent epochs) reuse the already loaded and preprocessed images instead of reading and decoding the files again.

`data`</br>
<ShuffleDataset element_spec=(TensorSpec(shape=(100, 100, None), dtype=tf.float32, name=None), TensorSpec(shape=(100, 100, None), dtype=tf.float32, name=None), TensorSpec(shape=(), dtype=tf.float32, name=None))>

In [None]:
# Training partition: the first 70% of 'data', grouped into batches of 16

train_data = (
    data.take(round(len(data) * 0.7))
    .batch(16)
    .prefetch(8)  # prefetch buffer of 8 batches
)

`train_data`</br>
<PrefetchDataset element_spec=(TensorSpec(shape=(None, 100, 100, None), dtype=tf.float32, name=None), TensorSpec(shape=(None, 100, 100, None), dtype=tf.float32, name=None), TensorSpec(shape=(None,), dtype=tf.float32, name=None))>

* One extra dimension is created due to the _batch()_ function.

In [None]:
# Testing partition: skip the 70% used for training, keep the next 30%, batch by 16

test_data = (
    data.skip(round(len(data) * 0.7))
    .take(round(len(data) * 0.3))
    .batch(16)
    .prefetch(8)  # prefetch buffer of 8 batches
)

# Model Engineering

https://www.cs.cmu.edu/~rsalakhu/papers/oneshot1.pdf

### Build embedding layer

In [None]:
def make_embedding():
    """Build the convolutional embedding network used by both branches.

    Returns:
        A Keras Model named 'embedding' that maps a 100x100x3 image to a
        4096-dimensional sigmoid feature vector.
    """
    inp = Input(shape=(100,100,3), name='input_image')

    # MaxPooling2D's first positional argument is pool_size, so the previous
    # MaxPooling2D(64, (2,2), ...) pooled over 64x64 windows with stride 2.
    # A 2x2 window with stride 2 is used instead (presumably what the referenced
    # one-shot paper specifies - confirm); output shapes are unchanged.

    # 1st block
    c1 = Conv2D(64, (10,10), activation='relu')(inp)
    m1 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c1)

    # 2nd block
    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c2)

    # 3rd block
    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c3)

    # Final embedding block
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)

    # Model() only wires inputs to outputs; nothing is compiled here
    return Model(inputs=[inp], outputs=[d1], name='embedding')

In [None]:
# Build the embedding model once; later cells call this same instance for both images
embedding = make_embedding()
embedding.summary()

* Embedding = the Siamese Neural Network structure up until the Feature Vector 4096 without the L1 siamese dist.
* Its input = image data with dimension 100x100x3
* Its output = vector with length 4096
* This embedding is just a structure for data to be passed through and processed. No data has been given to it yet.
</br></br>*refer Model.png*

### Build distance layer

In [None]:
class L1Dist(Layer): # from tensorflow.keras.layers import Layer
    """Custom layer returning the element-wise absolute difference of two embeddings."""

    def __init__(self, **kwargs):
        # Forward kwargs (name, dtype, trainable, ...) to Layer. Dropping them ignored
        # options passed by callers and by Keras when it rebuilds the layer from a
        # saved config.
        super().__init__(**kwargs)

    # Combine the two rivers
    def call(self, input_embedding, validation_embedding):
        """Return |input_embedding - validation_embedding|, computed element-wise."""
        return tf.math.abs(input_embedding - validation_embedding)

* So now this is the L1 siamese dist part in the pipeline.
</br></br>
*refer Model.png*

### Make Siamese model

1. input/validation image = a data/tensor with dimension 100x100x3
2. inp/val embedding = a flat data/tensor with length 4096

*\*\*Everything from this point on until the 'def make_siamese_model():' is for the purpose of explaining what the function does. </br>Please comment them out unless you want to run them one by one and check the outputs.*

In [None]:
# Symbolic inputs for the two images being compared
input_image = Input(shape=(100, 100, 3), name='input_img')
validation_image = Input(shape=(100, 100, 3), name='validation_img')

# Run both inputs through the same embedding model instance
inp_embedding, val_embedding = embedding(input_image), embedding(validation_image)

`input_image` & `validation_image`
</br></br>
<KerasTensor: shape=(None, 100, 100, 3) dtype=float32 (created by layer 'input_img')></br>
<KerasTensor: shape=(None, 100, 100, 3) dtype=float32 (created by layer 'validation_img')>

*- After passing through the embedding (pipeline):*

`inp_embedding` & `val_embedding`
</br></br>
<KerasTensor: shape=(None, 4096) dtype=float32 (created by layer 'embedding')></br>
<KerasTensor: shape=(None, 4096) dtype=float32 (created by layer 'embedding')>

*refer Model.png*

In [None]:
# Instantiate the distance layer and apply it to the two embeddings
siamese_layer = L1Dist()
distances = siamese_layer(inp_embedding, val_embedding) # element-wise |inp - val|

* Siamese layer = input embedding - validation embedding
* A minus between 2 vectors of length 4096 is performed to obtain the absolute difference. 

`distances`
</br>
<KerasTensor: shape=(None, 4096) dtype=float32 (created by layer 'l1_dist_1')>

* The siamese layer is where the comparison between the input and validation images happens.
* An instance of the class 'L1Dist' is assigned to a variable/placeholder named 'siamese_layer'
* And then 'siamese_layer' takes in 2 inputs, namely the input embedding and validation embedding, which are the two rivers, 2 streams of neural network pipelines that process the input image (anchor) and the validation image (positive/negative)

In [None]:
# Single sigmoid unit on top of the distance vector: the match score
classifier = Dense(1, activation='sigmoid')(distances)

`classifier`
</br>
<KerasTensor: shape=(None, 1) dtype=float32 (created by layer 'dense_5')>

* After making the comparison, the result is passed into the final layer - the output, named 'classifier', which has a shape of only 1x1, as the answer is just yes/no. 

`siameseNetwork`</br>
run it and you will see the full SNN model that is built.

In [None]:
# Wire the two inputs to the classifier output and print the layer summary
siameseNetwork = Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')
siameseNetwork.summary()

In [None]:
def make_siamese_model():
    """Assemble the full Siamese network around the module-level `embedding` model.

    Returns:
        A Keras Model named 'SiameseNetwork' taking [input_img, validation_img]
        (each 100x100x3) and producing a single sigmoid output.
    """
    image_shape = (100, 100, 3)

    # Anchor image and the image it is compared against
    input_image = Input(shape=image_shape, name='input_img')
    validation_image = Input(shape=image_shape, name='validation_img')

    # Both images go through the same embedding model instance
    anchor_features = embedding(input_image)
    comparison_features = embedding(validation_image)

    # Element-wise absolute difference of the two embeddings
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(anchor_features, comparison_features)

    # Classification layer (check - are they similar?), refer Model.png
    classifier = Dense(1, activation='sigmoid')(distances)

    return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

In [None]:
# Build the model instance that the training, evaluation and saving cells below use
snn_model = make_siamese_model()
snn_model.summary()

*Recap:*
</br> 
* Before this, we created the model up until the Feature vector and named it `'embedding'`.
* Then we **create a new layer** called *`'siamese_layer'`* where the streams of input and validation meets and are compared.
* The comparison result is stored in the variable 'distances'.
* The comparison result in 'distances' is passed into a **new layer**, which is the *`output layer`*, and is given the name 'classifier'.
* The output layer has dimension of one single value (1x1), because we want a YES/NO answer after all.
* Now, the full siamese neural network model is complete, with 3 parts that we just joined:
    - Embeddings: Convolution-ReLU-MaxPooling
    - Siamese layer: for comparison, where minus is done
    - Output layer: Single value output, yes/no

*refer Model.png*

# Training

### Setup loss and optimiser

In [None]:
# Define loss
binary_cross_loss = tf.losses.BinaryCrossentropy()

# Define optimiser
opt = tf.keras.optimizers.Adam(1e-4) # learning rate = 0.0001

### Establish checkpoints

In [None]:
# Checkpoint files are written as ./training_checkpoints/ckpt-<n>
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
# Track both the optimiser state and the model weights
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=snn_model)

### Build train step function
* train_step() function is focused on training for one batch.
* Hence, a training loop is needed afterwards to iterate over every batch in the dataset.

In [None]:
@tf.function
def train_step(batch):
    """Run one optimisation step on a single batch and return its loss.

    Args:
        batch: a 3-tuple (anchor images, positive/negative images, labels).

    Returns:
        The binary cross-entropy loss of this batch.
    """
    # Get anchor and positive/negative image
    X = batch[:2]

    # Get label
    y = batch[2]

    # Record the forward pass so gradients can be computed from it
    with tf.GradientTape() as tape:
        # Forward pass
        yhat = snn_model(X, training=True)

        # Calculate loss
        loss = binary_cross_loss(y, yhat) # declared/defined under 'Setup loss and optimiser'

    # NOTE: the former print(loss) was removed. Inside @tf.function a Python print runs
    # only while the graph is being traced and shows a symbolic tensor, not the
    # per-batch value. The loss is returned to the caller instead.

    # Calculate gradients
    grad = tape.gradient(loss, snn_model.trainable_variables)

    # Calculate updated weights and apply to the siamese model
    opt.apply_gradients(zip(grad, snn_model.trainable_variables))

    return loss

* @tf.function = Compiles a function into a callable TensorFlow graph. (deprecated arguments)
</br>https://www.tensorflow.org/api_docs/python/tf/function</br>
* Each batch of train_data consists of 3 parts:
    1. A batch of 16 pieces of anchor images
    2. A batch of 16 pieces of positive/negative images
    3. 16 labels of whether it is 1/0, meaning correct/wrong
* The 'batch' input of the function is where the train_data is passed into.
* Dimension/Shape of X = (2, 16, 100, 100, 3).
    - 2: Anchor image & Positive/Negative image
    - 16: Batch of 16 pieces
    - (100, 100): Dimension/Size/Resolution of each image, namely 100x100
    - 3: 3 color channels images, namely RGB.
* y = actual labels
* yhat = predicted y value

### Build training loop

In [None]:
def train(data, EPOCHS):
    """Train on `data` for `EPOCHS` epochs, saving a checkpoint every 10th epoch.

    Args:
        data: batched dataset; each element is passed to train_step().
        EPOCHS: number of passes over `data`.
    """
    # Loop through the epochs
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))

        # Loop through each batch
        for idx, batch in enumerate(data):
            # Run train step here
            loss = train_step(batch)
            # Report the loss on the progress bar (previously the value was discarded,
            # so training gave no feedback); Progbar shows its running average.
            progbar.update(idx+1, values=[('loss', float(loss))])

        # Save checkpoints
        if epoch % 10 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix) # Both 'checkpoint' and 'checkpoint_prefix' are defined under 'Establish checkpoints' above

* idx = index
* For this 'train()' function, pass in 'train_data' as _data_, and number of epochs as _EPOCHS_. 

### Train the model

In [None]:
# Train on the training partition for 50 epochs
EPOCHS = 50
train(train_data, EPOCHS)

In [None]:
# Restore the most recent checkpoint into the objects tracked by `checkpoint`
# (the optimiser and snn_model). tf.train.load_checkpoint() only returns a read-only
# reader over the checkpoint file and does not restore anything.
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

# Evaluate Model

### Import metrics

In [None]:
# Import metric calculations
from tensorflow.keras.metrics import Precision, Recall

# Pull the first batch of test_data and split it into its 3 components
test_input, test_val, y_true = next(test_data.as_numpy_iterator())

* test_input = what we are going to grab from our webcam. Length = 16, as one batch has 16 members.
* test_val = positive/negative. Length is also 16.
* y_true = the correct labels

### Make predictions

In [None]:
# Predicted score for every pair in the batch
y_hat = snn_model.predict([test_input, test_val])

# Post-process the results: threshold at 0.5 into 1/0 (displayed only, not stored)
[1 if prediction >= 0.5 else 0 for prediction in y_hat ]

* A simple, non-numpy array is made containing the 16 prediction results from the batch.

In [None]:
# Display the true labels to compare against the thresholded predictions above
y_true

### Calculate the metrics

In [None]:
# Create a metric object
m = Recall()

# Calculating the recall value from the true labels and the predicted scores
m.update_state(y_true, y_hat)

# Return Recall Result 
m.result().numpy()

* 1.0 means 100% accurate.

In [None]:
# Create a metric object
m = Precision()

# Calculating the precision value from the true labels and the predicted scores
m.update_state(y_true, y_hat)

# Return Precision Result 
m.result().numpy()

### Visualise the results

In [None]:
# Show one test pair side by side: input image on the left, comparison image on the right
plt.figure(figsize=(10,10))
plt.subplot(1,2,1)
plt.imshow(test_input[2]) # can take from index 0-15
plt.subplot(1,2,2)
plt.imshow(test_val[2]) # same index as above so the two images form one pair

# Save Model

In [None]:
# Save under the extension-less name 'snn_model' (presumably the SavedModel directory format in TF 2.x - confirm for your version)
snn_model.save('snn_model')

In [None]:
# Save again as a single .h5 file
snn_model.save('snn_model.h5')

In [None]:
# Reload the .h5 export; custom_objects lets Keras resolve the custom L1Dist layer by name
model = tf.keras.models.load_model('snn_model.h5', custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})

In [None]:
# Reload the 'snn_model' export instead; this rebinds `model` from the previous cell
model = tf.keras.models.load_model('snn_model', custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})

In [None]:
# Check the reloaded model: round its scores on the same test batch to 0/1
np.round(model.predict([test_input, test_val]))

In [None]:
# Print the layer summary of the reloaded model
model.summary()

# Real-Time Test

### Verification function

In [None]:
def verify(model, detection_threshold, verification_threshold):
    """Compare the saved input image against every stored verification image.

    Reads application_data/input_image/input_image.jpg and each file in
    application_data/verification_images.

    Args:
        model: model whose predict() is called with [input batch, verification batch].
        detection_threshold: score above which one comparison counts as a match.
        verification_threshold: proportion of matching comparisons that must be exceeded.

    Returns:
        (results, verified): the list of per-image predictions and the overall decision.
    """
    verification_dir = os.path.join('application_data', 'verification_images')

    # List the folder once so the loop and the proportion below use the same set of files
    image_names = os.listdir(verification_dir)
    if not image_names:
        # Nothing to compare against: skip the 0/0 division and report "not verified"
        return [], False

    # The input image is the same for every comparison, so load it once, not per iteration
    input_img = preprocess(os.path.join('application_data', 'input_image', 'input_image.jpg'))

    results = []
    for image in image_names: # loop through every image in the 'verification_images' folder
        verification_img = preprocess(os.path.join(verification_dir, image))
        # expand_dims adds the batch axis; list() splits the pair into the model's two inputs
        results.append(model.predict(list(np.expand_dims([input_img, verification_img], axis=1))))

    # Count the results that exceed the detection threshold
    detection = np.sum(np.array(results) > detection_threshold)

    # Proportion of verification
    verification = detection / len(image_names)

    # if proportion of verification > verification threshold, then verified = True
    verified = verification > verification_threshold

    return results, verified

* input image = the webcam frame saved to `application_data/input_image/input_image.jpg`, which `verify` reads from disk
* model = the SNN model
* detection_threshold = metric in which a prediction is considered positive
* verification_threshold = the proportion of ( positive predictions / total positive samples )

### OpenCV real-time verification

In [None]:
from tkinter import messagebox

In [None]:
cap = cv2.VideoCapture(2)

try:
    while cap.isOpened():
        ret, frame = cap.read()

        # Stop cleanly when no frame was delivered instead of crashing on a None frame
        if not ret:
            break

        # Crop to the same 250x250 region used when collecting images
        frame = frame[130:250+130, 200:200+250, :]

        cv2.imshow('Verification', frame)

        # Poll the keyboard ONCE per frame. Each waitKey() call consumes the pending key,
        # so two separate polls make the 'v' and 'q' shortcuts miss key presses.
        key = cv2.waitKey(10) & 0xFF

        # Verification trigger
        if key == ord('v'):
            # save the input image into the 'application_data\input_image' folder
            cv2.imwrite(os.path.join('application_data', 'input_image', 'input_image.jpg'), frame)

            # verification function
            results, verified = verify(model, 0.7, 0.7)
            if verified:
                messagebox.showinfo('Message', 'verification SUCCESS!')
                # print('verification SUCCESS!')
            else:
                messagebox.showinfo('Message', 'verification FAILED')
                # print('verification FAILED')
        elif key == ord('q'):
            break
finally:
    # Release the camera and close the window even if verification raised
    cap.release()
    cv2.destroyAllWindows()

In [None]:
# Manual cleanup cell: run this if the loop above was interrupted and left the camera or window open
cap.release()
cv2.destroyAllWindows()