<a href="https://colab.research.google.com/github/hd9189/Facial_Recognition/blob/main/Facial_verification_with_SNS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **1. Setup**

**1.1 Install Dependencies**

In [None]:
# !pip install tensorflow==2.4.1 tensorflow-gpu==2.4.1 opencv-python matplotlib

**Import Dependencies**

In [3]:
#import standard dependencies
import cv2
import os
import random
# good to work for arrays, good for tensorflow
import numpy as np
# can help visualize an image
from matplotlib import pyplot as plt

In [4]:
import tensorflow as tf

# import tensorflow dependencies
# usually people use the sequential tensorflow api, but functional tensorflow api is much more flexible in terms of hardcore processing
# building a siamanese nerual network. comparing two images and seeing if their the same
# passing 2 images at the same time, and finding the similarity between the two images
# importing functional api

# most imported = Model(Input=[input image, verification image], Output=[1,0])
from tensorflow.keras.models import Model
# lots of layer types for NN
# Layer=high level class (creating a custom layer)
# Conv2D (standard layer) Called a convoluntional layer usually for CNN, forming convolutions
# Dense, a fully connected layer
# MaxPooling2D, Pool layers together and shrinks info together, taking max value overa certain region
# Input, determines what you input into NN, input(shape=)
# Flatten, takes all info form a layer and flattens it into more simpler
# Import tensorflow dependencies - Functional API
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten

**Setup GPU Growth**

In [5]:
# Avoid out of memory errors, by setting GPU memory consumption Growth

# accessing all GPUs on machine
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    # setting memory growth 
    tf.config.experimental.set_memory_growth(gpu, True)

**Create Folder Structures**

In [6]:
# create 3 folders for data
# anchor (real time input), positive, and negative

# setup paths
POS_PATH = os.path.join('data', "positive")
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')

In [None]:
# making directories

# os.makedirs(POS_PATH)
# os.makedirs(NEG_PATH)
# os.makedirs(ANC_PATH)

# **2. Collect Positives and Anchors**

Notes:

Anchor and positive data will be collected through the webcam via opencv, while the negative data are going to be through a data set

A SNS basically has 2 comparision NN, where there is a distance function determining whether or not the images when comapred to the anchor look the same.

In Part two, we will be collecting the data

**Untar Labelled Faces in the Wild Dataset**

In [7]:
# # # Uncompress TAR GZ Labelled Faces in the Wild Dataset

# !tar -xf lfw.tgz


gzip: stdin: unexpected end of file
tar: Unexpected EOF in archive
tar: Unexpected EOF in archive
tar: Error is not recoverable: exiting now


In [8]:
# # Move LFW Images from folder to the following respository data/negative, done by redirecting the path

# # gives directory/folder name for every single folder in 'lfw' folder
# for directory in os.listdir('lfw'):
#   # loops through every image in every folder and get path
#   for file in os.listdir(os.path.join('lfw', directory)):
#     # manipulate directory
#     EX_PATH = os.path.join('lfw', directory, file)
#     NEW_PATH = os.path.join(NEG_PATH, file)
#     os.replace(EX_PATH, NEW_PATH)

**Collect Positive and Anchor Classes**

In [None]:
# Import uuid library to generate unique image names
import uuid

In [None]:
# # Establish a connection to the webcam
# cap = cv2.VideoCapture(0)
# while cap.isOpened(): 
#     ret, frame = cap.read()
   
#     # Cut down frame to 250x250px, (y,x)
#     frame = frame[200:200+250,600:600+250, :]
    
#     # Collect anchors when click a
#     if cv2.waitKey(1) & 0XFF == ord('a'):
#         # Create the unique file path and placing in folder
#         imgname = os.path.join(ANC_PATH, f'{uuid.uuid1()}.jpg')
#         # Write out anchor image
#         cv2.imwrite(imgname, frame)
    
#     # Collect positives when click p
#     if cv2.waitKey(1) & 0XFF == ord('p'):
#         # Create the unique file path 
#         imgname = os.path.join(POS_PATH, f'{uuid.uuid1()}.jpg')
#         # Write out positive image
#         cv2.imwrite(imgname, frame)
    
#     # Show image back to screen
#     cv2.imshow('Image Collection', frame)
    
#     # Breaking gracefully
#     if cv2.waitKey(1) & 0XFF == ord('q'):
#         break

# # good to help forcefully stop cv2 webcam, in case of freezing
# # Release the webcam
# cap.release()
# # Close the image show frame
# cv2.destroyAllWindows

In [None]:
# print frame for testing
# plt.imshow(frame)

# **3. Load and Preprocess Images**

** Get Image Directories**

