# Import Dependancies

In [1]:
import tensorflow as tf
import matplotlib.pyplot as plt
import cv2
import os
import random

In [2]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer,MaxPooling2D,Conv2D,Input,Flatten,Dense


# Setup folder paths:

In [3]:
pwd

'C:\\Users\\a0g0btd\\Documents\\Anisha\\ML project\\Facial_Recognition'

In [4]:
POS_PATH=os.path.join('data','positive')
NEG_PATH=os.path.join('data','negative')
ANC_PATH=os.path.join('data','anchor')

# Collecting Data for anchor and postive images:

In [5]:
cap=cv2.VideoCapture(0)
counter=0
while cap.isOpened():
    ret,frame=cap.read()
    
    frame=frame[120:120+250,200:200+250]
    
    cv2.imshow("Image Collection",frame)
    if cv2.waitKey(1) & 0XFF ==ord('a'):
        counter=counter+1
        img=ANC_PATH+'\\anchor_'+str(counter)+'.jpg'
        cv2.imwrite(img,frame)
    
    if cv2.waitKey(1) & 0XFF ==ord('p'):
        counter=counter+1
        img=POS_PATH+'\\positive_'+str(counter)+'.jpg'
        cv2.imwrite(img,frame)
        
        
    if cv2.waitKey(1)& 0XFF==ord('q'):
        break
cap.release()
cv2.destroyAllWindows()

# Collecting data for Negative images:

In [None]:
!tar -xf lfw.tgz

In [None]:
import shutil
for directory in os.listdir('lfw'):
    print(directory)
    for file in os.listdir(os.path.join('lfw',directory)):
        EX_PATH=os.path.join('lfw',directory,file)
        NEW_PATH=os.path.join(NEG_PATH,file)
        shutil.copy(EX_PATH,NEG_PATH)

# Creating Data Augmentation for anchor and positive images:

In [6]:
def data_aug(img):
    
    data=[]
    
    for i in range(10):
        img=tf.image.stateless_random_brightness(img,max_delta=0.02,seed=(1,2))
        img=tf.image.stateless_random_flip_left_right(img, seed=(np.random.randint(100),np.random.randint(100)))
        img=tf.image.stateless_random_contrast(img,0.9,1,seed=(np.random.randint(100),np.random.randint(100)))
        img=tf.image.stateless_random_jpeg_quality(img,90,100,seed=(np.random.randint(100),np.random.randint(100)))
        img=tf.image.stateless_random_saturation(img,0.9,1.0,seed=(np.random.randint(100),np.random.randint(100)))
        
        data.append(img)
    return data

In [7]:
import numpy as np
import uuid

for file_name in os.listdir(os.path.join(POS_PATH)):
    img_path = os.path.join(POS_PATH, file_name)
    img = cv2.imread(img_path)
    augmented_images = data_aug(img) 
    
    for image in augmented_images:
        cv2.imwrite(os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1())), image.numpy())

In [8]:
for file_name in os.listdir(os.path.join(ANC_PATH)):
    img_path = os.path.join(ANC_PATH, file_name)
    img = cv2.imread(img_path)
    augmented_images = data_aug(img) 
    
    for image in augmented_images:
        cv2.imwrite(os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1())), image.numpy())

# Creating dataset:

In [9]:
anchor=tf.data.Dataset.list_files(ANC_PATH+'\*.jpg').take(2200)
positive=tf.data.Dataset.list_files(POS_PATH+'\*.jpg').take(2200)
negative=tf.data.Dataset.list_files(NEG_PATH+'\*.jpg').take(2200)

In [10]:
anchor.as_numpy_iterator().next()

b'data\\anchor\\8ec55d96-29be-11ed-947d-c696b7c0cd20.jpg'

In [11]:
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)

In [12]:
data.as_numpy_iterator().next()

(b'data\\anchor\\b75faa6c-29be-11ed-bd4f-c696b7c0cd20.jpg',
 b'data\\positive\\68a8f60c-29be-11ed-9306-c696b7c0cd20.jpg',
 1.0)

# Pre-processing the image:

In [13]:
def preprocess(file_path):
    
    img=tf.io.read_file(file_path)
    
    img=tf.io.decode_jpeg(img)
    
    img=tf.image.resize(img,(100,100))
    
    img=img/255.0
    
    return img

In [14]:
def data_preprocess(train_img,validation_img,label):
    return(preprocess(train_img),preprocess(validation_img),label)

# Partition into training and testing data:

In [15]:
data=data.map(data_preprocess).cache().shuffle(buffer_size=10000)

In [16]:
train_data=data.take(round(len(data)*.7))
train_data=train_data.batch(16).prefetch(tf.data.AUTOTUNE)

In [17]:
test_data=data.skip(round(len(data)*0.7)).take(round(len(data)*0.3))
test_data=test_data.batch(16).prefetch(tf.data.AUTOTUNE)

# Building the embedding layer:

