# Siamese Convolutional Neural Network
The main model: a Siamese CNN trained with a triplet loss on the signature datasets (CEDAR, BHSig260-Bengali and BHSig260-Hindi).

In [14]:
import os
import gc
import time
import math
import numpy as np
np.random.seed(0)
import matplotlib.pyplot as plt
%matplotlib inline
from pylab import *
from keras.models import Sequential
from keras.optimizers import Adam
from keras.layers import Conv2D, ZeroPadding2D, Activation, Input, concatenate
from keras.models import Model
from keras.datasets import mnist

from keras.layers.normalization import BatchNormalization
from keras.layers.pooling import MaxPooling2D
from keras.layers.merge import Concatenate
from keras.layers.core import Lambda, Flatten, Dense
from keras.initializers import glorot_uniform, he_uniform

from keras.engine.topology import Layer
from keras.regularizers import l2
from keras import backend as keras_backend
from keras.utils import plot_model, normalize

from sklearn.metrics import roc_curve, roc_auc_score

import import_ipynb
import create_dataset  # own module

In [15]:
def free_ram(a):
    """Drop this function's reference to ``a`` and run the garbage collector.

    Only the local name is released: the object passed in is never
    modified, so it stays alive for as long as the caller (or any
    container) still references it.

    Args:
        a : variable to be deleted.
    """
    # Releasing the local binding is all a callee can do on behalf of its
    # caller; the collection pass afterwards reclaims unreachable cycles.
    del a
    gc.collect()

In [None]:
# Root folder of each signature dataset (Windows-style separators).
english_path = "data\\CEDAR"
bengali_path = "data\\BHSig260-Bengali"
hindi_path = "data\\BHSig260-Hindi"

# Class count passed to the dataset builder for each dataset.
english_classes, bengali_classes, hindi_classes = 55, 100, 160

# Parallel lists: PATHS[k] goes with CLASSES[k].
PATHS = [english_path, bengali_path, hindi_path]
CLASSES = [english_classes, bengali_classes, hindi_classes]

# Square, single-channel input pictures.
SIZE = 224
input_shape = (SIZE, SIZE, 1)

In [None]:
# Per-dataset containers, filled in the same order as PATHS / CLASSES.
DATASETS, ORIGINALS = [], []

for dataset_path, class_count in zip(PATHS, CLASSES):
    # build_dataset hands back six objects: the train/test datasets followed
    # by the original x/y arrays for train and test.
    (data_train, data_test,
     x_train_origin, y_train_origin,
     x_test_origin, y_test_origin) = create_dataset.build_dataset(dataset_path, class_count)

    DATASETS.append([data_train, data_test])
    ORIGINALS.append([x_train_origin, y_train_origin, x_test_origin, y_test_origin])
    # Blank line between the builder's output for consecutive datasets.
    print()

In [None]:
# Sanity check: for every loaded dataset, print the shape of its first class
# (train and test) and preview a few training pictures.
for dataset_idx, data in enumerate(DATASETS):
    # enumerate() yields the position directly. DATASETS.index(data) would
    # look it up with ==, which is linear and, on nested containers holding
    # NumPy arrays, can raise "truth value of an array is ambiguous".
    print("\n", dataset_idx)
    print("Checking shapes for class 0 (train) : ", data[0][0].shape)
    print("Checking shapes for class 0 (test) : ", data[1][0].shape)

    # NOTE(review): draw_pics is not defined in the visible part of this
    # notebook; it presumably comes from create_dataset - confirm it is in
    # scope (e.g. create_dataset.draw_pics).
    draw_pics(data[0][0], nb=3)

In [None]:
# Hand the temporaries left over from the dataset-building loop to free_ram,
# then the list that gathered them.
# NOTE(review): x_test_origin / y_test_origin are read again by the training
# and evaluation cells further down, so they must stay bound.
x = [
    data_train, data_test,
    x_train_origin, y_train_origin,
    x_test_origin, y_test_origin,
]
for i in x:
    free_ram(i)
free_ram(x)

