<a href="https://colab.research.google.com/github/Nnamaka/ML_specializations/blob/main/Computer_vision/face_Detection/faceRecognition.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Install Dependencies

In [None]:
# TensorFlow (CPU and GPU packages) is pinned to 2.4.1; opencv-python, matplotlib and wget are installed unpinned.
!pip install tensorflow==2.4.1 tensorflow-gpu==2.4.1 opencv-python matplotlib
!pip install wget

## Import Dependencies

In [None]:
import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt

In [None]:
# Import tensorflow dependencies - Functional API
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf

## Create Folder Structures


In [None]:
# Directories for the three image roles, all under the local "data" folder.
POS_PATH, NEG_PATH, ANC_PATH = (
    os.path.join('data', subdir) for subdir in ('positive', 'negative', 'anchor')
)

In [None]:
# Make the directories. exist_ok=True keeps this cell re-runnable: without it,
# os.makedirs raises FileExistsError as soon as the folders already exist.
for data_dir in (POS_PATH, NEG_PATH, ANC_PATH):
  os.makedirs(data_dir, exist_ok=True)

# Collect Positive and Anchor Images
And preprocess them.

  
My positive and anchor images were stored in my Google Drive.

In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive/.
from google.colab import drive
drive.mount('/content/drive/')

In [None]:
# Copy face.tar.gz and anchor.tar.gz from the mounted Drive folder "TFOD images" into /content.
!cp "/content/drive/MyDrive/TFOD images/face.tar.gz" /content && cp "/content/drive/MyDrive/TFOD images/anchor.tar.gz" /content

Uncompress the archives of positive and anchor images.

In [None]:
# Extract both gzip-compressed archives into the current working directory.
!tar -xzf face.tar.gz && tar -xzf anchor.tar.gz

Resize the positive and anchor images and store them in their respective folders under `data`.

In [None]:
# Target size, as (width, height), used when resizing the raw images.
width = height = 250
dim = (width, height)


In [None]:
# Source folders produced by extracting the two archives.
anch_pos = ["/content/Anchor/","/content/Myimages"]

for path in anch_pos:
  # Anchor images are written to ANC_PATH, the "Myimages" folder to POS_PATH.
  dest_dir = ANC_PATH if 'Anchor' in path else POS_PATH

  for filename in os.listdir(path):
    if not filename.endswith('.jpg'):
      continue

    src_file = os.path.join(path, filename)
    img = cv2.imread(src_file, cv2.IMREAD_UNCHANGED)
    # cv2.imread signals failure by returning None instead of raising.
    if img is None:
      print('skipping unreadable image: ' + src_file)
      continue

    # Stay best-effort (one bad file must not stop the run), but report the
    # failure instead of silently swallowing every exception.
    try:
      new_img = cv2.resize(img, dim, interpolation = cv2.INTER_AREA)
      cv2.imwrite(os.path.join(dest_dir, filename), new_img)
    except cv2.error as err:
      print('failed to resize ' + src_file + ': ' + str(err))
      

Augment all positive and anchor images to increase the data size and possibly expose the model to newer samples.

In [None]:
def augment(img, n_variants=15):
  """Create randomly perturbed copies of a single image.

  Every variant is derived from the original `img`. The previous version
  fed each result back into the next iteration, so distortions and JPEG
  re-compression accumulated across variants. Brightness and contrast also
  used constant seeds, which made them deterministic; all five operations
  now draw a fresh seed.

  Args:
    img: image array/tensor accepted by the tf.image stateless ops.
    n_variants: number of augmented copies to produce (default 15).

  Returns:
    A list of `n_variants` augmented image tensors.
  """
  def fresh_seed():
    # Stateless ops are fully determined by their seed, so draw a new pair per call.
    return (np.random.randint(100), np.random.randint(100))

  data = []
  for _ in range(n_variants):
    variant = tf.image.stateless_random_brightness(img, max_delta=0.02, seed=fresh_seed())
    variant = tf.image.stateless_random_contrast(variant, lower=0.6, upper=1, seed=fresh_seed())
    variant = tf.image.stateless_random_flip_left_right(variant, seed=fresh_seed())
    variant = tf.image.stateless_random_jpeg_quality(variant, min_jpeg_quality=90, max_jpeg_quality=100, seed=fresh_seed())
    variant = tf.image.stateless_random_saturation(variant, lower=0.9, upper=1, seed=fresh_seed())

    data.append(variant)

  return data

In [None]:
import os
import uuid

In [None]:
paths = [POS_PATH, ANC_PATH]

