# import the libraries

In [1]:
import os, shutil, random, tensorflow, tensorflow.keras, numpy as np, matplotlib.pyplot as plt
import cv2, uuid 
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras import optimizers
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import RMSprop


# Data pre-processing

create positive / negative / anchor folders

In [2]:
# create the paths
pos_path= os.path.join('dataset', 'positive')
neg_path= os.path.join('dataset', 'negative')
anc_path= os.path.join('dataset', 'anchor')

In [3]:
# create the directories
if not os.path.exists(pos_path):
    os.makedirs(pos_path)
  
if not os.path.exists(neg_path):  
    os.makedirs(neg_path)
    
if not os.path.exists(anc_path):
    os.makedirs(anc_path)

move the images from the lfw-deepfunneled directory to the neg_path

In [5]:
orginal=os.getcwd()
os.listdir(orginal)

['dataset', 'lfw-deepfunneled', 'Siames-facialVerification.ipynb']

In [8]:
for directory in os.listdir('lfw-deepfunneled'):
    for file in os.listdir(os.path.join('lfw-deepfunneled', directory)):
        ex_path = os.path.join('lfw-deepfunneled', directory, file)
        new_path = os.path.join(neg_path, file)
        os.replace(ex_path, new_path)

Collect Positive and Anchor classes

In [9]:
# capture every frame

# connect to the webcam
cap = cv2.VideoCapture(0)

# flag to start/stop capturing frames
start_capturing = False

while cap.isOpened():
    ret, frame = cap.read()
    # cut the frame
    frame = frame[170:170+250, 230:230+250, :]

    # check for 's' key press
    if cv2.waitKey(1) & 0XFF == ord('a'):
        start_capturing = not start_capturing

    # collect anchor images if 's' has been pressed
    if start_capturing:
        imgName = os.path.join(anc_path, '{}.jpg'.format(uuid.uuid1())) # to create a unique image name
        cv2.imwrite(imgName, frame) # save the anchor img

    cv2.imshow('image collection', frame)

    if cv2.waitKey(1) & 0XFF == ord('q'): # to close the frame when 'q' is clicked
        break

cap.release()
cv2.destroyAllWindows()


load the images

In [4]:
#list_file will grap all images 
# these will search for every jpg file in the path
anchor = tensorflow.data.Dataset.list_files(anc_path+'\*.jpg').take(300) 
positive = tensorflow.data.Dataset.list_files(pos_path+'\*.jpg').take(300)
negative = tensorflow.data.Dataset.list_files(neg_path+'\*.jpg').take(300)

In [5]:
dir_test = anchor.as_numpy_iterator()
dir_test.next()

b'dataset\\anchor\\e523be94-d260-11ee-96d0-920f0c4673c7.jpg'

resize and scale the images

In [5]:
def preprocess(file_path):
    load_img= tensorflow.io.read_file(file_path) #read the img
    image = tensorflow.io.decode_jpeg(load_img) #load the img
    image = tensorflow.image.resize( image, (105,105)) #resize the img
    image = image / 255.0 #scale the img
    return image

create the labelled dataset

In [9]:
#anchor + positive => 1,1,1,1,1
tensorflow.ones(len(anchor))

<tf.Tensor: shape=(13000,), dtype=float32, numpy=array([1., 1., 1., ..., 1., 1., 1.], dtype=float32)>

In [10]:
#anchor + negative => 0,0,0,0,0
tensorflow.zeros(len(anchor))

<tf.Tensor: shape=(13000,), dtype=float32, numpy=array([0., 0., 0., ..., 0., 0., 0.], dtype=float32)>

In [6]:
#Dataset.zip will zip its arguments together so i can iterate over them 
#from_tensor_slices is a data loader 

positives = tensorflow.data.Dataset.zip((anchor, positive, tensorflow.data.Dataset.from_tensor_slices(tensorflow.ones(len(anchor))))) 
negatives = tensorflow.data.Dataset.zip((anchor, negative, tensorflow.data.Dataset.from_tensor_slices(tensorflow.zeros(len(anchor))))) 

data = positives.concatenate(negatives) #to join the postives and negatives sample together in one big dataset

In [8]:
data

<ConcatenateDataset shapes: ((), (), ()), types: (tf.string, tf.string, tf.float32)>

In [12]:
sample= data.as_numpy_iterator()
example1= sample.next()

preprocess the twin images

In [7]:
def preprocess_twin(input_img, val_img, label):
    return (preprocess(input_img), preprocess(val_img)), label

In [14]:
check = preprocess_twin(*example1) #the * will unpack the example1

In [None]:
plt.imshow(check[0][0])
plt.show()

