Install Dependencies

In [None]:
# Install the imaging and plotting packages used below into the kernel's environment.
# NOTE(review): tensorflow is imported further down but is not installed by this cell - confirm it is preinstalled.
%pip install opencv-python matplotlib

Import Dependencies

In [1]:
#import standard dependencies
import cv2
import os
import random
import numpy as numpy
from matplotlib import pyplot as plt

In [2]:
#Import tensorflow dependencies
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf

Set GPU Growth

In [3]:
# Avoid out-of-memory errors by enabling memory growth on every visible GPU.
# The loop body simply does not run when no GPU is listed.
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

In [4]:
# Set up the relative paths of the three image folders:
#   data/positive - images paired with an anchor under label 1
#   data/negative - images paired with an anchor under label 0 (filled from LFW)
#   data/anchor   - reference images every pair starts from
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')

In [None]:
# Make the data directories.
# exist_ok=True keeps this cell safe to re-run: without it os.makedirs raises
# FileExistsError as soon as the folders already exist.
os.makedirs(POS_PATH, exist_ok=True)
os.makedirs(NEG_PATH, exist_ok=True)
os.makedirs(ANC_PATH, exist_ok=True)

Untar Labelled Faces in the Wild Dataset

In [None]:
# Dataset source: Labelled Faces in the Wild (LFW) - https://vis-www.cs.umass.edu/lfw/

In [None]:
# Uncompress the Labelled Faces in the Wild tar.gz archive into the working directory
# (the next cell expects this to produce an `lfw` folder).
!tar -xf lfw.tgz

In [None]:
# Move every LFW image into the data/negative directory, flattening the
# per-person sub-folders of the extracted archive.
for directory in os.listdir('lfw'):
    person_dir = os.path.join('lfw', directory)
    # Skip stray top-level files (e.g. OS metadata files); calling os.listdir
    # on a non-directory would raise NotADirectoryError.
    if not os.path.isdir(person_dir):
        continue
    for file in os.listdir(person_dir):
        EX_PATH = os.path.join(person_dir, file)
        NEW_PATH = os.path.join(NEG_PATH, file)
        os.replace(EX_PATH, NEW_PATH)

Collect Positive and Anchor Classes

In [8]:
# Import uuid library to generate unique image names
import uuid

In [None]:
# Establish a connection to the webcam and collect images interactively:
#   'a' saves the current crop as an anchor, 'p' saves it as a positive, 'q' quits.
cap = cv2.VideoCapture(0)
try:
    while cap.isOpened():
        ret, frame = cap.read()
        # cap.read() reports failure through `ret` and hands back no frame;
        # stop cleanly instead of crashing on the slice below.
        if not ret:
            break

        # Cut down frame to 250x250 pixels
        frame = frame[120:120+250, 250:250+250, :]

        # Show image back to screen
        cv2.imshow('Image Collection', frame)

        # Poll the keyboard exactly once per frame. Each cv2.waitKey call
        # consumes the pending key press, so polling separately for 'a', 'p'
        # and 'q' makes most presses land on the "wrong" check and get lost.
        key = cv2.waitKey(1) & 0xFF

        if key == ord('a'):
            # Collect anchors: create a unique file path and write the image
            imgname = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
            cv2.imwrite(imgname, frame)
        elif key == ord('p'):
            # Collect positives: create a unique file path and write the image
            imgname = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
            cv2.imwrite(imgname, frame)
        elif key == ord('q'):
            # Breaking out
            break
finally:
    # Always release the webcam and close the preview window, even if the
    # loop above raised.
    cap.release()
    cv2.destroyAllWindows()

Image Augmentation

In [9]:
def data_aug(img, num_augmented=9):
    """Return randomly perturbed copies of an image.

    Each copy is produced independently from the original `img` by applying, in
    order: random brightness, contrast, horizontal flip, JPEG quality and
    saturation changes.

    Args:
        img: Image to augment (the callers pass the array returned by
            cv2.imread).
        num_augmented: Number of augmented copies to create. Defaults to 9.

    Returns:
        A list of `num_augmented` tensors (callers convert them with `.numpy()`).
    """
    def _seed():
        # The tf.image.stateless_* ops are deterministic for a given seed, so a
        # hard-coded seed applies the exact same shift on every pass. Draw a
        # fresh seed for every op instead.
        return (numpy.random.randint(100), numpy.random.randint(100))

    data = []
    for _ in range(num_augmented):
        # Start from the original image every time. Re-assigning `img` would
        # stack the perturbations (contrast loss, JPEG artefacts) so that later
        # copies drift further and further from the source picture.
        aug = tf.image.stateless_random_brightness(img, max_delta=0.02, seed=_seed())
        aug = tf.image.stateless_random_contrast(aug, lower=0.6, upper=1, seed=_seed())
        aug = tf.image.stateless_random_flip_left_right(aug, seed=_seed())
        aug = tf.image.stateless_random_jpeg_quality(aug, min_jpeg_quality=90, max_jpeg_quality=100, seed=_seed())
        aug = tf.image.stateless_random_saturation(aug, lower=0.9, upper=1, seed=_seed())

        data.append(aug)

    return data

