In [14]:
import cv2
import os
import random
import uuid
import numpy as np
from matplotlib import pyplot as plt

In [54]:
from tensorflow.keras.models import Model
from tensorflow.keras.metrics import Precision, Recall
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf

In [4]:
pos_path = os.path.join('data', 'positive')
neg_path = os.path.join('data', 'negative')
anc_path = os.path.join('data', 'anchor')

In [4]:
os.makedirs(pos_path)
os.makedirs(neg_path)
os.makedirs(anc_path)

In [6]:
!tar -xf lfw.tgz

In [5]:
for directory in os.listdir('lfw'):
    for file in os.listdir(os.path.join('lfw', directory)):
        EX_PATH = os.path.join('lfw', directory, file)
        NEW_PATH = os.path.join(neg_path, file)
        os.replace(EX_PATH, NEW_PATH)

FileNotFoundError: [WinError 3] The system cannot find the path specified: 'lfw'

In [11]:
cap = cv2.VideoCapture(0)
while cap.isOpened(): 
    ret, frame = cap.read()
    frame = frame[120:120+250,200:200+250, :]
 
    if cv2.waitKey(1) & 0XFF == ord('a'):

        imgname = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imgname, frame)

    if cv2.waitKey(1) & 0XFF == ord('p'):
        imgname = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imgname, frame)
    cv2.imshow('Image Collection', frame)
    if cv2.waitKey(1) & 0XFF == ord('q'):
        break
        
cap.release()
cv2.destroyAllWindows()

In [8]:
anchor = tf.data.Dataset.list_files(anc_path+'\*.jpg').take(300)
positive = tf.data.Dataset.list_files(pos_path+'\*.jpg').take(300)
negative = tf.data.Dataset.list_files(neg_path+'\*.jpg').take(300)

In [9]:
dir_test = anchor.as_numpy_iterator()

In [10]:
print(dir_test.next())

b'data\\anchor\\8d5129d8-49c6-11ef-8c72-8cc7e9d62ecc.jpg'


In [11]:
def preprocess(file_path):
    byte_img = tf.io.read_file(file_path)
    img = tf.io.decode_jpeg(byte_img)
    img = tf.image.resize(img, (100,100))
    img = img / 255.0

    return img

In [16]:
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)

In [17]:
samples = data.as_numpy_iterator()

In [18]:
exampple = samples.next()

In [19]:
exampple

(b'data\\anchor\\885888a2-49c6-11ef-82d6-8cc7e9d62ecc.jpg',
 b'data\\positive\\d255f46f-49c6-11ef-b925-8cc7e9d62ecc.jpg',
 1.0)

In [20]:
def preprocess_twin(input_img, validation_img, label):
    return(preprocess(input_img), preprocess(validation_img), label)

In [21]:
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=1024)

In [22]:
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [23]:
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

In [190]:
inp = Input(shape=(100,100,3), name='input_image')

In [191]:
c1 = Conv2D(64, (10,10), activation='relu')(inp)

In [192]:
m1 = MaxPooling2D(64, (2,2), padding='same')(c1)

In [193]:
c2 = Conv2D(128, (7,7), activation='relu')(m1)
m2 = MaxPooling2D(64, (2,2), padding='same')(c2)

In [194]:
c3 = Conv2D(128, (4,4), activation='relu')(m2)
m3 = MaxPooling2D(64, (2,2), padding='same')(c3)

In [195]:
c4 = Conv2D(256, (4,4), activation='relu')(m3)
f1 = Flatten()(c4)
d1 = Dense(4096, activation='sigmoid')(f1)

In [196]:
mod = Model(inputs=[inp], outputs=[d1], name='embedding')

In [197]:
mod.summary()

