### **Importing Necessary Libraries**

In [1]:
import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf
import uuid

### **Creating Folder Structure**

In [None]:
# Setup Paths: one sub-folder of 'Data' per image class
pos_path, neg_path, anc_path = (
    os.path.join('Data', class_dir)
    for class_dir in ('Positive', 'Negative', 'Anchor')
)

In [None]:
# Making the Directories
# exist_ok=True keeps this cell re-runnable: without it, os.makedirs raises
# FileExistsError as soon as the folders exist from a previous run.
for class_dir in (pos_path, neg_path, anc_path):
    os.makedirs(class_dir, exist_ok=True)

### **Data Collection**

- #### **Negative Class**

In [None]:
# Moving lfw images to negative directory
for directory in os.listdir('lfw'):
    person_dir = os.path.join('lfw', directory)

    # Only descend into sub-folders: calling os.listdir on a plain file that
    # happens to sit inside 'lfw' would raise NotADirectoryError.
    if not os.path.isdir(person_dir):
        continue

    for file in os.listdir(person_dir):
        source_path = os.path.join(person_dir, file)
        new_path = os.path.join(neg_path, file)
        # os.replace moves the file (overwriting any same-named destination)
        os.replace(source_path, new_path)

- #### **Positive and Anchor Class**

In [None]:
# Webcam collection loop: press 'a' to save an anchor image, 'p' to save a
# positive image, 'q' to quit.
cap = cv2.VideoCapture(0)
try:
    while cap.isOpened():
        success, frame = cap.read()

        # A failed grab leaves `frame` unusable; stop instead of slicing it.
        if not success:
            break

        # Cropping frames to a 250x250 window
        frame = frame[90 : 90 + 250, 240 : 240 + 250, :]

        cv2.imshow('Image Collection', frame)

        # Poll the keyboard exactly once per frame. Calling waitKey separately
        # for each key means a press can be consumed by a call that is testing
        # for a different key, so the press is silently dropped.
        key = cv2.waitKey(1) & 0xFF

        if key == ord('a'):
            # Collecting anchor class
            img_name = os.path.join(anc_path, f'{uuid.uuid1()}.jpg')
            cv2.imwrite(img_name, frame)
        elif key == ord('p'):
            # Collecting positive class
            img_name = os.path.join(pos_path, f'{uuid.uuid1()}.jpg')
            cv2.imwrite(img_name, frame)
        elif key == ord('q'):
            break
finally:
    # Always free the camera and close the preview window, even on error.
    cap.release()
    cv2.destroyAllWindows()

### **Loading and Preprocessing**

- #### **Loading Image Directories**

In [None]:
# Build file-path datasets for each class, capped at 300 files per class.
# os.path.join supplies the platform's separator; the previous '\*.jpg' literal
# hard-coded a Windows backslash and relied on an invalid escape sequence.
anchor = tf.data.Dataset.list_files(os.path.join(anc_path, '*.jpg')).take(300)
positive = tf.data.Dataset.list_files(os.path.join(pos_path, '*.jpg')).take(300)
negative = tf.data.Dataset.list_files(os.path.join(neg_path, '*.jpg')).take(300)

- #### **Preprocessing**

In [2]:
def preprocess(file_path):
    """Load a JPEG from disk and turn it into a model-ready image tensor.

    Args:
        file_path: Path (string or string tensor) of a JPEG file.

    Returns:
        A float tensor of shape (105, 105, 3) with pixel values divided by 255.
    """
    # Read in image from file path
    byte_img = tf.io.read_file(file_path)

    # Decode as 3-channel RGB. Forcing channels=3 keeps the channel dimension
    # fixed even for grayscale JPEGs, matching the (105, 105, 3) model input.
    img = tf.io.decode_jpeg(byte_img, channels=3)

    # Resizing the image to the spatial size the network expects
    img = tf.image.resize(img, (105, 105))

    # Scale the image
    img = img / 255.0

    return img

In [4]:
def preprocess_dataset(input_img, validation_img, label):
    """Preprocess both image paths of one labelled pair.

    Args:
        input_img: Path of the first image of the pair.
        validation_img: Path of the second image of the pair.
        label: Label of the pair, passed through untouched.

    Returns:
        Tuple ``(input image tensor, validation image tensor, label)``.
    """
    input_tensor = preprocess(input_img)
    validation_tensor = preprocess(validation_img)
    return input_tensor, validation_tensor, label

