In [None]:
import cv2
import os
import uuid
import random
import numpy as np
import tensorflow as tf
from matplotlib import pyplot as plt

import warnings
warnings.filterwarnings('ignore')

from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D,Input, InputLayer, Flatten,ReLU

In [49]:
POS_PATH = os.path.join('Data','positive')
NEG_PATH = os.path.join('Data','negative')
ANK_PATH = os.path.join('Data','anchor')
DATAA = os.path.join('Verify','anchor')
os.makedirs(DATAA)

In [None]:
os.makedirs(POS_PATH)
os.makedirs(NEG_PATH)
os.makedirs(ANK_PATH)

In [None]:
!tar -xf lfw.tgz

In [None]:
for directory in os.listdir('lfw'):
    for file in os.listdir(os.path.join('lfw', directory)):
        EX_PATH = os.path.join('lfw', directory, file)
        NEW_PATH = os.path.join(NEG_PATH, file)
        os.replace(EX_PATH, NEW_PATH)

## Data Augmentation and Processing 

In [None]:
def data_aug(img):
    data = []
    for i in range(9):
        img = tf.image.stateless_random_brightness(img, max_delta=0.02, seed=(1,2))
        img = tf.image.stateless_random_contrast(img, lower=0.6, upper=1, seed=(1,3))
        img = tf.image.stateless_random_flip_left_right(img, seed=(np.random.randint(100),np.random.randint(100)))
        img = tf.image.stateless_random_jpeg_quality(img, min_jpeg_quality=90, max_jpeg_quality=100, seed=(np.random.randint(100),np.random.randint(100)))
        img = tf.image.stateless_random_saturation(img, lower=0.9,upper=1, seed=(np.random.randint(100),np.random.randint(100)))
            
        data.append(img)
    
    return data

In [None]:
for file_name in os.listdir(os.path.join(POS_PATH)):
    img_path = os.path.join(POS_PATH, file_name)
    img = cv2.imread(img_path)
    augmented_images = data_aug(img) 
    
    for image in augmented_images:
        cv2.imwrite(os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1())), image.numpy())

### Data Processing

In [3]:
anchor = tf.data.Dataset.list_files(ANK_PATH+'/*.jpg').take(40)
positive = tf.data.Dataset.list_files(POS_PATH+'/*.jpg').take(40)
negative = tf.data.Dataset.list_files(NEG_PATH+'/*.jpg').take(40)

In [4]:
def preprocess(file_path):

    byte_img = tf.io.read_file(file_path)
    img = tf.io.decode_jpeg(byte_img)
    
    img = tf.image.resize(img, (100,100))
    img = img / 255.0
    return img

### Labelling and Building Dataset 

In [5]:
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)

In [6]:
def preprocess_twin(input_img, validation_img, label):
    return (preprocess(input_img), preprocess(validation_img), label)

In [7]:
# Build dataloader pipeline
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=32)

In [8]:
# Training partition
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [9]:
# Testing partition
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

# Model

In [13]:
def make_embedding(): 
    inp = Input(shape=(100,100,3), name='input_image')
    
    # First block
    c1 = Conv2D(64, (10,10), activation='relu')(inp)
    m1 = MaxPooling2D(64, (2,2), padding='same')(c1)
    
    # Second block
    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D(64, (2,2), padding='same')(c2)
    
    # Third block 
    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D(64, (2,2), padding='same')(c3)
    
    # Final embedding block
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)
    
    
    return Model(inputs=[inp], outputs=[d1], name='embedding')

embedding = make_embedding()
embedding.summary()

Model: "embedding"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_image (InputLayer)    [(None, 100, 100, 3)]     0         
                                                                 
 conv2d_8 (Conv2D)           (None, 91, 91, 64)        19264     
                                                                 
 max_pooling2d_6 (MaxPoolin  (None, 46, 46, 64)        0         
 g2D)                                                            
                                                                 
 conv2d_9 (Conv2D)           (None, 40, 40, 128)       401536    
                                                                 
 max_pooling2d_7 (MaxPoolin  (None, 20, 20, 128)       0         
 g2D)                                                            
                                                                 
 conv2d_10 (Conv2D)          (None, 17, 17, 128)       26

In [11]:
model = tf.keras.Sequential([
    Conv2D(64, (10,10), input_shape=(100,100,3)),
    ReLU(),
    MaxPooling2D(64, (2,2), padding= 'same'),
    
    Conv2D(128, (7,7), activation='relu'),
    MaxPooling2D(64, (2,2), padding= 'same'),
    
    Conv2D(128, (4,4), activation='relu'),
    MaxPooling2D(64, (2,2), padding= 'same'),
    
    Conv2D(256, (4,4), activation='relu'),

    Flatten(),
    Dense(4096, activation='sigmoid')
])