Model: "embedding"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_image (InputLayer)     [(None, 100, 100, 3)]     0         
_________________________________________________________________
conv2d_8 (Conv2D)            (None, 91, 91, 64)        19264     
_________________________________________________________________
max_pooling2d_6 (MaxPooling2 (None, 46, 46, 64)        0         
_________________________________________________________________
conv2d_9 (Conv2D)            (None, 40, 40, 128)       401536    
_________________________________________________________________
max_pooling2d_7 (MaxPooling2 (None, 20, 20, 128)       0         
_________________________________________________________________
conv2d_10 (Conv2D)           (None, 17, 17, 128)       262272    
_________________________________________________________________
max_pooling2d_8 (MaxPooling2 (None, 9, 9, 128)         0 

In [24]:
def make_embedding(): 
    inp = Input(shape=(100,100,3), name='input_image')
    c1 = Conv2D(64, (10,10), activation='relu')(inp)
    m1 = MaxPooling2D(64, (2,2), padding='same')(c1)
    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D(64, (2,2), padding='same')(c2)
    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D(64, (2,2), padding='same')(c3)
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)
    
    return Model(inputs=[inp], outputs=[d1], name='embedding')

In [25]:
embedding = make_embedding()

In [26]:
embedding.summary()

Model: "embedding"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_image (InputLayer)    [(None, 100, 100, 3)]     0         
                                                                 
 conv2d (Conv2D)             (None, 91, 91, 64)        19264     
                                                                 
 max_pooling2d (MaxPooling2D  (None, 46, 46, 64)       0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 40, 40, 128)       401536    
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 20, 20, 128)      0         
 2D)                                                             
                                                                 
 conv2d_2 (Conv2D)           (None, 17, 17, 128)       26

## 4.2 Build Distance Layer

In [27]:
class L1Dist(Layer):
    def __init__(self, **kwargs):
        super().__init__()
        
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)

In [29]:
input_image = Input(name='input_img', shape=(100,100,3))
validation_image = Input(name='validation_img', shape=(100,100,3))

In [30]:
inp_embedding = embedding(input_image)
val_embedding = embedding(validation_image)

In [31]:
siamese_layer = L1Dist()

In [34]:
distances = siamese_layer(inp_embedding, val_embedding)
classifier = Dense(1, activation='sigmoid')(distances)
classifier

<KerasTensor: shape=(None, 1) dtype=float32 (created by layer 'dense_1')>

In [35]:
siamese_network = Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

In [36]:
siamese_network.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_img (InputLayer)         [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_img (InputLayer)    [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         38960448    ['input_img[0][0]',              
                                                                  'validation_img[0][

In [37]:
def make_siamese_model(): 
    input_image = Input(name='input_img', shape=(100,100,3))
    validation_image = Input(name='validation_img', shape=(100,100,3))
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(embedding(input_image), embedding(validation_image)) 
    classifier = Dense(1, activation='sigmoid')(distances)
    
    return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

In [38]:
siamese_model = make_siamese_model()

In [39]:
siamese_model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_img (InputLayer)         [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_img (InputLayer)    [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         38960448    ['input_img[0][0]',              
                                                                  'validation_img[0][

In [41]:
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [43]:
optim = tf.keras.optimizers.Adam(1e-4) 

In [44]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=optim, siamese_model=siamese_model)

In [50]:
@tf.function
def train_step(batch): 
    with tf.GradientTape() as tape:     
        X = batch[:2]
        y = batch[2]

        yhat = siamese_model(X, training=True)

        loss = binary_cross_loss(y, yhat)
    print(loss)
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))

    return loss

In [55]:
def train(data, EPOCHS):

    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))
        r = Recall()
        p = Precision()

        for idx, batch in enumerate(data):

            loss = train_step(batch)
            yhat = siamese_model.predict(batch[:2])
            r.update_state(batch[2], yhat)
            p.update_state(batch[2], yhat) 
            progbar.update(idx+1)
        print(loss.numpy(), r.result().numpy(), p.result().numpy())

        if epoch % 10 == 0: 
            checkpoint.save(file_prefix=checkpoint_prefix)

In [53]:
EPOCHS = 10

In [69]:
train(train_data, EPOCHS)


 Epoch 1/10
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)
0.5274174 0.20560747 1.0

 Epoch 2/10
0.19155994 0.9756098 1.0

 Epoch 3/10
9.983794e-07 1.0 1.0

 Epoch 4/10
0.011093965 1.0 1.0

 Epoch 5/10
0.0034517932 0.96666664 0.9902439

 Epoch 6/10
 3/27 [==>...........................] - ETA: 7:55

