In [13]:
'''
It's from keras demo code for Siamese Network with MNIST data:
https://github.com/NVIDIA/keras/blob/master/examples/mnist_siamese_graph.py
'''
from __future__ import absolute_import
from __future__ import print_function
import numpy as np

import random
from keras.datasets import mnist
from keras.models import Sequential, Model
from keras.layers import Dense, Dropout, Input, Lambda
from keras.optimizers import RMSprop
from keras import backend as K

def euclidean_distance(vects):
    '''Euclidean distance between two batches of embedding tensors.

    Args:
        vects: pair ``(x, y)`` of backend tensors with matching shapes.

    Returns:
        Tensor with the L2 norm of ``x - y`` reduced over axis 1; the
        reduced axis is kept with size 1.
    '''
    x, y = vects
    sum_square = K.sum(K.square(x - y), axis=1, keepdims=True)
    # The derivative of sqrt is unbounded at 0, so two identical embeddings
    # would yield NaN gradients; clamp to the backend epsilon before sqrt.
    return K.sqrt(K.maximum(sum_square, K.epsilon()))

def eucl_dist_output_shape(shapes):
    '''Output shape of the distance layer: one value per sample.

    ``shapes`` holds the shapes of the two inputs; only the leading entry of
    the first one is carried over, followed by a trailing axis of size 1.
    '''
    first_shape, _second_shape = shapes
    batch_dim = first_shape[0]
    return (batch_dim, 1)

def contrastive_loss(y_true, y_pred):
    '''Contrastive loss (Hadsell et al., 2006).
    http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf

    ``y_true`` weights the squared prediction, while ``1 - y_true`` weights
    the squared amount by which the prediction falls short of the margin.
    The mean over all elements is returned.
    '''
    margin = 1
    # Term weighted by y_true: the squared prediction itself.
    attract = y_true * K.square(y_pred)
    # Term weighted by (1 - y_true): squared shortfall below the margin,
    # zero once the prediction reaches the margin.
    shortfall = K.maximum(margin - y_pred, 0)
    repel = (1 - y_true) * K.square(shortfall)
    return K.mean(attract + repel)

def create_pairs(x, digit_indices, num_classes=10):
    '''Positive and negative pair creation.
    Alternates between positive and negative pairs.

    For every class ``d`` and position ``i`` a positive pair is emitted
    (samples ``i`` and ``i + 1`` of class ``d``), followed by a negative pair
    (sample ``i`` of class ``d`` with sample ``i`` of a randomly drawn other
    class).

    Args:
        x: indexable collection of samples.
        digit_indices: ``digit_indices[d]`` lists the indices into ``x`` of
            the samples that belong to class ``d``.
        num_classes: number of classes to pair up. Defaults to 10, the
            value that used to be hard-coded.

    Returns:
        Tuple ``(pairs, labels)`` of numpy arrays. Labels alternate between
        1 (same class) and 0 (different classes).

    Raises:
        ValueError: if ``num_classes`` is smaller than 2, because no
            negative pair can be formed.
    '''
    if num_classes < 2:
        raise ValueError(
            'create_pairs needs at least 2 classes, got %d' % num_classes)

    pairs = []
    labels = []
    # Every class contributes the same number of pairs, bounded by the
    # smallest class (minus one because positives use i and i + 1).
    n = min(len(digit_indices[d]) for d in range(num_classes)) - 1
    for d in range(num_classes):
        for i in range(n):
            anchor = digit_indices[d][i]
            pairs.append([x[anchor], x[digit_indices[d][i + 1]]])
            # An offset in [1, num_classes) guarantees a class other than d.
            inc = random.randrange(1, num_classes)
            dn = (d + inc) % num_classes
            pairs.append([x[anchor], x[digit_indices[dn][i]]])
            labels += [1, 0]
    return np.array(pairs), np.array(labels)

