In [None]:
import os
import keras
import logging
import numpy as np
from imageio import imread
from keras.models import Model
from keras.regularizers import l2
from keras.optimizers import Adam
from PIL import Image, ImageFilter
from keras.models import Sequential
from keras.engine.topology import Layer
from keras.layers.pooling import MaxPooling2D
from keras.layers.core import Lambda, Flatten, Dense
from keras.layers.normalization import BatchNormalization
from keras.layers import Conv2D, ZeroPadding2D, Activation, Input, concatenate


logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

Using TensorFlow backend.


### Dataset managment

In [None]:
def download_ds(download_address, save_address):
    if os.path.exists(save_address):
        logger.info("The dataset has already been downloaded.")
    else:
        !wget -O ds.zip {download_address}
        !unzip ds.zip
        !mv ShapesData_few-show-learning {save_address}
        !find {save_address} -name '*.DS_Store' -delete

In [None]:
class DS:
    def __init__(self):
        self.x_tr = None
        self.y_tr = None
        self.labels_tr = None
        self.x_val = None
        self.Y_val = None
        self.labels_val = None
        self.X_eval = None
        self.Y_eval = None
        self.labels_eval = None
        self.labels_dict = None

    def __Load_data(self,address, new_im_size, train_ratio):
        class_count = len(os.listdir(address))
        train_count = int(class_count*train_ratio)
        train_idx = np.random.choice(class_count, size = train_count, replace = False)
        x_tr, y_tr, x_val, y_val = [],[],[],[]
        labels_idx = dict()
        labels_tr = []
        labels_val = []
        offset_tr, offset_val = 0,0
        if os.path.exists(os.path.join(address,".DS_Store")):
            os.remove(os.path.join(address, ".DS_Store"))
        for i , label in enumerate(os.listdir(address)):
            labels_address = os.path.join(address, label)
            x, y, labels = (x_tr, y_tr, labels_tr) if i in train_idx else (x_val, y_val, labels_val)
            labels_idx[i] = label
            for image_name in os.listdir(labels_address):
                image_address = os.path.join(labels_address, image_name)
                try:
                    img = Image.open(image_address).convert('RGB')
                    img = img.resize((new_im_size, new_im_size))
                    x.append(np.array(img, dtype=np.float32))
                    y.append(i)
                except:
                    logger.info("ERROR: can't load {}".format(image_address))
            label_count = len(os.listdir(labels_address))
            if i in train_idx:
                labels.append((i, label_count , offset_tr))
                offset_tr+=label_count
            else:
                labels.append((i, label_count , offset_val))
                offset_val+=label_count

        labels = labels_idx
        x_tr = np.array(x_tr)
        y_tr = np.array(y_tr)
                
        x_val = np.array(x_val)
        y_val = np.array(y_val)

        if class_count-train_count > 0:
            x_val = np.array(x_val)
            y_val = np.array(y_val)

        return x_tr, y_tr, np.array(labels_tr), x_val, y_val, np.array(labels_val), labels

    def Load_validaton(self, address, new_im_size):
        self.x_eval, self.eval, self.labels_eval, _, _, _, _ =self.__Load_data(address, new_im_size, 1)
    def Load_background(self,address, new_im_size, train_ratio):
        self.x_tr, self.y_tr, self.labels_tr, self.x_val, self.y_val, self.labels_val, self.labels=self.__Load_data(address, new_im_size, train_ratio)

    def get_paired_data(self, batch_size, mode='train'):
        if mode=='train':
            x, labels = self.x_tr, self.labels_tr
        elif mode=='validation':
            x, labels = self.x_val, self.labels_val
        else:
            x, labels = self.x_eval, self.labels_eval

        half_size = batch_size // 2
        class_count = labels.shape[0]
        rnd_classes = np.random.choice(class_count, size = half_size, replace = True)
        rnd_pos_classes_lbl =labels[rnd_classes]
        rnd_1 = np.rint(np.random.uniform(0,1, half_size)*rnd_pos_classes_lbl[:,1]+rnd_pos_classes_lbl[:,2]).astype(np.int32)
        rnd_pos_2 = np.rint(np.random.uniform(0,1, half_size)*rnd_pos_classes_lbl[:,1] +rnd_pos_classes_lbl[:,2]).astype(np.int32)

        rnd_neg_2_classes =  (rnd_classes + np.random.randint(1,class_count-1,half_size)) % class_count
        rnd_neg_classes_lbl =labels[rnd_neg_2_classes]
        rnd_neg_2 = np.rint(np.random.uniform(0,1, half_size)*rnd_neg_classes_lbl[:,1]+rnd_neg_classes_lbl[:,2]).astype(np.int32)
        
        pair_1 = x[np.repeat(rnd_1,2)]
        pair_2 = np.concatenate((x[rnd_pos_2], x[rnd_neg_2]))
        y = np.ones(half_size*2)
        y[half_size:]= 0
        return [pair_1, pair_2], y



