In [None]:
import cv2
import random
import os
import matplotlib.pyplot as plt
import numpy as np
import uuid
import shutil

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, Input, Flatten, MaxPooling2D
from tensorflow.keras.metrics import Precision, Recall

In [None]:
pos_path=os.path.join('data','positive')
neg_path=os.path.join('data','negative')
anc_path=os.path.join('data','anchor')

In [None]:
os.makedirs(pos_path)
os.makedirs(neg_path)
os.makedirs(anc_path)

In [None]:
#http://vis-www.cs.umass.edu/lfw/lfw.tgz
!tar -xf lfw.tgz

In [None]:
for dic in os.listdir(os.path.join('lfw')):
    for file in os.listdir(os.path.join('lfw',dic)):
        existing_path=os.path.join('lfw',dic,file)
        new_path=os.path.join(neg_path,file)
        os.replace(existing_path,new_path)

In [None]:
cam=cv2.VideoCapture(0)
while cam.isOpened():
    ret, frame=cam.read()
    if cv2.waitKey(1) & 0XFF==ord('a'):
        cv2.imwrite(os.path.join(anc_path,f'{uuid.uuid1()}.jpg'),frame[70:200+250,200:230+250,:])
    if cv2.waitKey(1) & 0XFF==ord('p'):
        cv2.imwrite(os.path.join(pos_path,f'{uuid.uuid1()}.jpg'),frame[70:200+250,200:230+250,:])
    cv2.imshow('webCam',frame[70:200+250,200:230+250,:])
    if cv2.waitKey(1) & 0XFF == ord('q'):
        break
cam.release()
cv2.destroyAllWindows()

In [None]:
path=pos_path
for image in os.listdir(path):
    img=cv2.imread(os.path.join(path,image))
    resized=cv2.resize(img,(250,250))
    cv2.imwrite(os.path.join(path,image),resized)

In [None]:
anchor=tf.data.Dataset.list_files(anc_path+'\*.jpg').take(300)
positive=tf.data.Dataset.list_files(pos_path+'\*.jpg').take(300)
negative=tf.data.Dataset.list_files(neg_path+'\*.jpg').take(300)

In [None]:
def preprocess(file_path):
    byte_img=tf.io.read_file(file_path)
    img=tf.io.decode_jpeg(byte_img)
    img=tf.image.resize(img,(105,105))
    img=img/255.0
    return img

