In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

import os
# Walk the Kaggle input directory and print the full path of every file found,
# so the available dataset files are visible in the cell output.
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

In [None]:
import warnings
warnings.filterwarnings("ignore")

import os
import numpy as np
from sklearn.utils import shuffle
import cv2
import numpy.random as random
from keras.models import Model, Input, Sequential
from keras.layers.core import Dense, Flatten, Lambda
from keras.layers import Conv2D, MaxPooling2D
from keras.regularizers import l2
import keras.backend as K
from keras.optimizers import Adam

## Constants

In [None]:
# Root folders of the two image sets; each is passed to DataGeneration.load_dataset,
# which expects an <alphabet>/<letter>/<image> directory layout underneath.
TRAIN_DIR = "../input/omniglot-dataset/images_background/images_background/"
TEST_DIR = "../input/omniglot-dataset/images_evaluation/images_evaluation/"
# Number of image pairs sampled per call to DataGeneration.get_batch.
BATCH_SIZE = 32
# Image dimensions used to build the network input shape (WIDTH, HEIGHT, CHANNEL).
HEIGHT = 105
WIDTH = 105
# Single channel: images are read with cv2.imread(path, 0).
CHANNEL = 1

## Data generation

In [None]:
class DataGeneration:
    """Loads alphabet/letter image folders and samples image pairs for a siamese network.

    The directory layout expected by ``load_dataset`` is
    ``<data_path>/<alphabet>/<letter>/<image file>``; every letter becomes one class.
    """

    def __init__(self):
        self.train_dir = TRAIN_DIR
        self.test_dir = TEST_DIR
        self.batch_size = BATCH_SIZE

    def load_image(self, img_path):
        """Read one image file as a single-channel array.

        Args:
            img_path: path of the image file.

        Returns:
            Whatever ``cv2.imread(img_path, 0)`` returns (flag 0 requests a
            grayscale read; OpenCV returns ``None`` for an unreadable file).
        """
        return cv2.imread(img_path, 0)

    def load_dataset(self, data_path, current_y=0):
        """Load every letter of every alphabet under ``data_path``.

        Directory entries are visited in sorted order so that class indices are
        reproducible between runs (``os.listdir`` alone returns arbitrary order).

        Args:
            data_path: root folder containing one sub-folder per alphabet.
            current_y: class index assigned to the first letter loaded.

        Returns:
            A tuple ``(X, y, lang_dict)`` where
            ``X`` stacks the per-letter image stacks (one row per class),
            ``y`` is a column vector with one class index per loaded image, and
            ``lang_dict`` maps each alphabet name to ``[first_class, last_class]``
            (both inclusive; ``last_class`` stays ``None`` if no letter loaded).

        Raises:
            ValueError: from the final ``np.stack`` / ``np.vstack`` when nothing
                was loaded or the classes do not all have the same shape.
        """
        X = []
        y = []
        lang_dict = {}

        for alphabet in sorted(os.listdir(data_path)):
            print("Loading {}".format(alphabet))
            alphabet_path = os.path.join(data_path, alphabet)
            lang_dict[alphabet] = [current_y, None]

            for letter in sorted(os.listdir(alphabet_path)):
                letter_path = os.path.join(alphabet_path, letter)
                category_images = [
                    self.load_image(os.path.join(letter_path, img))
                    for img in sorted(os.listdir(letter_path))
                ]

                try:
                    stacked = np.stack(category_images)
                except ValueError as e:
                    # Best effort: report the broken letter and skip it entirely so
                    # that X, y and lang_dict keep referring to the same class indices.
                    print(e)
                    print("error - could not stack images in:", letter_path)
                    continue

                X.append(stacked)
                y.extend([current_y] * len(category_images))
                lang_dict[alphabet][1] = current_y
                current_y += 1

        X = np.stack(X)
        y = np.vstack(y)

        return X, y, lang_dict

    def get_batch(self, X, y):
        """Sample one batch of image pairs with same/different labels.

        The first half of the batch holds pairs from two different classes
        (target 0); from index ``batch_size // 2`` on, both images of a pair
        come from the same class (target 1).

        Args:
            X: array of shape ``(n_classes, n_samples, height, width)``.
            y: unused; kept so the call signature matches the loaded dataset.

        Returns:
            ``(pairs, targets)`` where ``pairs`` is a list of two arrays of shape
            ``(batch_size, height, width, 1)`` and ``targets`` has shape
            ``(batch_size,)``.

        Raises:
            ValueError: if ``n_classes`` is smaller than ``batch_size`` (classes are
                drawn without replacement) or there is only a single class.
        """
        n_classes, n_samples, height, width = X.shape
        half = self.batch_size // 2

        categories = random.choice(n_classes, size=(self.batch_size,), replace=False)
        targets = np.zeros((self.batch_size,))
        targets[half:] = 1
        pairs = [np.zeros((self.batch_size, height, width, 1)) for _ in range(2)]

        for i in range(self.batch_size):
            category = categories[i]
            idx_1 = random.randint(0, n_samples)
            idx_2 = random.randint(0, n_samples)

            # Must use the same boundary as ``targets[half:] = 1`` above, otherwise
            # the pair at index ``half`` is labelled "same" but drawn from two classes.
            if i >= half:
                category_2 = category
            else:
                # A non-zero offset modulo n_classes always lands on another class.
                category_2 = (category + random.randint(1, n_classes)) % n_classes

            pairs[0][i, :, :, :] = X[category, idx_1].reshape(height, width, 1)
            pairs[1][i, :, :, :] = X[category_2, idx_2].reshape(height, width, 1)

        return pairs, targets

    def generate(self, X, y):
        """Yield ``(pairs, targets)`` batches from ``get_batch`` forever."""
        while True:
            pairs, targets = self.get_batch(X, y)
            yield (pairs, targets)