In [None]:

download_address = "https://www.dropbox.com/s/9imgl9m0qzin55g/ShapesData_few-show-learning.zip?dl=0"
save_address = "dataset"
new_im_size = 105
download_ds(download_address, save_address)
batch1_address = os.path.join(save_address, "batch1")
batch1_ds = DS()
batch1_ds.Load_background(batch1_address, 105, 0.9)

In [None]:
print(batch1_ds.labels)
print(batch1_ds.x_tr.shape)
print(batch1_ds.y_tr.shape)
print(batch1_ds.labels_tr)
print(batch1_ds.x_val.shape)
print(batch1_ds.y_val.shape)
print(batch1_ds.labels_val)

{0: 'Boy', 1: 'Monkey', 2: 'Bat', 3: 'If', 4: 'Tree', 5: 'House', 6: 'Lion', 7: 'Crocodile', 8: 'Girl', 9: 'Car'}
(5775, 105, 105, 3)
(5775,)
[[   0  691    0]
 [   1  287  691]
 [   2  391  978]
 [   3  468 1369]
 [   4  859 1837]
 [   5  737 2696]
 [   6 1144 3433]
 [   7  649 4577]
 [   9  549 5226]]
(1315, 105, 105, 3)
(1315,)
[[   8 1315    0]]


In [None]:
batch, y = batch1_ds.get_paired_data(100, 'train')

[4 6 6 5 7 3 7 4 8 6 0 5 5 5 3 0 5 4 7 0 4 3 5 5 2 1 1 6 0 1 3 1 2 1 1 8 8
 5 4 8 1 0 3 6 3 6 0 2 5 6]
[0 8 8 0 3 7 0 2 1 2 3 0 8 8 5 2 6 8 0 7 8 1 7 2 6 4 4 0 6 6 0 8 0 7 4 4 3
 8 6 4 3 4 1 3 5 7 6 5 8 8]
[-4  2  2 -5 -4  4 -7 -2 -7 -4  3 -5  3  3  2  2  1  4 -7  7  4 -2  2 -3
  4  3  3 -6  6  5 -3  7 -2  6  3 -4 -5  3  2 -4  2  4 -2 -3  2  1  6  3
  3  2]
(100, 105, 105, 3)
(100, 105, 105, 3)
(100,)


### Model

