In [3]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the "../input/" directory.
# For example, running this (by clicking run or pressing Shift+Enter) will list the files in the input directory

import os
# Show what is available in the input directory and in its parent.
for listed_dir in ("../input", '../'):
    print(os.listdir(listed_dir))

# Any results you write to the current directory are saved as output.

['test', 'sample_submission.csv', 'train.csv', 'train']
['lib', 'working', 'config', 'input']


In [4]:
from sklearn.model_selection import train_test_split
from itertools import combinations_with_replacement
import cv2
import numpy.random as rng

data = pd.read_csv("../input/train.csv")
# Number of training rows per Id; its (sorted) index defines the category order.
categories = data.groupby('Id').size()
batch_size = 32   # number of image pairs per training batch
base_size = 105   # images are resized to base_size x base_size

# Map every Id to an integer category index (position in the sorted Id index).
Id_dict = {category_id: index for index, category_id in enumerate(categories.index)}

# categories_list[index] is the array of image file names of the Id with that
# index. A single groupby pass replaces one full-frame boolean filter per Id;
# groupby iterates the Ids in the same sorted order used for Id_dict and keeps
# the rows of each group in their original order.
categories_list = [images.values for _, images in data.groupby('Id')['Image']]

assert len(categories_list) == len(Id_dict), "category list and Id mapping disagree"

def read_image(path, image_name, base_size):
    """Load one image as a square grayscale array divided by 255.

    Args:
        path: directory that contains the image.
        image_name: file name of the image inside ``path``.
        base_size: side length (pixels) the image is resized to.

    Returns:
        A ``(base_size, base_size)`` float array; for 8-bit pixel data the
        values lie in [0, 1].

    Raises:
        OSError: if OpenCV cannot read the file (missing or undecodable).
    """
    image_path = os.path.join(path, image_name)
    # Flag 0 == cv2.IMREAD_GRAYSCALE: decode as a single-channel image.
    img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if img is None:
        # imread signals failure by returning None instead of raising; fail
        # here with the offending path rather than later inside cv2.resize.
        raise OSError("Could not read image: {}".format(image_path))
    img = cv2.resize(img, (base_size, base_size))

    return img / 255

def select_img(img_list):
    """Return one entry of ``img_list``, picked uniformly at random.

    A one-element input is returned directly without touching the random
    generator. For longer inputs a length-1 index array is drawn, so
    ``img_list`` must support NumPy array indexing.
    """
    if len(img_list) <= 1:
        return img_list[0]

    picked = rng.randint(len(img_list), size=1)
    return img_list[picked][0]

def make_batch(batch_size):
    """Sample the image-name pairs and labels for one training batch.

    ``batch_size`` distinct anchor categories are drawn without replacement.
    For the first ``batch_size // 2`` anchors both images come from the anchor
    category (label 0.0); for the remaining anchors the second image comes
    from a different, randomly chosen category (label 1.0).

    Note: the two images of a same-category pair are drawn independently, so
    they can be the same file (always, for categories with a single image).

    Args:
        batch_size: number of pairs; must not exceed the number of categories.

    Returns:
        ``(img1, img2, label)``: two lists of image file names and a float
        array of shape ``(batch_size,)``.

    Raises:
        ValueError: if a different-category pair is needed but fewer than two
            categories exist, or (from ``rng.choice``) if ``batch_size``
            exceeds the number of categories.
    """
    n_categories = len(Id_dict)
    n_same = batch_size // 2

    if batch_size > 0 and n_categories < 2:
        # Without this guard the rejection loop below could never terminate.
        raise ValueError("make_batch needs at least two categories to build different-category pairs")

    categories = rng.choice(n_categories, size=(batch_size,), replace=False)
    label = np.zeros(batch_size)
    label[n_same:] = 1.0

    img1 = []
    img2 = []

    for i, category_num in enumerate(categories):
        img_list = categories_list[category_num]
        x1 = select_img(img_list)

        if i >= n_same:
            # Draw plain ints (no size=1 arrays): converting a 1-element array
            # with int() is deprecated in recent NumPy releases.
            dif_num = rng.randint(n_categories)
            while dif_num == category_num:
                dif_num = rng.randint(n_categories)
            x2 = select_img(categories_list[dif_num])
        else:
            x2 = select_img(img_list)

        img1.append(x1)
        img2.append(x2)

    return img1, img2, label

def generator(batch_size):
    """Endlessly yield ``([left_input, right_input], label)`` training batches.

    Each batch is sampled with ``make_batch``; the named images are read from
    '../input/train' into arrays of shape
    ``(batch_size, base_size, base_size, 1)``.
    """
    train_dir = '../input/train'

    def load_stack(names):
        # One single-channel image per row, in the order given by `names`.
        stack = np.zeros((len(names), base_size, base_size, 1))
        for row, name in enumerate(names):
            stack[row, :, :, 0] = read_image(path=train_dir, image_name=name, base_size=base_size)
        return stack

    while True:
        names_left, names_right, label = make_batch(batch_size=batch_size)
        left_input = load_stack(names_left)
        right_input = load_stack(names_right)

        yield [left_input, right_input], label

