In [1]:
# Import standard dependencies
import cv2
import os
import numpy as np
from matplotlib import pyplot as plt
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
from tensorflow.keras.metrics import Precision, Recall
import uuid
import math

In [2]:
def preprocess(file_path):
    """Load a JPEG from disk and convert it into the network's input format.

    Args:
        file_path: Path of the JPEG file to read (Python string or string tensor).

    Returns:
        A float tensor of shape (48, 48, 3) with pixel values scaled to [0, 1].
    """
    # Read in image from file path
    byte_img = tf.io.read_file(file_path)
    # Force 3 channels: without it a grayscale JPEG decodes to a single channel
    # and no longer matches the (48, 48, 3) input declared by the models.
    img = tf.io.decode_jpeg(byte_img, channels=3)

    # Preprocessing steps - resizing the image to be 48x48x3
    img = tf.image.resize(img, (48, 48))
    # Scale image to be between 0 and 1
    img = img / 255.0

    # Return image
    return img

In [3]:
def preprocess_twin(input_img, validation_img, label):
    """Preprocess both image paths of a pair and pass the label through untouched.

    Returns:
        A tuple ``(input_tensor, validation_tensor, label)``.
    """
    input_tensor = preprocess(input_img)
    validation_tensor = preprocess(validation_img)
    return input_tensor, validation_tensor, label

In [4]:
def make_embedding():
    """Build the convolutional network that maps one image to a feature vector.

    Returns:
        A Keras ``Model`` named ``'embedding'`` that takes a (48, 48, 3) image
        and outputs a 4096-dimensional, sigmoid-activated vector.
    """
    inp = Input(shape=(48, 48, 3), name='input_image')

    # First block
    c1 = Conv2D(16, (10, 10), activation='relu')(inp)
    # MaxPooling2D's positional arguments are (pool_size, strides). The previous
    # MaxPooling2D(64, (2,2)) therefore pooled over a 64x64 window - larger than
    # the feature map itself - which made every output a near-global maximum.
    # A 2x2 window with stride 2 keeps the same output shape.
    m1 = MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding='same')(c1)

    # Second block
    c2 = Conv2D(32, (7, 7), activation='relu')(m1)
    m2 = MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding='same')(c2)

    # Final embedding block
    c3 = Conv2D(64, (4, 4), activation='relu')(m2)
    f1 = Flatten()(c3)
    d1 = Dense(4096, activation='sigmoid')(f1)

    return Model(inputs=[inp], outputs=[d1], name='embedding')

In [None]:
# Siamese L1 Distance class
class L1Dist(Layer):
    """Element-wise L1 (absolute) distance between two sets of embeddings.

    ``call`` walks both arguments in parallel with ``zip`` and returns a list
    with one distance tensor per pair, so callers index the result (e.g.
    ``distances[0]``).
    NOTE(review): this assumes each argument is a list/tuple of tensors, as
    produced by calling a model built with ``outputs=[...]`` - confirm for the
    Keras version in use.
    """

    # Init method - inheritance
    def __init__(self, **kwargs):
        # Forward name/dtype/trainable to the base Layer; previously they were
        # accepted but dropped, so the saved config was ignored when reloading.
        super().__init__(**kwargs)

    # Magic happens here - similarity calculation
    def call(self, input_embedding, validation_embedding):
        """Return ``[|a - b|, ...]`` for each pair of input/validation tensors."""
        # The absolute value makes this a real L1 distance that is symmetric in
        # its arguments; a plain subtraction depends on the argument order.
        # Models trained and saved with the old signed difference need retraining.
        return [
            tf.math.abs(inp_tensor - val_tensor)
            for inp_tensor, val_tensor in zip(input_embedding, validation_embedding)
        ]

