In [1]:
import cv2
import os
import random
import numpy as np
import matplotlib.pyplot as plt
import uuid

In [2]:
#Using tensorflow - Function API
os.chdir(r'C:\Users\RaloP\FaceDetection')
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf

In [3]:
#Avoid out of memory errors my limiting gpu memory consumption growth
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpy,True)

In [4]:
#Setup up paths
POS_PATH = os.path.join('data','postive')
NEG_PATH = os.path.join('data','negative')
ANC_PATH = os.path.join('data','anchor')

In [5]:
#Making Directories
os.makedirs(POS_PATH)
os.makedirs(NEG_PATH)
os.makedirs(ANC_PATH)

FileExistsError: [WinError 183] Cannot create a file when that file already exists: 'data\\postive'

In [None]:
#Uncompress Tar Gz Labelled Faces in the Wild Dataset
!tar -xf lfw.tgz

In [None]:
#Moving Images to data/negative repo
for directory in os.listdir('lfw'):
    for file in os.listdir(os.path.join('lfw',directory)):
        EX_PATH = os.path.join('lfw',directory,file)
        NEW_PATH = os.path.join(NEG_PATH,file)
        os.replace(EX_PATH,NEW_PATH)

In [None]:
#Accessing web cam
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret,frame = cap.read()
    #Cut Down frame to 250x250 pixels
    frame = frame[120:120+250,200:200+250, :]
    #Collect anchors
    if cv2.waitKey(1) & 0XFF == ord('a'):
        #Create unique file path
        imgname = os.path.join(ANC_PATH,'{}.jpg'.format(uuid.uuid1()))
        #Write anchor image
        cv2.imwrite(imgname,frame)
        
    #Collect positivies
    if cv2.waitKey(1) & 0XFF == ord('p'):
        #Create unique file path
        imgname = os.path.join(POS_PATH,'{}.jpg'.format(uuid.uuid1()))
        #Write positive image
        cv2.imwrite(imgname,frame)
        
    cv2.imshow('Image Collection',frame)
    
    if cv2.waitKey(1) & 0XFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()

In [6]:
#GRABS IMAGES FROM THEIR DIRECTORY
anchor = tf.data.Dataset.list_files(ANC_PATH+'\*.jpg').take(300)
positive = tf.data.Dataset.list_files(POS_PATH+'\*.jpg').take(300)
negative = tf.data.Dataset.list_files(NEG_PATH+'\*.jpg').take(300)

In [7]:
dir_test = anchor.as_numpy_iterator()

In [8]:
dir_test.next()

b'data\\anchor\\40736ef2-392a-11ee-b8d7-744ca192b5ac.jpg'

In [9]:
def preprocess(file_path):
    #Reading in image from file path
    byte_img = tf.io.read_file(file_path)
    #Load in Image
    img = tf.io.decode_jpeg(byte_img)
    #Preprocess(Reize image to 100x100x3)
    img = tf.image.resize(img,(100,100))
    #Scaling the image to be between 0 and 1
    img = img/255.0
    return img

In [10]:
preprocess('data\\anchor\\19641f29-392a-11ee-85ee-744ca192b5ac.jpg')

<tf.Tensor: shape=(100, 100, 3), dtype=float32, numpy=
array([[[0.5806373 , 0.58455884, 0.564951  ],
        [0.58210784, 0.58406866, 0.5644608 ],
        [0.5953431 , 0.5855392 , 0.560049  ],
        ...,
        [0.5468137 , 0.52892154, 0.49436274],
        [0.547549  , 0.5252451 , 0.49215686],
        [0.54338235, 0.5276961 , 0.48455882]],

       [[0.58235294, 0.58431375, 0.5647059 ],
        [0.5772059 , 0.57279414, 0.5531863 ],
        [0.5995098 , 0.5877451 , 0.56960785],
        ...,
        [0.57058823, 0.5355392 , 0.4997549 ],
        [0.57303923, 0.5387255 , 0.50147057],
        [0.56421566, 0.5357843 , 0.49558824]],

       [[0.59166664, 0.58186275, 0.5563725 ],
        [0.6068627 , 0.595098  , 0.5718137 ],
        [0.6012255 , 0.58210784, 0.56911767],
        ...,
        [0.5296569 , 0.50416666, 0.45808825],
        [0.53088236, 0.5009804 , 0.4512255 ],
        [0.5504902 , 0.51911765, 0.47009805]],

       ...,

       [[0.889951  , 0.8664216 , 0.90563726],
        [0.87

In [11]:
positives = tf.data.Dataset.zip(((anchor,positive,tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor))))))
negatives = tf.data.Dataset.zip(((anchor,negative,tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor))))))
data = positives.concatenate(negatives)

