In [1]:
from keras.layers import Input, Conv2D, Lambda, merge, Dense, Flatten,MaxPooling2D
from keras.models import Model, Sequential
from keras.regularizers import l2
from keras import backend as K
from keras.optimizers import SGD,Adam
from keras.losses import binary_crossentropy
import numpy.random as rng
import numpy as np
import pandas as pd
import os
import pickle
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.utils import shuffle
%matplotlib inline
def W_init(shape,name=None):
    """Initialize weights as in paper"""
    values = rng.normal(loc=0,scale=1e-2,size=shape)
    return K.variable(values,name=name)
#//TODO: figure out how to initialize layer biases in keras.
def b_init(shape,name=None):
    """Initialize bias as in paper"""
    values=rng.normal(loc=0.5,scale=1e-2,size=shape)
    return K.variable(values,name=name)

input_shape = (101, 101, 1)
left_input = Input(input_shape)
right_input = Input(input_shape)
#build convnet to use in each siamese 'leg'
convnet = Sequential()
convnet.add(Conv2D(64,(10,10),activation='relu',input_shape=input_shape,
                   kernel_initializer=W_init,kernel_regularizer=l2(2e-4)))
convnet.add(MaxPooling2D())
convnet.add(Conv2D(128,(7,7),activation='relu',
                   kernel_regularizer=l2(2e-4),kernel_initializer=W_init,bias_initializer=b_init))
convnet.add(MaxPooling2D())
convnet.add(Conv2D(128,(4,4),activation='relu',kernel_initializer=W_init,kernel_regularizer=l2(2e-4),bias_initializer=b_init))
convnet.add(MaxPooling2D())
convnet.add(Conv2D(256,(4,4),activation='relu',kernel_initializer=W_init,kernel_regularizer=l2(2e-4),bias_initializer=b_init))
convnet.add(Flatten())
convnet.add(Dense(1024,activation="sigmoid",kernel_regularizer=l2(1e-3),kernel_initializer=W_init,bias_initializer=b_init))

#call the convnet Sequential model on each of the input tensors so params will be shared
encoded_l = convnet(left_input)
encoded_r = convnet(right_input)
#layer to merge two encoded inputs with the l1 distance between them
L1_layer = Lambda(lambda tensors:K.abs(tensors[0] - tensors[1]))
#call this layer on list of two input tensors.
L1_distance = L1_layer([encoded_l, encoded_r])
prediction = Dense(1,activation='sigmoid',bias_initializer=b_init)(L1_distance)
siamese_net = Model(inputs=[left_input,right_input],outputs=prediction)

optimizer = Adam(0.00006)
#//TODO: get layerwise learning rates and momentum annealing scheme described in paperworking
siamese_net.compile(loss="binary_crossentropy",optimizer=optimizer)

Using TensorFlow backend.


In [2]:
len_x = 59
len_y = 144

In [None]:
id = '01'
ax = [1, 2, 3]
origin_data = []
train_data = []
for a in ax:
    tmp = pd.read_csv("train/extract/{}_{}.csv".format(id, a)).values[6:-18]
    origin_data.append(tmp)
for i in range(int((origin_data[0].shape[0]-len_x)/10)):
    train_data.append([origin_data[0][i*10:i*10+59],
                       origin_data[1][i*10:i*10+59],
                       origin_data[2][i*10:i*10+59]
                      ])
train_data = np.array(train_data)
train_data = train_data.reshape(train_data.shape[0], train_data.shape[2], train_data.shape[3], train_data.shape[1])
print(train_data.shape)

In [3]:
def load_data(id, data_subset="train"):
    ax = [1, 2, 3]
    origin_data = []
    data = []
    for a in ax:
        tmp = pd.read_csv("{}/extract/{}_{}.csv".format(data_subset, id, a)).values[6:-18]
        origin_data.append(tmp)
    for i in range(int((origin_data[0].shape[0]-len_x)/10)):
        data.append([origin_data[0][i*10:i*10+59],
                           origin_data[1][i*10:i*10+59],
                           origin_data[2][i*10:i*10+59]
                          ])
#     data = np.array(data)
#     data = data.reshape(data.shape[0], data.shape[2], data.shape[3], data.shape[1])
    return data

In [None]:
print([3,5]+[2,4])

In [12]:
class Siamese_Loader:
    """For loading batches and testing tasks to a siamese net"""

    def __init__(self, data_subset="train", ids=["01", "03"]):
        self.data = []
        self.categories = {}
        self.info = {}
        tmp = []
        categories = []
        for id in ids:
            X = load_data(id, data_subset)
