In [1]:
import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt

from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
from tensorflow.keras.layers import Activation, BatchNormalization, Flatten
import tensorflow.compat.v2 as tf
from tensorflow import keras

from tensorflow.keras.preprocessing.image import save_img

In [2]:
# gpus = tf.config.experimental.list_physical_devices('GPU')
# if gpus:
#     try:
#         tf.config.experimental.set_visible_devices(gpus[0], 'GPU')
#         logical_gpus = tf.config.experimental.list_logical_devices('GPU')
#         print(len(gpus), "Physical GPUs", len(logical_gpus), 'Logical GPU')
#     except RuntimeError as e:
#         print(e)

In [3]:
# # Setup paths
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')

In [4]:
# Setup paths
NEW_POS_PATH = os.path.join('data4', 'positive')
NEW_NEG_PATH = os.path.join('data4', 'negative')
NEW_ANC_PATH = os.path.join('data4', 'anchor')

In [5]:
anchor = tf.data.Dataset.list_files(NEW_ANC_PATH+'/*.jpg').take(600)
positive = tf.data.Dataset.list_files(NEW_POS_PATH+'/*.jpg').take(600)
negative = tf.data.Dataset.list_files(NEW_NEG_PATH+'/*.jpg').take(600)

In [6]:
dir_test = anchor.as_numpy_iterator()

In [7]:
print(dir_test.next())

b'data4/anchor/baker_anchor2_186.jpg'


In [8]:
def preprocess(file_path):
    
    # Read in image from file path
    byte_img = tf.io.read_file(file_path)
    
    # Load in the image
    img = tf.io.decode_jpeg(byte_img)
    
    # Preprocessing step - resizing the image to be 100x100x3
    img = tf.image.resize(img, (100,100))
    
    # Scale image to be between 0 and 1
    img = img / 255.0
    
    return img

In [9]:
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)

In [10]:
def  preprocess_twin(input_img, validation_img, label):
    return (preprocess(input_img), preprocess(validation_img), label)

In [11]:
# Build dataloade pipeline
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=1024)

In [12]:
# Training partition
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [13]:
train_samples = train_data.as_numpy_iterator()

In [14]:
# Testing partition
test_data = data.skip(round(len(data)*.7))
test_Data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

In [15]:
def make_embedding():
    
    embedding = Sequential()
    
    # First block
    embedding.add(Conv2D(filters=4,
                         kernel_size=(5,5),
                         padding="same",
                         input_shape=(100,100,1)))
    embedding.add(BatchNormalization())
    embedding.add(Activation("relu"))
    embedding.add(MaxPooling2D())
    
    # Second block
    embedding.add(Conv2D(filters=4,
                         kernel_size=(5,5),
                         padding="same",
                         input_shape=(100,100,1)))
    embedding.add(BatchNormalization())
    embedding.add(Activation("relu"))
    embedding.add(MaxPooling2D())
    
    # Third block
    embedding.add(Conv2D(filters=8,
                         kernel_size=(5,5),
                         padding="same",
                         input_shape=(100,100,1)))
    embedding.add(BatchNormalization())
    embedding.add(Activation("relu"))
    embedding.add(MaxPooling2D())
    
    # Final embedding block
    embedding.add(Conv2D(filters=8,
                         kernel_size=(5,5),
                         padding="same",
                         input_shape=(100,100,1)))
    embedding.add(BatchNormalization())
    embedding.add(Flatten())
    embedding.add(Dense(64, activation='sigmoid'))
    return embedding

In [16]:
embedding = make_embedding()

In [17]:
embedding.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d (Conv2D)              (None, 100, 100, 4)       104       
_________________________________________________________________
batch_normalization (BatchNo (None, 100, 100, 4)       16        
_________________________________________________________________
activation (Activation)      (None, 100, 100, 4)       0         
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 50, 50, 4)         0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 50, 50, 4)         404       
_________________________________________________________________
batch_normalization_1 (Batch (None, 50, 50, 4)         16        
_________________________________________________________________
activation_1 (Activation)    (None, 50, 50, 4)         0

In [18]:
# Siamese L1 Distance class
class L1Dist(Layer):
    
    # Init method - inheritance
    def __inint__(self, **kwargs):
        super().__init__()
    
    # Magic happens here - similarity caculation
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)

