# **Deep Facial Recognition With A Siamese Network** #

In [13]:
import os
import cv2
import random
import numpy as np
from matplotlib import pyplot as plt

In [14]:
import tensorflow as tf 
from tensorflow.keras.models import Model 
from tensorflow.keras.layers import Layer , Conv2D , MaxPooling2D , Dense , Flatten , Input

In [15]:
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus :
    tf.config.experimental.set_memory_growth(gpu,True)

In [16]:
POS_PATH = os.path.join('data' , 'positive')
NEG_PATH = os.path.join('data' , 'negative')
ANC_PATH = os.path.join('data' , 'anchor')

if not os.path.exists(POS_PATH):
    os.makedirs(POS_PATH)
else:
    print(f"The directory '{POS_PATH}' already exists.")

if not os.path.exists(NEG_PATH):
    os.makedirs(NEG_PATH)
else:
    print(f"The directory '{NEG_PATH}' already exists.")

if not os.path.exists(ANC_PATH):
    os.makedirs(ANC_PATH)
else:
    print(f"The directory '{ANC_PATH}' already exists.")

The directory 'data\positive' already exists.
The directory 'data\negative' already exists.
The directory 'data\anchor' already exists.


In [17]:
for directory in os.listdir('lfw'):
    for file in os.listdir(os.path.join('lfw' , directory)):
        EX_PATH = os.path.join('lfw' , directory , file)
        NEW_PATH = os.path.join(NEG_PATH , file)
        os.replace(EX_PATH , NEW_PATH)

FileNotFoundError: [WinError 3] The system cannot find the path specified: 'lfw'

In [18]:
import uuid

