# Import necessary libraries

In [8]:

import cv2
import os
import random
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf
import uuid
from tensorflow.keras.metrics import Precision, Recall

# Setup paths and get files

In [9]:

POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')

In [3]:

os.makedirs(POS_PATH)
os.makedirs(NEG_PATH)
os.makedirs(ANC_PATH)

FileExistsError: [WinError 183] Nie można utworzyć pliku, który już istnieje: 'data\\positive'

In [None]:
!tar -xf lfw.tgz

In [None]:
for directory in os.listdir("lfw"):
    for file in os.listdir(os.path.join('lfw', directory)):
        EX_PATH = os.path.join('lfw', directory, file)
        NEW_PATH = os.path.join(NEG_PATH, file)
        os.replace(EX_PATH, NEW_PATH)

In [None]:
cap = cv2.VideoCapture(0)

while cap.isOpened():
    ret, frame = cap.read()
    frame = frame[120:120+250, 800:1050, :]
    
    # Collect anchors
    if cv2.waitKey(1) & 0XFF == ord('a'):
        imgname = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imgname, frame)
    
    # Collect positives
    if cv2.waitKey(1) & 0XFF == ord('p'):
        imgname = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imgname, frame)
    
    cv2.imshow("Image Collection", frame)
    
    if cv2.waitKey(1) & 0XFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()

# Load and preprocess images

In [10]:
#Get image directories
anchor = tf.data.Dataset.list_files(ANC_PATH+'\*.jpg').take(180)
positive = tf.data.Dataset.list_files(POS_PATH+'\*.jpg').take(180)
negative = tf.data.Dataset.list_files(NEG_PATH+'\*.jpg').take(180)

In [13]:
# Preprocess
def preprocess(file_path):
    byte_img = tf.io.read_file(file_path)
    img = tf.io.decode_jpeg(byte_img)
    img = tf.image.resize(img, (105,105))
    img = img / 255.0
    return img

In [14]:
# Create labeled dataset
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)

In [16]:
def preprocess_twin(input_img, validation_img, label):
    return(preprocess(input_img), preprocess(validation_img), label)

In [17]:
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=1024)

In [18]:
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [19]:
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

# Model engineering

In [20]:
def make_embedding():
    inp = Input(shape=(105, 105, 3), name='input_image')
    
    c1 = Conv2D(64, (10, 10), activation='relu')(inp)
    m1 = MaxPooling2D(64, (2, 2), padding='same')(c1)
    
    c2 = Conv2D(128, (7,  7), activation='relu')(m1)
    m2 = MaxPooling2D(64, (2, 2), padding='same')(c2)
    
    c3 = Conv2D(128, (4,  4), activation='relu')(m2)
    m3 = MaxPooling2D(64, (2, 2), padding='same')(c3)
    
    c4 = Conv2D(256, (4, 4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)
    
    
    
    return Model(inputs=[inp], outputs=[d1], name='embedding')

In [21]:
embedding = make_embedding()

In [22]:
class L1Dist(Layer):
    def __init__(self, **kwargs):
        super().__init__()
        
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)

In [23]:
def make_siamese_model():
    
    
    input_image = Input(name='input_img', shape=(105, 105, 3))
    validation_image = Input(name='validation_img', shape=(105, 105, 3))
    
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(embedding(input_image), embedding(validation_image))
    
    classifier = Dense(1, activation='sigmoid')(distances)
    
    return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

In [24]:
siamese_model = make_siamese_model()

# Training

In [26]:
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [27]:
opt = tf.keras.optimizers.Adam(1e-4)

In [28]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model = siamese_model)

In [29]:
@tf.function
def train_step(batch):
    
    with tf.GradientTape() as tape:
        
        X = batch[:2]
        y = batch[2]
        
        yhat = siamese_model(X, training=True)
        loss = binary_cross_loss(y, yhat)
    print(loss)
        
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))
    
    return loss


In [30]:
def train(data, EPOCHS):
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))
        
        for idx, batch in enumerate(data):
            train_step(batch)
            progbar.update(idx+1)
        
        if epoch % 10 == 0:
            checkpoint.save(file_prefix = checkpoint_prefix)

In [34]:
EPOCHS = 10

In [26]:
train(train_data, EPOCHS)


 Epoch 1/10
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)

 Epoch 2/10

 Epoch 3/10

 Epoch 4/10

 Epoch 5/10

 Epoch 6/10

 Epoch 7/10

 Epoch 8/10

 Epoch 9/10

 Epoch 10/10


NameError: name 'checkpoint' is not defined

# Save model

In [27]:
siamese_model.save('siamesemodel.h5')



In [31]:
model = tf.keras.models.load_model('siamesemodel.h5', custom_objects={'L1Dist': L1Dist, 'BinarryCrossentopy':tf.losses.BinaryCrossentropy})



# Evaluate model

In [32]:
test_input, test_val, y_true = test_data.as_numpy_iterator().next()

In [33]:
y_hat = model.predict([test_input, test_val])


In [34]:
[1 if prediction > 0.5 else 0 for prediction in y_hat]

[1, 0, 1, 0, 1, 1, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0]

In [35]:
y_true

array([1., 0., 1., 0., 1., 1., 0., 0., 0., 1., 1., 1., 1., 0., 0., 0.],
      dtype=float32)

In [36]:
m = Recall()
m.update_state(y_true, y_hat)
m.result().numpy()

1.0

# Real time test

In [37]:
def verify(model, detection_threshold, verification_threshold):
    results = []
    for image in os.listdir(os.path.join('application_data', 'verification_images')):
        input_img = preprocess(os.path.join('application_data', 'input_image', 'input_image.jpg'))
        validation_img = preprocess(os.path.join('application_data', 'verification_images', image))
        
        result = model.predict(list(np.expand_dims([input_img, validation_img], axis=1)))
        results.append(result)
        
    detection = np.sum(np.array(results) > detection_threshold)
    verification = detection / len(os.listdir(os.path.join('application_data', 'verification_images')))
    verified = verification > verification_threshold
    
    return results, verified

In [39]:
cap = cv2.VideoCapture(0)
address = ""
cap.open(address)
while cap.isOpened():
    ret, frame = cap.read()
    frame = frame[120:120+250, 800:1050, :]
    
    cv2.imshow("Verification", frame)
    
    if cv2.waitKey(1) & 0XFF == ord('v'):
        cv2.imwrite(os.path.join('application_data', 'input_image', 'input_image.jpg'), frame)
        results, verified = verify(model, 0.5, 0.5)
        print(verified)
        
    
    if cv2.waitKey(1) & 0XFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()

True
True
False


In [59]:
results, verified = verify(model, 0.5, 0.5)
print(verified)

True
