# 1. Import dependencies

In [2]:
#import standard dependencies
import numpy as np
import os
from matplotlib import pyplot as plt
import cv2
import random

In [3]:
#import tensorflow dependencies - Functiona API
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf

# 2 Set GPU Growth

In [3]:
# Avoid OOM errors by setting GPU Memory Cosumption Growth
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

# 3. Create folder structure

In [5]:
# Setup paths
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')

In [8]:
#make directories
os.makedirs(POS_PATH)
os.makedirs(NEG_PATH)
os.makedirs(ANC_PATH)

# 3.1 Untar Labelled Face in the Wild Datasets 

In [13]:
!tar -xf lfw.tgz

In [5]:
# Move LFW Images to the following repository data/negative
for directory in os.listdir('lfw'):
    for file in os.listdir(os.path.join('lfw', directory)):
        EX_PATH = os.path.join('lfw', directory, file)
        NEW_PATH = os.path.join(NEG_PATH, file)
        os.replace(EX_PATH, NEW_PATH)

FileNotFoundError: [WinError 3] El sistema no puede encontrar la ruta especificada: 'lfw'

# 3.2 Collect Positive and Anchor Classes

In [16]:
import uuid

In [21]:
# Establish a connection with the webcam
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    
    # cut down frame to 250x250px
    frame = frame[80:80+250, 200:200+250, :]
    
    # anchor images
    if cv2.waitKey(1) == ord('a'):
        # create an unique identifier name
        image_name = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
        # Save image in anchor folder
        cv2.imwrite(image_name, frame)
    
    # positive images
    if cv2.waitKey(1) == ord('p'):
        # create an unique identifier name
        image_name = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
        # Save image in positive folder
        cv2.imwrite(image_name, frame)
    
    # Show image back to screen
    cv2.imshow("Image Collection", frame)
    
    if cv2.waitKey(1) & 0XFF == ord('q'):
        break
# Release webcam
cap.release()
# Close the image show frame
cv2.destroyAllWindows()

# 4. Load and preprocess images

In [6]:
anchor = tf.data.Dataset.list_files(ANC_PATH+'\*.jpg').take(300)
positive = tf.data.Dataset.list_files(POS_PATH+'\*.jpg').take(300)
negative = tf.data.Dataset.list_files(NEG_PATH+'\*.jpg').take(300)

# 4.1 Prepropecing - Scale and Resize

In [10]:
def preprocess(file_path):
    
    # Read in image from file path
    byte_img = tf.io.read_file(file_path)
    # Load in image
    img = tf.io.decode_jpeg(byte_img)
    # Preprocess resize (100,100,3)
    img = tf.image.resize(img, (100,100))
    # Normalize image 0-1
    img = img/255.0
    return img

# 4.2 Create labelled Datasets

In [8]:
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)

# 4.3 Build train and test partition

In [9]:
def preprocess_twin(input_img, validation_img, label):
    return (preprocess(input_img), preprocess(validation_img), label)

In [10]:
# Create data loader pipeline
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=1024)

In [11]:
# Train partition
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [12]:
# Test partition
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

# 5. Model Engineering

# 5.1 Build Embeding Layer

In [17]:
def make_embedding():
    #input shape
    inp = Input(shape=(100,100,3), name='input_image')
    
    # block one
    c1 = Conv2D(64, (10,10), activation='relu')(inp)
    m1 = MaxPooling2D(64, (2,2), padding='same')(c1)
    
    # block two
    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D(64, (2,2), padding='same')(c2)
    
    # block three
    c3 = Conv2D(256, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D(64, (2,2), padding='same')(c3)
    
    # final block
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)
    
    return Model(inputs=[inp], outputs=[d1], name='embedding')

In [18]:
embedding = make_embedding()

In [19]:
embedding.summary()