In [None]:
def build_network(input_shape, embeddingsize):
    """Assemble the CNN that maps a picture to a unit-length embedding.

    The stack is three ReLU convolutions (7x7 then two 3x3, with max-pooling
    after the first two), a 4096-unit ReLU dense layer, a linear dense layer
    of size ``embeddingsize`` and a final L2 normalisation.

    Args:
        input_shape -- tuple : shape of input images.
        embeddingsize -- int : vector size used to encode our picture.

    Returns:
        network -- Sequential : the (uncompiled) embedding network.
    """
    def conv_block(filters, kernel_size, **extra):
        # All convolutions share activation, initializer and weight decay.
        return Conv2D(filters, kernel_size, activation='relu',
                      kernel_initializer='he_uniform',
                      kernel_regularizer=l2(2e-4),
                      **extra)

    def dense_block(units, activation):
        # Dense layers use a stronger weight decay than the convolutions.
        return Dense(units, activation=activation,
                     kernel_regularizer=l2(1e-3),
                     kernel_initializer='he_uniform')

    layers = [
        # Convolutional feature extractor
        conv_block(128, (7, 7), input_shape=input_shape),
        MaxPooling2D(),
        conv_block(128, (3, 3)),
        MaxPooling2D(),
        conv_block(256, (3, 3)),
        Flatten(),
        # Fully connected head producing the raw embedding
        dense_block(4096, 'relu'),
        dense_block(embeddingsize, None),
        # Force the encoding to live on the d-dimensional hypersphere
        Lambda(lambda x: keras_backend.l2_normalize(x, axis=-1)),
    ]

    network = Sequential()
    for layer in layers:
        network.add(layer)

    return network

In [None]:
class TripletLossLayer(Layer):
    """Keras layer that computes the triplet loss and registers it on the model.

    The layer takes the three embeddings [anchor, positive, negative] and
    outputs the summed hinge loss max(d(A,P) - d(A,N) + alpha, 0).
    """

    def __init__(self, alpha, **kwargs):
        """Store the margin ``alpha``; other keyword arguments go to ``Layer``."""
        self.alpha = alpha
        super().__init__(**kwargs)

    def triplet_loss(self, inputs):
        """Return the triplet loss summed over the batch axis.

        Args:
            inputs -- list : the [anchor, positive, negative] embeddings.
        """
        anchor, positive, negative = inputs

        # Squared Euclidean distances along the embedding axis.
        positive_gap = keras_backend.square(anchor - positive)
        p_dist = keras_backend.sum(positive_gap, axis=-1)
        negative_gap = keras_backend.square(anchor - negative)
        n_dist = keras_backend.sum(negative_gap, axis=-1)

        # Hinge: only triplets that violate the margin contribute.
        hinge = keras_backend.maximum(p_dist - n_dist + self.alpha, 0)
        return keras_backend.sum(hinge, axis=0)

    def call(self, inputs):
        """Compute the loss, attach it with ``add_loss`` and return it."""
        loss = self.triplet_loss(inputs)
        self.add_loss(loss)

        return loss

In [None]:
def build_model(input_shape, network, margin=0.2):
    """Wrap the embedding network into a three-input training model.

    Args:
        input_shape -- tuple : shape of input images.
        network -- CNN : neural network to train, outputting embeddings.
        margin -- float : minimal distance between Anchor-Positive and
                          Anchor-Negative for the loss function (alpha).

    Returns:
        network_train -- Model : takes [anchor, positive, negative] images
                                 and outputs the triplet loss.
    """
    # One input tensor per member of the triplet
    input_names = ("anchor_input", "positive_input", "negative_input")
    triplet_inputs = [Input(input_shape, name=input_name) for input_name in input_names]

    # The same network (shared weights) encodes all three pictures
    encodings = [network(tensor) for tensor in triplet_inputs]

    # The loss is produced by a dedicated layer and becomes the model output
    loss_layer = TripletLossLayer(alpha=margin, name='triplet_loss_layer')(encodings)

    return Model(inputs=triplet_inputs, outputs=loss_layer)

