# One Shot Learning with Siamese Networks using Keras-Tensorflow

## What are Siamese Networks

### Siamese networks are neural networks containing two or more identical subnetwork components.

In [2]:
import os 
import numpy as np 
import matplotlib.pyplot as plt  

import tensorflow as tf 
import tensorflow.keras as keras

import cv2 
import time 

from sklearn.utils import shuffle

In [35]:
# Root of the Omniglot "python" folder. The OMNIGLOT_DIR environment variable
# overrides the author's machine-specific default so the notebook can run
# elsewhere without editing this cell.
DATA_DIR = os.environ.get(
    'OMNIGLOT_DIR',
    '/media/toanmh/Workspace/Github/Datasets/omniglot-master/python/',
)
# The trailing '' keeps the trailing separator the original string paths had.
train_folder = os.path.join(DATA_DIR, 'images_background', '')
val_folder = os.path.join(DATA_DIR, 'images_evaluation', '')
save_path = os.path.join(DATA_DIR, 'snapshots', '')
# Number of image pairs per training batch.
batch_size = 16

In [4]:
# Load dataset
def load_imgs(path, n=0):
    """Load a ``path/<alphabet>/<letter>/<image>`` tree into stacked arrays.

    Every letter directory becomes one category. Categories receive
    consecutive integer labels starting at ``n``; a label is only consumed by
    a category that was actually added to ``X``, so ``X``, ``y`` and
    ``lang_dict`` always stay aligned.

    Args:
        path: Root directory containing one sub-directory per alphabet.
        n: Label given to the first category.

    Returns:
        Tuple ``(X, y, lang_dict)``:
        X: ``np.stack`` of the per-category image stacks, i.e. an array of
            shape ``(n_categories, n_images, height, width)``. All categories
            must hold the same number of equally sized images.
        y: Column vector (``np.vstack``) holding one label per loaded image.
        lang_dict: Maps alphabet name to ``[first_label, last_label]``, both
            inclusive. ``last_label`` stays ``None`` when no category of that
            alphabet could be loaded.

    Raises:
        ValueError: If nothing was loaded or the categories cannot be stacked.
    """
    X = []
    y = []
    lang_dict = {}
    curr_y = n

    # os.listdir returns entries in arbitrary order; sorting makes the label
    # assignment reproducible across machines and runs.
    for alphabet in sorted(os.listdir(path)):
        alphabet_path = os.path.join(path, alphabet)
        if not os.path.isdir(alphabet_path):
            # Stray files next to the alphabet folders are not data.
            continue
        print('loading alphabet: ' + alphabet)
        lang_dict[alphabet] = [curr_y, None]

        # Every letter/category gets its own row in X, so load them separately.
        for letter in sorted(os.listdir(alphabet_path)):
            letter_path = os.path.join(alphabet_path, letter)
            if not os.path.isdir(letter_path):
                continue

            category_images = []
            for filename in sorted(os.listdir(letter_path)):
                image_path = os.path.join(letter_path, filename)
                image = cv2.imread(image_path)
                if image is None:
                    # cv2.imread signals an unreadable file by returning None.
                    print('skipping unreadable image: ' + image_path)
                    continue
                category_images.append(cv2.cvtColor(image, cv2.COLOR_BGR2GRAY))

            try:
                stacked = np.stack(category_images)
            except ValueError as e:
                # Best effort: drop the broken category without consuming a
                # label, otherwise y and lang_dict would drift away from X.
                print('{} error - category_image {}'.format(e, category_images))
                continue

            X.append(stacked)
            y.extend([curr_y] * len(category_images))
            lang_dict[alphabet][1] = curr_y
            curr_y += 1

    y = np.vstack(y)
    X = np.stack(X)

    return X, y, lang_dict

In [5]:
# Load the training split: X holds the stacked images, y one label per image,
# and c the alphabet -> label-range dictionary returned by load_imgs.
X, y, c = load_imgs(train_folder)

