# 1. Setup

## 1.1 Install dependencies

In [130]:
!pip install tensorflow==2.8.0 opencv-python matplotlib



## 1.2 Import Dependencies

In [1]:
import cv2
import os
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import uuid

In [2]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf

## 1.3 Set Directory structure

In [17]:
# Directories for the three image roles used to build training pairs.
# Anchor and positive images are written by the webcam capture cell (section 2.2);
# negative images are moved in from the extracted LFW folder (section 2.1).
ANCHOR_PATH = os.path.join('data','anchor')
POSITIVE_PATH = os.path.join('data','positive')
NEGATIVE_PATH = os.path.join('data','negative')

In [None]:
# Create the data directories; exist_ok=True keeps this cell re-runnable
# once the folders already exist (otherwise os.makedirs raises FileExistsError).
for data_dir in (ANCHOR_PATH, POSITIVE_PATH, NEGATIVE_PATH):
    os.makedirs(data_dir, exist_ok=True)

# 2 Collect Anchors and Positives

## 2.1 Get the Labeled Faces in the Wild dataset

In [None]:
# Download the Labeled Faces in the Wild dataset as a .tgz file (~175mb)
# Linux command
!wget vis-www.cs.umass.edu/lfw/lfw.tgz
# macOS command (same download using curl)
# !curl -o lfw.tgz http://vis-www.cs.umass.edu/lfw/lfw.tgz
# Windows PowerShell command (same download using Invoke-WebRequest)
# !iwr -outf lfw.tgz http://vis-www.cs.umass.edu/lfw/lfw.tgz

In [None]:
# Unpack the downloaded archive; the next cell reads the extracted `lfw` directory
!tar -xf lfw.tgz

In [None]:
# Flatten the LFW tree: move every file from lfw/<sub-directory>/ into the negatives folder.
for directory in os.listdir('lfw'):
    person_dir = os.path.join('lfw', directory)
    # Skip stray non-directory entries (e.g. OS metadata files); calling
    # os.listdir on them would raise NotADirectoryError.
    if not os.path.isdir(person_dir):
        continue
    for file in os.listdir(person_dir):
        CURR_PATH = os.path.join(person_dir, file)
        NEW_PATH = os.path.join(NEGATIVE_PATH, file)
        os.replace(CURR_PATH, NEW_PATH)

## 2.2 Collect OpenCV Images

In [None]:
# Collect webcam images: press 'p' to save a positive, 'a' to save an anchor, 'q' to quit.
cap = cv2.VideoCapture(0)

try:
    while cap.isOpened():
        ret, frame = cap.read()
        # Stop cleanly if the camera delivered no frame (frame would be None below)
        if not ret:
            break

        # Crop the frame to 250x250px
        frame = frame[100:100+250, 200:200+250, :]

        cv2.imshow('Image Collection', frame)

        # Poll the keyboard exactly once per frame. Every cv2.waitKey call consumes the
        # pending key press, so polling once per command would drop most key presses.
        key = cv2.waitKey(1) & 0xFF

        if key == ord('p'):
            # Capture positives
            image_path = os.path.join(POSITIVE_PATH, '{}.jpeg'.format(uuid.uuid1()))
            cv2.imwrite(image_path, frame)
        elif key == ord('a'):
            # Capture anchors
            image_path = os.path.join(ANCHOR_PATH, '{}.jpeg'.format(uuid.uuid1()))
            cv2.imwrite(image_path, frame)
        elif key == ord('q'):
            # Quit the capture
            break
finally:
    # Always free the camera and close the window, even if an error occurred
    cap.release()
    cv2.destroyAllWindows()

# 3 Load + Process Images

## 3.1 Get Image Directories

In [29]:
# NOTE(review): turns on tf.data's debug mode. This looks like a debugging leftover
# (its execution count, 29, is out of order with the neighbouring cells) — confirm it is still needed.
tf.data.experimental.enable_debug_mode()

In [18]:
# Build datasets of image file paths, capped at 300 entries per role.
# Anchors and positives are matched as *.jpeg (the extension used by the capture cell);
# negatives are matched as *.jpg.
anchors = tf.data.Dataset.list_files(ANCHOR_PATH+'/*.jpeg').take(300)
positives = tf.data.Dataset.list_files(POSITIVE_PATH+'/*.jpeg').take(300)
negatives = tf.data.Dataset.list_files(NEGATIVE_PATH+'/*.jpg').take(300)

