In [None]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Flatten, Dense, Dropout,Activation, Lambda,LeakyReLU,ReLU
from tensorflow.keras.layers import Conv2D,MaxPooling2D,BatchNormalization,LayerNormalization,GlobalMaxPooling2D
from tensorflow.keras.optimizers import RMSprop, Adam
from tensorflow.keras.activations import sigmoid
from tensorflow.keras.datasets import fashion_mnist
from tensorflow.python.keras.utils.vis_utils import plot_model
from tensorflow.keras.models import Sequential
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras import backend as K
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from matplotlib.pyplot import imread
import pandas as pd
import numpy as np
import os

from image_utils import get_pairs,get_dataset_info,estimate_dataset


In [None]:
gpus = tf.config.list_physical_devices('GPU')
tf.config.experimental.set_memory_growth(gpus[0], True)

## Define images main information

In [None]:
main_folder_name='datasets/sof_preproc'
get_dataset_info(main_folder_name)

## Define dataset parametrs 

In [None]:
image_shape=(128,128,3)
max_positive_pairs_count=20000
max_negative_pairs_count=20000
estimate_dataset(main_folder_name,
                 max_positive_pairs_count,
                 max_negative_pairs_count)

## Preprocces the images

In [None]:
pairs,labels=get_pairs(main_folder_name,
                       max_positive_pairs_count,
                       max_negative_pairs_count)

In [None]:
pairs=pairs.astype(np.float32)
labels=labels.astype(np.float32)

train_pairs,test_pairs,train_labels,test_labels=train_test_split(pairs,labels,test_size=0.05)
del pairs
del labels

## Siamese pecularity

As siamese network measure distance between we define Euclidean distance Lambda layer and special contrastive loss ( https://arxiv.org/abs/2011.02803 )

In [None]:
def euclidean_distance(vects):
    x, y = vects
    sum_square = K.sum(K.square(x - y), axis=1, keepdims=True)
    return K.sqrt(K.maximum(sum_square, K.epsilon()))


def eucl_dist_output_shape(shapes):
    shape1, shape2 = shapes
    return (shape1[0], 1)

def contrastive_loss_with_margin(margin):
    def contrastive_loss(y_true, y_pred):
    
        square_pred = K.square(y_pred)
        margin_square = K.square(K.maximum(margin - y_pred, 0))
        return K.mean( (1 - y_true) * square_pred+y_true*margin_square)
    
    return contrastive_loss

In [None]:
def get_model():
    #
    #   it is also good idea to use VGG model, but it requires powerfull GPU
    #   model = Sequential(VGG16(weights='imagenet', include_top=False, input_shape=image_shape).layers)
    #
    model=Sequential()
    
    model.add(Conv2D(32,(3,3),input_shape=image_shape))
    model.add(BatchNormalization())
    model.add(Conv2D(32,(3,3)))
    model.add(MaxPooling2D((2,2)))
        
    model.add(Conv2D(64,(3,3)))
    model.add(BatchNormalization())
    model.add(Conv2D(64,(3,3)))
    model.add(MaxPooling2D((2,2)))

    model.add(Conv2D(128,(3,3)))
    model.add(BatchNormalization())
    model.add(Conv2D(128,(3,3)))
    model.add(MaxPooling2D((3,3)))
    
    model.add(Conv2D(256,(3,3)))
    model.add(BatchNormalization())
    model.add(Conv2D(256,(3,3)))
    model.add(MaxPooling2D((3,3)))
    
    model.add(Flatten())
    model.add(Dense(512))
    model.add(LeakyReLU(0.2))
    model.add(Dense(256))
    model.add(LeakyReLU(0.2))

    return model

In [None]:
#base_network = get_model()
resnet = tf.keras.applications.ResNet50V2(weights = 'imagenet', include_top = False, input_shape = (128,128,3))
x = Flatten()(resnet.output)
x = Dense(512, activation='relu')(x)
x = LeakyReLU(0.2)(x)
x = Dense(256, activation='relu')(x)
output = LeakyReLU(0.2)(x)


base_model = Model(inputs = resnet.input, outputs = output)

base_model.summary()

## Define siamese model

To provide weights sycronization we define left and right inputs with the same model

In [None]:
input_a = Input(shape=image_shape)
vect_output_a = base_model(input_a)

input_b = Input(shape=image_shape)
vect_output_b = base_model(input_b)

x = Lambda(euclidean_distance, output_shape=eucl_dist_output_shape)([vect_output_a, vect_output_b])
output= Dense(1,activation='sigmoid')(x)

# specify the inputs and output of the model
model = Model([input_a, input_b], output)

In [None]:
del model

In [None]:
model.summary()

## Model training

In [None]:

optim = RMSprop(  learning_rate=0.0001)
#optim = Adam(  learning_rate=0.015)
model.compile(loss=contrastive_loss_with_margin(margin=1), optimizer=optim)
history = model.fit([train_pairs[:,0],train_pairs[:,1]], 
                    train_labels, 
                    epochs=17, 
                    batch_size=64,
                    validation_split=0.2)

## Save weights and history

In [None]:
saved_path='saved data'

weights_name='human_weight.h5'
history_name='human_history.json'

hist_json_file = saved_path+'/'+history_name
weights_file=saved_path+'/' +weights_name

base_model.save_weights(weights_file)
hist_df = pd.DataFrame(history.history)  

with open(hist_json_file, mode='w') as f:
    hist_df.to_json(f)

## Load weights and history

In [None]:
saved_path='saved data'

weights_name='human_weight.h5'
history_name='human_history.json'

hist_json_file = saved_path+'/'+history_name
weights_file=saved_path+'/' +weights_name

base_model.load_weights(weights_file)
loaded_history=pd.read_json(hist_json_file)

## Loss plot

In [None]:
def plot_loss(loaded_history,title):
    loss=loaded_history['loss']
    val_loss=loaded_history['val_loss']
    x=range(len(loss))
    X=15
    Y=8
    plt.figure(figsize=(X,Y))
    plt.plot(x,loss,'b',x,val_loss,'g')
    plt.title(title, fontsize=20, fontname='Times New Roman')
    plt.legend(['loss','val_loss'])
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.show()

In [None]:
title='sof losses per epoch'
plot_loss(loaded_history,title)

## Model testing

To compute accuracy of the model define rule: if predicted value is bigger, then 0.5, the result is 1, else 0

In [None]:
def compute_accuracy(y_true, y_pred):
    '''Compute classification accuracy with a fixed threshold on distances.
    '''
    pred = y_pred.ravel() > 0.5
    return round(np.mean(pred == y_true),4)

In [None]:
loss = round(model.evaluate(x=[test_pairs[:,0],test_pairs[:,1]], y=test_labels),4)

y_pred_train = model.predict([test_pairs[:,0], test_pairs[:,1]])
train_accuracy = compute_accuracy(test_labels, y_pred_train)

y_pred_test = model.predict([test_pairs[:,0], test_pairs[:,1]])
test_accuracy = compute_accuracy(test_labels, y_pred_test)

print("Loss = {}, Train Accuracy = {} Test Accuracy = {}".format(loss, train_accuracy, test_accuracy))