In [1]:
# Import standard dependencies
import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt
from glob import *



In [2]:
# Import tensorflow dependencies - Functional API
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf

In [3]:
# Setup paths
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')

In [None]:
# Make the directories
os.makedirs(POS_PATH)
os.makedirs(NEG_PATH)
os.makedirs(ANC_PATH)

In [5]:
import os
import uuid


In [None]:
# This code helps to make data of positive images in POS_PATH directory using webcam
cap = cv2.VideoCapture(1)
while cap.isOpened(): 
    ret, frame = cap.read()
    #print(frame)
   
    # Cut down frame to 250x250px
    #frame = frame[250:250+250,250:100+100, :]
    #print(frame)
    imgname = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
    print(imgname)
    cv2.imwrite(imgname, frame)
# Release the webcam
cap.release()
# Close the image show frame
cv2.destroyAllWindows()


In [None]:
# This code helps to make data of anchor images in ANC_PATH directory using webcam


# Establish a connection to the webcam
cap = cv2.VideoCapture(1)
while cap.isOpened(): 
    ret, frame = cap.read()
   
    # Cut down frame to 250x250px
    frame = frame[250:250+250,250:100+100, :]
    
    # Collect anchors 
    if cv2.waitKey(1) & 0XFF == ord('a'):
        # Create the unique file path 
        imgname = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
        # Write out anchor image
        cv2.imwrite(imgname, frame)
    
    # Collect positives
    if cv2.waitKey(1) & 0XFF == ord('p'):
        # Create the unique file path 
        imgname = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
        # Write out positive image
        cv2.imwrite(imgname, frame)
    
    # Show image back to screen
    cv2.imshow('Image Collection', frame)
    
    # Breaking gracefully
    if cv2.waitKey(1) & 0XFF == ord('q'):
        breaka
        
# Release the webcam
cap.release()
# Close the image show frame
cv2.destroyAllWindows()

In [44]:
anchor = tf.data.Dataset.list_files(ANC_PATH+'\*.jpg').take(30)
positive = tf.data.Dataset.list_files(POS_PATH+'\*.jpg').take(30)
negative = tf.data.Dataset.list_files(NEG_PATH+'\*.jpg').take(30)

In [7]:
dir_test = anchor.as_numpy_iterator()

In [8]:
def preprocess(file_path):
    
    # Read in image from file path
    byte_img = tf.io.read_file(file_path)
    # Load in the image 
    img = tf.io.decode_jpeg(byte_img)
    
    # Preprocessing steps - resizing the image to be 100x100x3
    img = tf.image.resize(img, (100,100))
    # Scale image to be between 0 and 1 
    img = img / 255.0

    # Return image
    return img

In [45]:
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(30))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(30))))
data = positives.concatenate(negatives)

In [46]:
tf.data.Dataset.from_tensor_slices(tf.ones(30))

<TensorSliceDataset shapes: (), types: tf.float32>

In [11]:
def preprocess_twin(input_img, validation_img, label):
    return(preprocess(input_img), preprocess(validation_img), label)

In [47]:
# Build dataloader pipeline
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=10000)