# Shared training-batch generator, consumed later when fitting the model.
batch_gen = generator(batch_size=batch_size)

In [5]:
import keras.backend as K
from keras.layers import Conv2D, Dense, MaxPool2D, Flatten, Input, merge, subtract, Lambda
from keras.models import Sequential, Model
from keras.initializers import random_normal
from keras import optimizers
from keras import metrics
from keras import losses
from keras.callbacks import EarlyStopping, ReduceLROnPlateau

class Siamese_Net(object):
    """Builder for a two-input network with one shared convolutional encoder.

    Both inputs pass through the same ``Sequential`` encoder (shared weights);
    the element-wise absolute difference of the two 4096-d encodings feeds a
    single sigmoid unit.
    """

    def __init__(self, input_shape):
        """Store the per-image input shape and create the weight initializers."""
        self.input_shape = input_shape
        # Convolution kernels, all biases, and the dense encoder kernel each
        # get their own normal initializer.
        self.initializers_weight = random_normal(mean=0.0, stddev=0.01)
        self.initializers_bias = random_normal(mean=0.5, stddev=0.01)
        self.initializers_weight_fully = random_normal(mean=0.0, stddev=0.2)

    def _conv(self, filters, kernel_size):
        """Return a stride-1 ReLU convolution using the shared initializers."""
        return Conv2D(filters, kernel_size=kernel_size, strides=(1, 1), activation='relu',
                      kernel_initializer=self.initializers_weight,
                      bias_initializer=self.initializers_bias)

    def build(self):
        """Assemble and return the (uncompiled) Keras ``Model``."""
        left_input = Input(self.input_shape)
        right_input = Input(self.input_shape)

        # Shared encoder: three conv + max-pool stages, then a last conv
        # without pooling.
        convert = Sequential()
        for filters, kernel_size in ((64, 10), (128, 7), (128, 4)):
            convert.add(self._conv(filters, kernel_size))
            convert.add(MaxPool2D(pool_size=2))
        convert.add(self._conv(256, 4))
        # Flatten the last feature maps and project them to a 4096-d sigmoid vector.
        convert.add(Flatten())
        convert.add(Dense(4096, activation='sigmoid',
                          kernel_initializer=self.initializers_weight_fully,
                          bias_initializer=self.initializers_bias))

        # The same encoder instance is applied to both inputs.
        left_features = convert(left_input)
        right_features = convert(right_input)

        # Element-wise L1 distance between the two encodings.
        dist = Lambda(lambda pair: K.abs(pair[0] - pair[1]))([left_features, right_features])

        # Single sigmoid unit on top of the distance vector.
        out = Dense(1, activation='sigmoid')(dist)

        return Model(inputs=[left_input, right_input], outputs=out)

Using TensorFlow backend.


In [6]:
# Derive the input shape from base_size (instead of repeating 105) so the
# network always matches the arrays produced by the batch generators.
model = Siamese_Net(input_shape=(base_size, base_size, 1)).build()
model.summary()
model.compile(
    optimizer=optimizers.Adam(lr=0.01, decay=0.01),
    loss=losses.binary_crossentropy,
    metrics=[metrics.binary_accuracy],
)
# Early stopping on the training loss with a patience of one epoch; it only
# takes effect when this list is passed to the fit call.
callbacks = [EarlyStopping(patience=1, monitor='loss')]

__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            (None, 105, 105, 1)  0                                            
__________________________________________________________________________________________________
input_2 (InputLayer)            (None, 105, 105, 1)  0                                            
__________________________________________________________________________________________________
sequential_1 (Sequential)       (None, 4096)         38947648    input_1[0][0]                    
                                                                 input_2[0][0]                    
__________________________________________________________________________________________________
lambda_1 (Lambda)               (None, 4096)         0           sequential_1[1][0]               
          

In [7]:
def test_generator():
    """Build one evaluation task: a random training image vs. every category.

    Returns:
        ``(test_set, support_set, choice_num)`` where
        * ``test_set`` holds the same query image repeated once per category,
        * ``support_set[i]`` is a randomly selected image of category ``i``,
        * ``choice_num`` is a length-1 integer array with the query's
          category index.
        Both image arrays have shape ``(len(Id_dict), base_size, base_size, 1)``.

    NOTE(review): the support image of the query's own category is drawn from
    the same list as the query, so it can be the very same file.
    """
    n_categories = len(Id_dict)

    choice_num = rng.randint(n_categories, size=1)
    # Index with the scalar element; int() on a 1-element array is deprecated
    # in recent NumPy releases.
    test_img_name = select_img(categories_list[choice_num[0]])

    # Read the query image once and broadcast it over all rows instead of
    # re-reading and resizing the same file for every category.
    test_img = read_image(path='../input/train', image_name=test_img_name, base_size=base_size)
    test_set = np.zeros((n_categories, base_size, base_size, 1))
    test_set[:, :, :, 0] = test_img

    support_set = np.zeros((n_categories, base_size, base_size, 1))
    for i in range(n_categories):
        support_img_name = select_img(categories_list[i])
        support_set[i, :, :, 0] = read_image(path='../input/train', image_name=support_img_name, base_size=base_size)

    return test_set, support_set, choice_num

