# Imports

In [22]:
import os
import cv2
import uuid
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import  Layer, Input, Dense, MaxPool2D, Flatten, Conv2D
from tensorflow.keras.models import Model
from tensorflow.keras.metrics import Precision, Recall
from matplotlib import pyplot as plt

# Data

In [23]:
ANCHOR = os.path.join('data', 'Anchor')
POSITIVE = os.path.join('data', 'Positive')
NEGATIVE = os.path.join('data', 'Negative')

In [None]:
os.makedirs (ANCHOR)
os.makedirs (POSITIVE)
os.makedirs (NEGATIVE)

In [None]:
#unzip lfw dataset
!tar -xf lfw.tgz

In [None]:
for directory in os.listdir('lfw'):
    for file in os.listdir(os.path.join('lfw', directory)):
        current_path = os.path.join('lfw', directory, file)
        new_path = os.path.join(NEGATIVE, file)
        os.replace(current_path, new_path)

In [27]:
cap=cv2.VideoCapture(0)

while cap.isOpened():
    
    ret, frame = cap.read()
    frame = frame[120:120+250,200:200+250, :]
    cv2.imshow('Collectin Images', frame)
    
    if cv2.waitKey(1) & 0XFF == ord('a'):
        cv2.imwrite(os.path.join(ANCHOR, '{}.jpg'.format(uuid.uuid1())), frame)
    
    if cv2.waitKey(1) & 0XFF == ord('p'):
        cv2.imwrite(os.path.join(POSITIVE, '{}.jpg'.format(uuid.uuid1())), frame)
                    
    if cv2.waitKey(1) & 0XFF == ord('q'):
        break
        
# Release the webcam
cap.release()
# Close the image show frame
cv2.destroyAllWindows()

In [None]:
# gpu setting
gpus = tf.config.experimental.list_physical_devices('GPU')

for gpu in gpus: 
    tf.config.experimental.set_memory_growth(gpu, True)

In [29]:
data_anch = tf.data.Dataset.list_files(ANCHOR+'\*.jpg').take(300)
data_pos = tf.data.Dataset.list_files(POSITIVE+'\*.jpg').take(300)
data_neg = tf.data.Dataset.list_files(NEGATIVE+'\*.jpg').take(300)

In [30]:
def preprocess(file_path):
    byte_img=tf.io.read_file(file_path)
    img=tf.io.decode_jpeg(byte_img)
    img=tf.image.resize(img, (100, 100))
    img=img / 255.0
    
    return img
    

In [31]:
positives=tf.data.Dataset.zip((data_anch, data_pos, tf.data.Dataset.from_tensor_slices(tf.ones(len(data_anch)))))
negatives=tf.data.Dataset.zip((data_anch, data_neg, tf.data.Dataset.from_tensor_slices(tf.zeros(len(data_anch)))))
data=positives.concatenate(negatives)

In [32]:
def preprocess_twin(anchor_path, validation_path, label):
    anchor_img = preprocess(anchor_path)
    validation_img = preprocess(validation_path)
    return(anchor_img, validation_img, label)

In [33]:
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=1024)