## 3.2 Preprocessing (Scale + Resize)

In [None]:
def preprocess(path):
    """Load a JPEG file and convert it into a model-ready image tensor.

    Args:
        path: Path of the JPEG file to load.

    Returns:
        A float tensor of shape (100, 100, 3) with pixel values scaled to 0-1.
    """
    # Read the raw file contents
    byte_img = tf.io.read_file(path)
    # Decode as RGB. channels=3 keeps grayscale JPEGs compatible with the
    # (100, 100, 3) model input and gives the tensor a static channel dimension.
    img = tf.io.decode_jpeg(byte_img, channels=3)
    # Resize image to 100x100px
    img = tf.image.resize(img, (100, 100))
    # Scale px values to 0-1
    return img / 255.0

## 3.3 Create Labeled Dataset

In [19]:
# Pair every anchor with a positive image (label 1) and with a negative image (label 0).
# The zipped datasets get their own names so `positives`/`negatives` keep referring to the
# path datasets; re-running this cell therefore rebuilds `data` from the same inputs.
n_pairs = len(anchors)
positive_pairs = tf.data.Dataset.zip((anchors, positives, tf.data.Dataset.from_tensor_slices(tf.ones(n_pairs))))
negative_pairs = tf.data.Dataset.zip((anchors, negatives, tf.data.Dataset.from_tensor_slices(tf.zeros(n_pairs))))
data = positive_pairs.concatenate(negative_pairs)

## 3.4 Create Test Train Partition

In [20]:
def preprocess_twin(input_img, validation_img, label):
    """Preprocess both image paths of a pair; the label is passed through unchanged."""
    anchor_tensor = preprocess(input_img)
    candidate_tensor = preprocess(validation_img)
    return (anchor_tensor, candidate_tensor, label)

In [21]:
# Data Loader Pipeline: decode the image pairs, cache them, then shuffle.
data = data.map(preprocess_twin)
data = data.cache()
# Shuffle with a fixed seed and without reshuffling on every iteration. With the default
# (reshuffle_each_iteration=True) the take()/skip() train/test partitions built from `data`
# would each iterate a different ordering, so the same sample could land in both partitions.
# Trade-off: the training order is identical in every epoch.
data = data.shuffle(buffer_size=1024, seed=42, reshuffle_each_iteration=False)

In [22]:
# Train Data Partition: the first 70% of the samples, in batches of 2 with prefetching
train_prop = .7
train_data = (
    data
    .take(round(len(data)*train_prop))
    .batch(2)
    .prefetch(2)
)

In [23]:
# Test Data Partition: everything after the training portion of `data`
train_prop = .7
# Skip the samples reserved for training and keep the remainder. Taking from `data`
# again here would discard the skip and evaluate on samples from the training range.
test_data = data.skip(round(len(data)*train_prop))
test_data = test_data.batch(6)
test_data = test_data.prefetch(6)

# 4. Model Engineering

## 4.1 Embedding layer

In [64]:
def build_embeder():
    """Build the convolutional embedding network.

    Returns:
        A Keras Model named 'embedding' that maps a (100, 100, 3) image to a
        4096-dimensional sigmoid-activated vector.
    """
    inp = Input(shape=(100,100,3), name='input_image')

    # Block 1
    c1 = Conv2D(64, (10,10), activation='relu')(inp)
    # 2x2 max-pooling. pool_size is MaxPooling2D's first positional argument, so
    # MaxPooling2D(64, (2,2)) would request a 64x64 window with stride 2 instead.
    # With padding='same' and stride 2 the output shape is identical either way.
    mp1 = MaxPooling2D(pool_size=(2,2), padding='same')(c1)
    # Block 2
    c2 = Conv2D(128, (7,7), activation='relu')(mp1)
    mp2 = MaxPooling2D(pool_size=(2,2), padding='same')(c2)
    # Block 3
    c3 = Conv2D(128, (4,4), activation='relu')(mp2)
    mp3 = MaxPooling2D(pool_size=(2,2), padding='same')(c3)
    # Final Block
    c4 = Conv2D(256, (4,4), activation='relu')(mp3)
    fl1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(fl1)

    return Model(inputs=[inp], outputs=[d1], name='embedding')

