## Import Dependencies

In [1]:
import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt


In [2]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf





## 1.4 Create Folder Structures

In [3]:
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')


In [None]:
os.makedirs(POS_PATH)
os.makedirs(NEG_PATH)
os.makedirs(ANC_PATH)


# Collecting Positives and Anchors

In [None]:
# download negative images from following URL.
# http://vis-www.cs.umass.edu/lfw/


In [None]:
# Unzip Tar GZ Labelled Faces in the Wild Dataset. firstly copy that downloaded .tgz file into project directory.
!tar -xf lfw.tgz


In [None]:
# Move LFW Images to the following repository "data/negative"
for directory in os.listdir('lfw'):
    for file in os.listdir(os.path.join('lfw', directory)):
        EX_PATH = os.path.join('lfw', directory, file)
        NEW_PATH = os.path.join(NEG_PATH, file)
        os.replace(EX_PATH, NEW_PATH)


In [4]:
# Import uuid library to generate unique image names
import uuid


In [5]:
os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))


'data\\anchor\\8c519567-1b62-11ef-8ab1-a884ea54a618.jpg'

In [10]:
# take minimum 300 images for both anchor images and positive images.
# Establish a connection to the webcam
cap = cv2.VideoCapture(0)
while cap.isOpened(): 
    ret, frame = cap.read()
   
    # Cut down frame to 250x250px
    frame = frame[120:120+250,200:200+250, :]

    # press 'a' to capture the anchor images.
    if cv2.waitKey(1) & 0XFF == ord('a'):
        
        imgname = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
        
        cv2.imwrite(imgname, frame)
    
    # press 'p' to capture positive images.
    if cv2.waitKey(1) & 0XFF == ord('p'):
        
        imgname = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
        
        cv2.imwrite(imgname, frame)
    
    # Show image back to screen
    cv2.imshow('Image Collection', frame)
    
    # press 'q' to quit the images.
    if cv2.waitKey(1) & 0XFF == ord('q'):
        break
        
# Release the webcam
cap.release()
# Close the image show frame
cv2.destroyAllWindows()


# Data Augmentation

In [12]:
# Image augmentation:
def data_aug(img):
    data = []
    for i in range(9):
        img = tf.image.stateless_random_brightness(img, max_delta=0.02, seed=(1,2))
        img = tf.image.stateless_random_contrast(img, lower=0.6, upper=1, seed=(1,3))
        # img = tf.image.stateless_random_crop(img, size=(20,20,3), seed=(1,2))
        img = tf.image.stateless_random_flip_left_right(img, seed=(np.random.randint(100),np.random.randint(100)))
        img = tf.image.stateless_random_jpeg_quality(img, min_jpeg_quality=90, max_jpeg_quality=100, seed=(np.random.randint(100),np.random.randint(100)))
        img = tf.image.stateless_random_saturation(img, lower=0.9,upper=1, seed=(np.random.randint(100),np.random.randint(100)))
            
        data.append(img)
    return data


In [13]:
import os
import uuid


In [None]:
# implimenting data augmentation on anchor images. total images will be 10X before.
for file_name in os.listdir(os.path.join(ANC_PATH)):
    img_path = os.path.join(ANC_PATH, file_name)
    img = cv2.imread(img_path)
    augmented_images = data_aug(img) 
    
    for image in augmented_images:
        cv2.imwrite(os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1())), image.numpy())


In [17]:
# implimenting data augmentation on positive images. total images will be 10X before.
for file_name in os.listdir(os.path.join(POS_PATH)):
    img_path = os.path.join(POS_PATH, file_name)
    img = cv2.imread(img_path)
    augmented_images = data_aug(img)
    
    for image in augmented_images:
        cv2.imwrite(os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1())), image.numpy())


In [14]:
# taking 3000 images from all the directories.
anchor = tf.data.Dataset.list_files(ANC_PATH+'\*.jpg').take(3000)
positive = tf.data.Dataset.list_files(POS_PATH+'\*.jpg').take(3000)
negative = tf.data.Dataset.list_files(NEG_PATH+'\*.jpg').take(3000)


In [15]:
dir_test = anchor.as_numpy_iterator()


In [16]:
print(dir_test.next())


b'data\\anchor\\99525bfc-1b63-11ef-9ead-a884ea54a618.jpg'


## Preprocessing - Scale and Resize

In [17]:
def preprocess(file_path):
    
    byte_img = tf.io.read_file(file_path)
    # Load in the image
    img = tf.io.decode_jpeg(byte_img)
    
    # resizing the image to be 100x100x3
    img = tf.image.resize(img, (100,100))
    # Scale image to be between 0 and 1
    img = img / 255.0

    return img


