In [None]:
# pip installing
!pip install tensorflow opencv-python matplotlib

In [1]:
# Import standard dependencies
import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt
# Import tensorflow dependencies - Functional API
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf
import uuid
from tensorflow.keras.metrics import Precision, Recall

In [2]:
# Query and print the physical GPU devices visible to TensorFlow.
gpus = tf.config.experimental.list_physical_devices('GPU')
print(gpus)

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


In [3]:
# Mount Google Drive (Colab-only module) at /content/drive/ so the project
# folder referenced by `path` is reachable from this runtime.
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [4]:
# Setup paths: project root on the mounted Drive, plus the three image
# folders (positive / negative / anchor) located directly under it.
path='/content/drive/MyDrive/FaceRecognition-main'
POS_PATH = os.path.join(path, 'positive')
NEG_PATH = os.path.join(path, 'negative')
ANC_PATH = os.path.join(path, 'anchor')

In [5]:
# Make the directories.
# os.makedirs(..., exist_ok=True) is idempotent, so no separate isdir() check
# is needed, and os.path.join replaces the hand-built '//' concatenations.
for _directory in (
    POS_PATH,
    NEG_PATH,
    ANC_PATH,
    os.path.join(path, 'training_checkpoints'),
    os.path.join(path, 'application_data'),
    os.path.join(path, 'images'),
    os.path.join(path, 'application_data', 'input_image'),
    os.path.join(path, 'application_data', 'verification_image'),
):
    os.makedirs(_directory, exist_ok=True)

In [None]:
# Move every file found in the sub-folders of <path>/lfw into the flat
# negatives folder (NEG_PATH). Files sharing a name overwrite each other,
# because os.replace() replaces an existing destination.
LFW_PATH = os.path.join(path, 'lfw')
for directory in os.listdir(LFW_PATH):
    source_dir = os.path.join(LFW_PATH, directory)
    # Skip stray files sitting directly under lfw/; calling os.listdir() on
    # them would raise NotADirectoryError.
    if not os.path.isdir(source_dir):
        continue
    for file in os.listdir(source_dir):
        EX_PATH = os.path.join(source_dir, file)
        NEW_PATH = os.path.join(NEG_PATH, file)
        os.replace(EX_PATH, NEW_PATH)

KeyboardInterrupt: ignored

In [6]:
# Dummy input: hard-coded user name, and that user's sub-folders inside the
# anchor and positive image directories.
user_name='Harsha'
USR_ANC = os.path.join(ANC_PATH, user_name)
USR_POS= os.path.join(POS_PATH, user_name)

In [None]:
# user_name=input("Enter Username :")

# while(os.path.isdir(os.path.join(path ,'positive', user_name))):
#     print("User name already exists, Choose different username")
#     user_name=input("Enter Username :")


# os.makedirs(os.path.join(POS_PATH, user_name))
# os.makedirs(os.path.join(ANC_PATH, user_name))
# USR_ANC = os.path.join(ANC_PATH, user_name)
# USR_POS= os.path.join(POS_PATH, user_name)

# video = cv2.VideoCapture(0)

# i = 0

# while(True):
#     i = i+1
#     ret, frame = video.read()
#     frame =frame[120:370,200:450,:] 
#     if cv2.waitKey(1) & 0xFF == ord('a'):
#         imgname=os.path.join(ANC_PATH , user_name,'{}.jpg'.format(uuid.uuid1()))
#         cv2.imwrite(imgname,frame)
#     if cv2.waitKey(1) & 0xFF == ord('p'):
#         imgname=os.path.join(POS_PATH , user_name,'{}.jpg'.format(uuid.uuid1()))
#         cv2.imwrite(imgname,frame)
#     cv2.imshow('Validation', frame)
#     if cv2.waitKey(1) & 0xFF == ord('q'):
#         break

# video.release()
# cv2.destroyAllWindows()


In [7]:
# File-path datasets, capped at 400 entries each: the user's anchor images,
# the user's positive images, and the shared negative images.
anchor= tf.data.Dataset.list_files(USR_ANC+'/*.jpg').take(400)
positive= tf.data.Dataset.list_files(USR_POS+'/*.jpg').take(400)
negative= tf.data.Dataset.list_files (NEG_PATH+'/*.jpg').take(400)

