In [None]:
import tensorflow as tf
import utils.data_utils as du
import utils.model_helper as mh
import matplotlib.pyplot as plt
import os

In [None]:
DATASET_PATH = '/home/irizqy/ml_ws/bangkit-ws/data/bizz.it-sim_dataset'
IMAGE_SHAPE = (150, 150, 3)
BATCH_SIZE = 8

In [None]:
(train_pairs, test_pairs) = du.make_train_test_pairs(DATASET_PATH, .1)
train_pairs.shape, test_pairs.shape

In [None]:
w = 9
h = 8

du.visualize(train_pairs, w, h)

In [None]:
base_cnn_model = tf.keras.applications.resnet50.ResNet50(input_shape=IMAGE_SHAPE, weights='imagenet', include_top=False)

embedding_model = tf.keras.Sequential([
    base_cnn_model,
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(512, activation="relu"),
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.Dense(256, activation="relu"),
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.Dense(256),
], name='base_embedding_model')

trainable = False
for layer in base_cnn_model.layers:
    if layer.name == 'conv4_block2_out':
        trainable = True
    layer.trainable = trainable

In [None]:
class DistanceLayer(tf.keras.layers.Layer):
    """
    This layer is responsible for computing the distance between the anchor
    embedding and the positive embedding, and the anchor embedding and the
    negative embedding.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, anchor, positive, negative):
        ap_distance = tf.reduce_sum(tf.square(anchor - positive), -1)
        an_distance = tf.reduce_sum(tf.square(anchor - negative), -1)
        return (ap_distance, an_distance)
    
anchor_input = tf.keras.layers.Input(name="anchor", shape=IMAGE_SHAPE)
positive_input = tf.keras.layers.Input(name="positive", shape=IMAGE_SHAPE)
negative_input = tf.keras.layers.Input(name="negative", shape=IMAGE_SHAPE)

distances = DistanceLayer()(
    embedding_model(tf.keras.applications.resnet.preprocess_input(anchor_input)),
    embedding_model(tf.keras.applications.resnet.preprocess_input(positive_input)),
    embedding_model(tf.keras.applications.resnet.preprocess_input(negative_input)),
)

siamese_network = tf.keras.Model(
    inputs=[anchor_input, positive_input, negative_input], outputs=distances
)

In [None]:
class SiameseModel(tf.keras.Model):
    """The Siamese Network model with a custom training and testing loops.

    Computes the triplet loss using the three embeddings produced by the
    Siamese Network.

    The triplet loss is defined as:
       L(A, P, N) = max(‖f(A) - f(P)‖² - ‖f(A) - f(N)‖² + margin, 0)
    """

    def __init__(self, siamese_network, margin=0.5):
        super().__init__()
        self.siamese_network = siamese_network
        self.margin = margin
        self.loss_tracker = tf.keras.metrics.Mean(name="loss")

    def call(self, inputs):
        return self.siamese_network(inputs)

    def train_step(self, data):
        # GradientTape is a context manager that records every operation that
        # you do inside. We are using it here to compute the loss so we can get
        # the gradients and apply them using the optimizer specified in
        # `compile()`.
        with tf.GradientTape() as tape:
            loss = self._compute_loss(data)

        # Storing the gradients of the loss function with respect to the
        # weights/parameters.
        gradients = tape.gradient(loss, self.siamese_network.trainable_weights)

        # Applying the gradients on the model using the specified optimizer
        self.optimizer.apply_gradients(
            zip(gradients, self.siamese_network.trainable_weights)
        )

        # Let's update and return the training loss metric.
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    def test_step(self, data):
        loss = self._compute_loss(data)

        # Let's update and return the loss metric.
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    def _compute_loss(self, data):
        # The output of the network is a tuple containing the distances
        # between the anchor and the positive example, and the anchor and
        # the negative example.
        ap_distance, an_distance = self.siamese_network(data)

        # Computing the Triplet Loss by subtracting both distances and
        # making sure we don't get a negative value.
        loss = ap_distance - an_distance
        loss = tf.maximum(loss + self.margin, 0.0)
        return loss

    @property
    def metrics(self):
        # We need to list our metrics here so the `reset_states()` can be
        # called automatically.
        return [self.loss_tracker]


In [None]:
siamese_model = SiameseModel(siamese_network)
siamese_model.compile(optimizer=tf.keras.optimizers.Adam())
siamese_model.fit([train_pairs[:, 0], train_pairs[:, 1], train_pairs[:, 2]],
                  epochs=20,
                  validation_data=[test_pairs[:, 0], test_pairs[:, 1], test_pairs[:, 2]]
                  )

In [None]:
siamese_model.save('/home/irizqy/ml_ws/bangkit-ws/src/logo-detector/triplet_im_similar/')

In [16]:
anchor_path = '/home/irizqy/Downloads/ct_3.png'
target_path = '/home/irizqy/Downloads/franchise'

anchor = mh.adjust_im(anchor_path, (150, 150)) 

In [17]:
preds = []
anchor = mh.adjust_im(anchor_path, (150, 150)) 

for i, file in enumerate(os.listdir(target_path)):
    target = mh.adjust_im(os.path.join(target_path, file), (150, 150))
    print(target)
    anchor_embedding, positive_embedding, negative_embedding = (
        embedding_model(tf.keras.applications.resnet.preprocess_input(anchor)),
        embedding_model(tf.keras.applications.resnet.preprocess_input(target)),
        embedding_model(tf.keras.applications.resnet.preprocess_input(target)),
    )
    cosine_similarity = tf.keras.metrics.CosineSimilarity()

    positive_similarity = cosine_similarity(anchor_embedding, positive_embedding)
    # print("Positive similarity:", positive_similarity.numpy())

    negative_similarity = cosine_similarity(anchor_embedding, negative_embedding)
    # print("Negative similarity", negative_similarity.numpy())

    if positive_similarity > negative_similarity:
        preds.append((i, positive_similarity))
    else:    
        pass

[[[[1. 1. 1.]
   [1. 1. 1.]
   [1. 1. 1.]
   ...
   [1. 1. 1.]
   [1. 1. 1.]
   [1. 1. 1.]]

  [[1. 1. 1.]
   [1. 1. 1.]
   [1. 1. 1.]
   ...
   [1. 1. 1.]
   [1. 1. 1.]
   [1. 1. 1.]]

  [[1. 1. 1.]
   [1. 1. 1.]
   [1. 1. 1.]
   ...
   [1. 1. 1.]
   [1. 1. 1.]
   [1. 1. 1.]]

  ...

  [[1. 1. 1.]
   [1. 1. 1.]
   [1. 1. 1.]
   ...
   [1. 1. 1.]
   [1. 1. 1.]
   [1. 1. 1.]]

  [[1. 1. 1.]
   [1. 1. 1.]
   [1. 1. 1.]
   ...
   [1. 1. 1.]
   [1. 1. 1.]
   [1. 1. 1.]]

  [[1. 1. 1.]
   [1. 1. 1.]
   [1. 1. 1.]
   ...
   [1. 1. 1.]
   [1. 1. 1.]
   [1. 1. 1.]]]]
[[[[0.88627451 0.1254902  0.16862745]
   [0.88627451 0.1254902  0.16862745]
   [0.88627451 0.1254902  0.16862745]
   ...
   [0.81960784 0.10980392 0.18431373]
   [0.81960784 0.10980392 0.18431373]
   [0.81176471 0.10980392 0.18431373]]

  [[0.88627451 0.1254902  0.16862745]
   [0.88627451 0.1254902  0.16862745]
   [0.88627451 0.1254902  0.16862745]
   ...
   [0.81960784 0.10980392 0.18431373]
   [0.81960784 0.10980392 0.18431373]


In [21]:
preds

[(0, <tf.Tensor: shape=(), dtype=float32, numpy=0.9999801>),
 (3, <tf.Tensor: shape=(), dtype=float32, numpy=0.99927205>),
 (4, <tf.Tensor: shape=(), dtype=float32, numpy=0.9978555>),
 (5, <tf.Tensor: shape=(), dtype=float32, numpy=0.9961332>),
 (6, <tf.Tensor: shape=(), dtype=float32, numpy=0.99452704>),
 (7, <tf.Tensor: shape=(), dtype=float32, numpy=0.9926253>),
 (8, <tf.Tensor: shape=(), dtype=float32, numpy=0.99123716>),
 (9, <tf.Tensor: shape=(), dtype=float32, numpy=0.98977554>),
 (10, <tf.Tensor: shape=(), dtype=float32, numpy=0.9885911>),
 (11, <tf.Tensor: shape=(), dtype=float32, numpy=0.98754096>),
 (12, <tf.Tensor: shape=(), dtype=float32, numpy=0.986044>),
 (13, <tf.Tensor: shape=(), dtype=float32, numpy=0.98574597>),
 (14, <tf.Tensor: shape=(), dtype=float32, numpy=0.98495156>),
 (15, <tf.Tensor: shape=(), dtype=float32, numpy=0.98449135>)]