<a href="https://colab.research.google.com/github/aditya02shah/FaceRecognition/blob/main/FaceRecognitionAugmented.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Importing Libraries

In [None]:
#Standard Dependencies
from matplotlib import pyplot as plt
import cv2
import pandas as pd
import os
import random
import numpy as np
import tensorflow as tf

In [None]:
##Tensorflow dependencies-Functional API
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense,Layer,Conv2D,MaxPooling2D,Input,Flatten
import tensorflow as tf

In [None]:
# Folders (relative to the working directory) for the three image sets.
POS_PATH,NEG_PATH,ANC_PATH=(
    os.path.join('data',subset) for subset in ('positive','negative','anchor')
)

In [None]:
# Create the dataset folders. exist_ok=True keeps this cell re-runnable:
# without it, a second run raises FileExistsError.
for folder in (POS_PATH,NEG_PATH,ANC_PATH):
  os.makedirs(folder,exist_ok=True)

## Collecting Negatives

In [None]:
!wget "http://vis-www.cs.umass.edu/lfw/lfw.tgz"

--2023-04-21 03:28:08--  http://vis-www.cs.umass.edu/lfw/lfw.tgz
Resolving vis-www.cs.umass.edu (vis-www.cs.umass.edu)... 128.119.244.95
Connecting to vis-www.cs.umass.edu (vis-www.cs.umass.edu)|128.119.244.95|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 180566744 (172M) [application/x-gzip]
Saving to: ‘lfw.tgz’


2023-04-21 03:28:12 (52.3 MB/s) - ‘lfw.tgz’ saved [180566744/180566744]



In [None]:
##Uncompress Tar GZ Labelled Faces in the wild Dataset
!tar -xf lfw.tgz

In [None]:
#Flatten the lfw/<person>/<image> tree into data/negative
for person_dir in os.listdir('lfw'):
  person_path=os.path.join('lfw',person_dir)
  for image_name in os.listdir(person_path):
    os.replace(os.path.join(person_path,image_name),
               os.path.join(NEG_PATH,image_name))

## Collect Positives and Anchors

In [None]:
import uuid

In [None]:
#Establish a connection to the webcam
cap=cv2.VideoCapture(3)
try:
  while cap.isOpened():
    ret,frame=cap.read()
    #A failed read returns ret=False and frame=None; stop instead of crashing on the slice below
    if not ret:
      break

    #Crop a 250x250 window out of the camera frame
    frame=frame[120:120+250,200:200+250,:]
    #Show image back to screen
    cv2.imshow("Image Collection",frame)

    #Poll the keyboard ONCE per frame. Calling waitKey separately for each
    #key consumes the key event in the first call, so 'p' and 'q' presses
    #were frequently missed.
    key=cv2.waitKey(1) & 0XFF

    if key==ord('a'):
      #Collecting Anchors
      imgname=os.path.join(ANC_PATH,'{}.jpg'.format(uuid.uuid1()))
      cv2.imwrite(imgname,frame)
    elif key==ord('p'):
      #Collecting Positives
      imgname=os.path.join(POS_PATH,'{}.jpg'.format(uuid.uuid1()))
      cv2.imwrite(imgname,frame)
    elif key==ord('q'):
      #Breaking gracefully
      break
finally:
  #Release the webcam and close the preview window even if the loop raised
  cap.release()
  cv2.destroyAllWindows()

In [None]:
cap.read??

In [None]:
from google.colab import drive
drive.mount('/content/gdrive/')

Mounted at /content/gdrive/


In [None]:
!unzip '/content/gdrive/MyDrive/ML/data.zip'

## Data Augmentation

