# Image similarity estimation using a Siamese Network with a triplet loss

## Setup

In [None]:
import os
import random
import shutil

import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from tensorflow.keras import models, layers, losses, optimizers, metrics, callbacks

## Load the dataset

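We are going to mount Google Drive and unzip three dataset archives (`ISIA.zip`, `CLICK.zip`,
and `robot.zip`) into the working directory of the Colab runtime (`/content`).

Each extracted dataset has a `flows` directory, and the notebook reads three of its subdirectories:

* `good/left` contains the images that we will use as the anchor.
* `good/right` contains the images that we will use as the positive sample (an image that looks like the anchor).
* `wrong/right` contains the images that we will use as the negative sample.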

In [None]:
# Colab-only helper used to reach the dataset archives stored on Google Drive.
from google.colab import drive
# Exposes the Drive contents under /content/drive for the cells below.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!unzip "/content/drive/MyDrive/ISIA Lab/Dataset/ISIA.zip"
!unzip "/content/drive/MyDrive/ISIA Lab/Dataset/CLICK.zip"
!unzip "/content/drive/MyDrive/ISIA Lab/Dataset/robot.zip"

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  inflating: robot/flows/wrong/left/1183.jpg  
  inflating: robot/flows/wrong/left/1184.jpg  
  inflating: robot/flows/wrong/left/1185.jpg  
  inflating: robot/flows/wrong/left/1186.jpg  
  inflating: robot/flows/wrong/left/1187.jpg  
  inflating: robot/flows/wrong/left/1188.jpg  
  inflating: robot/flows/wrong/left/1189.jpg  
  inflating: robot/flows/wrong/left/119.jpg  
  inflating: robot/flows/wrong/left/1190.jpg  
  inflating: robot/flows/wrong/left/1191.jpg  
  inflating: robot/flows/wrong/left/1192.jpg  
  inflating: robot/flows/wrong/left/1193.jpg  
  inflating: robot/flows/wrong/left/1194.jpg  
  inflating: robot/flows/wrong/left/1195.jpg  
  inflating: robot/flows/wrong/left/1196.jpg  
  inflating: robot/flows/wrong/left/1197.jpg  
  inflating: robot/flows/wrong/left/1198.jpg  
  inflating: robot/flows/wrong/left/1199.jpg  
  inflating: robot/flows/wrong/left/12.jpg  
  inflating: robot/flows/wrong/left/120.jpg  

## Preparing the data

We are going to use a `tf.data` pipeline to load the data and generate the triplets that we
need to train the Siamese network.

We'll set up the pipeline by zipping the anchor, positive, and negative image datasets
together. The pipeline loads and preprocesses the corresponding images, so each element of
its output is a triplet in which every image has been resized to 224×224 and scaled to the
`[0, 1]` range.

In [None]:
# Let tf.data pick the parallelism / prefetch depth instead of hard-coding 1.
AUTOTUNE = tf.data.AUTOTUNE


def load_triplet_dataset(root, image_size=(224, 224)):
    """Build an unbatched dataset of (anchor, positive, negative) image triplets.

    The images are read from three sub-directories of `root`:

    * ``good/left``   -> anchor
    * ``good/right``  -> positive
    * ``wrong/right`` -> negative

    ``wrong/left`` is not read: the original notes describe it as identical to
    ``good/left`` and it was never used.

    Args:
        root: Path of the ``flows`` directory of one dataset.
        image_size: (height, width) every image is resized to.

    Returns:
        A `tf.data.Dataset` whose elements are 3-tuples of float32 image
        tensors scaled to the [0, 1] range.
    """

    def load_images(label, side):
        # shuffle=False keeps the file order fixed, so the i-th element of each
        # directory ends up in the same triplet.
        # NOTE(review): assumes the three directories list matching files in
        # the same order - confirm against the dataset layout.
        images = tf.keras.utils.image_dataset_from_directory(
            f"{root}/{label}/{side}",
            color_mode="rgb",
            image_size=image_size,
            batch_size=None,
            labels=None,
            shuffle=False,
        )
        # Parallel map keeps element order; it only speeds up the rescaling.
        return images.map(lambda image: image / 255, num_parallel_calls=AUTOTUNE)

    anchors = load_images("good", "left")
    positives = load_images("good", "right")
    negatives = load_images("wrong", "right")
    return tf.data.Dataset.zip((anchors, positives, negatives))


path = "/content/ISIA/flows"
train_size = 0.8

triplets = load_triplet_dataset(path)

# Split on the number of triplets that were actually loaded. Counting directory
# entries can be off when the folder holds non-image files or when the three
# directories differ in length (zip stops at the shortest one).
size = int(triplets.cardinality().numpy())
if size < 0:
    # Cardinality is unknown/infinite: fall back to counting the anchor files.
    size = len(os.listdir(f"{path}/good/left"))
n_train = int(size * train_size)

# The first `n_train` triplets are used for training, the rest for validation.
val_dataset = triplets.skip(n_train)
train_dataset = triplets.take(n_train)

# Only the training split is shuffled (buffer of 1000 triplets).
train_dataset = train_dataset.shuffle(1000).batch(64).prefetch(AUTOTUNE)
val_dataset = val_dataset.batch(64).prefetch(AUTOTUNE)

print(train_dataset)
print(val_dataset)

Found 11945 files belonging to 1 classes.
Found 11945 files belonging to 1 classes.
Found 11945 files belonging to 1 classes.
Found 11945 files belonging to 1 classes.
<PrefetchDataset element_spec=(TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32, name=None))>
<PrefetchDataset element_spec=(TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32, name=None))>


In [None]:
# Root of the CLICK dataset, which serves as the held-out test set.
path = "/content/CLICK/flows"

# Directories are visited in this order:
#   0 -> wrong/left (unused; noted as the same images as good/left)
#   1 -> wrong/right, 2 -> good/left, 3 -> good/right
directories = [
    f"{path}/{label}/{side}"
    for label in ("wrong", "good")
    for side in ("left", "right")
]

# One unbatched, unshuffled image dataset per directory, rescaled to [0, 1].
ds = [
    tf.keras.utils.image_dataset_from_directory(
        directory,
        color_mode="rgb",
        image_size=(224, 224),
        batch_size=None,
        labels=None,
        shuffle=False,
    ).map(lambda image: image / 255)
    for directory in directories
]

negative_input, anchor_input, positive_input = ds[1:]

# Triplets are evaluated one at a time (batch size 1).
test_dataset = (
    tf.data.Dataset.zip((anchor_input, positive_input, negative_input))
    .batch(1)
    .prefetch(1)
)
print(test_dataset)

Found 8051 files belonging to 1 classes.
Found 8051 files belonging to 1 classes.
Found 8051 files belonging to 1 classes.
Found 8051 files belonging to 1 classes.
<PrefetchDataset element_spec=(TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32, name=None))>


## Setting up the embedding generator model

In [None]:
# Embedding network: four Conv2D + MaxPooling2D stages, global average pooling,
# and a 48-unit dense layer that produces the embedding.
# BatchNormalization after each convolution is left disabled.
# NOTE(review): the final ReLU restricts every embedding component to be
# non-negative - confirm this is intended.
cnn = models.Sequential([
    layers.Conv2D(filters=32, kernel_size=(3, 3), activation='relu', input_shape=(224, 224, 3)),
    layers.MaxPooling2D(pool_size=(3, 3)),

    layers.Conv2D(filters=48, kernel_size=(3, 3), activation='relu'),
    layers.MaxPooling2D(pool_size=(2, 2)),

    layers.Conv2D(filters=48, kernel_size=(3, 3), activation='relu'),
    layers.MaxPooling2D(pool_size=(2, 2)),

    layers.Conv2D(filters=64, kernel_size=(3, 3), activation='relu'),
    layers.MaxPooling2D(pool_size=(2, 2)),

    layers.GlobalAveragePooling2D(),
    layers.Dense(units=48, activation='relu'),
])

## Setting up the Siamese Network model

The Siamese network will receive each of the triplet images as an input,
generate the embeddings, and output the distance between the anchor and the
positive embedding, as well as the distance between the anchor and the negative
embedding.

To compute the distances, we can use a custom layer `DistanceLayer` that
returns both squared Euclidean distances as a tuple.

In [None]:

class DistanceLayer(layers.Layer):
    """Measure how far the anchor embedding is from the other two embeddings.

    For each triplet the layer returns the squared Euclidean distance between
    anchor and positive, and between anchor and negative, both summed over the
    last axis.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, anchor, positive, negative):
        def squared_distance(other):
            # Sum of squared component differences along the embedding axis.
            difference = anchor - other
            return tf.reduce_sum(tf.square(difference), axis=-1)

        return (squared_distance(positive), squared_distance(negative))


# One symbolic 224x224 RGB input per member of the triplet.
anchor_input, positive_input, negative_input = (
    layers.Input(name=role, shape=(224, 224, 3))
    for role in ("anchor", "positive", "negative")
)

# The same `cnn` instance embeds all three inputs, so its weights are shared.
embeddings = [cnn(image) for image in (anchor_input, positive_input, negative_input)]

# Output: (anchor-positive distance, anchor-negative distance).
distances = DistanceLayer()(*embeddings)

siamese_network = models.Model(
    inputs=[anchor_input, positive_input, negative_input],
    outputs=distances,
)

## Putting everything together

We now need to implement a model with a custom training loop so we can compute
the triplet loss from the two distances produced by the Siamese network.

Let's create a `Mean` metric instance to track the loss of the training process.

In [None]:

class SiameseModel(models.Model):
    """Trains and evaluates a triplet network with custom train/test steps.

    The wrapped network returns the anchor-positive and anchor-negative
    distances; this model turns them into the triplet loss

       L(A, P, N) = max(‖f(A) - f(P)‖² - ‖f(A) - f(N)‖² + margin, 0)

    and tracks its running mean in a single `loss` metric.
    """

    def __init__(self, siamese_network, margin=0.5):
        super().__init__()
        self.siamese_network = siamese_network
        self.margin = margin
        self.loss_tracker = metrics.Mean(name="loss")

    def call(self, inputs):
        # Forward pass is delegated to the wrapped network unchanged.
        return self.siamese_network(inputs)

    def train_step(self, data):
        # Record the forward pass so the loss can be differentiated.
        with tf.GradientTape() as tape:
            loss = self._compute_loss(data)

        # Differentiate with respect to the wrapped network's weights and let
        # the optimizer given to `compile()` apply the update.
        weights = self.siamese_network.trainable_weights
        gradients = tape.gradient(loss, weights)
        self.optimizer.apply_gradients(zip(gradients, weights))

        return self._update_loss_tracker(loss)

    def test_step(self, data):
        # Same loss as in training, without any weight update.
        return self._update_loss_tracker(self._compute_loss(data))

    def _update_loss_tracker(self, loss):
        # Fold this batch's loss into the running mean and report it.
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    def _compute_loss(self, data):
        # The network yields (anchor-positive, anchor-negative) distances.
        ap_distance, an_distance = self.siamese_network(data)

        # Hinge on the distance gap: values below zero are clipped to zero.
        return tf.maximum(ap_distance - an_distance + self.margin, 0.0)

    @property
    def metrics(self):
        # Listing the tracker here lets Keras reset it automatically.
        return [self.loss_tracker]


## Training

In [None]:
# Directory that receives the checkpoint files ("weights/weights").
# `exist_ok=True` keeps the cell re-runnable: a plain `os.mkdir` raises
# FileExistsError as soon as the directory already exists.
os.makedirs("weights", exist_ok=True)

In [None]:
# Keep only the weights of the epoch with the lowest validation loss.
checkpoint = callbacks.ModelCheckpoint(
    filepath="weights/weights",
    monitor="val_loss",
    mode="min",
    save_best_only=True,
    save_weights_only=True,
)

In [None]:
# Wrap the triplet network (default margin) and train it with Adam.
siamese_model = SiameseModel(siamese_network)
siamese_model.compile(optimizer=optimizers.Adam(learning_rate=0.0001))

# 50 epochs; the checkpoint callback stores the best weights by `val_loss`.
siamese_model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=50,
    callbacks=[checkpoint],
)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


<keras.callbacks.History at 0x7f80fb1f5050>

In [None]:
# Runs `test_step` over every CLICK triplet (batch size 1) and returns the mean
# triplet loss.
# NOTE(review): this scores the weights left in memory after the last epoch,
# not the best-`val_loss` checkpoint saved under weights/ - confirm that is
# intended.
siamese_model.evaluate(test_dataset)



0.18352608382701874

In [None]:
# Zip the checkpoint directory and store it on Google Drive as
# /content/drive/MyDrive/weights.zip (`shutil` is imported in the top import cell).
# NOTE(review): '/content/weights' matches the relative "weights" directory
# only when the notebook's working directory is /content - confirm.
shutil.make_archive('/content/drive/MyDrive/weights', 'zip', '/content/weights')

'/content/drive/MyDrive/weights.zip'

## Inspecting what the network has learned

At this point, we can check how the network learned to separate the embeddings
depending on whether they belong to similar images.

We can use [cosine similarity](https://en.wikipedia.org/wiki/Cosine_similarity) to measure the
similarity between embeddings.

Let's pick a sample from the dataset to check the similarity between the
embeddings generated for each image.

In [None]:
# NOTE(review): this cell is disabled. `visualize`, `embedding` and `resnet`
# are not defined in the visible notebook; the embedding model built here is
# `cnn`, and the data pipeline already rescales the images to [0, 1].
# sample = next(iter(train_dataset))
# visualize(*sample)

# anchor, positive, negative = sample
# anchor_embedding, positive_embedding, negative_embedding = (
#     embedding(resnet.preprocess_input(anchor)),
#     embedding(resnet.preprocess_input(positive)),
#     embedding(resnet.preprocess_input(negative)),
# )

Finally, we can compute the cosine similarity between the anchor and positive
images and compare it with the similarity between the anchor and the negative
images.

We should expect the similarity between the anchor and positive images to be
larger than the similarity between the anchor and the negative images.

In [None]:
# NOTE(review): this cell is disabled. It needs `anchor_embedding`,
# `positive_embedding` and `negative_embedding`, which are only assigned in
# the commented-out sampling cell above.
# cosine_similarity = metrics.CosineSimilarity()

# positive_similarity = cosine_similarity(anchor_embedding, positive_embedding)
# print("Positive similarity:", positive_similarity.numpy())

# negative_similarity = cosine_similarity(anchor_embedding, negative_embedding)
# print("Negative similarity", negative_similarity.numpy())