In [None]:
plt.imshow(check[0][1])
plt.show()

In [None]:
check[1]

build dataloader pipeline

In [8]:
data = data.map(preprocess_twin) #map it to apply preprocess_twin to all of the data
data = data.cache()
data = data.shuffle(buffer_size=12000) #shuffle it to a buffer of 1024

In [12]:
data

<ShuffleDataset shapes: (((105, 105, None), (105, 105, None)), ()), types: ((tf.float32, tf.float32), tf.float32)>

In [15]:
sample2= data.as_numpy_iterator()

In [None]:
len(sample2.next())

In [40]:
example2= sample2.next()

In [None]:
plt.imshow(example2[0][0])
plt.show()

In [None]:
plt.imshow(example2[0][1])
plt.show()

In [None]:
example2[1]

training partition

In [109]:
round(len(data)*.7)

18200

In [11]:
# Training partition
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [66]:
train_data #the first None refere to the number of images in the batch (16)

<PrefetchDataset shapes: (((None, 105, 105, None), (None, 105, 105, None)), (None,)), types: ((tf.float32, tf.float32), tf.float32)>

validation partition

In [111]:
round(len(data)*.2)

5200

In [12]:
# Validation data
val_data = data.skip(round(len(data)*.7)) #skip 70% for training
val_data = val_data.take(round(len(data)*.2)) #take 20% for validation
val_data = val_data.batch(16)
val_data = val_data.prefetch(8)

testing partition

In [113]:
round(len(data)*.1)

2600

In [13]:
# Test data
test_data = data.skip(round(len(data)*.7)) #skip 70% for training
test_data = test_data.skip(round(len(data)*.2)) #skip 20% for validation
test_data = test_data.take(round(len(data)*.1)) #take 10% for testing
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

In [246]:
test_data

<PrefetchDataset shapes: (((None, 105, 105, None), (None, 105, 105, None)), (None,)), types: ((tf.float32, tf.float32), tf.float32)>

# Buliding the Model

make the embedding layer

In [9]:
def embedding_layer():
    #the image to be embedded
    inp_img = Input(shape=(105,105,3), name='input_image')

    conv1 = Conv2D(64, (10,10), activation='relu')(inp_img)
    maxP1 = MaxPooling2D(64, (2,2), padding='same')(conv1)

    conv2= Conv2D(128, (7,7), activation= 'relu')(maxP1)
    maxP2 = MaxPooling2D(64, (2,2), padding='same')(conv2)

    conv3= Conv2D(128, (4,4), activation='relu')(maxP2)
    maxP3= MaxPooling2D(64, (2,2), padding='same')(conv3)


    conv4= Conv2D(256, (4,4), activation='relu')(maxP3)
    flatten = Flatten()(conv4)
    dense = Dense(4096, activation='sigmoid')(flatten)

    return Model(inputs=[inp_img], outputs=[dense], name="embedding")

In [14]:
embedding = embedding_layer()

In [16]:
embedding.summary()