In [None]:
def data_aug(img, n_aug=9):
    """Return `n_aug` randomly perturbed variants of one image.

    Each variant is produced from the ORIGINAL `img` by applying, in order,
    random brightness, contrast, horizontal flip, JPEG quality and
    saturation changes. The previous version overwrote `img` inside the
    loop, so every variant was an augmentation of the previous variant
    (compounding contrast loss and repeated JPEG re-encoding), and the
    brightness/contrast steps reused fixed seeds.

    Args:
        img: image array/tensor accepted by the `tf.image.stateless_random_*`
            ops (the callers in this notebook pass the result of `cv2.imread`).
        n_aug: number of variants to generate (default 9, as before).

    Returns:
        list of `n_aug` tensors, one per augmented variant.
    """
    def _seed():
        # Fresh 2-element seed for each stateless op, drawn the same way the
        # flip/jpeg/saturation steps were already seeded.
        return (np.random.randint(100), np.random.randint(100))

    data = []
    for _ in range(n_aug):
        aug = tf.image.stateless_random_brightness(img, max_delta=0.02, seed=_seed())
        aug = tf.image.stateless_random_contrast(aug, lower=0.6, upper=1, seed=_seed())
        aug = tf.image.stateless_random_flip_left_right(aug, seed=_seed())
        aug = tf.image.stateless_random_jpeg_quality(aug, min_jpeg_quality=90, max_jpeg_quality=100, seed=_seed())
        aug = tf.image.stateless_random_saturation(aug, lower=0.9, upper=1, seed=_seed())

        data.append(aug)

    return data

In [None]:
# Write augmented copies of every positive image back into POS_PATH.
# os.listdir returns a snapshot, so files written here are not revisited in
# this run -- but re-running the cell augments the augmented images again.
for file_name in os.listdir(POS_PATH):
    img = cv2.imread(os.path.join(POS_PATH, file_name))
    # cv2.imread returns None for anything it cannot decode (sub-folders,
    # hidden checkpoint dirs, corrupt files); skip those instead of crashing.
    if img is None:
        continue

    for image in data_aug(img):
        cv2.imwrite(os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1())), image.numpy())

In [None]:
# Write augmented copies of every anchor image back into ANC_PATH.
# os.listdir returns a snapshot, so files written here are not revisited in
# this run -- but re-running the cell augments the augmented images again.
for file_name in os.listdir(ANC_PATH):
    img = cv2.imread(os.path.join(ANC_PATH, file_name))
    # cv2.imread returns None for anything it cannot decode (sub-folders,
    # hidden checkpoint dirs, corrupt files); skip those instead of crashing.
    if img is None:
        continue

    for image in data_aug(img):
        cv2.imwrite(os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1())), image.numpy())

In [None]:
# Number of files in the anchor, positive and negative folders (in that order)
tuple(len(os.listdir(os.path.join('/content','data',subset))) for subset in ('anchor','positive','negative'))
#/content/data/anchor

(4140, 3160, 13233)

## Pre-processing Data

In [None]:
# Take up to 3000 .jpg file paths from each of the three image folders.
anchor,positive,negative=(
    tf.data.Dataset.list_files('/content/data/'+subset+'/'+'*.jpg').take(3000)
    for subset in ('anchor','positive','negative')
)

In [None]:
# Peek at one anchor file path to check that the glob matched files
dir_test=anchor.as_numpy_iterator()
next(dir_test)

b'/content/data/anchor/a690414c-e9d0-11ed-970e-0242ac1c000c.jpg'

In [None]:
def preprocess(file_path):
  '''
  Load one JPEG file and convert it into the tensor format the model consumes.

  file_path - path (string or string tensor) of a JPEG image.
  Returns a float32 tensor of shape (100,100,3) with values scaled to [0,1].
  '''
  #Read in image from file path
  byte_img=tf.io.read_file(file_path)
  #Decode with channels=3 so the channel dimension is statically known and
  #grayscale JPEGs are expanded to RGB instead of yielding a 1-channel tensor
  img=tf.io.decode_jpeg(byte_img,channels=3)
  #Resize to the 100x100 input expected by the embedding network
  img=tf.image.resize(img,(100,100))
  #Scale pixel values from [0,255] to [0,1]
  img=img/255.0
  return img

## Create Labelled Dataset