In [18]:
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)


In [19]:
samples = data.as_numpy_iterator()


In [29]:
example = samples.next()


In [30]:
example


(b'data\\anchor\\a14eddfd-1b63-11ef-abfb-a884ea54a618.jpg',
 b'data\\positive\\46630afd-1b64-11ef-a6f4-a884ea54a618.jpg',
 1.0)

## Build Train and Test Partition

In [31]:
def preprocess_twin(input_img, validation_img, label):
    return(preprocess(input_img), preprocess(validation_img), label)


In [32]:
res = preprocess_twin(*example)


In [35]:
# Build dataloader pipeline
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=10000)


In [36]:
# Training partition
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)


In [37]:
# Testing partition
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)


## Build Embedding Layer

In [46]:
def make_embedding(): 
    inp = Input(shape=(100,100,3), name='input_image')
    
    # First block
    c1 = Conv2D(64, (10,10), activation='relu')(inp)
    m1 = MaxPooling2D(64, (2,2), padding='same')(c1)
    
    # Second block
    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D(64, (2,2), padding='same')(c2)
    
    # Third block 
    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D(64, (2,2), padding='same')(c3)
    
    # Final embedding block
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)
    
    return Model(inputs=[inp], outputs=[d1], name='embedding')


In [47]:
embedding = make_embedding()


In [48]:
embedding.summary()


Model: "embedding"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_image (InputLayer)    [(None, 100, 100, 3)]     0         
                                                                 
 conv2d_4 (Conv2D)           (None, 91, 91, 64)        19264     
                                                                 
 max_pooling2d_3 (MaxPoolin  (None, 46, 46, 64)        0         
 g2D)                                                            
                                                                 
 conv2d_5 (Conv2D)           (None, 40, 40, 128)       401536    
                                                                 
 max_pooling2d_4 (MaxPoolin  (None, 20, 20, 128)       0         
 g2D)                                                            
                                                                 
 conv2d_6 (Conv2D)           (None, 17, 17, 128)       26

In [49]:
# Siamese L1 Distance class
class L1Dist(Layer):
    
    # Init method - inheritance
    def __init__(self, **kwargs):
        super().__init__()
       
    # similarity calculation
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)


In [50]:
l1 = L1Dist()


## Make Siamese Model

In [59]:
def make_siamese_model(): 
    
    # Anchor image input in the network
    input_image = Input(name='input_img', shape=(100,100,3))
    
    # Validation image in the network 
    validation_image = Input(name='validation_img', shape=(100,100,3))
    
    # Combine siamese distance components
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(embedding(input_image), embedding(validation_image))
    
    # Classification layer 
    classifier = Dense(1, activation='sigmoid')(distances)
    
    return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')


In [60]:
siamese_model = make_siamese_model()


In [61]:
siamese_model.summary()


Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_img (InputLayer)      [(None, 100, 100, 3)]        0         []                            
                                                                                                  
 validation_img (InputLayer  [(None, 100, 100, 3)]        0         []                            
 )                                                                                                
                                                                                                  
 embedding (Functional)      (None, 4096)                 3896044   ['input_img[0][0]',           
                                                          8          'validation_img[0][0]']      
                                                                                     

# Training

In [62]:
binary_cross_loss = tf.losses.BinaryCrossentropy()


In [63]:
opt = tf.keras.optimizers.Adam(1e-4) # 0.0001


In [64]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)


In [65]:
test_batch = train_data.as_numpy_iterator()


In [66]:
batch_1 = test_batch.next()


In [67]:
X = batch_1[:2]


In [68]:
y = batch_1[2]


In [69]:
y


array([1., 1., 1., 0., 1., 1., 1., 0., 1., 0., 1., 0., 0., 1., 0., 1.],
      dtype=float32)

In [70]:
@tf.function
def train_step(batch):
    
    with tf.GradientTape() as tape:     
        # Get anchor and positive/negative image
        X = batch[:2]
        # Get label
        y = batch[2]
        
        # Forward pass
        yhat = siamese_model(X, training=True)
        # Calculate loss
        loss = binary_cross_loss(y, yhat)
    print(loss)
        
    # Calculate gradients
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    
    # Calculate updated weights and apply to siamese model
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))
        
    return loss


In [71]:
# Import metric calculations
from tensorflow.keras.metrics import Precision, Recall


