[View in Colaboratory](https://colab.research.google.com/github/Naren-Jegan/Deep-Learning-Keras/blob/master/One_Shot_Classification_V1.ipynb)

# One Shot Learning on Omniglot Dataset

The [Omniglot](https://github.com/brendenlake/omniglot) dataset contains 1623 different handwritten characters from 50 different alphabets.
Each of the 1623 characters was drawn online via Amazon's Mechanical Turk by 20 different people.
This dataset has become a standard benchmark for one-shot learning algorithms.


Some of the machine learning algorithms used for learning this dataset over the years are listed below in order of accuracy:
*  Hierarchical Bayesian Program Learning - 95.2%
*  Convolutional Siamese Net                        - 92.0%
*  Affine model                                                  - 81.8%
*  Hierarchical Deep                                         - 65.2%
*  Deep Boltzmann Machine                           - 62.0%
*  Siamese Neural Net                                     - 58.3%
*  Simple Stroke                                                - 35.2%
*  1-Nearest Neighbor                                      - 21.7%


This notebook implements a [Convolutional Siamese Neural Network](https://www.cs.cmu.edu/~rsalakhu/papers/oneshot1.pdf), using a background set of 30 alphabets for training and evaluating on a set of 20 alphabets.

In [0]:
from google.colab import auth, drive
# Authenticate this Colab session, then mount Google Drive at /content/drive
# so the dataset pickles and model checkpoints used below are reachable.
auth.authenticate_user()
drive.mount('/content/drive')

In [0]:
%matplotlib inline
import matplotlib.pyplot as plt
import tensorflow as tf
import numpy as np
import math
import os
from PIL import Image, ImageFilter, ImageOps, ImageMath
import numpy.random as rnd
import pickle
from time import sleep
from copy import deepcopy

In [0]:
# from tf.keras.models import Sequential  # This does not work!
from tensorflow.python.keras.models import Model, Sequential
from tensorflow.python.keras.layers import InputLayer, Input, Lambda
from tensorflow.python.keras.layers import Reshape, MaxPooling2D, Dropout, BatchNormalization
from tensorflow.python.keras.layers import Conv2D, Dense, Flatten
from tensorflow.python.keras.preprocessing.image import ImageDataGenerator
from tensorflow.python.keras.models import load_model
from tensorflow.python.keras import backend as K
from tensorflow.python.keras.regularizers import l2
from tensorflow.python.keras.initializers import RandomNormal
from tensorflow import test, logging
from keras.wrappers.scikit_learn import KerasClassifier
from keras.wrappers.scikit_learn import GridSearchCV
# Restrict TensorFlow logging to the ERROR level.
logging.set_verbosity(tf.logging.ERROR)
# Query the GPU device name; as a bare expression its value is only displayed
# when it is the last statement of a notebook cell.
test.gpu_device_name()

In [0]:
# Display the installed TensorFlow version (bare expression, shown by the notebook).
tf.__version__

In [0]:
# Project folder inside the mounted Drive (relative to the working directory).
one_shot_path = os.path.join("drive", "My Drive", "Colab Notebooks", "One-Shot Classification")

# Everything else lives directly under the project folder: the two Omniglot
# image trees and the checkpoint file of the trained siamese model.
background_path, evaluation_path, recognition_model_path = (
    os.path.join(one_shot_path, leaf)
    for leaf in ("background", "evaluation", "recognition_model.h5")
)

In [0]:
##creating training set
# Placeholder containers for the background (training) set.  The array is
# allocated uninitialised with axes (character, writer, row, column); the
# commented-out builder below is what originally filled it from the image
# tree, and the next cell replaces both names with the cached pickle.
train_data = np.ndarray(shape=(964, 20, 105, 105))
# Intended content (per the builder below): alphabet name -> smallest
# character index belonging to that alphabet.
train_alphabets = dict()

# One-off builder, kept for reference.  It parses the character index from
# image[0:4] and the writer index from image[5:7] of every file name.
#for alphabet in os.listdir(background_path):
#  alphabet_path = os.path.join(background_path, alphabet)
#  for character in os.listdir(alphabet_path):
#    character_path = os.path.join(alphabet_path, character)
#    for image in os.listdir(character_path):
#      index = int(image[0:4]) - 1
#      writer = int(image[5:7]) - 1
#      train_data[index][writer] = np.array(Image.open(os.path.join(character_path, image)))
#      train_alphabets[alphabet] = index if alphabet not in train_alphabets or train_alphabets[alphabet] > index else train_alphabets[alphabet]

# NOTE(review): this writes "train.pickle" to the current working directory,
# whereas the loading cell reads it from one_shot_path -- the file has to be
# moved there (or this path changed) if the builder is ever re-enabled.
#with open(os.path.join("train.pickle"), 'wb') as f:
#  pickle.dump([train_data, train_alphabets], f, protocol=2)

In [0]:
# Load the cached training set (image array + alphabet start indices) from Drive.
# encoding='latin1' is passed because the file was dumped with protocol=2.
# NOTE(review): pickle.load can execute arbitrary code -- only load files you
# created yourself.
with open(os.path.join(one_shot_path, "train.pickle"), 'rb') as f:
  train_data, train_alphabets = pickle.load(f, encoding='latin1')

In [0]:
#@title Inputs
# Colab form: training hyper-parameters.  The "#@param" annotations drive the
# form widgets and must stay on the same line as their assignment.

# Activations of the convolutional layers and of the 4096-unit dense layer.
conv_activation = 'relu' #@param ['relu', 'softplus', 'tanh', 'sigmoid'] {type:"string"}
dense_activation = 'sigmoid' #@param ['relu', 'softplus', 'tanh', 'sigmoid'] {type:"string"}
# Initial learning rate handed to the optimizer.
learning_rate = 1e-2 #@param {type:"number"}
# L2 weight-decay coefficients for the conv kernels and the dense kernel.
conv_regularization_parameter = 1e-2 #@param {type:"number"}
dense_regularization_parameter = 1e-4 #@param {type:"number"}
# Rows per generated batch; the pair generator fills rows two at a time.
batch_size = 128 #@param {type:"slider", min:0, max:1024, step:16}
# Used for both steps_per_epoch and validation_steps.
batches_per_epoch = 75 #@param {type:"slider", min:0, max:100, step:5}
n_epochs = 200 #@param {type:"slider", min:25, max:500, step:25}


# Both sliders allow 0; fall back to 1 so neither value is ever zero.
batch_size = 1 if batch_size == 0 else batch_size
batches_per_epoch = 1 if batches_per_epoch == 0 else batches_per_epoch


In [0]:
#@title Data Augmentation
# Colab form: augmentation settings.  The "#@param" annotations drive the form
# widgets and must stay on the same line as their assignment.

# Side length of the square, single-channel network input.
image_size = 105 #@param {type:"slider", min:32, max:512, step:1}

# Each value below is the half-width of the uniform range that
# transform_image samples the corresponding transform parameter from.
rotation_range = 10 #@param {type:"slider", min:0, max:90, step:1}
width_shift_range = 2 #@param {type:"slider", min:0, max:10, step:0.1}
height_shift_range = 2 #@param {type:"slider", min:0, max:10, step:0.1}
shear_range = 0.3 #@param {type:"slider", min:0, max:1, step:0.1}
zoom_range = 0.2 #@param {type:"slider", min:0, max:1, step:0.01}

In [0]:
# this is the augmentation configuration we will use for training
# (built with default settings: transform_image passes explicit, randomly
# drawn transform parameters to datagen.apply_transform instead)
datagen = ImageDataGenerator()

def transform_image(image):
  """Return a randomly augmented copy of a single image.

  The image is reshaped to (image_size, image_size, 1) and passed to
  ``datagen.apply_transform`` with freshly sampled affine parameters:

  * ``theta`` (rotation), ``tx``/``ty`` (shifts) and ``shear`` are drawn
    uniformly from ``[-limit, +limit]`` using the matching ``*_range`` global.
  * ``zx``/``zy`` are multiplicative zoom factors (1.0 means "no zoom"), so
    they are drawn from ``[1 - zoom_range, 1 + zoom_range]``.  Sampling them
    around 0, as this function used to, yields extreme or mirrored zooms.

  Args:
    image: array holding image_size * image_size pixel values.

  Returns:
    Whatever ``datagen.apply_transform`` returns for the reshaped image.
  """
  transform_parameters = {
      'theta': rnd.uniform(-rotation_range, rotation_range),
      'tx'   : rnd.uniform(-width_shift_range, width_shift_range),
      'ty'   : rnd.uniform(-height_shift_range, height_shift_range),
      'shear': rnd.uniform(-shear_range, shear_range),
      # Zoom is a scale factor centred on 1.0, not an offset centred on 0.
      'zx'   : rnd.uniform(1 - zoom_range, 1 + zoom_range),
      'zy'   : rnd.uniform(1 - zoom_range, 1 + zoom_range),
  }
  return datagen.apply_transform(image.reshape((image_size, image_size, 1)),
                                 transform_parameters=transform_parameters)

#generate image pairs [x1, x2] with target y = 1/0 representing same/different
def datagen_flow(datagen, val = False):
  """Yield endless batches of augmented image pairs for the siamese network.

  Each batch is ``([X1, X2], Y)``.  ``X1`` and ``X2`` have shape
  ``(batch_size, image_size, image_size, 1)`` and ``Y[k]`` is 1.0 when row k
  of both arrays shows the same character, 0.0 when the characters differ.
  Rows are written two at a time -- a "same" pair followed by a "different"
  pair -- and both pairs of a couple come from one alphabet.  Alphabets are
  cycled evenly; the remainder is filled with randomly chosen alphabets.

  The module-level ``train_data``, ``train_alphabets``, ``batch_size`` and
  ``image_size`` are re-read for every batch.

  Args:
    datagen: unused; kept so existing callers keep working (augmentation is
      delegated to ``transform_image``).
    val: when true, sample writers 12-19 instead of writers 0-11.

  Yields:
    ``([X1, X2], Y)`` as described above.

  Raises:
    ValueError: from ``rnd.choice`` if an alphabet has fewer than two
      characters, because a "different" pair cannot be formed from it.
  """
  while True:
    # Every row is overwritten below, so uninitialised storage is sufficient.
    X1 = np.empty((batch_size, image_size, image_size, 1))
    X2 = np.empty((batch_size, image_size, image_size, 1))
    Y = np.empty((batch_size,))

    # Sorted first-character indices: alphabet a spans
    # s_alphabets[a] .. s_alphabets[a+1]-1 (the last one runs to the end).
    s_alphabets = sorted(train_alphabets.values())
    a_indices = list(range(len(s_alphabets)))
    times, remainder = divmod(batch_size // 2, len(a_indices))

    aindices = a_indices*times + list(rnd.choice(a_indices, remainder))
    rnd.shuffle(aindices)
    if batch_size % 2:
      # Odd batch size: one extra alphabet supplies a lone "same" pair for the
      # final row, which would otherwise be left as uninitialised memory.
      aindices.append(rnd.choice(a_indices))

    w_range = list(range(12, 20) if val else range(12))

    for i, a in enumerate(aindices):
      end_index = (len(train_data) if a+1 == len(s_alphabets) else s_alphabets[a+1])
      c_range = list(range(s_alphabets[a], end_index))

      # Positive pair: one character drawn by two (independently chosen) writers.
      writers = rnd.choice(w_range, 2)
      same = rnd.choice(c_range)
      X1[2*i] = transform_image(train_data[same, writers[0]])
      X2[2*i] = transform_image(train_data[same, writers[1]])
      Y[2*i] = 1.0

      if 2*i + 1 == batch_size:
        break  # only reachable for the extra row of an odd batch size

      # Negative pair: two *distinct* characters.  Sampling with replacement
      # could pick the same character twice and label it "different".
      writers = rnd.choice(w_range, 2)
      diff = rnd.choice(c_range, 2, replace=False)
      X1[2*i + 1] = transform_image(train_data[diff[0], writers[0]])
      X2[2*i + 1] = transform_image(train_data[diff[1], writers[1]])
      Y[2*i + 1] = 0.0

    yield [X1, X2], Y

# Endless generator of training batches (val=False: writers 0-11).
train_generator = datagen_flow(datagen)  

# this is a similar generator, for validation data that takes only the remaining 8 writers
train_dev_generator = datagen_flow(datagen, val=True)

In [0]:

# Initialisers shared by the network layers: kernels are drawn from
# N(mean=0, stddev=1e-2) and biases from N(mean=0.5, stddev=1e-2).
w_init = RandomNormal(mean=0.0, stddev=1e-2)
b_init = RandomNormal(mean=0.5, stddev=1e-2)

In [0]:
input_shape=(image_size, image_size, 1)
left_input = Input(input_shape)
right_input = Input(input_shape)


def _add_conv_block(net, index, kernel_size, filters, bn_options=None, **conv_options):
  """Append convolution -> batch-norm -> 2x2 max-pooling to `net`.

  Layers are named 'layer_conv<index>', 'bn<index>' and 'max_pooling<index>'.
  Every convolution uses w_init / b_init and L2 kernel regularisation;
  `bn_options` and `conv_options` carry the few per-layer extras.
  """
  net.add(Conv2D(kernel_size=kernel_size, strides=1, filters=filters, padding='valid',
                 kernel_initializer=w_init, bias_initializer=b_init,
                 activation=conv_activation, name='layer_conv%d' % index,
                 kernel_regularizer=l2(conv_regularization_parameter),
                 **conv_options))
  net.add(BatchNormalization(axis=3, name='bn%d' % index, **(bn_options or {})))
  net.add(MaxPooling2D(pool_size=2, strides=2, name='max_pooling%d' % index))


# Start construction of the Keras Sequential model: this is the encoder that
# both inputs are passed through.
convnet = Sequential()

# Four convolutional blocks.  The first one declares the input shape and keeps
# its batch-norm momentum of 0.5.  It now also receives kernel_initializer=w_init
# like the other three blocks; it was the only one missing it.
_add_conv_block(convnet, 1, kernel_size=10, filters=64,
                bn_options={'momentum': 0.5}, input_shape=input_shape)
_add_conv_block(convnet, 2, kernel_size=7, filters=128)
_add_conv_block(convnet, 3, kernel_size=4, filters=128)
_add_conv_block(convnet, 4, kernel_size=4, filters=256)

# Flatten the 4-rank output of the convolutional layers
# to 2-rank that can be input to a fully-connected / dense layer.
convnet.add(Flatten())

# Fully-connected embedding layer with activation, followed by batch-norm.
convnet.add(Dense(4096, activation=dense_activation,
                  kernel_initializer=w_init, bias_initializer=b_init,
                  name="dense_1", kernel_regularizer=l2(dense_regularization_parameter)))
convnet.add(BatchNormalization(axis=1, name='bn5'))

#call the convnet Sequential model on each of the input tensors so params will be shared
encoded_l = convnet(left_input)
encoded_r = convnet(right_input)

#layer to merge two encoded inputs with the l1 distance between them
L1_layer = Lambda(lambda tensors:K.abs(tensors[0] - tensors[1]))

#call this layer on list of two input tensors.
L1_distance = L1_layer([encoded_l, encoded_r])

# A single sigmoid unit turns the component-wise distance into a similarity score.
prediction = Dense(1,activation='sigmoid',bias_initializer=b_init)(L1_distance)

model = Model(inputs=[left_input,right_input],outputs=prediction)

In [0]:
from tensorflow.python.keras.optimizers import SGD, Adam

# Optimizer choice: Adam is active; the SGD alternative is kept for reference.
#optimizer = SGD(lr=learning_rate, momentum=0.5)
optimizer = Adam(lr=learning_rate)

# The network emits one sigmoid score per pair and the targets are 0/1,
# hence binary cross-entropy; accuracy is tracked as an additional metric.
model.compile(optimizer=optimizer,
              loss='binary_crossentropy',
              metrics=['accuracy'])

# Number of generator batches per training epoch and per validation pass.
steps_train = batches_per_epoch
steps_validation = batches_per_epoch 

In [0]:
from tensorflow.python.keras.callbacks import ModelCheckpoint, Callback, LearningRateScheduler, ReduceLROnPlateau

# Checkpoint to recognition_model_path, keeping only the model with the best
# val_loss (save_best_only=True) and checking every 10 epochs (period=10).
model_checkpoint = ModelCheckpoint(recognition_model_path, monitor='val_loss',
                                   save_best_only=True, period=10)

# Multiply the current learning rate by 0.99 at every epoch.
lr_scheduler = LearningRateScheduler(lambda epoch, lr: 0.99*lr)

# Additionally halve the learning rate when val_loss has not improved for
# 5 epochs, never going below 1e-4.
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5,
                              patience=5, min_lr=1e-4)

class LearningRateFinder(Callback):
  """Callback that periodically sweeps the learning rate to pick a good one.

  Every `period` epochs one epoch is turned into a "search" epoch:

  * at its start the current learning rate and the model weights are
    remembered (the full model is also saved to `training_path`);
  * batch b is trained with lr = current_lr * 10**(2*b/steps - 1), i.e. the
    rate rises from current_lr/10 by a factor of 100 over `steps` batches;
  * after every batch the remembered weights are restored, so each candidate
    rate is tried from the same starting point;
  * the rate with the lowest batch loss is tracked, and the search stops as
    soon as a batch loss reaches 1.25x that best loss or the epoch ends.

  When the search stops, the best rate is written to the optimizer and the
  remembered weights are restored.  Restoring happens in place on the model
  being trained; the previous version rebound `self.model` to a freshly loaded
  copy, which left the trained model untouched and made every later callback
  call act on the detached copy.

  Args:
    steps: number of batches per epoch (length of the sweep).
    period: run a search every `period` epochs.
  """

  def __init__(self, steps=100, period=10):
    super(LearningRateFinder, self).__init__()
    self.steps = steps
    self.batch_size=batch_size
    self.period = period
    self.best_lr = 1e-4
    self.best_loss = 1000
    self.find_lr = True
    self.current_lr = None
    self.training_path = os.path.join(one_shot_path, "training_model.h5")
    self.model_weights = None

  def reset_values(self):
    """Apply the best rate found, restore the pre-search weights, reset the tracker."""
    K.set_value(self.model.optimizer.lr, self.best_lr)
    self.best_lr = 1e-4
    self.best_loss = 1000
    if self.model_weights is not None:
      # In-place restore keeps self.model bound to the model under training.
      self.model.set_weights(self.model_weights)

  def on_train_begin(self, logs=None):
    return

  def on_train_end(self, logs=None):
    return

  def on_epoch_begin(self, epoch, logs=None):
    """Decide whether this is a search epoch and, if so, snapshot the state."""
    self.find_lr = epoch % self.period == 0
    if epoch % self.period == 1:
      print("Learning Rate: " + "{0:.2g}".format(K.get_value(self.model.optimizer.lr)))
    if self.find_lr:
      self.current_lr = K.get_value(self.model.optimizer.lr)
      self.model.save(self.training_path)
      self.model_weights = self.model.get_weights()

  def on_epoch_end(self, epoch, logs=None):
    """Finish a search that ran through the whole epoch without stopping."""
    if self.find_lr:
      self.reset_values()
    return

  def on_batch_begin(self, batch, logs=None):
    """During a search epoch, set the candidate learning rate for this batch."""
    if self.find_lr:
      K.set_value(self.model.optimizer.lr, 10**(2*batch/self.steps + np.log10(self.current_lr) - 1))
    return

  def on_batch_end(self, batch, logs=None):
    """Score the candidate rate, stop on divergence, then rewind the weights."""
    if not self.find_lr:
      return
    loss = (logs or {}).get('loss')
    if loss is not None:
      if loss < self.best_loss:
        self.best_loss = loss
        self.best_lr = K.get_value(self.model.optimizer.lr)
      elif loss >= 1.25*self.best_loss:
        # Loss is clearly getting worse: end the search for this epoch.
        self.find_lr = False
        self.reset_values()
        return
    # Every candidate rate starts from the same weights.
    self.model.set_weights(self.model_weights)
    return
  
# Learning-rate finder that sweeps over one full epoch and runs every
# n_epochs//4 epochs.
lr_finder = LearningRateFinder(steps=steps_train, period=n_epochs//4)

In [0]:
# Train on the endless pair generator and validate on the held-out-writer
# generator, with checkpointing and both learning-rate schedules active.
# NOTE: lr_finder is not part of this callback list.
model.fit_generator(train_generator, 
                    steps_per_epoch = steps_train,
                    epochs=n_epochs,
                    validation_data = train_dev_generator,
                    validation_steps = steps_validation,
                    callbacks = [model_checkpoint, lr_scheduler, reduce_lr]
         )

In [0]:
# Replace the in-memory model with the checkpoint stored at
# recognition_model_path (the file model_checkpoint writes to).
model = load_model(recognition_model_path)

In [0]:
##creating test set
# Placeholder containers for the evaluation set.  The array is allocated
# uninitialised with axes (character, writer, row, column); the commented-out
# builder below is what originally filled it, and the next cell replaces both
# names with the cached pickle.
test_data = np.ndarray(shape=(659, 20, 105, 105))
# Intended content (per the builder below): alphabet name -> smallest
# character index belonging to that alphabet.
test_alphabets = dict()

# One-off builder, kept for reference.  The character number parsed from
# image[0:4] is shifted by 965 to obtain a zero-based index.
#for alphabet in os.listdir(evaluation_path):
#  alphabet_path = os.path.join(evaluation_path, alphabet)
#  for character in os.listdir(alphabet_path):
#    character_path = os.path.join(alphabet_path, character)
#    for image in os.listdir(character_path):
#      index = int(image[0:4]) - 965
#      writer = int(image[5:7]) - 1
#      test_data[index][writer] = np.array(Image.open(os.path.join(character_path, image)))
#      test_alphabets[alphabet] = index if alphabet not in test_alphabets or test_alphabets[alphabet] > index else test_alphabets[alphabet]

# NOTE(review): this writes "test.pickle" to the current working directory,
# whereas the loading cell reads it from one_shot_path.
#with open(os.path.join("test.pickle"), 'wb') as f:
#  pickle.dump([test_data, test_alphabets], f, protocol=2)

In [0]:
# Load the cached evaluation set (image array + alphabet start indices) from Drive.
# encoding='latin1' is passed because the file was dumped with protocol=2.
# NOTE(review): pickle.load can execute arbitrary code -- only load files you
# created yourself.
with open(os.path.join(one_shot_path, "test.pickle"), 'rb') as f:
  test_data, test_alphabets = pickle.load(f, encoding='latin1')

In [0]:
# N-way one-shot evaluation within each alphabet.
#
# For every evaluation alphabet, two runs are made.  A run picks N characters
# and two writers; each of the N characters drawn by the first writer is
# compared against all N characters drawn by the second writer, and the trial
# counts as correct when the highest similarity score falls on the matching
# character.
N = 20
st_alphabets = sorted(test_alphabets.values())
n_writers = test_data.shape[1]
correct = 0
trials = 0
show = True
for i, start_index in enumerate(st_alphabets):
  end_index = len(test_data) if i+1 == len(st_alphabets) else st_alphabets[i+1]
  c_range = list(range(start_index, end_index))

  for j in range(2):
    # Distinct characters, so exactly one support image is the right answer;
    # repeats are only allowed when the alphabet has fewer than N characters.
    c_list = rnd.choice(c_range, N, replace=len(c_range) < N)
    # Two different writers, so the query never meets its own drawing.
    w_list = rnd.choice(range(n_writers), 2, replace=False)

    # Support set: each of the N characters drawn by the second writer.
    # (test_data[c_list][w] would instead select all writers of ONE character.)
    X2 = test_data[c_list, w_list[1]].reshape((N, image_size, image_size, 1))

    for c_i in range(N):
      image = test_data[c_list[c_i]][w_list[0]]

      # The query image repeated N times, one copy per support image.
      X1 = np.array([image]*N).reshape((N, image_size, image_size, 1))
      if show and c_i == 2 and i == 3:
        plt.imshow(image)
        plt.show()
        for m in range(N):
          plt.imshow(test_data[c_list[m]][w_list[1]])
          plt.show()

      targets = np.zeros((N,))
      targets[c_i] = 1
      predictions = model.predict([X1, X2])

      if show and c_i == 2 and i == 3:
        print(targets)
        print(predictions)
        show = False

      if(np.argmax(predictions) == np.argmax(targets)):
        correct += 1
      trials += 1

# Divide by the number of trials actually run instead of assuming 20 alphabets.
accuracy = correct/trials if trials else 0.0
print(str(N) + "-Way Classification Accuracy: " + "{0:.2f}".format(accuracy))