In [10]:
# Augment every image in the positive folder and save the copies next to it.
# NOTE: the copies are written into the same folder, so re-running this cell
# also augments the previously generated copies.
for file_name in os.listdir(POS_PATH):
    img_path = os.path.join(POS_PATH, file_name)
    img = cv2.imread(img_path)
    # cv2.imread returns None (it does not raise) for unreadable or non-image
    # files; skip those instead of crashing inside data_aug.
    if img is None:
        continue
    augmented_images = data_aug(img)

    for image in augmented_images:
        cv2.imwrite(os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1())), image.numpy())

In [11]:
# Augment every image in the anchor folder and save the copies next to it.
# NOTE: the copies are written into the same folder, so re-running this cell
# also augments the previously generated copies.
for file_name in os.listdir(ANC_PATH):
    img_path = os.path.join(ANC_PATH, file_name)
    img = cv2.imread(img_path)
    # cv2.imread returns None (it does not raise) for unreadable or non-image
    # files; skip those instead of crashing inside data_aug.
    if img is None:
        continue
    augmented_images = data_aug(img)

    for image in augmented_images:
        cv2.imwrite(os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1())), image.numpy())

Get Image Directories

In [5]:
# Datasets of up to 3000 JPEG file paths per class.
# The glob is built with os.path.join rather than a hard-coded '\*.jpg':
# the backslash only works as a separator on Windows, and '\*' is an invalid
# escape sequence in a regular Python string literal.
anchor = tf.data.Dataset.list_files(os.path.join(ANC_PATH, '*.jpg')).take(3000)
positive = tf.data.Dataset.list_files(os.path.join(POS_PATH, '*.jpg')).take(3000)
negative = tf.data.Dataset.list_files(os.path.join(NEG_PATH, '*.jpg')).take(3000)

Preprocessing - Scale and Resize

In [6]:
def preprocess(file_path):
    """Load a JPEG file and turn it into a 100x100x3 tensor scaled to [0, 1].

    Args:
        file_path: Path of the JPEG image to load.

    Returns:
        The resized image tensor with values divided by 255.
    """
    # Read in image from file path
    byte_img = tf.io.read_file(file_path)
    # Decode the image. channels=3 forces three channels even for greyscale
    # JPEGs, which would otherwise decode to a single channel and break the
    # 100x100x3 input shape the model expects.
    img = tf.io.decode_jpeg(byte_img, channels=3)
    # Preprocessing steps - resizing the image to be 100x100x3
    img = tf.image.resize(img, (100,100))
    # Scale image to be between 0 and 1
    img = img/255.0
    return img

Create Labelled Dataset

In [7]:
# One label per anchor: 1.0 for (anchor, positive) pairs, 0.0 for (anchor, negative) pairs
positive_labels = tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))
negative_labels = tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))

# Zip the file paths with their labels, then stack both sets into one dataset
positives = tf.data.Dataset.zip((anchor, positive, positive_labels))
negatives = tf.data.Dataset.zip((anchor, negative, negative_labels))
data = positives.concatenate(negatives)

Build Train and Test Partition

In [8]:
def preprocess_twin(input_img, validation_img, label):
    """Preprocess both images of a pair and pass the label through unchanged.

    Args:
        input_img: File path of the first image of the pair.
        validation_img: File path of the second image of the pair.
        label: Label of the pair.

    Returns:
        Tuple of (preprocessed input image, preprocessed validation image, label).
    """
    first_image = preprocess(input_img)
    second_image = preprocess(validation_img)
    return first_image, second_image, label

In [9]:
#Build dataloader pipeline
data = data.map(preprocess_twin)
data = data.cache()
# Shuffle once and keep that order fixed. By default shuffle() draws a new
# order on every iteration, so partitions carved out of `data` with take()/skip()
# would contain different samples each epoch and the train and test sets would
# overlap. (Per-epoch reshuffling can still be applied to the training
# partition after the split.)
data = data.shuffle(buffer_size=10000, reshuffle_each_iteration=False)

