In [2]:

from tensorflow.keras.datasets import mnist
from tensorflow.keras.models import Model
import tensorflow.keras.layers as ly
import tensorflow.keras.models as models
from tensorflow.keras.optimizers import RMSprop
from tensorflow.keras import backend as bck
import keras
import tensorflow.keras.losses as losses

import numpy as np, random,pandas as pd, tensorflow as tf
import matplotlib.pyplot as plt

## HELPER FUNCTIONS


In [3]:
def eu_dist(tensor_arr):
  """Return the Euclidean distance between two batches of vectors.

  Args:
    tensor_arr: Pair ``(tensor_x, tensor_y)`` of backend tensors. Their
      squared difference is summed over axis 1.

  Returns:
    Tensor of distances in which axis 1 is kept with size 1
    (``keepdims=True``).
  """
  tensor_x, tensor_y = tensor_arr
  sum_square = bck.sum(bck.square(tensor_x - tensor_y), axis=1, keepdims=True)
  # Clamp to epsilon before the square root so the result never reaches
  # exactly 0, where the derivative of sqrt is unbounded.
  return bck.sqrt(bck.maximum(sum_square, bck.epsilon()))


In [4]:
def make_pairs(X, y, no_of_classes=10):
    """Build alternating similar / dissimilar sample pairs for Siamese training.

    For every class ``i`` and every position ``j`` (up to the size of the
    smallest class minus one) two pairs are emitted, in this order:

    * a similar pair: the ``j``-th and ``(j + 1)``-th samples of class ``i``
      (label 1);
    * a dissimilar pair: the ``j``-th sample of class ``i`` and the ``j``-th
      sample of a randomly chosen *different* class (label 0).

    The other class is drawn with the stdlib ``random`` module, so seed it
    (``random.seed``) for reproducible pairs.

    Args:
        X: Samples, indexable by integer position.
        y: Integer class labels in ``range(no_of_classes)``, one per sample.
        no_of_classes: Number of classes to pair up (default 10).

    Returns:
        ``(pairs, labels)``: ``pairs`` is ``np.array`` of the two-sample pairs
        and ``labels`` is a float32 array alternating 1, 0.

    Raises:
        ValueError: If ``no_of_classes`` is smaller than 2 (no "other" class
            exists to build a dissimilar pair from).
    """
    if no_of_classes < 2:
        raise ValueError("make_pairs needs at least 2 classes")

    y = np.asarray(y)
    class_indices = [np.where(y == label)[0] for label in range(no_of_classes)]
    # Every class contributes the same number of pairs, bounded by the
    # smallest class.
    min_length_for_classes = min(len(indices) for indices in class_indices)

    pairs, labels = [], []
    for i in range(no_of_classes):
        for j in range(min_length_for_classes - 1):
            anchor = X[class_indices[i][j]]

            # Similar pair: next sample of the same class.
            pairs.append([anchor, X[class_indices[i][j + 1]]])

            # Dissimilar pair: a non-zero offset guarantees k != i.
            r = random.randrange(1, no_of_classes)
            k = (i + r) % no_of_classes
            pairs.append([anchor, X[class_indices[k][j]]])

            labels.extend([1, 0])

    return np.array(pairs), np.array(labels).astype('float32')


