# Siamese Neural Network - omniglot dataset

## Import Library

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, Input, backend
import tensorflow_datasets as tfds

## Load the Omniglot dataset 

In [None]:
(ds_train, ds_test), ds_info = tfds.load('omniglot', split=['train','test'], with_info=True)

In [None]:
input_shape = ds_info.features['image'].shape
print(input_shape)

In [None]:
print(list(ds_info.features.keys()))

In [None]:
ds_info.splits['train'].num_examples

## Split the dataset to train and test

In [None]:
def normalize_img(left, right):
  """Normalizes images: `uint8` -> `float32`."""
  # left, right = case
  if left["alphabet"] == right["alphabet"]:
    flag = 0
  else:
    flag = 1
  return tf.cast(left["image"], tf.float32) / 255., tf.cast(right["image"], tf.float32) / 255., tf.cast(flag, tf.float32)

In [None]:
num_train = int(ds_info.splits['train'].num_examples / 2 * 0.8)
# num_val = int(ds_info.splits['train'].num_examples / 2 * 0.2)
left_train = ds_train.take(num_train)
right_train = ds_train.skip(num_train).take(num_train)
# left_val = ds_train.skip(num_train + num_train).take(num_val)
# right_val = ds_train.skip(num_train + num_train + num_val).take(num_val)

ds_train = tf.data.Dataset.zip((left_train, right_train))
# ds_val = tf.data.Dataset.zip((left_val, right_val))

train_x_left = []
train_x_right = []
train_y = []

for left, right in ds_train:
    left_x, right_x, flag = normalize_img(left, right)

    train_x_left.append(left_x)
    train_x_right.append(right_x)
    train_y.append(flag)

train_x_left = np.array(train_x_left)
train_x_right = np.array(train_x_right)
train_y = np.array(train_y)

In [None]:
# val_x_left = []
# val_x_right = []
# val_y = []

# for left, right in ds_train:
#     left_x, right_x, flag = normalize_img(left, right)

#     val_x_left.append(left_x)
#     val_x_right.append(right_x)
#     val_y.append(flag)

# val_x_left = np.array(val_x_left)
# val_x_right = np.array(val_x_right)
# val_y = np.array(val_y)

In [None]:
num_test = int(ds_info.splits['test'].num_examples / 2)
left_test = ds_test.take(num_test)
right_test = ds_test.skip(num_test).take(num_test)
ds_test = tf.data.Dataset.zip((left_test, right_test))

test_x_left = []
test_x_right = []
test_y = []

for left, right in ds_test:
    left_x, right_x, flag = normalize_img(left, right)

    test_x_left.append(left_x)
    test_x_right.append(right_x)
    test_y.append(flag)

test_x_left = np.array(test_x_left)
test_x_right = np.array(test_x_right)
test_y = np.array(test_y)

In [None]:
print(train_x_left.shape, train_x_right.shape, train_y.shape)
print(test_x_left.shape, test_x_right.shape, test_y.shape)

In [None]:
# def normalize_img(case):
#   """Normalizes images: `uint8` -> `float32`."""
#   return tf.cast(case["image"], tf.float32) / 255., case["alphabet"]


In [None]:
# ds_train = ds_train.map(
#   normalize_img, num_parallel_calls=tf.data.experimental.AUTOTUNE)
# ds_train = ds_train.cache()
# ds_train = ds_train.shuffle(int(ds_info.splits['train'].num_examples/2))
# ds_train = ds_train.batch(128)
# ds_train = ds_train.prefetch(tf.data.experimental.AUTOTUNE)

In [None]:
# ds_test = ds_test.map(
#     normalize_img, num_parallel_calls=tf.data.experimental.AUTOTUNE)
# ds_test = ds_test.batch(128)
# ds_test = ds_test.cache()
# ds_test = ds_test.prefetch(tf.data.experimental.AUTOTUNE)

