# 1. Setup

### 1.1 Install Dependencies

In [None]:
# %pip install tensorflow==2.4.1 tensorflow-gpu==2.4.1 opencv-python matplotlib

### 1.2 Import Dependencies

In [None]:
# Import standard dependencies
import cv2
import os
import random
import numpy as np
import matplotlib.pyplot as plt

In [None]:
# Import TensorFlow dependencies - Functional API
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf


### 1.3 Set GPU Growth

In [None]:
# Avoid OOM errors by setting GPU memory consumption growth
# List the GPUs TensorFlow can see and echo them for a quick sanity check.
gpus = tf.config.experimental.list_physical_devices('GPU')
print(gpus)

# Memory growth is configured per device, so enable it on every GPU found.
for gpu in gpus:
  tf.config.experimental.set_memory_growth(gpu, True)

### 1.4 Create Folder Structures

In [None]:
# Setup Folder and Path
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')

In [None]:
# Make the directories.
# exist_ok=True lets this cell be re-run without raising FileExistsError
# once the folders have been created.
os.makedirs(POS_PATH, exist_ok=True)
os.makedirs(NEG_PATH, exist_ok=True)
os.makedirs(ANC_PATH, exist_ok=True)

# 2. Collect Positive and Anchors

### 2.1 Untar Labelled Faces in the Wild Dataset

In [None]:
# http://vis-www.cs.umass.edu/lfw/

In [None]:
# Uncompress Tar GZ Labelled Faces in the Wild Dataset
# !tar -xf lfw.tgz

In [None]:
# Move the LFW images into the data/negative directory
for directory in os.listdir('lfw'):
  person_dir = os.path.join('lfw', directory)
  # Skip stray files that sit next to the per-person folders; calling
  # os.listdir on them would raise NotADirectoryError.
  if not os.path.isdir(person_dir):
    continue
  for file in os.listdir(person_dir):
    EX_PATH = os.path.join(person_dir, file)
    NEW_PATH = os.path.join(NEG_PATH, file)
    os.replace(EX_PATH, NEW_PATH)

### 2.2 Collect Positive and Anchor Classes

In [None]:
# Import the uuid library to generate unique image names
import uuid

In [None]:
str(uuid.uuid1())

In [None]:
# Establish a connection to the webcam
cap = cv2.VideoCapture(0)
try:
  while cap.isOpened():
    ret, frame = cap.read()
    # cap.read() reports failure through its first return value; the frame
    # is then unusable, so stop instead of crashing on the slice below.
    if not ret:
      break

    # Cut down frame to 250x250px
    frame = frame[120:120+250, 200:200+250, :]

    cv2.imshow('image Collection', frame)

    # Poll the keyboard exactly once per frame. Calling waitKey several times
    # lets an earlier call swallow a key press meant for a later check.
    key = cv2.waitKey(1) & 0xFF

    if key == ord('a'):
      # Collect anchors
      imgname = os.path.join(ANC_PATH, '{}.jpg'.format(str(uuid.uuid1())))
      cv2.imwrite(imgname, frame)
    elif key == ord('p'):
      # Collect positive
      imgname = os.path.join(POS_PATH, '{}.jpg'.format(str(uuid.uuid1())))
      cv2.imwrite(imgname, frame)
    elif key == ord('q'):
      break
finally:
  # Always free the camera and close the preview window, even on an error
  cap.release()
  cv2.destroyAllWindows()

### 2.x NEW - Data Augmentation

In [None]:
def data_aug(img, num_augmented=9):
  """Create randomly augmented variants of one image.

  Every variant is derived from the original `img`, so the distortions do not
  accumulate from one variant to the next. Each random operation gets its own
  freshly drawn seed; the stateless ops are deterministic for a given seed, so
  a constant seed would apply the identical change every time.

  Args:
    img: Image array or tensor accepted by the tf.image ops used below.
    num_augmented: Number of augmented variants to produce (default 9).

  Returns:
    A list with `num_augmented` augmented image tensors.
  """
  def _seed():
    # Stateless tf.image ops expect a pair of integers as the seed
    return (np.random.randint(100), np.random.randint(100))

  data = []
  for _ in range(num_augmented):
    aug = tf.image.stateless_random_brightness(img, max_delta=0.02, seed=_seed())
    aug = tf.image.stateless_random_contrast(aug, lower=0.6, upper=1, seed=_seed())
    aug = tf.image.stateless_random_flip_left_right(aug, seed=_seed())
    aug = tf.image.stateless_random_jpeg_quality(aug, min_jpeg_quality=90, max_jpeg_quality=100, seed=_seed())
    aug = tf.image.stateless_random_saturation(aug, lower=0.9, upper=1, seed=_seed())

    data.append(aug)

  return data