In [12]:
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d_4 (Conv2D)           (None, 91, 91, 64)        19264     
                                                                 
 re_lu (ReLU)                (None, 91, 91, 64)        0         
                                                                 
 max_pooling2d_3 (MaxPoolin  (None, 46, 46, 64)        0         
 g2D)                                                            
                                                                 
 conv2d_5 (Conv2D)           (None, 40, 40, 128)       401536    
                                                                 
 max_pooling2d_4 (MaxPoolin  (None, 20, 20, 128)       0         
 g2D)                                                            
                                                                 
 conv2d_6 (Conv2D)           (None, 17, 17, 128)       2

### Building Distance Layer

In [14]:
# Siamese L1 Distance class
class L1Dist(Layer):
    
    # Init method - inheritance
    def __init__(self, **kwargs):
        super().__init__()
       
    # Magic happens here - similarity calculation
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)

In [15]:
def create_siamese_model(): 

    input_image = Input(name='input_img', shape=(100,100,3))
    validation_image = Input(name='validation_img', shape=(100,100,3))
    
    # Combine siamese distance components
    siamese_layer = L1Dist()
    distances = siamese_layer(model(input_image), model(validation_image))

    classifier = Dense(1, activation='sigmoid')(distances)
    return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')


siamese_model = create_siamese_model()
siamese_model.summary()      

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_img (InputLayer)      [(None, 100, 100, 3)]        0         []                            
                                                                                                  
 validation_img (InputLayer  [(None, 100, 100, 3)]        0         []                            
 )                                                                                                
                                                                                                  
 sequential (Sequential)     (None, 4096)                 3896044   ['input_img[0][0]',           
                                                          8          'validation_img[0][0]']      
                                                                                     

### Lose Function and Optimizer

In [None]:
def contrastive_loss(y_true, y_pred):
    margin = 1.0
    return tf.reduce_mean(y_true * tf.square(y_pred) + (1 - y_true) * tf.square(tf.maximum(margin - y_pred, 0)))

In [16]:
opt = tf.keras.optimizers.Adam(1e-4) # 0.0001
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [None]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

## Training

In [20]:
from tensorflow.keras.metrics import Precision, Recall

In [21]:
@tf.function
def train_step(batch):
    
    # Record all of our operations 
    with tf.GradientTape() as tape:     
        # Get anchor and positive/negative image
        X = batch[:2]
        # Get label
        y = batch[2]
        
        # Forward pass
        yhat = siamese_model(X, training=True)
        # Calculate loss
        loss = binary_cross_loss(y, yhat)
    print(loss)
        
    # Calculate gradients
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    
    # Calculate updated weights and apply to siamese model
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))
        
    # Return loss
    return loss

In [22]:
def train(data, EPOCHS):
    # Loop through epochs
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))
        
        # Creating a metric object 
        r = Recall()
        p = Precision()
        
        # Loop through each batch
        for idx, batch in enumerate(data):
            # Run train step here
            loss = train_step(batch)
            yhat = siamese_model.predict(batch[:2])
            r.update_state(batch[2], yhat)
            p.update_state(batch[2], yhat) 
            progbar.update(idx+1)
        print(loss.numpy(), r.result().numpy(), p.result().numpy())
        
        # Save checkpoints
#         if epoch % 10 == 0: 
#             checkpoint.save(file_prefix=checkpoint_prefix)

In [23]:
EPOCHS = 30
train(train_data, EPOCHS)


 Epoch 1/30
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)
0.6442569 0.8235294 0.9655172

 Epoch 2/30
0.42210364 0.36363637 1.0

 Epoch 3/30
0.1487707 0.6666667 1.0

 Epoch 4/30
0.13823482 0.8787879 1.0

 Epoch 5/30
0.12232461 0.9705882 1.0

 Epoch 6/30
0.07149318 0.96875 1.0

 Epoch 7/30
0.028026674 0.96875 1.0

 Epoch 8/30
0.005515421 1.0 1.0

 Epoch 9/30
0.017484877 1.0 1.0

 Epoch 10/30
0.0011126385 1.0 1.0

 Epoch 11/30
0.00091572653 1.0 1.0

 Epoch 12/30
0.0011620036 1.0 1.0

 Epoch 13/30
0.00052363245 1.0 1.0

 Epoch 14/30
0.0005386697 1.0 1.0

 Epoch 15/30
0.00038373625 1.0 1.0

 Epoch 16/30
6.276795e-05 1.0 1.0

 Epoch 17/30
3.407308e-05 1.0 1.0

 Epoch 18/30
0.00029921677 1.0 1.0

 Epoch 19/30
8.288782e-05 1.0 1.0

 Epoch 20/30