In [None]:
# Single DataGeneration instance reused for dataset loading and batch sampling.
data_gener = DataGeneration()

In [None]:
# Load the training images; train_categories maps each alphabet to its class-index range.
X_train, y_train, train_categories = data_gener.load_dataset(TRAIN_DIR)

In [None]:
# Load the evaluation images the same way; class indices start again at 0 (default current_y).
X_test, y_test, test_categories = data_gener.load_dataset(TEST_DIR)

In [None]:
# Sanity check: get_batch unpacks this shape as (n_classes, n_samples, <two image dims>).
X_train.shape

## Model Architecture

In [None]:
def initialize_weights(shape, dtype=None):
    """Return kernel start values drawn from a normal distribution.

    Samples have mean 0.0 and standard deviation 1e-2. ``dtype`` is accepted
    so the function can be called like an initializer but is not used.
    """
    mean, std = 0.0, 1e-2
    weights = np.random.normal(mean, std, shape)
    return weights

In [None]:
def initialize_bias(shape, dtype=None):
    """Return bias start values drawn from a normal distribution.

    Samples have mean 0.5 and standard deviation 1e-2. ``dtype`` is accepted
    so the function can be called like an initializer but is not used.
    """
    mean, std = 0.5, 1e-2
    biases = np.random.normal(mean, std, shape)
    return biases

In [None]:
def get_siamese_net(input_shape):
    """Assemble the twin-input network that scores whether two images match.

    Both inputs are passed through one shared convolutional encoder; the
    element-wise absolute difference of the two encodings feeds a single
    sigmoid unit.

    Args:
        input_shape: shape of one input image, given to both ``Input`` layers
            and to the first ``Conv2D`` layer.

    Returns:
        A ``Model`` taking ``[left_input, right_input]`` and returning the
        sigmoid prediction.
    """
    def conv_relu(filters, kernel_size, **extra):
        # Every convolution uses ReLU, the custom kernel initializer and an
        # L2 kernel penalty of 1e-4; per-layer differences arrive via **extra.
        return Conv2D(filters, kernel_size, activation="relu",
                      kernel_initializer=initialize_weights,
                      kernel_regularizer=l2(1e-4), **extra)

    # The two images of a pair
    left_input = Input(input_shape)
    right_input = Input(input_shape)

    # Shared feature extractor (the first convolution keeps the default bias initializer)
    encoder = Sequential()
    encoder.add(conv_relu(64, (10, 10), input_shape=input_shape))
    encoder.add(MaxPooling2D())
    encoder.add(conv_relu(128, (7, 7), bias_initializer=initialize_bias))
    encoder.add(MaxPooling2D())
    encoder.add(conv_relu(128, (4, 4), bias_initializer=initialize_bias))
    encoder.add(MaxPooling2D())
    encoder.add(conv_relu(256, (4, 4), bias_initializer=initialize_bias))
    encoder.add(Flatten())
    encoder.add(Dense(4096, activation="sigmoid",
                      kernel_initializer=initialize_weights,
                      bias_initializer=initialize_bias,
                      kernel_regularizer=l2(1e-3)))

    # Same encoder (same weights) applied to each side
    left_encoding = encoder(left_input)
    right_encoding = encoder(right_input)

    # Element-wise absolute difference between the two encodings
    abs_difference = Lambda(lambda pair: K.abs(pair[0] - pair[1]))
    distance = abs_difference([left_encoding, right_encoding])

    # Single sigmoid unit turns the distance vector into a match score
    prediction = Dense(1, activation="sigmoid", bias_initializer=initialize_bias)(distance)

    return Model(inputs=[left_input, right_input], outputs=prediction)

In [None]:
# Build the siamese network for (WIDTH, HEIGHT, CHANNEL) inputs and print its layer summary.
model = get_siamese_net((WIDTH, HEIGHT, CHANNEL))
model.summary()

In [None]:
# Adam with a small fixed learning rate of 6e-5.
# NOTE(review): newer Keras releases name this argument `learning_rate` - confirm against the installed version.
optimizer = Adam(lr = 0.00006)