- #### **Creating Labeled Dataset**

In [None]:
# Label datasets: one label per anchor file (1.0 = matching pair, 0.0 = non-matching pair)
match_labels = tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))
mismatch_labels = tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))

# (anchor, positive, 1) triples followed by (anchor, negative, 0) triples
positives = tf.data.Dataset.zip((anchor, positive, match_labels))
negatives = tf.data.Dataset.zip((anchor, negative, mismatch_labels))
data = positives.concatenate(negatives)

- #### **Train Test Split**

In [None]:
# Building dataloader pipeline
data = data.map(preprocess_dataset)
data = data.cache()
# reshuffle_each_iteration=False fixes the shuffled order once. By default the
# dataset is reshuffled every time it is iterated, so the take()/skip() split
# below would select different elements on each pass and test samples would
# leak into the training partition.
data = data.shuffle(buffer_size = 1024, reshuffle_each_iteration = False)

In [None]:
# Training partition: first 70% of the elements, in batches of 16, prefetching 8 batches
train_size = round(len(data) * .7)
train_data = data.take(train_size).batch(16).prefetch(8)

In [None]:
# Testing partition: skip the first 70%, keep the next 30%, same batching as training
test_data = (
    data
    .skip(round(len(data) * .7))
    .take(round(len(data) * .3))
    .batch(16)
    .prefetch(8)
)

### **Model Building**

- #### **Building Embedding Layer**

In [None]:
def make_embedding():
    """Build the convolutional network that maps an image to an embedding vector.

    Four convolution stages (the first three each followed by 2x2 max pooling
    with stride 2) feed a flattened 4096-unit sigmoid dense layer.

    Returns:
        A Keras ``Model`` named 'embedding' taking a (105, 105, 3) image and
        producing a 4096-dimensional vector.
    """
    # Named input_image rather than `input` to avoid shadowing the builtin.
    input_image = Input(shape = (105, 105, 3), name = 'input_image')

    # First block
    # pool_size is the first positional argument of MaxPooling2D, so the old
    # MaxPooling2D(64, (2, 2)) pooled over a 64x64 window with stride 2.
    # Spelling the arguments out gives the intended 2x2 window; the output
    # shapes are unchanged because the stride and padding are the same.
    conv1 = Conv2D(64, (10, 10), activation = 'relu')(input_image)
    maxpool1 = MaxPooling2D(pool_size = (2, 2), strides = (2, 2), padding = 'same')(conv1)

    # Second block
    conv2 = Conv2D(128, (7, 7), activation = 'relu')(maxpool1)
    maxpool2 = MaxPooling2D(pool_size = (2, 2), strides = (2, 2), padding = 'same')(conv2)

    # Third block
    conv3 = Conv2D(128, (4, 4), activation = 'relu')(maxpool2)
    maxpool3 = MaxPooling2D(pool_size = (2, 2), strides = (2, 2), padding = 'same')(conv3)

    # Final embedding block
    conv4 = Conv2D(256, (4, 4), activation = 'relu')(maxpool3)
    flatten = Flatten()(conv4)
    embedding = Dense(4096, activation = 'sigmoid')(flatten)

    return Model(inputs = [input_image], outputs = embedding, name = 'embedding')

In [None]:
# Build a throwaway embedding network just to print its layer-by-layer summary.
make_embedding().summary()

- #### **Distance Layer**

In [3]:
class Dist(Layer):
    """Layer returning the element-wise absolute difference of two embeddings."""

    def __init__(self, **kwargs):
        # Forward kwargs (e.g. name, dtype, trainable) to the base Layer.
        # Dropping them discards the layer name on construction and the saved
        # configuration when the model is reloaded with custom_objects.
        super().__init__(**kwargs)

    # Similarity calculation
    def call(self, input_embedding, validation_embedding):
        """Return ``|input_embedding - validation_embedding|`` element-wise."""
        return tf.math.abs(input_embedding - validation_embedding)

- #### **Siamese Model**

