# Initialization

In [1]:
#import dependencies
import cv2
import os
import random
import numpy as np 
from matplotlib import pyplot as plt 
import uuid

import tensorflow as tf
from keras.models import Model 
from keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
from tensorflow.keras.metrics import Precision, Recall

In [2]:
# Avoid OOM errors by setting GPU Memory Consumption Growth
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus: 
    tf.config.experimental.set_memory_growth(gpu, True)
gpus = tf.config.experimental.list_physical_devices('GPU')


In [3]:
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')

In [4]:
#makedirectories
if not os.path.exists('data'):
    os.makedirs(POS_PATH)
    os.makedirs(NEG_PATH)
    os.makedirs(ANC_PATH)

In [5]:
#Moving LFW data to negative dataset
# for i in os.listdir('lfw'):
#     for j in os.listdir(os.path.join('lfw', i)):
#         ex_path = os.path.join('lfw', i, j)
#         new_path = os.path.join(NEG_PATH, j)
#         os.replace(ex_path, new_path)

In [6]:
# cam = cv2.VideoCapture(0)
# cam.set(3, 640)
# cam.set(4, 480)
# while cam.isOpened():
#     ret, frame = cam.read()
#     frame = cv2.flip(frame, 1)
#     frame = frame[140:140+250,150:150+250,:]
#     cv2.imshow('Images', frame)
#     if cv2.waitKey(1) & 0xFF == ord('a'):
#         imgname = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
#         cv2.imwrite(imgname, frame)
#     if cv2.waitKey(1) & 0xFF == ord('p'):
#         imgname = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
#         cv2.imwrite(imgname, frame)
       
#     if cv2.waitKey(1) & 0xFF == ord('q'):
#         break

# cam.release()
# cv2.destroyAllWindows()

# Preprocessing

In [7]:
anchor = tf.data.Dataset.list_files(ANC_PATH + '/*.jpg').take(300)
positive = tf.data.Dataset.list_files(POS_PATH + '/*.jpg').take(300)
negative = tf.data.Dataset.list_files(NEG_PATH + '/*.jpg').take(300)

2025-01-03 14:00:22.381955: I metal_plugin/src/device/metal_device.cc:1154] Metal device set to: Apple M3
2025-01-03 14:00:22.382047: I metal_plugin/src/device/metal_device.cc:296] systemMemory: 16.00 GB
2025-01-03 14:00:22.382069: I metal_plugin/src/device/metal_device.cc:313] maxCacheSize: 5.33 GB
2025-01-03 14:00:22.382495: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:303] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2025-01-03 14:00:22.382686: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:269] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


In [8]:
def preprocess(file_path):

    byte_img = tf.io.read_file(file_path)
    img = tf.io.decode_jpeg(byte_img)
    img = tf.image.resize(img, (105, 105))
    img = img/255.0
    return img 

In [9]:
anchor_preprocessed = anchor.map(preprocess)
positive_preprocessed = positive.map(preprocess)
negative_preprocessed = negative.map(preprocess)