In [12]:
def preprocess_twin(input_img,validation_img,label):
    return(preprocess(input_img), preprocess(validation_img), label)

In [13]:
#Building dataloader pipline
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=1024)

In [14]:
#Training partition 
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [15]:
#Testing partition
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

In [16]:
def make_embedding():
    inp = Input(shape=(100,100,3),name='input_image')
    #First Block
    c1 = Conv2D(64,(10,10),activation='relu')(inp)
    m1 = MaxPooling2D(64,(2,2),padding='same')(c1)
    #Second Block
    c2 = Conv2D(128,(7,7),activation='relu')(m1)
    m2 = MaxPooling2D(64,(2,2),padding='same')(c2)
    #Third Block
    c3 = Conv2D(128,(4,4),activation='relu')(m2)
    m3 = MaxPooling2D(64,(2,2),padding='same')(c3)
    #Fourth Block
    c4 = Conv2D(256,(4,4),activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096,activation='sigmoid')(f1)
    
    return Model(inputs=[inp],outputs=[d1],name='embedding')

In [17]:
embedding = make_embedding()

In [18]:
embedding.summary()

Model: "embedding"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_image (InputLayer)    [(None, 100, 100, 3)]     0         
                                                                 
 conv2d (Conv2D)             (None, 91, 91, 64)        19264     
                                                                 
 max_pooling2d (MaxPooling2  (None, 46, 46, 64)        0         
 D)                                                              
                                                                 
 conv2d_1 (Conv2D)           (None, 40, 40, 128)       401536    
                                                                 
 max_pooling2d_1 (MaxPoolin  (None, 20, 20, 128)       0         
 g2D)                                                            
                                                                 
 conv2d_2 (Conv2D)           (None, 17, 17, 128)       26

In [19]:
# Siamese L1 Distance Class
class L1Dist(Layer):
    #Init Inheritance
    def __init__(self,**kwargs):
        super().__init__()
        
    #Similarity Calucaltion
    def call(self,input_embedding,validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)

In [20]:
def make_siamese_model():
    
    # Anchor Input
    input_image = Input(name='input_img',shape=(100,100,3))
    # Validation Input
    validation_image = Input(name='validation_img',shape=(100,100,3))
    
    # Combing the siamese distance compenents
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(embedding(input_image),embedding(validation_image))
    
    # Classification Layer
    classifier = Dense(1,activation='sigmoid')(distances)
    
    return Model(inputs=[input_image,validation_image],outputs=classifier,name='SiameseNetwork')

In [21]:
siamese_model = make_siamese_model()

In [22]:
siamese_model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_img (InputLayer)      [(None, 100, 100, 3)]        0         []                            
                                                                                                  
 validation_img (InputLayer  [(None, 100, 100, 3)]        0         []                            
 )                                                                                                
                                                                                                  
 embedding (Functional)      (None, 4096)                 3896044   ['input_img[0][0]',           
                                                          8          'validation_img[0][0]']      
                                                                                     

In [23]:
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [24]:
opt = tf.keras.optimizers.Adam(1e-4)

In [25]:
#Checkpoints
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir,'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt,siamese_model=siamese_model)

In [26]:
@tf.function
def train_step(batch):
    #Recording All Operations
    with tf.GradientTape() as tape:
        #Anchor, postive, negative image
        X=batch[:2]
        #Label
        y=batch[2]
        
        #Forward Pass
        yhat = siamese_model(X,training=True)
        
        #Find Loss
        loss = binary_cross_loss(y,yhat)
    print(loss)
    
    #Calulcate gradients
    grad = tape.gradient(loss,siamese_model.trainable_variables)
    
    #Calulcate updates weights and apply to model
    opt.apply_gradients(zip(grad,siamese_model.trainable_variables))
        
    return loss

In [27]:
def train(data, EPOCHS):
    #Loop through epochs
    for epoch in range(1,EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))
    
    #Loop through batchs
    for idx, batch in enumerate(data):
        #Train Step
        train_step(batch)
        progbar.update(idx+1)
    #Save Checkpoints
    if epoch % 10 ==0:
        checkpoint.save(file_prefix=checkpoint_prefix)

In [28]:
EPOCHS = 50