In [10]:
# Training partition: the first 70% of the samples, in batches of 16 with 8 batches prefetched
train_size = round(len(data)*.7)
train_data = data.take(train_size).batch(16).prefetch(8)

In [11]:
# Testing partition: skip the 70% used for training, keep the next 30%,
# in batches of 16 with 8 batches prefetched
test_offset = round(len(data)*.7)
test_size = round(len(data)*.3)
test_data = data.skip(test_offset).take(test_size).batch(16).prefetch(8)

Build Embedding Layer

In [12]:
def make_embedding():
    """Build the convolutional embedding network.

    Takes a 100x100x3 image and returns a 4096-dimensional sigmoid embedding.
    The network is three Conv2D + max-pooling blocks followed by a final Conv2D,
    a Flatten and a Dense layer.

    Returns:
        A Keras Model named 'embedding'.
    """
    inp = Input(shape=(100,100,3), name="input_image")

    # MaxPooling2D's first positional argument is pool_size and the second is
    # strides, so MaxPooling2D(64, (2,2)) pooled over a 64x64 window with
    # stride 2. A 2x2 window was intended; it is spelled out with keywords
    # here. Output shapes are unchanged because strides and padding are.

    # First block
    c1 = Conv2D(64, (10,10), activation='relu')(inp)
    m1 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c1)

    # Second block
    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c2)

    # Third block
    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c3)

    # Final embedding block
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)

    return Model(inputs=[inp], outputs=[d1], name='embedding')

In [13]:
# Instantiate the embedding network (make_siamese_model reads this global for both inputs)
# and print its layer summary.
embedding = make_embedding()
embedding.summary()

Model: "embedding"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_image (InputLayer)    [(None, 100, 100, 3)]     0         
                                                                 
 conv2d (Conv2D)             (None, 91, 91, 64)        19264     
                                                                 
 max_pooling2d (MaxPooling2D  (None, 46, 46, 64)       0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 40, 40, 128)       401536    
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 20, 20, 128)      0         
 2D)                                                             
                                                                 
 conv2d_2 (Conv2D)           (None, 17, 17, 128)       26

Build Distance Layer

In [14]:
# Siamese L1 distance class
class L1Dist(Layer):
    """Layer returning the element-wise absolute difference of two embeddings."""

    def __init__(self, **kwargs):
        # Forward the keyword arguments (name, dtype, trainable, ...) to the
        # base Layer. Dropping them silently ignores e.g. L1Dist(name=...) and
        # the stored layer config when the layer is rebuilt on model loading.
        super().__init__(**kwargs)

    def call(self, input_embedding, validation_embedding):
        """Return |input_embedding - validation_embedding| element-wise."""
        return tf.math.abs(input_embedding - validation_embedding)
    

Make Siamese Model

In [15]:
def make_siamese_model():
    """Assemble the two-input Siamese verification model.

    Both inputs are passed through the module-level `embedding` network, the
    two embeddings are combined by an L1Dist layer, and a single sigmoid unit
    classifies the resulting distance vector.

    Returns:
        A Keras Model named 'SiameseNetwork' taking [input_img, validation_img].
    """
    # The two 100x100x3 image inputs: the anchor and the image to compare against
    anchor_input = Input(name='input_img', shape=(100,100,3))
    candidate_input = Input(name='validation_img', shape=(100,100,3))

    # Distance layer applied to the embeddings of both inputs
    distance_layer = L1Dist()
    distance_layer._name = 'distance'
    anchor_embedding = embedding(anchor_input)
    candidate_embedding = embedding(candidate_input)
    distances = distance_layer(anchor_embedding, candidate_embedding)

    # Single sigmoid output on top of the distance vector
    classifier = Dense(1, activation='sigmoid')(distances)

    return Model(
        inputs=[anchor_input, candidate_input],
        outputs=classifier,
        name='SiameseNetwork',
    )

In [16]:
# Build the Siamese model; train_step and train read this global.
siamese_model = make_siamese_model()

