# *Set up notebook*

In [None]:
import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
import uuid

import tensorflow as tf
from tensorflow.keras.metrics import Precision, Recall
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, Input, Flatten, MaxPooling2D

# Setup path

In [15]:
POS_Path = os.path.join("data","positive")
NEG_Path = os.path.join("data","negative")
ANC_Path = os.path.join("data","anchor")

In [None]:
# Creating dir
os.makedirs(POS_Path)
os.makedirs(NEG_Path)
os.makedirs(ANC_Path)

In [None]:
# Move LFW Images to the following repository data/negative
for directory in os.listdir('lfw'):
    for file in os.listdir(os.path.join('lfw', directory)):
       
        EX_PATH = os.path.join('lfw', directory, file)
        NEW_PATH = os.path.join(NEG_Path, file)
        os.replace(EX_PATH, NEW_PATH)

# *Collecting pos and anchor data*

In [None]:
# Connecting to web cam and collecting my data imgs

cap = cv2.VideoCapture(0)

while True:
    ret , frame = cap.read()
    
    # Make the size of the frame to 250x250px
    frame = frame[120:120+250,200:200+250, :]
    
    # Collecting ANC
    if cv2.waitKey(1) & 0XFF == ord("a"):
        # Create unique file name 
        imgname = os.path.join(ANC_Path, '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imgname, frame)
    
    # Collecting POS
    if cv2.waitKey(1) & 0XFF == ord("p"):
        imgname = os.path.join(POS_Path, '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imgname, frame)
    
    cv2.imshow("Collecting data",frame)
    if cv2.waitKey(1) & 0XFF == ord("e"):
        break
    
cap.release()
cv2.destroyAllWindows()

## Get images and preprocessing it

In [16]:
# Take only 300 img

anchor = tf.data.Dataset.list_files(ANC_Path+'\*.jpg').take(300)
positive = tf.data.Dataset.list_files(POS_Path+'\*.jpg').take(300)
negative = tf.data.Dataset.list_files(NEG_Path+'\*.jpg').take(300)

In [2]:
# Preprocessing img before entering the model

def preprocessing(file_path):
    # Read in image from file path and load it
    byte_img = tf.io.read_file(file_path) 
    img = tf.io.decode_jpeg(byte_img)
    
    # Preprocessing steps - resizing the image to be 100x100x3
    img = tf.image.resize(img, (100,100))
    # Scale image to be between 0 and 1 
    img = img / 255.0
    return img

## Creating dataset

In [18]:
# (anchor, positive) ==> 1,1,1,1,1
# (anchor, negative) ==> 0,0,0,0,0

positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))

data = positives.concatenate(negatives)
samples = data.as_numpy_iterator()

In [3]:
def preprocessing_twin(input_img,validation_img,label):
    return(preprocessing(input_img),preprocessing(validation_img),label)

In [None]:
example = samples.next()

# Plotting img and its label from the data
res = preprocessing_twin(*example)
plt.imshow(res[0]);
print(res[2])

# Data loader pipeline

In [21]:
data = data.map(preprocessing_twin)
data = data.cache()
data = data.shuffle(buffer_size=1024)

