In [None]:
# Installing the dependencies
%pip install tensorflow-gpu tensorflow==2.9 opencv-python matplotlib

In [1]:
# Import the standard libraries
import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt

In [2]:
# Import the Tensorflow dependencies - Functional API
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf

## Setup GPU Growth

In [3]:
# Avoid OOM (Out of Memory) errors by setting GPU Memory Consumption Growth
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

## Create folder structure

In [4]:
# Setup paths
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')

In [None]:
# Make directories
os.makedirs(POS_PATH)
os.makedirs(NEG_PATH)
os.makedirs(ANC_PATH)

In [None]:
os.removedirs

# Collect Positive and Anchors

## Untar Labelled Faces in the Wild Dataset

In [None]:
# Uncompress Tar GZ labelled Faces in the wild dataset
!tar -xf lfw.tgz

In [5]:
# Move LFW Images to the following repository data/negative
for directory in os.listdir('lfw'):
    for file in os.listdir(os.path.join('lfw', directory)):
        EX_PATH = os.path.join('lfw', directory, file)
        NEW_PATH = os.path.join(NEG_PATH, file)
        os.replace(EX_PATH, NEW_PATH)

## Collect Positive and Anchor Classes 

In [6]:
# Import the uuid library to generate unique image names
import uuid
# uuid - universally unique identifiers

In [None]:
'{}.jpg'.format(uuid.uuid1())

In [8]:
# Establish a connection to the webcam
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    
    # Cut down the frame to 250x250 px
    frame = frame[120:120+250, 200:200+250, :]
    
    # Collect anchors
    if cv2.waitKey(1) & 0XFF == ord('a'):
        # Create the unique file path
        imgname = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
        # Write out anchor image
        cv2.imwrite(imgname, frame)
    
    # Collect positives
    if cv2.waitKey(1) & 0XFF == ord('p'):
        # Create the unique file path
        imgname = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
        # Write out positive image
        cv2.imwrite(imgname, frame)
    
    # Show image back to screen
    cv2.imshow('Image Collection', frame)
    
    # Breaking gracefully
    if cv2.waitKey(1) & 0XFF == ord('q'):
        break
        
# Release the webcam
cap.release()
# Close the image show frame
cv2.destroyAllWindows()

In [None]:
plt.imshow(frame)

# Load and Preprocess images

## Get Image Directories

In [None]:
anchor = tf.data.Dataset.list_files(ANC_PATH+'\*.jpg').take(300)
positive = tf.data.Dataset.list_files(POS_PATH+'\*.jpg').take(300)
negative = tf.data.Dataset.list_files(NEG_PATH+'\*.jpg').take(300)

In [13]:
dir_test = anchor.as_numpy_iterator()

NameError: name 'anchor' is not defined

In [None]:
dir_test.next()

## Preprocessing - Scale and Resize

In [19]:
def preprocess(file_path):
    
    # Read in image from file path
    byte_img = tf.io.read_file(file_path)
    # Load in the image
    img = tf.io.decode_jpeg(byte_img)
    
    # Preprocessing steps - resizing the image to 100x100x3
    img = tf.image.resize(img, (100, 100))
    # Scale image between 0 and 1
    img = img / 255.0
    return img

In [20]:
img = preprocess('data\\anchor\\55c66f8c-1a77-11ed-9447-3ca067c97eb7.jpg')

In [None]:
plt.imshow(img)

## Create Labelled Dataset 

In [None]:
# (anchor, positive) => 1,1,1,1,1
# (anchor, negative) => 0,0,0,0,0

In [22]:
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)

In [23]:
samples = data.as_numpy_iterator()

In [24]:
examples = samples.next()

In [None]:
examples

## Build Train and Test Partitions

In [26]:
def preprocess_twin(input_img, validation_img, label):
    return(preprocess(input_img), preprocess(validation_img), label)

In [27]:
res = preprocess_twin(*examples)

In [None]:
plt.imshow(res[0])

In [None]:
res[2]

In [29]:
# Build dataloader pipeline
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=1024)

In [None]:
round(len(data)*.7)

In [31]:
# Training partition
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [32]:
train_samples = train_data.as_numpy_iterator()

