In [15]:
import tensorflow as tf
from tensorflow.keras.models import load_model
from tensorflow.keras.layers import Layer
import os

In [16]:
# Setup paths
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')

In [17]:
anchor = tf.data.Dataset.list_files(ANC_PATH+'/*.jpg').take(300)
positive = tf.data.Dataset.list_files(POS_PATH+'/*.jpg').take(300)
negative = tf.data.Dataset.list_files(NEG_PATH+'/*.jpg').take(300)

In [34]:
# Preprocess function
def preprocess(file_path):
    # Read in image from file path
    byte_img = tf.io.read_file(file_path)
    # Load in the image 
    img = tf.io.decode_jpeg(byte_img)
    # Preprocessing steps - resizing the image to be 100x100x3
    img = tf.image.resize(img, (100, 100))
    # Scale image to be between 0 and 1 
    img = img / 255.0
    # Return image
    return img

In [43]:
# Preprocess twin function
def preprocess_twin(input_img_path, validation_img_path, label):
    return (preprocess(input_img_path), preprocess(validation_img_path), label)

In [44]:
# Create datasets
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)

In [45]:
# Build dataloader pipeline
data = data.map(preprocess_twin)


In [46]:
# Cache and shuffle the data
data = data.cache()
data = data.shuffle(buffer_size=10000)

In [85]:
# Split into training and testing partitions
train_data = data.take(round(len(data) * 0.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [48]:
# Debug: Check the shape of a batch
for batch in train_data.take(1):
    anchor_batch, validation_batch, label_batch = batch
    print(f"Anchor batch shape: {anchor_batch.shape}")  # Expected: (16, 100, 100, 3)
    print(f"Validation batch shape: {validation_batch.shape}")  # Expected: (16, 100, 100, 3)
    print(f"Label batch shape: {label_batch.shape}")  # Expected: (16,)

Anchor batch shape: (16, 100, 100, 3)
Validation batch shape: (16, 100, 100, 3)
Label batch shape: (16,)


In [49]:
class L1Dist(Layer):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, input_embedding, validation_embedding):
        # Convert inputs to tensors if they are not already
        input_embedding = tf.convert_to_tensor(input_embedding)
        validation_embedding = tf.convert_to_tensor(validation_embedding)
        
        # Calculate the absolute difference
        return tf.math.abs(input_embedding - validation_embedding)

In [50]:
# Load the saved model
siamese_model = load_model('siamesemodel.h5', custom_objects={'L1Dist': L1Dist})



In [64]:
# Recompile the model (use the same optimizer and loss as before)

opt = tf.keras.optimizers.Adam(1e-4)  # Use the same learning rate as before
siamese_model.compile(optimizer=opt, loss=binary_cross_loss)

In [72]:
loss_function = tf.keras.losses.BinaryCrossentropy()

In [93]:

@tf.function
def train_step(batch):
    anchor, validation, label = batch

    # Ensure inputs have batch dimensions
    anchor = tf.expand_dims(anchor, axis=0) if len(anchor.shape) == 3 else anchor
    validation = tf.expand_dims(validation, axis=0) if len(validation.shape) == 3 else validation
    label = tf.reshape(label, (-1, 1))  # Ensure label shape is (batch_size, 1)

    with tf.GradientTape() as tape:
        yhat = siamese_model([anchor, validation], training=True)  # Forward pass

        # Ensure yhat has the correct shape
        yhat = tf.reshape(yhat, (-1, 1))

        # Compute loss
        loss = loss_function(label, yhat)  

    return loss



In [90]:
for batch in train_data.take(1):
    anchor, validation, label = batch
    yhat = siamese_model([anchor, validation], training=False)

    print(f"Label shape: {label.shape}")  # Expected: (batch_size,)
    print(f"yhat shape before reshape: {yhat.shape}")  # Expected: (batch_size, 1) or (batch_size,)

    # Apply reshaping
    label = tf.reshape(label, (-1, 1))
    yhat = tf.reshape(yhat, (-1, 1))

    print(f"Label shape after reshape: {label.shape}")  # Expected: (batch_size, 1)
    print(f"yhat shape after reshape: {yhat.shape}")  # Expected: (batch_size, 1)


Label shape: (16,)
yhat shape before reshape: (1, 16, 1)
Label shape after reshape: (16, 1)
yhat shape after reshape: (16, 1)


In [91]:
# Define your train function
def train(data, EPOCHS):
    for epoch in range(1, EPOCHS + 1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))
        
        for idx, batch in enumerate(data):
            train_step(batch)
            progbar.update(idx + 1)
        
        # Save checkpoints (optional)
        if epoch % 10 == 0:
            siamese_model.save(f'siamesemodel_epoch_{epoch + 50}.h5')  # Adjust the filename as needed

In [94]:
# Resume training for 20 more epochs
additional_epochs = 20
train(data, additional_epochs)


 Epoch 1/20
[1m600/600[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m267s[0m 443ms/step

 Epoch 2/20
[1m600/600[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m246s[0m 409ms/step

 Epoch 3/20
[1m600/600[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m237s[0m 394ms/step

 Epoch 4/20
[1m600/600[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m285s[0m 474ms/step

 Epoch 5/20
[1m600/600[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m254s[0m 423ms/step

 Epoch 6/20
[1m600/600[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m250s[0m 416ms/step

 Epoch 7/20
[1m600/600[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m248s[0m 414ms/step

 Epoch 8/20
[1m600/600[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m247s[0m 412ms/step

 Epoch 9/20
[1m600/600[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m249s[0m 416ms/step

 Epoch 10/20
[1m600/600[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m250s[0m 417ms/step





 Epoch 11/20
[1m600/600[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m244s[0m 407ms/step

 Epoch 12/20
[1m600/600[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m242s[0m 404ms/step

 Epoch 13/20
[1m600/600[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m246s[0m 410ms/step

 Epoch 14/20
[1m600/600[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m242s[0m 403ms/step

 Epoch 15/20
[1m600/600[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m245s[0m 408ms/step

 Epoch 16/20
[1m600/600[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m280s[0m 466ms/step

 Epoch 17/20
[1m600/600[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m282s[0m 470ms/step

 Epoch 18/20
[1m600/600[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m274s[0m 457ms/step

 Epoch 19/20
[1m600/600[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m274s[0m 456ms/step

 Epoch 20/20
[1m600/600[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m274s[0m 457ms/step




In [95]:
siamese_model.save('siamesemodel_final.h5')



In [96]:
siamese_model = load_model('siamesemodel_final.h5', custom_objects={'L1Dist': L1Dist})