In [72]:
def train(data, EPOCHS):

    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))
        
        r = Recall()
        p = Precision()
        
        for idx, batch in enumerate(data):

            loss = train_step(batch)
            yhat = siamese_model.predict(batch[:2])
            r.update_state(batch[2], yhat)
            p.update_state(batch[2], yhat) 
            progbar.update(idx+1)
        print(loss.numpy(), r.result().numpy(), p.result().numpy())
        
        if epoch % 10 == 0: 
            checkpoint.save(file_prefix=checkpoint_prefix)


In [73]:
EPOCHS = 50


In [74]:
train(train_data, EPOCHS)



 Epoch 1/50
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)
0.5893874 0.120192304 1.0

 Epoch 2/50
0.32918754 0.5 1.0

 Epoch 3/50
0.071150824 0.9348837 0.9852941

 Epoch 4/50
0.053138915 0.9809524 0.9951691

 Epoch 5/50
0.2637659 0.9814815 0.9953052

 Epoch 6/50
6.743139e-05 0.9859155 1.0

 Epoch 7/50
0.09404971 1.0 1.0

 Epoch 8/50
0.008823613 0.9902439 1.0

 Epoch 9/50
5.3408505e-05 1.0 1.0

 Epoch 10/50
0.00711541 1.0 1.0

 Epoch 11/50
0.13081494 1.0 1.0

 Epoch 12/50
0.017894436 1.0 1.0

 Epoch 13/50
0.03530894 1.0 1.0

 Epoch 14/50
0.0038227104 1.0 1.0

 Epoch 15/50
0.12709942 0.9660194 0.995

 Epoch 16/50
0.006644587 0.9859813 1.0

 Epoch 17/50
0.00039618634 1.0 1.0

 Epoch 18/50
1.7583404e-06 1.0 1.0

 Epoch 19/50
0.0013235769 1.0 1.0

 Epoch 20/50
6.6608814e-06 1.0 1.0

 Epoch 21/50
0.0037581096 1.0 1.0

 Epoch 22/50
6.318155e-06 1.0 1.0

 Epoch 23/50
1.6242307e-06 1.0 1.

# Evaluate Model

In [75]:
# Import metric calculations
from tensorflow.keras.metrics import Precision, Recall


In [76]:
# Get a batch of test data
test_input, test_val, y_true = test_data.as_numpy_iterator().next()


In [77]:
y_hat = siamese_model.predict([test_input, test_val])




In [78]:
# Post processing the results 
[1 if prediction > 0.5 else 0 for prediction in y_hat ]


[0, 1, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0]

In [79]:
y_true


array([0., 1., 0., 0., 0., 0., 0., 0., 1., 1., 1., 1., 0., 0., 0., 0.],
      dtype=float32)

## Calculate Metrics

In [80]:
# calculating recall
m = Recall()
m.update_state(y_true, y_hat)
m.result().numpy()


1.0

In [81]:
# calculating precision 
m = Precision()
m.update_state(y_true, y_hat)
m.result().numpy()


1.0

In [82]:
r = Recall()
p = Precision()

for test_input, test_val, y_true in test_data.as_numpy_iterator():
    yhat = siamese_model.predict([test_input, test_val])
    r.update_state(y_true, yhat)
    p.update_state(y_true,yhat) 

print(r.result().numpy(), p.result().numpy())


1.0 1.0


# Save Model

In [84]:
# Save weights
siamese_model.save('siameseModel.h5')




  saving_api.save_model(


In [85]:
L1Dist


__main__.L1Dist

In [86]:
# Reload model
siamese_model = tf.keras.models.load_model('siameseModel.h5', 
                                   custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})




In [87]:
# Make predictions with reloaded model
siamese_model.predict([test_input, test_val])




array([[9.9999893e-01],
       [1.0852679e-06],
       [1.3583326e-10],
       [1.0000000e+00]], dtype=float32)

In [88]:
siamese_model.summary()


Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_img (InputLayer)      [(None, 100, 100, 3)]        0         []                            
                                                                                                  
 validation_img (InputLayer  [(None, 100, 100, 3)]        0         []                            
 )                                                                                                
                                                                                                  
 embedding (Functional)      (None, 4096)                 3896044   ['input_img[0][0]',           
                                                          8          'validation_img[0][0]']      
                                                                                     

# Real Time Test

In [90]:
os.listdir(os.path.join('application_data', 'verification_images'))