In [None]:
# Write augmented copies of every anchor image back into the anchor folder.
# os.listdir returns a snapshot list, so files written here are not revisited.
for file_name in os.listdir(ANC_PATH):
  img_path = os.path.join(ANC_PATH, file_name)
  img = cv2.imread(img_path)
  augmented_images = data_aug(img)

  for image in augmented_images:
    # A fresh uuid keeps every augmented file name unique
    unique_name = '{}.jpg'.format(uuid.uuid1())
    cv2.imwrite(os.path.join(ANC_PATH, unique_name), image.numpy())

# 3. Load and Preprocess Images

### 3.1 Get Images Directories

In [None]:
# Report how many entries each data folder currently holds
print('total items on anchor folder: {}'.format(len(os.listdir(os.path.join('data', 'anchor')))))
print('total items on positive folder: {}'.format(len(os.listdir(os.path.join('data', 'positive')))))
print('total items on negative folder: {}'.format(len(os.listdir(os.path.join('data', 'negative')))))

In [None]:
# Build file-name datasets for each class (at most 3000 files each).
# os.path.join produces the right separator on every platform and avoids the
# invalid '\*' escape sequence of a hand-written backslash pattern.
anchor = tf.data.Dataset.list_files(os.path.join(ANC_PATH, '*.jpg')).take(3000)
positive = tf.data.Dataset.list_files(os.path.join(POS_PATH, '*.jpg')).take(3000)
negative = tf.data.Dataset.list_files(os.path.join(NEG_PATH, '*.jpg')).take(3000)

In [None]:
dir_test = anchor.as_numpy_iterator()

In [None]:
dir_test.next()

### 3.2 Preprocessing - Scale and Resize

In [None]:
def preprocess(file_path):
  """Load a JPEG file and turn it into a 100x100x3 float image scaled to [0, 1].

  Args:
    file_path: Path of the JPEG file to read.

  Returns:
    The resized and rescaled image tensor.
  """
  # Read in image from file path
  bytes_img = tf.io.read_file(file_path)
  # Decode the image; channels=3 forces an RGB result so that a grayscale
  # JPEG still yields the 3 channels the model input expects.
  img = tf.io.decode_jpeg(bytes_img, channels=3)
  # Preprocessing steps - resizing the image to be 100x100x3
  img = tf.image.resize(img, (100,100))
  # Scale image to be between 0 and 1
  img = img / 255.0
  return img

In [None]:
# Try the preprocessing on one sample anchor image. The path is built with
# os.path.join so it does not depend on Windows-style backslash separators.
img = preprocess(os.path.join('data', 'anchor', '6cdceddc-796f-11ed-be01-b07b25a8233d.jpg'))

In [None]:
img.numpy().max()

In [None]:
plt.imshow(img)

### 3.3 Create Labelled Dataset

In [None]:
# Zip the anchor files element-wise with a second image and a label:
# (anchor, positive) => 1,1,1,1,1
# (anchor, negative) => 0,0,0,0,0
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
# Join both sets into one dataset: the positive pairs first, then the negative pairs
data = positives.concatenate(negatives)

In [None]:
data

In [None]:
samples = data.as_numpy_iterator()

In [None]:
samples.next()

### 3.4 Build Train and Test Partition

In [None]:
def preprocess_twin(input_img, validation_img, label):
  """Preprocess both images of a pair and pass the label through unchanged.

  Args:
    input_img: File path of the first image of the pair.
    validation_img: File path of the second image of the pair.
    label: Label belonging to the pair.

  Returns:
    A tuple (preprocessed input image, preprocessed validation image, label).
  """
  first_image = preprocess(input_img)
  second_image = preprocess(validation_img)
  return (first_image, second_image, label)

In [None]:
res = preprocess_twin(*samples.next())

In [None]:
plt.imshow(res[1])

In [None]:
# Build the dataloader pipeline
data = data.map(preprocess_twin)
data = data.cache()
# Shuffle once and keep that order for every later iteration. With the default
# reshuffle_each_iteration=True the take()/skip() split would select different
# elements on each pass, letting test samples leak into the training partition.
data = data.shuffle(buffer_size=10000, reshuffle_each_iteration=False)

In [None]:
# Training partition: the first 70% of the samples, in batches of 16,
# with up to 8 batches prefetched
train_data = data.take(round(len(data) *.7)).batch(16).prefetch(8)