In [None]:
def get_batch_random(batch_size, s="train"):
    """Create batch of APN triplets with a complete random strategy.

    Reads the module-level ``dataset_train`` / ``dataset_test`` (one array of
    pictures per class) and ``nb_classes``.

    Args:
        batch_size -- int : the batch size.
        s -- str : 'train' draws from ``dataset_train``; any other value
                   draws from ``dataset_test``.

    Returns:
        triplets -- list : list containing 3 tensors Anchor, Positive,
                           Negative of shape (batch_size, w, h, c), where
                           (w, h, c) is the per-picture shape of class 0.
    """
    X = dataset_train if s == 'train' else dataset_test

    # Per-picture shape, taken from class 0. Using it verbatim keeps the
    # buffer aligned with the data even when pictures are not square.
    image_shape = tuple(X[0].shape[1:])

    # initialize result
    triplets = [np.zeros((batch_size,) + image_shape) for _ in range(3)]

    for i in range(batch_size):
        # Pick one random class for anchor
        anchor_class = np.random.randint(0, nb_classes)
        nb_sample_available_for_class_AP = X[anchor_class].shape[0]

        # Pick two different random pics for this class => A and P
        idx_A, idx_P = np.random.choice(nb_sample_available_for_class_AP, size=2, replace=False)

        # Pick another class for N: a non-zero offset modulo nb_classes can
        # never land back on anchor_class.
        negative_class = (anchor_class + np.random.randint(1, nb_classes)) % nb_classes
        nb_sample_available_for_class_N = X[negative_class].shape[0]

        # Pick a random pic for this negative class => N
        idx_N = np.random.randint(0, nb_sample_available_for_class_N)

        triplets[0][i] = X[anchor_class][idx_A]
        triplets[1][i] = X[anchor_class][idx_P]
        triplets[2][i] = X[negative_class][idx_N]

    return triplets

In [None]:
def compute_dist(a, b):
    """Return the squared Euclidean distance between ``a`` and ``b``.

    Args:
        a, b : arrays that can be subtracted from each other.

    Returns:
        Sum of the squared element-wise differences (no square root).
    """
    difference = a - b
    squared = np.square(difference)
    return np.sum(squared)

In [None]:
def get_batch_hard(draw_batch_size, hard_batchs_size, norm_batchs_size, network, s="train"):
    """Create batch of APN "hard" triplets.

    A random study batch is drawn with ``get_batch_random``, ranked with the
    current network, and the hardest triplets are kept together with a few
    of the remaining ones picked at random.

    Args:
        draw_batch_size -- int : number of initial randomly taken samples.
        hard_batchs_size -- int : select the number of hardest samples to keep.
        norm_batchs_size -- int : number of random samples to add.
        network -- CNN : current network; its ``predict`` output is used as
                         the embedding of each picture.
        s -- str : dataset selector forwarded to ``get_batch_random``.

    Returns:
        triplets -- list : containing 3 tensors Anchor, Positive, Negative of
                           shape (hard_batchs_size+norm_batchs_size, w, h, c).

    Raises:
        ValueError : if ``norm_batchs_size`` exceeds the number of triplets
                     left once the hard ones have been taken.
    """
    # Step 1 : pick a random batch to study
    studybatch = get_batch_random(draw_batch_size, s)

    # Step 2 : embed anchors, positives and negatives with the current network
    A = network.predict(studybatch[0])
    P = network.predict(studybatch[1])
    N = network.predict(studybatch[2])

    # d(A,P) - d(A,N); the margin is omitted because only the ordering of
    # the triplets matters here.
    studybatchloss = np.sum(np.square(A-P), axis=1) - \
        np.sum(np.square(A-N), axis=1)

    # Step 3 : keep the triplets with the highest loss (hardest first)
    selection = np.argsort(studybatchloss)[::-1][:hard_batchs_size]

    # Step 4 : complete with random triplets that were not already selected
    remaining = np.delete(np.arange(draw_batch_size), selection)
    if norm_batchs_size > remaining.size:
        raise ValueError(
            "norm_batchs_size ({0}) exceeds the {1} triplets left after "
            "selecting the hard ones".format(norm_batchs_size, remaining.size))
    selection2 = np.random.choice(remaining, norm_batchs_size, replace=False)

    selection = np.append(selection, selection2)

    triplets = [studybatch[0][selection, :, :, :],
                studybatch[1][selection, :, :, :],
                studybatch[2][selection, :, :, :]]

    return triplets

In [None]:
def draw_triplets(tripletbatch, nbmax=None):
    """Display the three images of each triplet in the batch, one row per triplet.

    Args:
        tripletbatch -- list : [anchors, positives, negatives]; channel 0 of
                               every picture is drawn.
        nbmax -- int/None : maximum number of triplets to display,
                            if set to None will show all triplets.
    """
    labels = ["Anchor", "Positive", "Negative"]

    available = tripletbatch[0].shape[0]
    nbrows = available if nbmax is None else min(nbmax, available)

    for row in range(nbrows):
        # One wide figure per triplet
        fig = plt.figure(figsize=(16, 2))

        for col, label in enumerate(labels):
            subplot = fig.add_subplot(1, 3, col + 1)
            # Work on the axes object itself instead of relying on the
            # "current axes" state and the pylab star-import.
            subplot.axis("off")
            subplot.imshow(tripletbatch[col][row, :, :, 0], vmin=0, vmax=1, cmap='Greys')
            subplot.title.set_text(label)

