In [4]:
import cv2
import os
import random
import uuid
import numpy as np
from matplotlib import pyplot as plt

In [5]:
import tensorflow as tf
from tensorflow import data
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten

In [6]:
# Turn on memory growth for every visible GPU device.
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu_device in gpus:
  tf.config.experimental.set_memory_growth(gpu_device, True)

In [9]:
# The positive, negative and anchor image folders all sit under one data root.
data_root = 'C:\\Users\\shrir\\OneDrive\\Desktop\\data'
pos_path, neg_path, anc_path = (
  os.path.join(data_root, folder) for folder in ('positive', 'negative', 'anchor')
)

In [None]:
# Create the three image folders. exist_ok=True keeps this cell safe to re-run:
# without it os.makedirs raises FileExistsError once the folders exist.
for folder in (pos_path, neg_path, anc_path):
  os.makedirs(folder, exist_ok=True)

In [None]:
# Move every file found in each sub-folder of 'lfw' into the negative folder,
# keeping the file name.
lfw_root = 'lfw'
for sub_folder in os.listdir(lfw_root):
  sub_folder_path = os.path.join(lfw_root, sub_folder)
  for file_name in os.listdir(sub_folder_path):
    os.replace(
      os.path.join(sub_folder_path, file_name),
      os.path.join(neg_path, file_name),
    )

In [None]:
# Collect webcam samples: 'a' saves an anchor image, 'p' saves a positive
# image, 'q' quits.
cap = cv2.VideoCapture(0)
try:
  while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
      # No frame was delivered; stop instead of crashing on a None frame.
      break
    frame = cv2.flip(frame, 1)
    # Crop a 250x250 region from the mirrored frame.
    frame = frame[120:370, 200:450, :]

    cv2.imshow("Image Collection", frame)

    # Poll the keyboard exactly once per frame. Calling waitKey several times
    # lets an earlier call consume the key press a later check was waiting for.
    key = cv2.waitKey(1) & 0xFF
    if key == ord('a'):
      imgname = os.path.join(anc_path, '{}.jpg'.format(uuid.uuid1()))
      cv2.imwrite(imgname, frame)
    elif key == ord('p'):
      imgname = os.path.join(pos_path, '{}.jpg'.format(uuid.uuid1()))
      cv2.imwrite(imgname, frame)
    elif key == ord('q'):
      break
finally:
  # Always free the camera and close the preview window, even on an error.
  cap.release()
  cv2.destroyAllWindows()

In [11]:
# List up to 300 JPEG paths from each folder.
# tf.data is spelled out because the bare name `data` is rebound to a dataset
# further down the notebook; `data.Dataset` would then fail if this cell were
# re-run.
anchor = tf.data.Dataset.list_files(os.path.join(anc_path, '*.jpg')).take(300)
positive = tf.data.Dataset.list_files(os.path.join(pos_path, '*.jpg')).take(300)
negative = tf.data.Dataset.list_files(os.path.join(neg_path, '*.jpg')).take(300)

In [12]:
def preprocess(file_path):
  """Load a JPEG file and return it as a resized, rescaled image tensor.

  Args:
    file_path: Path of the JPEG file to read.

  Returns:
    The decoded image resized to 105x105 with every value divided by 255.0.
  """
  raw_bytes = tf.io.read_file(file_path)
  decoded = tf.io.decode_jpeg(raw_bytes)
  resized = tf.image.resize(decoded, (105, 105))
  return resized / 255.0

In [13]:
# Pair each anchor with a positive (label 1.0) and with a negative (label 0.0),
# then append the negative pairs after the positive pairs.
pair_count = len(anchor)
match_labels = tf.data.Dataset.from_tensor_slices(tf.ones(pair_count))
mismatch_labels = tf.data.Dataset.from_tensor_slices(tf.zeros(pair_count))
positives = tf.data.Dataset.zip((anchor, positive, match_labels))
negatives = tf.data.Dataset.zip((anchor, negative, mismatch_labels))
data = positives.concatenate(negatives)

In [14]:
def preprocess_twin(input_img, validation_img, label):
  """Preprocess both image paths of a pair and pass the label through unchanged.

  Args:
    input_img: Path of the first image of the pair.
    validation_img: Path of the second image of the pair.
    label: Label attached to the pair.

  Returns:
    A tuple (preprocessed input image, preprocessed validation image, label).
  """
  first_image = preprocess(input_img)
  second_image = preprocess(validation_img)
  return (first_image, second_image, label)

In [15]:
# Decode every pair, cache the decoded images, then shuffle.
# reshuffle_each_iteration=False pins the shuffled order. The train/test split
# further down uses take()/skip() on this dataset; with the default
# (reshuffle on every iteration) each pass would draw a different split and
# test samples would leak into training.
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=1024, reshuffle_each_iteration=False)

