# 1. SETUP


# 1.1 install dependencies

In [3]:
#!pip install tensorflow==2.15.0 opencv-python matplotlib
#!pip install opencv-python

# 1.2 import dependencies

In [2]:
# import standart dependencies
import cv2 
import os
import random
import numpy as np
from matplotlib import pyplot as plt

In [3]:
# import tenserflow dependencies-function API
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, MaxPooling2D, Input, Flatten,Dense
import tensorflow as tf




# 1.3 SET GPU Growth

In [6]:
#avoid OOM erros by settings GPU memory consumption Growth
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus :
    tf.config.experimental.set_memory_growth(gpu,True)

In [7]:
gpus,

([],)

# 1.4 create folder structure

In [4]:
#set path
pos_path =os.path.join("data",'positive')
neg_path =os.path.join("data",'negative')
anc_path =os.path.join("data",'anchor')

#make the directories
#os.makedirs(pos_path)
os.makedirs(neg_path)
os.makedirs(anc_path)

FileExistsError: [WinError 183] Impossible de créer un fichier déjà existant: 'data\\negative'

# 2. collect positives and anchors

# 2.1 untar labelled faces in the wild dataset

In [None]:
#http://vis-www.cs.umass.edu/lfw/#explore
!tar -xf lfw.tgz

In [None]:
#Move LFW Images to the following repository data\negative
for directory in os.listdir('lfw'):
    for file in os.listdir(os.path.join('lfw',directory)):
        EX_PATH = os.path.join('lfw',directory,file)
        NEW_PATH = os.path.join(neg_path, file)
        os.replace(EX_PATH,NEW_PATH)

# 2.2 collect positive and anchor classes

In [5]:
#import uuid  library to generate unique image names 
import uuid

In [None]:
uuid??

In [6]:
# Establish a connection to the webcam 
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    #cut down frame to 250*250px
    frame = frame[150:150+250,150:150+250,:]
    
    #collect anchors
    if cv2.waitKey(1) & 0xFF == ord('a'):
        #create the unqie file path
        imgname = os.path.join(anc_path,'{}.jpg'.format(uuid.uuid1()))
        #write out anchor path
        cv2.imwrite(imgname,frame)
    #collect positive
    if cv2.waitKey(1) & 0xFF == ord('p'):
         #create the unqie file path
        imgname = os.path.join(pos_path,'{}.jpg'.format(uuid.uuid1()))
        #write out anchor path
        cv2.imwrite(imgname,frame)
    
    #show image back to screen
    cv2.imshow('Image Collection',frame)
    #breaking gracefully
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
#release the webcam        
cap.release()
#close the image show frame
cv2.destroyAllWindows()

In [None]:
plt.imshow(frame)

In [1]:
#plt.imshow(frame[190:190+250,210:210+250,:])

# 3. Load and Preprocess Images

# 3.1 Get Image Directories

In [7]:
anchor = tf.data.Dataset.list_files(anc_path+'\*.jpg').take(300)
positive = tf.data.Dataset.list_files(pos_path+'\*.jpg').take(300)
negative = tf.data.Dataset.list_files(neg_path+'\*.jpg').take(300)

In [8]:
dir_test = anchor.as_numpy_iterator()


In [9]:
dir_test.next()


b'data\\anchor\\c717e04b-b8af-11ee-bdef-80193424ee23.jpg'

# 3.2 Preprocessing -scale and Resize

In [13]:
def preprocess(file_path):
    #read in image from file path
    byte_img = tf.io.read_file(file_path)
    #load the image
    img = tf.io.decode_jpeg(byte_img)
    #Preprocessing steps -resizing the image to be 100*100*3
    img = tf.image.resize(img,(100,100))
    #scale image to be  between 0 and 1
    img = img/255.0
    #return image
    
    return img

In [None]:
img =preprocess('data\\anchor\\c717e04b-b8af-11ee-bdef-80193424ee23.jpg')
plt.imshow(img)