In [None]:
# getting images within directory
# Basically get 300 files that have the PATH and the .jpg in the name
anchor = tf.data.Dataset.list_files(ANC_PATH+'/*.jpg').take(300)
positive = tf.data.Dataset.list_files(POS_PATH+'/*.jpg').take(300)
negative = tf.data.Dataset.list_files(NEG_PATH+'/*.jpg').take(300)

In [None]:
def preprocess(file_path):
    # Reading image from file path
    byte_img = tf.io.read_file(file_path)
    # loading image
    img = tf.io.decode_jpeg(byte_img)
    # preprocessing steps
    # resize image
    img = tf.image.resize(img, (100,100))
    # scaling image to be between 1 and 0
    img /= 255.0 
    return img

**Create Labelled Dataset**

(anchor, positive) => [1,1,1,1,1]
(anchor, negative) => [0,0,0,0,0]

In [None]:
# .zip allows to zip everything up and iterate everything at the same time

# creating a dataset using anchor, positive/negative, and an array of 1s or 0s that matches the shape of the anchor
# saying that if we compare the anchor to positive, we want it to be 1, if we compare anchor to negative, we want it to be 0
postives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))

# joining positives and negatives dataset together
data = postives.concatenate(negatives)

In [None]:
samples = data.as_numpy_iterator()
exampple = samples.next()
exampple


(b'data/anchor/1ac735e4-a911-11ed-8669-f2189828324b.jpg',
 b'data/positive/14ddf5be-a911-11ed-8669-f2189828324b.jpg',
 1.0)

**Build, Train, and Test Partition**

In [None]:
def preprocess_twin(input_img, validation_img, label):
    # input_img -> anchor, validation_img -> positive/negative
    return(preprocess(input_img), preprocess(validation_img), label)



In [None]:
# Build Dataloader pipline
data = data.map(preprocess_twin) #applies function to the entire data set
data = data.cache()
data = data.shuffle(buffer_size=1024)

In [None]:
# Training partition
train_data = data.take(round(len(data)*.7))
# but 16 images in one group to train at one time
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [None]:
# Testing partition
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

# **4. Model Creation**

**Embedding Layer (for NN to read image)**

In [None]:
inp = Input(shape=(100,100,3), name='input_image')
c1 = Conv2D(64, (10,10), activation='relu')(inp)
m1 = MaxPooling2D(64, (2,2), padding='same')(c1)
c2 = Conv2D(128, (7,7), activation='relu')(m1)
m2 = MaxPooling2D(64, (2,2), padding='same')(c2)
c3 = Conv2D(128, (4,4), activation='relu')(m2)
m3 = MaxPooling2D(64, (2,2), padding='same')(c3)
c4 = Conv2D(256, (4,4), activation='relu')(m3)
f1 = Flatten()(c4)
d1 = Dense(4096, activation='sigmoid')(f1)
mod = Model(inputs=[inp], outputs=[d1], name='embedding')

In [None]:
mod.summary()