Model: "embedding"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_image (InputLayer)     [(None, 100, 100, 3)]     0         
_________________________________________________________________
conv2d (Conv2D)              (None, 91, 91, 64)        19264     
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 46, 46, 64)        0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 40, 40, 128)       401536    
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 20, 20, 128)       0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 17, 17, 256)       524544    
_________________________________________________________________
max_pooling2d_2 (MaxPooling2 (None, 9, 9, 256)         0 

# 5.2 Build Distance Layer

In [4]:
# Siamese distance class

class L1Dist(Layer):
    
    # Init method - inheritance
    def __init__(self, **kwargs):
        super().__init__()
        
    # Similarity calculation
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)

# 5.3 Make siamesse model

In [21]:
def make_siamese_model():
    
    # anchor image input in the network
    input_image = Input(name='input_image', shape=(100,100,3))
    
    # validation image in the network
    validation_image = Input(name='validation_image', shape=(100,100,3))
    
    # Combine siamese neural distace components
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(embedding(input_image), embedding(validation_image))
    
    classifier = Dense(1, activation='sigmoid')(distances)
    
    return Model(inputs=[input_image, validation_image], outputs=[classifier], name='SiameseNetwork')

In [22]:
model = make_siamese_model()

In [23]:
model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_image (InputLayer)        [(None, 100, 100, 3) 0                                            
__________________________________________________________________________________________________
validation_image (InputLayer)   [(None, 100, 100, 3) 0                                            
__________________________________________________________________________________________________
embedding (Functional)          (None, 4096)         39747008    input_image[0][0]                
                                                                 validation_image[0][0]           
__________________________________________________________________________________________________
distance (L1Dist)               (None, 4096)         0           embedding[0][0]     

# 6. Train the model

# 6.1 Define loss and optimizer functions

In [24]:
binary_cross_loss = tf.losses.BinaryCrossentropy()
opt = tf.keras.optimizers.Adam(1e-4)

In [22]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=model)

# 6.2 Build Train Step Function

In [23]:
@tf.function
def train_step(batch):
    
    #Record all operations
    with tf.GradientTape() as tape:
        #Get anchor postive/negative image
        X = batch[:2]
        #Get labels
        y = batch[2]
        
        #Fordward pass
        yhat = model(X, training=True)
        #Calculate the loss
        loss = binary_cross_loss(y, yhat)
    
    #Calculate gradients
    grad = tape.gradient(loss, model.trainable_variables)
    
    #Update weights and apply to siamese model
    opt.apply_gradients(zip(grad, model.trainable_variables))
    
    return loss

# 6.3 Build Training Loop 

In [27]:
def train(data, EPOCHS):
    
    #Loop through the epochs    
    #Calculate gradients
    grad = tape.gradient(loss, model.trainable_variables)
    

    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        prog_bar = tf.keras.utils.Progbar(len(data))
        
        #Loop through each batch
        for idx, batch in enumerate(data):
            # Run train step here
            train_step(batch)
            prog_bar.update(idx+1)
            
        #Save checkpoint
        if epoch%10 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)

# 6.4 Train Model

In [28]:
EPOCH = 50

In [29]:
train(train_data, EPOCH)


 Epoch 1/50

 Epoch 2/50

 Epoch 3/50

 Epoch 4/50

 Epoch 5/50

 Epoch 6/50

 Epoch 7/50

 Epoch 8/50

 Epoch 9/50

 Epoch 10/50

 Epoch 11/50

 Epoch 12/50

 Epoch 13/50

 Epoch 14/50

 Epoch 15/50

 Epoch 16/50

 Epoch 17/50

 Epoch 18/50

 Epoch 19/50

 Epoch 20/50

 Epoch 21/50

 Epoch 22/50

 Epoch 23/50

 Epoch 24/50

 Epoch 25/50

 Epoch 26/50

 Epoch 27/50

 Epoch 28/50

 Epoch 29/50

 Epoch 30/50

 Epoch 31/50

 Epoch 32/50

 Epoch 33/50

 Epoch 34/50

 Epoch 35/50

 Epoch 36/50

 Epoch 37/50

 Epoch 38/50

 Epoch 39/50

 Epoch 40/50

 Epoch 41/50

 Epoch 42/50

 Epoch 43/50

 Epoch 44/50

 Epoch 45/50

 Epoch 46/50

 Epoch 47/50

 Epoch 48/50

 Epoch 49/50

 Epoch 50/50