In [10]:
positives = tf.data.Dataset.zip((anchor_preprocessed,positive_preprocessed, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor_preprocessed,negative_preprocessed, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)

In [11]:
samples = data.as_numpy_iterator()


In [12]:
samples.next()

(array([[[0.4745098 , 0.4117647 , 0.3529412 ],
         [0.4745098 , 0.4117647 , 0.3529412 ],
         [0.4745098 , 0.4117647 , 0.35648927],
         ...,
         [0.31764707, 0.23529412, 0.21568628],
         [0.32885155, 0.23557422, 0.21960784],
         [0.32941177, 0.23529412, 0.21960784]],
 
        [[0.48655462, 0.41988796, 0.35658264],
         [0.48655462, 0.41988796, 0.35714284],
         [0.48655462, 0.41824067, 0.35879016],
         ...,
         [0.31764707, 0.23529412, 0.21568628],
         [0.32857147, 0.2352941 , 0.21932772],
         [0.32932505, 0.23520741, 0.21952114]],
 
        [[0.49019608, 0.42352942, 0.35648927],
         [0.4900427 , 0.42337602, 0.36063093],
         [0.487246  , 0.42057934, 0.35783425],
         ...,
         [0.31959006, 0.23432261, 0.21568628],
         [0.32154194, 0.22788449, 0.21533947],
         [0.3254902 , 0.23137255, 0.21923436]],
 
        ...,
 
        [[0.5305323 , 0.37871152, 0.3542484 ],
         [0.4801788 , 0.3389356 , 0.31596

In [13]:
def preprocess_twin(input_img, val_img, label):
    return (input_img, val_img), label

In [14]:
#data pipeline
# data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=1024)

In [15]:
def split_features_label(anchor, comparison, label):
        return (anchor, comparison), label

In [16]:
# dataset = data.map(split_features_label)

# pair_data = dataset.map(lambda x, y: x)
# labels = dataset.map(lambda x, y: y)

In [17]:
#training set
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [18]:
#testing set
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

# Model Engineering

In [19]:
#Embedding Layer
def embedding_model():
    inp = Input(shape=(105,105,3), name= 'input_name')
    
    c1 = Conv2D(64, (10,10), activation='relu')(inp)
    m1 = MaxPooling2D(64,(2,2), padding='same')(c1)
    
    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D(64, (2,2), padding='same')(c2)
    
    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D(64, (2,2), padding='same')(c3)
    
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)
    
    
    return Model(inputs = [inp], outputs = d1 , name= 'embedding')

In [20]:
embedding = embedding_model()
embedding.summary()

Model: "embedding"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_name (InputLayer)     [(None, 105, 105, 3)]     0         
                                                                 
 conv2d (Conv2D)             (None, 96, 96, 64)        19264     
                                                                 
 max_pooling2d (MaxPooling2  (None, 48, 48, 64)        0         
 D)                                                              
                                                                 
 conv2d_1 (Conv2D)           (None, 42, 42, 128)       401536    
                                                                 
 max_pooling2d_1 (MaxPoolin  (None, 21, 21, 128)       0         
 g2D)                                                            
                                                                 
 conv2d_2 (Conv2D)           (None, 18, 18, 128)       26

In [21]:
#Distance Layer
class L1Dist(Layer):
    def __init__(self, **kwargs):
        super().__init__()
        
    def call(self, input_embedding, validation_embedding):
        
        return tf.math.abs((input_embedding - validation_embedding))

In [22]:
#siamese model
def siamese_model():
    
    inp = Input(name='input_img', shape = (105,105,3))
    
    val = Input(name='val_img', shape = (105,105,3))
    
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(embedding(inp), embedding(val))
    
    classifer = Dense(1, activation='sigmoid')(distances)
    
    return Model(inputs = [inp, val], outputs = classifer, name='siamese_model')

In [23]:
siamese_network = siamese_model()
siamese_network.summary()

Model: "siamese_model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_img (InputLayer)      [(None, 105, 105, 3)]        0         []                            
                                                                                                  
 val_img (InputLayer)        [(None, 105, 105, 3)]        0         []                            
                                                                                                  
 embedding (Functional)      (None, 4096)                 3896044   ['input_img[0][0]',           
                                                          8          'val_img[0][0]']             
                                                                                                  
 distance (L1Dist)           (None, 4096)                 0         ['embedding[0][0]'

# Training

In [24]:
binary_cross_loss = tf.keras.losses.BinaryCrossentropy()
opt = tf.keras.optimizers.legacy.Adam(1e-4)

checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_network)

In [25]:
# model = siamese_network
# model.compile(optimizer=opt, loss=binary_cross_loss, metrics=['accuracy']) 
# model.fit(train_data, epochs=5, validation_data=test_data)

In [26]:

def train_step(batch):
    
    with tf.GradientTape() as tape:
        x = batch[:2]
        y = batch[2]
        
        yhat = siamese_network(x, training=True)
        loss = binary_cross_loss(y, yhat)
    
    
    grad = tape.gradient(loss, siamese_network.trainable_variables)
    
    opt.apply_gradients(zip(grad, siamese_network.trainable_variables))
    
    return loss

In [27]:
def train(x, epochs):
    
    for epoch in range(1,epochs+1):
        print('\n Epoch: {}/{} '.format(epoch, epochs))
        progbar = tf.keras.utils.Progbar(len(x))
        
        for i, batch in enumerate(x):
            loss = train_step(batch)
            print('Loss: {}'.format(loss))
            progbar.update(i+1)
            
            
        if epoch % 10 == 0:
            checkpoint.save(file_prefix = checkpoint_prefix)

In [28]:
train(train_data, 50)


 Epoch: 1/50 
Loss: 0.6908501386642456
 1/27 [>.............................] - ETA: 50sLoss: 0.6803735494613647
 2/27 [=>............................] - ETA: 36sLoss: 0.6018444895744324
 3/27 [==>...........................] - ETA: 35sLoss: 0.665878415107727
 4/27 [===>..........................] - ETA: 33sLoss: 0.5680578947067261
 5/27 [====>.........................] - ETA: 32sLoss: 0.552278995513916
 6/27 [=====>........................] - ETA: 30sLoss: 0.4440516531467438

 Epoch: 2/50 
Loss: 0.3607533574104309
 1/27 [>.............................] - ETA: 38sLoss: 0.5032770037651062
 2/27 [=>............................] - ETA: 36sLoss: 0.4681274890899658
 3/27 [==>...........................] - ETA: 35sLoss: 0.3765971064567566
 4/27 [===>..........................] - ETA: 33sLoss: 0.5176407694816589
 5/27 [====>.........................] - ETA: 32sLoss: 0.5689922571182251
 6/27 [=====>........................] - ETA: 30sLoss: 0.4295395612716675

 Epoch: 3/50 
Loss: 0.09946784377

# Model Evaluation

In [30]:
r = Recall()
p = Precision()

for test_input, test_val, y_true in test_data.as_numpy_iterator():
    yhat = siamese_network.predict([test_input, test_val])
    r.update_state(y_true, yhat)
    p.update_state(y_true,yhat) 

print(r.result().numpy(), p.result().numpy())

: 

In [None]:
siamese_model.save('siamesemodelv2.h5')

In [None]:
# Reload model 
siamese_model = tf.keras.models.load_model('siamesemodelv2.h5', 
                                   custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})