In [17]:
# Print the layer-by-layer summary of the Siamese model
siamese_model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_img (InputLayer)         [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_img (InputLayer)    [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         38960448    ['input_img[0][0]',              
                                                                  'validation_img[0][

Setup Loss and Optimizer

In [18]:
# Binary cross-entropy between the 0/1 pair label and the model's sigmoid output
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [19]:
# Adam optimizer with a learning rate of 1e-4
opt = tf.keras.optimizers.Adam(1e-4)


Establish Checkpoints

In [20]:
# Checkpoints are written under ./training_checkpoints with the file prefix 'ckpt'
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
# Track both the optimizer and the model so that both can be saved and restored together
checkpoint = tf.train.Checkpoint(opt = opt, siamese_model=siamese_model)

Build Train Step Function

In [21]:
@tf.function
def train_step(batch):
    """Run one optimisation step on a batch and return its loss.

    Args:
        batch: Tuple of (input images, validation images, labels).

    Returns:
        The binary cross-entropy loss of the batch.
    """
    # Record all of our operations
    with tf.GradientTape() as tape:
        # Get anchor and positive/negative image
        X = batch[:2]
        # Get label
        y = batch[2]
        # Forward pass
        yhat = siamese_model(X, training=True)
        # Calculate loss
        loss = binary_cross_loss(y, yhat)
    # No print(loss) here: inside a tf.function a Python print only runs while
    # the function is being traced and shows a symbolic Tensor, not the value.

    # Calculate gradients
    grad = tape.gradient(loss, siamese_model.trainable_variables)

    # Calculate updated weights and apply to siamese model
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))

    # Return loss
    return loss

Build Training Loop

In [22]:
from tensorflow.keras.metrics import Precision, Recall

In [23]:
def train(data, EPOCHS):
    """Train the global `siamese_model` for a number of epochs.

    For every epoch this runs `train_step` on each batch, accumulates recall and
    precision over the epoch, prints the last batch's loss together with both
    metrics, and saves a checkpoint every 10th epoch.

    Args:
        data: Batched dataset yielding (input images, validation images, labels).
        EPOCHS: Number of epochs to train for.
    """
    # Loop through epochs
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))

        # Create metric objects (fresh per epoch)
        r = Recall()
        p = Precision()
        # Stays None when `data` yields no batch, so the summary print below
        # cannot hit an undefined name.
        loss = None

        # Loop through each batch
        for idx, batch in enumerate(data):
            # Run train step here
            loss = train_step(batch)
            # Call the model directly instead of .predict(): predict() is meant
            # for large one-off inputs and sets up its own input pipeline and
            # progress output on every call, which is wasteful per batch.
            yhat = siamese_model(batch[:2], training=False)
            r.update_state(batch[2], yhat)
            p.update_state(batch[2], yhat)
            progbar.update(idx+1)

        # Last batch loss, epoch recall, epoch precision
        if loss is not None:
            print(loss.numpy(), r.result().numpy(), p.result().numpy())

        # Save checkpoints
        if epoch % 10 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)


Train the model

In [24]:
# Number of training epochs (train() saves a checkpoint every 10th epoch)
EPOCHS = 50

In [25]:
# Train on the training partition for EPOCHS epochs
train(train_data, EPOCHS)


 Epoch 1/50
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)
0.09965037 0.9153032 0.9963332

 Epoch 2/50
0.057448294 0.96877956 0.9932105

 Epoch 3/50
0.0032491134 0.96040076 0.99309325

 Epoch 4/50
4.0456785e-06 0.98913556 0.9957204

 Epoch 5/50
0.03035604 0.99522674 0.9980852

 Epoch 6/50
0.046046276 0.9896324 0.99904853

 Epoch 7/50
0.0018414147 0.9891253 0.9980916

 Epoch 8/50
0.015715038 0.9971698 1.0

 Epoch 9/50
0.10987586 0.997093 0.9956459

 Epoch 10/50
0.008148229 0.99715775 1.0

 Epoch 11/50
1.7361052e-05 0.9995252 1.0

 Epoch 12/50
0.07255856 0.999038 1.0

 Epoch 13/50
0.018431911 0.9962031 0.99809796

 Epoch 14/50
0.06337969 0.9980916 0.9980916

 Epoch 15/50
1.7517035e-05 0.9976404 0.9976404

 Epoch 16/50
0.0193998 0.99810064 0.9995245

 Epoch 17/50
0.0018162997 0.9995211 0.99904263

 Epoch 18/50
0.0008784774 0.99952173 0.9976134

 Epoch 19/50
0.06521976 0.997646 0.998

Restore from checkpoint

In [None]:
# Path of the checkpoint to restore.
# NOTE(review): train() only saves on every 10th epoch, so 'ckpt-5' presumably
# refers to the 5th save (epoch 50) - confirm that file exists before restoring.
checkpoint_path = os.path.join(checkpoint_dir, 'ckpt-5')

In [None]:
# Restore the state of the objects tracked by `checkpoint` (optimizer and model)
checkpoint.restore(checkpoint_path)

Evaluate Model

In [27]:
# Pull a single batch (input images, validation images, labels) from the test partition
test_input, test_val, y_true = next(test_data.as_numpy_iterator())