In [None]:
##from_tensor_slices wraps each label vector in a tf.data.Dataset so it can be zipped with the anchor/positive/negative file-path datasets
n_pairs=len(anchor)
pos_labels=tf.data.Dataset.from_tensor_slices(tf.ones(n_pairs))
neg_labels=tf.data.Dataset.from_tensor_slices(tf.zeros(n_pairs))
#(anchor, positive, 1.0) triples followed by (anchor, negative, 0.0) triples
positives=tf.data.Dataset.zip((anchor,positive,pos_labels))
negatives=tf.data.Dataset.zip((anchor,negative,neg_labels))
data=positives.concatenate(negatives)

In [None]:
data

<_ConcatenateDataset element_spec=(TensorSpec(shape=(), dtype=tf.string, name=None), TensorSpec(shape=(), dtype=tf.string, name=None), TensorSpec(shape=(), dtype=tf.float32, name=None))>

In [None]:
samples=data.as_numpy_iterator()

In [None]:
example=samples.next()

In [None]:
example

(b'/content/data/anchor/ab0900ba-e9d0-11ed-970e-0242ac1c000c.jpg',
 b'/content/data/positive/8afc4098-e9d0-11ed-970e-0242ac1c000c.jpg',
 1.0)

## Preprocess Image Pairs

In [None]:
def preprocess_twin(input_img,validation_img,label):
  '''
  Run preprocess() on both file paths of a labelled pair.

  Returns (preprocessed input image, preprocessed validation image, label);
  the label is passed through unchanged.
  '''
  input_tensor=preprocess(input_img)
  validation_tensor=preprocess(validation_img)
  return (input_tensor,validation_tensor,label)

In [None]:
# '*' is used to unpack the values
res=preprocess_twin(*example)

In [None]:
res[0]

