In [1]:
import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt

In [2]:
from keras._tf_keras.keras.models import Model
from keras._tf_keras.keras.layers import Layer,Conv2D,Dense,MaxPooling2D,Input,Flatten
import tensorflow as tf

In [3]:
# Enable memory growth on every GPU TensorFlow can see.
gpus = tf.config.experimental.list_physical_devices('GPU')
for device in gpus:
    tf.config.experimental.set_memory_growth(device, enable=True)

In [4]:
# Folders for the positive, negative and anchor images, all under 'data'.
pos_path, neg_path, anc_path = (
    os.path.join('data', folder) for folder in ('positive', 'negative', 'anchor')
)

In [5]:
# exist_ok=True keeps this cell re-runnable: without it os.makedirs raises
# FileExistsError as soon as the folders already exist.
for folder in (pos_path, neg_path, anc_path):
    os.makedirs(folder, exist_ok=True)

In [6]:

# Flatten the 'lfw' tree: move every file found in lfw/<sub-folder>/ into neg_path.
for person_dir in os.listdir('lfw'):
    person_path = os.path.join('lfw', person_dir)
    for file_name in os.listdir(person_path):
        os.replace(os.path.join(person_path, file_name),
                   os.path.join(neg_path, file_name))

In [7]:
import uuid 

In [8]:
# Collect training crops from the webcam:
#   'a' saves the current crop as an anchor, 'p' as a positive, 'q' quits.
cap = cv2.VideoCapture(0)
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            # No frame was delivered; slicing a missing frame would raise.
            break

        # Keep a fixed window of the frame (at most 250x250 pixels).
        frame = frame[130:130 + 250, 250:250 + 250, :]
        cv2.imshow("Image Collection", frame)

        # Poll the keyboard exactly once per frame. Calling waitKey separately
        # for each key means a press is consumed by whichever call happens to
        # see it, so most presses would be silently dropped.
        key = cv2.waitKey(1) & 0xFF

        if key == ord('a'):
            image_name = os.path.join(anc_path, '{}.jpg'.format(uuid.uuid1()))
            cv2.imwrite(image_name, frame)
        elif key == ord('p'):
            image_name = os.path.join(pos_path, '{}.jpg'.format(uuid.uuid1()))
            cv2.imwrite(image_name, frame)
        elif key == ord('q'):
            break
finally:
    # Always free the camera and close the preview window, even on error.
    cap.release()
    cv2.destroyAllWindows()

In [9]:
# Build each glob with os.path.join so it uses the platform's path separator;
# a literal backslash in the pattern only works on Windows.
# At most 300 files are taken from each folder.
anchor = tf.data.Dataset.list_files(os.path.join(anc_path, '*.jpg')).take(300)
positive = tf.data.Dataset.list_files(os.path.join(pos_path, '*.jpg')).take(300)
negative = tf.data.Dataset.list_files(os.path.join(neg_path, '*.jpg')).take(300)

In [None]:
# Pull one element from the anchor dataset for inspection.
itr = anchor.as_numpy_iterator()
next(itr)

In [11]:
def preprocessing(file_path):
    """Load a JPEG file and return it as a 100x100 RGB image scaled to [0, 1].

    Args:
        file_path: Path of the JPEG file to read.

    Returns:
        A float tensor of shape (100, 100, 3).
    """
    byte_image = tf.io.read_file(file_path)
    # Force three channels: a greyscale JPEG would otherwise decode to a single
    # channel and not match the (100, 100, 3) input the model expects.
    img = tf.io.decode_jpeg(byte_image, channels=3)
    img = tf.image.resize(img, (100, 100))
    # Decoded pixel values are 0..255; rescale them to 0..1.
    return img / 255.0

In [None]:
# Sanity-check preprocessing on an anchor image that exists on this machine.
# Captured anchors are named with random UUIDs, so a hard-coded file name
# (and a Windows-style path) only works on the machine that captured it.
sample_anchor = [name for name in sorted(os.listdir(anc_path)) if name.endswith('.jpg')][0]
image = preprocessing(os.path.join(anc_path, sample_anchor))
# plt.imshow(image)
image.numpy().min()

In [13]:
# Pair each anchor with a positive (label 1.0) and with a negative (label 0.0),
# then append the negative pairs after the positive pairs.
n_pairs = len(anchor)
pos_labels = tf.data.Dataset.from_tensor_slices(tf.ones(n_pairs))
neg_labels = tf.data.Dataset.from_tensor_slices(tf.zeros(n_pairs))
positives = tf.data.Dataset.zip((anchor, positive, pos_labels))
negatives = tf.data.Dataset.zip((anchor, negative, neg_labels))
data = positives.concatenate(negatives)

In [None]:
# Fetch one (anchor path, other path, label) element for inspection.
samples = data.as_numpy_iterator()
eg = next(samples)
eg

In [15]:
def preprocessing_twin(input_image, validation_image, label):
    """Preprocess both image paths of a pair; the label is passed through unchanged."""
    first_img = preprocessing(input_image)
    second_img = preprocessing(validation_image)
    return (first_img, second_img, label)