## Evaluation

In [None]:
def compute_probs(network, X, Y):
    """Score every unordered pair of pictures with the current network.

    Args:
        network : current NN to compute embeddings (its ``predict`` must
                  return one embedding row per picture).
        X : pictures to evaluate; the first axis indexes the m pictures.
        Y : true class of each picture (m values).

    Returns:
        probs : array of shape (m*(m-1)/2,). For each pair (i, j) with
                i < j, in row-major order, minus the squared Euclidean
                distance between the two embeddings (higher = more similar).
        y : array of the same shape, 1 when both pictures of the pair have
            the same class and 0 otherwise.
    """
    m = X.shape[0]
    # Integer arithmetic: the pair count is exact whatever the size of m.
    nbevaluation = m * (m - 1) // 2
    probs = np.zeros(nbevaluation)
    y = np.zeros(nbevaluation)

    # Compute all embeddings for all pics with current network
    embeddings = network.predict(X)
    labels = np.ravel(Y)

    # For picture i, score all later pictures j > i in one vectorized step;
    # the results fill the next (m - 1 - i) slots, preserving pair order.
    k = 0
    for i in range(m - 1):
        stop = k + (m - 1 - i)
        diff = embeddings[i + 1:m] - embeddings[i]
        probs[k:stop] = -np.sum(np.square(diff), axis=1)
        y[k:stop] = labels[i + 1:m] == labels[i]
        k = stop
    return probs, y
#probs,yprobs = compute_probs(network,x_test_origin[:10,:,:,:],y_test_origin[:10])

In [None]:
def compute_metrics(probs, yprobs):
    """Compute the ROC curve and its AUC for a set of pair scores.

    Args:
        probs -- : score of each pair (higher means "same class").
        yprobs -- : ground truth of each pair (1 same class, 0 different).

    Returns:
        fpr : false positive rates returned by sklearn's ``roc_curve``.
        tpr : true positive rates returned by sklearn's ``roc_curve``.
        thresholds : score thresholds associated with fpr / tpr.
        auc : Area Under the ROC Curve metric (``roc_auc_score``).
    """
    # Scalar summary first, then the curve itself
    area_under_curve = roc_auc_score(yprobs, probs)
    false_pos_rate, true_pos_rate, cutoffs = roc_curve(yprobs, probs)

    return false_pos_rate, true_pos_rate, cutoffs, area_under_curve

In [None]:
def compute_interdist(network):
    """Compute the pairwise distances between the classes' reference embeddings.

    The reference picture of a class is the first picture of that class in
    ``dataset_test``. Entry (i, j) of the result is the squared distance
    between the embeddings of the references of classes i and j; a good
    model should keep these distances large.

    Args:
        network -- CNN : current CNN.

    Returns:
        res -- np.array : array of shape (nb_classes,nb_classes).
    """
    # Gather one reference picture per class
    ref_images = np.zeros((nb_classes, img_rows, img_cols, 1))
    for class_idx in range(nb_classes):
        ref_images[class_idx, ...] = dataset_test[class_idx][0, ...]

    # Embed them all at once
    ref_embeddings = network.predict(ref_images)

    # Fill the distance matrix one row at a time
    res = np.zeros((nb_classes, nb_classes))
    for row in range(nb_classes):
        res[row, :] = [compute_dist(ref_embeddings[row], ref_embeddings[col])
                       for col in range(nb_classes)]
    return res

In [None]:
def draw_interdist(network, n_iteration):
    """Box-plot, per class, the distances to every other class reference.

    Args:
        network -- CNN : current CNN.
        n_iteration -- int : current iteration (shown in the title).
    """
    interdist = compute_interdist(network)

    # One series per class: its row of the matrix without the self-distance.
    data = [np.delete(interdist[class_idx, :], [class_idx])
            for class_idx in range(nb_classes)]

    fig, ax = plt.subplots()
    ax.set_title("Evaluating embeddings distance from each other after {0} iterations".format(n_iteration))
    ax.set_ylim([0, 3])
    ax.set_xlabel('Classes')
    ax.set_ylabel('Distance')
    ax.boxplot(data, showfliers=False, showbox=True)

    # Relabel the tick positions chosen by boxplot with 0-based class ids.
    locs, _ = plt.xticks()
    plt.xticks(locs, np.arange(nb_classes))

    plt.show()

