In [None]:
# importing standard dependencies
import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt

In [None]:
# import tensorflow dependencies - functional API
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer , Conv2D , Dense , MaxPooling2D , Input , Flatten
import tensorflow as tf

In [None]:
# setup paths (relative to the working directory, all under 'data')
ANC_Path = os.path.join('data', 'anchor')    # anchor images: the reference side of every pair
POS_Path = os.path.join('data', 'positive')  # images paired with anchors as matching examples
NEG_Path = os.path.join('data', 'negative')  # images paired with anchors as non-matching examples

In [None]:
# Switch the working directory so the relative 'data/...' paths resolve inside this folder.
# NOTE(review): presumably requires Google Drive to be mounted at /content/drive first — confirm.
os.chdir('/content/drive/MyDrive/Colab Notebooks')

In [None]:
# Display the current working directory to confirm the directory change took effect.
os.getcwd()

'/content/drive/MyDrive/Colab Notebooks'

In [None]:
# !unzip -q lfw.zip

In [None]:
# path = 'lfw-deepfunneled/lfw-deepfunneled'
# i=0
# for directory in os.listdir(path):
#   if(i>=3500):
#     break
#   for file in os.listdir(os.path.join(path,directory)):
#     old_path = os.path.join(path,directory,file)
#     new_path = os.path.join(NEG_Path,file)
#     os.replace(old_path,new_path)
#     i+=1

# print("done")

In [None]:
# i

In [None]:
# there are only 3088 images in lfw so train on that only

In [None]:
# Build one dataset of JPEG file paths per image role (anchor / positive / negative).
# The paths are relative to the working directory, and each dataset is capped at
# MAX_IMAGES_PER_CLASS entries so the three roles stay the same size.
MAX_IMAGES_PER_CLASS = 1500
anchor = tf.data.Dataset.list_files(ANC_Path+'/*.jpg').take(MAX_IMAGES_PER_CLASS)
positive = tf.data.Dataset.list_files(POS_Path+'/*.jpg').take(MAX_IMAGES_PER_CLASS)
negative = tf.data.Dataset.list_files(NEG_Path+'/*.jpg').take(MAX_IMAGES_PER_CLASS)

In [None]:
# Iterator over the anchor file paths as NumPy values, used to sanity-check one entry.
dir_test = anchor.as_numpy_iterator()

In [None]:
# Pull a single file path from the iterator to verify the listing works.
next(dir_test)

b'data/anchor/df4b9ba6-bbc1-11f0-8f69-485f99285894.jpg'

In [None]:
def preprocess(file_path):
  """Load a JPEG file and return it as a 100x100 RGB image scaled to [0, 1].

  Args:
    file_path: path of the JPEG file to read (string or string tensor).

  Returns:
    The decoded image, resized to 100x100 with 3 channels and divided by 255.
  """
  byte_img = tf.io.read_file(file_path)
  # Force 3 channels: without `channels` the channel dimension is unknown
  # (shape (100, 100, None)) and grayscale JPEGs would decode to a single
  # channel, which does not match the (100, 100, 3) model input.
  img = tf.io.decode_jpeg(byte_img, channels=3)
  img = tf.image.resize(img, (100, 100))
  # scale pixel values from [0, 255] to [0, 1]
  img = img / 255.0

  return img

In [None]:
# One scalar label per anchor: 1.0 for (anchor, positive) pairs, 0.0 for (anchor, negative) pairs
num_anchors = len(anchor)
match_labels = tf.data.Dataset.from_tensor_slices(tf.ones(num_anchors))
mismatch_labels = tf.data.Dataset.from_tensor_slices(tf.zeros(num_anchors))

# Each element is (anchor path, other path, label); positives come first, then negatives
positives = tf.data.Dataset.zip((anchor, positive, match_labels))
negatives = tf.data.Dataset.zip((anchor, negative, mismatch_labels))
data = positives.concatenate(negatives)


In [None]:
def preprocess_twin_withLabel(input_img, validation_img, label):
  """Preprocess both image paths of a pair and pass the label through unchanged.

  Returns a tuple (preprocessed input image, preprocessed validation image, label).
  """
  input_tensor = preprocess(input_img)
  validation_tensor = preprocess(validation_img)
  return (input_tensor, validation_tensor, label)

