In [None]:
!pip install opencv-python

In [None]:
# import standard libraries
import cv2
import numpy as np
import matplotlib.pyplot as plt
import os
import random

In [None]:
# import tensorflow 
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, MaxPooling2D, Dense, Input, Flatten
import tensorflow as tf

In [None]:
tf.__version__

In [None]:
# Avoid out-of-memory errors by enabling memory growth on every GPU TensorFlow can see
gpus = tf.config.experimental.list_physical_devices("GPU")
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu,True)

In [None]:
# Set up the data folders for positive, negative and anchor images.
# NOTE(review): "psitive" and "negaitive" look like misspellings of
# "positive"/"negative". They are the folder names this notebook creates and
# reads, so renaming them requires renaming any existing folders on disk too.
POS_PATH = os.path.join("data","psitive")
NEG_PATH = os.path.join("data","negaitive")
ANC_PATH = os.path.join("data","anchor")

In [None]:
# Create the data folders.
# exist_ok=True keeps this cell re-runnable: without it os.makedirs raises
# FileExistsError as soon as the folders already exist.
os.makedirs(POS_PATH, exist_ok=True)
os.makedirs(NEG_PATH, exist_ok=True)
os.makedirs(ANC_PATH, exist_ok=True)

# Negative examples

In [None]:
# Download the Labelled Faces in the Wild (LFW) tar.gz dataset, then unpack it like this
!tar -xf lfw.tgz

In [None]:
# Move every LFW image into NEG_PATH, flattening the per-person sub-folders
for directory in os.listdir("lfw"):
    person_dir = os.path.join("lfw", directory)
    # Skip stray files sitting next to the person folders; calling
    # os.listdir on a regular file would raise NotADirectoryError.
    if not os.path.isdir(person_dir):
        continue
    for file in os.listdir(person_dir):
        EX_PATH = os.path.join(person_dir, file)
        NEW_PATH = os.path.join(NEG_PATH, file)
        os.replace(EX_PATH, NEW_PATH)

# Positive and anchor classes

In [None]:
# import uuid to generate unique image names
import uuid

In [None]:
# Collect anchor and positive images from the webcam.
# Keys: "a" saves an anchor image, "p" saves a positive image, "q" quits.
cap = cv2.VideoCapture(0)
try:
    # isOpened has to be *called*; the bare attribute is a bound method and
    # therefore always truthy.
    while cap.isOpened():
        ret, frame = cap.read()
        # Stop when the camera delivers no frame (frame is None in that case,
        # so slicing it would raise).
        if not ret:
            break

        # Crop the frame to rows 100-349 and columns 200-449 (250x250 pixels)
        frame = frame[100:350, 200:450, :]

        # Show the cropped frame on screen
        cv2.imshow("image collection", frame)

        # Poll the keyboard once per frame. Calling waitKey separately for each
        # key means a key press is consumed by whichever call happens to see it,
        # so most presses would be missed.
        key = cv2.waitKey(1) & 0xFF

        if key == ord("a"):
            # Save an anchor image under a unique file name
            imgname = os.path.join(ANC_PATH, f"{uuid.uuid1()}.jpg")
            cv2.imwrite(imgname, frame)
        elif key == ord("p"):
            # Save a positive image under a unique file name
            imgname = os.path.join(POS_PATH, f"{uuid.uuid1()}.jpg")
            cv2.imwrite(imgname, frame)
        elif key == ord("q"):
            break
finally:
    # Always release the camera and close the window, even after an error
    cap.release()
    cv2.destroyAllWindows()

In [None]:
# Build a dataset of file names for each image class (at most 400 files each).
# os.path.join inserts the platform's own path separator; a hard-coded
# "\*.jpg" suffix only matches on Windows and "\*" is an invalid escape
# sequence in a regular string literal.
anchor = tf.data.Dataset.list_files(os.path.join(ANC_PATH, "*.jpg")).take(400)
positive = tf.data.Dataset.list_files(os.path.join(POS_PATH, "*.jpg")).take(400)
negative = tf.data.Dataset.list_files(os.path.join(NEG_PATH, "*.jpg")).take(400)

