# Facial Recognition

### Import Standard and Tensorflow Libraries

In [None]:
# Importing standard libraries
import cv2
import os
import random
import numpy as np
import matplotlib.pyplot as plt

# Importing Tensorflow dependencies
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf

### Set Memory Growth to avoid OOM Errors

In [None]:
# Avoid OOM errors by setting GPU Memory Growth.
# Memory growth has to be configured BEFORE the GPUs are initialized; listing the
# logical devices initializes them, so that query is done last.
gpus = tf.config.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

# Safe to query now that every physical GPU has been configured
logical_gpus = tf.config.list_logical_devices('GPU')

In [None]:
# Report the physical and logical GPUs visible to TensorFlow
print(len(gpus), gpus)
print(len(logical_gpus), logical_gpus)
# True when at least one GPU is visible (tf.test.is_gpu_available is deprecated
# in favour of tf.config.list_physical_devices)
print(bool(tf.config.list_physical_devices('GPU')))

### Create Folder Structures

In [None]:
# Setup Paths (Directories): anchor, positive and negative image folders under 'data'
ANC_PATH, POS_PATH, NEG_PATH = (
    os.path.join('data', folder)
    for folder in ('anchor2', 'positive2', 'negative')
)

In [None]:
# # Using List to create the directories -> Done Once
# list = [ANC_PATH, NEG_PATH, POS_PATH]
# for ele in list:
#     print(ele)
#     os.makedirs(ele)

### Untar LFW Dataset

In [None]:
# Uncompress the Tar GZ Labelled Faces in the Wild Dataset

# !tar -xf lfw.tgz

In [None]:
# Transfer the images in every sub-folder in the lfw folder to NEG_PATH

# for directory in os.listdir('lfw'):
#     for file in os.listdir(os.path.join('lfw', directory)):
#         EX_PATH = os.path.join('lfw', directory, file)
#         NEW_PATH = os.path.join(NEG_PATH, file)
#         os.replace(EX_PATH, NEW_PATH)

### Collect Positive and Anchor Classes

In [None]:
# We are going to make sure the images we collect from webcam are of dimension/resolution: 
# (250 px X 250 px, i.e., of same resolution as images in the LFW dataset)
# This makes the data processing step easier.

In [None]:
# Importing uuid to generate unique image names
import uuid

In [None]:
# Establish a connection to the Webcam -> To collect Anchor and Positive images
# Keys: 'a' saves an anchor image, 'p' saves a positive image, 'q' quits.
cap = cv2.VideoCapture(0)
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            # No frame was delivered; stop instead of slicing a missing frame
            break

        # Crop the captured frame to 250x250 pixels
        frame = frame[120:120+250, 200:200+250, :]

        # Show image back to screen
        cv2.imshow('Image Collector Window', frame)

        # Poll the keyboard ONCE per frame: each waitKey call consumes the pending
        # key event, so several separate calls would drop most keypresses
        key = cv2.waitKey(1) & 0xFF

        if key == ord('a'):
            # Collect Anchors: unique identifier + .jpg, stored under ANC_PATH
            imgname = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
            cv2.imwrite(imgname, frame)
        elif key == ord('p'):
            # Collect Positives
            imgname = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
            cv2.imwrite(imgname, frame)
        elif key == ord('q'):
            break
finally:
    # Release the Webcam and close the preview window even if an error occurred
    cap.release()
    cv2.destroyAllWindows()

### Getting Image Directories

In [None]:
# Collect up to 300 image paths per class. os.path.join builds the glob pattern with the
# platform's separator (a hand-written '\*.jpg' only matches on Windows and is an invalid
# string escape sequence).
anchor = tf.data.Dataset.list_files(os.path.join(ANC_PATH, '*.jpg')).shuffle(buffer_size = 1024).take(300)
positive = tf.data.Dataset.list_files(os.path.join(POS_PATH, '*.jpg')).shuffle(buffer_size = 1024).take(300)
negative = tf.data.Dataset.list_files(os.path.join(NEG_PATH, '*.jpg')).shuffle(buffer_size = 1024).take(300)

### Preprocessing - Scaling And Resizing

In [None]:
def preprocess(file_path):
    """Load a JPEG file and return it as a 100x100 RGB image scaled to [0, 1].

    Args:
        file_path: Path of the JPEG file to read.

    Returns:
        The image resized to 100x100 with 3 channels, pixel values divided by 255.
    """
    byte_img = tf.io.read_file(file_path)
    # Force 3 channels so grayscale files still match the model's (100, 100, 3) input
    img = tf.io.decode_jpeg(byte_img, channels=3)
    # Resize to the 100x100 resolution the network expects
    img = tf.image.resize(img, (100,100))
    # Rescale pixel values from the 0-255 range to the 0-1 range
    img = img / 255.0
    return img