In [None]:
# Binary cross-entropy matches the single sigmoid output and the 0/1 pair targets.
model.compile(optimizer=optimizer, loss="binary_crossentropy")

## Model evaluation

In [None]:
def make_n_test(N, s="val", language=None):
    """Build one N-way one-shot task: a test image paired with N candidates.

    Exactly one candidate in the support set belongs to the same class as the
    test image (a different sample of that class); the other N-1 candidates
    come from other classes.

    Args:
        N: number of candidate classes in the task.
        s: ``"train"`` draws from ``X_train`` / ``train_categories``; any other
            value draws from ``X_test`` / ``test_categories``.
        language: optional alphabet name; when given, all N classes are drawn
            from that alphabet's class-index range.

    Returns:
        ``(pairs, targets)`` where ``pairs`` is ``[main_set, support_set]``, both
        of shape ``(N, height, width, 1)``, and ``targets`` has shape ``(N,)``
        with a single 1 marking the matching pair. Rows are shuffled together.

    Raises:
        ValueError: if ``language`` has fewer than N letters.
    """
    if s == "train":
        X = X_train
        categories = train_categories
    else:
        X = X_test
        categories = test_categories

    n_classes, n_samples, height, width = X.shape

    if language is not None:
        # The stored range is [first_class, last_class] with BOTH ends inclusive,
        # so the alphabet has high - low + 1 letters and `high` itself is valid.
        low, high = categories[language]
        if high - low + 1 < N:
            raise ValueError("This language {} has less than {} letters".format(language, N))
        categories = random.choice(range(low, high + 1), size=N, replace=False)
    else:
        categories = random.choice(range(n_classes), size=N, replace=False)

    true_category = categories[0]
    # Two distinct samples: one is the test image, the other its matching candidate.
    ex_1, ex_2 = random.choice(range(n_samples), size=2, replace=False)
    main_set = np.asarray([X[true_category, ex_1, :, :]] * N).reshape((N, height, width, 1))
    # Fancy indexing copies, so overwriting row 0 below does not modify X.
    support_set = np.asarray(X[categories, ex_1, :, :])
    support_set[0] = X[true_category, ex_2, :, :]
    support_set = support_set.reshape(N, height, width, 1)
    targets = np.zeros((N,))
    targets[0] = 1
    targets, main_set, support_set = shuffle(targets, main_set, support_set)
    pairs = [main_set, support_set]

    return pairs, targets

In [None]:
def test_one_shot(model, N, k, s="val", verbose=0):
    n_correct = 0
    
    if verbose == 0:
        print("Evaluating model on {} random {} way one-shot learning tasks ... \n".format(k,N))
    
    for i in range(k):
        inputs, targets = make_n_test(N, s)
        probs = model.predict(inputs)
        
        if probs.argmax() == targets.argmax():
              n_correct += 1
    percent_correct = (100.0 * n_correct / k)

    if verbose:
        print("Got an average of {}% {} way one-shot learning accuracy \n".format(percent_correct,N))
    return percent_correct

In [None]:
evaluate_every = 200 # interval for evaluating on one-shot tasks
n_iter = 60000 # No. of training iterations
N_way = 20 # how many classes for testing one-shot tasks
n_val = 250 # how many one-shot tasks to validate on
best = -1 # best validation accuracy seen so far; -1 so the first evaluation always counts as best

In [None]:
# Start from an empty checkpoint folder: delete any previous ./weights/ directory
# (IPython shell escape), then recreate it.
!rm -rf "./weights/"
model_path = './weights/'

if not os.path.exists(model_path):
    os.mkdir(model_path)

In [None]:
import time
print("Starting training process!")
print("-------------------------------------")

# Manual training loop: one randomly sampled batch of pairs per iteration,
# with a one-shot validation pass every `evaluate_every` iterations.
t_start = time.time()
for i in range(1, n_iter+1):
    # Sample a fresh batch of image pairs and take a single gradient step on it.
    inputs, targets = data_gener.get_batch(X_train, y_train)
    loss = model.train_on_batch(inputs, targets)
    
    if i % evaluate_every == 0:
        print("\n ------------- \n")
        # Elapsed wall-clock time since training started, in minutes.
        print("Time for {0} iterations: {1} mins".format(i, (time.time() - t_start)/60.0))
        print("Train Loss: {0}".format(loss)) 
        val_acc = test_one_shot(model, N_way, n_val, verbose=True)
        # Checkpoint whenever validation accuracy exceeds 86%, independent of `best`.
        if val_acc > 86:
            model.save_weights(os.path.join(model_path, 'weights.{}.h5'.format(i)))
        
        # Track (and report) the best validation accuracy; ties also update it.
        if val_acc >= best:
            print("Current best: {0}, previous best: {1}".format(val_acc, best))
            best = val_acc