In [8]:
def preprocess(file_path):
    """Load a JPEG file as a 100x100 RGB image scaled to the range [0, 1].

    Args:
        file_path: Path of the JPEG file to read.

    Returns:
        A float tensor of shape (100, 100, 3) with values between 0 and 1.
    """
    img_byte = tf.io.read_file(file_path)
    # Force three channels: with the default (channels=0) a grayscale JPEG
    # decodes to a single channel, which does not match the (100, 100, 3)
    # input shape the embedding network declares.
    img = tf.io.decode_jpeg(img_byte, channels=3)
    img = tf.image.resize(img,(100,100))
    # Rescale 0-255 pixel values to 0-1.
    img = img / 255.0
    return img

In [9]:
# Pair every anchor with a positive (label 1.0) and with a negative
# (label 0.0), then chain the two sets of (anchor, other, label) triples.
positives = tf.data.Dataset.zip((
    anchor,
    positive,
    tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor))),
))
negatives = tf.data.Dataset.zip((
    anchor,
    negative,
    tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor))),
))
data = positives.concatenate(negatives)

In [10]:
def preprocess_twin(img_input, img_validation, label):
    """Preprocess both image paths of a pair and pass the label through unchanged.

    Returns:
        A tuple (input image, validation image, label).
    """
    input_tensor = preprocess(img_input)
    validation_tensor = preprocess(img_validation)
    return (input_tensor, validation_tensor, label)

In [11]:
# Decode/resize every pair, cache the decoded tensors, then shuffle.
data = data.map(preprocess_twin)
data = data.cache()
# reshuffle_each_iteration=False freezes the shuffled order. With the default
# (True) each new iteration reshuffles, so a take()/skip() train/test split
# built from `data` would pick different samples every epoch and test samples
# would leak into training.
data = data.shuffle(buffer_size=1024, reshuffle_each_iteration=False)

In [12]:
# Training partition: the first 70% of the samples, in batches of 16,
# with a prefetch buffer of 8 batches.
train_data = data.take(round(len(data)*.7)).batch(16).prefetch(8)

In [13]:
# Testing partition: skip the first 70% of the samples, keep the next 30%,
# in batches of 16 with a prefetch buffer of 8 batches.
test_data = (
    data.skip(round(len(data)*.7))
    .take(round(len(data)*.3))
    .batch(16)
    .prefetch(8)
)

In [14]:
def make_embed():
    """Build the convolutional embedding network.

    Returns:
        A Keras ``Model`` named 'embedding' that maps a (100, 100, 3) image
        to a 4096-element sigmoid-activated feature vector.
    """
    inp = Input(shape=(100,100,3),name='input_image')

    # MaxPooling2D's first positional argument is pool_size (not a filter
    # count) and the second is strides, so the window is spelled out as 2x2.
    # With padding='same' the output size depends only on the stride, so the
    # layer output shapes are the same as with the former 64x64 window.
    c1 = Conv2D(64, (10,10), activation='relu')(inp)
    m1 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c1)

    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c2)

    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c3)

    # Final convolution, flattened and projected to the embedding vector.
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)

    return Model(inputs=[inp], outputs=[d1], name='embedding')

In [15]:
# Instantiate the embedding network; make_siamese_model() reads this global.
embed = make_embed()

In [16]:
# Print the embedding network's layers, output shapes and parameter counts.
embed.summary()

Model: "embedding"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_image (InputLayer)    [(None, 100, 100, 3)]     0         
                                                                 
 conv2d (Conv2D)             (None, 91, 91, 64)        19264     
                                                                 
 max_pooling2d (MaxPooling2D  (None, 46, 46, 64)       0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 40, 40, 128)       401536    
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 20, 20, 128)      0         
 2D)                                                             
                                                                 
 conv2d_2 (Conv2D)           (None, 17, 17, 128)       26

In [17]:
class L1Dist(Layer):
    """Keras layer returning the element-wise absolute difference of two embeddings."""

    def __init__(self, **kwargs):
        # Forward kwargs (e.g. name, dtype, trainable) to the base Layer so
        # they take effect instead of being silently discarded; Keras also
        # supplies such kwargs when it rebuilds the layer from a saved config.
        super().__init__(**kwargs)

    def call(self, input_embedding, validation_embedding):
        """Return |input_embedding - validation_embedding|, element-wise."""
        return tf.math.abs(input_embedding - validation_embedding)

In [18]:
def make_siamese_model():
    """Assemble the Siamese verification model.

    Both inputs go through the module-level ``embed`` network; the L1
    distance between the two embeddings feeds a single sigmoid unit.

    Returns:
        A Keras ``Model`` named 'SiameseNetwork' that takes
        [input_img, validation_img] and outputs one value per pair.
    """
    input_image = Input(name='input_img', shape=(100,100,3))
    validation_image = Input(name='validation_img', shape=(100,100,3))

    # Distance layer, renamed so it appears as 'distance' in the model.
    distance_layer = L1Dist()
    distance_layer._name = 'distance'

    # The same embedding network (shared weights) encodes both images.
    input_embedding = embed(input_image)
    validation_embedding = embed(validation_image)
    distances = distance_layer(input_embedding, validation_embedding)

    classifier = Dense(1, activation='sigmoid')(distances)
    return Model(
        inputs=[input_image, validation_image],
        outputs=classifier,
        name='SiameseNetwork',
    )