KeyboardInterrupt: 

In [56]:
test_input, test_val, y_true = test_data.as_numpy_iterator().next()

In [57]:
y_hat = siamese_model.predict([test_input, test_val])



In [58]:
[1 if prediction > 0.5 else 0 for prediction in y_hat ]

[0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0]

In [59]:
y_true

array([0., 0., 0., 1., 0., 1., 0., 1., 1., 1., 1., 0., 1., 0., 1., 1.],
      dtype=float32)

In [62]:
r = Recall()
p = Precision()
for test_input, test_val, y_true in test_data.as_numpy_iterator():
    yhat = siamese_model.predict([test_input, test_val])
    r.update_state(y_true, yhat)
    p.update_state(y_true,yhat) 
print(r.result().numpy(), p.result().numpy())

0.07526882 0.35


In [70]:
siamese_model.save('siamesemodelv2.h5')



In [71]:
siamese_model = tf.keras.models.load_model('siamesemodelv2.h5', 
                                   custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})



In [239]:
siamese_model.predict([test_input, test_val])

array([[2.7295970e-05],
       [8.7373185e-01],
       [1.1476276e-06],
       [9.9997568e-01],
       [9.9490523e-01],
       [2.8164588e-06],
       [2.9260066e-06],
       [1.0000000e+00]], dtype=float32)