In [29]:
train(train_data,EPOCHS)


 Epoch 1/50

 Epoch 2/50

 Epoch 3/50

 Epoch 4/50

 Epoch 5/50

 Epoch 6/50

 Epoch 7/50

 Epoch 8/50

 Epoch 9/50

 Epoch 10/50

 Epoch 11/50

 Epoch 12/50

 Epoch 13/50

 Epoch 14/50

 Epoch 15/50

 Epoch 16/50

 Epoch 17/50

 Epoch 18/50

 Epoch 19/50

 Epoch 20/50

 Epoch 21/50

 Epoch 22/50

 Epoch 23/50

 Epoch 24/50

 Epoch 25/50

 Epoch 26/50

 Epoch 27/50

 Epoch 28/50

 Epoch 29/50

 Epoch 30/50

 Epoch 31/50

 Epoch 32/50

 Epoch 33/50

 Epoch 34/50

 Epoch 35/50

 Epoch 36/50

 Epoch 37/50

 Epoch 38/50

 Epoch 39/50

 Epoch 40/50

 Epoch 41/50

 Epoch 42/50

 Epoch 43/50

 Epoch 44/50

 Epoch 45/50

 Epoch 46/50

 Epoch 47/50

 Epoch 48/50

 Epoch 49/50

 Epoch 50/50
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)


In [30]:
from tensorflow.keras.metrics import Precision, Recall

In [59]:
#Get test data
test_input, test_val, y_true = test_data.as_numpy_iterator().next()

In [60]:
#Make Predictions
y_hat = siamese_model.predict([test_input,test_val])



In [61]:
y_hat

array([[9.6889752e-01],
       [7.5893313e-01],
       [8.9923579e-01],
       [6.2437022e-05],
       [3.2804685e-04],
       [2.9745263e-06],
       [9.4402158e-01],
       [9.4987637e-01],
       [2.9939509e-05],
       [9.4828510e-01],
       [9.4586074e-01],
       [9.2154354e-01],
       [5.7886116e-04],
       [7.0442044e-04],
       [1.3593493e-01],
       [8.5507852e-01]], dtype=float32)

In [62]:
#Post processing
[1 if prediction > 0.5 else 0 for prediction in y_hat]

[1, 1, 1, 0, 0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 1]

In [63]:
# Creating a metric object
m = Recall()
#Calculating the recall value
m.update_state(y_true,y_hat)
# Return Recall Result
m.result().numpy()

1.0

In [64]:
# Creating a metric object
m = Precision()
#Calculating the Precision value
m.update_state(y_true,y_hat)
# Return Precision Result
m.result().numpy()

1.0

In [74]:
#Save weights
siamese_model.save('siamesemodel.h5')



  saving_api.save_model(


In [76]:
#ReLoading in model
model = tf.keras.models.load_model('siamesemodel.h5',custom_objects={'L1Dist':L1Dist,'BinaryCrossentropy':tf.losses.BinaryCrossentropy})



In [77]:
model.predict([test_input,test_val])



array([[9.6889752e-01],
       [7.5893313e-01],
       [8.9923579e-01],
       [6.2437022e-05],
       [3.2804685e-04],
       [2.9745263e-06],
       [9.4402158e-01],
       [9.4987637e-01],
       [2.9939509e-05],
       [9.4828510e-01],
       [9.4586074e-01],
       [9.2154354e-01],
       [5.7886116e-04],
       [7.0442044e-04],
       [1.3593493e-01],
       [8.5507852e-01]], dtype=float32)

In [95]:
def verify(model,detection_threshold,verification_threshold):
    results = []
    
    for image in os.listdir(os.path.join('application_data','verification_images')):
        input_img = preprocess(os.path.join('application_data','input_image','input_image.jpg'))
        validation_img = preprocess(os.path.join('application_data','verification_images',image))
        
        result = model.predict(list(np.expand_dims([input_img,validation_img],axis=1)))
        results.append(result)
    detection = np.sum(np.array(results)>detection_threshold)
    verification = detection / len(os.listdir(os.path.join('application_data','verification_images')))
    verified = verification > verification_threshold
    return results, verified

In [99]:
cap = cv2.VideoCapture(0)

while cap.isOpened():
    ret,frame = cap.read()
    frame = frame[120:120+250,200:200+250, :]
    cv2.imshow('Verification',frame)
    
    if cv2.waitKey(10) & 0xFF == ord('v'):
        cv2.imwrite(os.path.join('application_data','input_image','input_image.jpg'),frame)
        results, verified = verify(model,0.5,0.5)
        print(verified)
    
    if cv2.waitKey(10) & 0xFF == ord('q'):
        break
        
cap.release()
cv2.destroyAllWindows()

False
True