In [None]:
def find_nearest(array, value):
    """Locate the element of a sorted array that is closest to ``value``.

    Args:
        array -- : values sorted in ascending order (searched with
                   ``np.searchsorted``).
        value -- : the value to look for.

    Returns:
        nearest -- : element of ``array`` closest to ``value``; on a tie the
                     right-hand neighbour wins.
        idx -- int : index of that element.
    """
    pos = np.searchsorted(array, value, side="left")

    # Before the first element: there is no left neighbour to consider.
    if pos == 0:
        return array[pos], pos

    # Past the last element: the last one is the closest.
    if pos == len(array):
        return array[pos-1], pos-1

    # In between: compare the gaps to both neighbours.
    left_gap = math.fabs(value - array[pos-1])
    right_gap = math.fabs(value - array[pos])
    if left_gap < right_gap:
        return array[pos-1], pos-1
    return array[pos], pos

In [None]:
def draw_roc(fpr, tpr, thresholds, auc=None):
    """Plot the ROC curve and report the operating point at FPR = 1e-3.

    Args:
        fpr -- : false positive rates of the curve (ascending).
        tpr -- : true positive rates matching ``fpr``.
        thresholds -- : score thresholds matching ``fpr`` / ``tpr``.
        auc -- float/None : area under the curve shown in the title. When
                            None it is computed from ``fpr`` / ``tpr`` with
                            the trapezoidal rule, so the function no longer
                            depends on a global ``auc`` variable.
    """
    if auc is None:
        x = np.asarray(fpr, dtype=float)
        y = np.asarray(tpr, dtype=float)
        # Trapezoidal area under the (fpr, tpr) polyline.
        auc = float(np.sum(np.diff(x) * (y[1:] + y[:-1]) / 2.0))

    # Find the curve point whose FPR is closest to the target
    targetfpr = 1e-3
    _, idx = find_nearest(fpr, targetfpr)
    threshold = thresholds[idx]
    recall = tpr[idx]

    # Plot no skill
    plt.plot([0, 1], [0, 1], linestyle='--')
    # Plot the roc curve for the model
    plt.plot(fpr, tpr, marker='.')
    plt.title('AUC: {0:.3f}\nSensitivity : {2:.1%} @FPR={1:.0e}\nThreshold={3})'.format(
        auc, targetfpr, recall, abs(threshold)))
    # Show the plot
    plt.show()

In [None]:
def draw_test_image(network, images, refidx=0):
    """Show each given image next to one reference image per class.

    For every image a figure is drawn with the image itself followed by the
    reference picture of each class (picture ``refidx`` of that class in
    ``dataset_test``), titled with the squared distance between the two
    embeddings.

    Args:
        network -- CNN : network used to embed the pictures.
        images -- : batch of pictures to compare; channel 0 is displayed.
        refidx -- int : index, within each class of ``dataset_test``, of the
                        picture used as that class's reference.

    Returns:
        None; the similarity scores are only displayed in the subplot titles.
    """
    _, w, h, c = dataset_test[0].shape
    nbimages = images.shape[0]

    # Embeddings of the images under test
    image_embedings = network.predict(images)

    # Embeddings of the per-class reference images
    ref_images = np.zeros((nb_classes, w, h, c))
    for i in range(nb_classes):
        ref_images[i, :, :, :] = dataset_test[i][refidx, :, :, :]
    ref_embedings = network.predict(ref_images)

    for i in range(nbimages):
        # First cell of the row: the image under test
        fig = plt.figure(figsize=(16, 2))
        subplot = fig.add_subplot(1, nb_classes+1, 1)
        subplot.axis("off")
        subplot.imshow(images[i, :, :, 0], vmin=0, vmax=1, cmap='Greys')
        subplot.title.set_text("Test image")

        # Remaining cells: one reference per class; cell 1 is already taken
        for plotidx, ref in enumerate(range(nb_classes), start=2):
            # Compute distance between this image and the reference
            dist = compute_dist(image_embedings[i, :], ref_embedings[ref, :])
            # Draw
            subplot = fig.add_subplot(1, nb_classes+1, plotidx)
            subplot.axis("off")
            subplot.imshow(ref_images[ref, :, :, 0], vmin=0, vmax=1, cmap='Greys')
            subplot.title.set_text(("Class {0}\n{1:.3e}".format(ref, dist)))

