## **Importing Dependencies**

In [51]:
# Standrad dependencies
import cv2
import os
import random
import uuid
import numpy as np
import matplotlib.pyplot as plt

In [52]:
# Tensorflow
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, Dense, MaxPooling2D, Flatten, Layer
import tensorflow as tf

## **Creating Folder Structures**

In [53]:
pos_path = os.path.join('data', 'positive')
neg_path = os.path.join('data', 'negative')
anc_path = os.path.join('data', 'anchor')

In [None]:
os.makedirs(pos_path)
os.makedirs(neg_path)
os.makedirs(anc_path)

## **Collecting Data**

### **Negative class**

In [None]:
# Moving the lfw images into negative directory
for directory in os.listdir('lfw'):
    for file in os.listdir(os.path.join('lfw', directory)):
        source_path = os.path.join('lfw', directory, file)
        dest_path = os.path.join(neg_path, file)
        os.replace(source_path, dest_path)

### **Positive and Anchor class**

In [None]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    success, frame = cap.read()

    # Cropping frames
    frame = frame[130:130+250, 150:150+250, :]

    # Collecting anchors
    if cv2.waitKey(1) & 0XFF == ord('a'):
        img_name = os.path.join(anc_path, f'{uuid.uuid1()}.jpg')
        cv2.imwrite(img_name, frame)

    # Collecting positives
    if cv2.waitKey(1) & 0XFF == ord('p'):
        img_name = os.path.join(pos_path, f'{uuid.uuid1()}.jpg')
        cv2.imwrite(img_name, frame)

    cv2.imshow('Image Collection', frame)
    
    if cv2.waitKey(1) & 0XFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

## **Loading & Preprocessing**

### **Loading Image Directories**

In [54]:
anchor = tf.data.Dataset.list_files(anc_path + '\\*.jpg').take(300)
positive = tf.data.Dataset.list_files(pos_path + '\\*.jpg').take(300)
negative = tf.data.Dataset.list_files(neg_path + '\\*.jpg').take(300)

### **Preprocessing**

In [55]:
def preprocess(file_path):
    # Read in image from file path
    byte_img = tf.io.read_file(file_path)

    # Load in the image
    img = tf.io.decode_jpeg(byte_img) 
    
    # Resizing
    img = tf.image.resize(img, (105, 105))
    
    # Scaling
    img = img / 255.0
    
    return img

In [56]:
def preprocess_dataset(input_img, validation_img, label):
    return (preprocess(input_img), preprocess(validation_img), label)

### **Creating Labelled Dataset**

In [57]:
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)

### **Train-Test Split**

In [58]:
# Building dataloader pipeline
data = data.map(preprocess_dataset)
data = data.cache()
data = data.shuffle(buffer_size = 1024) # Shuffling the dataset

In [59]:
# Training partition
train_data = data.take(round(len(data) * 0.7)) # 70% as training
train_data = train_data.batch(16) # Batch size
train_data = train_data.prefetch(8) # Prefetches the next 8 when processing the previous batch

In [60]:
# Testing partition
test_data = data.skip(round(len(data) * 0.7))
test_data = test_data.take(round(len(data) * 0.3))
test_data = test_data.batch(16) # Batch size
test_data = test_data.prefetch(8) # Prefetches the next 8 when processing the previous batch

## **Building Model**

### **Embedding Model**

In [61]:
def make_embedding(): 
    input = Input(shape = (105, 105, 3), name = 'input_image')
    
    # First block
    conv1 = Conv2D(64, (10, 10), activation = 'relu')(input)
    maxp1 = MaxPooling2D(64, (2, 2), padding = 'same')(conv1)
    
    # Second block
    conv2 = Conv2D(128, (7, 7), activation = 'relu')(maxp1)
    maxp2 = MaxPooling2D(64, (2, 2), padding = 'same')(conv2)
    
    # Third block 
    conv3 = Conv2D(128, (4, 4), activation = 'relu')(maxp2)
    maxp3 = MaxPooling2D(64, (2, 2), padding = 'same')(conv3)
    
    # Final embedding block
    conv4 = Conv2D(256, (4, 4), activation = 'relu')(maxp3)
    flatten = Flatten()(conv4)
    embedding = Dense(4096, activation = 'sigmoid')(flatten)
    
    return Model(inputs = [input], outputs = embedding, name = 'embedding')

In [62]:
make_embedding().summary()

### **Distance Layers**

In [63]:
class Dist(Layer):
    def __init__(self, **kwargs):
        super().__init__()
       
    # Similarity Calculation
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)

### **Siamese Model**

In [64]:
def make_siamese_model(): 
    # Anchor & Validation image input in the network
    input_image = Input(name = 'input_img', shape = (105, 105, 3)) 
    validation_image = Input(name = 'validation_img', shape = (105, 105, 3))

    # Embedding Layers
    embedding = make_embedding()
    input_embeddings = embedding(input_image)
    validation_embeddings = embedding(validation_image)
    
    # Distance Layer
    distance_layer = Dist()
    distance_layer.name = 'distance'
    distances = distance_layer(input_embeddings, validation_embeddings)
    
    # Classification Layer
    classifier = Dense(1, activation = 'sigmoid')(distances)
    
    return Model(inputs = [input_image, validation_image], outputs = classifier, name = 'SiameseNetwork')