In [28]:
# Make predictions and print each thresholded prediction next to its true label
y_hat = siamese_model.predict([test_input, test_val])
for prediction, truth in zip(y_hat, y_true):
    predicted_label = 1 if prediction > 0.5 else 0
    print(predicted_label, truth)

1 1.0
1 1.0
0 0.0
0 0.0
0 0.0
1 1.0
1 1.0
1 1.0
1 1.0
0 0.0
0 0.0
0 0.0
1 1.0
0 0.0
1 1.0
1 1.0


In [29]:
# Post-process the results: map every prediction to 1 if it is above 0.5, else 0
[1 if prediction > 0.5 else 0 for prediction in y_hat]

[1, 1, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 1, 0, 1, 1]

In [30]:
# Create a Recall metric, feed it the test batch's labels and predictions,
# and display the result.
m= Recall()
m.update_state(y_true, y_hat)
m.result().numpy()

1.0

In [31]:
# Same check with a Precision metric (re-uses the name `m`)
m= Precision()
m.update_state(y_true, y_hat)
m.result().numpy()

1.0

Save Model

In [40]:
# Save the trained model to 'siamesemodelv2.h5' in the working directory
siamese_model.save('siamesemodelv2.h5')



Load Model

In [47]:
# Reload the saved model; the custom L1Dist layer is passed via custom_objects
# so that Keras can rebuild it from the file.
model = tf.keras.models.load_model('siamesemodelv2.h5', custom_objects={'L1Dist': L1Dist, 'BinaryCrossentropy': tf.losses.BinaryCrossentropy})



In [50]:
# Print the summary of the reloaded model to compare it with the original
model.summary()


Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_img (InputLayer)         [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_img (InputLayer)    [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         38960448    ['input_img[0][0]',              
                                                                  'validation_img[0][

Real Time Test

Verification Function

In [51]:
def verify(frame, model, detection_threshold, verification_threshold):
    """Compare the saved input image against every stored verification image.

    The input image is read from application_data/input_image/input_image.jpg
    and compared with each file in application_data/verification_images.

    Args:
        frame: Not used by this function (the input is read from disk); kept so
            existing callers keep working.
        model: Model whose predict() takes [input batch, validation batch].
        detection_threshold: Metric above which a prediction is considered
            positive.
        verification_threshold: Proportion of positive predictions over all
            verification images above which the input counts as verified.

    Returns:
        Tuple (results, verified): the list of per-image prediction arrays and
        whether the proportion of positives exceeded verification_threshold.
        Returns ([], False) when there are no verification images.
    """
    verification_dir = os.path.join('application_data', 'verification_images')
    # List the folder once and re-use the listing for the loop and the ratio
    image_names = os.listdir(verification_dir)
    if not image_names:
        # Nothing to compare against; also avoids a 0/0 division below.
        return [], False

    # The input image is the same for every comparison, so load it only once
    input_img = preprocess(os.path.join('application_data', 'input_image', 'input_image.jpg'))

    # Build results array
    results = []
    for image in image_names:
        validation_img = preprocess(os.path.join(verification_dir, image))
        # Make predictions: expand_dims adds the batch axis to both images
        result = model.predict(list(numpy.expand_dims([input_img, validation_img], axis = 1)))
        results.append(result)

    detection = numpy.sum(numpy.array(results) > detection_threshold)
    verification = detection / len(image_names)
    verified = verification > verification_threshold

    return results, verified


OpenCV Real Time Verification

In [53]:
# Establish a connection to the webcam and verify on demand:
#   'v' saves the current crop and runs verification, 'q' quits.
cap = cv2.VideoCapture(0)
input_image_path = os.path.join('application_data', 'input_image', 'input_image.jpg')
# cv2.imwrite does not create missing folders, so make sure the target exists
os.makedirs(os.path.dirname(input_image_path), exist_ok=True)
try:
    while cap.isOpened():
        ret, frame = cap.read()
        # Stop cleanly when no frame could be read instead of slicing None
        if not ret:
            break
        frame = frame[120:120+250, 250:250+250, :]

        cv2.imshow('Verification', frame)

        # Poll the keyboard once per frame; two separate cv2.waitKey calls
        # each consume the key press, so 'v'/'q' presses were frequently lost.
        key = cv2.waitKey(10) & 0xFF

        # Verification trigger
        if key == ord('v'):
            # Save input image to the application_data/input_image folder
            cv2.imwrite(input_image_path, frame)
            results, verified = verify(frame, model, 0.5,0.5)
            print(verified)
        elif key == ord('q'):
            break
finally:
    # Always release the webcam and close the window, even after an error
    cap.release()
    cv2.destroyAllWindows()

False
True
False
False
False
True
True
True
False
True
True
False
False
True
False