## Putting it all together

In [None]:
# Hyper parameters

# Number of training iterations.
n_iter = 80000
# Run the evaluation step every this many iterations.
evaluate_every = 1000
# Number of test pictures handed to the evaluation step.
n_val = 250
# Nominal batch size.
batch_size = 32

In [None]:
# # Testing on an untrained network
# probs, yprob = compute_probs(network, x_test_origin[:500, :, :, :], y_test_origin[:500])
# fpr, tpr, thresholds, auc = compute_metrics(probs, yprob)
# draw_roc(fpr, tpr, thresholds)
# draw_interdist(network, n_iteration)

In [None]:
# for i in range(3):
#     draw_test_image(network, np.expand_dims(dataset_train[i][0, :, :, :], axis=0))

In [None]:
# triplets = get_batch_random(2)
# print("Checking batch width, should be 3 : ", len(triplets))
# print("Shapes in the batch A:{0} P:{1} N:{2}".format(triplets[0].shape, triplets[1].shape, triplets[2].shape))
# draw_triplets(triplets)

# hardtriplets = get_batch_hard(50, 1, 1, network)
# print("Checking hard batch width, should be 3 : ", len(hardtriplets))
# print("Shapes in the hardbatch A:{0} P:{1} N:{2}".format(hardtriplets[0].shape, hardtriplets[1].shape, hardtriplets[2].shape))
# draw_triplets(hardtriplets)

In [None]:
# Build the embedding network once and keep a handle on it. The training and
# evaluation cells call network.predict(...), so it must be bound to a name
# rather than created inline inside the build_model call.
network = build_network(input_shape, embeddingsize=10)
network_train = build_model(input_shape, network)

# No loss is given to compile(): the model's only loss is the one the
# TripletLossLayer registers through add_loss.
optimizer = Adam(lr=0.00006)
network_train.compile(loss=None, optimizer=optimizer)
network_train.summary()

plot_model(network_train,
           show_shapes=True,
           show_layer_names=True,
           to_file='Model Plot.png')

print(network_train.metrics_names)
n_iteration = 0
# NOTE(review): the file name suggests weights trained on MNIST; confirm they
# match this architecture and the 224x224 input before loading.
network_train.load_weights('mnist-160k_weights.h5')

In [None]:
print("Starting training process!")
print("-------------------------------------")
t_start = time.time()
# Placeholder targets; the training call below passes None as target.
dummy_target = [np.zeros((batch_size, 15)) for _ in range(3)]

for i in range(1, n_iter+1):
    # One optimisation step on 16 hard + 16 random triplets out of 200 drawn
    triplets = get_batch_hard(200, 16, 16, network)
    loss = network_train.train_on_batch(triplets, None)
    n_iteration += 1

    # Only report every `evaluate_every` iterations
    if i % evaluate_every != 0:
        continue

    print("\n ------------- \n")
    elapsed_minutes = (time.time()-t_start)/60.0
    print("[{3}] Time for {0} iterations: {1:.1f} mins, Train Loss: {2}".format(i, elapsed_minutes, loss, n_iteration))
    probs, yprob = compute_probs(network, x_test_origin[:n_val, :, :, :], y_test_origin[:n_val])
    #fpr, tpr, thresholds,auc = compute_metrics(probs,yprob)
    #draw_roc(fpr, tpr)

In [None]:
# Full evaluation

# Pairwise scores over the whole test set
probs, yprob = compute_probs(
    network,
    x_test_origin,
    y_test_origin,
)

# ROC curve and AUC derived from those scores
fpr, tpr, thresholds, auc = compute_metrics(probs, yprob)
draw_roc(fpr, tpr, thresholds)

# Distances between the classes' reference embeddings
draw_interdist(network, n_iteration)

In [None]:
# Compare the first training picture of classes 0, 1 and 2 with the
# reference picture of every class.
for i in range(3):
    first_picture = dataset_train[i][0, :, :, :]
    # draw_test_image expects a batch, so add a leading axis of size 1.
    draw_test_image(network, first_picture[np.newaxis, ...])