In [34]:
train_data = data.take(round(len(data)*0.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [35]:
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

In [36]:
len(list(train_data.as_numpy_iterator()))

27

# Model

In [37]:
class EmbedeModel(Model):
    
    def __init__(self, **kwargs):
        
        super().__init__()
        
        self.c1 = Conv2D(64, (10, 10), activation="relu")
        self.m1 = MaxPool2D(64, (2,2), padding='same')
        self.c2 = Conv2D(128, (7, 7), activation='relu')
        self.m2 = MaxPool2D(64, (2,2), padding='same')
        self.c3 = Conv2D(128, (4, 4), activation='relu')
        self.m3 = MaxPool2D(64, (2, 2), padding='same')
        self.c4 = Conv2D(256, (4, 4), activation='relu')
        self.flat = Flatten()
        self.fully = Dense(4096, activation='sigmoid')
    
    def call(self, inputs):
        x = inputs
        x = self.c1(x)
        x = self.m1(x)
        x = self.c2(x)
        x = self.m2(x)
        x = self.c3(x)
        x = self.m3(x)
        x = self.c4(x)
        x = self.flat(x)
        x = self.fully(x)
        return x
    
    def make(self, input_shape=(100,100,3)):
        '''
        This method makes the command "model.summary()" work.
        '''
        x = tf.keras.layers.Input(shape=input_shape)
        model = tf.keras.Model(inputs=[x], outputs=self.call(x), name='actor')
        print(model.summary())
        
        return model
   

In [38]:
class L1Dist(Layer):
    
    def __init__(self, **kwargs):
        super().__init__()
        
    def call(self, anchor, validation):
        return tf.math.abs(anchor-validation)
        

In [39]:
class FinalSiamese(Model):
    
    def __init__(self):
        super().__init__()
        self.dist=L1Dist()
        self.embeded=EmbedeModel()
        self.out=Dense(1, activation='relu')
        
    def call(self, inputs):
        anch = inputs[0]
        valid = inputs[1]
        distance = self.dist(self.embeded(anch), self.embeded(valid))
        output = self.out(distance)
        
        return output
    
    def make(self, input_shape=(100,100,3)):
        '''
        This method makes the command "model.summary()" work.
        '''
        x = tf.keras.layers.Input(shape=input_shape)
        y = tf.keras.layers.Input(shape=input_shape)
        model = tf.keras.Model(inputs=[x,y], outputs=self.call([x,y]), name='actor')
        print(model.summary())
        
        return model
    
       

In [40]:
model=EmbedeModel()
model.make()

Model: "actor"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 100, 100, 3)]     0         
                                                                 
 conv2d (Conv2D)             (None, 91, 91, 64)        19264     
                                                                 
 max_pooling2d (MaxPooling2D  (None, 46, 46, 64)       0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 40, 40, 128)       401536    
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 20, 20, 128)      0         
 2D)                                                             
                                                                 
 conv2d_2 (Conv2D)           (None, 17, 17, 128)       262272

<keras.engine.functional.Functional at 0x23645e96920>

In [41]:
siamese_model=FinalSiamese()
siamese_model.make()