In [5]:
def get_base_model(input_shape):
    """Build the convolutional embedding network used by each branch.

    Two 5x5 convolution + 2x2 max-pooling stages feed three dense layers
    (256, 512 and 20 units). Every convolution and dense layer uses ReLU.
    Dropout is applied after the second pooling stage (0.25) and after the
    first two dense layers (0.5).

    Args:
        input_shape: Shape of a single input sample, passed to ``ly.Input``.

    Returns:
        A ``Model`` mapping an input of ``input_shape`` to a 20-unit output.
    """
    conv_kernel = (5, 5)
    pool_window = (2, 2)
    act = 'relu'
    conv_dropout = 0.25
    dense_dropout = 2 * conv_dropout

    image_in = ly.Input(shape=input_shape)

    # Convolutional feature extractor.
    features = ly.Conv2D(32, conv_kernel, activation=act)(image_in)
    features = ly.MaxPooling2D(pool_size=pool_window, strides=pool_window)(features)
    features = ly.Conv2D(64, conv_kernel, activation=act)(features)
    features = ly.MaxPooling2D(pool_size=pool_window, strides=pool_window)(features)
    features = ly.Dropout(conv_dropout)(features)

    # Dense head producing the embedding.
    embedding = ly.Flatten()(features)
    embedding = ly.Dense(256, activation=act)(embedding)
    embedding = ly.Dropout(dense_dropout)(embedding)
    embedding = ly.Dense(512, activation=act)(embedding)
    embedding = ly.Dropout(dense_dropout)(embedding)
    embedding = ly.Dense(20, activation=act)(embedding)

    return Model(image_in, embedding)

In [6]:
def accuracy(y_true, y_pred):
    """Keras metric: fraction of pairs whose thresholded distance matches the label.

    A predicted distance below 0.5 counts as class 1; the thresholded value
    is cast to the dtype of ``y_true`` before the comparison.
    """
    return bck.mean(bck.equal(y_true, bck.cast(y_pred < 0.5, y_true.dtype)))

In [7]:
def compute_accuracy(y_true, y_pred):
    """NumPy counterpart of ``accuracy`` for already-computed predictions.

    Args:
        y_true: Array of labels (1 for a similar pair, 0 otherwise).
        y_pred: Array of predicted distances; it is flattened first.

    Returns:
        Fraction of entries where ``distance < 0.5`` agrees with the label.
    """
    predicted_similar = y_pred.ravel() < 0.5
    agreement = predicted_similar == y_true
    return np.mean(agreement)


# Data Handling


In [8]:
# Load the MNIST train/test splits (images and integer labels); the dataset
# file is downloaded on first use.
(X_train, y_train), (X_test, y_test) = mnist.load_data()

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz


In [9]:
# Add a trailing channel axis (N, 28, 28, 1), convert to float32 and divide
# by 255.
# NOTE: this overwrites X_train / X_test in place, so re-running the cell
# without reloading the data divides by 255 a second time.
X_train = X_train.reshape(len(X_train), 28, 28, 1).astype('float32') / 255
X_test = X_test.reshape(len(X_test), 28, 28, 1).astype('float32') / 255


# Using different loss functions

## Contrastive loss

In [10]:

def contrastive_loss(ground_truth, y_pred):
  """Contrastive loss with a margin of 1 on predicted distances.

  Pairs labelled 1 are penalised by the squared distance. Pairs labelled 0
  are penalised by the squared shortfall from the margin,
  ``max(1 - distance, 0) ** 2``. The mean over all entries is returned.

  Args:
    ground_truth: Pair labels (1 = similar, 0 = dissimilar).
    y_pred: Predicted distances.
  """
  similar_term = bck.square(y_pred) * ground_truth
  margin_shortfall = bck.maximum(1 - y_pred, 0)
  dissimilar_term = bck.square(margin_shortfall) * (1 - ground_truth)
  return bck.mean(similar_term + dissimilar_term)



In [60]:
def Siamese():
    """Build a two-input Siamese model that outputs an embedding distance.

    Both 28x28x1 inputs go through the same ``get_base_model`` instance, so
    the two branches share weights. The model output is the Euclidean
    distance between the two embeddings, computed by ``eu_dist``.

    Returns:
        A ``Model`` taking ``[input1, input2]`` and returning the distance.
    """
    image_shape = (28, 28, 1)
    twin_inputs = [ly.Input(shape=image_shape), ly.Input(shape=image_shape)]

    # One encoder instance applied to both inputs gives the shared weights.
    encoder = get_base_model(image_shape)
    embeddings = [encoder(branch) for branch in twin_inputs]

    # The Lambda output keeps the batch size and has a single distance column.
    distance = ly.Lambda(
        eu_dist,
        output_shape=lambda shapes: (shapes[0][0], 1),
    )(embeddings)

    return Model(twin_inputs, distance)