In [65]:
siamese_model = make_siamese_model()

In [66]:
siamese_model.summary()

## **Model Training**

### **Setting Loss function and Optimizer**

In [67]:
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [68]:
opt = tf.keras.optimizers.Adam(1e-4) # 0.0001

### **Setting up checkpoints**

In [69]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt = [opt], siamese_model = siamese_model)

### **Building up the train step function**

In [70]:
@tf.function
def train_step(batch):
    
    # Record all of our operations 
    with tf.GradientTape() as tape:
        # Get anchor and positive/negative image
        X = batch[:2]
        # Get label
        y = batch[2]
        
        # Forward pass
        yhat = siamese_model(X, training = True)
        # Calculate loss
        loss = binary_cross_loss(y, yhat)
    print(loss)
        
    # Calculate gradients
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    
    # Calculate updated weights and apply to siamese model
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))
    
    # Return loss
    return loss

### **Training Function**

In [71]:
def train(data, EPOCHS):
    # Loop through epochs
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))
        
        # Loop through each batch
        for idx, batch in enumerate(data):
            # Run train step here
            train_step(batch)
            progbar.update(idx + 1)
        
        # Save checkpoints
        if epoch % 10 == 0: 
            checkpoint.save(file_prefix = checkpoint_prefix)

### **Model Training**

In [72]:
EPOCHS = 50

In [73]:
train(train_data, EPOCHS)


 Epoch 1/50
Tensor("binary_crossentropy/truediv:0", shape=(), dtype=float32)
Tensor("binary_crossentropy/truediv:0", shape=(), dtype=float32)
[1m26/27[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m1s[0m 2s/stepTensor("binary_crossentropy/truediv:0", shape=(), dtype=float32)
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m49s[0m 2s/step

 Epoch 2/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m44s[0m 2s/step

 Epoch 3/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m44s[0m 2s/step

 Epoch 4/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m45s[0m 2s/step

 Epoch 5/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m50s[0m 2s/step

 Epoch 6/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m51s[0m 2s/step

 Epoch 7/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m54s[0m 2s/step

 Epoch 8/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m53s[0m 2s/step

 Epoch 9/50
[1m27/27[0m [32m━━━━━━━━━━━━━━

## **Model Evaluation**

In [74]:
from tensorflow.keras.metrics import Precision, Recall

In [75]:
r = Recall()
p = Precision()

for test_input, test_val, y_true in test_data.as_numpy_iterator():
    y_pred = siamese_model.predict([test_input, test_val], verbose = 0)
    r.update_state(y_true, y_pred)
    p.update_state(y_true, y_pred)

In [76]:
print(f"Precision = {p.result().numpy()}, Recall = {r.result().numpy()}")

Precision = 1.0, Recall = 1.0


## **Saving the Model**

In [79]:
siamese_model.save('siamesemodel.keras')

## **Loading the Model**

In [81]:
siamese_model = tf.keras.models.load_model('siamesemodel.keras',
                                   custom_objects = {'Dist': Dist, 'BinaryCrossentropy': tf.losses.BinaryCrossentropy})

In [82]:
siamese_model.summary()

## **Real-Time Test**

### **Verfication Function**

In [83]:
def verify(model, detection_threshold, verification_threshold):
    # Build results array
    results = []
    for image in os.listdir(os.path.join('application_data', 'verification_images')):
        input_img = preprocess(os.path.join('application_data', 'input_image', 'input_image.jpg'))
        validation_img = preprocess(os.path.join('application_data', 'verification_images', image))
        
        # Make Predictions 
        result = model.predict(list(np.expand_dims([input_img, validation_img], axis=1)), verbose = 0)
        results.append(result)
    
    # Detection Threshold: Metric above which a prediciton is considered positive 
    detection = np.sum(np.array(results) > detection_threshold)
    
    # Verification Threshold: Proportion of positive predictions / total positive samples 
    verification = detection / len(os.listdir(os.path.join('application_data', 'verification_images'))) 
    verified = verification > verification_threshold
    
    return results, verified

### **Collecting Verification Images**

In [84]:
ver_path = os.path.join('application_data', 'verification_images')

In [15]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    success, frame = cap.read()

    # Cropping frames
    frame = frame[130:130+250, 150:150+250, :]

    # Collecting verification images
    if cv2.waitKey(1) & 0XFF == ord('p'):
        img_name = os.path.join(ver_path, f'{uuid.uuid1()}.jpg')
        cv2.imwrite(img_name, frame)

    cv2.imshow('Image Collection', frame)
    
    if cv2.waitKey(1) & 0XFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

### **Real Time Verification**

In [89]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()

    frame = frame[130:130+250, 150:150+250, :]
    cv2.imshow('Verification', frame)
    
    # Verification trigger
    if cv2.waitKey(10) & 0xFF == ord('v'):
        cv2.imwrite(os.path.join('application_data', 'input_image', 'input_image.jpg'), frame)
        # Run verification
        results, verified = verify(siamese_model, 0.5, 0.7)
        print(verified)
    
    if cv2.waitKey(10) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

True
True
True


In [90]:
np.sum(np.squeeze(results) > 0.5)

49