### Creating Labelled Datasets

In [None]:
# Pair every anchor with a positive (label 1.0) and with a negative (label 0.0).
# tf.data.Dataset.from_tensor_slices turns the label vector into a dataset of scalar
# tensors so that it can be zipped with the two file-path datasets.
positives, negatives = (
    tf.data.Dataset.zip(
        (anchor, other, tf.data.Dataset.from_tensor_slices(fill(len(anchor))))
    )
    for other, fill in ((positive, tf.ones), (negative, tf.zeros))
)
data = positives.concatenate(negatives)

### Build Train and Test Partition

In [None]:
def preprocess_twin(input_img, validation_img, label):
    """Preprocess both images of a pair and pass the label through unchanged."""
    input_tensor = preprocess(input_img)
    validation_tensor = preprocess(validation_img)
    return input_tensor, validation_tensor, label

In [None]:
# Build DataLoader Pipeline
data = data.map(preprocess_twin) # Preprocess every (image, image, label) triple
data = data.cache() # Cache the preprocessed samples so they are not re-read from disk on every pass
# Shuffle with a FIXED order. The dataset is split further down with take()/skip(), and each
# of those iterates this dataset again: with the default reshuffle_each_iteration=True every
# pass would see a different order, so training samples would leak into the test partition.
# NOTE: the order is therefore identical in every epoch; shuffle the training subset after
# the split if per-epoch reshuffling is wanted.
data = data.shuffle(buffer_size = 1024, reshuffle_each_iteration = False)

In [None]:
# Training Partition: the first 70% of the samples, delivered in batches of 16.
# prefetch(8) prepares upcoming batches while the current one is being consumed, so the
# input pipeline does not bottleneck the network.
# Iterating train_data (e.g. next() on as_numpy_iterator()) yields a whole batch, not a single sample.
train_data = (
    data.take(round(len(data)*0.7))
    .batch(16)
    .prefetch(8)
)

In [None]:
# Testing Partition: skip the samples reserved for training (data must not be shared
# between the two partitions), keep the following 30%, then batch and prefetch as for training.
test_data = (
    data.skip(round(len(data)*0.7))
    .take(round(len(data)*0.3))
    .batch(16)
    .prefetch(8)
)

### Building an embedding Layer

In [None]:
def make_embedding():
    """Build the convolutional embedding network used for both images of a pair.

    Returns:
        A Keras ``Model`` named 'Embedding' that takes a (100, 100, 3) image and
        outputs a 4096-unit sigmoid-activated feature vector.

    NOTE(review): weights trained with a different pooling window have the same
    shapes but are not numerically equivalent; retrain after changing the pooling.
    """
    # Input Layer
    inp = Input(shape=(100,100,3), name='Input_Image')

    # First Block: 64 filters of size 10x10, stride 1x1, ReLU activation, applied to the input
    c1 = Conv2D(64, (10,10), strides = (1,1), activation='relu')(inp)
    # 2x2 max-pooling with stride 2. Arguments are given by keyword because the first
    # positional argument of MaxPooling2D is pool_size, not a number of filters.
    m1 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c1)

    # Second Block
    c2 = Conv2D(128, (7, 7), activation='relu')(m1)
    m2 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c2)

    # Third Block
    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c3)

    # Final Block: last convolution, flattened into the dense embedding
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)

    return Model(inputs=[inp], outputs=[d1], name = 'Embedding')

In [None]:
# Instantiate the embedding network once; make_siamese_model() reads this module-level
# `embedding` and applies it to both of its inputs
embedding = make_embedding()

In [None]:
# Print the layer-by-layer summary of the embedding network
embedding.summary()

### Building L1 Distance Layer

In [None]:
# Siamese L1 Distance class
class L1Dist(Layer):
    """Keras layer returning the element-wise absolute difference of two embeddings."""

    def __init__(self, **kwargs):
        # Forward the keyword arguments (name, dtype, trainable, ...) to the base Layer so
        # that they are honoured, including when the layer is rebuilt from a saved config
        super().__init__(**kwargs)

    def call(self, input_embedding, validation_embedding):
        """Return |input_embedding - validation_embedding| element-wise."""
        return tf.math.abs(input_embedding - validation_embedding)

### Make Siamese Model