In [65]:
# Instantiate the embedding network; build_siamese_model reads this global
embedding = build_embeder()

## 4.2 Distance Layer (Custom)

In [4]:
# Siamese Distance Class
class L1Dist(Layer):
    """Layer that returns the element-wise absolute difference of two embeddings."""

    def __init__(self, **kwargs):
        # Forward kwargs (e.g. name, dtype, trainable) to the base Layer instead of
        # dropping them; Keras passes them when rebuilding the layer from a saved config.
        super().__init__(**kwargs)

    def call(self, input_embedding, validation_embedding):
        """Return |input_embedding - validation_embedding|, computed element-wise."""
        return tf.math.abs(input_embedding - validation_embedding)

## 4.3 Siamese Model

In [67]:
def build_siamese_model():
    """Assemble the two-input Siamese verification network.

    Both inputs pass through the global `embedding` model, the two embeddings are
    compared by an L1Dist layer, and a single sigmoid unit scores the pair.
    """
    # The two images to compare: anchor and validation
    anchor_in = Input(shape=(100,100,3), name='input_image')
    candidate_in = Input(shape=(100,100,3), name='validation_image')

    # Distance layer, explicitly named 'distance'
    l1_layer = L1Dist()
    l1_layer._name = 'distance'

    # Embed each image with the shared network, then take the element-wise distance
    anchor_emb = embedding(anchor_in)
    candidate_emb = embedding(candidate_in)
    l1 = l1_layer(anchor_emb, candidate_emb)

    # Classification layer
    score = Dense(1, activation='sigmoid', name='Classifier')(l1)

    return Model(inputs=[anchor_in, candidate_in], outputs=[score], name="Siamese_Model")
    

In [68]:
# Build the Siamese network; train_step and the checkpoint object use this global
siamese_model = build_siamese_model()

# 5 Model Training

## 5.1 Loss Function and Optimiser

In [69]:
# Binary cross-entropy between the pair label and the model's sigmoid score (used in train_step)
binary_cross_loss = tf.keras.losses.BinaryCrossentropy()

In [70]:
# Adam optimiser with a learning rate of 1e-4
opt = tf.keras.optimizers.Adam(1e-4)

## 5.2 Set Checkpoint callbacks

In [71]:
# Checkpoints are written under ./training_checkpoints using the file prefix 'ckpt'
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir,'ckpt')
# Track the optimiser and the model together so both are saved in each checkpoint
checkpoint = tf.train.Checkpoint(opt=opt, model=siamese_model)

## 5.3 Train Step Function

In [73]:
# Allow variable creation
# NOTE(review): this makes tf.function-decorated code (train_step below) run eagerly rather
# than as a traced graph — confirm it is still required, as it likely slows training.
tf.config.run_functions_eagerly(True)

import time
@tf.function
def train_step(batch):
    """Run one optimisation step of the Siamese model on a single batch.

    Args:
        batch: Sequence whose first two items are the anchor and validation
            images and whose third item is the label.

    Returns:
        The binary cross-entropy loss computed for this batch.
    """
    # Get anchor and validation images
    X = batch[:2]
    # Get label
    y = batch[2]

    # Record the forward pass so gradients can be derived from the loss
    with tf.GradientTape() as tape:
        # Forward pass
        yhat = siamese_model(X, training=True)
        # Calculate loss
        loss = binary_cross_loss(y, yhat)
    # Calculate gradients
    gradients = tape.gradient(loss, siamese_model.trainable_variables)
    # Apply the gradients to update the weights
    opt.apply_gradients(zip(gradients, siamese_model.trainable_variables))
    return loss

## 5.4 Training loop

In [74]:
def train(data, EPOCHS):
    """Train on `data` for EPOCHS epochs, saving a checkpoint after every 5th epoch."""
    for epoch_idx in range(EPOCHS):
        epoch_no = epoch_idx + 1
        print(f'Epoch {epoch_no}/{EPOCHS}')
        n_batches = len(data)
        prog_bar = tf.keras.utils.Progbar(n_batches)

        # One training step per batch; the progress bar counts completed batches
        for step, batch in enumerate(data, start=1):
            train_step(batch)
            prog_bar.update(step)

        # Only every 5th epoch is checkpointed
        if epoch_no % 5 != 0:
            continue
        checkpoint.save(file_prefix=checkpoint_prefix)

