In [1]:
import cv2
import os
import random
import uuid
import numpy as np
from matplotlib import pyplot as plt

In [2]:
from tensorflow.keras.models import Model
from tensorflow.keras.metrics import Precision, Recall
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf

In [3]:
pos_path = "C:\\Users\\ameya\\OneDrive\\Desktop\\Project\\Data\\positive"
neg_path = "C:\\Users\\ameya\\OneDrive\\Desktop\\Project\\Data\\negative"
anc_path = "C:\\Users\\ameya\\OneDrive\\Desktop\\Project\\Data\\anchor"

In [4]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    frame = frame[120:120+350,200:200+350, :]

    if cv2.waitKey(1) & 0XFF == ord('a'):

        imgname = os.path.join(anc_path, '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imgname, frame)

    if cv2.waitKey(1) & 0XFF == ord('p'):
        imgname = os.path.join(pos_path, '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imgname, frame)
    cv2.imshow('Image Collection', frame)
    if cv2.waitKey(1) & 0XFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

In [5]:
positive = tf.data.Dataset.list_files(pos_path+'\\*.jpg').take(300)
anchor = tf.data.Dataset.list_files(anc_path+'\\*.jpg').take(300)
negative = tf.data.Dataset.list_files(neg_path+'\\*.jpg').take(300)

In [6]:
dir_test = anchor.as_numpy_iterator()

In [7]:
print(dir_test.next())

b'C:\\Users\\ameya\\OneDrive\\Desktop\\Project\\Data\\anchor\\161ecc7a-d3cd-11ef-b98f-c5d1e8c2b953.jpg'


In [8]:
def preprocess(file_path):
    byte_img = tf.io.read_file(file_path)
    img = tf.io.decode_jpeg(byte_img)
    img = tf.image.resize(img, (100,100))
    img = img / 255.0

    return img

In [9]:
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)
print(len(data))

600


In [10]:
samples = data.as_numpy_iterator()

In [11]:
exampple = samples.next()

In [12]:
exampple

(b'C:\\Users\\ameya\\OneDrive\\Desktop\\Project\\Data\\anchor\\111cc319-d3cd-11ef-8e23-c5d1e8c2b953.jpg',
 b'C:\\Users\\ameya\\OneDrive\\Desktop\\Project\\Data\\positive\\d191e8a9-d3cc-11ef-95b5-c5d1e8c2b953.jpg',
 1.0)

In [13]:
def preprocess_twin(input_img, validation_img, label):
    return(preprocess(input_img), preprocess(validation_img), label)

In [14]:
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=1024)

In [15]:
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)
train_data

<_PrefetchDataset element_spec=(TensorSpec(shape=(None, 100, 100, None), dtype=tf.float32, name=None), TensorSpec(shape=(None, 100, 100, None), dtype=tf.float32, name=None), TensorSpec(shape=(None,), dtype=tf.float32, name=None))>

In [16]:
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)
test_data

<_PrefetchDataset element_spec=(TensorSpec(shape=(None, 100, 100, None), dtype=tf.float32, name=None), TensorSpec(shape=(None, 100, 100, None), dtype=tf.float32, name=None), TensorSpec(shape=(None,), dtype=tf.float32, name=None))>

In [17]:
inp = Input(shape=(100,100,3), name='input_image')

In [18]:
c1 = Conv2D(64, (10,10), activation='relu')(inp)

In [19]:
m1 = MaxPooling2D(64, (2,2), padding='same')(c1)

In [20]:
c2 = Conv2D(128, (7,7), activation='relu')(m1)
m2 = MaxPooling2D(64, (2,2), padding='same')(c2)

In [21]:
c3 = Conv2D(128, (4,4), activation='relu')(m2)
m3 = MaxPooling2D(64, (2,2), padding='same')(c3)

In [22]:
c4 = Conv2D(256, (4,4), activation='relu')(m3)
f1 = Flatten()(c4)
d1 = Dense(4096, activation='sigmoid')(f1)

In [23]:
mod = Model(inputs=[inp], outputs=[d1], name='embedding')

In [24]:
mod.summary()

In [25]:
def make_embedding():
    inp = Input(shape=(100,100,3), name='input_image')
    c1 = Conv2D(64, (10,10), activation='relu')(inp)
    m1 = MaxPooling2D(64, (2,2), padding='same')(c1)
    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D(64, (2,2), padding='same')(c2)
    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D(64, (2,2), padding='same')(c3)
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)

    return Model(inputs=[inp], outputs=[d1], name='embedding')

In [26]:
embedding = make_embedding()

In [27]:
embedding.summary()

In [28]:
class L1Dist(Layer):
    def __init__(self, **kwargs):
        super().__init__()

    # def call(self, input_embedding, validation_embedding):
    #     return tf.math.abs(input_embedding - validation_embedding)

    def call(self, input_embedding, validation_embedding):
        # Extract tensors from lists if necessary
        if isinstance(input_embedding, list):
            input_embedding = input_embedding[0]
        if isinstance(validation_embedding, list):
            validation_embedding = validation_embedding[0]

        return tf.math.abs(input_embedding - validation_embedding)