loading alphabet: Alphabet_of_the_Magi
loading alphabet: Anglo-Saxon_Futhorc
loading alphabet: Arcadian
loading alphabet: Armenian
loading alphabet: Asomtavruli_(Georgian)
loading alphabet: Balinese
loading alphabet: Bengali
loading alphabet: Blackfoot_(Canadian_Aboriginal_Syllabics)
loading alphabet: Braille
loading alphabet: Burmese_(Myanmar)
loading alphabet: Cyrillic
loading alphabet: Early_Aramaic
loading alphabet: Futurama
loading alphabet: Grantha
loading alphabet: Greek
loading alphabet: Gujarati
loading alphabet: Hebrew
loading alphabet: Inuktitut_(Canadian_Aboriginal_Syllabics)
loading alphabet: Japanese_(hiragana)
loading alphabet: Japanese_(katakana)
loading alphabet: Korean
loading alphabet: Latin
loading alphabet: Malay_(Jawi_-_Arabic)
loading alphabet: Mkhedruli_(Georgian)
loading alphabet: N_Ko
loading alphabet: Ojibwe_(Canadian_Aboriginal_Syllabics)
loading alphabet: Sanskrit
loading alphabet: Syriac_(Estrangelo)
loading alphabet: Tagalog
loading alphabet: Tifinagh


In [6]:
import pickle

# Save the training tensors to disk. Only the images and the alphabet
# dictionary are stored; the per-image labels y are not written.
# Create the snapshot directory first, otherwise open() raises
# FileNotFoundError on a fresh dataset checkout.
os.makedirs(save_path, exist_ok=True)
with open(os.path.join(save_path, 'train.pickle'), 'wb') as f:
    pickle.dump((X, c), f)

In [7]:
# Load the validation images into tensors.
Xval, yval, cval = load_imgs(val_folder)

# Create the snapshot directory first, otherwise open() raises
# FileNotFoundError when this cell runs before the directory exists.
os.makedirs(save_path, exist_ok=True)
with open(os.path.join(save_path, 'val.pickle'), 'wb') as f:
    pickle.dump((Xval, cval), f)

loading alphabet: Angelic
loading alphabet: Atemayar_Qelisayer
loading alphabet: Atlantean
loading alphabet: Aurek-Besh
loading alphabet: Avesta
loading alphabet: Ge_ez
loading alphabet: Glagolitic
loading alphabet: Gurmukhi
loading alphabet: Kannada
loading alphabet: Keble
loading alphabet: Malayalam
loading alphabet: Manipuri
loading alphabet: Mongolian
loading alphabet: Old_Church_Slavonic_(Cyrillic)
loading alphabet: Oriya
loading alphabet: Sylheti
loading alphabet: Syriac_(Serto)
loading alphabet: Tengwar
loading alphabet: Tibetan
loading alphabet: ULOG


In [19]:
def initialize_weights(shape, dtype=None):
    """Kernel initializer: draw weights from N(mean=0.0, std=0.01).

    Args:
        shape: Shape of the weight tensor to create.
        dtype: Optional dtype forwarded to the backend variable.

    Returns:
        A Keras backend variable holding the sampled values.
    """
    samples = np.random.normal(0.0, 1e-2, shape)
    return tf.keras.backend.variable(samples, dtype=dtype)

def initialize_bias(shape, dtype=None):
    """Bias initializer: draw biases from N(mean=0.0, std=0.01).

    Args:
        shape: Shape of the bias tensor to create.
        dtype: Optional dtype forwarded to the backend variable.

    Returns:
        A Keras backend variable holding the sampled values.
    """
    # NOTE(review): this uses the same distribution as initialize_weights;
    # confirm a zero mean is intended for the biases.
    samples = np.random.normal(0.0, 1e-2, shape)
    return tf.keras.backend.variable(samples, dtype=dtype)

In [13]:
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import Input, Conv2D, BatchNormalization, Activation, MaxPooling2D, Flatten, Dense, Dropout, Lambda, Layer, Concatenate
from tensorflow.keras.regularizers import l1, l2