## 5.5 Train Model

In [75]:
# Number of passes over the training data
EPOCHS = 50

In [None]:
# Run the training loop on the training partition
train(train_data, EPOCHS)

In [None]:
# Persist the trained model to siamese_model.h5; section 6.1 loads it from this path
siamese_model.save('siamese_model.h5')

# 6 Evaluate model

## 6.1 Load Model

In [5]:
# Reload the saved model; the custom L1Dist layer class is supplied through custom_objects
model = tf.keras.models.load_model('./siamese_model.h5', custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy': tf.losses.BinaryCrossentropy})

2023-02-02 11:06:03.378834: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 150994944 exceeds 10% of free system memory.
2023-02-02 11:06:03.663417: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 150994944 exceeds 10% of free system memory.
2023-02-02 11:06:03.748601: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 150994944 exceeds 10% of free system memory.




2023-02-02 11:06:04.957426: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 150994944 exceeds 10% of free system memory.


## 6.2 Generate Metrics

In [24]:
from tensorflow.keras.metrics import Recall, Precision

In [25]:
# Streaming metrics, updated batch by batch in the evaluation loop below
r = Recall()
p = Precision()

In [26]:
# Start from clean metric state so re-running this cell does not accumulate earlier results
r.reset_state()
p.reset_state()

n_batches = len(test_data)
for idx, sample in enumerate(test_data):
    # Anchor and validation images, then the labels
    X = sample[:2]
    y = sample[2]
    yhat = model(X, training=False)

    # Threshold the sigmoid scores at 0.5 to obtain hard 0/1 predictions
    yhat = [1.0 if v > 0.5 else 0.0 for v in yhat]
    r.update_state(y, yhat)
    p.update_state(y, yhat)

    # idx is zero-based: report idx+1 completed batches so the last batch shows 100%
    print(f'{(idx+1)/n_batches * 100}%', end='\r')

print(p.result().numpy())
print(r.result().numpy())

1.066666666666667%%
1.0


In [27]:
# Report the accumulated metrics as percentages
print(f'Precision: {p.result().numpy() * 100}')
print(f'Recall: {r.result().numpy() * 100}')

Precision: 100.0
Recall: 100.0


# 7 Integrate with OpenCV (Optional)

## 7.1 Get all Functions and Model

In [7]:
import cv2
import os
import numpy as np
from tensorflow.keras.layers import Layer
import tensorflow as tf
import uuid

In [8]:
def preprocess(path):
    """Load a JPEG file and convert it into a model-ready image tensor.

    Args:
        path: Path of the JPEG file to load.

    Returns:
        A float tensor of shape (100, 100, 3) with pixel values scaled to 0-1.
    """
    # Read the raw file contents
    byte_img = tf.io.read_file(path)
    # Decode as RGB. channels=3 keeps grayscale JPEGs compatible with the
    # (100, 100, 3) model input and gives the tensor a static channel dimension.
    img = tf.io.decode_jpeg(byte_img, channels=3)
    # Resize image to 100x100px
    img = tf.image.resize(img, (100, 100))
    # Scale px values to 0-1
    return img / 255.0

In [9]:
# Siamese Distance Class
class L1Dist(Layer):
    """Layer that returns the element-wise absolute difference of two embeddings."""

    def __init__(self, **kwargs):
        # Forward kwargs (e.g. name, dtype, trainable) to the base Layer instead of
        # dropping them; Keras passes them when rebuilding the layer from a saved config.
        super().__init__(**kwargs)

    def call(self, input_embedding, validation_embedding):
        """Return |input_embedding - validation_embedding|, computed element-wise."""
        return tf.math.abs(input_embedding - validation_embedding)

In [10]:
def verify(model, detection_threshold, verifcation_threshold):
    """Compare the captured input image against every stored verification image.

    Args:
        model: Model whose predict() scores an (input image, verification image) pair.
        detection_threshold: Score above which a single comparison counts as a match.
        verifcation_threshold: Fraction of matching comparisons required to verify.

    Returns:
        (results, verified): the list of raw prediction arrays and the boolean decision.
        With no verification images the result is ([], False).
    """
    verification_dir = os.path.join('application_data','verification_images')
    image_names = os.listdir(verification_dir)
    results = []

    # Nothing to compare against: deny instead of dividing by zero further down
    if not image_names:
        return results, False

    # The input image is the same for every comparison, so load it once
    input_image = preprocess(os.path.join('application_data','input_image','input.jpg'))

    for image in image_names:
        ver_image = preprocess(os.path.join(verification_dir, image))
        # Add a batch axis to each image and score the pair
        res = model.predict(list(np.expand_dims([input_image,ver_image],axis=1)))
        results.append(res)

    # Detection threshold: score above which a comparison is a positive result
    detection = np.sum(np.array(results) > detection_threshold)
    # Verification threshold: proportion of positives required to verify
    verifcation = detection/len(results)
    verified = verifcation > verifcation_threshold
    return results, verified
        
        

In [12]:
# Load the model used by the verification loop below.
# NOTE(review): this path (./app/siam_model.h5) differs from the file saved in section 5.5
# (siamese_model.h5) — confirm which model file is intended.
model = tf.keras.models.load_model('./app/siam_model.h5', custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy': tf.losses.BinaryCrossentropy})

2023-02-02 16:02:14.466044: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 150994944 exceeds 10% of free system memory.




## 7.2 Verify App Window

In [13]:
# Live verification: press 'v' to verify the current frame, 'q' to quit.
cap = cv2.VideoCapture(0)

try:
    while cap.isOpened():
        ret, frame = cap.read()
        # Stop cleanly if the camera delivered no frame (frame would be None below)
        if not ret:
            break

        # Crop the frame to 250x250px
        frame = frame[100:100+250, 200:200+250, :]

        cv2.imshow('Image Collection', frame)

        # Poll the keyboard once per frame; a second waitKey call would consume
        # key presses intended for the other command.
        key = cv2.waitKey(10) & 0xFF

        # Verification command
        if key == ord('v'):
            # Save the current frame where verify() expects the input image
            cv2.imwrite(os.path.join('application_data','input_image','input.jpg'), frame)
            results, verified = verify(model, detection_threshold=0.5, verifcation_threshold=0.5)
            print("Access Granted" if verified else "Access Denied")
            print(list(results))
        # Quit the capture
        elif key == ord('q'):
            break
finally:
    # Always free the camera and close the window, even if verification raised
    cap.release()
    cv2.destroyAllWindows()

QObject::moveToThread: Current thread (0x5cb1740) is not the object's thread (0x5c58a20).
Cannot move to target thread (0x5cb1740)

QObject::moveToThread: Current thread (0x5cb1740) is not the object's thread (0x5c58a20).
Cannot move to target thread (0x5cb1740)

QObject::moveToThread: Current thread (0x5cb1740) is not the object's thread (0x5c58a20).
Cannot move to target thread (0x5cb1740)

QObject::moveToThread: Current thread (0x5cb1740) is not the object's thread (0x5c58a20).
Cannot move to target thread (0x5cb1740)

QObject::moveToThread: Current thread (0x5cb1740) is not the object's thread (0x5c58a20).
Cannot move to target thread (0x5cb1740)

QObject::moveToThread: Current thread (0x5cb1740) is not the object's thread (0x5c58a20).
Cannot move to target thread (0x5cb1740)

QObject::moveToThread: Current thread (0x5cb1740) is not the object's thread (0x5c58a20).
Cannot move to target thread (0x5cb1740)

QObject::moveToThread: Current thread (0x5cb1740) is not the object's thread

Access Denied
[array([[0.01639169]], dtype=float32), array([[0.00059202]], dtype=float32), array([[0.00148708]], dtype=float32), array([[0.00036836]], dtype=float32), array([[0.03800222]], dtype=float32), array([[8.7479115e-05]], dtype=float32), array([[0.00195211]], dtype=float32), array([[0.00078848]], dtype=float32), array([[0.07538214]], dtype=float32), array([[0.00434613]], dtype=float32), array([[0.06118935]], dtype=float32), array([[0.00030798]], dtype=float32), array([[0.0008018]], dtype=float32), array([[0.00092262]], dtype=float32), array([[0.00013277]], dtype=float32), array([[0.00278163]], dtype=float32), array([[0.00061589]], dtype=float32), array([[0.00171682]], dtype=float32), array([[0.02875799]], dtype=float32), array([[0.00130644]], dtype=float32)]