In [22]:
# training data 
train_data = data.take(round(len(data)*0.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(10)

In [23]:
# testing data
testing_data = data.skip(round(len(data)*0.7))
testing_data = testing_data.take(round(len(data)*0.3))
testing_data = testing_data.batch(16)
testing_data = testing_data.prefetch(10)

# *Model engineering*

## Building embedding layer

In [4]:
def make_embedding():
    inp = Input(shape=(100,100,3), name = "input_image")
    
    # First block
    conv1 = Conv2D(64,(10,10),activation = "relu")(inp)
    m1 = MaxPooling2D(64,(2,2),padding="same")(conv1)
    
    # Second block
    conv2 = Conv2D(128,(7,7),activation = "relu")(m1)
    m2 = MaxPooling2D(64,(2,2),padding="same")(conv2)
    
    # Third block
    conv3 = Conv2D(128,(4,4),activation = "relu")(m2)
    m3 = MaxPooling2D(64,(2,2),padding="same")(conv3)
    
    # Final block
    conv4 = Conv2D(256,(4,4),activation = "relu")(m3)
    flat = Flatten()(conv4)
    den = Dense(4096,activation = "sigmoid")(flat)
    
    return Model(inputs = [inp] ,outputs = [den] ,name = "embedding")

In [5]:
# Model summary
emb_model = make_embedding()
emb_model.summary()



Model: "embedding"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_image (InputLayer)    [(None, 100, 100, 3)]     0         
                                                                 
 conv2d (Conv2D)             (None, 91, 91, 64)        19264     
                                                                 
 max_pooling2d (MaxPooling2  (None, 46, 46, 64)        0         
 D)                                                              
                                                                 
 conv2d_1 (Conv2D)           (None, 40, 40, 128)       401536    
                                                                 
 max_pooling2d_1 (MaxPoolin  (None, 20, 20, 128)       0         
 g2D)                                                            
                                                                 
 conv2d_2 (Conv2D)           (None, 17, 17, 128)       

# Distance layer

In [6]:
class L1DIST(Layer):
    
    def __init__(self, **kwargs):
        super().__init__()
        
    # similarity calculations
    def call(self, input_emb, validation_emb):
        return tf.math.abs(input_emb - validation_emb)

# Siamese Neural Network

In [9]:
# Making siamese model
def siamese_model():
    
    # Anchor img
    input_img = Input(name = "input_img",shape = (100,100,3))
    # validation img
    validation_img = Input(name = "validation_img",shape = (100,100,3))
    
    # Siamese distance
    siamese_layer = L1DIST()
    siamese_layer.__name = "distance"
    distance = siamese_layer(emb_model(input_img),emb_model(validation_img))
    
    # Classification layer
    classifier = Dense(1,activation="sigmoid")(distance)
    
    return Model(inputs = [input_img,validation_img],outputs = classifier,name = "SiameseNetwork")
    

In [10]:
# Model summary
siamese_model_ = siamese_model()
siamese_model_.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_img (InputLayer)      [(None, 100, 100, 3)]        0         []                            
                                                                                                  
 validation_img (InputLayer  [(None, 100, 100, 3)]        0         []                            
 )                                                                                                
                                                                                                  
 embedding (Functional)      (None, 4096)                 3896044   ['input_img[0][0]',           
                                                          8          'validation_img[0][0]']      
                                                                                     

### Loss and Optimizer

In [11]:
# loss function
binary_cross_loss = tf.losses.BinaryCrossentropy(from_logits=True)

In [12]:
# optimizer 
opt = tf.keras.optimizers.Adam(1e-4)

## Checkpoints

In [31]:
# For training the model
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model_)

## Train Step Function

In [32]:
test_batch = train_data.as_numpy_iterator()
batch1 = test_batch.next()

In [33]:
@tf.function
def train_step(batch):
    
    with tf.GradientTape() as tape:
        # get anchor, pos and negative img
        X = batch[:2]
        # get label
        Y = batch[2]
        
        # Forward pass
        y_pred = siamese_model_(X,training = True)
        # Calc loss
        loss = binary_cross_loss(Y,y_pred)
    # Calc Gradients
    grad = tape.gradient(loss,siamese_model_.trainable_variables)
    
    # Calc updated weights
    opt.apply_gradients(zip(grad,siamese_model_.trainable_variables))
    
    return loss

## Training loop

In [34]:
def train(data,EPOCHS):
    # Loop through epochs
    for epoch in range(1,EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progress_bar = tf.keras.utils.Progbar(len(data))
        
        # Loop through each batch
        for idx,batch in enumerate(data):
            # Run train step
            train_step(batch)
            progress_bar.update(idx+1)
        
        # save checkpoints
        if epoch % 10 == 0:
            checkpoint.save(file_prefix = checkpoint_prefix)

# Train the model

In [35]:
EPOCHS = 50

train(train_data,EPOCHS)


 Epoch 1/50

 Epoch 2/50

 Epoch 3/50

 Epoch 4/50

 Epoch 5/50

 Epoch 6/50

 Epoch 7/50

 Epoch 8/50

 Epoch 9/50

 Epoch 10/50

 Epoch 11/50

 Epoch 12/50

 Epoch 13/50

 Epoch 14/50

 Epoch 15/50

 Epoch 16/50

 Epoch 17/50

 Epoch 18/50

 Epoch 19/50

 Epoch 20/50

 Epoch 21/50

 Epoch 22/50

 Epoch 23/50

 Epoch 24/50

 Epoch 25/50

 Epoch 26/50

 Epoch 27/50

 Epoch 28/50

 Epoch 29/50

 Epoch 30/50

 Epoch 31/50

 Epoch 32/50

 Epoch 33/50

 Epoch 34/50

 Epoch 35/50

 Epoch 36/50

 Epoch 37/50

 Epoch 38/50

 Epoch 39/50

 Epoch 40/50

 Epoch 41/50

 Epoch 42/50

 Epoch 43/50

 Epoch 44/50

 Epoch 45/50

 Epoch 46/50

 Epoch 47/50

 Epoch 48/50

 Epoch 49/50

 Epoch 50/50


# Evaluate Model

In [36]:
# batch for test data
test_input,test_val,y_true = testing_data.as_numpy_iterator().next()

In [37]:
# make predictions 
pred = siamese_model_.predict([test_input, test_val])
pred



array([[9.99988735e-01],
       [2.15528307e-05],
       [1.05179615e-05],
       [6.85349377e-08],
       [1.00000000e+00],
       [9.99999881e-01],
       [1.49907162e-08],
       [1.00000000e+00],
       [1.00000000e+00],
       [1.00000000e+00],
       [6.14364531e-11],
       [9.99999404e-01],
       [4.78063526e-11],
       [9.99504566e-01],
       [1.12891718e-09],
       [1.08146062e-10]], dtype=float32)

In [38]:
# post processing the results

[1 if y_pred > 0.5 else 0 for y_pred in pred ]

[1, 0, 0, 0, 1, 1, 0, 1, 1, 1, 0, 1, 0, 1, 0, 0]

In [39]:
# Recall
rec = Recall()

# Calculating
rec.update_state(y_true,pred)
rec.result().numpy()

1.0

In [40]:
# Precision
Pre = Precision()

# Calculating
Pre.update_state(y_true,pred)
Pre.result().numpy()

1.0

### Visualization results

In [None]:
plt.figure(figsize=(18,8))
plt.subplot(1,2,1)
plt.imshow(test_input[0])

plt.subplot(1,2,2)
plt.imshow(test_val[0])
plt.show()

# Save Model

In [None]:
# save weights
siamese_model_.save("siamesemodel.keras")

In [13]:
# Reload the model                                 
custom_objects = {'L1DIST':L1DIST, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy}

with tf.keras.saving.custom_object_scope(custom_objects):
    model = tf.keras.models.load_model("siamesemodel.keras")

In [14]:
model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_img (InputLayer)      [(None, 100, 100, 3)]        0         []                            
                                                                                                  
 validation_img (InputLayer  [(None, 100, 100, 3)]        0         []                            
 )                                                                                                
                                                                                                  
 embedding (Functional)      (None, 4096)                 3896044   ['input_img[0][0]',           
                                                          8          'validation_img[0][0]']      
                                                                                     

# Verification function

In [15]:
def verify(model, detection_threshold, verification_threshold):
    # Results array
    results = []
    for image in os.listdir(os.path.join("application_data","verification_images")):
        input_img = preprocessing(os.path.join("application_data","input_image","input_image.jpg"))
        validation_img = preprocessing(os.path.join("application_data","verification_images",image))
        
        # Make predictions
        result = model.predict(list(np.expand_dims([input_img, validation_img], axis=1)))
        results.append(result)
        
    # Detection Threshold: Metric above which a prediciton is considered positive 
    detection = np.sum(np.array(results) > detection_threshold)
    
    # Verification Threshold: Proportion of positive predictions / total positive samples 
    verification = detection / len(os.listdir(os.path.join('application_data', 'verification_images'))) 
    verified = verification > verification_threshold
    
    return results, verified

# Real time verification

In [None]:
cap = cv2.VideoCapture(0)

while cap.isOpened():
    ret, frame = cap.read()
    frame = frame[120:120+250,200:200+250, :]
    
    cv2.imshow('Verification', frame)
    
    # Verification
    if cv2.waitKey(1) & 0xFF == ord('v'):
        # save input img to application data --> input img folder
        cv2.imwrite(os.path.join("application_data","input_image","input_image.jpg"), frame)
        
        # Run verification
        results, verified = verify(model,0.7,0.6)
        print(verified)
    
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()