def create_base_network(img_shape):
    '''Base network to be shared (eq. to feature extraction).

    Args:
        img_shape: shape of one input sample, without the batch axis.

    Returns:
        A ``Sequential`` model made of three 128-unit ReLU ``Dense`` layers
        with a dropout rate of 0.1 after the first and second one. Inputs
        with more than one axis are flattened first.
    '''
    # Function-scope import so this block does not depend on the import cell.
    from keras.layers import Flatten

    seq = Sequential()
    if len(img_shape) > 1:
        # Dense only transforms the last axis. Without flattening, an
        # image-shaped input would keep its spatial axes and the embedding
        # would not be a single vector per sample.
        seq.add(Flatten(input_shape=img_shape))
        seq.add(Dense(128, activation='relu'))
    else:
        seq.add(Dense(128, input_shape=img_shape, activation='relu'))
    seq.add(Dropout(0.1))
    seq.add(Dense(128, activation='relu'))
    seq.add(Dropout(0.1))
    seq.add(Dense(128, activation='relu'))
    return seq

def compute_accuracy(predictions, labels):
    '''Compute classification accuracy with a fixed threshold on distances.

    A pair is predicted as "similar" when its distance is below 0.5. The
    result is the fraction of pairs whose prediction matches the label.
    (Averaging only the labels of the predicted-similar pairs would measure
    precision instead, and is undefined when no distance is below the
    threshold.)

    Args:
        predictions: array of distances, one per pair (any shape that
            flattens to one value per pair).
        labels: array of per-pair labels, non-zero meaning "similar"; must
            flatten to the same length as ``predictions``.

    Returns:
        Fraction of correctly classified pairs.

    Raises:
        ValueError: if ``predictions`` and ``labels`` do not hold the same
            number of values.
    '''
    predicted_similar = np.asarray(predictions).ravel() < 0.5
    actual_similar = np.asarray(labels).ravel().astype(bool)
    if predicted_similar.shape != actual_similar.shape:
        raise ValueError(
            'predictions and labels must hold one value per pair, got '
            '%d predictions and %d labels'
            % (predicted_similar.size, actual_similar.size))
    return np.mean(predicted_similar == actual_similar)

In [3]:
import numpy as np
import keras

def ImageNet_preprocessing(data_x, input_img_size=(256, 256, 2), output_img_size=(256, 256, 3)):
    '''Expand a batch of channels-last images to more channels.

    The existing channels are kept untouched and the additional ones are
    filled with zeros. ``np.resize`` is deliberately not used: it refills
    the target from the flattened buffer, which scrambles the pixel layout
    whenever the channel count changes.

    Args:
        data_x: array of shape ``(n_images,) + per-image shape``, channels
            last.
        input_img_size: kept for interface compatibility; the actual input
            layout is read from ``data_x`` itself.
        output_img_size: per-image target shape. All axes except the last
            must match ``data_x``; the last is the target channel count.

    Returns:
        New array of shape ``(n_images,) + output_img_size`` with the same
        dtype as ``data_x``.

    Raises:
        ValueError: if the number of axes or the non-channel axes differ, or
            if the target has fewer channels than ``data_x``.
    '''
    data_x = np.asarray(data_x)
    target_shape = tuple(output_img_size)
    if data_x.ndim != len(target_shape) + 1:
        raise ValueError(
            'expected data of rank %d, got shape %r'
            % (len(target_shape) + 1, data_x.shape))
    if tuple(data_x.shape[1:-1]) != target_shape[:-1]:
        raise ValueError(
            'cannot change non-channel axes from %r to %r'
            % (tuple(data_x.shape[1:-1]), target_shape[:-1]))
    extra_channels = target_shape[-1] - data_x.shape[-1]
    if extra_channels < 0:
        raise ValueError(
            'cannot reduce channels from %d to %d'
            % (data_x.shape[-1], target_shape[-1]))
    # Pad only at the end of the last (channel) axis.
    pad_width = [(0, 0)] * (data_x.ndim - 1) + [(0, extra_channels)]
    new_data_x = np.pad(data_x, pad_width, mode='constant')
    return new_data_x