In [16]:
# Training partition: the first 70% of the samples, batched by 16, prefetch 8.
train_size = round(len(data) * 0.7)
train_data = data.take(train_size).batch(16).prefetch(8)

In [17]:
# Test partition: skip the first 70% of the samples and take the next 30%,
# batched by 16, prefetch 8.
sample_count = len(data)
test_data = (
  data.skip(round(sample_count * 0.7))
  .take(round(sample_count * 0.3))
  .batch(16)
  .prefetch(8)
)

In [20]:
def make_embedding():
  """Build the convolutional network that embeds one image.

  Returns:
    A Keras Model named 'embedding' that takes a (105, 105, 3) image and
    outputs a 4096-unit sigmoid-activated feature vector.
  """
  inp = Input(shape=(105,105,3), name='input_image')

  # Each pooling layer uses a 2x2 window with stride 2. The first positional
  # argument of MaxPooling2D is pool_size, so writing MaxPooling2D(64, (2,2))
  # asks for a 64x64 window with stride 2 rather than a 2x2 pool. With
  # padding='same' the output shapes are identical either way.
  c1 = Conv2D(64, (10,10), activation='relu')(inp)
  m1 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c1)

  c2 = Conv2D(128, (7,7), activation='relu')(m1)
  m2 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c2)

  c3 = Conv2D(128, (4,4), activation='relu')(m2)
  m3 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c3)

  # Final convolution, flattened and projected to the embedding vector.
  c4 = Conv2D(256, (4,4), activation='relu')(m3)
  f1 = Flatten()(c4)
  d1 = Dense(4096, activation='sigmoid')(f1)

  return Model(inputs=[inp], outputs=[d1], name='embedding')

In [21]:
# Build the embedding network once; make_siamese_model applies this same
# instance to both of its inputs.
embedding = make_embedding()

In [22]:
# Print the layer-by-layer summary of the embedding network.
embedding.summary()

In [43]:
class L1Dist(Layer):
  """Keras layer that outputs the element-wise absolute difference of two embeddings."""

  def __init__(self, **kwargs):
    # Forward kwargs (e.g. name, dtype) so Keras can configure and reload the layer.
    super().__init__(**kwargs)

  def call(self, input_embedding, validation_embedding):
    """Return |input_embedding - validation_embedding|, computed element-wise."""
    return tf.math.abs(input_embedding - validation_embedding)

In [None]:
def make_siamese_model():
  """Assemble the two-input Siamese network.

  Returns:
    A Keras Model named 'SiameseNetwork' that takes two (105, 105, 3) images
    and outputs a single sigmoid score.
  """
  first_input = Input(shape=(105,105,3), name="input_image")
  second_input = Input(shape=(105,105,3), name="validation_image")

  # Both inputs pass through the same `embedding` model instance; the L1Dist
  # layer then compares the two embeddings.
  embedding_gap = L1Dist()(embedding(first_input), embedding(second_input))
  score = Dense(1, activation='sigmoid')(embedding_gap)

  return Model(inputs=[first_input, second_input], outputs=score, name='SiameseNetwork')

In [None]:
# Build the Siamese network used for training and evaluation below.
siamese_model=make_siamese_model()

In [None]:
# Print the layer-by-layer summary of the Siamese network.
siamese_model.summary()

In [None]:
# The model's last layer is Dense(1, activation='sigmoid'), so its output is
# already a probability. from_logits must therefore be False; with True the
# loss would apply a second sigmoid to the predictions.
binary_cross_loss = tf.losses.BinaryCrossentropy(from_logits=False)

In [None]:
# Adam optimizer constructed with 1e-4 as its first argument (the learning rate).
opt = tf.keras.optimizers.Adam(1e-4)

In [None]:
# Checkpoints are written under ./training_checkpoints with the file prefix 'ckpt'.
checkpoint_dir='./training_checkpoints'
checkpoint_prefix=os.path.join(checkpoint_dir, 'ckpt')
# Track both the optimizer and the model so they are saved together.
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

In [None]:
@tf.function
def train_step(batch):
  """Run one gradient update of siamese_model on a single batch.

  Args:
    batch: Sequence whose first two items are the model inputs and whose
      third item is the label tensor.

  Returns:
    The loss computed for this batch.
  """
  pair_inputs = batch[:2]
  labels = batch[2]

  # Record the forward pass so gradients of the loss can be taken.
  with tf.GradientTape() as tape:
    predictions = siamese_model(pair_inputs, training=True)
    loss = binary_cross_loss(labels, predictions)

  weights = siamese_model.trainable_variables
  gradients = tape.gradient(loss, weights)
  opt.apply_gradients(zip(gradients, weights))

  return loss

In [None]:
# Pull one batch from the test set and show the shape of its two image arrays
# stacked together.
batch_1 = next(test_data.as_numpy_iterator())
X = batch_1[:2]
np.array(X).shape

