In [169]:
import matplotlib.pyplot as plt
import numpy as np
from keras.models import Model
from keras.datasets import mnist
from keras.layers.core import  Activation, Dense, Reshape
from keras.layers import Input, Flatten, Dense, Dropout, Lambda
from keras import backend as K
from keras import layers
from keras.engine.topology import Layer
from keras.optimizers import RMSprop, Adam,SGD
import tensorflow as tf

In [None]:
# Make tf.function-wrapped code run eagerly (op by op) instead of as a traced graph.
tf.config.run_functions_eagerly(True)

In [136]:
# Load the MNIST train/test split and report the shape of each array.
(train_X, train_y), (test_X, test_y) = mnist.load_data()
for caption, array in (('X_train: ', train_X),
                       ('Y_train: ', train_y),
                       ('X_test:  ', test_X),
                       ('Y_test:  ', test_y)):
  print(caption + str(array.shape))

X_train: (60000, 28, 28)
Y_train: (60000,)
X_test:  (10000, 28, 28)
Y_test:  (10000,)


## For each digit, build 800 positive (same-digit) and 800 negative (different-digit) image pairs

In [152]:
import random

# Pair-construction settings.
PAIR_SEED = 42          # fixed seed so the sampled pairs are the same on every run
PAIRS_PER_DIGIT = 800   # positive pairs per digit; one negative pair is drawn alongside each

random.seed(PAIR_SEED)

new_data_set_1 = []  # first image of every pair
new_data_set_2 = []  # second image of every pair
new_label = []       # 1 = both images show the same digit, 0 = different digits
for i in range(0,10):
  x_train_same = train_X[train_y == i]
  x_train_diff = train_X[train_y != i]
  for k in range(PAIRS_PER_DIGIT):
    # Positive pair: two distinct images of digit i. Sampling without
    # replacement prevents pairing an image with itself, which would add a
    # pair whose distance is zero regardless of the network weights.
    r1, r2 = random.sample(range(len(x_train_same)), 2)
    new_data_set_1.append(x_train_same[r1])
    new_data_set_2.append(x_train_same[r2])
    new_label.append(1)
    # Negative pair: the same anchor image against a random image of another digit.
    r3 = random.randint(0,len(x_train_diff)-1)
    new_data_set_1.append(x_train_same[r1])
    new_data_set_2.append(x_train_diff[r3])
    new_label.append(0)

In [153]:
# Turn the Python lists of images/labels into float64 NumPy arrays.
new_data_set_1 = np.array(new_data_set_1, dtype='float64')
new_data_set_2 = np.array(new_data_set_2, dtype='float64')
new_label = np.array(new_label, dtype='float64')

In [154]:
# Sanity check: shape of each side of the pairs, then the element dtype.
for pair_side in (new_data_set_1, new_data_set_2):
  print(pair_side.shape)
print(new_data_set_1.dtype)

(16000, 28, 28)
(16000, 28, 28)
float64


In [155]:
# Flatten every 28x28 image into a 784-element row vector.
new_data_set_1, new_data_set_2 = (
    pair_side.reshape(-1, 28*28)
    for pair_side in (new_data_set_1, new_data_set_2)
)

In [8]:
def buildBranchModel():
  """Build the embedding branch: 784 inputs -> three Dense(128, relu) layers.

  A Dropout(0.1) layer sits between consecutive Dense layers; the last
  Dense layer's activations are the model output.

  Returns:
    A Keras ``Model`` mapping a (784,) input to a 128-unit output.
  """
  branch_input = Input(shape=(784,))
  hidden = Dense(128, activation='relu')(branch_input)
  # Two more Dropout -> Dense stages, created in the same order as before.
  for _ in range(2):
    hidden = Dropout(0.1)(hidden)
    hidden = Dense(128, activation='relu')(hidden)
  return Model([branch_input], [hidden])