In [None]:
def preprocess(file_path):
    """Load an image file and return it resized to 105x105 and divided by 255.

    Args:
        file_path: Path of the image file; passed to tf.io.read_file.

    Returns:
        The decoded image, resized to (105, 105) and scaled by 1/255.
    """
    raw_bytes = tf.io.read_file(file_path)
    decoded = tf.io.decode_image(raw_bytes, expand_animations = False)
    resized = tf.image.resize(decoded, (105,105))
    # Scale pixel values by 1/255
    return resized/255.

In [None]:
# Pair every anchor with a positive (label 1) or a negative (label 0) image,
# then join both sets into one dataset.
positives = tf.data.Dataset.zip(
    (
        anchor,
        positive,
        tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor))),
    )
)
negatives = tf.data.Dataset.zip(
    (
        anchor,
        negative,
        tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor))),
    )
)
data = positives.concatenate(negatives)

In [None]:
len(data)

In [None]:
def preprocess_twin(input_img, validation_img, label):
    return (preprocess(input_img), preprocess(validation_img), label)

In [None]:
# Preprocess every (image, image, label) triple, cache the result and shuffle it
data = data.map(preprocess_twin)
data = data.cache()
# reshuffle_each_iteration=False freezes the shuffled order after the first
# pass. With the default (True) the dataset is reshuffled on every iteration,
# so the take()/skip() train/test split below would pick different samples
# each epoch and test samples would leak into training.
data = data.shuffle(buffer_size=1024, reshuffle_each_iteration=False)

In [None]:
len(data)

In [None]:
# Calculate the 70/30 train/test split sizes.
# The test size is the remainder, so the two counts always add up to
# len(data); rounding both fractions independently can be off by one
# (e.g. 5 samples -> round(3.5) + round(1.5) = 6).
number_of_training_samples = round(len(data) * .7)
number_of_testing_samples = len(data) - number_of_training_samples

In [None]:
# Training pipeline: the first number_of_training_samples items,
# grouped into batches of 16, with prefetch(8)
train_data = (
    data
    .take(number_of_training_samples)
    .batch(16)
    .prefetch(8)
)

In [None]:
# Build the test pipeline from the samples that come after the training split.
# take() must be applied to the skipped dataset: taking from `data` again would
# start over at the first sample and return training samples as test data.
test_data = data.skip(number_of_training_samples)
test_data = test_data.take(number_of_testing_samples)
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

# Start building the model

In [None]:
# Embedding network applied to each image of a pair
def make_embedding():
    """Build the convolutional embedding model.

    Returns:
        A Keras Model named "embedding" that takes a (105, 105, 3) input
        called "input_image" and outputs a 4096-unit sigmoid Dense layer.
    """
    image_in = Input(shape=(105,105,3), name="input_image")

    # Three conv + max-pool stages, given as (filters, kernel_size)
    features = image_in
    for n_filters, k_size in ((64, 10), (128, 7), (128, 4)):
        features = Conv2D(filters=n_filters, kernel_size=k_size, activation="relu")(features)
        features = MaxPooling2D(pool_size=(2,2), padding="same")(features)

    # Last conv stage (no pooling), flattened and fed to the dense output layer
    features = Conv2D(filters=256, kernel_size=4, activation="relu")(features)
    flat = Flatten()(features)
    embedding_out = Dense(4096, activation="sigmoid")(flat)

    return Model(inputs=[image_in], outputs=[embedding_out], name="embedding")

In [None]:
embedding = make_embedding()

In [None]:
embedding.summary()

In [None]:
# Distance layer used to compare two embeddings
class L1Diist(Layer):
    """Keras layer returning the element-wise absolute difference of two tensors."""

    def __init__(self, **kwargs):
        # Forward kwargs (e.g. name, dtype, trainable) to the base Layer.
        # Accepting them but calling super().__init__() without them silently
        # drops whatever the caller passed in.
        super().__init__(**kwargs)

    def call(self, input_embedding, validation_embedding):
        """Return |input_embedding - validation_embedding| element-wise.

        Runs whenever the layer is called on a pair of embeddings.
        """
        return tf.math.abs(input_embedding - validation_embedding)