In [18]:
def make_embedding():
    
    inp=Input(shape=(100,100,3),name='input_layer')
    
    c1=Conv2D(64,(10,10),activation='relu')(inp)
    m1=MaxPooling2D(64,(2,2),padding='same')(c1)
    
    c2=Conv2D(128,(7,7),activation='relu')(m1)
    m2=MaxPooling2D(64,(2,2),padding='same')(c2)
    
    c3=Conv2D(128,(4,4),activation='relu')(m2)
    m3=MaxPooling2D(64,(2,2),padding='same')(c3)
    
    c4=Conv2D(256,(4,4),activation='relu')(m3)
    f1=Flatten()(c4)
    d1=Dense(4096,activation='sigmoid')(f1)
    
    return Model(inputs=[inp],outputs=[d1],name='embedding')   

In [19]:
embedding_layer=make_embedding()

In [20]:
embedding_layer.summary()

Model: "embedding"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_layer (InputLayer)    [(None, 100, 100, 3)]     0         
                                                                 
 conv2d (Conv2D)             (None, 91, 91, 64)        19264     
                                                                 
 max_pooling2d (MaxPooling2D  (None, 46, 46, 64)       0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 40, 40, 128)       401536    
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 20, 20, 128)      0         
 2D)                                                             
                                                                 
 conv2d_2 (Conv2D)           (None, 17, 17, 128)       26

# Building Distance Layer:

In [21]:
class L1Dist(Layer):
    
    def __init__(self,**kwargs):
        super().__init__()
        
    def call(self,input_embedding,validation_embedding):
        return tf.math.abs(input_embedding-validation_embedding)
    

# Constructing Siamese Model:

In [22]:
def siamese_model():
    
    inp_img=Input(shape=(100,100,3),name='input_layer')
    valid_img=Input(shape=(100,100,3),name='validation_layer')
    
    inp_emb=embedding_layer(inp_img)
    valid_emb=embedding_layer(valid_img)
    
    siamese_layer=L1Dist()
    siamese_layer._name='distance'
    
    distances=siamese_layer(inp_emb,valid_emb)
    
    d2=Dense(1,activation='sigmoid')(distances)
    
    return Model(inputs=[inp_img,valid_img],outputs=d2,name='siamese_layer')

In [23]:
siamese=siamese_model()

In [24]:
siamese.summary()