In [33]:
train_sample = train_samples.next()

In [None]:
round(len(data)*.3)

In [35]:
# Testing partition
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

# Model Engineering

## Build Embedding Layer

In [38]:
inp = Input(shape=(100,100,3), name='input_image')

In [39]:
c1 = Conv2D(64, (10,10), activation='relu')(inp)

In [None]:
c1

In [40]:
m1 = MaxPooling2D(64, (2,2), padding='same')(c1)

In [None]:
m1

In [41]:
c2 = Conv2D(128, (7,7), activation='relu')(m1)

In [None]:
c2

In [42]:
m2 = MaxPooling2D(64, (2,2), padding='same')(c2)

In [None]:
m2

In [43]:
c3 = Conv2D(128, (4,4), activation='relu')(m2)
m3 = MaxPooling2D(64, (2,2), padding='same')(c3)

In [None]:
c3

In [None]:
m3

In [44]:
c4 = Conv2D(256, (4,4), activation='relu')(m3)

In [None]:
c4

In [45]:
c4 = Conv2D(256, (4,4), activation='relu')(m3)
f1 = Flatten()(c4)
d1 = Dense(4096, activation='sigmoid')(f1)

In [None]:
c4

In [None]:
f1

In [None]:
d1

In [46]:
mod = Model(inputs=[inp], outputs=[d1], name='embedding')

In [None]:
mod.summary()

In [48]:
def make_embedding():
    inp = Input(shape=(100,100,3), name='input_image')
    
    # First block
    c1 = Conv2D(64, (10,10), activation='relu')(inp)
    m1 = MaxPooling2D(64, (2,2), padding='same')(c1)
    
    # Second block
    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D(64, (2,2), padding='same')(c2)
    
    # Third block
    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D(64, (2,2), padding='same')(c3)
    
    # Final layer
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)
    
    return Model(inputs=[inp], outputs=[d1], name='embedding')

In [49]:
embedding = make_embedding()

In [None]:
embedding.summary()

## Build Distance Layer

In [51]:
# Siamese L1 Distance class
class L1Dist(Layer):
    
    # Init method - inheritance
    def __init__(self, **kwargs):
        super().__init__()
                
    # Similarity calculation        
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)

In [52]:
l1 = L1Dist()

In [None]:
l1

## Make Siamese Model

In [53]:
input_image = Input(name='input_img', shape=(100,100,3))
validation_image = Input(name='validation_img', shape=(100,100,3))

In [54]:
inp_embedding = embedding(input_image)
val_embedding = embedding(validation_image)

In [55]:
siamese_layer = L1Dist()

In [56]:
distances = siamese_layer(inp_embedding, val_embedding)

In [57]:
classifier = Dense(1, activation='sigmoid')(distances)

In [None]:
classifier

In [59]:
siamese_network = Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

In [None]:
siamese_network.summary()

In [61]:
def make_siamese_model():
    
    # Anchor image input in the network
    input_image = Input(name='input_img', shape=(100,100,3))
    
    # Validation image input in the network
    validation_image = Input(name='validation_img', shape=(100,100,3))
    
    # Combine siamese distance components
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(embedding(input_image), embedding(validation_image))
       
    # Classification layer
    classifier = Dense(1, activation='sigmoid')(distances)
    
    return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

In [62]:
siamese_model = make_siamese_model()

In [None]:
siamese_model.summary()

# Training

## 1. Setup Loss and Optimizer

In [64]:
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [65]:
opt = tf.keras.optimizers.Adam(1e-4)

## 2. Establish Checkpoints

In [66]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

## 3. Build Train Step Function

In [67]:
test_batch = train_data.as_numpy_iterator()

In [68]:
batch_1 = test_batch.next()

In [69]:
X = batch_1[:2]

In [None]:
np.array(X).shape

In [71]:
y = batch_1[2]

In [None]:
y