Model: "embedding"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_image (InputLayer)     [(None, 105, 105, 3)]     0         
_________________________________________________________________
conv2d (Conv2D)              (None, 96, 96, 64)        19264     
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 48, 48, 64)        0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 42, 42, 128)       401536    
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 21, 21, 128)       0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 18, 18, 128)       262272    
_________________________________________________________________
max_pooling2d_2 (MaxPooling2 (None, 9, 9, 128)         0 

make the distance layer

In [10]:
class Distance_layer(Layer): #declaring Distance_layer class which inherits from the Layer clas
    
    # i sat (**kwargs) to accept any parameter
    def __init__(self, **kwargs):
        # i sat it to super to call the Layer class, to ensures that Distance_layer is properly initialized as a Layer
        super().__init__()
       
    # calculate the similarity of two images
    def call(self, input_embedding, validation_embedding):
        return tensorflow.math.abs(input_embedding - validation_embedding)

In [16]:
dist = Distance_layer()

In [19]:
dist

<__main__.Distance_layer at 0x2e0f63d9be0>

making the Siamese model

In [11]:
def siamese_model():
    #the input image
    anchor_image = Input(name='anchor_image', shape=(105,105,3))

    #the validation image (positive OR negative)
    validation_image= Input(name='validation_img', shape=(105,105,3))

    #get the distance between the anchor image embedding and the validation image embedding
    dist_layer = Distance_layer()
    dist_layer._name = 'distance'
    distances = dist_layer(embedding(anchor_image), embedding(validation_image))
    
    #classify the input as valid or not
    classify = Dense(1, activation='sigmoid')(distances)
    
    return Model(inputs=[anchor_image, validation_image], outputs=classify, name='SiameseNetwork')

In [18]:
siamese_model = siamese_model()

In [22]:
siamese_model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
anchor_image (InputLayer)       [(None, 105, 105, 3) 0                                            
__________________________________________________________________________________________________
validation_img (InputLayer)     [(None, 105, 105, 3) 0                                            
__________________________________________________________________________________________________
embedding (Functional)          (None, 4096)         38960448    anchor_image[0][0]               
                                                                 validation_img[0][0]             
__________________________________________________________________________________________________
distance (Distance_layer)       (None, 4096)         0           embedding[0][0]     

# Train the Model

In [None]:
from tensorflow.keras.losses import BinaryCrossentropy

siamese_model.compile(optimizer=optimizers.RMSprop(learning_rate=1e-4), loss=BinaryCrossentropy(), metrics=['accuracy'])
history = siamese_model.fit(train_data, validation_data=val_data, epochs=10)

In [None]:
test_loss, test_accuracy = siamese_model.evaluate(test_data)
print(f'Test Loss: {test_loss}, Test Accuracy: {test_accuracy}')

In [None]:
accuracy = history.history['accuracy']
val_accuracy = history.history['val_accuracy']
loss= history.history['loss']
val_loss = history.history['val_loss']

epochs = range(1, len(accuracy)+1)

plt.plot(epochs, accuracy, 'r', label='training accuracy')
plt.plot(epochs, val_accuracy, 'b', label='val accuracy')
plt.title('training and validation accuracy')
plt.legend()
plt.show()

plt.plot(epochs, loss, 'r', label='training loss')
plt.plot(epochs, val_loss, 'b', label='val loss')
plt.title('training and validation loss')
plt.legend()
plt.show()


In [135]:
from tensorflow.keras.metrics import Precision

In [164]:
test_input, y_true = test_data.as_numpy_iterator().next()

In [None]:
y_hat = siamese_model.predict([test_input])
y_hat

In [None]:
[1 if prediction > 0.5 else 0 for prediction in y_hat ]

In [None]:
y_true

In [None]:
precision = Precision()
precision.update_state(y_true, y_hat)
precision.result().numpy()

In [None]:
# Assuming test_input is a tuple of two lists of images
first_image_pair = test_input[0]
second_image_pair = test_input[1]

# Select image from each pair
first_image = first_image_pair[0]
second_image = second_image_pair[0]

# Create a figure with two subplots
fig, axs = plt.subplots(1, 2, figsize=(10, 5))

# Display the first image
axs[0].imshow(first_image)
axs[0].set_title('Label: ' + str(y_true[0]))

# Display the second image
axs[1].imshow(second_image)
axs[1].set_title('Label: ' + str(y_true[0]))

# Remove the x and y ticks
for ax in axs:
    ax.set_xticks([])
    ax.set_yticks([])

plt.show()

In [None]:
siamese_model.save('siamesemodel600.h5')

# Check the save Model

In [35]:
Distance_layer

__main__.Distance_layer

In [12]:
model = tensorflow.keras.models.load_model('siamesemodel600.h5', 
                                   custom_objects={'Distance_layer':Distance_layer, 'RMSprop': tensorflow.keras.optimizers.RMSprop, 'BinaryCrossentropy':tensorflow.losses.BinaryCrossentropy}, compile=False)

In [20]:
model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
anchor_image (InputLayer)       [(None, 105, 105, 3) 0                                            
__________________________________________________________________________________________________
validation_img (InputLayer)     [(None, 105, 105, 3) 0                                            
__________________________________________________________________________________________________
embedding (Functional)          (None, 4096)         38960448    anchor_image[0][0]               
                                                                 validation_img[0][0]             
__________________________________________________________________________________________________
distance_layer_2 (Distance_laye (None, 4096)         0           embedding[0][0]     

# Real World Test

In [13]:
# create the paths
inp_path= os.path.join('realData', 'inputImage')
ver_path= os.path.join('realData', 'verifyImage')

In [22]:
inp_path

'realData\\inputImage'

In [14]:
# create the directories
if not os.path.exists(inp_path):
    os.makedirs(inp_path)

if not os.path.exists(ver_path):
    os.makedirs(ver_path)

In [31]:
# connect to the webcam
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    #cut the frame, discription bellow
    frame = frame[170:170+250, 230:230+250, :]

    #collect verify images
    if cv2.waitKey(1) & 0XFF == ord('v'): #to collect verifiy image click on 'v'
        imgName = os.path.join(ver_path, '{}.jpg'.format(uuid.uuid1())) #to create a unique image name
        cv2.imwrite(imgName, frame) #save the anchor img

    cv2.imshow('image collection', frame)

    if cv2.waitKey(1) & 0XFF == ord('q'): #to close the fram when i click 'q'
        break

cap.release()
cv2.destroyAllWindows()

In [None]:
os.listdir(ver_path)

create the verification function

In [15]:
def verification(model, detection_threshold, ver_threshold):
    results = []
    for image in os.listdir(ver_path):
        input_image_path = os.path.join('realData', 'inputImage', 'inputImage.jpg')
        ver_image_path = os.path.join('realData', 'verifyImage', image)
        
        # Preprocess input and verification images
        input_img = preprocess(input_image_path)
        ver_img = preprocess(ver_image_path)

        # Make Predictions 
        result = model.predict(list(np.expand_dims([input_img, ver_img], axis=1))) #wrap it into one array because i want one sample 
        results.append(result) #the results will be in one big array
    
    # Detection Threshold is the matrix of which a prediciton is considered positive 
    detectionThreshold = np.sum(np.array(results) > detection_threshold) #how many of the results are a match
    
    # Verify Threshold is the positive predictions / all positive sample
    verificationThreshold = detectionThreshold / len(os.listdir(ver_path))

    # check if the user is verify or not (TRUE / FALSE)
    verified = verificationThreshold > ver_threshold

    return results, verified

In [47]:
# Example usage
detection_threshold = 0.8
verification_threshold = 0.5
results, verified = verification(model, detection_threshold, verification_threshold)
print(f"Results: {results}")
print(f"Verified: {verified}")

Results: [array([[0.5403813]], dtype=float32), array([[0.26774818]], dtype=float32), array([[0.26774818]], dtype=float32), array([[0.27946565]], dtype=float32), array([[0.4498254]], dtype=float32), array([[0.5433409]], dtype=float32), array([[0.73319113]], dtype=float32), array([[0.9261545]], dtype=float32), array([[0.96841496]], dtype=float32), array([[0.99098]], dtype=float32), array([[0.9863265]], dtype=float32), array([[0.995268]], dtype=float32), array([[0.11137912]], dtype=float32), array([[0.38676274]], dtype=float32), array([[0.9965515]], dtype=float32), array([[0.99954706]], dtype=float32), array([[0.99940175]], dtype=float32), array([[0.9967697]], dtype=float32), array([[0.0675973]], dtype=float32), array([[0.26572478]], dtype=float32), array([[0.39905134]], dtype=float32), array([[0.6652823]], dtype=float32), array([[0.8194928]], dtype=float32), array([[0.9408945]], dtype=float32), array([[0.49062756]], dtype=float32), array([[0.996929]], dtype=float32), array([[0.99987763]]

In [22]:
len(results)

50

In [48]:
# Extract the float values from each sub-array
float_values = [item[0][0] for item in results]

# Sort the float values in ascending order
sorted_float_values = sorted(float_values)

print(f"Sorted Results: {sorted_float_values}")

Sorted Results: [0.0675973, 0.09563181, 0.101403475, 0.11137912, 0.17334747, 0.247266, 0.26572478, 0.26774818, 0.26774818, 0.27946565, 0.38676274, 0.39905134, 0.4498254, 0.49062756, 0.5403813, 0.5433409, 0.6652823, 0.7034562, 0.73319113, 0.7621397, 0.76431036, 0.76727486, 0.8194928, 0.8431608, 0.88197935, 0.9261545, 0.9408945, 0.9671873, 0.9671873, 0.96841496, 0.98216444, 0.9863265, 0.98650455, 0.98654133, 0.98820543, 0.99098, 0.9943975, 0.99445915, 0.995268, 0.99598396, 0.99606216, 0.9965515, 0.9967697, 0.9968039, 0.996929, 0.99931765, 0.99940175, 0.99954706, 0.99984926, 0.99987763]


In [25]:
np.sum(np.squeeze(results) > 0.9)

28

In [26]:
28/len(os.listdir(ver_path))

0.56

OpenCV real time verification

In [24]:
# connect to the webcam
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    #cut the frame, discription bellow
    frame = frame[170:170+250, 230:230+250, :]
    cv2.imshow('Verification', frame)

    #verification trigger
    if cv2.waitKey(1) & 0XFF == ord('v'): #to verify the user click on 'v'
        cv2.imwrite(os.path.join('realData', 'inputImage', 'inputImage.jpg'), frame) #save the anchor img
        # call the verify function
        results, verified = verification(model, 0.9, 0.7) 
        print(verified)

    if cv2.waitKey(1) & 0XFF == ord('q'): #to close the fram when i click 'q'
        break

cap.release()
cv2.destroyAllWindows()

False
False
