In [1]:
%pip install tensorflow==2.10 opencv-python matplotlib

Collecting tensorflow==2.10Note: you may need to restart the kernel to use updated packages.

  Using cached tensorflow-2.10.0-cp310-cp310-win_amd64.whl (455.9 MB)
Installing collected packages: tensorflow
Successfully installed tensorflow-2.10.0



[notice] A new release of pip is available: 23.2 -> 23.2.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [8]:
# Standard dependencies
import cv2
import os
import random
import shutil
import numpy as np # For re-shaping arrays
from matplotlib import pyplot as plt # Visualise images

# Tensorflow dependencies
from tensorflow.python.keras.models import Model
from tensorflow.python.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten, Lambda
import tensorflow as tf

In [2]:
# Turn on memory growth so TensorFlow allocates GPU memory on demand instead
# of grabbing it all up front, which helps avoid out-of-memory errors.
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    # The machine may expose more than one GPU, so configure every one of them
    tf.config.experimental.set_memory_growth(device=gpu, enable=True)

In [None]:
gpus # Just to make sure that the GPU device is recognized

In [10]:
# Create the folders that will hold the anchor, positive and negative images:
# Anchor: The image we input
# Positive: Images that match the anchor
# Negative: Images that are different from the anchor
anc_path = os.path.join("data", "anchor")
pos_path = os.path.join("data", "positive")
neg_path = os.path.join("data", "negative")

# exist_ok=True keeps this cell re-runnable: without it os.makedirs raises
# FileExistsError as soon as the folders exist from a previous run.
for folder in (anc_path, pos_path, neg_path):
    os.makedirs(folder, exist_ok=True)


In [None]:
# Uncompress the tar file that contains the images in the Wild Dataface
!tar -xf lfw.tgz # http://vis-www.cs.umass.edu/lfw/

In [None]:
# Move every downloaded LFW image into the negative folder. These faces act as
# the "different person" examples, so the model learns that the anchor does
# not match any of them.

# Go through all the entries in the lfw folder
for directory in os.listdir('lfw'):
    person_dir = os.path.join('lfw', directory)
    # Skip stray files (e.g. a README or OS metadata file); calling
    # os.listdir on a non-directory would raise NotADirectoryError.
    if not os.path.isdir(person_dir):
        continue
    # Find all the images in said directory
    for file in os.listdir(person_dir):
        # Move the image from its per-person folder into the negative folder
        previous_path = os.path.join(person_dir, file)
        new_path = os.path.join(neg_path, file)
        os.replace(previous_path, new_path)

In [None]:
# Importing this so that the images we save all have different names
import uuid

# Collect the anchor and positive images from the webcam.
# Controls (with the preview window focused):
#   'a' -> save the current frame as an anchor image
#   'p' -> save the current frame as a positive image
#   'q' -> quit

# Connect to the webcam. The device index can differ between machines, so try
# 1, 2, 3, ... if index 0 does not open the right camera.
capture = cv2.VideoCapture(0)
try:
    while capture.isOpened():
        return_value, frame = capture.read()
        if not return_value:
            # No frame was delivered (camera busy or unplugged); `frame` is
            # not a usable image, so stop instead of crashing on the crop.
            break

        # The images in the negative folder are 250x250, so crop the webcam
        # frame to a 250x250 window as well.
        # NOTE(review): assumes the camera frame is at least 370x450 pixels — confirm.
        frame = frame[120:370, 200:450, :]

        # Show the camera feed
        cv2.imshow("Images", frame)

        # Read the keyboard ONCE per frame. Each cv2.waitKey() call consumes
        # the pending key press, so polling separately per key meant a press
        # was frequently swallowed by the wrong check and ignored.
        key = cv2.waitKey(1) & 0xFF

        if key == ord('a'):
            # Create the unique name and save the image as an anchor
            name = os.path.join(anc_path, '{}.jpg'.format(uuid.uuid1()))
            cv2.imwrite(name, frame)
        elif key == ord('p'):
            # Create the unique name and save the image as a positive
            name = os.path.join(pos_path, '{}.jpg'.format(uuid.uuid1()))
            cv2.imwrite(name, frame)
        elif key == ord('q'):
            break
