In [None]:
!pip install tensorflow tensorflow-gpu opencv-python matplotlib

In [None]:
# import dependencies 
import cv2
import os 
import random 
import numpy as np
from matplotlib import pyplot as plt

In [None]:
# import tensorflow dependencies 
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf 

In [None]:
# set gpu growth 
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus: 
    tf.config.experimental.set_memory_growth(gpu, True)

In [None]:
len(gpus)

In [None]:
# create Folder structures
# setup paths 
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')

In [None]:
# make directories
os.makedirs(POS_PATH)
os.makedirs(NEG_PATH)
os.makedirs(ANC_PATH)

In [None]:
# Uncompress Tar GZ Labelled Faces in the Wild Dataset
!tar -xf lfw.tgz

In [None]:
for directory in os.listdir('lfw'): 
    for file in os.listdir(os.path.join('lfw', directory)): 
        EX_PATH = os.path.join('lfw', directory, file)
        NEW_PATH = os.path.join(NEG_PATH, file)
        os.replace(EX_PATH, NEW_PATH)

In [None]:
# import uid library to generate unique image names 
import uuid

In [None]:
os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))

In [None]:
# Establish connection to the webcam: 
cap = cv2.VideoCapture(0)
while cap.isOpened(): 
    ret, frame = cap.read()

    # resize frame to be same as negative pictures
    frame = frame[120:120+250, 200:200+250, :]

    # collect anchors
    if cv2.waitKey(1) & 0XFF == ord('a'):
        # create unique file path
        imgname = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
        # write out anchor image
        cv2.imwrite(imgname, frame)

    # collect positives 
    if cv2.waitKey(1) & 0XFF == ord('p'):
        # create unique file path
        imgname = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
        # write out positive image
        cv2.imwrite(imgname, frame)
    # show image back to screen 
    cv2.imshow('Image Collection', frame)


  
    # break and close down frames 
    if cv2.waitKey(1) & 0XFF == ord('q'):
        break
# release webcam
cap.release()
# close image show frame
cv2.destroyAllWindows()


In [None]:
anchor = tf.data.Dataset.list_files(ANC_PATH+'\*.jpg').take(300)
positive = tf.data.Dataset.list_files(POS_PATH+'\*.jpg').take(300)
negative = tf.data.Dataset.list_files(NEG_PATH+'\*.jpg').take(300)

In [None]:
# preprocessing - scale and resize 
def preprocess(file_path): 

    # read in image from file path 
    byte_img = tf.io.read_file(file_path)
    # Load in the image
    img = tf.io.decode_jpeg(byte_img)

    # Preprocessing steps - resize img to 100x100x3
    img = tf.image.resize(img, (100,100))
    # scale image to be between 0 and 1
    img = img / 255.0

    return img

In [None]:
# Create labelled dataset 
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)

In [None]:
# build train and test sets 
def preprocess_twin(input_img, validation_img, label): 
    return(preprocess(input_img), preprocess(validation_img), label)

In [None]:
# Build dataloader pipeline 
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=1024)

In [None]:
# Training partitition 
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [None]:
train_data

In [None]:
# Testing set 
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)


Build Embedding Layer

In [None]:
def make_embedding(): 
    inp = Input(shape=(100,100,3), name='input_image')

    # first block 
    conv1 = Conv2D(64, (10,10), activation='relu')(inp)
    max1 = MaxPooling2D(64, (2, 2), padding='same')(conv1)

    # second block 
    conv2 = Conv2D(128, (7,7), activation='relu')(max1)
    max2 = MaxPooling2D(64, (2,2), padding='same')(conv2)

    # third block 
    conv3 = Conv2D(128, (4,4), activation='relu')(max2)
    max3 = MaxPooling2D(64, (2,2), padding='same')(conv3)

    # final embedding block 
    conv4 = Conv2D(256, (4, 4), activation='relu')(max3)
    flat1 = Flatten()(conv4)
    dense1 = Dense(4096, activation='sigmoid')(flat1)

    return Model(inputs=[inp], outputs=[dense1], name='embedding')