In [None]:
# Testing partition: skip the 70% used for training, take the following 30%,
# then batch by 16 and prefetch up to 8 batches
test_data = (
  data.skip(round(len(data) *.7))
  .take(round(len(data) *.3))
  .batch(16)
  .prefetch(8)
)

# 4. Model Engineering

### 4.1 Build Embedding Layer

In [None]:
inp = Input(shape=(105,105,3), name='input_image')
inp

In [None]:
c1 = Conv2D(64, (10,10), activation='relu')(inp)
c1

In [None]:
# 2x2 max pooling with stride 2. The first positional argument of MaxPooling2D
# is pool_size, so passing 64 there created a 64x64 pooling window.
m1 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c1)
m1

In [None]:
c2 = Conv2D(128, (7,7), activation='relu')(m1)
c2

In [None]:
# 2x2 max pooling with stride 2. The first positional argument of MaxPooling2D
# is pool_size, so passing 64 there created a 64x64 pooling window.
m2 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c2)
m2

In [None]:
c3 = Conv2D(128, (4,4), activation='relu')(m2)
c3

In [None]:
# 2x2 max pooling with stride 2. The first positional argument of MaxPooling2D
# is pool_size, so passing 64 there created a 64x64 pooling window.
m3 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c3)
m3

In [None]:
c4 = Conv2D(256, (4,4), activation='relu')(m3)
c4

In [None]:
f1 = Flatten()(c4)
f1

In [None]:
d1 = Dense(4096, activation='sigmoid')(f1)
d1

In [None]:
model_example = Model(inputs=[inp], outputs=[d1], name='embedding')
model_example.summary()

In [None]:
def make_embedding():
  """Build the convolutional embedding network.

  The network maps a 100x100x3 image to a 4096-dimensional feature vector
  through four convolution blocks followed by a dense sigmoid layer.

  Returns:
    A Keras Model named 'embedding'.
  """
  inp = Input(shape=(100,100,3), name='input_image')

  # The first positional argument of MaxPooling2D is pool_size, so the pooling
  # layers name pool_size and strides explicitly to get 2x2 windows with
  # stride 2. With padding='same' the output shape depends only on the stride,
  # so layer shapes are unchanged compared with a larger window.

  # First Block
  c1 = Conv2D(64, (10,10), activation='relu')(inp)
  m1 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c1)

  # Second Block
  c2 = Conv2D(128, (7,7), activation='relu')(m1)
  m2 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c2)

  # Third Block
  c3 = Conv2D(128, (4,4), activation='relu')(m2)
  m3 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c3)

  # Final embedding block
  c4 = Conv2D(256, (4,4), activation='relu')(m3)
  f1 = Flatten()(c4)
  d1 = Dense(4096, activation='sigmoid')(f1)

  return Model(inputs=[inp], outputs=[d1], name='embedding')

In [None]:
embedding = make_embedding()

In [None]:
embedding.summary()

### 4.2 Build Distance Layer

In [None]:
# Siamese L1 Distance class
class L1Dist(Layer):
  """Siamese distance layer: element-wise absolute difference of two embeddings."""

  def __init__(self, **kwargs):
    # Forward the keyword arguments (e.g. name, dtype) to the base Layer.
    # Dropping them loses the layer configuration, including when Keras
    # rebuilds the layer while loading a saved model.
    super().__init__(**kwargs)

  def call(self, input_embedding, validation_embedding):
    """Return |input_embedding - validation_embedding|, computed element-wise."""
    return tf.math.abs(input_embedding - validation_embedding)

In [None]:
l1 = L1Dist()

### 4.3 Make Siamese Model

In [None]:
def make_siamese_model():
  """Assemble the siamese verification network.

  Both inputs are passed through the same module-level `embedding` model, the
  two embeddings are compared by an L1Dist layer, and a single sigmoid unit
  turns the distance vector into the output score.

  Returns:
    A Keras Model named 'SiameseNetwork' with two image inputs and one output.
  """
  # The two 100x100x3 image inputs: the anchor and the image to compare it with
  anchor_in = Input(shape=(100,100,3), name='input_img')
  candidate_in = Input(shape=(100,100,3), name='validation_img')

  # Distance layer, renamed so it shows up as 'distance' in the model
  distance_layer = L1Dist()
  distance_layer._name = 'distance'

  # Embed both images with the shared embedding model, then compare them
  anchor_features = embedding(anchor_in)
  candidate_features = embedding(candidate_in)
  l1_distance = distance_layer(anchor_features, candidate_features)

  # Classification layer
  score = Dense(1, activation='sigmoid')(l1_distance)

  return Model(inputs=[anchor_in, candidate_in], outputs=score, name='SiameseNetwork')
  