finally:
    # Always release the webcam and close the preview window, even if an
    # error interrupts the loop; otherwise the camera stays locked.
    capture.release()
    cv2.destroyAllWindows()

In [2]:
# Get up to 300 image paths from each image set.
# os.path.join builds the glob with the platform's path separator; the earlier
# hard-coded '\*.jpg' suffix only worked on Windows and relied on '\*', which
# is an invalid string escape sequence.
anchor = tf.data.Dataset.list_files(os.path.join(anc_path, '*.jpg')).take(300)
positive = tf.data.Dataset.list_files(os.path.join(pos_path, '*.jpg')).take(300)
negative = tf.data.Dataset.list_files(os.path.join(neg_path, '*.jpg')).take(300)

In [3]:
# Iterator over the anchor file paths, used only to peek at one example below
dir_test = anchor.as_numpy_iterator()

In [4]:
# Show one anchor file path as a sanity check
print(next(dir_test))

b'data\\anchor\\2d257bc5-547c-11ee-a668-38d57a328974.jpg'


In [5]:
# Scale and resize the images
def preprocess(file_path):
    """Load a JPEG from disk and turn it into a model-ready image tensor.

    Args:
        file_path: Path to a JPEG file (string or string tensor).

    Returns:
        The image resized to 105x105 with 3 colour channels and pixel values
        scaled to the 0-1 range.
    """
    # Read the raw file bytes, then decode them into pixel values.
    byte_img = tf.io.read_file(file_path)
    # channels=3 forces an RGB result, so a grayscale JPEG still matches the
    # (105, 105, 3) model input and the channel dimension is statically known.
    img = tf.io.decode_jpeg(byte_img, channels=3)

    # 105x105 is the input size used in the "Siamese Neural Networks" research paper
    img = tf.image.resize(img, (105, 105))
    img = img / 255.0  # Scale every pixel value to 0-1
    return img

In [6]:
# Sanity-check preprocess() on one captured anchor image.
# The file is picked from the anchor folder at run time: capture file names
# are random UUIDs, so a hard-coded name only exists on the machine that
# recorded it.
sample_anchor_files = sorted(f for f in os.listdir(anc_path) if f.lower().endswith('.jpg'))
img = preprocess(os.path.join(anc_path, sample_anchor_files[0]))

In [7]:
img.numpy().max() 

1.0

In [8]:
# Label convention for an image pair:
#   (anchor, positive) => 1
#   (anchor, negative) => 0

pair_count = len(anchor)
match_labels = tf.data.Dataset.from_tensor_slices(tf.ones(pair_count))
mismatch_labels = tf.data.Dataset.from_tensor_slices(tf.zeros(pair_count))

positives = tf.data.Dataset.zip((anchor, positive, match_labels))
negatives = tf.data.Dataset.zip((anchor, negative, mismatch_labels))

# Combine the positives and negatives into a single dataset
data = positives.concatenate(negatives)

In [7]:
# Iterator over the (anchor path, comparison path, label) triples, for inspection
samples = data.as_numpy_iterator()

In [8]:
# Pull a single example triple from the iterator
ex = next(samples)


In [9]:
ex

(b'data\\anchor\\4d3f92d9-547c-11ee-801d-38d57a328974.jpg',
 b'data\\positive\\6394933b-547c-11ee-bfa9-38d57a328974.jpg',
 1.0)

In [9]:
# Preprocess both images of a pair; the label is carried along unchanged
def twin_preprocess(anc, verification_image, label):
    """Apply preprocess() to the anchor and to the image it is compared with.

    Returns:
        A tuple (preprocessed anchor, preprocessed verification image, label).
    """
    anchor_img = preprocess(anc)
    candidate_img = preprocess(verification_image)
    return (anchor_img, candidate_img, label)


In [11]:
# Run the example triple through twin_preprocess to check the result
res = twin_preprocess(*ex)

In [None]:
# Display the preprocessed comparison image (second element of the triple)
plt.imshow(res[1])

In [13]:
res[2]

1.0