# create siamese model
def get_siamese_model(input_shape):
    """Build the two-input siamese network.

    Both inputs go through one shared convolutional encoder. The element-wise
    absolute difference of the two encodings feeds a single sigmoid unit that
    outputs the similarity score.

    Args:
        input_shape: Shape of one input image, e.g. ``(105, 105, 1)``.

    Returns:
        A Keras ``Model`` taking ``[left_image, right_image]`` and returning
        one sigmoid score per pair.
    """
    def conv_block(filters, kernel_size, **extra):
        # All convolutions share activation, kernel initializer and L2 penalty.
        return Conv2D(filters, kernel_size, activation='relu',
                      kernel_initializer=initialize_weights,
                      kernel_regularizer=l2(2e-4), **extra)

    # Placeholders for the two images of a pair
    left_input = Input(input_shape)
    right_input = Input(input_shape)

    # Shared encoder. The first convolution declares the input shape and keeps
    # the default bias initializer; the later ones use initialize_bias.
    encoder = Sequential([
        conv_block(64, (10, 10), input_shape=input_shape),
        MaxPooling2D(),
        conv_block(128, (7, 7), bias_initializer=initialize_bias),
        MaxPooling2D(),
        conv_block(128, (4, 4), bias_initializer=initialize_bias),
        MaxPooling2D(),
        conv_block(256, (4, 4), bias_initializer=initialize_bias),
        Flatten(),
        Dense(4096, activation='sigmoid',
              kernel_regularizer=l2(1e-3),
              kernel_initializer=initialize_weights,
              bias_initializer=initialize_bias),
    ])

    # The same encoder instance is applied to both inputs, so weights are shared
    encoded_l = encoder(left_input)
    encoded_r = encoder(right_input)

    # Element-wise absolute difference between the two feature vectors
    abs_difference = Lambda(
        lambda pair: tf.keras.backend.abs(pair[0] - pair[1])
    )([encoded_l, encoded_r])

    # Single sigmoid unit turns the distance vector into a similarity score
    similarity = Dense(1, activation='sigmoid',
                       bias_initializer=initialize_bias)(abs_difference)

    return Model(inputs=[left_input, right_input], outputs=similarity)