In [61]:
# Siamese network trained with the contrastive loss. No optimizer is passed,
# so compile() uses its default.
model1 = Siamese()
model1.compile(loss=contrastive_loss, metrics=[accuracy])

In [62]:
# Build the similar/dissimilar pair sets. Train is generated before test
# because make_pairs draws from the shared `random` stream.
train_pairs, train_y = make_pairs(X_train, y_train)
test_pairs, test_y = make_pairs(X_test, y_test)

# Split each pair array into the two inputs the Siamese model expects.
input_for_test = [test_pairs[:, side] for side in (0, 1)]
input_for_train = [train_pairs[:, side] for side in (0, 1)]

In [None]:
# Train the contrastive-loss model on the training pairs.
model1.fit(input_for_train, train_y, batch_size=128, epochs=10)

In [15]:
# Accuracy of the contrastive-loss model: a predicted distance below 0.5 is
# counted as "same class" (see compute_accuracy).
print(f"Train Accuracy:{( compute_accuracy(train_y, model1.predict(input_for_train)))}")
print(f"Test Accuracy:{(compute_accuracy(test_y, model1.predict(input_for_test)))}")

Train Accuracy:0.9969557195571955
Test Accuracy:0.9902356902356902


## Triplet loss

In [45]:
class TripletLoss(ly.Layer):
    """Layer that turns (anchor, positive, negative) embeddings into a triplet loss.

    For each sample it returns
    ``max(||anc - p||^2 - ||anc - n||^2 + margin, 0)``, where the squared
    distances are summed over axis 1.

    Args:
        margin: Minimum gap required between the anchor-negative and
            anchor-positive squared distances before the loss reaches zero.
            Defaults to 0.2.
        **kwargs: Standard ``Layer`` keyword arguments (e.g. ``name``),
            forwarded to the base class.
    """

    def __init__(self, margin=0.2, **kwargs):
        super().__init__(**kwargs)
        self.margin = margin

    def call(self, x):
        """Compute the per-sample hinge loss from ``[anchor, positive, negative]``."""
        anc, p, n = x
        positive_dist = bck.sum(bck.square(anc - p), axis=1)
        negative_dist = bck.sum(bck.square(anc - n), axis=1)
        return bck.maximum(positive_dist - negative_dist + self.margin, 0.0)

    def get_config(self):
        """Return the layer config including ``margin`` so the layer can be re-created."""
        config = super().get_config()
        config.update({"margin": self.margin})
        return config

In [46]:
def get_image(label, X, y):
    """Return a uniformly random sample of ``X`` whose label equals ``label``.

    Args:
        label: Class label to look for.
        X: Samples, indexable by integer position.
        y: Labels, one per sample (array-like).

    Returns:
        ``X[i]`` for a random ``i`` with ``y[i] == label``.

    Raises:
        ValueError: If no entry of ``y`` equals ``label``. Sampling random
            indices until one matches would never terminate in that case.
    """
    candidates = np.flatnonzero(np.asarray(y) == label)
    if candidates.size == 0:
        raise ValueError(f"no sample with label {label!r} found in y")
    return X[candidates[np.random.randint(candidates.size)]]
    
def get_triplet(X,y):
    """Draw one random (anchor, positive, negative) triplet of samples.

    A class in ``range(10)`` is drawn for the anchor, then a different class
    is drawn for the negative. The anchor and the positive are two
    independent ``get_image`` draws from the anchor class, so they may be the
    same sample.

    Returns:
        Tuple ``(anchor, positive, negative)`` of samples from ``X``.
    """
    anchor_label = np.random.randint(10)
    negative_label = anchor_label
    # Redraw until the negative class differs from the anchor class.
    while negative_label == anchor_label:
        negative_label = np.random.randint(10)

    anchor = get_image(anchor_label, X, y)
    positive = get_image(anchor_label, X, y)
    negative = get_image(negative_label, X, y)
    return anchor, positive, negative