In [10]:
# Dataloader pipeline
# Rebuilt from `positives` / `negatives` instead of re-assigning
# `data = data.map(...)`, so re-running this cell never tries to preprocess
# tensors that were already preprocessed.
data = positives.concatenate(negatives)
data = data.map(twin_preprocess)
data = data.cache()
# Mix the positive and negative pairs. reshuffle_each_iteration=False keeps the
# shuffled order fixed across iterations: the train/test partitions are carved
# out with take()/skip(), and with the default per-iteration reshuffle the two
# partitions would contain different (overlapping) samples on every pass,
# leaking test pairs into training.
data = data.shuffle(buffer_size=1024, reshuffle_each_iteration=False)

In [11]:
# Training partition
train_size = round(len(data) * 0.7)  # 70% of the samples
# Group into batches of 16 pairs and keep up to 8 batches prepared ahead of time
train_data = data.take(train_size).batch(16).prefetch(8)


In [12]:
# Testing Partition
split_index = round(len(data) * 0.7)  # skip the samples used for training
test_size = round(len(data) * 0.3)    # then keep the remaining 30%
test_data = data.skip(split_index).take(test_size).batch(16).prefetch(8)

In [13]:
# Embedding layer

def make_embedding():
    """Build the convolutional network that encodes one image as a vector.

    Returns:
        A Keras Model named "embedding" that maps a (105, 105, 3) image to a
        4096-value sigmoid-activated feature vector.
    """
    # Input
    inp = Input(shape=(105, 105, 3), name="input_image")

    # First block: convolution followed by max pooling.
    c1 = Conv2D(64, (10, 10), activation="relu")(inp)
    # The first positional argument of MaxPooling2D is pool_size, so the
    # earlier MaxPooling2D(64, (2, 2), ...) pooled over a 64x64 window with
    # stride 2. Passing pool_size explicitly gives the intended 2x2 pooling;
    # output shapes and parameter counts are unchanged.
    m1 = MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding="same")(c1)

    # Second block
    c2 = Conv2D(128, (7, 7), activation="relu")(m1)
    m2 = MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding="same")(c2)

    # Third block
    c3 = Conv2D(128, (4, 4), activation="relu")(m2)
    m3 = MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding="same")(c3)

    # Final embedding block: convolution, flatten, dense projection
    c4 = Conv2D(256, (4, 4), activation="relu")(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation="sigmoid")(f1)

    return Model(inputs=[inp], outputs=[d1], name="embedding")

In [16]:
# Shared embedding network; make_siamese_model() applies it to both inputs
embedding = make_embedding()

In [17]:
embedding.summary()