def read_data(data_path='image_data_256x256.npy', labels_path='labels.npy', random_state=None):
    '''Load images and labels, move channels last, and shuffle them jointly.

    Args:
        data_path: ``.npy`` file holding the image array.
        labels_path: ``.npy`` file holding the label array.
        random_state: forwarded to ``sklearn.utils.shuffle``; pass an int
            for a reproducible order. ``None`` keeps the shuffle unseeded.

    Returns:
        Tuple ``(data_x, data_y)`` shuffled with the same permutation.
    '''
    from sklearn.utils import shuffle

    data_x = np.load(data_path)
    data_y = np.load(labels_path)

    # Move axis 1 to position 3, i.e. (N, C, H, W) -> (N, H, W, C).
    # NOTE(review): assumes the stored images are channels-first - confirm.
    data_x = np.moveaxis(data_x, 1, 3)
    # Wrap and transpose; for a 1-D label vector this yields a column (N, 1).
    data_y = np.array([data_y]).T

    # Optional step, currently disabled: expand the 2 channels into 3 with
    # ImageNet_preprocessing(data_x).

    # Shuffle images and labels with the same permutation.
    data_x, data_y = shuffle(data_x, data_y, random_state=random_state)

    return data_x, data_y

def cross_validataion_splits(data_x, data_y, test_set_size = 0.2, val_set_size = 0.2, cv_split_size = 10, num_classes = 3):
    '''Build K-fold train / validation / test splits ready for training.

    Each fold's held-out part becomes the test set. The remaining samples
    are split again into training and validation sets. Labels are one-hot
    encoded and images are cast to float32 and divided by 255.

    Args:
        data_x: image array indexed along axis 0.
        data_y: label array aligned with ``data_x``.
        test_set_size: accepted but unused; the test-set size follows from
            ``cv_split_size``.
        val_set_size: share of each fold's training part used for
            validation (passed to ``train_test_split`` as ``test_size``).
        cv_split_size: number of folds.
        num_classes: number of classes for the one-hot encoding.

    Returns:
        List with one entry per fold:
        ``[(x_train, y_train), (x_val, y_val), (x_test, y_test)]``.
    '''
    # sklearn.cross_validation was removed from scikit-learn; model_selection
    # provides both helpers.
    from sklearn.model_selection import KFold, train_test_split

    def _scale(images):
        # float32 copy with pixel values divided by 255.
        return images.astype('float32') / 255

    processed_data = []

    # cross-validation data set
    kf = KFold(n_splits=cv_split_size)

    for train, test in kf.split(np.arange(len(data_x))):
        print("Train size: {}, Test size: {}".format(train.shape, test.shape))
        x_test = data_x[test]
        y_test = data_y[test]

        x_train, x_val, y_train, y_val = train_test_split(
            data_x[train], data_y[train], test_size=val_set_size)

        # Convert class vectors to binary class matrices.
        y_train = keras.utils.to_categorical(y_train, num_classes)
        y_val = keras.utils.to_categorical(y_val, num_classes)
        y_test = keras.utils.to_categorical(y_test, num_classes)

        processed_data.append([
            (_scale(x_train), y_train),
            (_scale(x_val), y_val),
            (_scale(x_test), y_test),
        ])
    print("Cross validation data has been split!")
    return processed_data


In [4]:
# Load the dataset, then build the per-fold train / validation / test splits
# (default arguments: 10 folds, 3 classes).
data_x, data_y = read_data()
processed_data = cross_validataion_splits(data_x, data_y)



Train size: (249,), Test size: (28,)
Train size: (249,), Test size: (28,)
Train size: (249,), Test size: (28,)
Train size: (249,), Test size: (28,)
Train size: (249,), Test size: (28,)
Train size: (249,), Test size: (28,)
Train size: (249,), Test size: (28,)
Train size: (250,), Test size: (27,)
Train size: (250,), Test size: (27,)
Train size: (250,), Test size: (27,)
Cross validation data has been split!