In [None]:
def make_siamese__model():
    """Assemble the siamese network from the module-level `embedding` model.

    Both inputs go through the same `embedding` model, their embeddings are
    combined by an L1Diist layer, and a single sigmoid unit produces the output.

    Returns:
        A Keras Model named "SiameseNetwork" with inputs
        [input_image, validation_image], each of shape (105, 105, 3).
    """
    # The two image inputs of the network
    anchor_in = Input(name = "input_image", shape=(105,105,3))
    candidate_in = Input(name = "validation_image", shape=(105,105,3))

    # Embed both images with the same model and take the distance between them
    distance_layer = L1Diist()
    l1_distance = distance_layer(embedding(anchor_in), embedding(candidate_in))

    # Classification layer: one sigmoid unit on top of the distances
    score = Dense(1, activation="sigmoid")(l1_distance)

    return Model(inputs=[anchor_in, candidate_in], outputs=score, name="SiameseNetwork")

In [None]:
siamese_model = make_siamese__model()

In [None]:
siamese_model.summary()

In [None]:
# Set up the loss function
binary_cross_loss = tf.losses.BinaryCrossentropy()  # from_logits=True would only apply if the model output raw logits; here the final Dense layer already uses a sigmoid

In [None]:
# Define our optimizer (Adam, constructed with 1e-4)
opt = tf.keras.optimizers.Adam(1e-4)

In [None]:
# Checkpoint setup. To reload a saved checkpoint, tf.train.Checkpoint offers restore(),
# e.g. checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir)) -- NOTE(review): confirm for the TensorFlow version in use

checkpoint_dir = "./training__checkpoints"
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

# Objects tracked by the checkpoint: the optimizer and the siamese model
checkpoint = tf.train.Checkpoint(opt = opt, siamese_model = siamese_model)

# Building a train function to train our model

In [None]:
# Build the train step function
@tf.function  # compiles everything under this decorator
def train_step(batch):
    """Run one optimisation step of siamese_model on a single batch.

    Args:
        batch: Sequence whose first two elements are the image pair fed to the
            model and whose third element is the label.

    Returns:
        The binary cross-entropy loss of the batch.
    """
    # Record the forward pass so gradients can be derived from it
    with tf.GradientTape() as tape:
        # Image pair (anchor and positive/negative image)
        X = batch[:2]
        # Label
        y = batch[2]

        # Forward pass
        yhat = siamese_model(X, training=True)
        # Calculate the loss
        loss = binary_cross_loss(y, yhat)

    # No Python print() of the loss here: inside @tf.function it only runs
    # while the function is being traced and shows a symbolic tensor rather
    # than the loss value. The loss is returned so the caller can report it.

    # Gradients of the loss with respect to the model's trainable variables
    grad = tape.gradient(loss, siamese_model.trainable_variables)

    # Apply the gradients to update siamese_model's weights
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))

    return loss

In [None]:
# Building our training loop
def train(data, EPOCHS):
    """Train on `data` for EPOCHS epochs, checkpointing every 10th epoch.

    Args:
        data: Batched dataset; each batch is passed to train_step. Must
            support len() so the progress bar knows the number of batches.
        EPOCHS: Number of passes over `data`.
    """
    for epoch in range(1, EPOCHS+1):
        print(f"\nEpoch: {epoch}/{EPOCHS}")
        progpar = tf.keras.utils.Progbar(len(data))

        # Loop through each batch
        for idx, batch in enumerate(data):
            # train_step returns the batch loss; show it in the progress bar
            # instead of discarding it, so training progress is visible.
            loss = train_step(batch)
            progpar.update(idx+1, values=[("loss", float(loss))])

        # Save a checkpoint every 10 epochs
        if epoch % 10 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)

# train the model

In [None]:
EPOCH = 500

In [None]:
train(train_data, EPOCH)

In [None]:
# import metrics to evaluate our model
from tensorflow.keras.metrics import Precision, Recall

In [None]:
# get batch of test data
test_input, test_val, y_true = test_data.as_numpy_iterator().next()

In [None]:
y_hat = siamese_model.predict([test_input, test_val])

In [None]:
y_hat

In [None]:
# post-processing the result
q = [1 if pred > 0.5 else 0 for pred in y_hat]

In [None]:
y_true

In [None]:
# Create a metric object for Recall
m = Recall()