In [None]:
def make_siamese_model():
    """Assemble the Siamese network on top of the module-level ``embedding`` model.

    Returns:
        A Keras ``Model`` named 'SiameseNetwork' that takes two (100, 100, 3) images and
        outputs a single sigmoid score.
    """
    # The network receives two images at once: the anchor and the image compared against it
    input_image = Input(shape=(100,100,3), name = 'Input_img')
    validation_image = Input(shape=(100,100,3), name = 'Validation_img')

    # Distance layer, renamed to 'distance'
    distance_layer = L1Dist()
    distance_layer._name = 'distance'

    # Both images go through the same embedding model before being compared
    input_features = embedding(input_image)
    validation_features = embedding(validation_image)
    distances = distance_layer(input_features, validation_features)

    # Classification Layer: one sigmoid unit on top of the distance vector
    classifier = Dense(1, activation = 'sigmoid')(distances)

    return Model(inputs=[input_image, validation_image], outputs=[classifier], name = 'SiameseNetwork')

In [None]:
# Build the Siamese network; train_step() and train() use this module-level Siamese_Model
Siamese_Model = make_siamese_model()

In [None]:
# Print the layer-by-layer summary of the Siamese network
Siamese_Model.summary()

### Setup Loss Function and Optimizer

In [None]:
# Binary cross-entropy loss; train_step() applies it to the pair label and the model output
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [None]:
# Adam optimizer; train_step() uses it to apply the gradients to Siamese_Model
opt = tf.keras.optimizers.Adam(learning_rate = 1e-4) # 0.0001

### Establish Checkpoints

In [None]:
# Checkpoints are written to ./training_checkpoints with the file prefix 'ckpt'
# (forward slashes work on every platform and avoid invalid string escape sequences)
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
# Track both the optimizer state and the model weights in the checkpoint
checkpoint = tf.train.Checkpoint(opt = opt, siamese_model = Siamese_Model)

### Build Train Step Function - will be used on 1 Batch of data

In [None]:
@tf.function
def train_step(batch):
    """Run one optimization step of Siamese_Model on a single batch.

    Args:
        batch: Sequence whose first two elements are the two image batches and whose
            third element holds the labels.

    Returns:
        The binary cross-entropy loss of the batch.
    """
    # Record the forward pass so that gradients can be computed from it
    with tf.GradientTape() as tape:

        # Get anchor and positive/negative image
        X = batch[:2]

        # Get label
        y = batch[2]

        # Forward Pass
        yhat = Siamese_Model(X, training=True)

        # Calculate Loss
        loss = binary_cross_loss(y, yhat)

    # No print() here: inside a tf.function it only runs while the function is traced
    # and shows a symbolic tensor; the caller reports the returned loss instead.

    # Calculate Gradients
    grad = tape.gradient(loss, Siamese_Model.trainable_variables)

    # Calculate updated weights and apply to Siamese Model
    opt.apply_gradients(zip(grad, Siamese_Model.trainable_variables))

    # Return loss
    return loss

### Build Training Loop