In [8]:
import numpy as np
# Unpack the first cross-validation fold and report the array shapes of its
# training, validation and test parts (images first, labels second).
(x_train, y_train), (x_val, y_val), (x_test, y_test) = processed_data[0]
for split_x, split_y in ((x_train, y_train), (x_val, y_val), (x_test, y_test)):
    print(np.array(split_x).shape, np.array(split_y).shape)

(199, 256, 256, 2) (199, 3)
(50, 256, 256, 2) (50, 3)
(28, 256, 256, 2) (28, 3)


In [15]:
# Each branch of the siamese model takes one single-channel 256x256 image.
input_shape = (256,256,1)

# network definition
base_network = create_base_network(input_shape)
input_a = Input(shape=input_shape)
input_b = Input(shape=input_shape)
# because we re-use the same instance `base_network`,
# the weights of the network
# will be shared across the two branches
processed_a = base_network(input_a)
processed_b = base_network(input_b)

# The model output is the distance between the two branch embeddings.
distance = Lambda(euclidean_distance, output_shape=eucl_dist_output_shape)([processed_a, processed_b])
# Keras 2 names these keyword arguments `inputs` / `outputs`; the singular
# Keras 1 spellings are deprecated.
model = Model(inputs=[input_a, input_b], outputs=distance)
# train
rms = RMSprop()
model.compile(loss=contrastive_loss, optimizer=rms)

  


In [26]:
# Shape check: slicing the last axis with `:1` keeps only the first channel
# of the fold-0 training images while preserving a trailing axis of size 1.
print(np.array(processed_data[0][0][0][:,:,:,:1]).shape)

(199, 256, 256, 1)


In [None]:
# Build the two-branch inputs for fold 0. The pair tuples were left as an
# empty `(,)` placeholder, which is a SyntaxError.
# NOTE(review): feeding channel 0 to one branch and channel 1 to the other is
# inferred from the (256, 256, 1) model input and the 2-channel images -
# confirm this is the intended pairing.
tr_x = processed_data[0][0][0]
tr_pairs = (tr_x[:, :, :, :1], tr_x[:, :, :, 1:2])
tr_y = processed_data[0][0][1]
val_x = processed_data[0][1][0]
val_pairs = (val_x[:, :, :, :1], val_x[:, :, :, 1:2])
val_y = processed_data[0][1][1]
# NOTE(review): tr_y / val_y are the one-hot class matrices produced by the
# split step, not per-pair similarity flags - confirm they are the intended
# targets for the contrastive loss.

In [None]:
# Train the siamese model on the fold-0 pairs, validating after every epoch.
nb_epoch = 20
# Keras 2 renamed the `nb_epoch` keyword of `fit` to `epochs`.
model.fit([tr_pairs[0], tr_pairs[1]], tr_y,
          validation_data=([val_pairs[0], val_pairs[1]], val_y),
          batch_size=128,
          epochs=nb_epoch)

In [None]:
# Build the two-branch inputs for the fold-0 test set. The pair tuple was
# left as an empty `(,)` placeholder, which is a SyntaxError.
# NOTE(review): the channel-0 / channel-1 pairing is inferred from the
# (256, 256, 1) model input and the 2-channel images - confirm.
te_x = processed_data[0][2][0]
te_pairs = (te_x[:, :, :, :1], te_x[:, :, :, 1:2])
te_y = processed_data[0][2][1]

# compute final accuracy on training and test sets
# The pairs are tuples of two arrays, so each branch input is selected with
# [0] / [1] (as in the fit call) rather than with array-style [:, 0].
# NOTE(review): tr_y / te_y hold one-hot class labels per image, not per-pair
# similarity flags - confirm they are the intended targets for this
# distance-based accuracy.
pred = model.predict([tr_pairs[0], tr_pairs[1]])
tr_acc = compute_accuracy(pred, tr_y)
pred = model.predict([te_pairs[0], te_pairs[1]])
te_acc = compute_accuracy(pred, te_y)

print('* Accuracy on training set: %0.2f%%' % (100 * tr_acc))
print('* Accuracy on test set: %0.2f%%' % (100 * te_acc))