In [None]:
siamese_model = make_siamese_model()

In [None]:
siamese_model.summary()

# 5. Training

### 5.1 Setup Loss and Optimizer

In [None]:
binary_cross_loss = tf.losses.BinaryCrossentropy()
opt = tf.keras.optimizers.Adam(1e-4) # 0.0001

### 5.2 Establish Checkpoints

In [None]:
# To restore from a checkpoint you can use checkpoint.restore(path_to_checkpoint),
# e.g. with the path returned by tf.train.latest_checkpoint(checkpoint_dir).
# This loads the saved weights into the existing model.

In [None]:
# Checkpoints are written under ./training_checkpoints with the file prefix 'ckpt'
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
# Track both the optimizer and the model so that each is stored in a checkpoint
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

### 5.3 Build Train Step Function

In [None]:
# The basic flow for training on one batch is as follows:
# 1. Make a prediction
# 2. Calculate loss
# 3. Derive gradients
# 4. Calculate new weights and apply

@tf.function
def train_step(batch):
  """Run one optimisation step on a single batch and return its loss.

  Args:
    batch: Sequence whose first two elements are the two image batches and
      whose third element holds the labels.

  Returns:
    The binary cross-entropy loss of the batch.
  """
  # Split the batch into the image pair and its labels
  image_pair, labels = batch[:2], batch[2]

  # Record the forward pass so gradients can be derived from it
  with tf.GradientTape() as tape:
    predictions = siamese_model(image_pair, training=True)
    loss = binary_cross_loss(labels, predictions)

  # NOTE: inside a tf.function a Python print runs only while the function is
  # being traced, not on every step.
  print(loss)

  # Derive the gradients and let the optimizer apply the weight updates
  weights = siamese_model.trainable_variables
  grad = tape.gradient(loss, weights)
  opt.apply_gradients(zip(grad, weights))
  return loss


# Each batch in the dataset is comprised of 16 samples, each of which contains an anchor,
# a positive or negative image, and a label

# The Optimizer here is calculating and propagating the new weights using
# Adam's optimisation algorithm, a variant of gradient descent

### 5.4 Build Training Loop

In [None]:
# Import Metric Calculations
from tensorflow.keras.metrics import Precision, Recall

In [None]:
# While the train_step function was focused on training for one batch
# the loop here will be used to iterate over every batch in the dataset

def train(data, EPOCHS):
  """Train the siamese model for a number of epochs.

  For every epoch the function runs `train_step` on each batch, tracks recall
  and precision of the model's predictions on those batches, prints the last
  batch loss together with both metrics, and saves a checkpoint every 10
  epochs.

  Args:
    data: Batched dataset whose elements are (input images, validation images,
      labels).
    EPOCHS: Number of epochs to run.
  """
  # Loop through epochs
  for epoch in range(1, EPOCHS + 1):
    print('\n Epoch {}/{}'.format(epoch, EPOCHS))
    progbar = tf.keras.utils.Progbar(len(data))

    # Fresh metric objects per epoch so results do not carry over
    r = Recall()
    p = Precision()

    # Stays None when the dataset yields no batches
    loss = None

    # Loop through each batch
    for idx, batch in enumerate(data):
      # Run train step here
      loss = train_step(batch)
      # Call the model directly: for a single batch this avoids the per-call
      # overhead of Model.predict inside a loop.
      yhat = siamese_model(batch[:2], training=False)
      r.update_state(batch[2], yhat)
      p.update_state(batch[2], yhat)
      progbar.update(idx + 1)

    # Only report when at least one batch was processed
    if loss is not None:
      print(loss.numpy(), r.result().numpy(), p.result().numpy())

    # Save checkpoints
    if epoch % 10 == 0:
      checkpoint.save(file_prefix=checkpoint_prefix)

### 5.5 Train the Model

In [None]:
EPOCHS = 50

In [None]:
train(train_data, EPOCHS)

# 6. Evaluate Model

### 6.1 Import Metrics

In [None]:
# Import metric calculation

# Precision demonstrates what proportion of positive identifications were actually
# correct. Recall shows what proportion of actual positives were identified correctly
from tensorflow.keras.metrics import Precision, Recall

### 6.2 Make Predictions

In [None]:
# Get a batch of test data
test_input, test_val, y_true = test_data.as_numpy_iterator().next()

