In [161]:
import cv2
import os
import random
import numpy as np 
import uuid
from tensorflow import keras
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf
from tensorflow.keras.metrics import Recall 
import time

# Passport Collection

In [162]:
class L1Dist(Layer):
    """Keras layer that returns the element-wise absolute difference of two embeddings.

    Registered as a custom object when the saved Siamese model is loaded.
    """

    def __init__(self, **kwargs):
        # Forward the keyword arguments (e.g. name, trainable, dtype) to the base
        # Layer. Keras passes them back in when it rebuilds the layer from a saved
        # model's config; dropping them silently discards the stored settings.
        super().__init__(**kwargs)

    def call(self, input_embedding, validation_embedding):
        """Return |input_embedding - validation_embedding|, computed element-wise."""
        return tf.math.abs(input_embedding - validation_embedding)

In [163]:
# Load the saved Siamese network. The custom L1Dist layer and the loss class are
# registered by name so Keras can resolve them while rebuilding the model.
siamese_model = tf.keras.models.load_model(
    "siamesemodel_2.h5",
    custom_objects={
        "L1Dist": L1Dist,
        "BinaryCrossentropy": tf.losses.BinaryCrossentropy,
    },
)



In [164]:
# Image folders, all located under application_data/
POS_PATH, NEG_PATH, ANC_PATH = (
    os.path.join("application_data", folder)
    for folder in ("input_image", "negative_images", "verification_images")
)

In [165]:
# Show a live, cropped webcam preview and capture the input image on demand.
#   "c" -> replace the contents of POS_PATH with 300 copies of the current frame, then stop
#   "q" -> stop without saving anything
cap = cv2.VideoCapture(0)
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            # No frame was delivered (camera busy/unplugged); frame is unusable here
            # and slicing it would raise, so stop cleanly instead.
            break

        # Cut down frame to 250x250px
        frame = frame[120:120+250, 200:200+250, :]

        cv2.imshow("Image Collection", frame)

        # Poll the keyboard exactly once per frame. Each waitKey call consumes the
        # pending key press, so checking "c" and "q" with two separate calls makes
        # presses get lost depending on which call happens to receive them.
        key = cv2.waitKey(1) & 0xFF

        if key == ord("c"):
            # Start from an empty input folder so only the new capture is verified
            os.makedirs(POS_PATH, exist_ok=True)
            for image in os.listdir(POS_PATH):
                image_remove = os.path.join(POS_PATH, image)
                if os.path.isfile(image_remove):
                    os.remove(image_remove)

            # The same frame is written 300 times under unique names; later cells
            # take up to 300 files from this folder.
            for _ in range(300):
                imgname = os.path.join(POS_PATH, "{}.jpg".format(uuid.uuid1()))
                cv2.imwrite(imgname, frame)
            break

        if key == ord("q"):
            break
finally:
    # Always free the camera and close the preview window, even if an error occurred
    cap.release()
    cv2.destroyAllWindows()

In [166]:
# File-path datasets for the three image folders, each capped at 300 entries
anchor = tf.data.Dataset.list_files(f"{ANC_PATH}/*.jpg").take(300)
positive = tf.data.Dataset.list_files(f"{POS_PATH}/*.jpg").take(300)
negative = tf.data.Dataset.list_files(f"{NEG_PATH}/*.jpg").take(300)

In [167]:
def preprocess(file_path):
    """Load a JPEG file and return it as a 100x100x3 image scaled to [0, 1].

    Args:
        file_path: Path (string tensor) of the JPEG file to read.

    Returns:
        The decoded image, resized to 100x100 and divided by 255.
    """
    # Read in image from file path
    byte_img = tf.io.read_file(file_path)
    # Decode as 3-channel colour explicitly. Without `channels`, a grayscale JPEG
    # decodes to a single channel and can no longer be batched with colour images.
    img = tf.io.decode_jpeg(byte_img, channels=3)

    # Preprocessing steps - resizing the image to be 100x100x3
    img = tf.image.resize(img, (100, 100))
    # Scale image to be between 0 and 1
    img = img / 255.0

    return img