#             tmp.append(X)
            categories += [i for i in range(len(X))]
            self.data += X
#         for i in range(len(ids)):
#             self.data = np.concatenate((self.data, tmp[i]))
        self.data = np.array(self.data)
#         print(self.data.shape)
        self.data = self.data.reshape(self.data.shape[0], self.data.shape[2], 
                                      self.data.shape[3], self.data.shape[1])
        print(categories)
#         print(self.data.shape)
#         for name in data_subsets:
#             file_path = os.path.join(path, name + ".pickle")
#             print("loading data from {}".format(file_path))
#             with open(file_path, "rb") as f:
#                 (X, c) = pickle.load(f)
#                 self.data[name] = X
#                 self.categories[name] = c

    def get_batch(self, batch_size, s="train"):
        """Create batch of n pairs, half same class, half different class"""
        X = self.data[s]
        n_classes, n_examples, w, h = X.shape

        # randomly sample several classes to use in the batch
        categories = rng.choice(n_classes, size=(batch_size,), replace=False)
        print(categories)
        # initialize 2 empty arrays for the input image batch
        pairs = [np.zeros((batch_size, h, w, 1)) for i in range(2)]
        print(len(pairs))
        print(pairs[1].shape)
        # initialize vector for the targets, and make one half of it '1's, so 2nd half of batch has same class
        targets = np.zeros((batch_size,))
        targets[batch_size // 2:] = 1
        for i in range(batch_size):
            category = categories[i]
            idx_1 = rng.randint(0, n_examples)
            pairs[0][i, :, :, :] = X[category, idx_1].reshape(w, h, 1)
            idx_2 = rng.randint(0, n_examples)
            # pick images of same class for 1st half, different for 2nd
            if i >= batch_size // 2:
                category_2 = category
            else:
                # add a random number to the category modulo n classes to ensure 2nd image has
                # ..different category
                category_2 = (category + rng.randint(1, n_classes)) % n_classes
            pairs[1][i, :, :, :] = X[category_2, idx_2].reshape(w, h, 1)
        return pairs, targets

    def generate(self, batch_size, s="train"):
        """a generator for batches, so model.fit_generator can be used. """
        while True:
            pairs, targets = self.get_batch(batch_size, s)
            yield (pairs, targets)

    def make_oneshot_task(self, N, s="val", language=None):
        """Create pairs of test image, support set for testing N way one-shot learning. """
        X = self.data[s]
        n_classes, n_examples, w, h = X.shape
        indices = rng.randint(0, n_examples, size=(N,))
        if language is not None:
            low, high = self.categories[s][language]
            if N > high - low:
                raise ValueError("This language ({}) has less than {} letters".format(language, N))
            categories = rng.choice(range(low, high), size=(N,), replace=False)

        else:  # if no language specified just pick a bunch of random letters
            categories = rng.choice(range(n_classes), size=(N,), replace=False)
        true_category = categories[0]
        ex1, ex2 = rng.choice(n_examples, replace=False, size=(2,))
        test_image = np.asarray([X[true_category, ex1, :, :]] * N).reshape(N, w, h, 1)
        support_set = X[categories, indices, :, :]
        support_set[0, :, :] = X[true_category, ex2]
        support_set = support_set.reshape(N, w, h, 1)
        targets = np.zeros((N,))
        targets[0] = 1
        targets, test_image, support_set = shuffle(targets, test_image, support_set)
        pairs = [test_image, support_set]

        return pairs, targets

    def test_oneshot(self, model, N, k, s="val", verbose=0):
        """Test average N way oneshot learning accuracy of a siamese neural net over k one-shot tasks"""
        n_correct = 0
        if verbose:
            print("Evaluating model on {} random {} way one-shot learning tasks ...".format(k, N))
        for i in range(k):
            inputs, targets = self.make_oneshot_task(N, s)
            probs = model.predict(inputs)
            if np.argmax(probs) == np.argmax(targets):
                n_correct += 1
        percent_correct = (100.0 * n_correct / k)
        if verbose:
            print("Got an average of {}% {} way one-shot learning accuracy".format(percent_correct, N))
        return percent_correct

    def train(self, model, epochs, verbosity):
        model.fit_generator(self.generate(batch_size),

                            )

In [13]:
train_loader = Siamese_Loader()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]


In [None]:
id = '05'
ax = [1, 2, 3]
data_1 = []
for a in ax:
    tmp = pd.read_csv("test/extract/{}_{}.csv".format(id, a)).values
    print(tmp.shape)