Model: "actor"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_2 (InputLayer)           [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 input_3 (InputLayer)           [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 embede_model_1 (EmbedeModel)   (None, 4096)         38960448    ['input_2[0][0]',                
                                                                  'input_3[0][0]']            

<keras.engine.functional.Functional at 0x23646010430>

# Train

In [42]:
Loss=tf.keras.losses.BinaryCrossentropy()
opt=tf.keras.optimizers.Adam(1e-4)

In [43]:
if 'training_checkpoints' not in os.listdir():
    check_path = os.path.join('training_checkpoints')
    os.mkdir(check_path)
    
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

In [44]:
r = Recall()
p = Precision()

In [45]:
@tf.function
def train_step(batch, model, loss, opt):
    
    with tf.GradientTape() as tape:
        
        x = batch[:2]
        y_true = batch[2]
        y_hat = model(x, training=True)
        loss=Loss(y_true, y_hat)
        
    grad = tape.gradient(loss, model.trainable_variables)
    opt.apply_gradients(zip(grad, model.trainable_variables)) 
    r.update_state(batch[2], y_hat)
    p.update_state(batch[2], y_hat) 

    return loss

In [46]:
def train(data, Epochs, model, loss, opt):
    
    for epoch in range(1, Epochs+1):
        
        print('\n Epoch {}/{}'.format(epoch, Epochs))
        progbar = tf.keras.utils.Progbar(len(data))
        
        for idx, batch in enumerate(data):
            loss = train_step(batch, model, loss, opt)
            yhat = model.predict(batch[0:2])
            progbar.update(idx+1)
            
        print('loss:',loss.numpy(), 'recall:',r.result().numpy(), 'precision:',p.result().numpy())
        
        # Save checkpoints
        if epoch % 10 == 0: 
            checkpoint.save(file_prefix=checkpoint_prefix)
        
        

In [47]:
train(train_data, 50, siamese_model, Loss, opt)


 Epoch 1/50
loss: 0.7354725 recall: 0.0 precision: 0.0

 Epoch 2/50
loss: 0.90690565 recall: 0.08372093 precision: 0.371134

 Epoch 3/50
loss: 0.636273 recall: 0.18615384 precision: 0.5990099

 Epoch 4/50
loss: 0.10796297 recall: 0.26426077 precision: 0.709375

 Epoch 5/50
loss: 0.0787767 recall: 0.38122067 precision: 0.79607844

 Epoch 6/50


loss: 0.16128796 recall: 0.47611588 precision: 0.84444445

 Epoch 7/50
loss: 0.2594567 recall: 0.5430108 precision: 0.87445885

 Epoch 8/50
loss: 0.7770499 recall: 0.592113 precision: 0.8871252

 Epoch 9/50
loss: 0.1771114 recall: 0.62992126 precision: 0.89285713

 Epoch 10/50
loss: -0.0 recall: 0.66113746 precision: 0.8982614

 Epoch 11/50


loss: -0.0 recall: 0.68992245 precision: 0.9050847

 Epoch 12/50
loss: 0.67166424 recall: 0.70292723 precision: 0.9089514

 Epoch 13/50
loss: 0.06592608 recall: 0.698072 precision: 0.91120607

 Epoch 14/50
loss: 0.12363139 recall: 0.71674544 precision: 0.9146919

 Epoch 15/50
loss: -0.0 recall: 0.7342371 precision: 0.9154874

 Epoch 16/50


loss: 0.18627141 recall: 0.749852 precision: 0.9180863

 Epoch 17/50
loss: 0.028822 recall: 0.76400113 precision: 0.92261106

 Epoch 18/50
loss: -0.0 recall: 0.7768421 precision: 0.9271357

 Epoch 19/50
loss: -0.0 recall: 0.7883704 precision: 0.9310345

 Epoch 20/50
loss: -0.0 recall: 0.79881376 precision: 0.93423975

 Epoch 21/50
loss: -0.0 recall: 0.8082316 precision: 0.9373197

 Epoch 22/50


loss: -0.0 recall: 0.81700474 precision: 0.94062114

 Epoch 23/50
loss: -0.0 recall: 0.8249742 precision: 0.94335616

 Epoch 24/50
loss: -0.0 recall: 0.83254343 precision: 0.94592774

 Epoch 25/50
loss: 3.8123095 recall: 0.8392417 precision: 0.94816875

 Epoch 26/50
loss: -0.0 recall: 0.8456498 precision: 0.9502966

 Epoch 27/50


loss: -0.0 recall: 0.8514626 precision: 0.95220375

 Epoch 28/50
loss: -0.0 recall: 0.8568293 precision: 0.95394737

 Epoch 29/50
loss: -0.0 recall: 0.8620016 precision: 0.9557921

 Epoch 30/50
loss: -0.0 recall: 0.86651975 precision: 0.9573913

 Epoch 31/50
loss: -0.0 recall: 0.8706923 precision: 0.9588581

 Epoch 32/50


loss: -0.0 recall: 0.8747045 precision: 0.9601038

 Epoch 33/50
loss: -0.0 recall: 0.87861437 precision: 0.9613156

 Epoch 34/50
loss: -0.0 recall: 0.88225496 precision: 0.9624356

 Epoch 35/50
loss: -0.0 recall: 0.8856372 precision: 0.9634683

 Epoch 36/50
loss: -0.0 recall: 0.8887723 precision: 0.9644179

 Epoch 37/50


loss: -0.0 recall: 0.8917401 precision: 0.96544576

 Epoch 38/50
loss: -0.0 recall: 0.89459294 precision: 0.96629965

 Epoch 39/50
loss: -0.0 recall: 0.8973117 precision: 0.9671104

 Epoch 40/50
loss: -0.0 recall: 0.89984643 precision: 0.96786076

 Epoch 41/50
loss: -0.0 recall: 0.90224785 precision: 0.96856827

 Epoch 42/50
loss: -0.0 recall: 0.9046655 precision: 0.9694013

 Epoch 43/50


loss: -0.0 recall: 0.90685415 precision: 0.9700388

 Epoch 44/50
loss: -0.0 recall: 0.90891516 precision: 0.9707468

 Epoch 45/50
loss: -0.0 recall: 0.9108776 precision: 0.9713101

 Epoch 46/50
loss: -0.0 recall: 0.9128558 precision: 0.9719882

 Epoch 47/50
loss: -0.0 recall: 0.91471386 precision: 0.9726232

 Epoch 48/50


loss: -0.0 recall: 0.9163708 precision: 0.9731881

 Epoch 49/50
loss: -0.0 recall: 0.91813093 precision: 0.97368693

 Epoch 50/50
loss: -0.0 recall: 0.91978055 precision: 0.9741509


# Save & Load

In [None]:
siamese_model.save('./saved_model', save_format='tf')

In [None]:
loaded_model=tf.keras.models.load_model('./saved_model')
loaded_model.summary()

# Test

In [None]:
test_recall=Recall()
test_precision=Precision()

for batch in test_data:
    
    yhat = siamese_model.predict(batch[:2])
    test_recall.update_state(batch[2], yhat)
    test_precision.update_state(batch[2], yhat)
    print('recal:', test_recall.result().numpy(), 'precision:', test_precision.result().numpy())
    res=[]
    print('prediction: ',[1 if prediction >= 0.5 else 0 for prediction in yhat])
    print('y_true:     ', batch[2].numpy())
    
    for i in range (0, len(yhat)):
        
        print('prediction: ', [1 if yhat[i] >= 0.5 else 0])
        plt.figure(figsize = (10,8))
        plt.subplot(1, 2, 1)
        plt.imshow(batch[0][i])
        plt.subplot(1, 2, 2)
        plt.imshow(batch[1][i])
        plt.show()
        
print('recal:', test_recall.result().numpy(), 'precision:', test_precision.result().numpy())
        

### real_time test

In [4]:
# making directories aplication_data/verification_images and directories aplication_data/input_image
#then copy some of positive images to verification_images directory

In [5]:
def verify(model, detection_threshold, verification_threshold):
    
    results=[]
    
    for image_path in os.listdir(os.path.join('application_data', 'verification_images')):
        
        input_img = preprocess(os.path.join('application_data', 'input_image', 'input_image.jpg'))
        validation_img = preprocess(os.path.join('application_data', 'verification_images', image_path))
        result = model.predict(list(np.expand_dims([input_img, validation_img], axis=1)))
    
        results.append(result)
    
    # Detection Threshold: Metric above which a prediciton is considered positive 
    detection = np.sum(np.array(results) > detection_threshold)
    
    # Verification Threshold: Proportion of positive predictions / total positive samples 
    verification = detection / len(os.listdir(os.path.join('application_data', 'verification_images'))) 
    verified = verification > verification_threshold
    
    return results, verified
        
        

In [8]:
cap=cv2.VideoCapture(0)

while cap.isOpened():
    ret, frame=cap.read()
    frame = frame[120:120+250,200:200+250, :]
    
    cv2.imshow('Verification', frame)
    
    # Verification trigger
    if cv2.waitKey(10) & 0xFF == ord('v'):

        cv2.imwrite(os.path.join('application_data', 'input_image', 'input_image.jpg'), frame)
        
        # Run verification
        results, verified = verify(loaded_model, 0.5, 0.4)
        print(verified)
    
    if cv2.waitKey(10) & 0xFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()

False
True
True
True
False
False