In [None]:
def make_siamese_model():
    """Assemble the two-input siamese classifier.

    Both images are passed through the module-level ``embedding`` model, the
    results are compared by an ``L1Dist`` layer, and a single sigmoid unit
    scores the comparison.

    Returns:
        A Keras ``Model`` named ``'SiameseNetwork'`` with inputs
        ``[input_img, validation_img]`` and one sigmoid output.
    """
    image_shape = (48, 48, 3)

    # Anchor image and the image it is compared against
    input_image = Input(shape=image_shape, name='input_img')
    validation_image = Input(shape=image_shape, name='validation_img')

    # Distance layer comparing the two embeddings
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    input_features = embedding(input_image)
    validation_features = embedding(validation_image)
    distances = siamese_layer(input_features, validation_features)

    # Classification layer on the first (only) distance tensor
    classifier = Dense(1, activation='sigmoid')(distances[0])

    return Model(
        inputs=[input_image, validation_image],
        outputs=classifier,
        name='SiameseNetwork',
    )

In [None]:
@tf.function
def train_step(batch):
    """Run one optimisation step on a single batch and return its loss.

    Uses the module-level ``siamese_model``, ``binary_cross_loss`` and ``opt``.

    Args:
        batch: Sequence whose first two items are the two image batches and
            whose third item is the label batch.

    Returns:
        The binary cross-entropy loss tensor for this batch.
    """
    # Record all of our operations
    with tf.GradientTape() as tape:
        # Get anchor and positive/negative image
        X = batch[:2]
        # Get label
        y = batch[2]

        # Forward pass
        yhat = siamese_model(X, training=True)
        # Calculate loss
        loss = binary_cross_loss(y, yhat)

    # No print(loss) here: inside a tf.function a Python print runs only while
    # the graph is traced and shows a symbolic tensor, not the loss value.
    # The caller receives the loss through the return value.

    # Calculate gradients
    grad = tape.gradient(loss, siamese_model.trainable_variables)

    # Calculate updated weights and apply to siamese model
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))

    # Return loss
    return loss

In [None]:
def train(data, EPOCHS):
    """Train the module-level ``siamese_model`` and print per-epoch metrics.

    Args:
        data: Batched dataset yielding ``(input_images, validation_images, labels)``.
            It must support ``len()``.
        EPOCHS: Number of passes over ``data``.

    After each epoch the last batch loss, recall and precision are printed.
    """
    # Loop through epochs
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))

        # Creating a metric object
        r = Recall()
        p = Precision()

        # Stays None when the dataset yields no batches
        loss = None

        # Loop through each batch
        for idx, batch in enumerate(data):
            # Run train step here
            loss = train_step(batch)
            # Call the model directly instead of predict(): for a single batch
            # it is faster and does not print its own progress bar in between
            # the Progbar updates.
            yhat = siamese_model(batch[:2], training=False)
            r.update_state(batch[2], yhat)
            p.update_state(batch[2], yhat)
            progbar.update(idx+1)

        if loss is None:
            # Previously this case raised NameError on the summary print below
            print('No batches in data - nothing to train on')
            continue
        print(loss.numpy(), r.result().numpy(), p.result().numpy())

In [None]:
def verify(model):
    """Score the stored input image against every stored verification image.

    Args:
        model: Model whose ``predict`` takes ``[input_batch, validation_batch]``.

    Returns:
        ``(results, verification)`` where ``results`` holds one prediction per
        verification image, multiplied by 100, and ``verification`` is the sum
        of those values divided by the number of verification images.
        An empty verification folder gives ``([], 0.0)``.
    """
    verification_dir = os.path.join('application', 'verification_image')
    # List the folder once so the loop and the average use the same file set
    image_names = os.listdir(verification_dir)
    if not image_names:
        # Avoids the divide-by-zero (nan) the average would otherwise produce
        return [], 0.0

    # The input image is the same for every comparison: load it once
    input_img = preprocess(os.path.join('application', 'input_image', 'input_image.jpg'))

    results = []
    for image in image_names:
        validation_img = preprocess(os.path.join(verification_dir, image))

        # expand_dims adds a batch axis of 1 to each image; list() splits the
        # stacked pair back into the two model inputs
        result = 100*model.predict(list(np.expand_dims([input_img, validation_img], axis=1)))
        results.append(result)

    detection = np.sum(results)
    verification = detection / len(image_names)

    return results, verification

