### 1. Setup

#### 1.1 Install Dependencies!

In [3]:
!pip install tensorflow opencv-python matplotlib



#### 1.2 Import dependencies

In [1]:
# import dependencies
import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt

In [2]:
#import tensorflow dependencies - Functional API

from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf

#### 1.3 Create Folder Structures

In [3]:
# setup paths
POS_PATH = os.path.join('data', 'positive') #data to be verified
NEG_PATH = os.path.join('data', 'negative') #data to be verified
ANC_PATH = os.path.join('data', 'anchor') #input data

In [10]:
os.makedirs(POS_PATH)
os.makedirs(NEG_PATH)
os.makedirs(ANC_PATH)

### 2. Collect Positives and Anchors

#### 2.1 Untar Labelled Faces in the Wild Dataset 

In [13]:
# Move LFW images to the following repository data/negative

for directory in os.listdir('lfw'):
    directory_path = os.path.join('lfw', directory)
    if os.path.isdir(directory_path):
        for file in os.listdir(directory_path):
            EX_PATH = os.path.join('lfw', directory, file)
            NEW_PATH = os.path.join(NEG_PATH, file)
            os.replace(EX_PATH,NEW_PATH)

#### 2.2 Collect Positive and Anchor Classes

In [4]:
# import uuid library to generate unique image names
import uuid

In [5]:
cap = cv2.VideoCapture(1)
while cap.isOpened():
    ret, frame = cap.read()
    
    #  image resizing
    frame = frame[550: 550+250, 900: 900+250, :]

    # collect image for anchors
    if cv2.waitKey(1) & 0XFF == ord('a'):
        imgname = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
        cv2.waitKey(1)
        cv2.imwrite(imgname, frame)
        cv2.waitKey(1)

    # collect image for positives
    if cv2.waitKey(1) & 0XFF == ord('p'):
        imgname = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
        cv2.waitKey(1)
        cv2.imwrite(imgname, frame)
        cv2.waitKey(1)
    
    #show image back to screen
    cv2.imshow('Image Collection', frame)

    if cv2.waitKey(1) & 0XFF == ord('q'):
        break
# Release the webcam
cap.release()
# Close the image show frame
cv2.destroyAllWindows()
# need to have waitkey again after destroying all windows
cv2.waitKey(1)



-1

### 3. Load and Preprocess images

#### 3.1 Get Image Directories

In [5]:
# preparing for pre-processing pipeline

# change / to \ if using windows
anchor = tf.data.Dataset.list_files(ANC_PATH+'/*.jpg').take(300)
positive = tf.data.Dataset.list_files(POS_PATH+'/*.jpg').take(300)
negative = tf.data.Dataset.list_files(NEG_PATH+'/*.jpg').take(300)

#### 3.2 Preprocessing - Scale and Resize

In [6]:
def preprocess(file_path):
    # read in image from filepath
    byte_img = tf.io.read_file(file_path)
    # load in the image
    img = tf.io.decode_jpeg(byte_img)
    # preprocessing: resize acc to Siamese network paper
    img = tf.image.resize(img, (100,100))
    # Scale iage to be between 0 and 1
    img = img / 255.0
    return img

#### 3.3 Create Lablelled Dataset

In [7]:
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)

#### 3.4 Build Train and Test Partition

In [8]:
def preprocess_twin(input_img, validation_img, label):
    return (preprocess(input_img), preprocess(validation_img), label)

In [10]:
# Build dataloader pipelin
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=1024)

In [11]:
# Training partition

train_data = data.take(round(len(data)*0.7))
train_data = train_data.batch(16) # batches of 16
train_data = train_data.prefetch(8) #starts preprocessing the next set of data

In [12]:
# for understanding purposes

train_samples = train_data.as_numpy_iterator()

In [13]:
train_sample = train_samples.next()

In [14]:
len(train_sample[0]) #number of images in one batch

16

In [15]:
# Testing partition