In [48]:
# Training partition
train_data = data.take(round(60*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [49]:
# Testing partition
test_data = data.skip(round(60*.7))
test_data = test_data.take(round(60*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

In [50]:
# Testing partition
test_data = data.skip(round(60*.7))
test_data = test_data.take(round(60*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

# Building modle layers

In [51]:
inp = Input(shape=(100,100,3), name='input_image')

In [52]:
c1 = Conv2D(64, (10,10), activation='relu')(inp)

In [53]:
m1 = MaxPooling2D(64, (2,2), padding='same')(c1)

In [54]:
c2 = Conv2D(128, (7,7), activation='relu')(m1)
m2 = MaxPooling2D(64, (2,2), padding='same')(c2)

In [55]:
c3 = Conv2D(128, (4,4), activation='relu')(m2)
m3 = MaxPooling2D(64, (2,2), padding='same')(c3)

In [56]:
c4 = Conv2D(256, (4,4), activation='relu')(m3)
f1 = Flatten()(c4)
d1 = Dense(4096, activation='sigmoid')(f1)

In [57]:
mod = Model(inputs=[inp], outputs=[d1], name='embedding')

In [58]:
mod.summary()

Model: "embedding"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_image (InputLayer)     [(None, 100, 100, 3)]     0         
_________________________________________________________________
conv2d_8 (Conv2D)            (None, 91, 91, 64)        19264     
_________________________________________________________________
max_pooling2d_6 (MaxPooling2 (None, 46, 46, 64)        0         
_________________________________________________________________
conv2d_9 (Conv2D)            (None, 40, 40, 128)       401536    
_________________________________________________________________
max_pooling2d_7 (MaxPooling2 (None, 20, 20, 128)       0         
_________________________________________________________________
conv2d_10 (Conv2D)           (None, 17, 17, 128)       262272    
_________________________________________________________________
max_pooling2d_8 (MaxPooling2 (None, 9, 9, 128)         0 

In [59]:
def make_embedding(): 
    inp = Input(shape=(100,100,3), name='input_image')
    
    # First block
    c1 = Conv2D(64, (10,10), activation='relu')(inp)
    m1 = MaxPooling2D(64, (2,2), padding='same')(c1)
    
    # Second block
    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D(64, (2,2), padding='same')(c2)
    
    # Third block 
    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D(64, (2,2), padding='same')(c3)
    
    # Final embedding block
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)
    
    
    return Model(inputs=[inp], outputs=[d1], name='embedding')

In [60]:
embedding = make_embedding()

# Distance layer

In [61]:
# Siamese L1 Distance class
class L1Dist(Layer):
    
    # Init method - inheritance
    def __init__(self, **kwargs):
        super().__init__()
       
    # Magic happens here - similarity calculation
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)

In [62]:
input_image = Input(name='input_img', shape=(100,100,3))
validation_image = Input(name='validation_img', shape=(100,100,3))

In [63]:
inp_embedding = embedding(input_image)
val_embedding = embedding(validation_image)

In [64]:
siamese_layer = L1Dist()

In [65]:
distances = siamese_layer(inp_embedding, val_embedding)

In [66]:
classifier = Dense(1, activation='sigmoid')(distances)

# Siamese layer

In [67]:
siamese_network = Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

In [68]:
def make_siamese_model(): 
    
    # Anchor image input in the network
    input_image = Input(name='input_img', shape=(100,100,3))
    
    # Validation image in the network 
    validation_image = Input(name='validation_img', shape=(100,100,3))
    
    # Combine siamese distance components
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(embedding(input_image), embedding(validation_image))
    
    # Classification layer 
    classifier = Dense(1, activation='sigmoid')(distances)
    
    return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

In [69]:
siamese_model = make_siamese_model()

# Taining

In [70]:
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [71]:
opt = tf.keras.optimizers.Adam(1e-4) # 0.0001

In [72]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

In [73]:
@tf.function
def train_step(batch):
    
    # Record all of our operations 
    with tf.GradientTape() as tape:     
        # Get anchor and positive/negative image
        X = batch[:2]
        # Get label
        y = batch[2]
        y=tf.expand_dims(y,-1)
        
        # Forward pass
        yhat = siamese_model(X, training=True)
        # Calculate loss
        loss = binary_cross_loss(y, yhat)
    print(loss)
        
    # Calculate gradients
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    
    # Calculate updated weights and apply to siamese model
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))
        
    # Return loss
    return loss

In [74]:
# Import metric calculations
from tensorflow.keras.metrics import Precision, Recall

In [76]:
def train(data, EPOCHS):
    # Loop through epochs
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(60)
        
        # Creating a metric object 
        r = Recall()
        p = Precision()
        
        # Loop through each batch
        for idx, batch in enumerate(data):
            # Run train step here
            loss = train_step(batch)
            yhat = siamese_model.predict(batch[:2])
            r.update_state(tf.expand_dims(batch[2],-1), yhat)
            p.update_state(tf.expand_dims(batch[2],-1), yhat) 
            progbar.update(idx+1)
        print(loss.numpy(), r.result().numpy(), p.result().numpy())
        
        # Save checkpoints
        if epoch % 10 == 0: 
            checkpoint.save(file_prefix=checkpoint_prefix)

In [78]:
EPOCHS = 30

In [79]:
train(train_data, EPOCHS)


 Epoch 1/30
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)
 2/60 [>.............................] - ETA: 24:19Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)
 3/60 [>.............................] - ETA: 19:460.61079466 0.083333336 1.0

 Epoch 2/30
 3/60 [>.............................] - ETA: 19:270.3736075 0.05 1.0

 Epoch 3/30
 3/60 [>.............................] - ETA: 19:270.24309388 0.0 0.0

 Epoch 4/30
 3/60 [>.............................] - ETA: 19:300.3598296 0.25 1.0

 Epoch 5/30
 3/60 [>.............................] - ETA: 19:370.28186998 0.94736844 1.0

 Epoch 6/30
 3/60 [>.............................] - ETA: 19:240.26245728 0.7619048 1.0

 Epoch 7/30
 3/60 [>.............................] - ETA: 19:240.32036504 0.8947368 1.0

 Epoch 8/30
 3/60 [>.............................] - ETA: 19:270.13725081 0.9444444 1.0

 Epoch 9/30
 3/60 [>

In [89]:
# Save weights
siamese_model.save('siamesemodelv2.h5')