In [None]:
def train(data, EPOCHS):
    """Train Siamese_Model on ``data`` for ``EPOCHS`` epochs.

    For every epoch: runs train_step() on each batch, accumulates recall and precision
    over the epoch, prints the last batch loss together with both metrics, and saves a
    checkpoint every 10th epoch.

    Args:
        data: Batched dataset yielding (input images, validation images, labels).
        EPOCHS: Number of passes over ``data``.
    """
    # Imported here so the function also works when the notebook cell that imports the
    # metrics has not been executed yet
    from tensorflow.keras.metrics import Precision, Recall

    # Loop through each Epoch
    for epoch in range(1, EPOCHS+1):
        print('\nEpoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))

        # Fresh metric objects so that every epoch is measured on its own
        r = Recall()
        p = Precision()
        loss = None  # stays None when the dataset yields no batch

        # Loop through each batch
        for idx, batch in enumerate(data):
            # Run Train step
            loss = train_step(batch)
            yhat = Siamese_Model.predict(batch[:2])
            r.update_state(batch[2], yhat)
            p.update_state(batch[2], yhat)
            progbar.update(idx+1)

        # Report the last batch loss with the epoch's recall and precision
        if loss is not None:
            print(loss.numpy(), r.result().numpy(), p.result().numpy())

        # Save Checkpoints
        if epoch%10 == 0:
            checkpoint.save(file_prefix = checkpoint_prefix)

### Train the Model

In [None]:
# Number of epochs intended for train(train_data, EPOCHS)
EPOCHS = 50

In [None]:
# train(train_data, EPOCHS) # This task done by Google Colab (T4 GPU Runtime) as the training process takes a long time using CPU

### Save Model

In [None]:
# Save Weights
# Siamese_Model.save('/content/gdrive/MyDrive/SiameseModel2.h5') # Model saved from Google Colab after training and then loaded in Jupyter Notebook to be able to access the Webcam easily

### Reload Model

In [None]:
# Reload Model from the saved .h5 file.
# custom_objects tells Keras which Python objects the names 'L1Dist' and
# 'BinaryCrossentropy' refer to while rebuilding the model.
model = tf.keras.models.load_model('SiameseModel2.h5', custom_objects={'L1Dist': L1Dist, 'BinaryCrossentropy': tf.losses.BinaryCrossentropy})

In [None]:
# Print the layer-by-layer summary of the reloaded model
model.summary()

# #6 Evaluate Model

### Import Metrics

In [None]:
# Importing metric calculations
from tensorflow.keras.metrics import Precision, Recall

In [None]:
# Pull the first batch of the test partition as NumPy arrays:
# input images, validation images and the true labels
test_input, test_vali, y_true = next(test_data.as_numpy_iterator())

### Make Predictions

In [None]:
# Score every (input, validation) pair of the test batch with the reloaded model
yhat = model.predict([test_input, test_vali])
yhat # Predicted output

In [None]:
# Post Processing the Results: map every predicted score to 1 when it is above 0.5, else 0
[1 if prediction > 0.5 else 0 for prediction in yhat]

### Calculate Metrics

In [None]:
# Fresh metric objects for this measurement
r, p = Recall(), Precision()

# Feed the true labels and the predicted scores of the batch to both metrics
r.update_state(y_true, yhat)
p.update_state(y_true, yhat)

# Report recall and precision of that one batch
print("This is for a Single Batch in the test_data dataset:")
print("Recall: ", r.result().numpy())
print("Precision: ", p.result().numpy())

### Visualize Results

In [None]:
# Show the pair at index 2 of the test batch side by side:
# one figure of size (5, 8) with 1 row and 2 columns
plt.figure(figsize=(5,8))

for _slot, _img in enumerate((test_input[2], test_vali[2]), start=1):
    # Select the subplot, then draw the image into it
    plt.subplot(1, 2, _slot)
    plt.imshow(_img)

# Render the figure
plt.show()

### Verification Function

In [None]:
def verify(model, detection_threshold, verification_threshold):
    """Compare the stored input image against every stored verification image.

    Args:
        model: Model whose ``predict`` takes [input image batch, validation image batch].
        detection_threshold: Score above which a single prediction counts as positive.
        verification_threshold: Share of positive predictions (positives / number of
            verification images) that must be exceeded for a successful verification.

    Returns:
        A tuple ``(results, verified)``: the list of raw predictions (one per
        verification image) and whether the verification threshold was exceeded.
        An empty verification folder yields ``([], False)``.
    """
    verification_dir = os.path.join('application_data', 'verification_images2')
    image_names = os.listdir(verification_dir)
    if not image_names:
        # Nothing to compare against: report "not verified" instead of dividing by zero
        return [], False

    # The input image is the same for every comparison, so it is loaded only once
    input_img = preprocess(os.path.join('application_data', 'input_image', 'input_image.jpg'))

    # Build results array
    results = []
    for image in image_names:
        validation_img = preprocess(os.path.join(verification_dir, image))

        # Make predictions: expand_dims adds the batch axis, list() splits the two model inputs
        result = model.predict(list(np.expand_dims([input_img, validation_img], axis = 1)))
        results.append(result)

    # Detection Threshold: Metric above which a prediction is considered positive
    detection = np.sum(np.array(results) > detection_threshold)

    # Verification Threshold: Proportion of positive predictions / total positive samples
    verification = detection / len(image_names)
    verified = verification > verification_threshold # True or False

    return results, verified

### OpenCV Real Time Verification

In [None]:
# Real time verification from the webcam. Keys: 'v' verifies the current frame, 'q' quits.
cap = cv2.VideoCapture(0)
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            # No frame was delivered; stop instead of slicing a missing frame
            break

        # Crop the captured frame to 250x250 pixels
        frame = frame[120:120+250, 200:200+250, :]

        cv2.imshow('Verification', frame)

        # Poll the keyboard ONCE per frame: each waitKey call consumes the pending
        # key event, so a second call would miss keys caught by the first
        key = cv2.waitKey(10) & 0xFF

        # Verification Trigger
        if key == ord('v'):
            # Save Input Image to application_data/input_image folder
            cv2.imwrite(os.path.join('application_data', 'input_image', 'input_image.jpg'), frame)

            # Run Verification
            results, verified = verify(model, 0.5, 0.5)
            print(verified)
        elif key == ord('q'):
            break
finally:
    # Release the webcam and close the window even if an error occurred
    cap.release()
    cv2.destroyAllWindows()

In [None]:
# Count how many predictions of the last verification run are above 0.5
# (`results` only exists after a verification has been triggered with 'v')
np.sum(np.squeeze(results) > 0.5)