In [None]:
embedding = make_embedding()
embedding.summary()

Build Distance Layer

In [None]:
# siamese L1 distance class 
class L1Dist(Layer): 

    # init method
    def __init__(self, **kwargs): 
        super().__init__()

    # similarity calculation 
    def call(self, input_embedding, validation_embedding): 
        return tf.math.abs(input_embedding - validation_embedding)

In [None]:
l1 = L1Dist()


Make the Siamese Model

In [None]:
def make_siamese_model(): 

    # anchor image input in network
    input_image = Input(name='input_img', shape=(100, 100, 3))

    # validation image in network 
    validation_image = Input(name='validation_img', shape=(100,100,3))

    # Combine siamese distance components
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(embedding(input_image), embedding(validation_image))

    # classification layer 
    # are these 2 images similar enough to be considered same person?
    classifier = Dense(1, activation='sigmoid')(distances)

    return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

In [None]:
siamese_model = make_siamese_model()

In [None]:
siamese_model.summary()

Training 

In [None]:
# Define loss
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [None]:
# define optimizer and learning rate
lr = 1e-4
optimizer = tf.keras.optimizers.Adam(lr)

In [None]:
# establish checkpoints 
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=optimizer, siamese_model=siamese_model)


In [None]:
# Training step 
@tf.function
def train_step(batch): 
    #Record all of the operations
    with tf.GradientTape() as tape: 
        #get anchor and positive/negative image
        X = batch[:2]
        # get label 
        y = batch[2]

        # forward pass 
        yhat = siamese_model(X, training=True) 
        # calculate loss
        loss = binary_cross_loss(y, yhat)
    print(loss)

    #calculate gradients 
    grad = tape.gradient(loss, siamese_model.trainable_variables)

    #calculate updated weights and apply to siamese model 
    optimizer.apply_gradients(zip(grad, siamese_model.trainable_variables))

    # return loss
    return loss

In [None]:
# import metric calculations 
from tensorflow.keras.metrics import Precision, Recall 

In [None]:
# Build training loop 
def train(data, EPOCHS): 
    # loop through epochs
    for epoch in range(1, EPOCHS+1): 
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))

        # Create a metric object 
        r = Recall()
        p = Precision()
    # loop through each batch 
    for idx, batch in enumerate(data): 
        # run train step 
        loss = train_step(batch)
        yhat = siamese_model.predict(batch[:2])
        r.update_state(batch[2], yhat)
        p.update_state(batch[2], yhat)
        progbar.update(idx+1)
    print(loss.numpy, r.result().numpy(), p.result().numpy())

    # save checkpoints
    if epoch % 10 == 0:
        checkpoint.save(file_prefix=checkpoint_prefix)

In [None]:
# Set number of epochs
EPOCHS = 50

In [None]:
# Train the model 
train(train_data, EPOCHS)

In [None]:
# Get a batch of test data
test_input, test_val, y_true = test_data.as_numpy_iterator().next()

In [None]:
# make predictions 
y_hat = siamese_model.predict([test_input, test_val])

In [None]:
# Post processing the results to get answers of either 0 for images not of same person and 1 for images of same person 
[1 if prediction > 0.5 else 0 for prediction in y_hat ]

In [None]:
y_true # compare to true labels 

Calculate Metrics

In [None]:

r = Recall()
p = Precision()

for test_input, test_val, y_true in test_data.as_numpy_iterator():
    yhat = siamese_model.predict([test_input, test_val])
    r.update_state(y_true, yhat)
    p.update_state(y_true,yhat) 

print(r.result().numpy(), p.result().numpy())



Save model 

In [None]:
# Save model weights
siamese_model.save('siamesemodelv2.h5')



In [None]:
# Reload model 
siamese_model = tf.keras.models.load_model('siamesemodelv2.h5', 
                                   custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})

In [None]:
# Make predictions with reloaded model
siamese_model.predict([test_input, test_val])

In [None]:
siamese_model.summary()