In [2]:
# Import standard dependencies
import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt

In [3]:
# Import tensorflow dependencies - Functional API
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf

In [4]:
tf.__version__

'2.8.2'

In [5]:
# Avoid OOM errors by setting GPU Memory Consumption Growth
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus: 
    tf.config.experimental.set_memory_growth(gpu, True)

In [6]:
# Setup paths
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')

In [7]:
# Make the directories
os.makedirs(POS_PATH)
os.makedirs(NEG_PATH)
os.makedirs(ANC_PATH)

In [8]:
#http://vis-www.cs.umass.edu/lfw/
!wget http://vis-www.cs.umass.edu/lfw/lfw.tgz

--2022-07-17 12:08:44--  http://vis-www.cs.umass.edu/lfw/lfw.tgz
Resolving vis-www.cs.umass.edu (vis-www.cs.umass.edu)... 128.119.244.95
Connecting to vis-www.cs.umass.edu (vis-www.cs.umass.edu)|128.119.244.95|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 180566744 (172M) [application/x-gzip]
Saving to: ‘lfw.tgz’


2022-07-17 12:08:48 (52.2 MB/s) - ‘lfw.tgz’ saved [180566744/180566744]



In [9]:
# Uncompress Tar GZ Labelled Faces in the Wild Dataset
!tar -xf lfw.tgz

In [10]:
print(os.listdir('lfw')[0:10])
print(len(os.listdir('lfw')))

['George_Bovell', 'Laura_Elena_Harring', 'Robert_Torricelli', 'Victoria_Beckham', 'Lin_Yi-fu', 'Hugo_Colace', 'Tomoko_Hagiwara', 'Alicia_Silverstone', 'Christian_Patino', 'Gabriella_Bo']
5749


In [11]:
# Move LFW Images to the following repository data/negative
for directory in os.listdir('lfw'):
    for file in os.listdir(os.path.join('lfw', directory)):
        EX_PATH = os.path.join('lfw', directory, file)
        NEW_PATH = os.path.join(NEG_PATH, file)
        os.replace(EX_PATH, NEW_PATH)

In [12]:
#negative path images
print(os.listdir(NEG_PATH)[0:10])
print(len(os.listdir(NEG_PATH)))

['Bill_Clinton_0018.jpg', 'Richard_Gere_0008.jpg', 'Tony_Blair_0042.jpg', 'Gwendal_Peizerat_0002.jpg', 'Astrid_Betancourt_0001.jpg', 'Jo_Dee_Messina_0002.jpg', 'Andrew_Luster_0001.jpg', 'Kim_Hong-up_0001.jpg', 'Luis_Figo_0002.jpg', 'John_Rosa_0002.jpg']
13233


