# Dependencies


##### The model was trained on google colab, but most of the stuff was run locally. The unzipping part is only necessary if you run on colab. You'll also need to capture the images locally and upload them to be able to train with a colab GPU. I tried connecting colab to my local runtime to make it simpler, didn't work.


In [None]:

from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [None]:
import zipfile

# Opens the zip file in read mode
zip_ref = zipfile.ZipFile('drive/MyDrive/data.zip', 'r')
zip_ref.extractall()  # Extracts the files
zip_ref.close()


In [2]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
import uuid
import os


In [3]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf


In [4]:
# Setting up folders unless they already exist: anchor, positive, negative
ANC_PATH = os.path.join('data', 'anchor')
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')

for path in [ANC_PATH, POS_PATH, NEG_PATH]:
    try:
        os.makedirs(path)
    except Exception as e:
        print(e)


[WinError 183] Cannot create a file when that file already exists: 'data\\anchor'
[WinError 183] Cannot create a file when that file already exists: 'data\\positive'
[WinError 183] Cannot create a file when that file already exists: 'data\\negative'


# Collecting data


In [None]:
!tar - xf lfw.tgz  # http://vis-www.cs.umass.edu/lfw/#download for negative images


tar: Must specify one of -c, -r, -t, -u, -x


In [None]:
for directory in os.listdir('lfw'):
    for file in os.listdir(os.path.join('lfw', directory)):
        EX_PATH = os.path.join('lfw', directory, file)
        NEW_PATH = os.path.join(NEG_PATH, file)
        os.replace(EX_PATH, NEW_PATH)


In [38]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()

    # Same size as downloaded images
    frame = frame[120:120+250, 200:200+250, :]
    cv2.imshow('Image Collection', frame)

    if cv2.waitKey(1) & 0XFF == ord('a'):
        path_name = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(path_name, frame)

    if cv2.waitKey(1) & 0XFF == ord('p'):
        path_name = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(path_name, frame)

    if cv2.waitKey(1) & 0XFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()


# Loading an preprocessing


In [5]:
anchor = tf.data.Dataset.list_files(ANC_PATH+'/*.jpg').take(300)
positive = tf.data.Dataset.list_files(POS_PATH+'/*.jpg').take(300)
negative = tf.data.Dataset.list_files(NEG_PATH+'/*.jpg').take(300)


In [6]:
def preprocess(file_path):
    byte_img = tf.io.read_file(file_path)
    img = tf.io.decode_jpeg(byte_img)
    img = tf.image.resize(img, (100, 100))
    img = img/255.0
    return img


In [7]:
positives = tf.data.Dataset.zip(
    (anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip(
    (anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)


In [8]:
def preprocess_twin(input_img, validation_img, label):
    return (preprocess(input_img), preprocess(validation_img), label)


In [9]:
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=1024)


In [10]:
train_data = data.take(round(len(data)*0.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

test_data = data.skip(round(len(data)*0.7))
test_data = test_data.take(round(len(data)*0.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)


# Building the model


In [11]:
def make_embedding():
    inp = Input(shape=(100, 100, 3), name='input_img')

    conv1 = Conv2D(64, (10, 10), activation='relu')(inp)
    m1 = MaxPooling2D(64, (2, 2), padding='same')(conv1)

    conv2 = Conv2D(128, (7, 7), activation='relu')(m1)
    m2 = MaxPooling2D(64, (2, 2), padding='same')(conv2)

    conv3 = Conv2D(128, (4, 4), activation='relu')(m2)
    m3 = MaxPooling2D(64, (2, 2), padding='same')(conv3)

    conv4 = Conv2D(256, (4, 4), activation='relu')(m3)
    f1 = Flatten()(conv4)
    d1 = Dense(4096, activation='sigmoid')(f1)

    return Model(inputs=[inp], outputs=[d1], name='embedding')


embedding = make_embedding()

embedding.summary()


Model: "embedding"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_img (InputLayer)      [(None, 100, 100, 3)]     0         
                                                                 
 conv2d (Conv2D)             (None, 91, 91, 64)        19264     
                                                                 
 max_pooling2d (MaxPooling2D  (None, 46, 46, 64)       0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 40, 40, 128)       401536    
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 20, 20, 128)      0         
 2D)                                                             
                                                                 
 conv2d_2 (Conv2D)           (None, 17, 17, 128)       26

In [12]:
class L1Dist(Layer):
    def __init__(self, **kwargs):
        super().__init__()

    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding-validation_embedding)


In [13]:
def make_model():
    input_image = Input(name='input_img', shape=(100, 100, 3))
    validation_image = Input(name='validation_img', shape=(100, 100, 3))

    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(embedding(input_image),
                              embedding(validation_image))

    classifier = Dense(1, activation='sigmoid')(distances)

    return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')


siamese_model = make_model()


# Training the model


In [14]:
binary_cross_loss = tf.losses.BinaryCrossentropy()


In [15]:
optimizer = tf.optimizers.Adam(1e-4)


In [16]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=optimizer, siamese_model=siamese_model)


In [17]:
@tf.function
def train_step(batch):
    with tf.GradientTape() as tape:
        x = batch[:2]
        y = batch[2]

        yhat = siamese_model(x, training=True)

        loss = binary_cross_loss(y, yhat)

    print(loss)

    grad = tape.gradient(loss, siamese_model.trainable_variables)

    optimizer.apply_gradients(zip(grad, siamese_model.trainable_variables))
    return loss


In [18]:
def train(data, EPOCHS):
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))

        for idx, batch in enumerate(data):
            train_step(batch)
            progbar.update(idx+1)

        if epoch % 10 == 10:
            checkpoint.save(file_prefix=checkpoint_prefix)


