In [1]:
!pip install tensorflow==2.12.0 opencv-python matplotlib



In [2]:
import os
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
from tensorflow.keras.losses import BinaryCrossentropy
from tensorflow.keras.optimizers import Adam
from tensorflow.data import Dataset

# Define directories
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')

# Create directories if they don't exist
os.makedirs(POS_PATH, exist_ok=True)
os.makedirs(NEG_PATH, exist_ok=True)
os.makedirs(ANC_PATH, exist_ok=True)

In [3]:
import uuid

In [11]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    frame = frame[35:35+250, 200:200+250, :]
    cv2.imshow('Image Collection', frame)
    
    if cv2.waitKey(1) & 0xFF == ord('a'):
        imgname = os.path.join(ANC_PATH, f'{uuid.uuid1()}.jpg')
        cv2.imwrite(imgname, frame)
    elif cv2.waitKey(1) & 0xFF == ord('p'):
        imgname = os.path.join(POS_PATH, f'{uuid.uuid1()}.jpg')
        cv2.imwrite(imgname, frame)
    elif cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

In [12]:
anchor_data = tf.data.Dataset.list_files(os.path.join(ANC_PATH, '*.jpg')).take(300)
positive_data = tf.data.Dataset.list_files(os.path.join(POS_PATH, '*.jpg')).take(300)
negative_data = tf.data.Dataset.list_files(os.path.join(NEG_PATH, '*.jpg')).take(300)

def preprocess_tf(file_path):
    byte_img = tf.io.read_file(file_path)
    img = tf.image.decode_jpeg(byte_img)
    img = tf.image.resize(img, (105, 105))
    img = img / 255.0
    return img

positives = tf.data.Dataset.zip((anchor_data, positive_data, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor_data)))))
negatives = tf.data.Dataset.zip((anchor_data, negative_data, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor_data)))))
data_tf = positives.concatenate(negatives)
data_tf = data_tf.shuffle(buffer_size=1024)

data_count = len(positives) + len(negatives)
train_size = int(0.7 * data_count)

train_data = data_tf.take(train_size)
test_data = data_tf.skip(train_size)

# Apply preprocessing and other transformations
train_data = train_data.map(lambda anchor, img, label: (preprocess_tf(anchor), preprocess_tf(img), label))
test_data = test_data.map(lambda anchor, img, label: (preprocess_tf(anchor), preprocess_tf(img), label))

train_data = train_data.cache()
train_data = train_data.shuffle(buffer_size=1024)
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

test_data = test_data.cache()
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)


In [13]:
def build_embedding_layer():
    inp = Input(shape=(105, 105, 3), name='input_img')
    c1 = Conv2D(64, (10, 10), activation='relu')(inp)
    m1 = MaxPooling2D(2, padding='same')(c1)
    c2 = Conv2D(128, (7, 7), activation='relu')(m1)
    m2 = MaxPooling2D(2, padding='same')(c2)
    c3 = Conv2D(128, (4, 4), activation='relu')(m2)
    m3 = MaxPooling2D(2, padding='same')(c3)
    c4 = Conv2D(256, (4, 4), activation='relu')(m3)
    flat = Flatten()(c4)
    den = Dense(4096, activation='sigmoid')(flat)
    return Model(inputs=inp, outputs=den, name='embedding')

In [14]:
class L1Distance(Layer):
    def __init__(self, **kwargs):
        super().__init__()

    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)

l1 = L1Distance()

def make_siamese_model(embedding_model):
    input_img = Input(name='input_img', shape=(105, 105, 3))
    validation_img = Input(name='validation_img', shape=(105, 105, 3))
    distances = l1(embedding_model(input_img), embedding_model(validation_img))
    classifier = Dense(1, activation='sigmoid')(distances)
    return Model(inputs=[input_img, validation_img], outputs=classifier, name='SiameseNeuralNetwork')

# Create the embedding model
embedding = build_embedding_layer()

# Create the siamese model using the embedding model
siamese_model = make_siamese_model(embedding)

# Training setup
binary_loss = BinaryCrossentropy()
opt = Adam(1e-4)

checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

In [15]:
def train_step(batch):
    with tf.GradientTape() as tape:
        x = batch[:2]
        y = batch[2]
        yvector = siamese_model(x, training=True)
        loss = binary_loss(y, yvector)

    gradient = tape.gradient(loss, siamese_model.trainable_variables)
    opt.apply_gradients(zip(gradient, siamese_model.trainable_variables))
    return loss

def train_loop(data, epochs):
    for epoch in range(1, epochs + 1):
        print('\nEpoch {}/{}'.format(epoch, epochs))
        progressbar = tf.keras.utils.Progbar(len(data))

        for idx, batch in enumerate(data):
            loss = train_step(batch)
            progressbar.update(idx + 1, [('Loss', loss.numpy())])
        if epoch % 10 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)

EPOCHS = 100
train_loop(train_data, EPOCHS)


Epoch 1/100

Epoch 2/100

Epoch 3/100

Epoch 4/100

Epoch 5/100

Epoch 6/100

Epoch 7/100

Epoch 8/100

Epoch 9/100

Epoch 10/100

Epoch 11/100

Epoch 12/100

Epoch 13/100

Epoch 14/100

Epoch 15/100

Epoch 16/100

Epoch 17/100

Epoch 18/100

Epoch 19/100

Epoch 20/100

Epoch 21/100

Epoch 22/100

Epoch 23/100

Epoch 24/100

Epoch 25/100

Epoch 26/100

Epoch 27/100

Epoch 28/100

Epoch 29/100

Epoch 30/100

Epoch 31/100

Epoch 32/100

Epoch 33/100

Epoch 34/100

Epoch 35/100

Epoch 36/100

Epoch 37/100

Epoch 38/100

