In [21]:
import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt
from tensorflow.keras.models import Model #type:ignore
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten #type:ignore
import tensorflow as tf


In [22]:
import uuid

In [23]:
cam = cv2.VideoCapture(0)
while cam.isOpened():
    returnvalue, frame = cam.read()
    frame = frame[115:365,235:485, :]

    cv2.imshow("Image", frame)

    #saving anchor images
    if cv2.waitKey(10) & 0XFF == ord('a'):
        imgname = os.path.join(os.path.join('data','anchor'),'{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imgname,frame)

    #saving positive images
    if cv2.waitKey(10) & 0XFF == ord('p'):
        imgname = os.path.join(os.path.join('data','positive'),'{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imgname,frame)

    #closing the image collection
    if cv2.waitKey(1) & 0XFF == ord('q'):
        break
    
cam.release()
cv2.destroyAllWindows()

In [24]:
frame.shape

(250, 250, 3)

In [25]:
anchor = tf.data.Dataset.list_files('data/anchor/*.jpg').take(200)
positive = tf.data.Dataset.list_files('data/positive/*.jpg').take(200)
negative = tf.data.Dataset.list_files('data/negative/*.jpg').take(200)

In [26]:
def preprocess(file_path):
    byte_img = tf.io.read_file(file_path)
    img = tf.io.decode_jpeg(byte_img)
    img = tf.image.resize(img, (105,105))
    img = img / 255.0
    return img

In [27]:
positives = tf.data.Dataset.zip(anchor,positive,tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor))))
negatives = tf.data.Dataset.zip(anchor,negative,tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor))))
data = positives.concatenate(negatives)

In [28]:
def preprocess_twin(input,validation,label):
    return(preprocess(input),preprocess(validation),label)

In [29]:
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size = 512)

In [30]:
#training partition
train_data = data.take(round(len(data)*.8))
train_data = train_data.batch(2)
train_data = train_data.prefetch(1)

# Testing partition
test_data = data.skip(round(len(data)*.8))
test_data = test_data.take(round(len(data)*.2))
test_data = test_data.batch(2)
test_data = test_data.prefetch(1)



In [31]:
def make_embedding_model():
    inp = Input(shape=(105,105,3), name="image_input_layer")
    #Block 1
    c1 = Conv2D(64, (10,10), activation='relu')(inp)
    mp1 = MaxPooling2D(64, (2,2), padding='same')(c1)
    #Block 2
    c2 = Conv2D(128, (7,7), activation='relu')(mp1)
    mp2 = MaxPooling2D(64, (2,2), padding='same')(c2)
    #Block 3
    c3 = Conv2D(128, (4,4), activation='relu')(mp2)
    mp3 = MaxPooling2D(64, (2,2), padding='same')(c3)
    #Final Block
    c4 = Conv2D(256, (4,4), activation='relu')(mp3)
    f1 = Flatten()(c4)
    d = Dense(4096, activation='sigmoid')(f1)

    return Model(inputs=[inp] ,outputs=[d], name='embedding')

In [32]:
em_mod = make_embedding_model()
em_mod.summary()

In [33]:
class L1Dist(Layer):
    
    # Init method - inheritance
    def __init__(self, **kwargs):
        super().__init__()
       
    # Similarity calculation
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)

In [34]:
def make_siamese_model(): 
    
    input_image = Input(name='input_img', shape=(105,105,3))
    validation_image = Input(name='validation_img', shape=(105,105,3))
    
    emb_a = em_mod(input_image)
    emb_b = em_mod(validation_image)
    if isinstance(emb_a, list):
        emb_a = emb_a[0]
        emb_b = emb_b[0]
    
    # Combine siamese distance components
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(emb_a, emb_b)
    
    # Classification layer 
    c = Dense(1, activation='sigmoid')(distances)
    
    return Model(inputs=[input_image, validation_image], outputs=c, name='SiameseNetwork')

In [35]:
s_model = make_siamese_model()
s_model.summary()

## TRAINING


In [36]:
loss = tf.losses.BinaryCrossentropy()
adam = tf.optimizers.Adam(learning_rate=1e-6) #0.000001

In [37]:
check_dir = './checkpoints'
check_prefix = os.path.join(check_dir,'ckpt')
checkpoint = tf.train.Checkpoint(opt=adam, siamese_model= s_model)

In [38]:


@tf.function
def train_step(batch):
    
    with tf.GradientTape() as tape:     

        X = batch[:2]
        y_true = batch[2]
        
        pre_y = s_model(X, training=True)
        lossvalue = loss(y_true, pre_y)
    print(lossvalue)

    grad = tape.gradient(lossvalue, s_model.trainable_variables)
    
    adam.apply_gradients(zip(grad, s_model.trainable_variables))
        
    return lossvalue    



In [39]:
def train(d, EPOCHS):
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(d))
        
        r = tf.keras.metrics.Recall()
        p = tf.keras.metrics.Precision()
        
        for idx, batch in enumerate(d):
            
            l = train_step(batch)
            yhat = s_model.predict(batch[:2], verbose=0)
            r.update_state(batch[2], yhat)
            p.update_state(batch[2], yhat) 
            progbar.update(idx+1)
        print(l.numpy(), r.result().numpy(), p.result().numpy())
        
        # Save checkpoints
        if epoch % 10 == 0: 
            checkpoint.save(file_prefix=check_prefix)


In [40]:
train(train_data, EPOCHS=10)


 Epoch 1/10
Tensor("binary_crossentropy/div_no_nan:0", shape=(), dtype=float32)
Tensor("binary_crossentropy/div_no_nan:0", shape=(), dtype=float32)
[1m160/160[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m69s[0m 426ms/step
0.6482251 0.36942676 0.73417723

 Epoch 2/10


2026-01-18 13:24:51.633256: I tensorflow/core/framework/local_rendezvous.cc:407] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence


[1m 39/160[0m [32m━━━━[0m[37m━━━━━━━━━━━━━━━━[0m [1m52s[0m 436ms/step

KeyboardInterrupt: 

In [None]:
test_input, test_val, y_true = test_data.as_numpy_iterator().next()
y_pred = s_model.predict([test_input, test_val])
y_pred = [1 if y > 0.5 else 0 for y in y_pred]

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 467ms/step


In [None]:
print(y_pred)
print(y_true)

[0, 1, 1, 0, 1, 0, 0, 0, 0, 1, 0, 0, 1, 1, 0, 0]
[1. 1. 1. 0. 1. 0. 0. 0. 0. 1. 0. 0. 1. 1. 1. 0.]


In [None]:
r = tf.keras.metrics.Recall()
p = tf.keras.metrics.Precision()

for test_input, test_val, y_true in test_data.as_numpy_iterator():
    yhat = s_model.predict([test_input, test_val])
    r.update_state(y_true, yhat)
    p.update_state(y_true,yhat) 

print(r.result().numpy(), p.result().numpy())

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 527ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 506ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 491ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 499ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 498ms/step
0.7878788 1.0