In [19]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    
    ret , frame = cap.read()
    frame = frame[120:120+250,200:200+250,:]
    
    if cv2.waitKey(1) & 0XFF == ord('a'):
        print(1)
        img_name = os.path.join(ANC_PATH , '{}.jpg'.format(uuid.uuid1()))
        print(img_name)
        cv2.imwrite(img_name , frame)
        
    if cv2.waitKey(1) & 0XFF == ord('p'):
        img_name = os.path.join(POS_PATH , '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(img_name , frame)


    cv2.imshow('Image Collection' , frame)
    if cv2.waitKey(1) & 0XFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

1
data\anchor\caebf88a-cc07-11ee-8d96-bc542f0a7ab1.jpg


In [20]:
anchor = tf.data.Dataset.list_files(ANC_PATH+'\*.jpg').take(300)
positive = tf.data.Dataset.list_files(POS_PATH+'\*.jpg').take(300)
negative = tf.data.Dataset.list_files(NEG_PATH+'\*.jpg').take(300)

In [21]:
def preprocess(file_path):
    
    byte_img = tf.io.read_file(file_path)
    img = tf.io.decode_jpeg(byte_img)
    img = tf.image.resize(img,(100,100))
    img = img/255.0
    return img

In [22]:
positives = tf.data.Dataset.zip((anchor,positive,tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor,negative,tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)

In [23]:
samples = data.as_numpy_iterator()
example = samples.next()

In [24]:
def preprocess_twin(input_img , validation_img , label):
    return(preprocess(input_img) , preprocess(validation_img) , label)

In [25]:
twin = preprocess_twin(*example)

In [26]:
twin

(<tf.Tensor: shape=(100, 100, 3), dtype=float32, numpy=
 array([[[0.9843137 , 1.        , 1.        ],
         [0.98333335, 1.        , 1.        ],
         [0.98039216, 1.        , 1.        ],
         ...,
         [0.33235294, 0.28333333, 0.1872549 ],
         [0.3360294 , 0.28112745, 0.17916666],
         [0.34509805, 0.28235295, 0.18431373]],
 
        [[0.98039216, 1.        , 1.        ],
         [0.98039216, 1.        , 1.        ],
         [0.9789216 , 1.        , 1.        ],
         ...,
         [0.35637254, 0.31004903, 0.22181372],
         [0.35465688, 0.3009804 , 0.2017157 ],
         [0.3617647 , 0.3       , 0.20980392]],
 
        [[0.9705882 , 1.        , 1.        ],
         [0.9705882 , 1.        , 1.        ],
         [0.96911764, 1.        , 1.        ],
         ...,
         [0.3602941 , 0.3139706 , 0.22573529],
         [0.36985293, 0.3139706 , 0.22279412],
         [0.38235295, 0.32058823, 0.23039216]],
 
        ...,
 
        [[0.11176471, 0.14509805

In [27]:
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=1024)

In [28]:
train_data = data.take(round(len(data)*0.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [29]:
test_data = data.skip(round(len(data)*0.7))
test_data = test_data.take(round(len(data)*0.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

In [30]:
def make_embedding():
    
    input = Input(shape=(100,100,3) , name='input_img')
    
    c1 = Conv2D(64,(10,10) , activation='relu')(input)
    m1 = MaxPooling2D(64,(2,2) , padding='same')(c1)
    
    c2 = Conv2D(128,(7,7) , activation='relu')(m1)
    m2 = MaxPooling2D(64,(2,2) , padding='same')(c2)

    c3 = Conv2D(128,(4,4) , activation='relu')(m2)
    m3 = MaxPooling2D(64,(2,2) , padding='same')(c3)
    
    c4 = Conv2D(256,(4,4) , activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096 , activation='sigmoid')(f1)
    
    return Model(inputs=[input] , outputs=[d1] , name='embedding')

In [31]:
embedding = make_embedding()

In [32]:
embedding.summary()

Model: "embedding"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_img (InputLayer)       [(None, 100, 100, 3)]     0         
_________________________________________________________________
conv2d (Conv2D)              (None, 91, 91, 64)        19264     
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 46, 46, 64)        0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 40, 40, 128)       401536    
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 20, 20, 128)       0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 17, 17, 128)       262272    
_________________________________________________________________
max_pooling2d_2 (MaxPooling2 (None, 9, 9, 128)         0 

In [33]:
class L1Dist(Layer):
    
    def __init__(self,**kwargs):
        super().__init__()
        
    def call(self,input_embedding,validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)

In [34]:
l1 = L1Dist()

In [36]:
def make_siamese_model():
    input_image = Input(name='input_image' , shape=(100,100,3))
    validation_image = Input(name='validation_image' , shape=(100,100,3))
    
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(embedding(input_image) , embedding(validation_image))
    
    classifier =Dense(1 , activation='sigmoid')(distances)
    
    return Model(inputs=[input_image , validation_image] , outputs = classifier , name='SiameseNetwork')
    

In [37]:
siamese_model = make_siamese_model()
siamese_model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_image (InputLayer)        [(None, 100, 100, 3) 0                                            
__________________________________________________________________________________________________
validation_image (InputLayer)   [(None, 100, 100, 3) 0                                            
__________________________________________________________________________________________________
embedding (Functional)          (None, 4096)         38960448    input_image[0][0]                
                                                                 validation_image[0][0]           
__________________________________________________________________________________________________
distance (L1Dist)               (None, 4096)         0           embedding[0][0]     

In [38]:
binary_cross_loss = tf.keras.losses.BinaryCrossentropy()
opt = tf.keras.optimizers.Adam(1e-4)

In [39]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir , 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt , siamese_model=siamese_model)

In [40]:
@tf.function
def train_step(batch):

    with tf.GradientTape() as tape :
        x = batch[:2]
        y = batch[2]
        
        y_hat = siamese_model(x , training=True)
        loss = binary_cross_loss(y , y_hat)
    
    print(loss)
    grad = tape.gradient(loss , siamese_model.trainable_variables)
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))
    
    return loss

In [41]:
def train(data , epochs):

    for epoch in range(epochs+1):
        print('\n Epoch {}/{}'.format(epoch,epochs))
        progbar = tf.keras.utils.Progbar(len(data))
        
        for idx , batch in enumerate(data):
            train_step(batch)
            progbar.update(idx+1)
    
    if epoch % 10 == 0 :
        checkpoint.save(file_prefix=checkpoint_prefix)

In [42]:
epochs = 50

In [46]:
train(train_data,epochs)


 Epoch 0/50
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)

 Epoch 1/50

 Epoch 2/50

 Epoch 3/50

 Epoch 4/50

 Epoch 5/50

 Epoch 6/50

 Epoch 7/50

 Epoch 8/50

 Epoch 9/50

 Epoch 10/50

 Epoch 11/50

 Epoch 12/50

 Epoch 13/50

 Epoch 14/50

 Epoch 15/50

 Epoch 16/50

 Epoch 17/50

 Epoch 18/50

 Epoch 19/50

 Epoch 20/50

 Epoch 21/50

 Epoch 22/50

 Epoch 23/50

 Epoch 24/50

 Epoch 25/50

 Epoch 26/50

 Epoch 27/50

 Epoch 28/50

 Epoch 29/50

 Epoch 30/50

 Epoch 31/50

 Epoch 32/50

 Epoch 33/50

 Epoch 34/50

 Epoch 35/50

 Epoch 36/50

 Epoch 37/50

 Epoch 38/50

 Epoch 39/50

 Epoch 40/50

 Epoch 41/50

 Epoch 42/50

 Epoch 43/50

 Epoch 44/50

 Epoch 45/50

 Epoch 46/50

 Epoch 47/50

 Epoch 48/50

 Epoch 49/50

 Epoch 50/50


In [65]:
from tensorflow.keras.metrics import Precision , Recall

test_input , test_val , y_true = test_data.as_numpy_iterator().next()

In [66]:
predictions = siamese_model.predict([test_input,test_val])
[1 if prediction > 0.5 else 0 for prediction in predictions]

[0, 1, 1, 0, 1, 1, 0, 1, 0, 1, 0, 1, 1, 1, 1, 1]

In [67]:
m = Recall()
m.update_state(y_true , predictions)
m.result().numpy()

1.0

In [64]:
siamese_model.save('model.h5')



In [51]:
model = tf.keras.models.load_model('model.h5' , custom_objects={'L1Dist' : L1Dist} )



In [52]:
model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_image (InputLayer)        [(None, 100, 100, 3) 0                                            
__________________________________________________________________________________________________
validation_image (InputLayer)   [(None, 100, 100, 3) 0                                            
__________________________________________________________________________________________________
embedding (Functional)          (None, 4096)         38960448    input_image[0][0]                
                                                                 validation_image[0][0]           
__________________________________________________________________________________________________
l1_dist_2 (L1Dist)              (None, 4096)         0           embedding[0][0]     

In [None]:
model.predict([test_input , test_val])

array([[3.2758280e-12],
       [9.9985719e-01],
       [9.9999511e-01],
       [3.6980007e-12],
       [9.9999964e-01],
       [2.4744379e-08],
       [9.9996805e-01],
       [3.2490207e-05],
       [9.9974757e-01],
       [1.0000000e+00],
       [9.9979907e-01],
       [9.9999964e-01],
       [9.9999845e-01],
       [9.9982387e-01],
       [2.6359448e-12],
       [7.4328088e-10]], dtype=float32)

In [59]:


def verify(model , detection_threshold , verification_threshold):
    
    results = []
    for image in os.listdir(os.path.join('application_data' , 'verification_images')):
        input_image = preprocess(os.path.join('application_data' , 'input_image' , 'input_image.jpg'))
        validation_image = preprocess(os.path.join('application_data' , 'verification_images' , image))
        
        result = model.predict(list(np.expand_dims([input_image , validation_image] , axis=1)))
        results.append(result)
    
    detection = np.sum(np.array(results) > detection_threshold)
    verification = detection / len(os.listdir(os.path.join('application_data' , 'verification_images')))
    verified = verification > verification_threshold
    
    return results , verified

In [68]:
from unittest import result


cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret , frame = cap.read()
    frame = frame[120:120+250,200:200+250,:]
    cv2.imshow('veridication' , frame)
    
    if cv2.waitKey(10) & 0xFF == ord('v'):
        cv2.imwrite(os.path.join('application_data' , 'input_image' ,'input_image.jpg') , frame)
        
        results , verified = verify(model, 0.9 , 0.7)
        print(verified)
    
    if cv2.waitKey(10) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

False
False
True