In [13]:
#collect positive and anchor images
#anchor - reference image to compute loss with positive and negative images
# upload mine 1 sample image to colab from local and unzip and copy all to anchor and positive folder path
!unzip anchor.zip
!mv anchor/*.jpg data/anchor/
!unzip positive.zip
!mv positive/*.jpg data/positive/

Archive:  anchor.zip
   creating: anchor/
  inflating: anchor/f76799b4-05c4-11ed-a3ed-a86daadaf6c3.jpg  
Archive:  positive.zip
   creating: positive/
  inflating: positive/6a1c4a98-05c5-11ed-ae61-a86daadaf6c3.jpg  


In [14]:
#data augumentation
def data_aug(img):
    data = []
    for i in range(9):
        img = tf.image.stateless_random_brightness(img, max_delta=0.02, seed=(1,2))
        img = tf.image.stateless_random_contrast(img, lower=0.6, upper=1, seed=(1,3))
        img = tf.image.stateless_random_flip_left_right(img, seed=(np.random.randint(100),np.random.randint(100)))
        img = tf.image.stateless_random_jpeg_quality(img, min_jpeg_quality=90, max_jpeg_quality=100, seed=(np.random.randint(100),np.random.randint(100)))
        img = tf.image.stateless_random_saturation(img, lower=0.9,upper=1, seed=(np.random.randint(100),np.random.randint(100)))
        data.append(img)
    return data

In [15]:
#augument anchor image
import uuid
img_path = os.path.join(ANC_PATH, 'f76799b4-05c4-11ed-a3ed-a86daadaf6c3.jpg')
img = cv2.imread(img_path)
augmented_images = data_aug(img)
for image in augmented_images:
    cv2.imwrite(os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1())), image.numpy())

In [16]:
#augument positive image
import uuid
img_path = os.path.join(POS_PATH, '6a1c4a98-05c5-11ed-ae61-a86daadaf6c3.jpg')
img = cv2.imread(img_path)
augmented_images = data_aug(img) 
for image in augmented_images:
    cv2.imwrite(os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1())), image.numpy())

In [17]:
#get all image directories
anchor = tf.data.Dataset.list_files(ANC_PATH+'/*.jpg').take(3000)
positive = tf.data.Dataset.list_files(POS_PATH+'/*.jpg').take(3000)
negative = tf.data.Dataset.list_files(NEG_PATH+'/*.jpg').take(3000)

In [18]:
#pre process image - resize to 100X100 and scale pixels between 0 and 1
def preprocess(file_path):
    # Read in image from file path
    byte_img = tf.io.read_file(file_path)
    # Load in the image 
    img = tf.io.decode_jpeg(byte_img)
    # Preprocessing steps - resizing the image to be 100x100x3
    img = tf.image.resize(img, (100,100))
    # Scale image to be between 0 and 1 
    img = img / 255.0
    # Return image
    return img

In [19]:
#create labelled datasets
# (anchor, positive) => 1,1,1,1,1
# (anchor, negative) => 0,0,0,0,0
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)

In [20]:
#pre process both input and validation image
def preprocess_twin(input_img, validation_img, label):
  return(preprocess(input_img), preprocess(validation_img), label)

In [21]:
# Build dataloader pipeline
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=10000)

In [22]:
# Training partition
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [23]:
# Testing partition
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

In [25]:
#model engineering
#build embedding layer
def make_embedding(): 
    inp = Input(shape=(100,100,3), name='input_image')

    # First block
    c1 = Conv2D(64, (10,10), activation='relu')(inp)
    m1 = MaxPooling2D(64, (2,2), padding='same')(c1)
    
    # Second block
    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D(64, (2,2), padding='same')(c2)
    
    # Third block 
    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D(64, (2,2), padding='same')(c3)
    
    # Final embedding block
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)
    
    return Model(inputs=[inp], outputs=[d1], name='embedding')

In [26]:
embedding = make_embedding()
embedding.summary()

Model: "embedding"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_image (InputLayer)    [(None, 100, 100, 3)]     0         
                                                                 
 conv2d_4 (Conv2D)           (None, 91, 91, 64)        19264     
                                                                 
 max_pooling2d_3 (MaxPooling  (None, 46, 46, 64)       0         
 2D)                                                             
                                                                 
 conv2d_5 (Conv2D)           (None, 40, 40, 128)       401536    
                                                                 
 max_pooling2d_4 (MaxPooling  (None, 20, 20, 128)      0         
 2D)                                                             
                                                                 
 conv2d_6 (Conv2D)           (None, 17, 17, 128)       26

In [27]:
#build distance layer
# Siamese L1 Distance class
class L1Dist(Layer):
    
    # Init method - inheritance
    def __init__(self, **kwargs):
        super().__init__()
       
    # Magic happens here - similarity calculation
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)

In [28]:
#make siamese model
def make_siamese_model(): 
    # Anchor image input in the network
    input_image = Input(name='input_img', shape=(100,100,3))
    
    # Validation image in the network 
    validation_image = Input(name='validation_img', shape=(100,100,3))
    
    # Combine siamese distance components
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(embedding(input_image), embedding(validation_image))
    
    # Classification layer 
    classifier = Dense(1, activation='sigmoid')(distances)
    
    return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

In [29]:
siamese_model = make_siamese_model()
siamese_model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_img (InputLayer)         [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_img (InputLayer)    [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         38960448    ['input_img[0][0]',              
                                                                  'validation_img[0][

In [30]:
#Training
#setup loss and optimizer
binary_cross_loss = tf.losses.BinaryCrossentropy()
opt = tf.keras.optimizers.Adam(1e-4) # 0.0001
#trained model checkpoints
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

In [32]:
#build train step function
@tf.function
def train_step(batch):

    # Record all of our operations 
    with tf.GradientTape() as tape:     
        # Get anchor and positive/negative image
        X = batch[:2]
        # Get label
        y = batch[2]
        
        # Forward pass
        yhat = siamese_model(X, training=True)
        # Calculate loss
        loss = binary_cross_loss(y, yhat)
    print(loss)
        
    # Calculate gradients
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    
    # Calculate updated weights and apply to siamese model
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))
        
    # Return loss
    return loss

In [35]:
# Import metric calculations
from tensorflow.keras.metrics import Precision, Recall
#build train loop function
def train(data, EPOCHS):
    # Loop through epochs
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))
        
        # Creating a metric object 
        r = Recall()
        p = Precision()
        
        # Loop through each batch
        for idx, batch in enumerate(data):
            # Run train step here
            loss = train_step(batch)
            yhat = siamese_model.predict(batch[:2])
            r.update_state(batch[2], yhat)
            p.update_state(batch[2], yhat) 
            progbar.update(idx+1)
        print('Loss - {}, Recall - {}, Precision - {}'.format(str(loss.numpy()), str(r.result().numpy()), str(p.result().numpy())))
        
        # Save checkpoints
        if epoch % 10 == 0: 
            checkpoint.save(file_prefix=checkpoint_prefix)

In [36]:
EPOCHS = 50
train(train_data, EPOCHS)


 Epoch 1/50
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)
Loss - 0.69374573, Recall - 0.75, Precision - 1.0

 Epoch 2/50
Loss - 0.68362194, Recall - 0.71428573, Precision - 1.0

 Epoch 3/50
Loss - 0.64396065, Recall - 0.2, Precision - 1.0

 Epoch 4/50
Loss - 0.6092652, Recall - 0.0, Precision - 0.0

 Epoch 5/50
Loss - 0.61767465, Recall - 0.0, Precision - 0.0

 Epoch 6/50
Loss - 0.5445085, Recall - 0.2857143, Precision - 1.0

 Epoch 7/50
Loss - 0.51659715, Recall - 0.85714287, Precision - 1.0

 Epoch 8/50
Loss - 0.49806216, Recall - 0.71428573, Precision - 1.0

 Epoch 9/50
Loss - 0.42736298, Recall - 0.71428573, Precision - 1.0

 Epoch 10/50
Loss - 0.39221615, Recall - 1.0, Precision - 1.0

 Epoch 11/50
Loss - 0.33996895, Recall - 1.0, Precision - 1.0

 Epoch 12/50
Loss - 0.2667987, Recall - 1.0, Precision - 1.0

 Epoch 13/50
Loss - 0.28417057, Recall - 1.0, Precision - 1.0

 Ep

In [40]:
#Evaluate model
# Get a batch of test data
r = Recall()
p = Precision()
for test_input, test_val, y_true in test_data.as_numpy_iterator():
    yhat = siamese_model.predict([test_input, test_val])
    r.update_state(y_true, yhat)
    p.update_state(y_true,yhat) 
print('Recall - {}, Precision - {}'.format(str(r.result().numpy()), str(p.result().numpy())))

Recall - 1.0, Precision - 1.0


In [41]:
#save model
#save trained weights
siamese_model.save('siamesemodel.h5')



In [42]:
# Reload model 
siamese_model = tf.keras.models.load_model('siamesemodel.h5', custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})



In [44]:
# View model summary
siamese_model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_img (InputLayer)         [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_img (InputLayer)    [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         38960448    ['input_img[0][0]',              
                                                                  'validation_img[0][

In [43]:
r = Recall()
p = Precision()
for test_input, test_val, y_true in test_data.as_numpy_iterator():
    yhat = siamese_model.predict([test_input, test_val])
    r.update_state(y_true, yhat)
    p.update_state(y_true,yhat) 
print('Recall - {}, Precision - {}'.format(str(r.result().numpy()), str(p.result().numpy())))

Recall - 1.0, Precision - 1.0


In [45]:
#create unseen image path via input webcam
UNSEEN_PATH = os.path.join('unseen_images')
os.makedirs(UNSEEN_PATH)

In [56]:
#create verification image path - reference image to register to database
#there are two verification images in folder to take as reference images
VERIFICATION_PATH = os.path.join('verification')
os.makedirs(VERIFICATION_PATH)

In [64]:
#verification test function on unseen mine images
def verify(model, test_image, detection_threshold, verification_threshold):
    # Build results array
    results = []
    for image in os.listdir(os.path.join('verification')):
        input_img = preprocess(os.path.join('unseen_images', test_image))
        validation_img = preprocess(os.path.join('verification', image))
        
        # Make Predictions 
        result = model.predict(list(np.expand_dims([input_img, validation_img], axis=1)))
        results.append(result)
    
    # Detection Threshold: Metric above which a prediciton is considered positive 
    detection = np.sum(np.array(results) > detection_threshold)
    
    # Verification Threshold: Proportion of positive predictions / total positive samples 
    verification = detection / len(os.listdir(os.path.join('verification'))) 
    verified = verification > verification_threshold
    
    return results, verified

In [65]:
#detection threshold - 0.99
#verification threshold - 0.7
results, verified = verify(siamese_model, '285096f6-05ce-11ed-b881-a86daadaf6c3.jpg', 0.5, 0.5)
print(results)
print(verified)

[array([[0.7249249]], dtype=float32), array([[0.6156386]], dtype=float32)]
True