In [None]:
# build dataLoader pipeline
data = data.map(preprocess_twin_withLabel)
data = data.cache()
# Shuffle exactly once, with a buffer that covers the whole dataset:
# - reshuffle_each_iteration=False keeps the order fixed across iterations, so a
#   later take()/skip() split always selects the same elements. With the default
#   (True) every pass is a new permutation and test samples leak into training.
# - a buffer smaller than the dataset would bias the early elements towards the
#   positives, because the positives are concatenated before the negatives.
# If per-epoch reshuffling of training batches is wanted, shuffle again after the split.
data = data.shuffle(buffer_size=len(data), reshuffle_each_iteration=False)

In [None]:
# Display the dataset object to inspect its element spec (two images and a label).
data

<_ShuffleDataset element_spec=(TensorSpec(shape=(100, 100, None), dtype=tf.float32, name=None), TensorSpec(shape=(100, 100, None), dtype=tf.float32, name=None), TensorSpec(shape=(), dtype=tf.float32, name=None))>

In [None]:
# Total number of (image, image, label) elements before the train/test split.
len(data)

6000

In [None]:
# Training partition: the first 70% of the elements, in batches of 16,
# with up to 8 batches prefetched ahead of the consumer.
train_size = round(len(data)*.7)
train_data = data.take(train_size).batch(16).prefetch(8)

In [None]:
# Test partition: skip the 70% used for training and keep the following 30%,
# in batches of 16, with up to 8 batches prefetched.
test_data = (
    data.skip(round(len(data)*.7))
        .take(round(len(data)*.3))
        .batch(16)
        .prefetch(8)
)

In [None]:
# Display the batched training dataset to inspect its element spec.
train_data

<_PrefetchDataset element_spec=(TensorSpec(shape=(None, 100, 100, None), dtype=tf.float32, name=None), TensorSpec(shape=(None, 100, 100, None), dtype=tf.float32, name=None), TensorSpec(shape=(None,), dtype=tf.float32, name=None))>

Model Engineering

In [None]:
def make_embedding():
  """Build the convolutional model that maps a 100x100x3 image to a 2048-value embedding.

  Returns:
    A Keras functional Model named 'embedding' with one image input and one
    sigmoid-activated Dense output of width 2048.
  """
  image_in = Input(shape=(100, 100, 3), name='input_image')

  # first three blocks: ReLU convolution followed by 2x2 'same' max-pooling,
  # listed as (number of filters, square kernel size)
  features = image_in
  for n_filters, kernel in ((64, 10), (128, 7), (128, 4)):
    features = Conv2D(n_filters, (kernel, kernel), activation='relu')(features)
    features = MaxPooling2D((2, 2), padding='same')(features)

  # final embedding block: one more convolution, flatten, dense projection
  features = Conv2D(256, (4, 4), activation='relu')(features)
  flat = Flatten()(features)
  embedding_vec = Dense(2048, activation='sigmoid')(flat)

  return Model(inputs=[image_in], outputs=[embedding_vec], name='embedding')

In [None]:
# Instantiate the embedding model once at module level and print its layer summary.
embedding = make_embedding()
embedding.summary()

Building Distance Layer

In [None]:
class L1Dist(Layer):
  """Layer that returns the element-wise absolute difference of two embeddings."""

  def __init__(self, **kwargs):
    # Forward the keyword arguments (name, dtype, trainable, ...) to the base Layer.
    # Dropping them silently ignores any such setting, including the values Keras
    # passes back when the layer is re-created from a saved model's config.
    super().__init__(**kwargs)

  # similarity comparison
  def call(self, input_embedding, validation_embedding):
    """Return |input_embedding - validation_embedding|, computed element-wise."""
    input_embedding = tf.convert_to_tensor(input_embedding)
    validation_embedding = tf.convert_to_tensor(validation_embedding)
    return tf.math.abs(input_embedding - validation_embedding)

In [None]:
def make_siamese_model():
  """Assemble the two-input siamese network on top of the module-level `embedding` model.

  Returns:
    A Keras Model named 'SiameseNetwork' that takes [input_img, validation_img]
    (each 100x100x3) and outputs a single sigmoid score per pair.
  """
  # the two image inputs: the anchor and the image it is compared against
  anchor_in = Input(name='input_img', shape=(100,100,3))
  candidate_in = Input(name='validation_img', shape=(100,100,3))

  # the same embedding model encodes both inputs; L1Dist compares the two encodings
  distance_layer = L1Dist()
  anchor_code = embedding(anchor_in)
  candidate_code = embedding(candidate_in)
  l1_distance = distance_layer(anchor_code, candidate_code)

  # classification layer
  score = Dense(1, activation='sigmoid')(l1_distance)

  return Model(inputs=[anchor_in, candidate_in], outputs=score, name='SiameseNetwork')