In [29]:
input_image = Input(name='input_img', shape=(100,100,3))
validation_image = Input(name='validation_img', shape=(100,100,3))

In [30]:
inp_embedding = embedding(input_image)
val_embedding = embedding(validation_image)

In [31]:
siamese_layer = L1Dist()

In [32]:
distances = siamese_layer(inp_embedding, val_embedding)
classifier = Dense(1, activation='sigmoid')(distances)
classifier
# # Assuming 'embedding' is a Keras layer and 'input_image' and 'validation_image' are Keras tensors...
# inp_embedding = embedding(input_image)
# val_embedding = embedding(validation_image)

# # Ensure these are tensors, not lists
# inp_embedding = tf.convert_to_tensor(inp_embedding) # Convert to tensor if it's a list
# val_embedding = tf.convert_to_tensor(val_embedding) # Convert to tensor if it's a list

# distances = siamese_layer(inp_embedding, val_embedding)
# classifier = Dense(1, activation='sigmoid')(distances)
# classifier




<KerasTensor shape=(None, 1), dtype=float32, sparse=False, name=keras_tensor_21>

In [33]:
siamese_network = Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

In [34]:
siamese_network.summary()

In [35]:
def make_siamese_model():
    input_image = Input(name='input_img', shape=(100,100,3))
    validation_image = Input(name='validation_img', shape=(100,100,3))
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(embedding(input_image), embedding(validation_image))
    classifier = Dense(1, activation='sigmoid')(distances)

    return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

In [36]:
siamese_model = make_siamese_model()

In [37]:
siamese_model.summary()

In [38]:
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [39]:
optim = tf.keras.optimizers.Adam(1e-4)

In [40]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=optim, siamese_model=siamese_model)

In [41]:
@tf.function
def train_step(batch):
    with tf.GradientTape() as tape:
        # X = batch[:2]
        X = [tf.io.read_file(file_path) for file_path in batch[:2]]
        y = batch[2]


        yhat = siamese_model(X, training=True)

        loss = binary_cross_loss(y, yhat)
    print(loss)
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))

    return loss

In [42]:
def train(data, EPOCHS):
    #loop through the epochs 
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch,EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))

# loop through each batch

    for idx , batch in enumerate(data):
        # run train step here 
        train_step(batch)
        progbar.update(idx+1)

    # save checkpoints
    if epoch % 10 == 0 :
        checkpoint.save(file_prefix=checkpoint_prefix)

In [43]:
EPOCHS = 100


In [44]:
train(train_data, EPOCHS)


 Epoch 1/100

 Epoch 2/100

 Epoch 3/100

 Epoch 4/100

 Epoch 5/100

 Epoch 6/100

 Epoch 7/100

 Epoch 8/100

 Epoch 9/100

 Epoch 10/100

 Epoch 11/100

 Epoch 12/100

 Epoch 13/100

 Epoch 14/100

 Epoch 15/100

 Epoch 16/100

 Epoch 17/100

 Epoch 18/100

 Epoch 19/100

 Epoch 20/100

 Epoch 21/100

 Epoch 22/100

 Epoch 23/100

 Epoch 24/100

 Epoch 25/100

 Epoch 26/100

 Epoch 27/100

 Epoch 28/100

 Epoch 29/100

 Epoch 30/100

 Epoch 31/100

 Epoch 32/100

 Epoch 33/100

 Epoch 34/100

 Epoch 35/100

 Epoch 36/100

 Epoch 37/100

 Epoch 38/100

 Epoch 39/100

 Epoch 40/100

 Epoch 41/100

 Epoch 42/100

 Epoch 43/100

 Epoch 44/100

 Epoch 45/100

 Epoch 46/100

 Epoch 47/100

 Epoch 48/100

 Epoch 49/100

 Epoch 50/100

 Epoch 51/100

 Epoch 52/100

 Epoch 53/100

 Epoch 54/100

 Epoch 55/100

 Epoch 56/100

 Epoch 57/100

 Epoch 58/100

 Epoch 59/100

 Epoch 60/100

 Epoch 61/100

 Epoch 62/100

 Epoch 63/100

 Epoch 64/100

 Epoch 65/100

 Epoch 66/100

 Epoch 67/100

 Ep

TypeError: in user code:

    File "C:\Users\ameya\AppData\Local\Temp\ipykernel_1732\2303328167.py", line 5, in train_step  *
        X = [tf.io.read_file(file_path) for file_path in batch[:2]]

    TypeError: Input 'filename' of 'ReadFile' Op has type float32 that does not match expected type of string.


In [45]:
# test_input, test_val, y_true = test_data.as_numpy_iterator().next()
# Reset the iterator for test_data
test_iterator = test_data.as_numpy_iterator()
if not len(list(test_data.as_numpy_iterator())):
    print("The dataset is empty. Check your data loading pipeline.")

# Now try to get the next batch
test_input, test_val, y_true = next(test_iterator)

In [46]:
y_hat = siamese_model.predict([test_input, test_val])

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 555ms/step


In [47]:
[1 if prediction > 0.5 else 0 for prediction in y_hat ]