test_data = data.skip(round(len(data)*0.7))
test_data = test_data.take(round(len(data)*0.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

### 4. Model Engineering

- Build an embedding layer
- Create an L1 Distance layer
- Compile the Siamese Network

#### 4.1 Build Embedding Layer

In [16]:
def make_embedding():
    inp = Input(shape=(100,100,3), name='input_image')

    # First block
    c1 = Conv2D(64, (10,10), activation='relu')(inp)
    m1 = MaxPooling2D(64,(2,2), padding='same')(c1)

    # Second block
    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D(64,(2,2), padding='same')(c2)

    # Third block
    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D(64,(2,2), padding='same')(c3)

    # Final block
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)
    
    
    return Model(inputs=[inp], outputs=[d1], name='embedding')

In [17]:
embedding = make_embedding()

In [18]:
embedding.summary()

Model: "embedding"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_image (InputLayer)    [(None, 100, 100, 3)]     0         
                                                                 
 conv2d (Conv2D)             (None, 91, 91, 64)        19264     
                                                                 
 max_pooling2d (MaxPooling2  (None, 46, 46, 64)        0         
 D)                                                              
                                                                 
 conv2d_1 (Conv2D)           (None, 40, 40, 128)       401536    
                                                                 
 max_pooling2d_1 (MaxPoolin  (None, 20, 20, 128)       0         
 g2D)                                                            
                                                                 
 conv2d_2 (Conv2D)           (None, 17, 17, 128)       26

#### 4.2 Build Distance Layer

In [19]:
# Siamese L1 Distance Class

class L1Dist(Layer):

    # Init Method - Inheritance
    def __init__(self, **kwargs):
        super().__init__()

    # Similarity Calculation
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)
        

In [20]:
l1 = L1Dist()

In [21]:
l1

<__main__.L1Dist at 0x16bad0d90>

#### 4.3 Make Siamese Model

In [22]:
def make_siamese_model():

    # Anchor image
    input_image = Input(name='input_img', shape=(100,100,3))

    # Validation image
    validation_image = Input(name='validation_img', shape=(100,100,3))

    # Combine siamese distance components
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(embedding(input_image), embedding(validation_image))

    # Classification Layer
    classifier = Dense(1, activation='sigmoid')(distances)

    return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

In [23]:
siamese_model = make_siamese_model()

In [24]:
siamese_model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_img (InputLayer)      [(None, 100, 100, 3)]        0         []                            
                                                                                                  
 validation_img (InputLayer  [(None, 100, 100, 3)]        0         []                            
 )                                                                                                
                                                                                                  
 embedding (Functional)      (None, 4096)                 3896044   ['input_img[0][0]',           
                                                          8          'validation_img[0][0]']      
                                                                                     

### 5. Training

#### 5.1 Setup Loss and Optimizer

In [25]:
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [26]:
# At this time, the v2.11+ optimizer `tf.keras.optimizers.Adam` runs slowly on M1/M2 Macs, 
# please use the legacy Keras optimizer instead, located at `tf.keras.optimizers.legacy.Adam`
opt = tf.keras.optimizers.legacy.Adam(1e-4)

#### 5.2 Establish Checkpoints

In [29]:
checkpoint_dir = './training_checkpoints' 
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

#### 5.3 Build Train Step Function

Basic Flow for training on one batch is as follows:
1. Make a prediction
2. Calculate loss
3. Derive gradients
4. Calculate new weights and apply

In [27]:
@tf.function # wrapping the function inside the tf.function decorator
def train_step(batch):
    
    with tf.GradientTape() as tape: #helps to capture gradients from our NN
        # Get Anchor and positive/negative image
        X = batch[:2]
        # Get label
        y = batch[2]

        # Forward pass
        yhat = siamese_model(X, training=True)
        # Calculate binary loss
        loss = binary_cross_loss(y, yhat)
    print(loss)

    # Calculate gradients
    grad = tape.gradient(loss, siamese_model.trainable_variables)

    # Calculate updated weights and apply to siamese model
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))
    return loss
        

#### 5.4 Build Training Loop

While the train_step function was focused on training for one batch, the loop here will be used to iterate over every batch in the dataset. 

In [60]:
def train(data, EPOCHS):
    # Loop through epochs
    for epoch in range(1, EPOCHS + 1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(train_data))

        # Loop through each batch
        for idx, batch in enumerate(train_data):
            # Run train step here
            train_step(batch)
            progbar.update(idx + 1)

        # save checkpoints
        if epoch % 10 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)
    

#### 5.5 Train the model

In [61]:
EPOCHS = 50