def generate_triplets(X,y,batch_size):
    """Endlessly yield batches of random triplets for the triplet model.

    Each item is ``([A, P, N], label)``: ``A``, ``P`` and ``N`` are float32
    arrays of ``batch_size`` anchors, positives and negatives, and ``label``
    is an array of ones of length ``batch_size``.

    The generator never terminates, so the consumer has to bound the number
    of batches it takes.
    """
    while True:
        batch = [get_triplet(X, y) for _ in range(batch_size)]
        anchors, positives, negatives = (
            np.array([triplet[slot] for triplet in batch], dtype='float32')
            for slot in range(3)
        )
        yield [anchors, positives, negatives], np.ones(batch_size)

In [47]:
def identity_loss(y_true, y_pred):
    """Loss that is simply the mean of the model output.

    ``y_true`` is ignored; the value to minimise is already carried by
    ``y_pred``.
    """
    mean_output = bck.mean(y_pred)
    return mean_output


In [48]:
def get_triplet_model():
    """Build the three-input model whose output is the triplet loss.

    A single ``get_base_model`` encoder embeds the anchor, positive and
    negative inputs (shared weights). ``TripletLoss`` then combines the three
    embeddings into the model output.

    Returns:
        A ``Model`` taking ``[anchor, positive, negative]`` images of shape
        (28, 28, 1) and returning the ``TripletLoss`` layer output.
    """
    image_shape = (28, 28, 1)
    encoder = get_base_model(image_shape)

    triplet_inputs = [ly.Input(image_shape) for _ in range(3)]
    embeddings = [encoder(image) for image in triplet_inputs]

    triplet_loss = TripletLoss()(embeddings)
    return Model(inputs=triplet_inputs, outputs=triplet_loss)

In [49]:
# Triplet model: its output already is the loss, so identity_loss just
# averages it.
model = get_triplet_model()
model.compile(loss=identity_loss)

In [50]:
# Generators of random triplet batches, 128 triplets per batch.
train_generator = generate_triplets(X_train, y_train,128)
test_generator = generate_triplets(X_test, y_test,128)

In [51]:
# Both generators loop forever, so steps_per_epoch / validation_steps set how
# many batches make up one epoch and one validation pass.
history = model.fit(train_generator, 
                    validation_data=test_generator, 
                    epochs=10,steps_per_epoch=20, 
                    validation_steps=30)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


## Cross Entropy Loss

In [53]:
# Same Siamese architecture, compiled with binary cross-entropy.
# NOTE(review): the model output is a Euclidean distance (>= sqrt(epsilon),
# unbounded above), not a probability in [0, 1]. Labels use 1 for "similar",
# while `accuracy` treats a distance < 0.5 as "similar", so cross-entropy
# pushes similar pairs towards 1, against the metric's convention. Confirm
# whether a sigmoid/similarity head was intended.
model3 = Siamese()
model3.compile(loss=losses.binary_crossentropy, metrics=[accuracy])

In [54]:
# Rebuild the pair sets. make_pairs picks the dissimilar partner at random,
# so these pairs differ from the ones generated earlier in the notebook.
train_pairs, train_y = make_pairs(X_train, y_train)
test_pairs, test_y = make_pairs(X_test, y_test)

# Split each pair array into the two inputs the Siamese model expects.
input_for_test = [test_pairs[:, side] for side in (0, 1)]
input_for_train = [train_pairs[:, side] for side in (0, 1)]

In [55]:
model1.fit(input_for_train, train_y, batch_size=128, epochs=10)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7f06f23e9a50>

In [56]:
print(f"Train Accuracy:{( compute_accuracy(train_y, model1.predict(input_for_train)))}")
print(f"Test Accuracy:{(compute_accuracy(test_y, model1.predict(input_for_test)))}")

Train Accuracy:0.5
Test Accuracy:0.5


# Using different gradient descent optimisers



## Adam

In [None]:
import tensorflow.keras.optimizers as opt

In [None]:
# Contrastive-loss Siamese model optimised with Adam (default settings).
ad = opt.Adam()

model_adam = Siamese()