Model: "siamese_layer"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_layer (InputLayer)       [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_layer (InputLayer)  [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         38960448    ['input_layer[0][0]',            
                                                                  'validation_layer[0]

# Defining Loss function and Optimiser:

In [25]:
Binary_loss=tf.losses.BinaryCrossentropy()

In [26]:
opt=tf.keras.optimizers.Adam(1e-4)

# Defining Checkpoint functions:

In [27]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese=siamese)

# Custom training:

In [28]:
@tf.function

def custom_training(batch):
    
    with tf.GradientTape() as tape:
        
        X=batch[:2]
        
        Y=batch[2]
        
        yhat=siamese(X,training=True)
        
        loss=Binary_loss(Y,yhat)
        
    gradient=tape.gradient(loss,siamese.trainable_variables)
        
    opt.apply_gradients(zip(gradient,siamese.trainable_variables))
        
    return loss        

In [29]:
from tensorflow.keras.metrics import Precision,Recall

def training(EPOCHS,data):
    
    for i in range(1,EPOCHS+1):
        print('\n Epoch {}/{}'.format(i, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))
        for idx,batch in enumerate(data):
            recall=Recall()
            precision=Precision()
            loss=custom_training(batch)

            yhat=siamese.predict(batch[:2])

            recall.update_state(batch[2],yhat)
            precision.update_state(batch[2],yhat)

            progbar.update(idx+1)
        print(loss.numpy(), recall.result().numpy(), precision.result().numpy())
        
        if i%2==0:
            checkpoint.save(file_prefix=checkpoint_prefix)
        

In [30]:
training(10,train_data)


 Epoch 1/10


0.026633825 1.0 1.0

 Epoch 2/10


0.07911016 1.0 1.0

 Epoch 3/10


0.0030976455 1.0 1.0

 Epoch 4/10




0.03207208 1.0 1.0

 Epoch 5/10


0.00010588721 1.0 1.0

 Epoch 6/10


0.10697381 1.0 1.0

 Epoch 7/10


1.0967735e-05 1.0 1.0

 Epoch 8/10




0.0001253745 1.0 1.0

 Epoch 9/10


4.0616003e-05 1.0 1.0

 Epoch 10/10


0.00041822015 1.0 1.0


In [31]:
#Make predictions:
recall=Recall()
precision=Precision()

for test_anchor,test_input,y_true in test_data.as_numpy_iterator():
    yhat=siamese.predict([test_anchor,test_input])
    yhat=[1 if prediction > 0.5 else 0 for prediction in yhat]
    recall.update_state(y_true,yhat)
    precision.update_state(y_true,yhat)
print(recall.result().numpy(),precision.result().numpy())   

0.97962385 1.0


In [32]:
#Load the Model:

siamese.save('siamesemodel.h5')



In [33]:
siamese_model = tf.keras.models.load_model('siamesemodel.h5', 
                                   custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})



In [34]:
import numpy as np

In [35]:
def verification_image(model,verification_threshold,detection_threshold):
    output=[]
    for i in os.listdir(os.path.join("application_data","Verification_image")):
        input_img=preprocess(os.path.join("application_data","Input_image","input_image.jpg"))
        val_img=preprocess(os.path.join("application_data","Verification_image",i))
        y_pred=model.predict(list(np.expand_dims([input_img,val_img],axis=1)))
        output.append(y_pred)
        
    detection = np.sum(np.array(output) > detection_threshold)
    verification=detection/len(os.listdir(os.path.join("application_data","Verification_image")))
    
    verified=verification>verification_threshold
    return verified,output
        

In [38]:
capture=cv2.VideoCapture(0)

while capture.isOpened():
    
    _,frame=capture.read()
    
    frame=frame[120:120+250,200:200+250]
    cv2.imshow("frame",frame)
    
    if cv2.waitKey(1) & 0XFF == ord('s'):
        cv2.imwrite(os.path.join("application_data","Input_image","input_image.jpg"),frame)
        verified,output=verification_image(siamese,0.9,0.9)
        print(verified,output)
    if cv2.waitKey(1) & 0XFF == ord('q'):
        break

capture.release()
cv2.destroyAllWindows()    























True [array([[0.60447896]], dtype=float32), array([[0.6646003]], dtype=float32), array([[0.9549357]], dtype=float32), array([[0.98362535]], dtype=float32), array([[0.997218]], dtype=float32), array([[0.999747]], dtype=float32), array([[0.99985707]], dtype=float32), array([[0.9998511]], dtype=float32), array([[0.9766742]], dtype=float32), array([[0.98132104]], dtype=float32), array([[0.9284362]], dtype=float32), array([[0.9817104]], dtype=float32), array([[0.17752057]], dtype=float32), array([[0.6620202]], dtype=float32), array([[0.8335797]], dtype=float32), array([[0.86822915]], dtype=float32), array([[0.9289614]], dtype=float32), array([[0.89769685]], dtype=float32), array([[0.9405486]], dtype=float32), array([[0.83721876]], dtype=float32), array([[0.8592481]], dtype=float32), array([[0.75044346]], dtype=float32), array([[0.2246895]], dtype=float32), array([[0.47041252]], dtype=float32), array([[0.9694494]], dtype=float32), array([[0.7415047]], dtype=float32), array([[0.55652]], dtype























True [array([[0.99799925]], dtype=float32), array([[0.9972594]], dtype=float32), array([[0.9911739]], dtype=float32), array([[0.9988913]], dtype=float32), array([[0.99795157]], dtype=float32), array([[0.999537]], dtype=float32), array([[0.99984014]], dtype=float32), array([[0.9998723]], dtype=float32), array([[0.9967062]], dtype=float32), array([[0.9969344]], dtype=float32), array([[0.9982399]], dtype=float32), array([[0.99883467]], dtype=float32), array([[0.413275]], dtype=float32), array([[0.74284756]], dtype=float32), array([[0.9529829]], dtype=float32), array([[0.968393]], dtype=float32), array([[0.9825799]], dtype=float32), array([[0.9883294]], dtype=float32), array([[0.98016834]], dtype=float32), array([[0.99650335]], dtype=float32), array([[0.997962]], dtype=float32), array([[0.9984213]], dtype=float32), array([[0.8847925]], dtype=float32), array([[0.9742789]], dtype=float32), array([[0.9656333]], dtype=float32), array([[0.9962859]], dtype=float32), array([[0.9983393]], dtype=fl























True [array([[0.2695457]], dtype=float32), array([[0.3101689]], dtype=float32), array([[0.7538028]], dtype=float32), array([[0.9212134]], dtype=float32), array([[0.98085636]], dtype=float32), array([[0.9982418]], dtype=float32), array([[0.9990393]], dtype=float32), array([[0.9990138]], dtype=float32), array([[0.8615173]], dtype=float32), array([[0.88800925]], dtype=float32), array([[0.7314598]], dtype=float32), array([[0.91523314]], dtype=float32), array([[0.18681464]], dtype=float32), array([[0.56320715]], dtype=float32), array([[0.71741945]], dtype=float32), array([[0.64329827]], dtype=float32), array([[0.4324573]], dtype=float32), array([[0.34458554]], dtype=float32), array([[0.48658496]], dtype=float32), array([[0.3718712]], dtype=float32), array([[0.45194182]], dtype=float32), array([[0.29841813]], dtype=float32), array([[0.33774436]], dtype=float32), array([[0.5750157]], dtype=float32), array([[0.80031234]], dtype=float32), array([[0.48156893]], dtype=float32), array([[0.353725]]

In [None]:
verification_image(siamese,0.9,0.9)