# Update the metric with the true labels and the model's predictions
m.update_state(y_true, y_hat)

# Recall, multiplied by 100 to read as a percentage
m.result().numpy() *100

In [None]:
# Create a metric object for Precision
m = Precision()

# Update the metric with the true labels and the model's predictions
m.update_state(y_true, y_hat)

# Precision, multiplied by 100 to read as a percentage
m.result().numpy() *100

# Visualize our prediction

In [None]:
# Position within the test batch of the pair to inspect
index = 5
print(f"true label: {y_true[index]}, predicted label: {q[index]}")

# Input image on the left, validation image on the right
plt.figure(figsize=(15,15))
for position, images in enumerate((test_input, test_val), start=1):
    plt.subplot(1, 2, position)
    plt.imshow(images[index])
plt.show()

In [None]:
# Save the model to siamese_model.h5
siamese_model.save("siamese_model.h5")

In [None]:
# Reload the model from siamese_model.h5; custom_objects maps the names
# "L1Diist" and "BinaryCrossentropy" to the classes needed to rebuild it
model = tf.keras.models.load_model("siamese_model.h5", 
                                   custom_objects={"L1Diist": L1Diist, "BinaryCrossentropy":tf.keras.losses.BinaryCrossentropy})

In [None]:
# view our loaded model summary
model.summary()

In [None]:
def verify(model, detection_threshold, verification_threshold):
    """Compare the captured input image with every stored verification image.

    Args:
        model: Model whose predict() accepts [input_batch, validation_batch].
        detection_threshold: Prediction value above which a single comparison
            counts as a positive match.
        verification_threshold: Fraction of positive matches (out of all
            verification images) above which the input is considered verified.

    Returns:
        A tuple (results, verified): the list of raw predictions, one per
        verification image, and the boolean verification decision. An empty
        verification folder yields ([], False).
    """
    verification_dir = os.path.join('application_data', 'verification_images')
    # List the folder once so the loop and the denominator use the same files
    verification_files = os.listdir(verification_dir)

    # Nothing to compare against: report "not verified" instead of computing 0 / 0
    if not verification_files:
        return [], False

    # The input image is identical for every comparison, so load it only once
    input_img = preprocess(os.path.join('application_data', 'input_image', 'input_image.jpg'))

    results = []
    for image in verification_files:
        validation_img = preprocess(os.path.join(verification_dir, image))

        # expand_dims adds a batch axis of size 1 to each of the two images
        result = model.predict(list(np.expand_dims([input_img, validation_img], axis=1)), verbose = False)
        results.append(result)

    # Detection: number of predictions above the detection threshold
    detection = np.sum(np.array(results) > detection_threshold)

    # Verification: proportion of positive predictions among all verification images
    verification = detection / len(verification_files)
    verified = verification > verification_threshold

    return results, verified


In [None]:
# Folder that holds the captured input image
input_path = os.path.join("application_data", "input_image")

In [None]:
# OpenCV real-time verification.
# Keys: "v" captures the current frame and verifies it, "q" quits.
cap = cv2.VideoCapture(0)
try:
    # isOpened has to be *called*; the bare attribute is a bound method and
    # therefore always truthy.
    while cap.isOpened():
        ret, frame = cap.read()
        # Stop when the camera delivers no frame (frame is None in that case)
        if not ret:
            break

        # Crop the frame to rows 100-349 and columns 200-449 (250x250 pixels)
        frame = frame[100:350, 200:450, :]

        cv2.imshow("verification:", frame)

        # Poll the keyboard once per frame; two separate waitKey calls would
        # each consume key presses the other one was waiting for.
        key = cv2.waitKey(10) & 0xFF

        if key == ord("v"):
            # Write the frame as the input image read by verify()
            imgname = os.path.join(input_path, "input_image.jpg")
            cv2.imwrite(imgname, frame)

            # Run the verification function
            result, verified = verify(model, 0.8, 0.7)

            if verified:
                print("verified")
            else:
                print("not verified")

            print(verified)
        elif key == ord("q"):
            break
finally:
    # release must be called (with parentheses) to actually free the camera;
    # doing it in `finally` also covers errors raised inside the loop.
    cap.release()
    cv2.destroyAllWindows()