Model: "embedding"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_image (InputLayer)     [(None, 100, 100, 3)]     0         
_________________________________________________________________
conv2d_16 (Conv2D)           (None, 91, 91, 64)        19264     
_________________________________________________________________
max_pooling2d_12 (MaxPooling (None, 46, 46, 64)        0         
_________________________________________________________________
conv2d_17 (Conv2D)           (None, 40, 40, 128)       401536    
_________________________________________________________________
max_pooling2d_13 (MaxPooling (None, 20, 20, 128)       0         
_________________________________________________________________
conv2d_18 (Conv2D)           (None, 17, 17, 128)       262272    
_________________________________________________________________
max_pooling2d_14 (MaxPooling (None, 9, 9, 128)         0 

In [None]:
# Used to take input of the image
def make_embedding(): 
    inp = Input(shape=(100,100,3), name='input_image')
    
    # First block
    # reads the image, taking it apart piece by piece by pixel size.
    # 64 10x10 parts
    c1 = Conv2D(64, (10,10), activation='relu')(inp) # passing inp into c1
    m1 = MaxPooling2D(64, (2,2), padding='same')(c1)
    
    # Second block
    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D(64, (2,2), padding='same')(c2)
    
    # Third block
    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D(64, (2,2), padding='same')(c3)
    
    # Final embedding block
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)
    
    
    return Model(inputs=[inp], outputs=[d1], name='embedding')

In [None]:
embedding = make_embedding()


In [None]:
# SNN layer
embedding.summary()


Model: "embedding"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_image (InputLayer)     [(None, 100, 100, 3)]     0         
_________________________________________________________________
conv2d_20 (Conv2D)           (None, 91, 91, 64)        19264     
_________________________________________________________________
max_pooling2d_15 (MaxPooling (None, 46, 46, 64)        0         
_________________________________________________________________
conv2d_21 (Conv2D)           (None, 40, 40, 128)       401536    
_________________________________________________________________
max_pooling2d_16 (MaxPooling (None, 20, 20, 128)       0         
_________________________________________________________________
conv2d_22 (Conv2D)           (None, 17, 17, 128)       262272    
_________________________________________________________________
max_pooling2d_17 (MaxPooling (None, 9, 9, 128)         0 

**4.2 Distance Layer**

In [None]:
# Siamese L1 Distance class
# Sort of like a custom NN layer
class L1Dist(Layer):
    
    # Init method - inheritance
    def __init__(self, **kwargs):
        super().__init__()
       
    # Magic happens here - similarity calculation
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)

**Siamese Model**

In [None]:
input_image = Input(name='input_img', shape=(100,100,3))
validation_image = Input(name='validation_img', shape=(100,100,3))

In [None]:
inp_embedding = embedding(input_image)
val_embedding = embedding(validation_image)

In [None]:
siamese_layer = L1Dist()

In [None]:
distances = siamese_layer(inp_embedding, val_embedding)

In [None]:
classifier = Dense(1, activation='sigmoid')(distances)

In [None]:
siamese_network = Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

In [None]:
def make_siamese_model(): 
    
    # Anchor image input in the network
    input_image = Input(name='input_img', shape=(100,100,3))
    
    # Validation image in the network 
    validation_image = Input(name='validation_img', shape=(100,100,3))
    
    # Combine siamese distance components
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(embedding(input_image), embedding(validation_image))
    
    # Classification layer, final output, 1 or 0
    classifier = Dense(1, activation='sigmoid')(distances)
    
    # All the layers together
    return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

In [None]:
siamese_model = make_siamese_model()

In [None]:
siamese_model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_img (InputLayer)          [(None, 100, 100, 3) 0                                            
__________________________________________________________________________________________________
validation_img (InputLayer)     [(None, 100, 100, 3) 0                                            
__________________________________________________________________________________________________
embedding (Functional)          (None, 4096)         38960448    input_img[0][0]                  
                                                                 validation_img[0][0]             
__________________________________________________________________________________________________
distance (L1Dist)               (None, 4096)         0           embedding[2][0]     

# **5. Training**

Basic flow for training on one batch:
1. make prediction
2. Calculate loss
3. Derive gradeints
4. Calculate new weights and apply

**Setting up Loss and Optimizer**

In [None]:
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [None]:
# learning rate of 1e-4 --> 0.0001
opt = tf.keras.optimizers.Adam(1e-4)

**Establish Checkpoints**

If something goes wrong during training, we can go back to these checkpoints

Note to self: To reload from the checkpoint, you can use model.load('path_to_checkpoint').
this will load the pre trained weights into existing model

In [None]:
# create directory for training checkpoints
checkpoint_dir = './training_checkpoints'

# creastion checkpoint prefix, all start with ckpt
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')

checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)


**Build, Train, Step Function**

Each batch in the dataset is comprised of 16 samples, each of which contians an anchor image, a pos or neg image, and a label


In [None]:
# decorator to complie a function into a callable tf graph
# make the entire NN into a graph to train more efficiently
@tf.function
def train_step(batch):

    # record operations for automatic differentiation
    with tf.GradientTape() as tape:

        # Get anchor and pos/neg image (features)
        X = batch[:2]

        # Get label
        y = batch[2]

        # Forward pass
        # NOTE: important to have traning=True as some layers only activate when this happens
        ypred = siamese_model(X, training=True)
        # Calculate loss, passing through y=true, ypred=predicted outcome
        loss = binary_cross_loss(y ,ypred)
    print(loss)
    # Calculate gradients
    grad = tape.gradient(loss, siamese_model.trainable_variables)

    #Calculate updated wights and apply to siamese model
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))

    return loss

**Training Loop**

In [None]:
# NOTE: train_step function focused on training one batch, and this loop will be used to iterate through every batch in the dataset
def train(data, EPOCHS):
    # Loop through epochs
    for epoch in range(1, EPOCHS+1):

        # indicate what epoch at
        print(f'\n Epoch {epoch}/{EPOCHS}')

        # defining progress bar to show it during training
        progbar = tf.keras.utils.Progbar(len(data))
    # Loop through each batch
        for index, batch in enumerate(data):
        # Run train step
            train_step(batch)
            progbar.update(index+1)

    # Save checkpoints
    if epoch % 10 ==0:
        checkpoint.save(file_prefix=checkpoint_prefix)

**Train Model**

In [None]:
# number of training datasets
EPOCHS = 50

In [None]:
train(train_data, EPOCHS)


 Epoch 1/50
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)

 Epoch 2/50

 Epoch 3/50

 Epoch 4/50

 Epoch 5/50

 Epoch 6/50

 Epoch 7/50

 Epoch 8/50

KeyboardInterrupt: ignored