In [None]:
# Setup paths: one sub-folder of 'data' per sample role
POS_PATH, NEG_PATH, ANC_PATH = (
    os.path.join('data', role) for role in ('positive', 'negative', 'anchor')
)

In [None]:
# Establish a connection to the webcam
cap = cv2.VideoCapture(0)

# cv2.imwrite reports a missing folder only through its return value, so make
# sure every folder this loop writes into exists before capturing.
for folder in (ANC_PATH, POS_PATH, NEG_PATH, os.path.join('application', 'input_image')):
    os.makedirs(folder, exist_ok=True)

# Key press -> folder the current frame is saved into
capture_dirs = {ord('a'): ANC_PATH, ord('p'): POS_PATH, ord('n'): NEG_PATH}

# Keys: a/p/n save an anchor/positive/negative sample, v stores the frame as the
# input image and verifies it, x verifies the stored input image and plots the
# scores, q quits. 'v' and 'x' need `siamese_model` to exist already.
try:
    while cap.isOpened():
        ret, frame = cap.read()
        # A failed grab returns no frame; stop instead of crashing on the slice
        if not ret:
            break

        # Cut down frame to 250x250px
        frame = frame[120:120+250, 200:200+250, :]

        # Show image back to screen
        cv2.imshow('Image Collection', frame)

        # Poll the keyboard exactly once per frame. Calling waitKey once per
        # check made every check consume its own key event, so most key presses
        # reached the wrong comparison and were lost.
        key = cv2.waitKey(1) & 0xFF

        if key in capture_dirs:
            # Create the unique file path and write out the sample image
            imgname = os.path.join(capture_dirs[key], '{}.jpg'.format(uuid.uuid1()))
            cv2.imwrite(imgname, frame)

        elif key == ord('v'):
            cv2.imwrite(os.path.join('application', 'input_image', 'input_image.jpg'), frame)
            results, verification = verify(siamese_model)
            print(np.squeeze(results))
            print(verification)

        elif key == ord('x'):
            results, verification = verify(siamese_model)
            scores = np.squeeze(results)
            print(scores)
            plt.hist(scores, bins=30, color='skyblue', edgecolor='black')
            print(verification)
            plt.show()

        # Breaking gracefully
        elif key == ord('q'):
            break
finally:
    # Always free the camera and close the window, even if the loop raised
    # Release the webcam
    cap.release()
    # Close the image show frame
    cv2.destroyAllWindows()

In [None]:
# Take up to 300 JPEG paths per class. The glob is built with os.path.join:
# the previous hard-coded '\*.jpg' only formed a valid pattern on Windows, and
# '\*' is an invalid string escape that newer Python versions warn about.
anchor = tf.data.Dataset.list_files(os.path.join(ANC_PATH, '*.jpg')).take(300)
positive = tf.data.Dataset.list_files(os.path.join(POS_PATH, '*.jpg')).take(300)
negative = tf.data.Dataset.list_files(os.path.join(NEG_PATH, '*.jpg')).take(300)

In [None]:
# One label per anchor: 1.0 for (anchor, positive) pairs, 0.0 for (anchor, negative) pairs
n_anchor = len(anchor)
positive_labels = tf.data.Dataset.from_tensor_slices(tf.ones(n_anchor))
negative_labels = tf.data.Dataset.from_tensor_slices(tf.zeros(n_anchor))

positives = tf.data.Dataset.zip((anchor, positive, positive_labels))
negatives = tf.data.Dataset.zip((anchor, negative, negative_labels))

# Single dataset of (anchor path, comparison path, label), positives first
data = positives.concatenate(negatives)