In [None]:
def train(data, EPOCHS):
  """Train for EPOCHS passes over `data`, saving a checkpoint every 10th epoch.

  Args:
    data: Batched dataset; each batch is passed to train_step.
    EPOCHS: Number of passes over `data`.
  """
  for epoch in range(1, EPOCHS + 1):
    print('\n Epoch{}/{}'.format(epoch, EPOCHS))
    progbar = tf.keras.utils.Progbar(len(data))

    # Count steps from 1 so the progress bar reflects completed batches.
    for step, batch in enumerate(data, start=1):
      train_step(batch)
      progbar.update(step)

    if epoch % 10 != 0:
      continue
    checkpoint.save(file_prefix=checkpoint_prefix)

In [None]:
# Number of training epochs passed to train().
EPOCHS=50

In [None]:
# Run the training loop on the training partition.
train(train_data, EPOCHS)

In [None]:
from tensorflow.keras.metrics import Precision, Recall

In [None]:
# Take one batch from the test set: the two image arrays and their labels.
test_input, test_val, y_true = next(test_data.as_numpy_iterator())

In [None]:
# Score every pair in the test batch with the trained model.
yhat=siamese_model.predict([test_input, test_val])

In [None]:
# Threshold the scores: strictly above 0.5 becomes 1, everything else 0.
res = [1 if prediction > 0.5 else 0 for prediction in yhat]
yhat = np.array(res)

In [None]:
# Recall of the thresholded predictions against the labels of this test batch.
m=Recall()
m.update_state(y_true, yhat)
m.result().numpy()

In [None]:
# Precision of the thresholded predictions against the labels of this test batch.
m=Precision()
m.update_state(y_true, yhat)
m.result().numpy()

In [None]:
# Show the first input image and the first validation image side by side.
fig, (left_ax, right_ax) = plt.subplots(1, 2, figsize=(10,8))
left_ax.imshow(test_input[0])
right_ax.imshow(test_val[0])
plt.show()

In [None]:
# Save the trained model to 'siamesemodel.h5' in the current working directory.
siamese_model.save('siamesemodel.h5')

In [None]:
# Reload the saved model. custom_objects maps the names stored in the file to
# the Python objects implementing them, so each key is a string and each value
# is the class. The loss class is spelled BinaryCrossentropy (lowercase 'e').
model = tf.keras.models.load_model(
  'siamesemodel.h5',
  custom_objects={'L1Dist': L1Dist, 'BinaryCrossentropy': tf.losses.BinaryCrossentropy},
)

In [None]:
def verify(model, detection_threshold, verification_threshold):
  """Compare the saved input image against every stored verification image.

  Args:
    model: Model whose predict() scores an (input, validation) image pair.
    detection_threshold: A prediction above this value counts as a match.
    verification_threshold: The fraction of matching images must exceed this
      value for the input to be verified.

  Returns:
    A tuple (results, verified): results is the list of raw predictions, one
    per verification image; verified is the boolean decision. If the
    verification folder is empty the result is ([], False).
  """
  verification_dir = os.path.join('application_data', 'varification_images')
  # List the folder once so the loop and the final ratio use the same files.
  image_names = os.listdir(verification_dir)
  if not image_names:
    # Nothing to compare against; this also avoids dividing by zero below.
    return [], False

  # The input image is the same for every comparison, so load it only once.
  input_img = preprocess(os.path.join('application_data', 'input_image', 'input_image.jpg'))

  results = []
  for image in image_names:
    validation_img = preprocess(os.path.join(verification_dir, image))
    # expand_dims adds a batch axis of size 1 to each image of the pair.
    result = model.predict(list(np.expand_dims([input_img, validation_img], axis=1)))
    results.append(result)

  # Number of predictions above the detection threshold.
  detection = np.sum(np.array(results) > detection_threshold)

  # Fraction of verification images that matched.
  verification = detection / len(image_names)
  verified = verification > verification_threshold

  return results, verified

In [None]:
# Live verification: 'v' captures the current frame and verifies it, 'q' quits.
cap = cv2.VideoCapture(0)
try:
  while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
      # No frame was delivered; stop instead of crashing on a None frame.
      break
    frame = cv2.flip(frame, 1)
    # Crop a 250x250 region from the mirrored frame.
    frame = frame[120:370, 200:450, :]

    cv2.imshow("Verification", frame)

    # Poll the keyboard exactly once per frame so a key press cannot be
    # consumed by one check and missed by the other.
    key = cv2.waitKey(10) & 0xFF
    if key == ord('v'):
      cv2.imwrite(os.path.join('application_data','input_image', 'input_image.jpg'), frame)
      # Verify only after a fresh capture: running it on every frame would
      # read the input image before one has been saved and would run the
      # model on every frame.
      results, verified = verify(model, 0.5, 0.5)
      print(verified)
    elif key == ord('q'):
      break
finally:
  # Always free the camera and close the preview window, even on an error.
  cap.release()
  cv2.destroyAllWindows()