<tf.Tensor: shape=(100, 100, 3), dtype=float32, numpy=
array([[[0.65612745, 0.7463235 , 0.714951  ],
        [0.66838235, 0.7507353 , 0.7232843 ],
        [0.68063724, 0.76004905, 0.7267157 ],
        ...,
        [0.66519606, 0.75147057, 0.7436274 ],
        [0.6514706 , 0.7348039 , 0.7357843 ],
        [0.6607843 , 0.74313724, 0.7470588 ]],

       [[0.65294117, 0.7352941 , 0.70980394],
        [0.6698529 , 0.75171566, 0.7245098 ],
        [0.67745095, 0.7534314 , 0.7291667 ],
        ...,
        [0.6970588 , 0.78063726, 0.7759804 ],
        [0.6897059 , 0.76887256, 0.7708333 ],
        [0.6872549 , 0.7637255 , 0.76862746]],

       [[0.66568625, 0.74215686, 0.7254902 ],
        [0.6637255 , 0.73970586, 0.7198529 ],
        [0.675     , 0.74509805, 0.72156864],
        ...,
        [0.6620098 , 0.7375    , 0.73063725],
        [0.685049  , 0.76053923, 0.7536765 ],
        [0.68921566, 0.7627451 , 0.75686276]],

       ...,

       [[0.5882353 , 0.6607843 , 0.60784316],
        [0.59

In [None]:
plt.imshow(res[0])

## Data Pipeline

In [None]:
#Decode/resize every pair, then cache the result
data=data.map(preprocess_twin)
data=data.cache()
#Shuffle with a FIXED order across iterations. With the default
#reshuffle_each_iteration=True the dataset is re-ordered on every pass, so
#the later take()/skip() train/test split would select different samples
#each epoch and test samples would leak into training.
data=data.shuffle(buffer_size=10000,reshuffle_each_iteration=False)

In [None]:
data

<_ShuffleDataset element_spec=(TensorSpec(shape=(100, 100, None), dtype=tf.float32, name=None), TensorSpec(shape=(100, 100, None), dtype=tf.float32, name=None), TensorSpec(shape=(), dtype=tf.float32, name=None))>

In [None]:
ele=data.as_numpy_iterator()

In [None]:
sample=ele.next()

In [None]:
plt.imshow(sample[1])

In [None]:
sample[2]

1.0

## Training and Test Partition

In [None]:
# Training partition: first 70% of the samples, in batches of 16 with 8 batches prefetched
train_size = round(len(data)*.7)
train_data = data.take(train_size).batch(16).prefetch(8)

In [None]:
# Test partition: skip the first 70%, keep the following 30%, same batching as training
test_data = (
    data.skip(round(len(data)*.7))
        .take(round(len(data)*.3))
        .batch(16)
        .prefetch(8)
)

In [None]:
len(train_data),len(test_data)

(263, 113)

## Building Siamese Neural Network

#### Embedding Layer

In [None]:
def make_embedding():
  '''
  Build the convolutional embedding network shared by both Siamese branches.

  Input:  (100,100,3) image tensor named 'input_image'.
  Output: 4096-dimensional sigmoid-activated feature vector.

  The pooling layers use a 2x2 window with stride 2. The previous calls
  MaxPooling2D(64,(2,2),...) passed 64 as pool_size (the first positional
  argument) and (2,2) as strides, i.e. a 64x64 max window. Output shapes
  are the same either way with padding='same', which hid the mistake.
  '''
  inp=Input(shape=(100,100,3),name='input_image')

  #First Block
  c1=Conv2D(64,(10,10),activation='relu')(inp)
  m1=MaxPooling2D(pool_size=(2,2),strides=(2,2),padding='same')(c1)

  #Second Block
  c2=Conv2D(128,(7,7),activation='relu')(m1)
  m2=MaxPooling2D(pool_size=(2,2),strides=(2,2),padding='same')(c2)

  #Third Block
  c3=Conv2D(128,(4,4),activation='relu')(m2)
  m3=MaxPooling2D(pool_size=(2,2),strides=(2,2),padding='same')(c3)

  #Final Embedding Block
  c4=Conv2D(256,(4,4),activation='relu')(m3)
  f1=Flatten()(c4)
  d1=Dense(4096,activation='sigmoid')(f1)

  return Model(inputs=[inp],outputs=[d1],name='embedding')

In [None]:
embedding=make_embedding()

In [None]:
embedding.summary()

Model: "embedding"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_image (InputLayer)    [(None, 100, 100, 3)]     0         
                                                                 
 conv2d (Conv2D)             (None, 91, 91, 64)        19264     
                                                                 
 max_pooling2d (MaxPooling2D  (None, 46, 46, 64)       0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 40, 40, 128)       401536    
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 20, 20, 128)      0         
 2D)                                                             
                                                                 
 conv2d_2 (Conv2D)           (None, 17, 17, 128)       26

#### Distance Layer

In [None]:
class L1Dist(Layer):
  '''
  Siamese distance layer: element-wise absolute difference of two embeddings.
  '''
  def __init__(self,**kwargs):
    #Forward standard Layer arguments (name, dtype, trainable, ...) to the
    #base class. They were previously accepted and silently dropped, so a
    #layer rebuilt from a saved config lost its configured name.
    super().__init__(**kwargs)

  def call(self,input_embedding,validation_embedding):
    '''Return |input_embedding - validation_embedding|, element-wise.'''
    return tf.math.abs(input_embedding - validation_embedding)

In [None]:
#NOTE(review): this rebinds the name L1Dist from the class to an instance of it.
#Later cells that reference L1Dist therefore receive the instance, and
#re-running this cell calls the instance with no arguments -- presumably an
#error; consider a distinct name for the instance.
L1Dist=L1Dist()

#### Siamese Model

In [None]:
def make_siamese_model():
  '''
  Assemble the full Siamese network from the shared `embedding` model.

  Two (100,100,3) inputs ('input_img', 'validation_img') are passed through
  the same embedding model, combined by the L1 distance layer (named
  'distance'), and reduced to one sigmoid output by a Dense layer.
  Returns the resulting Model, named 'SiameseNetwork'.
  '''
  #The two image inputs
  anchor_in=Input(shape=(100,100,3),name='input_img')
  candidate_in=Input(shape=(100,100,3),name='validation_img')

  #Same embedding model (shared weights) applied to both inputs
  anchor_emb=embedding(anchor_in)
  candidate_emb=embedding(candidate_in)

  #Distance between the two embeddings
  dist_layer=L1Dist
  dist_layer._name='distance'
  l1_distance=dist_layer(anchor_emb,candidate_emb)

  #Single sigmoid unit producing the classification score
  score=Dense(1,activation='sigmoid')(l1_distance)

  return Model(inputs=[anchor_in,candidate_in],outputs=score,name='SiameseNetwork')


In [None]:
siamese_model=make_siamese_model()

In [None]:
siamese_model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_img (InputLayer)         [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_img (InputLayer)    [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         38960448    ['input_img[0][0]',              
                                                                  'validation_img[0][

## Training

In [None]:
##Loss and Optimizer
#Binary cross-entropy on the sigmoid output; Adam with learning rate 1e-4
binary_cross_loss=tf.losses.BinaryCrossentropy()
opt=tf.keras.optimizers.Adam(learning_rate=1e-4)

In [None]:
#Checkpoints
#Folder and file prefix passed to checkpoint.save() by the training loop
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
#Track the optimizer and the model together in one checkpoint object
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

In [None]:
@tf.function
def train_step(batch):
  '''
  Run one optimisation step on a single batch.

  batch - tuple (input images, validation images, labels).
  Returns the binary cross-entropy loss for this batch.

  Note: the stray print(loss) was removed. Inside a tf.function it runs
  only while the function is being traced and shows a symbolic tensor,
  not the per-batch loss value.
  '''
  with tf.GradientTape() as tape:
    #Get anchor and positive/negative images
    X=batch[:2]
    #Get label
    Y=batch[2]
    #Forward pass
    yhat=siamese_model(X,training=True)
    #Calculate loss
    loss=binary_cross_loss(Y,yhat)

  #Calculate Gradients
  grad=tape.gradient(loss,siamese_model.trainable_variables)

  #Calculate updated weights and apply to siamese model
  opt.apply_gradients(zip(grad,siamese_model.trainable_variables))
  return loss

In [None]:
# Import metric calculations
from tensorflow.keras.metrics import Precision, Recall

In [None]:
def train(data, EPOCHS):
    """Train `siamese_model` on `data` for `EPOCHS` epochs.

    For every epoch this runs `train_step` on each batch, accumulates recall
    and precision over the epoch, prints the last batch loss together with
    both metrics, and saves a checkpoint every 10th epoch.

    Args:
        data: batched dataset yielding (input images, validation images, labels).
        EPOCHS: number of passes over `data`.
    """
    # Loop through epochs
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))

        # Fresh metric objects so results are per-epoch
        r = Recall()
        p = Precision()

        # Loop through each batch
        for idx, batch in enumerate(data):
            # Run train step here
            loss = train_step(batch)
            # Call the model directly instead of predict(): predict() sets up
            # its own prediction loop and prints a progress bar on every
            # call, which is slow and noisy for a single in-memory batch.
            yhat = siamese_model(batch[:2], training=False)
            r.update_state(batch[2], yhat)
            p.update_state(batch[2], yhat)
            progbar.update(idx+1)
        # `loss` here is the loss of the last batch of the epoch
        print(loss.numpy(), r.result().numpy(), p.result().numpy())

        # Save checkpoints
        if epoch % 10 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)

In [None]:
EPOCHS=5

In [None]:
train(train_data,EPOCHS)

## Evaluate Model

In [None]:
from tensorflow.keras.metrics import Precision,Recall

In [None]:
#Getting a batch of test data
test_input,test_val,y_true=test_data.as_numpy_iterator().next()

In [None]:
#Making Predictions
yhat=siamese_model.predict([test_input,test_val])



In [None]:
yhat

array([[8.3647328e-06],
       [1.0000000e+00],
       [2.1215435e-05],
       [9.9502891e-01],
       [1.9056566e-08],
       [9.9984276e-01],
       [2.0317341e-06],
       [9.9986219e-01],
       [5.9283258e-07],
       [3.0470394e-05],
       [5.0404535e-05],
       [6.8875946e-05],
       [1.0000000e+00],
       [1.0000000e+00],
       [9.9999189e-01],
       [9.9999857e-01]], dtype=float32)

In [None]:
#Post-Processing the results
[1 if prediction>0.5 else 0 for prediction in yhat]

[0, 1, 0, 1, 0, 1, 0, 1, 0, 0, 0, 0, 1, 1, 1, 1]

In [None]:
y_true

array([0., 1., 0., 1., 0., 1., 0., 1., 0., 0., 0., 0., 1., 1., 1., 1.],
      dtype=float32)

In [None]:
#Recall-Measures how many actual positives are correctly predicted by model

#Creating a Metric Object
m=Recall()
#Calculating Recall Value
m.update_state(y_true,yhat)
#Return Recall Result
m.result().numpy()

1.0

In [None]:
#Precision-Measures how many of the predicted positive instances are actually positive

p=Precision()
p.update_state(y_true,yhat)
p.result().numpy()

1.0

In [None]:
# Recall and precision accumulated over every batch of the test partition
r, p = Recall(), Precision()

for test_input, test_val, y_true in test_data.as_numpy_iterator():
    yhat = siamese_model.predict([test_input, test_val])
    for metric in (r, p):
        metric.update_state(y_true, yhat)

print(r.result().numpy(), p.result().numpy())

#### Visualise Results

In [None]:
plt.figure(figsize=(10,10))
plt.subplot(1,2,1)
plt.imshow(test_input[0])
plt.subplot(1,2,2)
plt.imshow(test_val[0])

## Save Model

In [None]:
siamese_model.save('siamese_model.h5')



In [None]:
#Loading Model
siamese_model = tf.keras.models.load_model('/content/siamese_model.h5',
                                   custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})



## Real Time Test

In [None]:
def verify(model,detection_threshold,verification_threshold):
  '''
  Compare the captured input image against every stored verification image.

  Detection Threshold - score above which a single comparison counts as a match
  Verification Threshold - fraction of comparisons that must match for the
                           input to be considered verified

  Returns True/False: whether the matching fraction exceeds
  verification_threshold (False when there are no verification images).

  Fixes relative to the previous version:
  - the model was fed [input_img, verification_threshold]; it now receives
    the image pair [input_img, validation_img];
  - the input image is read from application_data/input_image/, the folder
    the capture cell writes to (was 'input_images');
  - the input image is loaded once instead of once per verification image.
  '''
  verification_dir=os.path.join('application_data','verification_images')
  input_img=preprocess(os.path.join('application_data','input_image','input_image.jpg'))

  results=[]
  for image in os.listdir(verification_dir):
    validation_img=preprocess(os.path.join(verification_dir,image))

    #Add a batch axis to each image and pass them as the model's two inputs
    result=model.predict(list(np.expand_dims([input_img,validation_img],axis=1)))
    results.append(result)

  #Nothing to compare against: cannot verify
  if not results:
    return False

  #Number of comparisons whose score exceeds the detection threshold
  detection=np.sum(np.array(results)>detection_threshold)
  verification=detection/len(results)
  verified=verification>verification_threshold
  return verified

In [None]:
cap = cv2.VideoCapture(4)
try:
    while cap.isOpened():
        ret, frame = cap.read()
        # A failed read returns ret=False and frame=None; stop instead of
        # crashing on the crop below.
        if not ret:
            break
        frame = frame[120:120+250,200:200+250, :]

        cv2.imshow('Verification', frame)

        # Poll the keyboard once per frame; two separate waitKey calls made
        # each key press visible to only one of the checks.
        key = cv2.waitKey(10) & 0xFF

        # Verification trigger
        if key == ord('v'):
            cv2.imwrite(os.path.join('application_data', 'input_image', 'input_image.jpg'), frame)
            # Run verification. verify() returns a single boolean, so it must
            # not be unpacked into two names.
            verified = verify(siamese_model, 0.9, 0.7)
            print(verified)
        elif key == ord('q'):
            break
finally:
    # Always free the camera and close the window, even after an exception
    cap.release()
    cv2.destroyAllWindows()