In [33]:
EPOCHS = 50
train(train_data, EPOCHS)



 Epoch 1/50
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)

 Epoch 2/50

 Epoch 3/50

 Epoch 4/50

 Epoch 5/50

 Epoch 6/50

 Epoch 7/50

 Epoch 8/50

 Epoch 9/50

 Epoch 10/50

 Epoch 11/50

 Epoch 12/50

 Epoch 13/50

 Epoch 14/50

 Epoch 15/50

 Epoch 16/50

 Epoch 17/50

 Epoch 18/50

 Epoch 19/50

 Epoch 20/50

 Epoch 21/50

 Epoch 22/50

 Epoch 23/50

 Epoch 24/50

 Epoch 25/50

 Epoch 26/50

 Epoch 27/50

 Epoch 28/50

 Epoch 29/50

 Epoch 30/50

 Epoch 31/50

 Epoch 32/50

 Epoch 33/50

 Epoch 34/50

 Epoch 35/50

 Epoch 36/50

 Epoch 37/50

 Epoch 38/50

 Epoch 39/50

 Epoch 40/50

 Epoch 41/50

 Epoch 42/50

 Epoch 43/50

 Epoch 44/50

 Epoch 45/50

 Epoch 46/50

 Epoch 47/50

 Epoch 48/50

 Epoch 49/50

 Epoch 50/50


# Evaluating and saving


In [26]:
from tensorflow.keras.metrics import Precision, Recall


In [27]:
test_input, test_val, y_true = test_data.as_numpy_iterator().next()


In [28]:
y_hat = siamese_model.predict([test_input, test_val])
pred_labels = [1 if prediction > 0.5 else 0 for prediction in y_hat]




In [54]:
print(pred_labels)
print(y_true)


[1, 0, 1, 1, 1, 1, 0, 0, 1, 0, 1, 1, 0, 0, 0, 0]
[1. 0. 1. 1. 1. 1. 0. 0. 1. 0. 1. 1. 0. 0. 0. 0.]


In [44]:
m = Recall()
m.update_state(y_true, y_hat)
m.result().numpy()


1.0

In [45]:
m = Precision()
m.update_state(y_true, y_hat)
m.result().numpy()
plt.subplot()


1.0

In [82]:
# This displays all the images with their corresponding labels. I didn't run the code so as not to upload personal mugshots to GitHub
labeled_image_tuples = zip(test_input, test_val, y_true)
fig, axes = plt.subplots(nrows=len(y_true), ncols=3, figsize=(10, 80))
for i, (img1, img2, label) in enumerate(labeled_image_tuples):
    axes[i, 0].imshow(img1)
    axes[i, 0].axis('off')

    axes[i, 1].imshow(img2)
    axes[i, 1].axis('off')

    axes[i, 2].text(0, 0.5, ['True' if label == 1.0 else 'False'],
                    ha='left', va='center', fontsize=12)
    axes[i, 2].axis('off')


In [84]:
siamese_model.save('siamese_model.h5')




In [20]:
model = tf.keras.models.load_model('app/siamese_model2.h5',
                                   custom_objects={'L1Dist': L1Dist, 'BinaryCrossentropy': tf.losses.BinaryCrossentropy})




# Real-time predictions


In [18]:
try:
    os.makedirs('application_data/verification_images')
except Exception as e:
    print(e)
try:
    os.makedirs('application_data/input_image')
except Exception as e:
    print(e)


[WinError 183] Cannot create a file when that file already exists: 'application_data/verification_images'
[WinError 183] Cannot create a file when that file already exists: 'application_data/input_image'


In [21]:
VERIFICATION_PATH = os.path.join('application_data', 'verification_images')
INPUT_PATH = os.path.join('application_data', 'input_image')


In [40]:
# This takes 25 images from both anchor and positive examples and moves them to
# verification directory
verification_source_dirs = (ANC_PATH, POS_PATH)
for directory in verification_source_dirs:
    for index, file in enumerate(os.listdir(directory)):
        EX_PATH = os.path.join(directory, file)
        NEW_PATH = os.path.join(VERIFICATION_PATH, file)
        os.replace(EX_PATH, NEW_PATH)
        if index == 25:
            break


In [22]:
def verify(model, detection_threshold, verification_threshold):
    results = []
    for image in os.listdir(VERIFICATION_PATH):
        input_img = preprocess(os.path.join(INPUT_PATH, 'input_image.jpg'))
        validation_img = preprocess(os.path.join(VERIFICATION_PATH, image))

        result = model.predict(
            list(np.expand_dims([input_img, validation_img], axis=1)))
        results.append(result)
    detection = np.sum(np.array(results) > detection_threshold)

    verification = detection / len(os.listdir(VERIFICATION_PATH))
    verified = verification > verification_threshold

    return results, verified


In [1]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    frame = frame[120:120+250, 200:200+250, :]
    cv2.imshow('Verification', frame)

    if cv2.waitKey(1) & 0XFF == ord('v'):
        cv2.imwrite(os.path.join(INPUT_PATH, 'input_image.jpg'), frame)
        results, verified = verify(model, 0.5, 0.5)
        print(verified)
    if cv2.waitKey(1) & 0XFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()