for path in paths:
  print("inside " + path)
  # os.listdir() returns a list built before any new file is written, so the
  # augmented images created below are not augmented again in this run.
  for file_name in os.listdir(path):
    img_path = os.path.join(path, file_name)
    img = cv2.imread(img_path)
    # cv2.imread returns None for files it cannot decode; skip them rather
    # than letting augment() fail on a None input.
    if img is None:
      print("skipping unreadable image: " + img_path)
      continue

    augmented_images = augment(img)

    # Each augmented copy gets a unique, uuid-based file name in the same folder.
    for image in augmented_images:
      cv2.imwrite(os.path.join(path, '{}.jpg'.format(uuid.uuid1())), image.numpy())

Check the number of positive and anchor images.

In [None]:
# Report how many .jpg files each folder holds.
for path in paths:
  print("no of images in " + path + " : ", end="")
  jpg_total = sum(1 for entry in os.listdir(path) if entry.endswith('.jpg'))
  print(jpg_total)

# Collect Negative Images

In [None]:
# Download lfw.tgz; it is saved as wildImages.tar even though the source file is a gzip-compressed .tgz.
!wget -O wildImages.tar http://vis-www.cs.umass.edu/lfw/lfw.tgz

### Untar the Labelled Faces in the Wild dataset



In [None]:
# -z is needed because the archive is gzip-compressed despite its .tar name.
!tar -xzf wildImages.tar

In [None]:
# Move every LFW image into the negative-image directory, flattening the
# one-level subdirectory layout of the extracted 'lfw' folder.
for subdir in os.listdir('lfw'):
  subdir_path = os.path.join('lfw', subdir)
  for image_name in os.listdir(subdir_path):
    os.replace(
        os.path.join(subdir_path, image_name),
        os.path.join(NEG_PATH, image_name),
    )

# Load and Preprocess Images

## Get image directories

In [None]:
# File-path datasets for each role, capped at 450 .jpg paths apiece.
anchor, positive, negative = (
    tf.data.Dataset.list_files(f'{folder}/*.jpg').take(450)
    for folder in (ANC_PATH, POS_PATH, NEG_PATH)
)


In [None]:
dir_test = anchor.as_numpy_iterator()

In [None]:
print(dir_test.next())

# Preprocessing - Scale and Resize

In [None]:
def preprocess(file_Path):
  """Load a JPEG file and turn it into a scaled 100x100 RGB tensor.

  Args:
    file_Path: path of the JPEG file (string, bytes or string tensor).

  Returns:
    A float tensor of shape (100, 100, 3) with values scaled to [0, 1].
  """
  byte_img = tf.io.read_file(file_Path)

  # channels=3 forces an RGB result. Without it a grayscale JPEG decodes to a
  # single channel, and inside Dataset.map the channel dimension is unknown,
  # neither of which matches the (100, 100, 3) input the model expects.
  img = tf.io.decode_jpeg(byte_img, channels=3)

  img = tf.image.resize(img, (100, 100))
  # Scale 8-bit pixel values to the [0, 1] range.
  img = img / 255.0

  return img

In [None]:
# Smoke-test preprocess() on whichever anchor file the dataset yields first,
# rather than on a hard-coded file name that exists only in the author's data.
img = preprocess(next(anchor.as_numpy_iterator()))


In [None]:
img.numpy().max() 


# Create Labelled Dataset

In [None]:
# One label per anchor: 1.0 for (anchor, positive) pairs, 0.0 for (anchor, negative) pairs.
pair_count = len(anchor)
match_labels = tf.data.Dataset.from_tensor_slices(tf.ones(pair_count))
mismatch_labels = tf.data.Dataset.from_tensor_slices(tf.zeros(pair_count))

positives = tf.data.Dataset.zip((anchor, positive, match_labels))
negatives = tf.data.Dataset.zip((anchor, negative, mismatch_labels))

# Positive pairs come first, followed by the negative pairs.
data = positives.concatenate(negatives)


In [None]:
samples = data.as_numpy_iterator()

In [None]:
example = samples.next()

In [None]:
example

# Build Train and Test Partition

In [None]:
def preprocess_twin(input_img, validation_img, label):
  """Preprocess both image paths of a pair and pass the label through unchanged.

  Returns:
    A tuple (preprocessed input image, preprocessed validation image, label).
  """
  input_tensor = preprocess(input_img)
  validation_tensor = preprocess(validation_img)
  return (input_tensor, validation_tensor, label)

In [None]:
res = preprocess_twin(*example)

In [None]:
plt.imshow(res[1])

In [None]:
res[2]

Build the data loader and split the data into train and test partitions.

In [None]:
# Decode and scale every (input, validation, label) triple, then cache the result.
data = data.map(preprocess_twin)
data = data.cache()
# Shuffle once and keep that order for every iteration. By default shuffle()
# reshuffles on each pass, so the take()/skip() train/test split taken from
# this dataset would change every epoch and test samples would leak into training.
data = data.shuffle(buffer_size=1000, reshuffle_each_iteration=False)