In [16]:
# Run the pair preprocessing on the sample element `eg` (paths + label).
res=preprocessing_twin(*eg)
# plt.imshow(res[1])

In [17]:
# Decode and resize every pair, and cache the processed images.
data = data.map(preprocessing_twin)
data = data.cache()
# Freeze the shuffle order. With the default reshuffle_each_iteration=True each
# pass over `data` is shuffled differently, so the take()/skip() train/test
# split would select different elements on every pass and test pairs would
# leak into training. (If per-epoch shuffling is wanted, shuffle the training
# subset again after it has been split off.)
data = data.shuffle(buffer_size=1024, reshuffle_each_iteration=False)

In [18]:
# Training split: the first 70% of the elements, in batches of 16, prefetching 8.
train_size = round(len(data) * 0.7)
train_data = data.take(train_size).batch(16).prefetch(8)

In [None]:
# Print how many labels the first training batch contains.
train_sample = train_data.as_numpy_iterator()
first_batch = next(train_sample)
print(len(first_batch[2]))

In [20]:
# Test split: skip the first 70% of the elements and keep the following 30%,
# in batches of 16, prefetching 8.
n_total = len(data)
test_data = (
    data.skip(round(n_total * 0.7))
    .take(round(n_total * 0.3))
    .batch(16)
    .prefetch(8)
)


In [21]:
def make_embedding():
    """Build the convolutional embedding network.

    Returns:
        A Keras ``Model`` named "embedding" that maps a (100, 100, 3) image to
        a 4096-unit, sigmoid-activated feature vector.
    """
    inp = Input(shape=(100, 100, 3), name="Input_Image")

    # MaxPooling2D's first positional argument is pool_size, so writing
    # MaxPooling2D(64, (2, 2)) pools over 64x64 windows with stride 2. The
    # arguments are spelled out to get a 2x2 window; output shapes are the
    # same either way because padding='same' makes them depend on the stride.

    # first block
    c1 = Conv2D(64, (10, 10), activation='relu')(inp)
    m1 = MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding='same')(c1)

    # second block
    c2 = Conv2D(128, (7, 7), activation='relu')(m1)
    m2 = MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding='same')(c2)

    # third block
    c3 = Conv2D(128, (4, 4), activation='relu')(m2)
    m3 = MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding='same')(c3)

    # final embedding
    c4 = Conv2D(256, (4, 4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)

    return Model(inputs=inp, outputs=d1, name="embedding")

In [None]:
# Instantiate the embedding network and print its layer summary.
embedding=make_embedding()
embedding.summary()

In [23]:
class L1Dist(Layer):
    """Layer that returns the element-wise absolute difference of two tensors."""

    def __init__(self, **kwargs):
        # Forward the keyword arguments (name, dtype, trainable, ...) to Layer.
        # Dropping them discards any name a caller passes and the saved config
        # Keras supplies when it rebuilds the layer on load.
        super().__init__(**kwargs)

    def call(self, input_embedding, validation_embedding):
        """Return |input_embedding - validation_embedding|, element-wise."""
        return tf.math.abs(input_embedding - validation_embedding)



In [24]:
def make_siamese_model():
    """Assemble the two-input Siamese classifier around the global `embedding` model.

    Returns:
        A Keras ``Model`` named 'SiameseNetwork' that takes
        [input image, validation image] and produces one sigmoid output.
    """
    # Two image inputs: the anchor and the image it is compared against.
    anchor_in = Input(shape=(100, 100, 3), name='input image')
    candidate_in = Input(shape=(100, 100, 3), name='validation image')

    # Embed both inputs with the same network and take their absolute difference.
    distance_layer = L1Dist()
    distance_layer._name = 'distance'
    anchor_emb = embedding(anchor_in)
    candidate_emb = embedding(candidate_in)
    difference = distance_layer(anchor_emb, candidate_emb)

    # A single sigmoid unit turns the difference vector into one score.
    score = Dense(1, activation='sigmoid')(difference)

    return Model(inputs=[anchor_in, candidate_in], outputs=score, name='SiameseNetwork')



In [None]:
# Build the Siamese model and print its layer summary.
siamese_model = make_siamese_model()
siamese_model.summary()

In [26]:
# Binary cross-entropy loss and an Adam optimizer with learning rate 0.0001.
binary_cross_loss = tf.losses.BinaryCrossentropy()
opt = tf.keras.optimizers.Adam(learning_rate=1e-4)

In [27]:
# Checkpoints go under ./training_checkpoints with the file prefix 'ckpt'.
checkpoints_dir='./training_checkpoints'
checkpoints_prefix=os.path.join(checkpoints_dir,'ckpt')
# The checkpoint tracks both the optimizer and the model.
checkpoint=tf.train.Checkpoint(opt=opt,siamese_model=siamese_model)

In [28]:
@tf.function
def train_step(batch):
    """Run one optimisation step on a batch and return its loss.

    Args:
        batch: Sequence whose first two items are the two image tensors and
            whose third item is the label tensor.

    Returns:
        The binary cross-entropy loss for the batch.
    """
    # Anchor and positive/negative images, then the labels.
    x = batch[:2]
    y = batch[2]

    with tf.GradientTape() as tape:
        # Forward pass and loss are recorded on the tape.
        y_predicted = siamese_model(x, training=True)
        loss = binary_cross_loss(y, y_predicted)

    # No Python print() here: inside tf.function it runs only while the graph
    # is traced and shows a symbolic tensor, not the loss value. The loss is
    # returned so the caller can report it.
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))

    return loss