model_adam.compile(optimizer=ad, loss=contrastive_loss,metrics=[accuracy])

In [None]:
# Train the Adam model; the History object is kept in history_adam.
history_adam = model_adam.fit(input_for_train, train_y, batch_size=128, epochs=10)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [None]:

# Test-set accuracy of the Adam model (distance < 0.5 counts as "same class").
print(f"Test Accuracy:{(compute_accuracy(test_y, model_adam.predict(input_for_test)))}")

Test Accuracy:0.9925925925925926


## RMSProp

In [None]:
# Contrastive-loss Siamese model optimised with RMSprop (default settings).
rmsprop = opt.RMSprop()
model_rms = Siamese()
model_rms.compile(optimizer=rmsprop, loss=contrastive_loss,metrics=[accuracy])

In [None]:
# Train the RMSprop model; the History object is kept in history_rms.
history_rms = model_rms.fit(input_for_train, train_y, batch_size=128, epochs=10)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [None]:
# Test-set accuracy of the RMSprop model (distance < 0.5 counts as "same class").
print(f"Test Accuracy:{(compute_accuracy(test_y, model_rms.predict(input_for_test)))}")

Test Accuracy:0.9904601571268238


## Mini-Batch SGD

In [None]:
# Contrastive-loss Siamese model optimised with plain SGD (default settings).
sgd = opt.SGD()
model_sgd = Siamese()
model_sgd.compile(optimizer=sgd, loss=contrastive_loss,metrics=[accuracy])

In [None]:
# Train the SGD model and keep its History under its own name. It was
# previously stored in `history_rms`, overwriting the RMSprop history.
# NOTE(review): batch_size is 64 here but 128 for the other optimisers, so
# the comparison also varies the batch size - confirm this is intended.
history_sgd = model_sgd.fit(input_for_train, train_y, batch_size=64, epochs=10)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [None]:
# Test-set accuracy of the SGD model (distance < 0.5 counts as "same class").
print(f"Test Accuracy:{(compute_accuracy(test_y, model_sgd.predict(input_for_test)))}")

Test Accuracy:0.9225028058361392


# Hyperparameter Optimisation

In [None]:

# Take the schedule from tensorflow.keras, the same package that provides the
# optimizer it is passed to, instead of the standalone `keras` package.
from tensorflow.keras.optimizers import schedules

# Inverse-time decay: lr = initial_learning_rate / (1 + decay_rate * step / decay_steps).
# With these values the rate has only fallen to 1e-3 / 1.5 after 100000
# optimizer steps.
# NOTE(review): confirm a training run is long enough for the decay to matter.
lr_scheduler = schedules.InverseTimeDecay(
    initial_learning_rate=1e-3,
    decay_steps=100000,
    decay_rate=0.5,
)

In [None]:
# Adam again, this time driven by the inverse-time-decay learning-rate
# schedule. `ad` is rebound here; model_adam keeps the optimizer object it
# was compiled with.
ad = opt.Adam(learning_rate=lr_scheduler)

model_adam2 = Siamese()

model_adam2.compile(optimizer=ad, loss=contrastive_loss,metrics=[accuracy])

In [None]:
# Train the scheduled-Adam model; the History object is kept in history_adam2.
history_adam2 = model_adam2.fit(input_for_train, train_y, batch_size=128, epochs=10)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [None]:

# Test-set accuracy of the scheduled-Adam model (distance < 0.5 counts as "same class").
print(f"Test Accuracy:{(compute_accuracy(test_y, model_adam2.predict(input_for_test)))}")

Test Accuracy:0.9912457912457913


# Pros and Cons

### Advantages:
- More robust to class imbalance.
- Learns from relationships between samples (pairs or triplets), so many training examples can be formed from the same data.
- A Siamese network focuses on learning embeddings (in its deeper layers) that place samples of the same class/concept close together. Hence, it can learn semantic similarity.
 
### Disadvantages:
- Because it operates on pairs (or triplets) of training samples rather than on single samples, it requires more training time.
- Its output is a distance (to the samples of each class), not the probability that a test sample belongs to a class.