In [168]:
# Pair each anchor path with a positive / negative path and attach a constant label,
# then stack the two sets (all positive pairs first, followed by all negative pairs).
# NOTE(review): positive pairs are labelled 0 and negative pairs 1 here; confirm
# this matches the label convention the saved model was trained with.
positives = tf.data.Dataset.zip((
    anchor,
    positive,
    tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor))),
))
negatives = tf.data.Dataset.zip((
    anchor,
    negative,
    tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor))),
))
data = positives.concatenate(negatives)

In [169]:
def preprocess_twin(input_img, validation_img, label):
    """Run `preprocess` on both image paths of a pair; the label passes through as-is."""
    input_tensor = preprocess(input_img)
    validation_tensor = preprocess(validation_img)
    return input_tensor, validation_tensor, label

In [170]:
# Turn path pairs into image pairs and cache the decoded result
data = data.map(preprocess_twin).cache()

In [171]:
# Testing partition: the last 30% of `data`, in batches of 16
# NOTE(review): `data` is built as positives followed by negatives and is not
# shuffled in the visible cells, so this tail presumably holds only negative
# pairs. Confirm that is intended.
test_data = (
    data.skip(round(len(data)*.7))
    .take(round(len(data)*.3))
    .batch(16)
    .prefetch(8)
)

In [172]:
# Pull the first batch of the test partition as NumPy arrays
test_input, test_validation, y_true = next(test_data.as_numpy_iterator())

2022-11-03 09:51:23.964753: W tensorflow/core/kernels/data/cache_dataset_ops.cc:856] The calling iterator did not fully read the dataset being cached. In order to avoid unexpected truncation of the dataset, the partially cached contents of the dataset  will be discarded. This can happen if you have an input pipeline similar to `dataset.cache().take(k).repeat()`. You should use `dataset.take(k).cache().repeat()` instead.


In [173]:
# Score every (input, verification) pair of the batch with the Siamese network
y_pred = siamese_model.predict(
    [test_input, test_validation]
)
y_pred



array([[5.0116116e-01],
       [1.5514797e-04],
       [5.0116116e-01],
       [5.0116116e-01],
       [5.0116116e-01],
       [5.0116116e-01],
       [5.0116116e-01],
       [5.0116116e-01],
       [5.0116116e-01],
       [5.0116116e-01],
       [5.0116116e-01],
       [5.0116116e-01],
       [6.0729629e-01],
       [5.0116116e-01],
       [5.0116116e-01],
       [5.0116116e-01]], dtype=float32)

In [174]:
# Threshold the scores: anything below 0.6 becomes 1, everything else 0
results = [1 if prediction < 0.6 else 0 for prediction in y_pred]
results

[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1]

In [175]:
# Create a Recall metric object (constructed with its default arguments)
m = Recall()
# Feed the batch labels and the raw model scores into the metric.
# NOTE(review): the raw `y_pred` is passed here, not the thresholded `results`
# list built in the previous cell, so the 0.6 cut-off used there does not
# apply to this recall value. Confirm this is intended.
m.update_state(y_true, y_pred)
# Display the recall value as the cell output
m.result().numpy()

0.9375

In [176]:
# Report the decision: "True" when recall reaches 0.875, otherwise "False"
print("True" if m.result().numpy() >= 0.875 else "False")

True


# Live Image Collection

In [None]:
class L1Dist(Layer):
    """Keras layer that returns the element-wise absolute difference of two embeddings.

    Registered as a custom object when the saved Siamese model is loaded.
    """

    def __init__(self, **kwargs):
        # Forward the keyword arguments (e.g. name, trainable, dtype) to the base
        # Layer. Keras passes them back in when it rebuilds the layer from a saved
        # model's config; dropping them silently discards the stored settings.
        super().__init__(**kwargs)

    def call(self, input_embedding, validation_embedding):
        """Return |input_embedding - validation_embedding|, computed element-wise."""
        return tf.math.abs(input_embedding - validation_embedding)

In [None]:
# Load the saved Siamese network. The custom L1Dist layer and the loss class are
# registered by name so Keras can resolve them while rebuilding the model.
siamese_model = tf.keras.models.load_model(
    "siamesemodel_2.h5",
    custom_objects={
        "L1Dist": L1Dist,
        "BinaryCrossentropy": tf.losses.BinaryCrossentropy,
    },
)

