In [1]:
from keras.models import Sequential,Input
from keras.layers import Dense,Conv2D,MaxPooling2D,Dropout,Flatten,Lambda
from keras import backend as K
from keras.models import Model
from sklearn.utils import shuffle
import numpy.random as rng
import numpy as np
from tensorflow.keras.optimizers import Adam


Using TensorFlow backend.


In [2]:
def initialize_weights(shape, name=None):
    return np.random.normal(loc = 0.0, scale = 1e-2, size = shape)

def initialize_bias(shape, name=None):
    return np.random.normal(loc = 0.5, scale = 1e-2, size = shape)


In [3]:
def siamese_model(input_shape):

    left_input = Input(input_shape)
    right_input = Input(input_shape)

    model = Sequential()
    model.add(Conv2D(2622, (1, 1), activation='relu', input_shape=(input_shape)))
    #model.add(MaxPooling2D())

    model.add(Conv2D(2622, (1, 1), activation='relu'))
    #model.add(MaxPooling2D())
    model.add(Dropout(0.25))

    model.add(Conv2D(512, (1, 1), activation='relu'))
    #model.add(MaxPooling2D())
    model.add(Dropout(0.25))

    model.add(Flatten())
    model.add(Dense(512, activation='relu'))

    encoded_l = model(left_input)
    encoded_r = model(right_input)
    
    L1_layer = Lambda(lambda tensors:K.abs(tensors[0] - tensors[1]))
    L1_distance = L1_layer([encoded_l, encoded_r])
    
    # Add a dense layer with a sigmoid unit to generate the similarity score
    prediction = Dense(1,activation='sigmoid')(L1_distance)
    
    # Connect the inputs with the outputs
    siamese_net = Model(inputs=[left_input,right_input],outputs=prediction)
    
    # return the model
    return siamese_net



In [4]:
input_shape = (2622,1,1)
model = siamese_model(input_shape)
model.summary()

Model: "model_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            (None, 2622, 1, 1)   0                                            
__________________________________________________________________________________________________
input_2 (InputLayer)            (None, 2622, 1, 1)   0                                            
__________________________________________________________________________________________________
sequential_1 (Sequential)       (None, 512)          695567806   input_1[0][0]                    
                                                                 input_2[0][0]                    
__________________________________________________________________________________________________
lambda_1 (Lambda)               (None, 512)          0           sequential_1[1][0]         

In [5]:
optimizer = Adam(lr = 0.00006)
model.compile(loss="binary_crossentropy",optimizer=optimizer)

In [6]:
from numpy import load

In [7]:
training_array=np.load('Training_array.npy')

In [8]:
#training_array=training_array.reshape(5,10,2622)

In [9]:
from sklearn.model_selection import train_test_split

In [10]:
train_class=[1,2,3,4,5]

In [11]:
X_train,X_val,train_classes,val_classes=train_test_split(training_array,train_class, test_size = 0.1,random_state=42)

In [12]:
def get_batch(batch_size,s="train"):
    """Create batch of n pairs, half same class, half different class"""
    if s == 'train':
        X = X_train
        categories = train_classes
    else:
        X = X_val
        categories = val_classes

    n_classes, n_examples,w, feature_length = X.shape

    # randomly sample several classes to use in the batch
    categories = rng.choice(n_classes,size=(batch_size,),replace=True)
    
    # initialize 2 empty arrays for the input image batch
    pairs=[np.zeros((batch_size, feature_length,w,1)) for i in range(2)]
    
    # initialize vector for the targets
    targets=np.zeros((batch_size,))
    
    # make one half of it '1's, so 2nd half of batch has same class
    targets[batch_size//2:] = 1
    for i in range(batch_size):
        category = categories[i]
        idx_1 = rng.randint(0, n_examples)
        pairs[0][i,:,:] = X[category, idx_1].reshape(feature_length,w, 1)   #********check if no of ":" are correct
        idx_2 = rng.randint(0, n_examples)
        
        # pick images of same class for 1st half, different for 2nd
        if i >= batch_size // 2:
            category_2 = category  
        else: 
            # add a random number to the category modulo n classes to ensure 2nd image has a different category
            category_2 = (category + rng.randint(1,n_classes)) % n_classes
        
        pairs[1][i,:,:] = X[category_2,idx_2].reshape(feature_length,w,1)#********check if no of ":" are correct
    
    return pairs, targets


In [13]:
def generate(batch_size, s="train"):
    """a generator for batches, so model.fit_generator can be used. """
    while True:
        pairs, targets = get_batch(batch_size,s)
        yield (pairs, targets)


In [14]:
def make_oneshot_task(N, s="val"):
    """Create pairs of test image, support set for testing N way one-shot learning. """
    if s == 'train':
        X = Xtrain
        categories = train_classes
    else:
        X = Xval
        categories = val_classes

    n_classes, n_examples, w , feature_length = X.shape
    
    indices = rng.randint(0, n_examples,size=(N,))


    categories = rng.choice(range(n_classes),size=(N,),replace=False)     
    true_category = categories[0]

    ex1, ex2 = rng.choice(n_examples,replace=False,size=(2,))
    test_image = np.asarray([X[true_category,ex1,:]]*N).reshape(N,w, feature_length,1)   #********check if no of ":" are correct

    support_set = X[categories,indices,:] #********check if no of ":" are correct
    support_set[0,:,:] = X[true_category,ex2]
    support_set = support_set.reshape(N,w, feature_length,1)
    targets = np.zeros((N,))
    targets[0] = 1
    targets, test_image, support_set = shuffle(targets, test_image, support_set)
    pairs = [test_image,support_set]

    return pairs, targets

In [15]:
def test_oneshot(model, N, k, s = "val", verbose = 0):
    """Test average N way oneshot learning accuracy of a siamese neural net over k one-shot tasks"""
    n_correct = 0
    if verbose:
        print("Evaluating model on {} random {} way one-shot learning tasks ... \n".format(k,N))
    for i in range(k):
        inputs, targets = make_oneshot_task(N,s)
        probs = model.predict(inputs)
        if np.argmax(probs) == np.argmax(targets):
            n_correct+=1
    percent_correct = (100.0 * n_correct / k)
    if verbose:
        print("Got an average of {}% {} way one-shot learning accuracy \n".format(percent_correct,N))
    return percent_correct



In [16]:
# Hyper parameters
evaluate_every = 200 # interval for evaluating on one-shot tasks
batch_size = 32
n_iter = 2000 # No. of training iterations
N_way = 3 # how many classes for testing one-shot tasks
n_val = 250 # how many one-shot tasks to validate on
best = -1

In [None]:
model_path = './weights/'
import time

print("Starting training process!")
print("-------------------------------------")
t_start = time.time()
for i in range(1, n_iter+1):
    (inputs,targets) = get_batch(batch_size)
    loss = model.train_on_batch(inputs, targets)
    if i % evaluate_every == 0:
        print("\n ------------- \n")
        print("Time for {0} iterations: {1} mins".format(i, (time.time()-t_start)/60.0))
        print("Train Loss: {0}".format(loss)) 
        val_acc = test_oneshot(model, N_way, n_val, verbose=True)
        model.save_weights(os.path.join(model_path, 'weights.{}.h5'.format(i)))
        if val_acc >= best:
            print("Current best: {0}, previous best: {1}".format(val_acc, best))
            best = val_acc


model.load_weights(os.path.join(model_path, "weights.20000.h5"))


Starting training process!
-------------------------------------