In [19]:
l1 = L1Dist()

In [20]:
def make_siamese_model():
    
    # Anchor image input in the network
    input_image = Input(name='input_img', shape=(100,100,1))
    
    # Validation image in the network
    validation_image = Input(name='validation_img', shape=(100,100,1))
    
    # Combine siamese distance componets
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(embedding(input_image), embedding(validation_image))
    
    # classification layer
    classifier = Dense(1, activation='sigmoid')(distances)
    
    return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

In [21]:
siamese_model = make_siamese_model()

In [22]:
siamese_model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_img (InputLayer)          [(None, 100, 100, 1) 0                                            
__________________________________________________________________________________________________
validation_img (InputLayer)     [(None, 100, 100, 1) 0                                            
__________________________________________________________________________________________________
sequential (Sequential)         (None, 64)           76812       input_img[0][0]                  
                                                                 validation_img[0][0]             
__________________________________________________________________________________________________
distance (L1Dist)               (None, 64)           0           sequential[0][0]    

In [23]:
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [24]:
opt = tf.keras.optimizers.Adam(1e-4) # 0.0001

In [25]:
@tf.function
def train_step(batch):
    
    # Record all of our operations
    with tf.GradientTape() as tape:
        # Get anchor and positive/negative image
        X = batch[:2]
        
        # Get label
        #if batch[2] == 1:
            
        
        y = batch[2]
        
        # Forward pass
        yhat = siamese_model(X, training=True)
        
        # Calculate loss
        loss = binary_cross_loss(y, yhat)
#         loss = mse(y, yhat)
        
        
    # Calculate gradients
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    
    # Calculate updated weights and apply to siamese model
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))
    
    # Return loss
    return loss

In [26]:
def test_step(batch):
    
    # Record all of our operations
    with tf.GradientTape() as tape:
        
        # Get anchor and positive/negative image
        X = batch[:2]
        
        # Get label
        y = batch[2]
        
        # Forward pass
        yhat = siamese_model(X, training=True)
        
        # Calculate loss
        loss = binary_cross_loss(y, yhat)
#         loss = mse(y, yhat)
    
    # Return loss
    return loss