In [None]:
def triplet_loss(y_true, y_pred, margin = 1):
    """Implementation of the triplet loss function

    Arguments:
        y_true : true labels, required when you define a loss in Keras, 
                not applied in this function.

        y_pred (list): python list containing three objects:
            anchor : the encodings for the anchor data
            positive : the encodings for the positive data (similar to anchor)
            negative : the encodings for the negative data (different from anchor)
        
        margin (float, optional): m > 0 determines how far the embeddings of 
                    a negative data should be pushed apart. Defaults to 0.4.

    Returns:
        loss (float): real number, value of the loss
    """

    anchor = y_pred[0]
    positive = y_pred[1]
    negative = y_pred[2]

    # squared distance between the anchor and the positive
    pos_dist = tf.math.reduce_sum(tf.math.square(anchor - positive), axis=1)

    # squared distance between the anchor and the negative
    neg_dist = tf.math.reduce_sum(tf.math.square(anchor - negative), axis=1)

    # compute loss
    basic_loss = margin + pos_dist - neg_dist
    loss = tf.math.maximum(basic_loss,0.0)
    loss = tf.math.reduce_mean(loss)
    return loss


def contrastive_loss(y_true, y_pred, margin = 1):
    """Implementation of the triplet loss function

    Inspired by https://stackoverflow.com/questions/38260113/implementing-contrastive-loss-and-triplet-loss-in-tensorflow

    Args:
        y_true (int): true label, positive pair (same class) -> 0, 
                    negative pair (different class) -> 1
        
        y_pred (list): python list containing two objects in a pair of tensors:
            left : the encodings for one image data in a pair
            right : the encodings for the other image data in a pair
        
        margin (float, optional): m > 0 determines how far the embeddings of 
                    a negative pair should be pushed apart. Defaults to 0.4.

    Returns:
        loss (float): real number, value of the loss
    """

    # left = y_pred[0]
    # right = y_pred[1]

    # # squared distance between the left image and the right image
    # dist = tf.math.reduce_sum(tf.math.square(left - right), 0)
    dist = y_pred

    loss_positive = tf.math.square(dist)
    loss_negative = tf.math.square(tf.maximum(0., margin - dist))
    
    loss = y_true * loss_negative + (1 - y_true) * loss_positive
    loss = 0.5 * tf.math.reduce_mean(loss)

    return loss


In [None]:
# https://towardsdatascience.com/one-shot-learning-with-siamese-networks-using-keras-17f34e75bb3d

# https://keras.io/examples/vision/siamese_contrastive/

def euclidean_distance(vec):
    """Find the Euclidean distance between two vectors.

    Arguments:
        vec: List containing two tensors of same length.

    Returns:
        Tensor containing euclidean distance
        (as floating point value) between vectors.
    """

    x, y = vec
    distance = tf.math.sqrt(tf.math.reduce_sum(tf.math.square(x - y), axis=1, keepdims=True))
    return distance



# def siamese_networks(input_shape):

#     input_layer = Input(shape=input_shape)
#     x = layers.Conv2D(filters=64, kernel_size=(10,10), activation='relu')(input_layer)
#     x = layers.MaxPool2D(pool_size=(2,2))(x)
#     x = layers.Conv2D(filters=128, kernel_size=(7,7), activation='relu')(x)
#     x = layers.MaxPool2D(pool_size=(2,2))(x)
#     x = layers.Conv2D(filters=128, kernel_size=(4,4), activation='relu')(x)
#     x = layers.MaxPool2D(pool_size=(2,2))(x)
#     x = layers.Conv2D(filters=256, kernel_size=(4,4), activation='relu')(x)
#     x = layers.Flatten()(x)
#     x = layers.Dense(4096, activation='sigmoid')(x)

#     model = keras.Model(inputs=input_layer, outputs=x)

#     input1 = Input(input_shape)
#     input2 = Input(input_shape)