In [None]:
def siamese(input_shape):
    init_b = keras.initializers.RandomNormal(mean = 0.5, stddev=0.01)
    init_w = keras.initializers.RandomNormal(mean = 0.0, stddev=0.01)
    model = Sequential()
    model.add(Conv2D(64, (10,10), activation='relu', input_shape=input_shape,kernel_initializer=init_w, bias_initializer=init_b, kernel_regularizer=l2(2e-4)))
    model.add(MaxPooling2D())
    model.add(Conv2D(128, (7,7), activation='relu', kernel_initializer=init_w, bias_initializer=init_b, kernel_regularizer=l2(2e-4)))
    model.add(MaxPooling2D())
    model.add(Conv2D(128, (4,4), activation='relu', kernel_initializer=init_w, bias_initializer=init_b, kernel_regularizer=l2(2e-4)))
    model.add(MaxPooling2D())
    model.add(Conv2D(256, (4,4), activation='relu', kernel_initializer=init_w, bias_initializer=init_b, kernel_regularizer=l2(2e-4)))
    model.add(Flatten())
    model.add(Dense(4096, activation='sigmoid',kernel_regularizer=l2(1e-3), kernel_initializer=init_w,bias_initializer=init_b))
    dense_layer = Dense(1,activation='sigmoid',bias_initializer=init_b)
    L1_layer = Lambda(lambda tensors:keras.backend.abs(tensors[0] - tensors[1]))
    input_1, input_2 = Input(input_shape), Input(input_shape)
    outputs = dense_layer(L1_layer([model(input_1), model(input_2)]))
    siamese_model = Model(inputs=[input_1,input_2],outputs=outputs)
    return siamese_model


In [None]:
model = siamese((new_im_size, new_im_size, 1))
model.summary()
optimizer = Adam(lr = 0.00006)
model.compile(loss="binary_crossentropy",optimizer=optimizer)

Model: "model_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            (None, 105, 105, 1)  0                                            
__________________________________________________________________________________________________
input_2 (InputLayer)            (None, 105, 105, 1)  0                                            
__________________________________________________________________________________________________
sequential_1 (Sequential)       (None, 4096)         38947648    input_1[0][0]                    
                                                                 input_2[0][0]                    
__________________________________________________________________________________________________
lambda_1 (Lambda)               (None, 4096)         0           sequential_1[1][0]         

###Training


In [None]:
def train(max_epoch, ds):
    Logger.debug("training started")
    for i in range(1, max_epoch+1):
        (inputs,targets) = get_batch(batch_size)
        loss = model.train_on_batch(inputs, targets)
        if i % evaluate_every == 0:
            print("\n ------------- \n")
            print("Time for {0} iterations: {1} mins".format(i, (time.time()-t_start)/60.0))
            print("Train Loss: {0}".format(loss)) 
            val_acc = test_oneshot(model, N_way, n_val, verbose=True)
            model.save_weights(os.path.join(model_path, 'weights.{}.h5'.format(i)))
            if val_acc >= best:
                print("Current best: {0}, previous best: {1}".format(val_acc, best))
                best = val_acc

    




def train(max_epoch, ds):
    Logger.debug("training started")
    for i in range(1, max_epoch+1):
        (inputs,targets) = get_batch(batch_size)
        loss = model.train_on_batch(inputs, targets)
        if i % evaluate_every == 0:
            print("\n ------------- \n")
            print("Time for {0} iterations: {1} mins".format(i, (time.time()-t_start)/60.0))
            print("Train Loss: {0}".format(loss)) 
            val_acc = test_oneshot(model, N_way, n_val, verbose=True)
            model.save_weights(os.path.join(model_path, 'weights.{}.h5'.format(i)))
            if val_acc >= best:
                print("Current best: {0}, previous best: {1}".format(val_acc, best))
                best = val_acc

    




In [None]:
def train(max_epoch, ds):
    Logger.debug("training started")
    for i in range(1, max_epoch+1):
        (inputs,targets) = get_batch(batch_size)
        loss = model.train_on_batch(inputs, targets)
        if i % evaluate_every == 0:
            print("\n ------------- \n")
            print("Time for {0} iterations: {1} mins".format(i, (time.time()-t_start)/60.0))
            print("Train Loss: {0}".format(loss)) 
            val_acc = test_oneshot(model, N_way, n_val, verbose=True)
            model.save_weights(os.path.join(model_path, 'weights.{}.h5'.format(i)))
            if val_acc >= best:
                print("Current best: {0}, previous best: {1}".format(val_acc, best))
                best = val_acc

    