In [None]:
# Image folders, all located under application_data/
POS_PATH, NEG_PATH, ANC_PATH = (
    os.path.join("application_data", folder)
    for folder in ("input_image", "negative_images", "verification_images")
)

In [None]:
# Capture a single webcam frame after a 10-second delay and store it as the input
# image: POS_PATH is emptied and refilled with 300 copies of that frame.
cap = cv2.VideoCapture(0)
try:
    if cap.isOpened():
        # Wait BEFORE grabbing the frame so the delay gives the user time to get
        # into position; sleeping after the read would save the image from t=0.
        time.sleep(10)

        ret, frame = cap.read()
        if ret:
            # Cut down frame to 250x250px
            frame = frame[120:120+250, 200:200+250, :]

            # Start from an empty input folder so only the new capture is verified
            os.makedirs(POS_PATH, exist_ok=True)
            for image in os.listdir(POS_PATH):
                image_remove = os.path.join(POS_PATH, image)
                if os.path.isfile(image_remove):
                    os.remove(image_remove)

            # The same frame is written 300 times under unique names; later cells
            # take up to 300 files from this folder.
            for _ in range(300):
                imgname = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
                cv2.imwrite(imgname, frame)
        else:
            # A failed read leaves frame unusable; slicing it would raise
            print("Webcam did not return a frame; input images left unchanged.")
    else:
        print("Could not open the webcam; input images left unchanged.")
finally:
    # Always free the camera and close any OpenCV windows, even if an error occurred
    cap.release()
    cv2.destroyAllWindows()

In [None]:
# File-path datasets for the three image folders, each capped at 300 entries
anchor = tf.data.Dataset.list_files(f"{ANC_PATH}/*.jpg").take(300)
positive = tf.data.Dataset.list_files(f"{POS_PATH}/*.jpg").take(300)
negative = tf.data.Dataset.list_files(f"{NEG_PATH}/*.jpg").take(300)

In [None]:
# Pair each anchor path with a positive / negative path and attach a constant label,
# then stack the two sets (all positive pairs first, followed by all negative pairs).
# NOTE(review): positive pairs are labelled 0 and negative pairs 1 here; confirm
# this matches the label convention the saved model was trained with.
positives = tf.data.Dataset.zip((
    anchor,
    positive,
    tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor))),
))
negatives = tf.data.Dataset.zip((
    anchor,
    negative,
    tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor))),
))
data = positives.concatenate(negatives)

In [None]:
# Dataloader pipeline: turn path pairs into image pairs and cache the decoded result
data = data.map(preprocess_twin).cache()

In [None]:
# Training partition: the first 70% of `data`, in batches of 16
# NOTE(review): train_data is not referenced by any later cell visible here.
train_data = (
    data.take(round(len(data)*.7))
    .batch(16)
    .prefetch(8)
)

In [None]:
# Testing partition: the last 30% of `data`, in batches of 16
# NOTE(review): `data` is built as positives followed by negatives and is not
# shuffled in the visible cells, so this tail presumably holds only negative
# pairs. Confirm that is intended.
test_data = (
    data.skip(round(len(data)*.7))
    .take(round(len(data)*.3))
    .batch(16)
    .prefetch(8)
)

In [None]:
# Pull the first batch of the test partition as NumPy arrays
test_input, test_validation, y_true = next(test_data.as_numpy_iterator())

In [None]:
# Score every (input, verification) pair of the batch with the Siamese network
y_pred = siamese_model.predict(
    [test_input, test_validation]
)

In [None]:
# Threshold the scores (below 0.6 -> 1, otherwise 0) and print them next to the labels
results = [1 if prediction < 0.6 else 0 for prediction in y_pred]
print(results)
print(y_true)

In [None]:
# Recall of the raw model scores against the batch labels
# NOTE(review): the raw `y_pred` is passed to the metric, not the thresholded
# `results` list printed above. Confirm this is intended.
m = Recall()
m.update_state(y_true, y_pred)
print(m.result().numpy())

# Final decision: "True" when recall reaches 0.875, otherwise "False"
print("True" if m.result().numpy() >= 0.875 else "False")