In [19]:
# Build the Siamese model and print its layer summary.
siam_model=make_siamese_model()
siam_model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_img (InputLayer)         [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_img (InputLayer)    [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         38960448    ['input_img[0][0]',              
                                                                  'validation_img[0][

In [20]:
# Loss and optimizer used by train_step(): binary cross-entropy and Adam
# constructed with 1e-4 as its first argument.
binary_cross_loss=tf.losses.BinaryCrossentropy()
opt = tf.keras.optimizers.Adam(1e-4)

In [21]:
# Checkpoint setup: files go to <path>/training_checkpoints with the 'ckpt'
# prefix, and track both the optimizer and the Siamese model.
# os.path.join replaces the hand-built '//' concatenation, matching how
# checkpoint_prefix is built.
checkpoint_dir = os.path.join(path, 'training_checkpoints')
checkpoint_prefix = os.path.join(checkpoint_dir,'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siam_model)

In [22]:
@tf.function
def train_step(batch):
    """Run one optimisation step on a batch and return its loss.

    Args:
        batch: Sequence whose first two items are the two image batches fed
            to the model and whose third item is the label batch.

    Returns:
        The binary cross-entropy loss computed for this batch.
    """
    image_pair = batch[:2]
    labels = batch[2]

    # Record the forward pass so the loss can be differentiated.
    with tf.GradientTape() as tape:
        predictions = siam_model(image_pair, training=True)
        loss = binary_cross_loss(labels, predictions)

    # Back-propagate and apply the update to the model's trainable weights.
    weights = siam_model.trainable_variables
    gradients = tape.gradient(loss, weights)
    opt.apply_gradients(zip(gradients, weights))
    return loss

In [23]:
def train(data, EPOCHS):
    """Train for EPOCHS passes over `data`, checkpointing every 10th epoch.

    Args:
        data: Batched dataset; each batch is handed to train_step().
        EPOCHS: Number of passes over `data`.
    """
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch,EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))

        # Count batches from 1 so the counter can be passed to the bar directly.
        for step, batch in enumerate(data, start=1):
            train_step(batch)
            progbar.update(step)

        # Persist optimizer and model state after epochs 10, 20, 30, ...
        if not epoch % 10:
            checkpoint.save(file_prefix=checkpoint_prefix)

In [24]:
# Number of training epochs passed to train().
EPOCHS=100

In [25]:
# Run the training loop on the training partition for EPOCHS epochs.
train(train_data,EPOCHS)


 Epoch 1/100

 Epoch 2/100

 Epoch 3/100

 Epoch 4/100

 Epoch 5/100

 Epoch 6/100

 Epoch 7/100

 Epoch 8/100

 Epoch 9/100

 Epoch 10/100

 Epoch 11/100

 Epoch 12/100

 Epoch 13/100

 Epoch 14/100

 Epoch 15/100

 Epoch 16/100

 Epoch 17/100

 Epoch 18/100

 Epoch 19/100

 Epoch 20/100

 Epoch 21/100

 Epoch 22/100

 Epoch 23/100

 Epoch 24/100

 Epoch 25/100

 Epoch 26/100

 Epoch 27/100

 Epoch 28/100

 Epoch 29/100

 Epoch 30/100

 Epoch 31/100

 Epoch 32/100

 Epoch 33/100

 Epoch 34/100

 Epoch 35/100

 Epoch 36/100

 Epoch 37/100

 Epoch 38/100

 Epoch 39/100

 Epoch 40/100

 Epoch 41/100

 Epoch 42/100

 Epoch 43/100

 Epoch 44/100

 Epoch 45/100

 Epoch 46/100

 Epoch 47/100

 Epoch 48/100

 Epoch 49/100

 Epoch 50/100

 Epoch 51/100

 Epoch 52/100

 Epoch 53/100

 Epoch 54/100

 Epoch 55/100

 Epoch 56/100

 Epoch 57/100

 Epoch 58/100

 Epoch 59/100

 Epoch 60/100

 Epoch 61/100

 Epoch 62/100

 Epoch 63/100

 Epoch 64/100

 Epoch 65/100

 Epoch 66/100

 Epoch 67/100

 Ep

In [26]:
# Take one batch from the test partition, score it with the trained model,
# and display the scores thresholded at 0.5 as a list of 0/1 values.
test_input, test_val, y_true = next(test_data.as_numpy_iterator())
y_hat = siam_model.predict([test_input, test_val])
(y_hat.ravel() > 0.5).astype(int).tolist()

[0, 1, 1, 1, 1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 1, 0]

In [27]:
# Recall computed from y_true / y_hat, i.e. on one test batch only.
m= Recall()
m.update_state(y_true,y_hat)
m.result().numpy()

1.0

In [28]:
# Precision computed from y_true / y_hat, i.e. on one test batch only.
m= Precision()
m.update_state(y_true,y_hat)
m.result().numpy()

1.0

In [29]:
# Save the trained Siamese model as an HDF5 file in the project folder;
# os.path.join replaces the hand-built '//' path concatenation.
siam_model.save(os.path.join(path, 'siamesemodelv2.h5'))



In [30]:
# Reload the saved model. custom_objects maps the names stored in the file
# to the classes needed to rebuild it (the custom L1Dist layer in particular).
# os.path.join replaces the hand-built '//' path concatenation.
model = tf.keras.models.load_model(
    os.path.join(path, 'siamesemodelv2.h5'),
    custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy},
)



In [31]:
# Score the same test batch with the reloaded model.
model.predict([test_input, test_val])

array([[4.4630629e-13],
       [9.9999917e-01],
       [1.0000000e+00],
       [9.9977952e-01],
       [1.0000000e+00],
       [6.3462874e-11],
       [4.6726263e-15],
       [4.9249506e-12],
       [7.3429670e-17],
       [2.8252012e-08],
       [1.0000000e+00],
       [9.9999619e-01],
       [4.2344045e-08],
       [3.2221343e-15],
       [9.9999976e-01],
       [3.9674058e-12]], dtype=float32)

In [32]:
# Print the reloaded model's layer summary.
model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_img (InputLayer)         [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_img (InputLayer)    [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         38960448    ['input_img[0][0]',              
                                                                  'validation_img[0][

In [33]:
def verify(frame ,model, detection_threshold,verification_threshold):
    """Compare the saved input image against every stored verification image.

    Args:
        frame: Not used by this function; kept so existing callers keep
            working. The input image is read from
            application_data/input_image/input_image.jpg instead.
        model: Model whose predict() accepts [input_batch, validation_batch].
        detection_threshold: A prediction above this value counts as a match.
        verification_threshold: The fraction of matching verification images
            must exceed this value for the input to be verified.

    Returns:
        (results, verified): the list of per-image predictions, and whether
        the matching fraction exceeded verification_threshold. If the
        verification folder is empty, returns ([], False).
    """
    app_dir = os.path.join(path, 'application_data')
    verification_dir = os.path.join(app_dir, 'verification_image')

    # List the folder once so the loop and the denominator use the same files.
    verification_files = os.listdir(verification_dir)
    if not verification_files:
        # Nothing to compare against; avoid a 0/0 division below.
        return [], False

    # The input image is the same for every comparison, so load it once.
    input_img = preprocess(os.path.join(app_dir, 'input_image', 'input_image.jpg'))

    results = []
    for image in verification_files:
        validation_img = preprocess(os.path.join(verification_dir, image))
        # expand_dims adds a batch axis of size 1 to each of the two images.
        result = model.predict(list(np.expand_dims([input_img,validation_img],axis=1)))
        results.append(result)

    # Number of predictions above the detection threshold ...
    detection = np.sum(np.array(results) > detection_threshold)
    # ... as a fraction of all verification images.
    verification = detection / len(verification_files)
    verified = verification > verification_threshold
    return results, verified

In [34]:
# Live verification: show the cropped webcam feed; press 'v' to save the
# current frame and verify it, 'q' to quit.
cap = cv2.VideoCapture(0)
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            # No frame was delivered; slicing `frame` would fail, so stop.
            break
        # Crop the 250x250 region used as the model input.
        frame = frame[120:370,200:450,:]
        cv2.imshow('Validation', frame)

        # Poll the keyboard once per frame. With two waitKey() calls, a key
        # press can be consumed by the first call and missed by the second.
        key = cv2.waitKey(10) & 0xFF
        if key == ord('v'):
            cv2.imwrite(os.path.join(path, 'application_data', 'input_image', 'input_image.jpg'), frame)
            results, verified = verify(frame, model,0.9,0.7)
            print(verified)
        elif key == ord('q'):
            break
finally:
    # Always free the camera and close the preview window, even on error.
    cap.release()
    cv2.destroyAllWindows()