In [None]:
# Train partition: the first 70% of the samples, in batches of 16 with 8 batches prefetched.
train_data = data.take(round(len(data)*.7)).batch(16).prefetch(8)

In [None]:
# Test partition: skip the training 70%, keep the following 30%, then batch and prefetch.
test_data = (
    data.skip(round(len(data)*.7))
    .take(round(len(data)*.3))
    .batch(16)
    .prefetch(8)
)

# Model Engineering

## Build Embedding Layer   
Create a function that builds our embedding layer.

In [None]:
def make_embedding():
  """Build the convolutional network that maps an image to an embedding.

  Three Conv2D + MaxPooling2D blocks are followed by a final Conv2D, a
  Flatten and a 4096-unit sigmoid Dense layer.

  Returns:
    A Keras Model named 'embedding' taking a (100, 100, 3) image and
    producing a 4096-dimensional vector.
  """
  inp = Input(shape=(100, 100,3), name='input_image')

  # MaxPooling2D's first positional argument is pool_size and the second is
  # strides. The previous call MaxPooling2D(64, (2,2)) therefore pooled over
  # a 64x64 window instead of the intended 2x2 one. Output shapes are the
  # same either way because strides and padding are unchanged.

  # block 1
  c1 = Conv2D(64, (10,10), activation='relu')(inp)
  m1 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c1)

  # block 2
  c2 = Conv2D(128, (7,7), activation='relu')(m1)
  m2 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c2)

  # block 3
  c3 = Conv2D(128, (4,4), activation='relu')(m2)
  m3 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c3)

  # block 4: final convolution, flattened into the dense embedding
  c4 = Conv2D(128, (4,4), activation='relu')(m3)
  f1 = Flatten()(c4)
  d1 = Dense(4096, activation='sigmoid')(f1)

  return Model(inputs=[inp], outputs=[d1], name='embedding')

In [None]:
embedding = make_embedding()

In [None]:
embedding.summary()

### Build Distance Layer

In [None]:
# siamese L1 Distance class
class L1Dist(Layer):
  """Siamese L1 distance layer: element-wise absolute difference of two embeddings."""

  def __init__(self, **kwargs):
    # The constructor was previously misspelled (`__init_` calling
    # `super().__inti__()`), so it was never used. Forward kwargs such as
    # name/dtype to the base Layer so the layer can be rebuilt from its config.
    super().__init__(**kwargs)

  def call(self, input_embedding, validation_embedding):
    """Return |input_embedding - validation_embedding|, element-wise."""
    return tf.math.abs(input_embedding - validation_embedding)

In [None]:
l1 = L1Dist()

## Make Siamese Model

In [None]:
def make_siamese_model():
  """Assemble the Siamese network from the shared embedding model.

  Both inputs go through the module-level `embedding` model, so the weights
  are shared. Their embeddings are compared by an L1Dist layer and a single
  sigmoid unit turns the distance vector into a match score.

  Returns:
    A Keras Model named 'SiameseNetwork' with inputs
    [input_image, validation_image] and one sigmoid output.
  """
  # anchor / input image
  input_image = Input(name='input_img', shape=(100, 100, 3))

  # validation image
  # NOTE(review): 'validation_ing' looks like a typo for 'validation_img'; it is
  # left unchanged because the layer name becomes part of any saved model.
  validation_image = Input(name='validation_ing', shape=(100, 100, 3))

  # Combine the siamese distance components. The layer is named through the
  # public constructor argument instead of writing the private `_name` attribute.
  siamese_layer = L1Dist(name='distance')
  distances = siamese_layer(embedding(input_image), embedding(validation_image))

  # classification layer
  classifier = Dense(1, activation='sigmoid')(distances)

  return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')


In [None]:
siamese_model = make_siamese_model()

In [None]:
siamese_model.summary()

# Training

In [None]:
binary_cross_loss = tf.losses.BinaryCrossentropy()


In [None]:
opt = tf.keras.optimizers.Adam(1e-4)

## Establish Checkpoints

In [None]:
# Checkpoints are written under ./training_checkpoints with the file prefix "ckpt".
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
# The checkpoint object tracks both the optimizer and the Siamese model.
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)


Build the train step function.