In [None]:
positives=tf.data.Dataset.zip((anchor,positive,tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives=tf.data.Dataset.zip((anchor,negative,tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data=positives.concatenate(negatives)

In [None]:
def preprocess_twin(input_img,validation_img,label):
    return(preprocess(input_img),preprocess(validation_img),label)

In [None]:
data=data.map(preprocess_twin)
data=data.cache()
data=data.shuffle(buffer_size=1024)

In [None]:
train_data=data.take(round(len(data)*0.7))
train_data=train_data.batch(16)
train_data=train_data.prefetch(8)

In [None]:
test_data=data.skip(round(len(data)*0.7))
test_data=test_data.take(round(len(data)*0.3))
test_data=test_data.batch(16)
test_data=test_data.prefetch(8)

In [None]:
def make_embedding():
    inp=Input(shape=(105,105,3),name='input_image')
    c1=Conv2D(64,(10,10),activation='relu')(inp)
    m1=MaxPooling2D(64,(2,2),padding='same')(c1)
    c2=Conv2D(128,(7,7),activation='relu')(m1)
    m2=MaxPooling2D(64,(2,2),padding='same')(c2)
    c3=Conv2D(128,(4,4),activation='relu')(m2)
    m3=MaxPooling2D(64,(2,2),padding='same')(c3)
    c4=Conv2D(256,(4,4),activation='relu')(m3)
    f1=Flatten()(c4)
    d1=Dense(4096,activation='sigmoid')(f1)
    
    return Model(inputs=[inp],outputs=[d1],name='embedding')

In [None]:
class L1Dist(Layer):
    def __init__(self,**kwargs):
        super().__init__()
    def call(self,input_embedding,validation_embedding):
        return tf.math.abs(input_embedding-validation_embedding)   

In [None]:
embedding=make_embedding()
def make_siamese_model():
    input_image=Input(name='input_image',shape=(105,105,3))
    validation_image=Input(name='validation_image',shape=(105,105,3))
    siamese_layer=L1Dist()
    siamese_layer._name='distance'
    distances=siamese_layer(embedding(input_image),embedding(validation_image))
    classifier=Dense(1,activation='sigmoid')(distances)
    return Model(inputs=[input_image,validation_image],outputs=[classifier],name='SiameseNetwork')

In [None]:
inp=embedding(Input(name='input_image',shape=(105,105,3)))
val=embedding(Input(name='validation_image',shape=(105,105,3)))

In [None]:
siamese_layer=L1Dist()
siamese_layer(inp,val)

In [None]:
siamese_model=make_siamese_model()

In [None]:
siamese_model.summary()

In [None]:
binary_cross_loss=tf.losses.BinaryCrossentropy()
optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4)

In [None]:
checkpoint_dir='./training_checkpoints'
os.makedirs(checkpoint_dir)
checkpoint_prefix=os.path.join(checkpoint_dir,'ckpt')
chechpoint=tf.train.Checkpoint(opt=optimizer,siamese_model=siamese_model)

In [None]:
@tf.function()
def train_step(batch):
    with tf.GradientTape() as tape:
        X=batch[:2]
        y=batch[2]
        yhat=siamese_model(X,training=True)
        loss=binary_cross_loss(y,yhat)
    print(loss)
    grad=tape.gradient(loss,siamese_model.trainable_variables)
    optimizer.apply_gradients(zip(grad,siamese_model.trainable_variables))
    return loss

In [None]:
def train(data,epoches):
    for epoch in range(1,epoches+1):
        print(f'Epoch {epoch}/{epoches}')
        progbar=tf.keras.utils.Progbar(len(data))
        
        for idx,batch in enumerate(data):
            train_step(batch)
            progbar.update(idx+1)
            
            
            if epoch % 10 == 0 :
                chechpoint.save(file_prefix = checkpoint_prefix)

In [None]:
train(train_data,50)

In [None]:
test_input, val_input,y_true=test_data.as_numpy_iterator().next()


In [None]:
y_hat=siamese_model.predict([test_input,val_input])
y_hat

In [None]:
res=[1 if predict> 0.5 else 0 for predict in y_hat]
res

In [None]:
recall=Recall()
recall.update_state(y_true,res)
recall_score=recall.result().numpy()
recall_score

In [None]:
precision=Precision()
precision.update_state(y_true,res)
precision_score=precision.result().numpy()
precision_score

In [None]:
f1score=2*(precision_score*recall_score)/(precision_score+recall_score)
f1score

In [None]:
plt.figure(figsize=(18,8))
plt.subplot(1,2,1)
plt.axis('off')
plt.imshow(test_input[0])
plt.subplot(1,2,2)
plt.axis('off')
plt.imshow(val_input[0])
plt.show()

In [None]:
y_true[2]

In [None]:
siamese_model.save('siames model.h5')

In [None]:
model=tf.keras.models.load_model('siames model.h5',custom_objects={'L1Dist':L1Dist})

In [None]:
model.predict([test_input,val_input])

In [None]:
model.summary()

In [None]:
verification_images=os.path.join('application_data','verification_image')
input_image=os.path.join('application_data','input_images')

In [None]:
os.makedirs(verification_images)
os.makedirs(input_images)

In [None]:
images_list=os.listdir(pos_path)
random.shuffle(images_list)
ver_img=images_list[:50]
for image in ver_img:
    shutil.copyfile(os.path.join(pos_path,image),os.path.join(verification_images,image))

In [None]:
def verfiy(model,detection_threshold,verification_threshold):
    results=[]
    
    for image in os.listdir(verification_images):
        validation_image=preprocess(os.path.join(verification_images,image))
        input_img=preprocess(os.path.join(input_image,'input_image.jpg'))
        
        result=model.predict(list(np.expand_dims(np.array([validation_image,input_img]),axis=1)))
        results.append(result)
    detection=np.sum(np.array(results)>detection_threshold)

    verification=detection/len(os.listdir(verification_images))
        
    return results, verification>verification_threshold


In [None]:
cap=cv2.VideoCapture(0)
while cap.isOpened():
    ret,frame=cap.read()
    frame=frame[70:200+250,200:230+250,:]
    cv2.imshow('verification',frame)
    
    
    if cv2.waitKey(1) & 0XFF==ord('v'):
        cv2.imwrite(os.path.join(input_image,'input_image.jpg'),frame)
        results,verified=verfiy(model,0.9,0.7)
        print(verified)
    if cv2.waitKey(1) & 0XFF==ord('q'):
        break
        
cap.release()
cv2.destroyAllWindows()