In [25]:
ckpt = tf.compat.v2.train.Checkpoint(model=model)
ckpt.restore(os.path.join('D:\\Documents\\facial_recognition\\training_checkpoints\\', 'ckpt-5')).expect_partial()

<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x1678a47f580>

In [26]:
model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_image (InputLayer)        [(None, 100, 100, 3) 0                                            
__________________________________________________________________________________________________
validation_image (InputLayer)   [(None, 100, 100, 3) 0                                            
__________________________________________________________________________________________________
embedding (Functional)          (None, 4096)         39747008    input_image[0][0]                
                                                                 validation_image[0][0]           
__________________________________________________________________________________________________
distance (L1Dist)               (None, 4096)         0           embedding[0][0]     

# 7 Evaluate model

# 7.1 Import metrics

In [27]:
# import metrics calculation
from tensorflow.keras.metrics import Precision, Recall

# 7.2 Get the batch of the test data

In [35]:
test_input, test_val, y_true = test_data.as_numpy_iterator().next()

# 7.3 Make predictions

In [36]:
y_hat = model.predict([test_input, test_val])

In [37]:
y_hat, y_true

(array([[0.4995697 ],
        [0.49777186],
        [0.49940133],
        [0.49957183],
        [0.49959293],
        [0.49984792],
        [0.49966237],
        [0.4994852 ],
        [0.49721944],
        [0.49922544],
        [0.49621564],
        [0.49922416],
        [0.49799207],
        [0.49773997],
        [0.4975668 ],
        [0.49931395]], dtype=float32),
 array([1., 0., 1., 1., 1., 1., 1., 1., 0., 1., 0., 1., 0., 0., 0., 1.],
       dtype=float32))

In [31]:
[1 if prediction > 0.5 else 0 for prediction in y_hat]

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

In [32]:
y_true

array([1., 0., 0., 0., 0., 0., 0., 1., 1., 1., 1., 1., 1., 1., 1., 0.],
      dtype=float32)

# 7.4 Create a metric  object

In [38]:
m = Recall()
m.update_state(y_true, y_hat)
m.result().numpy()

0.0

In [39]:
m = Precision()
m.update_state(y_true, y_hat)
m.result().numpy()

0.0

# 7.5 Vizualisation image

In [1]:
plt.figure(figsize=(12,8))

# Test image
plt.subplot(1,2,1)
plt.imshow(test_input[0])

# Validation image
plt.subplot(1,2,2)
plt.imshow(test_val[0])

# Show image
plt.show()

NameError: name 'plt' is not defined



# 8 Save model

In [45]:
model.save('siamese_model.h5')

In [7]:
model = tf.keras.models.load_model('siamese_model.h5', custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})



# 9 Real time detection

# 9.1 Verfication function

In [13]:
def verify(model, detection_threshold, verification_threshold):
    # Build result arrays
    results = []
    for image in os.listdir(os.path.join('application_data', 'verification_images')):
        input_img = preprocess(os.path.join('application_data', 'input_image', 'input_image.jpg'))
        validation_img = preprocess(os.path.join('application_data', 'verification_images', image))
        
        # Make predictions
        result = model.predict(list(np.expand_dims([input_img, validation_img], axis=1)))
        results.append(result)
        
    # Detection threshold: Metric above which a prediction is considered positive
    detection = np.sum(np.array(results) > detection_threshold)
    verification = detection / len(os.listdir(os.path.join('application_data', 'verification_images')))
    verified = verification > verification_threshold
    
    return results, verified

# 9.2 Real time detection

In [None]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    frame = frame[80:80+250, 200:200+250, :]
    
    cv2.imshow('Verification_image', frame)
    
    # Verificatio trigger
    if cv2.waitKey(10) & 0xFF == ord('v'):
        # Save input image
        cv2.imwrite(os.path.join('application_data', 'input_image', 'input_image.jpg'), frame)
        # Run verification
        results, verified = verify(model, 0.5, 0.5)
        print(verified)
    
    # Closse application
    if cv2.waitKey(10) & 0xFF == ord('q'):
        break
        
cap.release()
cv2.destroyAllWindows()

True
True