Epoch 39/100

Epoch 40/100

Epoch 41/100

Epoch 42/100

Epoch 43/100

Epoch 44/100

Epoch 45/100

Epoch 46/100

Epoch 47/100

Epoch 48/100

Epoch 49/100

Epoch 50/100

Epoch 51/100

Epoch 52/100

Epoch 53/100

Epoch 54/100

Epoch 55/100

Epoch 56/100

Epoch 57/100

Epoch 58/100

Epoch 59/100

Epoch 60/100

Epoch 61/100

Epoch 62/100

Epoch 63/100

Epoch 64/100

Epoch 65/100

Epoch 66/100

Epoch 67/100

Epoch 68/100

Epoch 69/100

Epoch 70/100

Epoch 71/100

Epoch 72/100




Epoch 100/100


In [50]:
siamese_model.save('siamesefaceidmodel.h5')





In [16]:
from tensorflow.keras.metrics import Precision, Recall

In [17]:
test_batch = next(iter(test_data))
test_input, test_val, y_true = test_batch
test_input = test_input.numpy()
test_val = test_val.numpy()
y_true = y_true.numpy()

In [21]:
predict = siamese_model.predict([test_input, test_val])
predict



array([[1.8772344e-05],
       [9.9791312e-01],
       [9.9996877e-01],
       [9.9999803e-01],
       [2.7994947e-05],
       [2.0443402e-04],
       [2.1766123e-04],
       [9.9634504e-01],
       [4.2643605e-04],
       [9.8830295e-01],
       [9.9976277e-01],
       [9.9967408e-01],
       [9.8407143e-01],
       [9.9985605e-01],
       [1.6597282e-06],
       [3.2699118e-05]], dtype=float32)

In [22]:
predict = np.array(predict)
predictions_binary = (predict > 0.5).astype(int)
predictions_list = predictions_binary.flatten().tolist()
predictions_list

[0, 1, 1, 1, 0, 0, 0, 1, 0, 1, 1, 1, 1, 1, 0, 0]

In [23]:
y_true

array([0., 1., 1., 1., 0., 0., 0., 1., 0., 1., 1., 1., 1., 1., 0., 0.],
      dtype=float32)

In [24]:
m = Precision()
m.update_state(y_true, predictions_list)
m.result().numpy()

1.0

In [25]:
m = Recall()
m.update_state(y_true, predictions_list)
m.result().numpy()

1.0

In [26]:
import matplotlib.pyplot as plt

In [27]:
model = tf.keras.models.load_model('siamesefaceidmodel.h5', custom_objects={'L1Distance': L1Distance, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})



In [28]:
model.predict([test_input, test_val])



array([[9.9966782e-01],
       [9.9965864e-01],
       [2.7980012e-01],
       [9.9999803e-01],
       [9.9921364e-01],
       [9.3288195e-01],
       [1.0000000e+00],
       [9.9997789e-01],
       [9.9994057e-01],
       [9.3153822e-01],
       [9.9973702e-01],
       [5.3218720e-08],
       [9.1975421e-01],
       [9.9999815e-01],
       [9.9999142e-01],
       [9.1079937e-06]], dtype=float32)

In [29]:
def verify(model, detection_threshold, verification_threshold):
    resultingdata = []
    for image in os.listdir(os.path.join('InfoStack', 'VerifiedImgs')):
        input_img = preprocess_tf(os.path.join('InfoStack', 'InputImgs', 'InputImgs.jpg'))
        valid_img = preprocess_tf(os.path.join('InfoStack', 'VerifiedImgs', image))
    
        results = model.predict(list(np.expand_dims([input_img, valid_img], axis=1)))
        resultingdata.append(results)
    
    detection = np.sum(np.array(resultingdata) > detection_threshold)
    verification = detection / len(os.listdir(os.path.join('InfoStack', 'VerifiedImgs')))
    verified = False
    if verification > verification_threshold:
        verified = True
    else:
        verified = False
    
    return resultingdata, verified


In [32]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    frame = frame[60:60+250, 250:250+250, :]
    cv2.imshow('Verification', frame)
    
    if cv2.waitKey(10) & 0xFF == ord('v'):
        imgname = os.path.join('InfoStack', 'InputImgs', 'InputImgs.jpg')
        cv2.imwrite(imgname, frame)
        resultingdata, verified = verify(model, 0.5, 0.5)
        print(verified)
        
    if cv2.waitKey(10) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

True


In [100]:
resultingdata

[array([[1.]], dtype=float32),
 array([[1.]], dtype=float32),
 array([[1.]], dtype=float32),
 array([[1.]], dtype=float32),
 array([[1.]], dtype=float32),
 array([[1.]], dtype=float32),
 array([[1.]], dtype=float32),
 array([[1.]], dtype=float32),
 array([[1.]], dtype=float32),
 array([[0.99999964]], dtype=float32),
 array([[1.]], dtype=float32),
 array([[0.99999994]], dtype=float32),
 array([[0.9999999]], dtype=float32),
 array([[0.94146734]], dtype=float32),
 array([[1.]], dtype=float32),
 array([[1.]], dtype=float32),
 array([[1.]], dtype=float32),
 array([[1.]], dtype=float32),
 array([[1.]], dtype=float32),
 array([[1.]], dtype=float32),
 array([[0.9999997]], dtype=float32),
 array([[0.9999998]], dtype=float32),
 array([[1.]], dtype=float32),
 array([[0.99999994]], dtype=float32),
 array([[0.99999994]], dtype=float32),
 array([[0.99999994]], dtype=float32),
 array([[0.99999994]], dtype=float32),
 array([[0.99999875]], dtype=float32),
 array([[1.]], dtype=float32),
 array([[1.]], d

In [102]:
np.sum(np.squeeze(resultingdata) > 0.5)

50