In [73]:
@tf.function
def train_step(batch):
    
    # Record all of the operations
    with tf.GradientTape() as tape:
        # Get anchor and positive/negative image
        X = batch[:2]
        # Get label
        y = batch[2]
        
        # Forward pass
        yhat = siamese_model(X, training=True)
        # Calculate loss
        loss = binary_cross_loss(y, yhat)
    print(loss)
        
    # Calculate gradients
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    
    # Calculate updated weights and apply to siamese model
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))
    
    # Return loss
    return loss

## 4. Build Training Loop

In [74]:
def train(data, EPOCHS):
    # Loop through epochs
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))
        
        # Loop through each batch
        for idx, batch in enumerate(data):
            # Run train step
            train_step(batch)
            progbar.update(idx+1)
            
        # Save checkpoints
        if epoch % 10 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)

## 5. Train the Model

In [75]:
EPOCHS = 50

In [None]:
train(train_data, EPOCHS)

# Evaluate model

## 1. Import Metrics

In [76]:
# Import metric calculations
from tensorflow.keras.metrics import Precision, Recall

## 2. Make Predictions

In [121]:
# Get a batch of test data
test_input, test_val, y_true = test_data.as_numpy_iterator().next()

In [122]:
test_var = test_data.as_numpy_iterator().next()

In [None]:
# Make predictions
y_hat = siamese_model.predict([test_input, test_val])
y_hat

In [None]:
# Post processing the results
[1 if prediction > 0.5 else 0 for prediction in y_hat]

In [None]:
y_true

## 3. Calculate Metrics

In [None]:
# Creating a metric object
m = Recall()

# Calculating the recall value
m.update_state(y_true, y_hat)

# Recall result
m.result().numpy()

In [None]:
# Creating a metric object
m = Precision()

# Calculating the recall value
m.update_state(y_true, y_hat)

# Recall result
m.result().numpy()

## 4. Visualize results

In [None]:
# Set plot size
plt.figure(figsize=(10,8))

# Set first subplot
plt.subplot(1,2,1)
plt.imshow(test_input[2])

# Set second subplot
plt.subplot(1,2,2)
plt.imshow(test_val[2])
plt.show()

# Save Model

In [None]:
# Save weights
siamese_model.save('siamesemodel.h5')

In [None]:
# Reload model
model = tf.keras.models.load_model('siamesemodel.h5', 
                                   custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})

In [None]:
# Make predictions with reloaded model
model.predict([test_input, test_val])

In [None]:
# View model summary
model.summary()

# Real Time Test

## 1. Verification Function

In [None]:
os.listdir(os.path.join('application_data', 'verification_images'))

In [None]:
os.path.join('application_data', 'input_image', 'input_image.jpg')

In [None]:
for image in os.listdir(os.path.join('application_data', 'verification_images')):
    validation_img = preprocess(os.path.join('application_data', 'verification_images', image))
    print(validation_img)

In [95]:
def verify(model, detection_threshold, verification_threshold):
    # Build results array
    results = []
    for image in os.listdir(os.path.join('application_data', 'verification_images')):
        input_img = preprocess(os.path.join('application_data', 'input_image', 'input_image.jpg'))
        validation_img = preprocess(os.path.join('application_data', 'verification_images', image))
        
        result = model.predict(list(np.expand_dims([input_img, validation_img], axis=1)))
        results.append(result)
    
    # Dectection Threshold: Metric above which a prediction is considered positive
    detection = np.sum(np.array(results) > detection_threshold)
    
    # Verification Threshold: Proportion of positive predictions / total positive samples
    verification = detection / len(os.listdir(os.path.join('application_data', 'verification_images')))
    verified = verification > verification_threshold
    
    return results, verified

## 2. OpenCV Real Time Verification

In [None]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    frame = frame[120:120+250, 200:200+250, :]
    
    cv2.imshow('verification', frame)
    
    # Verification trigger
    if cv2.waitKey(10) & 0xFF == ord('v'):
        # Save input image to application_data/input_image folder
        cv2.imwrite(os.path.join('application_data', 'input_image', 'input_image.jpg'), frame)
        # Run verification
        results, verified = verify(model, 0.9, 0.7)
        print(verified)
    
    if cv2.waitKey(10) & 0xFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()

In [None]:
np.squeeze(results) > 0.5

In [None]:
np.sum(np.squeeze(results) > 0.5)