In [27]:
def train(train_data, test_data, EPOCHS):
    
    train_losses = []
    test_losses = []
    epochs = []
    
    # Loop through epochs
    for epoch in range(1, EPOCHS+1):
        print('\n EPOCH {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(train_data) + len(test_data))
        
        # Loop through each batch
        train_loss = 0
        test_loss = 0
        for idx, batch in enumerate(train_data):
            # Run train step here
            train_loss += train_step(batch)
            test_loss += test_step(batch)
            progbar.update(idx+1)
            
        train_loss = train_loss.numpy()
        train_loss /= len(train_data)
        test_loss = test_loss.numpy()
        test_loss /= len(test_data)
        
        
        
            
        train_losses.append(train_loss)
        test_losses.append(test_loss)
        epochs.append(epoch)
        print("\n")
        print("training loss: ", train_loss)
        print("testing loss: ", test_loss)
    
    # Save checkpoints
    if epoch % 10 == 0:
        checkpoint.save(file_prefix=checkpoint_prefix)
    
    return train_losses, test_losses, epochs

In [28]:
EPOCHS = 100

In [29]:
train_losses, test_losses, epochs = train(train_data, test_data, EPOCHS)


 EPOCH 1/100

training loss:  0.7075756720776828
testing loss:  1.6187127154806387

 EPOCH 2/100

training loss:  0.6903614548017394
testing loss:  1.5833431741465693

 EPOCH 3/100

training loss:  0.6749488542664726
testing loss:  1.5478286743164062

 EPOCH 4/100

training loss:  0.6599526315365197
testing loss:  1.5132690097974695

 EPOCH 5/100

training loss:  0.6480774789486291
testing loss:  1.4861566294794497

 EPOCH 6/100

training loss:  0.6325716702443249
testing loss:  1.4506524127462637

 EPOCH 7/100

training loss:  0.6263347121904481
testing loss:  1.435883729354195

 EPOCH 8/100

training loss:  0.6085339672160599
testing loss:  1.3950886933699898

 EPOCH 9/100

training loss:  0.6016290052881781
testing loss:  1.37915213211723

 EPOCH 10/100

training loss:  0.5869524973743366
testing loss:  1.3451536427373472

 EPOCH 11/100

training loss:  0.582090269844487
testing loss:  1.3338929881220278

 EPOCH 12/100

training loss:  0.5647320657406213
testing loss:  1.2940137282


training loss:  0.09694974827316571
testing loss:  0.2199730458466903

 EPOCH 63/100

training loss:  0.08644718494055406
testing loss:  0.19655150952546493

 EPOCH 64/100

training loss:  0.08143470872123286
testing loss:  0.18467322639797046

 EPOCH 65/100

training loss:  0.07846841272318138
testing loss:  0.17833006900289786

 EPOCH 66/100

training loss:  0.07992196532915223
testing loss:  0.18124240377674933

 EPOCH 67/100

training loss:  0.07296646765942844
testing loss:  0.1654345989227295

 EPOCH 68/100

training loss:  0.0713114378587255
testing loss:  0.16153804115627124

 EPOCH 69/100

training loss:  0.07047215047872292
testing loss:  0.15969593628593112

 EPOCH 70/100

training loss:  0.06654640863526542
testing loss:  0.15098393481710684

 EPOCH 71/100

training loss:  0.0648003119342732
testing loss:  0.14693645809007727

 EPOCH 72/100

training loss:  0.06405413825556917
testing loss:  0.14505473427150561

 EPOCH 73/100

training loss:  0.057446502289682067
testing l

NameError: name 'checkpoint' is not defined

In [None]:
train_losses

In [None]:
test_losses

In [None]:
def draw_loss(train_losses, test_losses, epochs):
    plt.plot(epochs, train_losses)
    plt.plot(epochs, test_losses)
    plt.title('model loss')
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.legend(['train', 'test'], loc='upper left')
    plt.show()

In [None]:
draw_loss(train_losses, test_losses, epochs)

In [None]:
# Get a batch of test data
test_input, test_val, y_true = test_data.as_numpy_iterator().next()

In [None]:
y_hat = siamese_model.predict([test_input, test_val])
y_hat

In [None]:
# Post procrssing the results
[1 if prediction > 0.5 else 0 for prediction in y_hat ]

In [None]:
y_true

In [None]:
# Set plot size
plt.figure(figsize=(18,8))

# Set first subplot
plt.subplot(1,2,1)
plt.imshow(test_input[0][:,:,0],cmap='gray')

# Set second subplot
plt.subplot(1,2,2)
plt.imshow(test_val[0][:,:,0],cmap='gray')


# Render cleanly
plt.show()

In [None]:
keras_model_filename = 'siamesemodel_v15.h5'
tflite_filename = 'siamesemodel_v15_lite.tflite'
tflite_model_name = 'siamesemodel_v15_lite'
c_model_name = 'siamesemodel_v15_lite'

In [None]:
# Save weights
siamese_model.save(keras_model_filename)

In [None]:
# Reload model
model = tf.keras.models.load_model(keras_model_filename,\
                                  custom_objects={'L1Dist':L1Dist, 'MSE':tf.losses.MeanSquaredError})

In [None]:
# Make predictions with reloaded model
model.predict([test_input, test_val])

In [None]:
# View model summary
model.summary()

In [None]:
from tensorflow import lite

In [None]:
import pathlib
model = tf.keras.models.load_model(keras_model_filename,\
                                  custom_objects={'L1Dist':L1Dist, 'MSE':tf.losses.MeanSquaredError})
converter = lite.TFLiteConverter.from_keras_model(model)
converted_model = converter.convert()

generated_dir = pathlib.Path("generated/")
generated_dir.mkdir(exist_ok=True, parents=True)
converted_model_file = generated_dir/tflite_model_name

In [None]:
converted_model_file.write_bytes(converted_model)