In [None]:
import random
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Activation,Dropout,Dense, Flatten,BatchNormalization,Conv2D,MaxPool2D,Input,Lambda

In [None]:
physical_devices=tf.config.experimental.list_physical_devices('GPU')
print("Num GPUs available: ", len(physical_devices))
for gpu in physical_devices:
    tf.config.experimental.set_memory_growth(gpu, True)

In [None]:
epochs=250
batch_size=256
margin=1 #Margin for constrastive loss

In [None]:
with open('trainx1.npy', 'rb') as f:
    a = np.load(f)
with open('trainx2.npy', 'rb') as f:
    b = np.load(f)
with open('labels.npy', 'rb') as f:
    c = np.load(f)

In [None]:
x_train1,x_val1,x_test1=a[:980000],a[980000:1190000],a[1190000:]
x_train2,x_val2,x_test2=b[:980000],b[980000:1190000],b[1190000:]
labels_train,labels_val,labels_test=c[:980000],c[980000:1190000],c[1190000:]

In [None]:
#Creating the Similarity Measure Code using Euclidean Distance
#Euclidean distance = sqrt(sum(square(t1-t2))) where t1 and t2 are tensors

def euclidean_distance(vects):
    x,y=vects
    sum_square=tf.math.reduce_sum(tf.math.square(x-y),axis=1,keepdims=True)
    return tf.math.sqrt(tf.math.maximum(sum_square,tf.keras.backend.epsilon()))

In [None]:
#Creating the Network
embedding_network=Sequential([
    Conv2D(filters=64,kernel_size=(3,3),activation='relu',kernel_initializer='he_uniform',padding="same", input_shape=(28,28,1)), #32x32x32
    MaxPool2D(pool_size=(3,3),strides=1), #16x16x32
    BatchNormalization(),
    Conv2D(filters=128,kernel_size=(3,3),activation="relu",kernel_initializer='he_uniform',padding="same"), #16x16x64
    MaxPool2D(pool_size=(3,3),strides=2), #8x8x64
    Conv2D(filters=64,kernel_size=(3,3),activation="relu",kernel_initializer='he_uniform',padding="same"),#8x8x64
    MaxPool2D(pool_size=(2,2),strides=2), #4x4x128
    Conv2D(filters=32,kernel_size=(3,3),activation="relu",kernel_initializer='he_uniform',padding="same"),#8x8x64
    MaxPool2D(pool_size=(2,2),strides=2), #4x4x128
    Flatten(),
    BatchNormalization(),
    Dense(units=2,activation="tanh"),
])

embedding_network.summary()

In [None]:
input1=Input((28,28,1))
input2=Input((28,28,1))

tower_1=embedding_network(input1)
tower_2=embedding_network(input2)


merge_layer=Lambda(euclidean_distance)([tower_1,tower_2])
normal_layer=BatchNormalization()(merge_layer)
output_layer=Dense(1,activation='sigmoid')(normal_layer)
siamese=keras.Model(inputs=[input1,input2],outputs=output_layer)

In [None]:
def loss(margin=1):
    """Margin is an Integer which defines the baseline for distance for which pairs should be classified as dissimilar
    """
    def contrastive_loss(y_true,y_pred):
        """Calc the loss as a floating point value"""
        square_pred=tf.math.square(y_pred)
        margin_square=tf.math.square(tf.math.maximum(margin-(y_pred),0))
        return tf.math.reduce_mean((1-y_true)*square_pred+(y_true)*margin_square)
    return contrastive_loss

In [None]:
opt = keras.optimizers.Adam(learning_rate=0.001)
#siamese.compile(loss=loss(margin=margin),optimizer=opt,metrics=["accuracy"])
siamese.compile(loss="binary_crossentropy",optimizer="Adam",metrics=["accuracy"])
siamese.summary()

In [None]:
history= siamese.fit(
    [x_train1,x_train2],
    labels_train,
    validation_data=([x_val1,x_val2],labels_val),
    batch_size=batch_size,
    epochs=epochs
)

In [None]:
results=siamese.evaluate([x_test1,x_test2],labels_test)
print("test loss,test accuracy",results)

In [None]:
siamese.save('siamese.h5')
embedding_network.save('embed.h5')
path_siamese='Weights_folder/Weights_siamese'
path_model='Weights_folder/Weights_model'
 
# save
embedding_network.save_weights(path_model)
siamese.save_weights(path_siamese)

In [None]:
#Saving the Accuracy and Loss data
with open('train_acc.npy', 'wb') as f:
    np.save(f, history.history["accuracy"])
    print("Train accuracy")

with open('val_acc.npy', 'wb') as f:
    np.save(f, history.history["val_accuracy"])
    print("Validation accuracy")
    
with open('train_loss.npy', 'wb') as f:
    np.save(f, history.history["loss"])
    print("Train Loss")

with open('val_loss.npy', 'wb') as f:
    np.save(f, history.history["val_loss"])
    print("Validation loss")