In [17]:
class DistanceCost(Layer):
    """Layer producing the Euclidean distance between two embedding tensors."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, x, mask=None):
        """Return the row-wise L2 distance between ``x[0]`` and ``x[1]``.

        Args:
            x: indexable pair of tensors; they are subtracted elementwise and
               reduced over axis 1.
            mask: accepted but not used.

        Returns:
            Tensor of distances with axis 1 kept as size 1 (``keepdims=True``).
        """
        left, right = x[0], x[1]
        squared_distance = K.sum(K.square(left - right), axis=1, keepdims=True)
        # The value passed to sqrt is floored at K.epsilon().
        return K.sqrt(K.maximum(squared_distance, K.epsilon()))

In [45]:
def contrastive_loss(y_true, y_pred): # y_true is the label either 0 or 1
    """Contrastive loss on predicted pair distances.

    Entries with ``y_true == 1`` contribute the squared distance; entries with
    ``y_true == 0`` contribute the squared shortfall of the distance below the
    margin (zero once the distance reaches the margin).

    Args:
        y_true: pair labels (0 or 1).
        y_pred: predicted distances.

    Returns:
        Mean of the per-pair loss terms.
    """
    margin = 255
    similar_term = y_true * K.square(y_pred)
    dissimilar_term = (1 - y_true) * K.square(K.maximum(margin - y_pred, 0))
    return K.mean(similar_term + dissimilar_term)

In [46]:
# Siamese model: one shared branch embeds both inputs, and the model output
# is the distance between the two embeddings. Trained with RMSprop.
branchModel = buildBranchModel()
input_1, input_2 = Input(shape=(784,)), Input(shape=(784,))
output_1, output_2 = branchModel(input_1), branchModel(input_2)

distance_layer = DistanceCost()([output_1, output_2])
model = Model(inputs=[input_1, input_2], outputs=distance_layer)

rms = RMSprop()
model.compile(optimizer=rms, loss=contrastive_loss)

In [47]:
# Train the RMSprop siamese model on the image pairs.
model.fit(x=[new_data_set_1, new_data_set_2], y=new_label, batch_size=128, epochs=50)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


<tensorflow.python.keras.callbacks.History at 0x7f77162e1e50>

# Selecting one reference image of each digit for comparison

In [140]:
# create an image pool and save the output of the model here
image_pool = []
for i in range(0,10):
  image_pool.append(train_X[train_y == i][0])
image_pool = np.stack(image_pool,axis=0).astype('float64').reshape(-1,28*28)
image_pool_output = []
for img in image_pool:
  image_pool_output.append(branchModel(img.reshape(1,784)))
image_pool_output = np.stack(image_pool_output,axis=0).astype('float64')

In [141]:
# Flatten every test image into a single row vector.
test_images = test_X.reshape(test_X.shape[0], -1)

In [142]:
def predict(test_image):
  """Return the index of the pool embedding closest to ``test_image``.

  The image is embedded with ``branchModel`` and compared against every
  entry of ``image_pool_output`` by mean squared difference.
  """
  embedding = branchModel(test_image.reshape(1,784))
  squared_error = (image_pool_output - embedding) ** 2
  mean_squared_distance = K.mean(squared_error, axis=2).numpy()
  return np.argmin(mean_squared_distance)

In [143]:
# Classify every test image with the RMSprop model and count the matches.
predictions = [predict(img) for img in test_images]
correct_predictions_rms = (predictions == test_y).sum()

In [147]:
# Report RMSprop accuracy. The evaluation cell stores its count in
# `correct_predictions_rms`; the bare name `correct_predictions` is never
# assigned and raises NameError on a fresh kernel.
print(correct_predictions_rms," out of ",test_images.shape[0],"accuracy is : ",correct_predictions_rms/test_images.shape[0])

9610  out of  10000 accuracy is :  0.961


# Adam optimizer

In [149]:
# Same siamese architecture with a freshly initialised branch, compiled
# with the Adam optimizer.
branchModel_adam = buildBranchModel()
input_1, input_2 = Input(shape=(784,)), Input(shape=(784,))
output_1, output_2 = branchModel_adam(input_1), branchModel_adam(input_2)

distance_layer_adam = DistanceCost()([output_1, output_2])
model_adam = Model(inputs=[input_1, input_2], outputs=distance_layer_adam)

adam = Adam()
model_adam.compile(optimizer=adam, loss=contrastive_loss)

In [157]:
# Same eager-execution switch as in the setup cell near the top of the notebook.
tf.config.run_functions_eagerly(True)

In [158]:
# Train the Adam siamese model on the same image pairs.
model_adam.fit(x=[new_data_set_1, new_data_set_2], y=new_label, batch_size=128, epochs=50)

Epoch 1/50
  5/125 [>.............................] - ETA: 3s - loss: 30478.4180

  "Even though the tf.config.experimental_run_functions_eagerly "


Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


<tensorflow.python.keras.callbacks.History at 0x7f771477ced0>

In [164]:
def predict_adam(test_image):
  """Return the index of the Adam-pool embedding closest to ``test_image``.

  The image is embedded with ``branchModel_adam`` and compared against every
  entry of ``image_pool_output_adam`` by mean squared difference.
  """
  embedding = branchModel_adam(test_image.reshape(1,784))
  squared_error = (image_pool_output_adam - embedding) ** 2
  mean_squared_distance = K.mean(squared_error, axis=2).numpy()
  return np.argmin(mean_squared_distance)

In [163]:
# Reference pool for the Adam model: first training image of each digit 0-9.
image_pool_adam = []
for i in range(0,10):
  image_pool_adam.append(train_X[train_y == i][0])
# Stack the list built just above. The original stacked `image_pool`, a
# variable from the RMSprop section, which discarded this list and made the
# cell depend on that earlier cell having run.
image_pool_adam = np.stack(image_pool_adam,axis=0).astype('float64').reshape(-1,28*28)
# Embed every reference image with the Adam-trained branch.
image_pool_output_adam = []
for img in image_pool_adam:
  image_pool_output_adam.append(branchModel_adam(img.reshape(1,784)))
image_pool_output_adam = np.stack(image_pool_output_adam,axis=0).astype('float64')

In [165]:
# Classify every test image with the Adam model and count the matches.
predictions_adam = [predict_adam(img) for img in test_images]
correct_predictions_adam = (predictions_adam == test_y).sum()

In [166]:
# Report the Adam model's hit count and accuracy on the test images.
print("ADAM : ", correct_predictions_adam, " out of ", len(test_images),
      "accuracy is : ", correct_predictions_adam / len(test_images))

ADAM :  9394  out of  10000 accuracy is :  0.9394


# Mini Batch Gradient Descent

In [183]:
# Same siamese architecture with a freshly initialised branch, compiled
# with mini-batch SGD plus momentum.
branchModel_gd = buildBranchModel()
input_1 = Input(shape=(784,))
input_2 = Input(shape=(784,))

output_1 = branchModel_gd(input_1)
output_2 = branchModel_gd(input_2)

distance_layer_gd = DistanceCost()([output_1,output_2])
model_gd = Model([input_1,input_2],distance_layer_gd)

# `lr` is a deprecated alias of `learning_rate`; use the supported keyword.
sgd = SGD(learning_rate=0.00001, momentum=0.2)

model_gd.compile(loss=contrastive_loss,optimizer=sgd)

In [184]:
# Train the SGD siamese model on the same image pairs.
model_gd.fit(x=[new_data_set_1, new_data_set_2], y=new_label, batch_size=128, epochs=50)

Epoch 1/50
  6/125 [>.............................] - ETA: 2s - loss: 44426.1755

  "Even though the tf.config.experimental_run_functions_eagerly "


Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


<tensorflow.python.keras.callbacks.History at 0x7f770a3db790>

In [185]:
# Reference pool for the SGD model: the first training image of each digit
# 0-9, flattened, together with its embedding from the SGD-trained branch.
image_pool_gd = np.stack(
    [train_X[train_y == digit][0] for digit in range(0, 10)], axis=0
).astype('float64').reshape(-1, 28*28)
image_pool_output_gd = np.stack(
    [branchModel_gd(reference.reshape(1, 784)) for reference in image_pool_gd], axis=0
).astype('float64')

In [186]:
def predict_gd(test_image):
  """Return the index of the SGD-pool embedding closest to ``test_image``.

  The image is embedded with ``branchModel_gd`` and compared against every
  entry of ``image_pool_output_gd`` by mean squared difference.
  """
  embedding = branchModel_gd(test_image.reshape(1,784))
  squared_error = (image_pool_output_gd - embedding) ** 2
  mean_squared_distance = K.mean(squared_error, axis=2).numpy()
  return np.argmin(mean_squared_distance)

In [187]:
# Classify every test image with the SGD model and count the matches.
predictions_gd = [predict_gd(img) for img in test_images]
correct_predictions_gd = (predictions_gd == test_y).sum()

In [188]:
# Report the SGD model's hit count and accuracy on the test images.
print("SGD : ", correct_predictions_gd, " out of ", len(test_images),
      "accuracy is : ", correct_predictions_gd / len(test_images))

SGD :  9317  out of  10000 accuracy is :  0.9317


In [192]:
from prettytable import PrettyTable
# Side-by-side test accuracy of the three optimizers.
t = PrettyTable(['','Adam ', 'RMSProp ','SGD'])
# The RMSprop count is stored in `correct_predictions_rms`; the bare name
# `correct_predictions` is never assigned and raises NameError on a fresh kernel.
t.add_row(['Accuracy',correct_predictions_adam/test_images.shape[0],correct_predictions_rms/test_images.shape[0],correct_predictions_gd/test_images.shape[0]])
print(t)

+----------+--------+----------+--------+
|          | Adam   | RMSProp  |  SGD   |
+----------+--------+----------+--------+
| Accuracy | 0.9394 |  0.961   | 0.9317 |
+----------+--------+----------+--------+


## From the values above, RMSProp gives the best accuracy for the same number of epochs, so RMSProp is selected

# Hyperparameter optimization: shrinking the branch network to one 64-unit hidden layer with a 10-unit output

In [215]:
def buildBranchModel_64():
  """Build the smaller branch: 784 inputs -> Dense(64) -> Dropout(0.1) -> Dense(10).

  Both Dense layers use ReLU activation.

  Returns:
    A Keras ``Model`` mapping a (784,) input to a 10-unit output.
  """
  branch_input = Input(shape=(784,))
  hidden = Dense(64, activation='relu')(branch_input)
  regularised = Dropout(0.1)(hidden)
  embedding = Dense(10, activation='relu')(regularised)
  return Model([branch_input], [embedding])

In [223]:
# Siamese model around the smaller 64-unit branch, compiled with RMSprop.
branchModel_64 = buildBranchModel_64()
input_1, input_2 = Input(shape=(784,)), Input(shape=(784,))
output_1, output_2 = branchModel_64(input_1), branchModel_64(input_2)

distance_layer_64 = DistanceCost()([output_1, output_2])
model_64 = Model(inputs=[input_1, input_2], outputs=distance_layer_64)

rms = RMSprop()
model_64.compile(optimizer=rms, loss=contrastive_loss)

In [230]:
# Train the 64-unit siamese model on the same image pairs (10 epochs).
model_64.fit(x=[new_data_set_1, new_data_set_2], y=new_label, batch_size=128, epochs=10)

Epoch 1/10
  7/125 [>.............................] - ETA: 2s - loss: 3682.5627

  "Even though the tf.config.experimental_run_functions_eagerly "


Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7f7707884ad0>

In [225]:
image_pool_64 = []
for i in range(0,10):
  image_pool_64.append(train_X[train_y == i][0])
image_pool_64 = np.stack(image_pool_64,axis=0).astype('float64').reshape(-1,28*28)
image_pool_output_64 = []
for img in image_pool_64:
  image_pool_output_64.append(branchModel(img.reshape(1,784)))
image_pool_output_64 = np.stack(image_pool_output_64,axis=0).astype('float64')

In [226]:
def predict_64(test_image):
  pred_1 = branchModel_64(test_image.reshape(1,784))
  dis = K.mean((image_pool_output_64-pred_1)**2,axis=2).numpy()
  return np.argmin(dis)

In [228]:
predictions_64 = []
for img in test_images:
  predictions_64.append(predict(img))
correct_predictions_64 = (predictions_64 == test_y).sum()

In [229]:
# Report the 64-unit model's hit count and accuracy. The original divided the
# undefined name `correct_predictions_256`; the count is `correct_predictions_64`.
print(correct_predictions_64," out of ",test_images.shape[0],"accuracy is : ",correct_predictions_64/test_images.shape[0])


9610  out of  10000 accuracy is :  0.961


## Pros of siamese network 
 - A few images per class are sufficient for a Siamese network to recognize those classes in the future.
 - A Siamese network focuses on learning embeddings (in the deeper layers) that place the same classes/concepts close together; hence it can learn semantic similarity.

## Cons of siamese network
 - Needs more training time than a normal network: since a Siamese network learns from pairs of samples, and the number of possible pairs grows quadratically with the dataset size, training is slower than ordinary classification (pointwise learning).
 - Doesn't output probabilities: since training is pairwise, the network outputs a distance to each class rather than the probability of a prediction.