# 3.3 create Labelled  Dataset

In [None]:
# (anchor,positive) => 1,1,1,1
# (anchor,negative) => 0,0,0,0


In [15]:
positives = tf.data.Dataset.zip((anchor,positive,tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor,negative,tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data  = positives.concatenate(negatives)

In [16]:
samples = data.as_numpy_iterator()

In [17]:
example =samples.next()
example

(b'data\\anchor\\ab6f7d8a-b8af-11ee-81d4-80193424ee23.jpg',
 b'data\\positive\\abb0509d-b8af-11ee-989f-80193424ee23.jpg',
 1.0)

# 3.4 Build Train and Test Partition

In [22]:
def preprocess_twin(input_img,validation_img,label):
    return(preprocess(input_img),preprocess(validation_img),label)

In [23]:
 res = preprocess_twin(*example)

In [None]:
plt.imshow(res[0])
res[2]

In [25]:
#built dataloader pipeline
data =data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size =1024)

In [26]:
samples = data.as_numpy_iterator()
samples.next()
len(samples.next())


3

In [None]:
plt.imshow(samples.next()[0])

In [27]:
#training partition
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [28]:
#testing partition
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

# 4. Model Engineering

# 4.1 Build Embedding Layer

In [29]:
#
def make_embedding():
    
    inp = Input(shape=(100,100,3),name= "input_image")
    #FIRST BLOCK
    
    c1 = Conv2D(64 ,(10,10),activation ='relu')(inp)
    m1= MaxPooling2D(64,(2,2),padding='same')(c1)
    
    #SECOND BLOCK
    c2 = Conv2D(128 ,(7,7),activation ='relu')(m1)
    m2= MaxPooling2D(64,(2,2),padding='same')(c2)
    
    #THIRD BLOCK
    c3 = Conv2D(128 ,(4,4),activation ='relu')(m2)
    m3= MaxPooling2D(64,(2,2),padding='same')(c3)
    
    #FINAL EMBEDDING BLOCK
    c4 = Conv2D(256 ,(4,4),activation ='relu')(m3)
    f1= Flatten()(c4)
    d1 = Dense(4096,activation='sigmoid')(f1)
    
    return Model(inputs=[inp] , outputs=[d1] , name='embedding')
    

In [30]:
embedding = make_embedding()





# 4.2 Build Distance Layer

In [31]:
#siamese L1 distance class
class L1Dist(Layer):
    #init method - inherite
    def __init__(self,**kwargs):
        super().__init__()
    #magic happens here - similarity calculation
    def call(self,input_embedding,validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)

# 4.3 Make siemese Model

In [32]:
def make_siamese_model():
    #Anchor image input in the network
    input_image= Input(name='input_img',shape=(100,100,3))
    #Validation image in the network
    validation_image = Input(name='validation_img',shape=(100,100,3))
    
    #combine siamese distance components
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distance =siamese_layer(embedding(input_image),embedding(validation_image))
    
    #classification layer
    classifier = Dense(1,activation='sigmoid')(distance)
    
    return  Model(inputs =[input_image, validation_image],outputs = classifier , name ='siameseNetwork')

In [33]:
siamese_model=make_siamese_model()
siamese_model

<keras.src.engine.functional.Functional at 0x16db2e2e510>

# 5. Training

#  5.1 Setup Loss and Optimizer

In [35]:
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [34]:
opt = tf.keras.optimizers.Adam(1e-4)

# 5.2 Establish checkpoints

In [36]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir,'ckpt')
checkpoint = tf.train.Checkpoint(opt = opt , siamese_model = siamese_model)


# 5.3 Build Train step Function