In [None]:
# Make predictions
y_hat = siamese_model.predict([test_input, test_val])
y_hat

In [None]:
# Post processing the results
[1 if prediction > 0.5 else 0 for prediction in y_hat]

In [None]:
y_true

### 6.3 Calculate Metrics

In [None]:
# Creating a metric object
m = Precision()

# Calculate precision value
m.update_state(y_true, y_hat)

# Return precision result
m.result().numpy()

In [None]:
# Creating a metric object
m = Recall()

# Calculate recall value
m.update_state(y_true, y_hat)

# Return Recall Result
m.result().numpy()

### 6.4 Viz Results

In [None]:
plt.figure(figsize=(18,8))

# Set first subplot
plt.subplot(1,2,1)
plt.imshow(test_input[0])

# Set second subplot
plt.subplot(1,2,2)
plt.imshow(test_val[0])
plt.show()

# 7. Save Model

In [None]:
# Save the whole model (not only the weights) to an HDF5 file
siamese_model.save('siamese-model-v2.h5')

In [None]:
# Reload the model from the file written by the save cell ('siamese-model-v2.h5').
# L1Dist is a custom layer, so it has to be supplied through custom_objects.
model = tf.keras.models.load_model('siamese-model-v2.h5', custom_objects={'L1Dist': L1Dist, 'BinaryCrossentropy': tf.losses.BinaryCrossentropy})

In [None]:
# The load_weights method can be used to load the weights also.
# The save_weights() method save only the weights, the save() method save
# optimizer state as well
model.predict([test_input, test_val])

In [None]:
# View Model Saved
model.summary()

# 8. Real Time Test

### 8.1 Verification Function

In [None]:
# You can use as many images as you like inside of the verification_images folder
# Just keep in mind that more images will mean slower verification

def verify(model, detection_threshold, verification_threshold):
  """Compare the stored input image against every verification image.

  The input image is read from application_data/input_image/input_image.jpg
  and compared with each file in application_data/verification_images.

  Args:
    model: Model whose predict() takes the [input, validation] image pair.
    detection_threshold: Score above which a single prediction counts as a
      positive match.
    verification_threshold: Fraction of positive matches (out of all
      verification images) above which the input is considered verified.

  Returns:
    A tuple (results, verified): the list of raw predictions and the
    verification decision. With no verification images the result is
    ([], False).
  """
  verification_dir = os.path.join('application_data', 'verification_images')
  image_names = os.listdir(verification_dir)

  # Nothing to compare against: report "not verified" instead of dividing 0 by 0
  if not image_names:
    return [], False

  # The input image is the same for every comparison, so load it only once
  input_img = preprocess(os.path.join('application_data', 'input_image', 'input_image.jpg'))

  # Build results array
  results = []
  for image in image_names:
    validation_img = preprocess(os.path.join(verification_dir, image))

    # Make Predictions
    result = model.predict(list(np.expand_dims([input_img, validation_img], axis=1)))
    results.append(result)

  # Detection Threshold: Metric above which a prediction is considered positive
  detection = np.sum(np.array(results) > detection_threshold)

  # Verification Threshold: Proportion of positive predictions / total verification images
  verification = detection / len(image_names)
  verified = verification > verification_threshold

  return results, verified

### 8.2 OpenCV Real Time Verification

In [None]:
cap = cv2.VideoCapture(0)
try:
  while cap.isOpened():
    ret, frame = cap.read()
    # cap.read() reports failure through its first return value; stop rather
    # than crash on the slice below when no frame was delivered.
    if not ret:
      break

    # Standard image slicing: cut a 250x250 region out of the camera frame so
    # the input image always has the same size and aspect ratio.
    frame = frame[120:120+250, 200:200+250, :]

    cv2.imshow('Verification', frame)

    # Poll the keyboard once per frame. With two separate waitKey calls the
    # first one could swallow a key press meant for the second check.
    key = cv2.waitKey(10) & 0xFF

    # Verification Trigger
    if key == ord('v'):
      # Save input image to application_data/input_image folder
      cv2.imwrite(os.path.join('application_data', 'input_image', 'input_image.jpg'), frame)

      # Run verification
      results, verified = verify(model, 0.9, 0.5)
      print(verified)
    elif key == ord('q'):
      break
finally:
  # Always free the camera and close the window, even if verification fails
  cap.release()
  cv2.destroyAllWindows()

In [None]:
np.sum(np.squeeze(results) > 0.1)

In [None]:
cap.release()
cv2.destroyAllWindows()