In [240]:
siamese_model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_img (InputLayer)          [(None, 100, 100, 3) 0                                            
__________________________________________________________________________________________________
validation_img (InputLayer)     [(None, 100, 100, 3) 0                                            
__________________________________________________________________________________________________
embedding (Functional)          (None, 4096)         38960448    input_img[0][0]                  
                                                                 validation_img[0][0]             
__________________________________________________________________________________________________
l1_dist_6 (L1Dist)              (None, 4096)         0           embedding[0][0]     

In [63]:
os.listdir(os.path.join('application_data', 'verification_images'))

['8c024559-49c6-11ef-976d-8cc7e9d62ecc.jpg',
 '8c10aed7-49c6-11ef-bca3-8cc7e9d62ecc.jpg',
 '8c39634b-49c6-11ef-b318-8cc7e9d62ecc.jpg',
 '8c3e41ed-49c6-11ef-a509-8cc7e9d62ecc.jpg',
 '8c4c140c-49c6-11ef-aac8-8cc7e9d62ecc.jpg',
 '8c5a529f-49c6-11ef-b731-8cc7e9d62ecc.jpg',
 '8c688752-49c6-11ef-a6ad-8cc7e9d62ecc.jpg',
 '8c76b6d1-49c6-11ef-8980-8cc7e9d62ecc.jpg',
 '8c7dcefb-49c6-11ef-af8b-8cc7e9d62ecc.jpg',
 '8c89fe30-49c6-11ef-9e0c-8cc7e9d62ecc.jpg',
 '8c983bf2-49c6-11ef-8e3a-8cc7e9d62ecc.jpg',
 '8ca6c71c-49c6-11ef-9ae4-8cc7e9d62ecc.jpg',
 '8cb4fa76-49c6-11ef-aba0-8cc7e9d62ecc.jpg',
 '8cc308c9-49c6-11ef-b119-8cc7e9d62ecc.jpg',
 '8cd12ded-49c6-11ef-ae51-8cc7e9d62ecc.jpg',
 '8cdfa052-49c6-11ef-9e6f-8cc7e9d62ecc.jpg',
 '8ceda68d-49c6-11ef-be5c-8cc7e9d62ecc.jpg',
 '8cfbcb47-49c6-11ef-a05e-8cc7e9d62ecc.jpg',
 '8d21b767-49c6-11ef-b1f8-8cc7e9d62ecc.jpg',
 '8d2fd741-49c6-11ef-8b4c-8cc7e9d62ecc.jpg',
 '8d3e31fb-49c6-11ef-8d7f-8cc7e9d62ecc.jpg',
 '8d4a7958-49c6-11ef-8fa4-8cc7e9d62ecc.jpg',
 '8d5f6883

In [64]:
os.path.join('application_data', 'input_image', 'input_image.jpg')

'application_data\\input_image\\input_image.jpg'

In [65]:
for image in os.listdir(os.path.join('application_data', 'verification_images')):
    validation_img = os.path.join('application_data', 'verification_images', image)
    print(validation_img)

application_data\verification_images\8c024559-49c6-11ef-976d-8cc7e9d62ecc.jpg
application_data\verification_images\8c10aed7-49c6-11ef-bca3-8cc7e9d62ecc.jpg
application_data\verification_images\8c39634b-49c6-11ef-b318-8cc7e9d62ecc.jpg
application_data\verification_images\8c3e41ed-49c6-11ef-a509-8cc7e9d62ecc.jpg
application_data\verification_images\8c4c140c-49c6-11ef-aac8-8cc7e9d62ecc.jpg
application_data\verification_images\8c5a529f-49c6-11ef-b731-8cc7e9d62ecc.jpg
application_data\verification_images\8c688752-49c6-11ef-a6ad-8cc7e9d62ecc.jpg
application_data\verification_images\8c76b6d1-49c6-11ef-8980-8cc7e9d62ecc.jpg
application_data\verification_images\8c7dcefb-49c6-11ef-af8b-8cc7e9d62ecc.jpg
application_data\verification_images\8c89fe30-49c6-11ef-9e0c-8cc7e9d62ecc.jpg
application_data\verification_images\8c983bf2-49c6-11ef-8e3a-8cc7e9d62ecc.jpg
application_data\verification_images\8ca6c71c-49c6-11ef-9ae4-8cc7e9d62ecc.jpg
application_data\verification_images\8cb4fa76-49c6-11ef-aba0-8cc

In [66]:
def verify(model, detection_threshold, verification_threshold):
    results = []
    for image in os.listdir(os.path.join('application_data', 'verification_images')):
        input_img = preprocess(os.path.join('application_data', 'input_image', 'input_image.jpg'))
        validation_img = preprocess(os.path.join('application_data', 'verification_images', image))
        result = model.predict(list(np.expand_dims([input_img, validation_img], axis=1)))
        results.append(result)

    detection = np.sum(np.array(results) > detection_threshold)
    verification = detection / len(os.listdir(os.path.join('application_data', 'verification_images'))) 
    verified = verification > verification_threshold
    
    return results, verified

In [78]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    frame = frame[120:120+250,200:200+250, :]
    
    cv2.imshow('Verification', frame)
    if cv2.waitKey(10) & 0xFF == ord('v'):

        cv2.imwrite(os.path.join('application_data', 'input_image', 'input_image.jpg'), frame)
        results, verified = verify(siamese_model, 0.5, 0.5)
        print(verified)
    
    if cv2.waitKey(10) & 0xFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()

False


In [68]:
results

[array([[0.49833822]], dtype=float32),
 array([[0.49881986]], dtype=float32),
 array([[0.49881217]], dtype=float32),
 array([[0.49879032]], dtype=float32),
 array([[0.4988454]], dtype=float32),
 array([[0.49883392]], dtype=float32),
 array([[0.4986956]], dtype=float32),
 array([[0.4986364]], dtype=float32),
 array([[0.49864802]], dtype=float32),
 array([[0.49849072]], dtype=float32),
 array([[0.49852365]], dtype=float32),
 array([[0.4988957]], dtype=float32),
 array([[0.49877712]], dtype=float32),
 array([[0.49855924]], dtype=float32),
 array([[0.49857518]], dtype=float32),
 array([[0.49815235]], dtype=float32),
 array([[0.49817425]], dtype=float32),
 array([[0.4979473]], dtype=float32),
 array([[0.49863836]], dtype=float32),
 array([[0.49883595]], dtype=float32),
 array([[0.4990556]], dtype=float32),
 array([[0.49849612]], dtype=float32),
 array([[0.4984983]], dtype=float32),
 array([[0.49795285]], dtype=float32),
 array([[0.49813727]], dtype=float32),
 array([[0.49833912]], dtype=flo

In [79]:
res , verified = verify(siamese_model, 0.5, 0.5)



In [80]:
verified

True