In [37]:
@tf.function
def train_step(batch):
    
    #ecord all of our operation
    with tf.GradientTape()as tape:
        #get anchor and positive\negative image
        X = batch[:2]
        #get label
        y = batch[2]
        
        
        #forward pass
        yhat = siamese_model(X, training =True)
        #calulate loss
        loss = binary_cross_loss(y,yhat)
    print(loss)    
    #calculate gradient
    grad = tape.gradient(loss, siamese_model.trainable_variables)

    
    #calculate updated weights and apply to siamese model
    
    
    opt.apply_gradients(zip(grad,siamese_model.trainable_variables))
    #return loss
    
    return loss

# 5.4 Build Training Loop

In [38]:
def train(data, EPOCHS):
    # Loop through epochs
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(train_data))
        
        # Loop through each batch
        for idx, batch in enumerate(train_data):
            # Run train step here
            train_step(batch)
            progbar.update(idx+1)
            
        # SAVE CHECKPOINTS
        if epoch % 10 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)


# 5.5 Train the model 

In [39]:
EPOCHS =50

In [40]:


train(train_data,EPOCHS)


 Epoch 1/50
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)

 Epoch 2/50

 Epoch 3/50

 Epoch 4/50

 Epoch 5/50

 Epoch 6/50

 Epoch 7/50

 Epoch 8/50

 Epoch 9/50

 Epoch 10/50

 Epoch 11/50

 Epoch 12/50

 Epoch 13/50

 Epoch 14/50

 Epoch 15/50

 Epoch 16/50

 Epoch 17/50

 Epoch 18/50

 Epoch 19/50

 Epoch 20/50

 Epoch 21/50

 Epoch 22/50

 Epoch 23/50

 Epoch 24/50

 Epoch 25/50

 Epoch 26/50

 Epoch 27/50

 Epoch 28/50

 Epoch 29/50

 Epoch 30/50

 Epoch 31/50

 Epoch 32/50

 Epoch 33/50

 Epoch 34/50

 Epoch 35/50

 Epoch 36/50

 Epoch 37/50

 Epoch 38/50

 Epoch 39/50

 Epoch 40/50

 Epoch 41/50

 Epoch 42/50

 Epoch 43/50

 Epoch 44/50

 Epoch 45/50

 Epoch 46/50

 Epoch 47/50

 Epoch 48/50

 Epoch 49/50

 Epoch 50/50


# 6. Evaluate model

## 6.1 Import Metrics

In [42]:
#import metric calculations
from tensorflow.keras.metrics import Precision,Recall

## 6.2 Make Predictions

In [41]:
# Get a batch of test data
test_input, test_val, y_true = test_data.as_numpy_iterator().next()

In [43]:
y_hat = siamese_model.predict([test_input, test_val])
y_hat



array([[9.9999994e-01],
       [6.3506921e-12],
       [9.9999970e-01],
       [9.9999952e-01],
       [5.0501014e-10],
       [9.9998760e-01],
       [9.9815077e-01],
       [9.9999464e-01],
       [1.0000000e+00],
       [7.2115358e-10],
       [7.3884430e-06],
       [9.9851876e-01],
       [9.9137586e-01],
       [7.5460994e-04],
       [9.9935746e-01],
       [1.0000000e+00]], dtype=float32)

In [44]:
# Post processing the results 
[1 if prediction > 0.5 else 0 for prediction in y_hat ]

[1, 0, 1, 1, 0, 1, 1, 1, 1, 0, 0, 1, 1, 0, 1, 1]

In [45]:
y_true

array([1., 0., 1., 1., 0., 1., 1., 1., 1., 0., 0., 1., 1., 0., 1., 1.],
      dtype=float32)

## 6.3 Calculate Metrics

In [46]:
# Creating a metric object 
m = Recall()

# Calculating the recall value 
m.update_state(y_true, y_hat)

# Return Recall Result
m.result().numpy()

1.0

In [47]:
# Creating a metric object 
m = Precision()

# Calculating the recall value 
m.update_state(y_true, y_hat)

# Return Recall Result
m.result().numpy()


1.0

In [48]:
r = Recall()
p = Precision()