[0, 0, 0, 1, 0, 1, 0, 0, 0, 1, 1, 0, 1, 1, 0, 1]

In [48]:
y_true

array([1., 1., 1., 1., 0., 0., 1., 0., 1., 1., 1., 0., 1., 1., 0., 0.],
      dtype=float32)

In [49]:
r = Recall()
p = Precision()
for test_input, test_val, y_true in test_data.as_numpy_iterator():
    yhat = siamese_model.predict([test_input, test_val])
    r.update_state(y_true, yhat)
    p.update_state(y_true,yhat)
print(r.result().numpy(), p.result().numpy())

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 331ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 313ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 319ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 313ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 315ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 315ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 329ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 315ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 315ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 318ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 329ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 206ms/step
0.22222222 0.40816328


In [50]:
siamese_model.save('siamesemodelv2.h5')



In [51]:
siamese_model = tf.keras.models.load_model('siamesemodelv2.h5',
                                   custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})



In [52]:
siamese_model.predict([test_input, test_val])

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 285ms/step


array([[0.5002158 ],
       [0.50017405],
       [0.4999619 ],
       [0.49941507]], dtype=float32)

In [53]:
siamese_model.summary()

In [54]:
os.listdir("C:\\Users\\ameya\\OneDrive\\Desktop\\Project\\application_data\\verification_images")

['55637c93-d3cc-11ef-9614-c5d1e8c2b953.jpg',
 '55be6318-d3cc-11ef-a8a1-c5d1e8c2b953.jpg',
 '560cbd41-d3cc-11ef-9fc4-c5d1e8c2b953.jpg',
 '56440904-d3cc-11ef-a6cd-c5d1e8c2b953.jpg',
 '56a31905-d3cc-11ef-a3b6-c5d1e8c2b953.jpg',
 '56c02c14-d3cc-11ef-96e7-c5d1e8c2b953.jpg',
 '57d9a20e-d3cc-11ef-badd-c5d1e8c2b953.jpg',
 '58141a29-d3cc-11ef-b65e-c5d1e8c2b953.jpg',
 '5869f7f6-d3cc-11ef-b1e9-c5d1e8c2b953.jpg',
 '5887276e-d3cc-11ef-b634-c5d1e8c2b953.jpg',
 '58bb1361-d3cc-11ef-989c-c5d1e8c2b953.jpg',
 '58db1da8-d3cc-11ef-be64-c5d1e8c2b953.jpg',
 '59af0723-d3cc-11ef-99c2-c5d1e8c2b953.jpg',
 '5a72c741-d3cc-11ef-8591-c5d1e8c2b953.jpg',
 '5ac3ec93-d3cc-11ef-899d-c5d1e8c2b953.jpg',
 '5b25e99f-d3cc-11ef-8ead-c5d1e8c2b953.jpg',
 '5f2d0a19-d3cc-11ef-8c72-c5d1e8c2b953.jpg',
 '67c3f417-d3cc-11ef-abc9-c5d1e8c2b953.jpg',
 '687f41f0-d3cc-11ef-abed-c5d1e8c2b953.jpg',
 '6c30a850-d3cc-11ef-927f-c5d1e8c2b953.jpg',
 'adbfb0ed-d3cc-11ef-a940-c5d1e8c2b953.jpg',
 'add862c4-d3cc-11ef-aced-c5d1e8c2b953.jpg',
 'ade1a528

In [55]:
os.path.join('application_data', 'input_image', 'input_image.jpg')

'application_data\\input_image\\input_image.jpg'

In [56]:
def verify(model, detection_threshold, verification_threshold):
    results = []
    for image in os.listdir("application_data\\verification_images"):
        input_img = preprocess(os.path.join("application_data\\input_image", 'input_image.jpg'))
        validation_img = preprocess(os.path.join("application_data\\verification_images", image))
        result = model.predict(list(np.expand_dims([input_img, validation_img], axis=1)))
        results.append(result)

    detection = np.sum(np.array(results) > detection_threshold)
    verification = detection / 50
    verified = verification > verification_threshold

    return results, verified

In [59]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    frame = frame[120:120+350,200:200+350, :]

    cv2.imshow('Verification', frame)
    if cv2.waitKey(10) & 0xFF == ord('v'):
        imgname = os.path.join("application_data\\input_image", '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imgname, frame)
        results, verified = verify(siamese_model, 0.5, 0.5)
        print(verified)

    if cv2.waitKey(10) & 0xFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()

NotFoundError: {{function_node __wrapped__ReadFile_device_/job:localhost/replica:0/task:0/device:CPU:0}} NewRandomAccessFile failed to Create/Open: application_data\input_image\input_image.jpg : The system cannot find the file specified.
; No such file or directory [Op:ReadFile]

: 

In [58]:
res , verified = verify(siamese_model, 0.5, 0.5)

NotFoundError: {{function_node __wrapped__ReadFile_device_/job:localhost/replica:0/task:0/device:CPU:0}} NewRandomAccessFile failed to Create/Open: application_data\input_image\input_image.jpg : The system cannot find the file specified.
; No such file or directory [Op:ReadFile]

In [80]:
verified

False