In [None]:
# Build the siamese model; the training and evaluation cells use this global instance.
siamese_model = make_siamese_model()

In [None]:
# def contrastive_loss(y_true, y_pred, margin=1.0):
#     """
#     y_true: 1 if same, 0 if different
#     y_pred: model output distance between embeddings
#     """
#     y_true = tf.cast(y_true, y_pred.dtype)
#     squared_dist = tf.square(y_pred)
#     margin_dist = tf.square(tf.maximum(margin - y_pred, 0))
#     return tf.reduce_mean(y_true * squared_dist + (1 - y_true) * margin_dist)


In [None]:
# Binary cross-entropy loss, applied in train_step to the pair labels and the model's scores.
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [None]:
opt= tf.keras.optimizers.Adam(1e-4) # Adam optimizer with learning rate 1e-4 (0.0001)

In [None]:
# Checkpoints are written under ./training_checkpoints with the file prefix 'ckpt'.
checkpoints_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoints_dir, 'ckpt')
# Track both the optimizer and the model so a saved checkpoint contains both.
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

In [None]:
@tf.function
def train_step(batch):
  """Run one optimisation step of `siamese_model` on a single batch.

  Args:
    batch: a 3-element sequence (input images, validation images, labels).

  Returns:
    The scalar binary cross-entropy loss for this batch.
  """
  # Record all of our operations
  with tf.GradientTape() as tape:
    # get anchor and positive/negative image
    X = batch[:2]
    # get label
    y = batch[2]

    # forward pass
    yhat = siamese_model(X, training=True)
    yhat = tf.reshape(yhat, [-1]) # Reshape yhat to match the shape of y

    # calculate loss
    loss = binary_cross_loss(y, yhat)

  # No Python print() here: inside a tf.function it runs only while the function
  # is being traced and would show a symbolic tensor, not the loss value.
  # The loss is returned so the caller can log it.

  # calculate gradient
  grad = tape.gradient(loss, siamese_model.trainable_variables)

  # calculate updated weights and apply to siamese model
  opt.apply_gradients(zip(grad, siamese_model.trainable_variables))

  return loss


In [None]:
def train(data,EPOCHS):
  """Train the siamese model for EPOCHS passes over `data`.

  Args:
    data: sized iterable of batches; each batch is passed to `train_step`.
    EPOCHS: number of passes over `data`.

  A checkpoint is saved through the module-level `checkpoint` after every
  10th epoch.
  """
  # loop through epochs
  for epoch in range(1,EPOCHS+1):
    print('\n Epoch {}/{}'.format(epoch,EPOCHS))
    progbar = tf.keras.utils.Progbar(len(data))

    # loop through each batch
    for idx, batch in enumerate(data):
      # train step; report its loss on the progress bar instead of discarding it
      loss = train_step(batch)
      progbar.update(idx+1, values=[('loss', float(loss))])

    # save checkpoints
    if epoch % 10 == 0:
      checkpoint.save(file_prefix=checkpoint_prefix)

In [None]:
# Number of passes over the training data; train() saves a checkpoint every 10th epoch.
EPOCHS = 40

In [None]:
# Run the training loop on the batched training partition.
train(train_data,EPOCHS)


 Epoch 1/40


In [None]:
# Take the first batch of the test partition as NumPy arrays: (input images, validation images, labels)
test_batches = test_data.as_numpy_iterator()
test_input, test_val, y_true = next(test_batches)

In [None]:
# Number of image pairs in the fetched test batch.
len(test_input)

In [None]:
# Score every pair of the test batch; the list order matches the model inputs (input_img, validation_img).
y_hat = siamese_model.predict([test_input,test_val])

In [None]:
# Display the raw sigmoid scores before thresholding.
y_hat

In [None]:
# Turn the sigmoid scores into hard 0/1 predictions (1 when the score exceeds 0.5).
# Flatten with reshape(-1) instead of a hard-coded 16 so a batch of any size works,
# and go through np.asarray so re-running this cell on the resulting list does not fail.
y_hat = [1 if prediction > 0.5 else 0 for prediction in np.asarray(y_hat).reshape(-1)]
print(y_hat)
print(y_true)

In [None]:
# model = tf.keras.models.load_model('siamesemodel1new.h5', custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})


In [None]:
# siamese_model.save('siamesemodelversion2.h5')

In [None]:
# from google.colab import files
# files.download('siamesemodelversion2.h5')