0.00022403376 1.0 1.0

 Epoch 21/30
8.746784e-05 1.0 1.0

 Epoch 22/30
9.509879e-05 1.0 1.0

 Epoch 23/30
5.306329e-05 1.0 1.0

 Epoch 24/30
0.0

7.115359e-06 1.0 1.0

 Epoch 28/30
8.1627186e-05 1.0 1.0

 Epoch 29/30
4.4182184e-06 1.0 1.0

 Epoch 30/30
1.1451664e-05 1.0 1.0


In [24]:
# Get a batch of test data
test_input, test_val, y_true = test_data.as_numpy_iterator().next()
y_hat = siamese_model.predict([test_input, test_val])



In [25]:
# Post processing the results 
[1 if prediction > 0.5 else 0 for prediction in y_hat ]

[0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0]

In [26]:
y_true

array([0., 0., 0., 1., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0.],
      dtype=float32)

In [28]:
# # Set plot size 
# plt.figure(figsize=(10,8))

# # Set first subplot
# plt.subplot(1,2,1)
# plt.imshow(test_input[15])

# # Set second subplot
# plt.subplot(1,2,2)
# plt.imshow(test_val[15])

# # Renders cleanly
# plt.show()

In [29]:
# Save weights
siamese_model.save('siamesemodelv2.h5')



In [30]:
siamese_model = tf.keras.models.load_model('siamesemodelv2.h5', 
                                   custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})



In [31]:
# Make predictions with reloaded model
siamese_model.predict([test_input, test_val])



array([[2.3906430e-06],
       [7.4311705e-05],
       [1.2784359e-06],
       [9.9999887e-01],
       [4.8807005e-06],
       [2.7299848e-05],
       [1.7597622e-05],
       [9.9992698e-01],
       [4.4199401e-06],
       [6.8365607e-06],
       [5.7085558e-06],
       [1.1040313e-06],
       [8.5390587e-07],
       [4.3014502e-06],
       [1.7243241e-06],
       [4.7852198e-05]], dtype=float32)

In [36]:
from mtcnn.mtcnn import MTCNN
from PIL import Image

D = MTCNN()
def f_getFace(I):
    r=D.detect_faces(I)
    x1,y1,w,h=r[0]['box']
    x2,y2=x1+w,y1+h
    face=I[y1:y2,x1:x2]
    return Image.fromarray(face).resize((250,250))

In [56]:
def verify(model, detection_threshold, verification_threshold):
    # Build results array
    results = []
    for image in os.listdir(os.path.join('Application_Data', 'verification_imgs')):
        input_img = preprocess(os.path.join('Application_Data', 'input_imgs', 'input_image.jpg'))
        validation_img = preprocess(os.path.join('Application_Data', 'verification_imgs', image))
        
        # Make Predictions 
        result = model.predict(list(np.expand_dims([input_img, validation_img], axis=1)))
        results.append(result)
    
    # Detection Threshold: Metric above which a prediciton is considered positive 
    detection = np.sum(np.array(results) > detection_threshold)
    
    # Verification Threshold: Proportion of positive predictions / total positive samples 
    verification = detection / len(os.listdir(os.path.join('Application_Data', 'verification_imgs'))) 
    verified = verification > verification_threshold
    
    return results, verified

In [63]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    
    
    cv2.imshow('Verification', frame)
    
    # Verification trigger
    if cv2.waitKey(10) & 0xFF == ord('v'):
        # Save input image to application_data/input_image folder 
#         hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
#         h, s, v = cv2.split(hsv)

#         lim = 255 - 10
#         v[v > lim] = 255
#         v[v <= lim] -= 10
        
#         final_hsv = cv2.merge((h, s, v))
#         img = cv2.cvtColor(final_hsv, cv2.COLOR_HSV2BGR)

        cv2.imwrite('positive.jpg', frame)
        P1=Image.open(r'positive.jpg')

        f1 = f_getFace(np.asarray(P1))
        imgname= os.path.join('Application_Data', 'input_imgs', 'input_image.jpg')
        imgS = cv2.cvtColor(np.asarray(f1), cv2.COLOR_BGR2RGB)
        cv2.imwrite(imgname, imgS)
        
        # Run verification
        results, verified = verify(siamese_model, 0.9, 0.8)
        print(verified)
    
    if cv2.waitKey(10) & 0xFF == ord('q'):
        break
cap.relVVease()
cv2.destroyAllWindows()

False
False
False
True
True
False
True
True
True
True
False
True


False
False
False
False
False
False
False
False
False
False


AttributeError: 'cv2.VideoCapture' object has no attribute 'relVVease'

In [62]:
np.sum(np.squeeze(results) > 0.9)

1

In [64]:
cap.release()
cv2.destroyAllWindows()