In [None]:
# Build dataloader pipeline
data = data.map(preprocess_twin)
data = data.cache()
# Shuffle ONCE and keep that order. By default shuffle() draws a new order on
# every iteration, so take()/skip() below would select different samples each
# pass and test samples would leak into the training partition.
data = data.shuffle(buffer_size=1024, reshuffle_each_iteration=False)

train_size = round(len(data)*.7)

# Training partition
train_data = data.take(train_size)
# Re-shuffle only the training samples on every epoch
train_data = train_data.shuffle(buffer_size=1024)
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

# Testing partition
test_data = data.skip(train_size)
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

In [None]:
# Build the embedding network; make_siamese_model() reads this module-level
# `embedding`, so this cell has to run before the siamese model is built.
embedding = make_embedding()

# Show the layer-by-layer structure
embedding.summary()

In [None]:
# Build the two-input siamese classifier; train_step(), train() and the webcam
# loop all use this module-level `siamese_model`.
siamese_model = make_siamese_model()

# Show the layer-by-layer structure
siamese_model.summary()

In [None]:
# Loss and optimizer read by train_step() as module-level globals
binary_cross_loss = tf.losses.BinaryCrossentropy()
opt = tf.keras.optimizers.Adam(1e-4) # learning rate 0.0001

In [None]:
# Number of full passes over the training partition
EPOCHS = 50

# Train the module-level siamese_model; prints loss, recall and precision per epoch
train(train_data, EPOCHS)

In [None]:
# Pull a single batch from the test partition as NumPy arrays
test_batches = test_data.as_numpy_iterator()
test_input, test_val, y_true = next(test_batches)
y_hat = siamese_model.predict([test_input, test_val])

# Post processing the results: threshold each score at 0.5 to get a 0/1 label
[1 if score > 0.5 else 0 for score in y_hat]

In [None]:
# Raw model outputs for the batch predicted above (before the 0.5 threshold)
y_hat

In [None]:
# Recall and precision accumulated over the whole test partition
r = Recall()
p = Precision()

for test_input, test_val, y_true in test_data.as_numpy_iterator():
    # The debugging print(test_input) was removed: it dumped every image
    # batch as raw arrays and buried the metrics printed below.
    yhat = siamese_model.predict([test_input, test_val])
    r.update_state(y_true, yhat)
    p.update_state(y_true, yhat)

# test_input, test_val and yhat keep the values of the LAST batch; the cells
# below display them.
print(r.result().numpy(), p.result().numpy())

In [None]:
# Predictions for the last test batch processed by the evaluation loop above
yhat

In [None]:
# Set plot size
plt.figure(figsize=(10,8))

# 2x4 grid: the top row shows the first four input images of the current batch,
# the bottom row shows the validation image paired with each of them.
for row, images in enumerate((test_input, test_val)):
    for col in range(4):
        plt.subplot(2, 4, row*4 + col + 1)
        plt.imshow(images[col])

# Renders cleanly
plt.show()

In [None]:
# Persist the model (architecture and weights) to a .keras file
siamese_model.save('siamesemodel_s48_c3_m2.keras')

# Print the structure of the model that was just saved
siamese_model.summary()

In [None]:
# Reload model
# custom_objects maps the names stored in the file to the classes needed to
# rebuild it; L1Dist is defined in this notebook, so Keras cannot find it alone.
# This rebinds `siamese_model` to the reloaded copy.
siamese_model = tf.keras.models.load_model('siamesemodel_s48_c3_m2.keras', 
                                   custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})

In [None]:
# Make predictions with reloaded model
siamese_model.predict([test_input, test_val])

siamese_model.summary()

x = siamese_model.get_weights()

# Inspect the last weight array in the list.
# The model built in this notebook has 10 weight arrays (three Conv2D layers,
# the embedding Dense layer and the classifier Dense layer, kernel + bias each),
# so the previous fixed index x[11] was out of range - presumably left over
# from a deeper embedding. Indexing from the end keeps working if the depth
# changes again.
x[-1]

#np.savetxt('weight.txt',x[-2])