In [20]:
# Build the siamese network for 105x105 single-channel images and print its
# layer-by-layer summary.
model = get_siamese_model((105, 105, 1))
model.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_11 (InputLayer)           [(None, 105, 105, 1) 0                                            
__________________________________________________________________________________________________
input_12 (InputLayer)           [(None, 105, 105, 1) 0                                            
__________________________________________________________________________________________________
sequential_5 (Sequential)       (None, 4096)         38947648    input_11[0][0]                   
                                                                 input_12[0][0]                   
__________________________________________________________________________________________________
lambda (Lambda)                 (None, 4096)         0           sequential_5[1][0]           

In [30]:
from tensorflow.keras.utils import plot_model
import pydot

# Write the architecture diagram to model.png. plot_model needs both pydot and
# graphviz installed; without them it only reports a failure message.
plot_model(model, to_file='model.png', show_shapes=True, show_layer_names=True)

Failed to import pydot. You must install pydot and graphviz for `pydotprint` to work.


In [31]:
# Adam with learning rate 6e-4; binary cross-entropy is the loss for the
# 0/1 "same class?" targets.
optimizer = tf.keras.optimizers.Adam(learning_rate=6e-4)
model.compile(loss='binary_crossentropy', optimizer=optimizer)

In [32]:
# Load the training tensors saved earlier by this notebook.
# NOTE: the pickle holds (images, alphabet dictionary), so despite its name
# y_train is the alphabet -> label-range dict, not a label array.
# pickle.load can execute arbitrary code; only load files you created yourself.
with open(os.path.join(save_path, 'train.pickle'), 'rb') as f:
    (X_train, y_train) = pickle.load(f)

print('Training alphabets: {}'.format(list(y_train.keys())))

Training alphabets: ['Alphabet_of_the_Magi', 'Anglo-Saxon_Futhorc', 'Arcadian', 'Armenian', 'Asomtavruli_(Georgian)', 'Balinese', 'Bengali', 'Blackfoot_(Canadian_Aboriginal_Syllabics)', 'Braille', 'Burmese_(Myanmar)', 'Cyrillic', 'Early_Aramaic', 'Futurama', 'Grantha', 'Greek', 'Gujarati', 'Hebrew', 'Inuktitut_(Canadian_Aboriginal_Syllabics)', 'Japanese_(hiragana)', 'Japanese_(katakana)', 'Korean', 'Latin', 'Malay_(Jawi_-_Arabic)', 'Mkhedruli_(Georgian)', 'N_Ko', 'Ojibwe_(Canadian_Aboriginal_Syllabics)', 'Sanskrit', 'Syriac_(Estrangelo)', 'Tagalog', 'Tifinagh']


In [33]:
# Load the validation tensors from val.pickle into X_val and y_val.
# pickle.load can execute arbitrary code; only load files you created yourself.
with open(os.path.join(save_path, 'val.pickle'), 'rb') as f:
    (X_val, y_val) = pickle.load(f)

In [49]:
def get_batch(batch_size=batch_size, s='train'):
    """Sample a batch of image pairs for training the siamese network.

    The first half of the batch contains pairs from two different classes
    (target 0); the second half contains pairs from the same class (target 1).

    Args:
        batch_size: Number of pairs to generate.
        s: ``'train'`` samples from ``X_train``; anything else from ``X_val``.

    Returns:
        Tuple ``(pairs, targets)``. ``pairs`` is a list of two arrays of shape
        ``(batch_size, height, width, 1)``; ``targets`` has shape
        ``(batch_size,)``.
    """
    X = X_train if s == 'train' else X_val
    # load_imgs yields (n_classes, n_examples, height, width). Slicing the
    # shape also accepts a trailing singleton channel axis.
    n_classes, n_examples, h, w = X.shape[:4]

    # Prefer distinct classes per batch; fall back to sampling with
    # replacement when the batch is larger than the number of classes.
    categories = np.random.choice(n_classes, size=(batch_size,),
                                  replace=batch_size > n_classes)

    pairs = [np.zeros((batch_size, h, w, 1)) for _ in range(2)]

    targets = np.zeros((batch_size,))
    # Assignment, not comparison: the second half holds the same-class pairs.
    targets[batch_size // 2:] = 1

    for i in range(batch_size):
        category = categories[i]
        idx_1 = np.random.randint(0, n_examples)
        pairs[0][i, :, :, :] = X[category, idx_1].reshape(h, w, 1)
        idx_2 = np.random.randint(0, n_examples)

        if i >= batch_size // 2:
            category_2 = category
        else:
            # A non-zero offset modulo n_classes always lands on another class.
            category_2 = (category + np.random.randint(1, n_classes)) % n_classes

        pairs[1][i, :, :, :] = X[category_2, idx_2].reshape(h, w, 1)

    return pairs, targets

In [50]:
def generate(batch_size=batch_size, s='train'):
    """Endless generator of ``(pairs, targets)`` batches built by get_batch.

    Args:
        batch_size: Number of pairs per yielded batch.
        s: Data split forwarded to get_batch.
    """
    while True:
        image_pairs, labels = get_batch(batch_size, s)
        yield image_pairs, labels

In [45]:
def make_oneshot_task(N, s='val', language=None):
    """Create one N-way one-shot task: a test image against N candidates.

    Exactly one image in the support set shares the test image's class.

    Args:
        N: Number of candidate classes in the support set.
        s: ``'train'`` uses ``X_train``/``y_train``; anything else uses
            ``X_val``/``y_val``.
        language: Optional alphabet name; when given, all N classes are drawn
            from that alphabet only.

    Returns:
        Tuple ``(pairs, targets)``. ``pairs`` is ``[test_image, support_set]``,
        both of shape ``(N, height, width, 1)``, with the test image repeated
        N times. ``targets`` has shape ``(N,)`` with a single 1 marking the
        matching support image.

    Raises:
        ValueError: If ``language`` has fewer than N letters.
    """
    if s == 'train':
        X, lang_ranges = X_train, y_train
    else:
        X, lang_ranges = X_val, y_val

    n_classes, n_examples, h, w = X.shape[:4]
    indices = np.random.randint(0, n_examples, size=(N,))
    if language is not None:
        # load_imgs stores [first_label, last_label] with BOTH ends inclusive,
        # so the alphabet has high - low + 1 letters.
        low, high = lang_ranges[language]
        if N > high - low + 1:
            raise ValueError('This language ({}) has less than ({}) letters'.format(language, N))
        categories = np.random.choice(range(low, high + 1), size=(N,), replace=False)
    else:
        categories = np.random.choice(range(n_classes), size=(N,), replace=False)

    true_category = categories[0]
    # Two different drawings of the true class: one is the test image, the
    # other is the matching entry of the support set.
    ex1, ex2 = np.random.choice(n_examples, size=(2,), replace=False)
    test_image = np.asarray([X[true_category, ex1, :, :]] * N).reshape(N, h, w, 1)
    # Fancy indexing returns a copy, so the write below does not modify X.
    support_set = X[categories, indices, :, :]
    support_set[0, :, :] = X[true_category, ex2]
    support_set = support_set.reshape(N, h, w, 1)
    targets = np.zeros((N,))
    targets[0] = 1
    # Shuffle all three consistently so the match is not always at index 0.
    targets, test_image, support_set = shuffle(targets, test_image, support_set)
    pairs = [test_image, support_set]

    return pairs, targets


In [46]:
def test_oneshot(model, N, k, s='val', verbose=2):
    n_correct = 0
    if verbose:
        print('Evaluating model on {} rnaodm {} way one-shot learning tasks ...\n'.format(k, N))
    for i in range(k):
        inputs, targets = make_oneshot_task(N, s)
        probs = model.predict(inputs)
        if np.argmax(probs) == np.argmax(targets):
            n_correct += 1

    precent_correct = (n_correct * 100.) / k 
    if verbose:
        print('Got an average of {}% {} way one-shot learning accuracy \n'.format(precent_correct, N))

    return precent_correct

In [47]:

# Hyper parameters for the training loop
evaluate_every = 200 # run the one-shot evaluation every this many training iterations
n_iter = 20000 # No. of training iterations
N_way = 20 # how many classes for testing one-shot tasks
n_val = 250 # how many one-shot tasks to validate on
best = -1 # best validation accuracy so far; -1 so the first evaluation always counts

In [51]:
# Train batch by batch. Every `evaluate_every` iterations, report the loss,
# measure one-shot validation accuracy and snapshot the weights.
t_start = time.time()
# The upper bound is n_iter + 1 so that exactly n_iter iterations run and the
# final evaluation/snapshot at i == n_iter is not skipped.
for i in range(1, n_iter + 1):
    (inputs, targets) = get_batch(batch_size)
    loss = model.train_on_batch(inputs, targets)
    if i % evaluate_every == 0:
        print("\n ------------- \n")
        print("Time for {0} iterations: {1} mins".format(i, (time.time()-t_start)/60.0))
        print("Train Loss: {0}".format(loss))
        val_acc = test_oneshot(model, N_way, n_val, verbose=True)
        # A snapshot is written at every evaluation, not only for new bests.
        model.save_weights(os.path.join(save_path, 'weights.{}.h5'.format(i)))
        if val_acc >= best:
            print("Current best: {0}, previous best: {1}".format(val_acc, best))
            best = val_acc

ValueError: cannot reshape array of size 33075 into shape (105,105,1)