In [29]:
def train(data, EPOCHS):
    """Train the Siamese model for EPOCHS passes over `data`.

    A checkpoint is saved after every 10th epoch.

    Args:
        data: Batched dataset whose elements are accepted by train_step.
        EPOCHS: Number of epochs to run.
    """
    for epoch in range(1, EPOCHS + 1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))

        for idx, batch in enumerate(data):
            # Report the loss on the progress bar instead of discarding it,
            # so convergence (or divergence) is visible while training.
            loss = train_step(batch)
            progbar.update(idx + 1, values=[('loss', float(loss))])

        if epoch % 10 == 0:
            checkpoint.save(file_prefix=checkpoints_prefix)


In [None]:
# Train on the training split for 50 epochs.
EPOCHS=50
train(train_data,EPOCHS)

In [31]:
from tensorflow.keras.metrics import Precision,Recall

In [None]:
# Take the first test batch: the two image arrays and the true labels.
test_input, test_val, y_true = next(test_data.as_numpy_iterator())
y_true

In [None]:
# Run the model on the test batch; y_hat holds one output per pair.
y_hat=siamese_model.predict([test_input,test_val])
y_hat

In [None]:
# Threshold the model outputs at 0.5 to get hard 0/1 predictions.
res = [1 if prediction > 0.5 else 0 for prediction in y_hat]
res


In [None]:
# Recall of the model outputs against the true labels of the test batch.
m=Recall()
m.update_state(y_true,y_hat)
m.result().numpy()

In [None]:
# Precision of the model outputs against the true labels of the test batch.
m=Precision()
m.update_state(y_true,y_hat)
m.result().numpy()

In [None]:
# Show the second pair of the test batch side by side.
fig, (left_ax, right_ax) = plt.subplots(1, 2, figsize=(10, 8))
left_ax.imshow(test_input[1])
right_ax.imshow(test_val[1])
plt.show()


In [None]:
# Save the trained model to 'model.h5' in the working directory.
siamese_model.save('model.h5')

In [None]:
#reload model
model=tf.keras.models.load_model('model.h5',custom_objects={'L1Dist':L1Dist,
                                                            'BinaryCrossentropy':tf.losses.BinaryCrossentropy})

In [None]:
# Predict with the reloaded model on the same test batch.
model.predict([test_input,test_val])

In [41]:
def verify(model, detection_threshold, verification_threshold):
    """Compare the stored input image against every verification image.

    Args:
        model: Model whose ``predict`` accepts [input batch, validation batch].
        detection_threshold: A prediction above this value counts as a match.
        verification_threshold: Fraction of matching verification images above
            which the input image is considered verified.

    Returns:
        A ``(results, verified)`` tuple: the list of per-image predictions and
        a boolean. With no verification images the result is ``([], False)``.
    """
    verification_dir = os.path.join('app_data', 'verification_images')
    # List the directory once so the loop and the denominator always agree.
    image_names = os.listdir(verification_dir)
    if not image_names:
        # Nothing to compare against; also avoids dividing by zero below.
        return [], False

    # The input image is the same for every comparison, so load it only once.
    input_image = preprocessing(os.path.join('app_data', 'input_images', 'input_image.jpg'))

    results = []
    for img in image_names:
        validation_image = preprocessing(os.path.join(verification_dir, img))
        # expand_dims gives each image a leading axis of size 1; list() then
        # splits the stacked pair back into the model's two inputs.
        result = model.predict(list(np.expand_dims([input_image, validation_image], axis=1)))
        results.append(result)

    # Number of predictions above the detection threshold.
    detection = np.sum(np.array(results) > detection_threshold)

    # Share of verification images that matched.
    verification = detection / len(image_names)
    verified = (verification > verification_threshold)

    return results, verified

In [None]:
# Live verification: 'v' saves the current crop as the input image and runs
# verify() on it, 'q' quits.
cap = cv2.VideoCapture(0)
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            # No frame was delivered; slicing a missing frame would raise.
            break

        # Same crop window as used when collecting the training images.
        frame = frame[130:130 + 250, 250:250 + 250, :]
        cv2.imshow('Verification', frame)

        # Poll the keyboard once per frame; with two separate waitKey calls a
        # press is consumed by whichever call sees it and is often lost.
        key = cv2.waitKey(10) & 0xFF

        if key == ord('v'):
            cv2.imwrite(os.path.join('app_data', 'input_images', 'input_image.jpg'), frame)
            results, verified = verify(model, 0.5, 0.5)
            print(verified)
        elif key == ord('q'):
            break
finally:
    # Always free the camera and close the preview window, even on error.
    cap.release()
    cv2.destroyAllWindows()