In [None]:
def make_siamese_model():
    """Assemble the two-input Siamese verification network.

    Both inputs go through one shared embedding network; the element-wise
    absolute difference of the two embeddings is fed to a single sigmoid unit.

    Returns:
        A Keras ``Model`` named 'SiameseNetwork' whose inputs are
        ``[input_img, validation_img]`` and whose output is one sigmoid score.
    """
    # The two images to compare
    anchor_in = Input(name = 'input_img', shape = (105, 105, 3))
    candidate_in = Input(name = 'validation_img', shape = (105, 105, 3))

    # A single embedding network instance is applied to both inputs
    encoder = make_embedding()
    anchor_vec = encoder(anchor_in)
    candidate_vec = encoder(candidate_in)

    # Distance between the two embeddings
    distance_layer = Dist()
    distance_layer._name = 'distance'
    embedding_gap = distance_layer(anchor_vec, candidate_vec)

    # Single-unit sigmoid classifier on top of the distance vector
    match_score = Dense(1, activation = 'sigmoid')(embedding_gap)

    return Model(
        inputs = [anchor_in, candidate_in],
        outputs = match_score,
        name = 'SiameseNetwork',
    )

In [None]:
# Instantiate the Siamese network; later cells refer to it by this global name.
siamese_model = make_siamese_model()

In [None]:
# Print the layer-by-layer summary of the assembled Siamese network.
siamese_model.summary()

### **Model Training**

- #### **Setting Loss Function and Optimizer**

In [None]:
# Binary cross-entropy loss object; train_step calls it as binary_cross_loss(y, yhat).
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [None]:
# Adam optimizer constructed with 1e-4 as its first argument (the learning rate).
opt = tf.keras.optimizers.Adam(1e-4)

- #### **Setting Up Checkpoints**

In [None]:
# Folder for checkpoint files and the filename prefix passed to checkpoint.save().
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
# One checkpoint object tracking both the optimizer and the model.
# NOTE(review): the optimizer is wrapped in a list here — confirm this is intentional.
checkpoint = tf.train.Checkpoint(opt = [opt], siamese_model = siamese_model)

- #### **Building Train Step Function**

In [None]:
@tf.function
def train_step(batch):
    """Run one optimisation step on a single batch.

    Args:
        batch: Sequence whose first two items are the two image batches and
            whose third item is the label batch.

    Returns:
        The binary cross-entropy loss for this batch.
    """
    # Split the batch into the model inputs and the labels
    X = batch[:2]
    y = batch[2]

    # Record the forward pass so gradients can be taken afterwards
    with tf.GradientTape() as tape:
        yhat = siamese_model(X, training = True)
        loss = binary_cross_loss(y, yhat)

    # The previous print(loss) was removed: inside a tf.function a Python print
    # runs only while the function is traced and shows a symbolic tensor, not
    # per-step loss values. The loss is returned for the caller to report.
    grad = tape.gradient(loss, siamese_model.trainable_variables)

    # Apply one optimizer update to every trainable variable
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))

    return loss

- #### **Building Training Loop**

In [None]:
def train(data, EPOCHS):
    """Train the Siamese model for a number of epochs.

    Args:
        data: Batched dataset; each element is passed to ``train_step``.
        EPOCHS: Number of passes over ``data``.

    A checkpoint is written every 10th epoch and after the final epoch.
    """
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))

        for idx, batch in enumerate(data):
            # Report the loss returned by train_step on the progress bar
            # instead of discarding it.
            loss = train_step(batch)
            progbar.update(idx + 1, values = [('loss', float(loss))])

        # Also save after the last epoch: with fewer than 10 epochs the
        # every-10th-epoch rule alone would never write a checkpoint.
        if epoch % 10 == 0 or epoch == EPOCHS:
            checkpoint.save(file_prefix=checkpoint_prefix)


- #### **Running the Training**

In [None]:
# Number of training epochs passed to train().
EPOCHS = 5

In [None]:
# Train on the training partition for EPOCHS epochs.
train(train_data, EPOCHS)

### **Model Evaluation**

In [None]:
from tensorflow.keras.metrics import Precision, Recall

In [None]:
# Pull one batch from the test partition as NumPy arrays: two image batches and the labels.
test_input, test_val, y_true = test_data.as_numpy_iterator().next()

In [None]:
# Score every pair in the batch; the bare name on the last line displays the scores.
y_hat = siamese_model.predict([test_input, test_val])
y_hat