In [None]:
@tf.function
def train_step(batch):
  """Run one optimisation step on a single batch.

  Args:
    batch: sequence whose first two items are the model inputs (input
      images, validation images) and whose third item holds the labels.

  Returns:
    The binary cross-entropy loss of the batch.
  """
  # first two elements are the model inputs, the third is the label
  X = batch[:2]
  Y = batch[2]

  # record the forward pass so gradients can be derived from it
  with tf.GradientTape() as tape:
    yhat = siamese_model(X, training=True)
    loss = binary_cross_loss(Y, yhat)

  # The former print(loss) was removed: inside a tf.function it runs only
  # while tracing and shows a symbolic tensor, not the loss value.

  # calculate gradients
  grad = tape.gradient(loss, siamese_model.trainable_variables)

  # calculate updated weights and apply to siamese model
  opt.apply_gradients(zip(grad, siamese_model.trainable_variables))

  return loss


Build the training loop.

In [None]:
# import metric calculations
from tensorflow.keras.metrics import Precision, Recall

In [None]:
def train(data, EPOCHS):
  """Train the Siamese model for a number of epochs.

  Each epoch runs `train_step` on every batch, tracks recall and precision
  of the model's predictions on those batches, prints the last batch loss
  together with the epoch metrics, and saves a checkpoint every 10th epoch.

  Args:
    data: batched dataset of (input images, validation images, labels).
    EPOCHS: number of passes over `data`.
  """
  for epoch in range(1, EPOCHS+1):
    print('\n Epoch {}/{}'.format(epoch, EPOCHS))
    progbar = tf.keras.utils.Progbar(len(data))

    # fresh metric objects so values do not carry over between epochs
    r = Recall()
    p = Precision()
    loss = None

    # go through each batch
    for idx, batch in enumerate(data):

      # train step
      loss = train_step(batch)
      yhat = siamese_model.predict(batch[:2])
      r.update_state(batch[2], yhat)
      p.update_state(batch[2], yhat)
      progbar.update(idx+1)

    # Report once per epoch instead of once per batch, which flooded the
    # output and interleaved with the progress bar. Skipped for an empty dataset.
    if loss is not None:
      print(loss.numpy(), r.result().numpy(), p.result().numpy())

    # Save a checkpoint every 10th epoch. This used to sit inside the batch
    # loop and wrote one checkpoint per batch on those epochs.
    if epoch % 10 == 0:
      checkpoint.save(file_prefix=checkpoint_prefix)


## Train the model

In [None]:
EPOCHS = 50

In [None]:
train(train_data, EPOCHS)

Evaluate Model

import metrics

In [None]:
# import metric calculations
from tensorflow.keras.metrics import Precision, Recall

make predictions

In [None]:
# Pull the first batch out of the test partition as NumPy arrays.
test_input, test_val, y_true = next(test_data.as_numpy_iterator())


In [None]:
y_hat = siamese_model.predict([test_input, test_val])

In [None]:
# post processing the results
[1 if prediction > 0.5 else 0 for prediction in y_hat]

In [None]:
y_true

# Calculate Metrics

In [None]:
# create a recall metric object
m = Recall()

# accumulate recall from the true labels and the predicted scores
m.update_state(y_true, y_hat)

# show the recall value as a NumPy scalar
m.result().numpy()

In [None]:
# create a precision metric object
m = Precision()

# accumulate precision from the true labels and the predicted scores
m.update_state(y_true, y_hat)

# show the precision value as a NumPy scalar
m.result().numpy()

In [None]:
# Accumulate recall and precision over every batch of the test partition.
r, p = Recall(), Precision()

for test_input, test_val, y_true in test_data.as_numpy_iterator():
  yhat = siamese_model.predict([test_input, test_val])
  for metric in (r, p):
    metric.update_state(y_true, yhat)

print(r.result().numpy(), p.result().numpy())

# Visualize results

In [None]:
# One 10x8 figure with two side-by-side panels.
fig, (input_ax, validation_ax) = plt.subplots(1, 2, figsize=(10,8))

# left panel: input image at index 4 of the current batch
input_ax.imshow(test_input[4])

# right panel: the validation image it was paired with
validation_ax.imshow(test_val[4])

# renders cleanly
plt.show()

# Save Model

In [None]:
# Save the whole model (not just the weights) to an .h5 file; it is reloaded below with load_model.
siamese_model.save('siamesemodelv2.h5')

In [None]:
L1Dist

In [None]:
# Reload the saved model, replacing the in-memory `siamese_model`. The custom
# L1Dist layer has to be supplied through custom_objects.
siamese_model = tf.keras.models.load_model('siamesemodelv2.h5', 
                                   custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})


In [None]:
# make predictions with reloaded model
siamese_model.predict([test_input, test_val])

In [None]:
siamese_model.summary()

## Save model to Google Drive

In [None]:
# Copy the saved .h5 model into the "Colab Notebooks" folder of the mounted Drive.
!cp /content/siamesemodelv2.h5 "/content/drive/MyDrive/Colab Notebooks/"