for test_input, test_val, y_true in test_data.as_numpy_iterator():
    yhat = siamese_model.predict([test_input, test_val])
    r.update_state(y_true, yhat)
    p.update_state(y_true,yhat) 

print(r.result().numpy(), p.result().numpy())

1.0 1.0


## 6.4 Viz Results

In [None]:
# Set plot size 
plt.figure(figsize=(10,8))

# Set first subplot
plt.subplot(1,2,1)
plt.imshow(test_input[0])

# Set second subplot
plt.subplot(1,2,2)
plt.imshow(test_val[0])

# Renders cleanly
plt.show()

# 7. Save Model

In [50]:
# Save weights
siamese_model.save('siamesemodelv4.h5')



  saving_api.save_model(


In [51]:
L1Dist

__main__.L1Dist

In [52]:
# Reload model 
siamese_model = tf.keras.models.load_model('siamesemodelv4.h5', 
                                   custom_objects={'L1Dist':L1Dist,'BinaryCrossentropy':tf.losses.BinaryCrossentropy}) 



In [53]:
# Make predictions with reloaded model
siamese_model.predict([test_input, test_val])



array([[9.9996823e-01],
       [9.9709064e-01],
       [1.0000000e+00],
       [1.0000000e+00],
       [9.9851876e-01],
       [9.9999994e-01],
       [1.9055241e-06],
       [2.4309247e-06],
       [2.3866627e-11],
       [1.2864834e-06],
       [9.9999142e-01],
       [2.5349783e-08],
       [9.9999535e-01],
       [9.9993867e-01]], dtype=float32)

In [56]:
# View model summary
siamese_model.summary()

Model: "siameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_img (InputLayer)      [(None, 100, 100, 3)]        0         []                            
                                                                                                  
 validation_img (InputLayer  [(None, 100, 100, 3)]        0         []                            
 )                                                                                                
                                                                                                  
 embedding (Functional)      (None, 4096)                 3896044   ['input_img[0][0]',           
                                                          8          'validation_img[0][0]']      
                                                                                     

# 8.Real Time test

## 8.1 Verification Function

In [57]:
def verify(model, detection_threshold, verification_threshold):
    # Build results array
    results = []
    for image in os.listdir(os.path.join('application_data', 'verification_images')):
        input_img = preprocess(os.path.join('application_data', 'input_image', 'input_image.jpg'))
        validation_img = preprocess(os.path.join('application_data', 'verification_images', image))
        
        # Make Predictions 
        result = model.predict(list(np.expand_dims([input_img, validation_img], axis=1)))
        results.append(result)
    
    # Detection Threshold: Metric above which a prediciton is considered positive 
    detection = np.sum(np.array(results) > detection_threshold)
    
    # Verification Threshold: Proportion of positive predictions / total positive samples 
    verification = detection / len(os.listdir(os.path.join('application_data', 'verification_images'))) 
    verified = verification > verification_threshold
    
    return results, verified
    

## 8.2 OpenCV Real Time Verification

In [None]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    frame = frame[170:170+250,200:200+250,:]
    
    cv2.imshow('Verification', frame)
    
    # Verification trigger
    if cv2.waitKey(10) & 0xFF == ord('v'):
        # Save input image to application_data/input_image folder 
#         hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
#         h, s, v = cv2.split(hsv)

#         lim = 255 - 10
#         v[v > lim] = 255
#         v[v <= lim] -= 10
        
#         final_hsv = cv2.merge((h, s, v))
#         img = cv2.cvtColor(final_hsv, cv2.COLOR_HSV2BGR)

        cv2.imwrite(os.path.join('application_data', 'input_image', 'input_image.jpg'), frame)
        # Run verification
        results, verified = verify(siamese_model, 0.7, 0.6)
        print(verified)
    
    if cv2.waitKey(10) & 0xFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()

True
True
True
False
False
True


In [None]:
np.sum(np.squeeze(results)>0.7)

In [None]:
15/29