In [None]:
# Threshold each predicted score at 0.5 to get a hard 0/1 decision.
[1 if prediction > 0.5 else 0 for prediction in y_hat ]

In [None]:
# Display the labels of the same batch for comparison with the thresholded predictions.
y_true

- #### **Calculating Metrics**

In [None]:
# Recall computed from y_true and y_hat (the single batch scored earlier).
m = Recall()
m.update_state(y_true, y_hat)
m.result().numpy()

In [None]:
# Precision computed from y_true and y_hat; note this rebinds `m` from the Recall cell.
m = Precision()
m.update_state(y_true, y_hat)
m.result().numpy()

In [None]:
# Recall and precision accumulated over every batch of the test partition.
r = Recall()
p = Precision()

# NOTE: the loop variables rebind the globals test_input, test_val and y_true,
# so after this cell they hold the last batch iterated.
for test_input, test_val, y_true in test_data.as_numpy_iterator():
    yhat = siamese_model.predict([test_input, test_val])
    # Both metrics are updated with the same labels and scores for this batch
    r.update_state(y_true, yhat)
    p.update_state(y_true, yhat) 

# Prints recall first, then precision
print(r.result().numpy(), p.result().numpy())

- #### **Visualizing the Result**

In [None]:
# Show the first pair of the current batch side by side
fig, (ax_input, ax_validation) = plt.subplots(1, 2, figsize = (10, 8))

ax_input.imshow(test_input[0])
ax_validation.imshow(test_val[0])

plt.show()

### **Saving the Model**

In [None]:
# Save the model to a single .keras file in the working directory.
siamese_model.save('siamesemodel.keras')

In [4]:
# Reload the saved model, rebinding siamese_model. custom_objects maps the names
# 'Dist' and 'BinaryCrossentropy' to their classes for deserialization.
siamese_model = tf.keras.models.load_model('siamesemodel.keras', custom_objects = {'Dist': Dist, 'BinaryCrossentropy': tf.losses.BinaryCrossentropy})




In [None]:
# Score the current test_input/test_val batch with the reloaded model.
siamese_model.predict([test_input, test_val])

In [None]:
# Print the summary of the reloaded model.
siamese_model.summary()

### **Real Time Test**

In [6]:
# List the files in the verification image folder (the same folder verify() iterates over).
os.listdir(os.path.join('application_data', 'verification_images'))