Model: "embedding"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_image (InputLayer)     [(None, 105, 105, 3)]     0         
_________________________________________________________________
conv2d (Conv2D)              (None, 96, 96, 64)        19264     
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 48, 48, 64)        0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 42, 42, 128)       401536    
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 21, 21, 128)       0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 18, 18, 128)       262272    
_________________________________________________________________
max_pooling2d_2 (MaxPooling2 (None, 9, 9, 128)         0 

In [19]:
# Siamese model
def make_siamese_model():
    """Assemble the two-input verification network around the shared `embedding`.

    Both inputs are encoded by the same module-level `embedding` model, the
    element-wise absolute difference of the two encodings is taken, and a
    single sigmoid unit turns that difference into a score.

    Returns:
        A Keras Model named "Siamese_Network" with inputs
        [input_img, validation_img] and one sigmoid output.
    """
    # Anchor image and the positive / negative image it is compared against
    anchor_in = Input(name="input_img", shape=(105, 105, 3))
    candidate_in = Input(name="validation_img", shape=(105, 105, 3))

    # Encode both images with the same weights
    anchor_vec = embedding(anchor_in)
    candidate_vec = embedding(candidate_in)

    # L1 distance between the encoded vectors
    l1_layer = Lambda(lambda pair: tf.abs(pair[0] - pair[1]), name='l1_distance')
    distances = l1_layer([anchor_vec, candidate_vec])

    # Classification layer
    score = Dense(1, activation="sigmoid")(distances)

    return Model(inputs=[anchor_in, candidate_in], outputs=score, name="Siamese_Network")

In [20]:
# Instantiate the verification model that is trained and evaluated below
siamese_model = make_siamese_model()

In [21]:
siamese_model.summary()

Model: "Siamese_Network"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_img (InputLayer)          [(None, 105, 105, 3) 0                                            
__________________________________________________________________________________________________
validation_img (InputLayer)     [(None, 105, 105, 3) 0                                            
__________________________________________________________________________________________________
embedding (Functional)          (None, 4096)         38960448    input_img[0][0]                  
                                                                 validation_img[0][0]             
__________________________________________________________________________________________________
l1_distance (Lambda)            (None, 4096)         0           embedding[2][0]    

In [22]:
# Compile the model
# NOTE(review): the custom loop in train_step() computes the loss with
# `binary_cross_loss` and applies gradients with `opt`, so the optimizer and
# loss passed here are not the ones used by that loop — confirm whether this
# compile step is still needed (e.g. for saving/loading the model).
siamese_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Print model summary
siamese_model.summary()


Model: "Siamese_Network"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_img (InputLayer)          [(None, 105, 105, 3) 0                                            
__________________________________________________________________________________________________
validation_img (InputLayer)     [(None, 105, 105, 3) 0                                            
__________________________________________________________________________________________________
embedding (Functional)          (None, 4096)         38960448    input_img[0][0]                  
                                                                 validation_img[0][0]             
__________________________________________________________________________________________________
l1_distance (Lambda)            (None, 4096)         0           embedding[2][0]    

In [23]:
# Setup loss function
# train_step() calls this object as binary_cross_loss(y_true, y_pred)
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [24]:
# Setup optimizer (learning rate 1e-4 = 0.0001)
opt = tf.keras.optimizers.Adam(learning_rate=1e-4)

In [5]:
# Checkpoints
checkpoint_dir = "./training_checkpoints"
# exist_ok=True keeps the cell re-runnable; os.mkdir raised FileExistsError
# whenever the folder was left over from an earlier run.
os.makedirs(checkpoint_dir, exist_ok=True)
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
# Track both the optimizer state and the model weights in each checkpoint
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

In [26]:
# Iterator over the TRAINING batches (despite the name), used to inspect one batch
test_batch = train_data.as_numpy_iterator()

In [27]:
# First batch: (anchor images, comparison images, labels)
batch_1 = next(test_batch)

In [28]:
# Split the batch into the two image arrays (x) and the labels (y)
x, y = batch_1[:2], batch_1[2]

In [29]:
y

array([1., 1., 0., 0., 1., 1., 1., 1., 0., 0., 0., 1., 1., 1., 0., 0.],
      dtype=float32)

In [34]:
batch_1

(array([[[[0.99607843, 0.99607843, 0.99607843],
          [0.99607843, 0.99607843, 0.99607843],
          [0.99607843, 0.99607843, 0.99607843],
          ...,
          [0.9873016 , 0.99122316, 0.9716153 ],
          [0.98403364, 0.9879552 , 0.9683474 ],
          [0.99607843, 1.        , 0.9764706 ]],
 
         [[0.99607843, 0.99607843, 0.99607843],
          [0.99607843, 0.99607843, 0.99607843],
          [0.99607843, 0.99607843, 0.99607843],
          ...,
          [0.99203014, 0.9959517 , 0.97634387],
          [0.9795718 , 0.9834934 , 0.96388555],
          [0.99607843, 1.        , 0.9764706 ]],
 
         [[0.99607843, 0.99607843, 0.99607843],
          [0.99607843, 0.99607843, 0.99607843],
          [0.99607843, 0.99607843, 0.99607843],
          ...,
          [0.99510694, 0.9990285 , 0.97942066],
          [0.9538749 , 0.95779645, 0.9381886 ],
          [0.99215686, 0.99607843, 0.972549  ]],
 
         ...,
 
         [[0.71547735, 0.66057533, 0.66057533],
          [0.66212

In [30]:
# Define the training step for each batch of data based on the following steps:
# 1. Make a prediction
# 2. Calculate loss
# 3. Derive gradients
# 4. Calculate new weights and apply

@tf.function  # Compiles the function into a callable TensorFlow graph
def train_step(batch):
    """Run one optimisation step of `siamese_model` on a single batch.

    Args:
        batch: A triple (anchor images, positive/negative images, labels).

    Returns:
        The binary cross-entropy loss for this batch. It was previously
        computed and then discarded, leaving the training loop with no way to
        monitor progress; callers that ignore the return value are unaffected.
    """
    with tf.GradientTape() as tape:  # Records operations so gradients can be derived
        # Anchor and positive/negative images
        x = batch[:2]
        # Labels
        y_true = batch[2]

        # Forward pass with the layers in training mode
        y_pred = siamese_model(x, training=True)

        # Loss between the true labels and the predictions
        loss = binary_cross_loss(y_true, y_pred)

    # Gradients of the loss with respect to every trainable weight
    grad = tape.gradient(loss, siamese_model.trainable_variables)

    # Apply the weight updates to the siamese model
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))

    return loss

In [31]:
# Train loop
def train(data, EPOCHS):
    """Train for a number of epochs, saving a checkpoint every 10th epoch.

    Args:
        data: Sized iterable of batches; each batch is passed to train_step().
        EPOCHS: Number of passes over `data`.
    """
    # Loop through epochs
    for epoch in range(1, EPOCHS + 1):
        # "\n" starts each epoch header on a fresh line (it was mistyped as
        # "'n", which printed those two characters literally).
        print("\n Epoch {}/{}".format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))

        # Loop through each batch
        for idx, batch in enumerate(data):
            # Run train step
            train_step(batch)
            progbar.update(idx + 1)

        # Save checkpoints
        if epoch % 10 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)




In [32]:
# Number of full passes over the training data
EPOCHS = 50

In [None]:
# Run the training loop on the training partition
train(train_data, EPOCHS)

In [None]:
from tensorflow.python.keras.metrics import Recall
# Note: Precision is the proportion of positive predictions that were actually correct
#       Recall is the proportion of the ACTUAL positives that were correctly identified

In [42]:
# Evaluate the model

# Take one batch from the test partition: anchors, comparison images, labels
test_iterator = test_data.as_numpy_iterator()
test_input, test_value, y_true = next(test_iterator)

In [43]:
y_true

array([1., 0., 1., 0., 1., 1., 0., 1., 1., 1., 0., 0., 1., 1., 0., 0.],
      dtype=float32)

In [44]:
# Make predictions
# The model takes the two image arrays as a list: [anchors, comparison images]
y_pred = siamese_model.predict([test_input, test_value])

In [45]:
y_pred

array([[9.9870914e-01],
       [1.3241939e-07],
       [1.0000000e+00],
       [5.5191106e-13],
       [9.9999988e-01],
       [9.9988294e-01],
       [5.2315460e-07],
       [9.9999607e-01],
       [9.9962521e-01],
       [9.9996722e-01],
       [3.1029652e-12],
       [6.7798722e-10],
       [9.9998313e-01],
       [1.0000000e+00],
       [9.2673785e-13],
       [1.1804138e-11]], dtype=float32)

In [46]:
y_true

array([1., 0., 1., 0., 1., 1., 0., 1., 1., 1., 0., 0., 1., 1., 0., 0.],
      dtype=float32)

In [47]:
[1 if prediction > 0.5 else 0 for prediction in y_pred]

[1, 0, 1, 0, 1, 1, 0, 1, 1, 1, 0, 0, 1, 1, 0, 0]

In [48]:
# Measure recall on this test batch.
# The raw sigmoid outputs are passed straight to the metric; per the Keras
# documentation Recall() thresholds predictions at 0.5 by default, so the
# manual `[1 if p > 0.5 else 0 ...]` list that used to sit here was computed
# and immediately discarded.
m = Recall()

# Accumulate true positives / false negatives for this batch
m.update_state(y_true, y_pred)

# Recall = true positives / (true positives + false negatives).
# 1 = every actual match was found | 0 = none were found.
# Note this is not overall accuracy: false positives do not lower recall.
m.result().numpy()

1.0

In [None]:
# Visualise the results: second test pair, anchor on the left and the
# image it was compared with on the right
fig, (left_ax, right_ax) = plt.subplots(1, 2, figsize=(10, 8))
left_ax.imshow(test_input[1])
right_ax.imshow(test_value[1])
plt.show()

In [None]:
# Save Model
# Writes the model to a single HDF5 file in the current working directory
siamese_model.save("siamesemodelx.h5")

In [92]:
# Reload Model
# NOTE(review): the model contains a Lambda layer built from a Python lambda;
# confirm that reloading works outside the environment that saved the file.
l_model = tf.keras.models.load_model("siamesemodelx.h5")

In [93]:
l_model.summary()

Model: "Siamese_Network"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_img (InputLayer)         [(None, 105, 105, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_img (InputLayer)    [(None, 105, 105, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         38960448    ['input_img[0][0]',              
                                                                  'validation_img[0]

In [6]:
# Folders used by the verification app:
#   verification_images - reference images of the person to recognise
#   input_image         - the single frame captured at verification time
app_ver_path = os.path.join("application_data", "verification_images")
app_inp_path = os.path.join("application_data", "input_image")

# exist_ok=True keeps the cell re-runnable once the folders exist
os.makedirs(app_ver_path, exist_ok=True)
os.makedirs(app_inp_path, exist_ok=True)

In [11]:
# List all files in the source directory
all_files = os.listdir(pos_path)

# Randomly select up to 50 unique files. random.sample raises ValueError when
# asked for more items than exist, so cap the request at what is available.
selected_files = random.sample(all_files, min(50, len(all_files)))

# Copy the selected files into the verification folder
for filename in selected_files:
    source_file = os.path.join(pos_path, filename)
    destination_file = os.path.join(app_ver_path, filename)
    shutil.copy2(source_file, destination_file)

In [57]:
def verify(model, detection_threshold, verification_threshold):
    """Compare the captured input image against every verification image.

    Args:
        model: Object with a predict([input_batch, validation_batch]) method.
        detection_threshold: A prediction above this value counts as a match.
        verification_threshold: Fraction of verification images that must
            match for the input to be considered verified.

    Returns:
        (results, verified): the list of raw predictions, one per
        verification image, and whether the matching fraction exceeded
        `verification_threshold`. With no verification images the result is
        ([], False).
    """
    # List the folder once so the loop and the final ratio use the same files
    verification_files = os.listdir(app_ver_path)
    if not verification_files:
        # Nothing to compare against; avoids a 0/0 division below
        return [], False

    # The input image is the same for every comparison, so load and
    # preprocess it once instead of once per verification image.
    input_img = preprocess(os.path.join(app_inp_path, "input_image.jpg"))
    input_batch = np.expand_dims(input_img, axis=0)

    # Build results array
    results = []
    for image in verification_files:
        validation_img = preprocess(os.path.join(app_ver_path, image))
        validation_batch = np.expand_dims(validation_img, axis=0)

        # Make predictions on a batch of one pair
        result = model.predict([input_batch, validation_batch])
        results.append(result)

    # Detection: number of predictions above the detection threshold
    detection = np.sum(np.array(results) > detection_threshold)

    # Verification: proportion of verification images that matched
    verification = detection / len(verification_files)
    verified = verification > verification_threshold

    return results, verified


In [4]:
len(os.listdir(os.path.join("application_data", "verification_images")))

50

In [None]:
# OpenCV Real Time Verification
# Controls (with the preview window focused): 'v' verifies the current frame,
# 'q' quits.
cap = cv2.VideoCapture(0)
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            # No frame was delivered; stop instead of crashing on the crop
            break

        # Same 250x250 crop that was used when collecting the training images
        frame = frame[120:370, 200:450, :]

        cv2.imshow("Verification", frame)

        # Read the keyboard ONCE per frame: each cv2.waitKey() call consumes
        # the pending key press, so two separate calls could hand a 'q' press
        # to the 'v' check (or vice versa) and silently drop it.
        key = cv2.waitKey(10) & 0xFF

        # Verification trigger
        if key == ord('v'):
            # Save input image to application_data/input_image folder
            cv2.imwrite(os.path.join("application_data", "input_image", "input_image.jpg"), frame)

            # Run verification with the in-memory model
            results, verified = verify(siamese_model, 0.5, 0.5)
            print(verified)
        elif key == ord('q'):
            break
finally:
    # Always free the camera and close the window, even after an error
    cap.release()
    cv2.destroyAllWindows()