#     left_model = model(input1)
#     right_model = model(input2)
#     L1_distance = layers.Lambda(euclidean_distance)([left_model, right_model])
#     # L1_layer = layers.Lambda(lambda tensors:backend.abs(tensors[0] - tensors[1]))
#     # L1_distance = L1_layer([left_model, right_model])

#     prediction = layers.Dense(1,activation='sigmoid')(L1_distance)

#     siamese_model = keras.Model(inputs=[input1, input2], outputs=prediction, name="siamese_networks")

#     return model, siamese_model

def siamese_networks(input_shape):

    input_layer = Input(shape=input_shape)
    x = layers.Conv2D(filters=64, kernel_size=(10,10), activation='relu')(input_layer)
    x = layers.MaxPool2D(pool_size=(2,2))(x)
    x = layers.Conv2D(filters=128, kernel_size=(7,7), activation='relu')(x)
    x = layers.MaxPool2D(pool_size=(2,2))(x)
    x = layers.Conv2D(filters=128, kernel_size=(4,4), activation='relu')(x)
    x = layers.MaxPool2D(pool_size=(2,2))(x)
    x = layers.Conv2D(filters=256, kernel_size=(4,4), activation='relu')(x)
    x = layers.Flatten()(x)
    x = layers.Dense(4096, activation='sigmoid')(x)

    model = keras.Model(inputs=input_layer, outputs=x)

    input1 = Input(input_shape)
    input2 = Input(input_shape)
    input3 = Input(input_shape)

    left_model = model(input1)
    right_model = model(input2)
    L1_distance = layers.Lambda(euclidean_distance)([left_model, right_model])

    prediction = layers.Dense(1,activation='sigmoid')(L1_distance)

    siamese_model = keras.Model(inputs=[input1, input2], outputs=prediction, name="siamese_networks")

    return model, siamese_model


In [None]:
!pip install pydot

In [None]:
model, siamese_model = siamese_networks(input_shape)
keras.utils.plot_model(model, "sub-model.png", show_shapes=True)
keras.utils.plot_model(siamese_model, "siamese_model.png", show_shapes=True)

In [None]:
siamese_model.summary()

In [None]:
# A2_i = np.random.randint(0, 256, size=(1, 105, 105, 3)).astype("float32")
# A2_j = np.random.randint(0, 256, size=(1, 105, 105, 3)).astype("float32")

# model([A2_i, A2_j]).numpy()

In [None]:
siamese_model.compile(
    loss=contrastive_loss,
    optimizer=keras.optimizers.SGD(),
    metrics=['accuracy'],
)


In [None]:
# early_stopping = keras.callbacks.EarlyStopping(monitor='accuracy', patience=2)

# lr_reducer = keras.callbacks.ReduceLROnPlateau(monitor='val_loss',
#                                                 factor=0.5,
#                                                 patience=5,
#                                                 verbose=0,
#                                                 mode='auto',
#                                                 min_delta=0.0001,
#                                                 cooldown=0,
#                                                 min_lr=0)

history = siamese_model.fit(
    x=[train_x_left, train_x_right],
    y=train_y,
    epochs=10,
    batch_size=128, #128
    # validation_data=([val_x_left, val_x_right], val_y),
    # callbacks=[early_stopping, lr_reducer],
)

In [None]:
score = siamese_model.evaluate(x=[test_x_left,test_x_right],y=test_y, verbose=1)

print('Test loss:', score[0])
print('Test accuracy:', score[1])

In [None]:
# from sklearn.metrics import classification_report

# target_names = ['0', '1']

# print(classification_report(test_y, y_pred, target_names=target_names))

In [None]:
import matplotlib.pyplot as plt

plt.plot(history.history['accuracy'])
# plt.plot(history.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

In [None]:
plt.plot(history.history['loss'])
# plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

In [None]:
    # score = siamese_model_1.predict(x=[test_x_left,test_x_right], verbose=1)

    # print('Test loss:', score[0])
    # print('Test accuracy:', score[1])