In [62]:
train(train_data, EPOCHS)


 Epoch 1/50
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)

 Epoch 2/50

 Epoch 3/50

 Epoch 4/50

 Epoch 5/50

 Epoch 6/50

 Epoch 7/50

 Epoch 8/50

 Epoch 9/50

 Epoch 10/50

 Epoch 11/50

 Epoch 12/50

 Epoch 13/50

 Epoch 14/50

 Epoch 15/50

 Epoch 16/50

 Epoch 17/50

 Epoch 18/50

 Epoch 19/50

 Epoch 20/50

 Epoch 21/50

 Epoch 22/50

 Epoch 23/50

 Epoch 24/50

 Epoch 25/50

 Epoch 26/50

 Epoch 27/50

 Epoch 28/50

 Epoch 29/50

 Epoch 30/50

 Epoch 31/50

 Epoch 32/50

 Epoch 33/50

 Epoch 34/50

 Epoch 35/50

 Epoch 36/50

 Epoch 37/50

 Epoch 38/50

 Epoch 39/50

 Epoch 40/50

 Epoch 41/50

 Epoch 42/50

 Epoch 43/50

 Epoch 44/50

 Epoch 45/50

 Epoch 46/50

 Epoch 47/50

 Epoch 48/50

 Epoch 49/50

 Epoch 50/50


### 6. Evaluate model

In [63]:
# import metric calculations
from tensorflow.keras.metrics import Precision, Recall

In [64]:
recall_metric = Recall()
precision_metric = Precision()

In [65]:
test_batches = list(test_data)

# Iterate over the entire test dataset
for test_input, test_val, y_true in test_batches:
    y_hat = siamese_model.predict([test_input, test_val])
    y_hat = [1 if prediction > 0.5 else 0 for prediction in y_hat]
    
    # Update the metric with the true labels and predicted labels
    recall_metric.update_state(y_true, y_hat)
    precision_metric.update_state(y_true, y_hat)




In [67]:
# Get the final recall result
recall_result = recall_metric.result().numpy()
recall_result

1.0

In [68]:
# Get the final precision result
precision_result = precision_metric.result().numpy()
precision_result

1.0

### 7. Save Model

In [71]:
# Save weights
# h5 legacy file format, using .keras as high level
siamese_model.save('siamesemodel.keras')

In [32]:
model = tf.keras.models.load_model('siamesemodel.h5', custom_objects={'L1Dist': L1Dist})



In [33]:
model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_img (InputLayer)      [(None, 100, 100, 3)]        0         []                            
                                                                                                  
 validation_img (InputLayer  [(None, 100, 100, 3)]        0         []                            
 )                                                                                                
                                                                                                  
 embedding (Functional)      (None, 4096)                 3896044   ['input_img[0][0]',           
                                                          8          'validation_img[0][0]']      
                                                                                     

### 8. Real Time Test

#### 8.1 Verification Function

In [34]:
 def verify(model, detection_threshold, verification_threshold):
     results = []
     for image in os.listdir(os.path.join('application_data', 'verification_images')):
         input_img = preprocess(os.path.join('application_data', 'input_image', 'input_image.jpg'))
         validation_img = preprocess(os.path.join('application_data', 'verification_images', image))
         
         result = model.predict(list(np.expand_dims([input_img, validation_img], axis = 1)), verbose=0)
         results.append(result)

     # Detection Threshold: Metric above which a prediction is considered positive
     detection = np.sum(np.array(results) > detection_threshold)
     
     # Verification Threshold: Proportion of positive predictions / total positive samples
     verification = detection / len(os.listdir(os.path.join('application_data', 'verification_images')))
     verified = verification > verification_threshold

     return results, verified

#### 8.2 OpenCV Real Time Verification

In [35]:
cap = cv2.VideoCapture(1)
while cap.isOpened():
    ret, frame = cap.read()

    #  image resizing
    frame = frame[550: 550+250, 900: 900+250, :]

    cv2.imshow('Verification', frame)

    # Verification trigger
    if cv2.waitKey(10) &  0XFF == ord('v'):
        # save input image to application_data/input_image folder
        cv2.imwrite(os.path.join('application_data', 'input_image', 'input_image.jpg'), frame)
        cv2.waitKey(1)
        results, verified = verify( model, 0.5,0.5)
        print(verified)

    if cv2.waitKey(10) &  0XFF == ord('q'):
        break

# Release the webcam
cap.release()
# Close the image show frame
cv2.destroyAllWindows()
# need to have waitkey again after destroying all windows
cv2.waitKey(1)



InvalidArgumentError: {{function_node __wrapped__DecodeJpeg_device_/job:localhost/replica:0/task:0/device:CPU:0}} Unknown image file format. One of JPEG, PNG, GIF, BMP required. [Op:DecodeJpeg]

In [36]:
# Release the webcam
cap.release()
# Close the image show frame
cv2.destroyAllWindows()
# need to have waitkey again after destroying all windows
cv2.waitKey(1)

-1

In [75]:
np.sum(np.squeeze(results) > 0.5)

36

In [50]:
43/50

0.86