['8bcf4171-eb40-11ee-853b-ebf06f213632.jpg',
 '8bd4071c-eb40-11ee-a53d-ebf06f213632.jpg',
 '8bd88727-eb40-11ee-a199-ebf06f213632.jpg',
 '8be44d5a-eb40-11ee-a788-ebf06f213632.jpg',
 '8be923da-eb40-11ee-98e1-ebf06f213632.jpg',
 '8bedd9e0-eb40-11ee-98dc-ebf06f213632.jpg',
 '8bf27cc0-eb40-11ee-b384-ebf06f213632.jpg',
 '8bf735bf-eb40-11ee-8db0-ebf06f213632.jpg',
 '8bfbf1eb-eb40-11ee-aac2-ebf06f213632.jpg',
 '8c112789-eb40-11ee-b7e2-ebf06f213632.jpg',
 '8c1f6021-eb40-11ee-9fd0-ebf06f213632.jpg',
 '8c2af6ea-eb40-11ee-9d75-ebf06f213632.jpg',
 '8c40197b-eb40-11ee-be26-ebf06f213632.jpg',
 '8c57b860-eb40-11ee-88f7-ebf06f213632.jpg',
 '8c5c9c3c-eb40-11ee-baf5-ebf06f213632.jpg',
 '8c613038-eb40-11ee-acf2-ebf06f213632.jpg',
 '8c65fc5f-eb40-11ee-becc-ebf06f213632.jpg',
 '8c6ad9aa-eb40-11ee-84b3-ebf06f213632.jpg',
 '8c76856c-eb40-11ee-a790-ebf06f213632.jpg',
 '8c7b48b2-eb40-11ee-bc0a-ebf06f213632.jpg',
 '8c800651-eb40-11ee-800a-ebf06f213632.jpg',
 '8c848047-eb40-11ee-a44f-ebf06f213632.jpg',
 '8c896d5c

In [7]:
# Print the full relative path of every verification image
verification_dir = os.path.join('application_data', 'verification_images')
for image in os.listdir(verification_dir):
    validation_img = os.path.join(verification_dir, image)
    print(validation_img)

application_data\verification_images\8bcf4171-eb40-11ee-853b-ebf06f213632.jpg
application_data\verification_images\8bd4071c-eb40-11ee-a53d-ebf06f213632.jpg
application_data\verification_images\8bd88727-eb40-11ee-a199-ebf06f213632.jpg
application_data\verification_images\8be44d5a-eb40-11ee-a788-ebf06f213632.jpg
application_data\verification_images\8be923da-eb40-11ee-98e1-ebf06f213632.jpg
application_data\verification_images\8bedd9e0-eb40-11ee-98dc-ebf06f213632.jpg
application_data\verification_images\8bf27cc0-eb40-11ee-b384-ebf06f213632.jpg
application_data\verification_images\8bf735bf-eb40-11ee-8db0-ebf06f213632.jpg
application_data\verification_images\8bfbf1eb-eb40-11ee-aac2-ebf06f213632.jpg
application_data\verification_images\8c112789-eb40-11ee-b7e2-ebf06f213632.jpg
application_data\verification_images\8c1f6021-eb40-11ee-9fd0-ebf06f213632.jpg
application_data\verification_images\8c2af6ea-eb40-11ee-9d75-ebf06f213632.jpg
application_data\verification_images\8c40197b-eb40-11ee-be26-ebf

In [11]:
def verify(model, detection_threshold, verification_threshold):
    """Compare the captured input image against every stored verification image.

    Args:
        model: Model whose ``predict`` takes ``[input batch, validation batch]``.
        detection_threshold: A score above this counts as a positive match
            for one verification image.
        verification_threshold: Fraction of positive matches that must be
            exceeded for the input to be considered verified.

    Returns:
        Tuple ``(results, verified)``: ``results`` is the list of raw
        per-image predictions and ``verified`` is the boolean decision.
    """
    verification_dir = os.path.join('application_data', 'verification_images')

    # List the folder once so the loop and the denominator use the same files.
    image_names = os.listdir(verification_dir)

    # With no reference images there is nothing to match against; returning
    # early also avoids dividing by zero below.
    if not image_names:
        return [], False

    # The input image is the same for every comparison, so load it once
    # instead of re-reading and re-decoding it on each iteration.
    input_img = preprocess(os.path.join('application_data', 'input_image', 'input_image.jpg'))

    results = []
    for image in image_names:
        validation_img = preprocess(os.path.join(verification_dir, image))

        # expand_dims(..., axis=1) adds a batch dimension of 1 to each image;
        # list() then splits the stacked pair into the model's two inputs.
        result = model.predict(list(np.expand_dims([input_img, validation_img], axis = 1)), verbose = 0)
        results.append(result)

    # Number of verification images whose score clears the detection threshold
    detection = np.sum(np.array(results) > detection_threshold)

    # Fraction of positive matches
    verification = detection / len(image_names)
    verified = verification > verification_threshold

    return results, verified

In [32]:
# Live verification loop: press 'v' to verify the current frame, 'q' to quit.
cap = cv2.VideoCapture(0)

try:
    while cap.isOpened():

        ret, frame = cap.read()

        # A failed grab leaves `frame` unusable; stop instead of slicing it.
        if not ret:
            break

        frame = frame[90 : 90 + 250, 240 : 240 + 250, :]

        cv2.imshow('Verification', frame)

        # Poll the keyboard once per frame so a key press cannot be consumed
        # by a waitKey call that is testing for a different key.
        key = cv2.waitKey(10) & 0xFF

        if key == ord('v'):
            # Save the frame where verify() expects the input image, then run it
            cv2.imwrite(os.path.join('application_data', 'input_image', 'input_image.jpg'), frame)
            results, verified = verify(siamese_model, 0.5, 0.7)
            print(verified)
        elif key == ord('q'):
            break
finally:
    # Always free the camera and close the preview window, even on error.
    cap.release()
    cv2.destroyAllWindows()

False


In [33]:
# Count how many scores from the last verify() call exceed 0.5.
np.sum(np.squeeze(results) > 0.5)

3