In [8]:
from tqdm import tqdm

def test_one_shot(K):
    """Run ``K`` random evaluation tasks and report the top-5 hit rate.

    For each task the global ``model`` scores the query against one support
    image per category. A task counts as correct when the query's category is
    among the five lowest scores (``make_batch`` labels same-category pairs
    0.0, so a low score means "same").

    Args:
        K: number of evaluation tasks (note: inside this function the name
           shadows the Keras backend alias ``K``).

    Returns:
        The percentage of correct tasks; it is also printed.
    """
    hits = 0

    for _ in tqdm(range(K)):
        test_set, support_set, label = test_generator()
        scores = np.squeeze(model.predict([test_set, support_set]))
        # Indices of the five smallest scores, in no particular order.
        lowest_five = np.argpartition(scores, 5)[:5]
        if np.any(lowest_five == label):
            hits += 1

    precision = (hits / K) * 100

    print("Got an average of {}% in one-shot learning accuracy".format(precision))

    return precision

In [None]:
# Train on generated pair batches. The early-stopping callbacks configured at
# compile time were never handed to the fit call; pass them so they take effect.
model.fit_generator(batch_gen, verbose=1, epochs=10, steps_per_epoch=200, callbacks=callbacks)
precision = test_one_shot(K=100)

In [None]:
def predict_generator(predict_img_name):
    """Pair one test image with a random support image of every category.

    Args:
        predict_img_name: file name of an image in '../input/test'.

    Returns:
        ``(predict_set, support_set)``, both of shape
        ``(len(Id_dict), base_size, base_size, 1)``: ``predict_set`` repeats
        the test image once per category and ``support_set[i]`` is a randomly
        selected training image of category ``i``.
    """
    n_categories = len(Id_dict)

    # Read the test image once and broadcast it over all rows instead of
    # re-reading and resizing the same file for every category.
    predict_img = read_image(path='../input/test', image_name=predict_img_name, base_size=base_size)
    predict_set = np.zeros((n_categories, base_size, base_size, 1))
    predict_set[:, :, :, 0] = predict_img

    support_set = np.zeros((n_categories, base_size, base_size, 1))
    for i in range(n_categories):
        support_img_name = select_img(categories_list[i])
        support_set[i, :, :, 0] = read_image(path='../input/train', image_name=support_img_name, base_size=base_size)

    return predict_set, support_set

In [None]:
def preds2catids(predictions):
    """Return the indices of the five lowest scores per row as a DataFrame.

    ``predictions`` is transposed first, so an ``(n, 1)`` score column becomes
    a single row; the result has the columns 'a'..'e', ordered from the
    lowest score upwards.
    """
    as_rows = np.asarray(predictions).T
    ranked = np.argsort(as_rows, axis=1)
    lowest_five = ranked[:, :5]
    return pd.DataFrame(lowest_five, columns=list('abcde'))

def one_shot_predict(test_set):
    """Predict the five best-matching category indices for every test image.

    Args:
        test_set: iterable of image file names in '../input/test'.

    Returns:
        A DataFrame with one row per image (same order as ``test_set``) and
        the columns 'a'..'e' produced by ``preds2catids``; an empty DataFrame
        when ``test_set`` is empty.
    """
    per_image_top5 = []
    for img in tqdm(test_set):
        predict_set, support_set = predict_generator(img)
        prob = model.predict([predict_set, support_set])
        per_image_top5.append(preds2catids(prob))

    if not per_image_top5:
        # pd.concat rejects an empty list; keep the "empty frame" result.
        return pd.DataFrame()

    # Concatenate once: DataFrame.append inside the loop copied the whole
    # frame on every iteration and no longer exists in pandas 2.x.
    return pd.concat(per_image_top5, ignore_index=True)

In [None]:
# Inverse of Id_dict: category index -> Id.
to_class = {index: category_id for category_id, index in Id_dict.items()}

# Score every test image, then translate the predicted indices back to Ids.
test_set = os.listdir('../input/test')
all_top5 = one_shot_predict(test_set).replace(to_class)
all_top5.head()

In [None]:
# Join the five predicted Ids of each test image into one space-separated string.
predictions = (
    all_top5['a'] + ' ' + all_top5['b'] + ' ' + all_top5['c'] + ' '
    + all_top5['d'] + ' ' + all_top5['e']
)

submission = pd.DataFrame({'Image': test_set,
                           'Id': predictions})
submission.to_csv('submission.csv', index=False)

# Keep the preview as the last expression of the cell; placed before
# to_csv it was evaluated but never displayed.
submission.head()