['01c61375-1b63-11ef-841a-a884ea54a618 - Copy.jpg',
 '04bd4c3b-1b63-11ef-af00-a884ea54a618 - Copy.jpg',
 '06498cae-1b63-11ef-9ccf-a884ea54a618.jpg',
 '0e3a65dc-1b63-11ef-b00d-a884ea54a618 - Copy.jpg',
 '1ac3be2c-1b63-11ef-8da5-a884ea54a618 - Copy.jpg',
 '1c0c264a-1b63-11ef-8e52-a884ea54a618 - Copy.jpg',
 '1f7148d7-1b63-11ef-a973-a884ea54a618 - Copy.jpg',
 '249925a2-1b63-11ef-852e-a884ea54a618.jpg',
 '2875ff34-1b63-11ef-ab46-a884ea54a618 - Copy.jpg',
 '2b7115ff-1b63-11ef-8de4-a884ea54a618 - Copy.jpg',
 '2bef7594-1b63-11ef-abb2-a884ea54a618 - Copy.jpg',
 '3094526f-1b63-11ef-a40f-a884ea54a618.jpg',
 '3102393d-1b63-11ef-a750-a884ea54a618 - Copy.jpg',
 '3c72d0d2-1b63-11ef-88a5-a884ea54a618 - Copy.jpg',
 '3e958b1a-1b63-11ef-ac1f-a884ea54a618 - Copy.jpg',
 '4457d0fc-1b63-11ef-bb5b-a884ea54a618.jpg',
 '5d7f7a0d-1b63-11ef-8bcf-a884ea54a618 - Copy.jpg',
 '631ef905-1b63-11ef-bcea-a884ea54a618 - Copy.jpg',
 '650b05f7-1b63-11ef-88e8-a884ea54a618 - Copy.jpg',
 '68539d19-1b63-11ef-bc01-a884ea54a618.j

In [91]:
os.path.join('application_data', 'input_image', 'input_image.jpg')


'application_data\\input_image\\input_image.jpg'

In [92]:
for image in os.listdir(os.path.join('application_data', 'verification_images')):
    validation_img = os.path.join('application_data', 'verification_images', image)
    print(validation_img)


application_data\verification_images\01c61375-1b63-11ef-841a-a884ea54a618 - Copy.jpg
application_data\verification_images\04bd4c3b-1b63-11ef-af00-a884ea54a618 - Copy.jpg
application_data\verification_images\06498cae-1b63-11ef-9ccf-a884ea54a618.jpg
application_data\verification_images\0e3a65dc-1b63-11ef-b00d-a884ea54a618 - Copy.jpg
application_data\verification_images\1ac3be2c-1b63-11ef-8da5-a884ea54a618 - Copy.jpg
application_data\verification_images\1c0c264a-1b63-11ef-8e52-a884ea54a618 - Copy.jpg
application_data\verification_images\1f7148d7-1b63-11ef-a973-a884ea54a618 - Copy.jpg
application_data\verification_images\249925a2-1b63-11ef-852e-a884ea54a618.jpg
application_data\verification_images\2875ff34-1b63-11ef-ab46-a884ea54a618 - Copy.jpg
application_data\verification_images\2b7115ff-1b63-11ef-8de4-a884ea54a618 - Copy.jpg
application_data\verification_images\2bef7594-1b63-11ef-abb2-a884ea54a618 - Copy.jpg
application_data\verification_images\3094526f-1b63-11ef-a40f-a884ea54a618.jpg
a

In [93]:
def verify(model, detection_threshold, verification_threshold):
    
    results = []
    for image in os.listdir(os.path.join('application_data', 'verification_images')):
        input_img = preprocess(os.path.join('application_data', 'input_image', 'input_image.jpg'))
        validation_img = preprocess(os.path.join('application_data', 'verification_images', image))
        
        # Make Predictions
        result = model.predict(list(np.expand_dims([input_img, validation_img], axis=1)))
        results.append(result)
    
    # Detection Threshold: Metric above which a prediciton is considered positive 
    detection = np.sum(np.array(results) > detection_threshold)
    
    # Verification Threshold: Proportion of positive predictions / total positive samples 
    verification = detection / len(os.listdir(os.path.join('application_data', 'verification_images'))) 
    verified = verification > verification_threshold
    
    return results, verified


## OpenCV Real Time Verification

In [None]:
# press 'v' to test the model.
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    frame = frame[120:120+250,200:200+250, :]
    
    cv2.imshow('Verification', frame)
    
    if cv2.waitKey(10) & 0xFF == ord('v'):

        cv2.imwrite(os.path.join('application_data', 'input_image', 'input_image.jpg'), frame)
        # Run verification
        results, verified = verify(siamese_model, 0.5, 0.5)
        print(verified)
    # press 'q' to quit the process.
    if cv2.waitKey(10) & 0xFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()


In [None]:
np.sum(np.squeeze(results) > 0.9)


In [None]:
results
