<a href="https://colab.research.google.com/github/KT2001/Siamese-Model-for-Omniglot-Dataset/blob/Master/Omniglot_Dataset_OneShotProject.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# import necessary libraries
import os
import random
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from PIL import Image
from PIL import ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True

# import Tensorflow libraries
import tensorflow as tf
import tensorflow.keras.models as models
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten, Lambda
from tensorflow.compat.v1 import ConfigProto
from keras import backend as K
from keras.optimizers import SGD,Adam
from keras.regularizers import l2
from tensorflow.compat.v1 import InteractiveSession
config = ConfigProto()
config.gpu_options.per_process_gpu_memory_fraction = 0.5
config.gpu_options.allow_growth = True
session = InteractiveSession(config=config)

import time
import uuid
import cv2

import scipy.ndimage as ndi
import sys
import imageio
from imageio import imread

from sklearn.utils import shuffle

In [2]:
# Avoid OOM errors by setting GPU Memory Consumption Growth
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus: 
    tf.config.experimental.set_memory_growth(gpu, True)

In [3]:
!nvidia-smi

Mon Dec 19 13:42:45 2022       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 460.32.03    Driver Version: 460.32.03    CUDA Version: 11.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla T4            Off  | 00000000:00:04.0 Off |                    0 |
| N/A   73C    P0    33W /  70W |    312MiB / 15109MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

In [None]:
!unzip /content/drive/MyDrive/Siamese/Omniglot/images_background.zip

In [None]:
!unzip /content/drive/MyDrive/Siamese/Omniglot/images_evaluation.zip

In [6]:
num_classes_background = {}
sum = 0
dir = os.listdir('/content/images_background')
print(f"Number of classes: {len(dir)} \n")
for filenames in dir:
  num_classes_background[filenames] = len(os.listdir(f"{'/content/images_background'}/{filenames}"))
  sum = num_classes_background[filenames]+sum
  print(f"{filenames}: {(num_classes_background[filenames])}")
print(f"\nTotal number of images: {sum}")

Number of classes: 30 

Futurama: 26
Arcadian: 26
Syriac_(Estrangelo): 23
Alphabet_of_the_Magi: 20
Tagalog: 17
Cyrillic: 33
Gujarati: 48
Japanese_(katakana): 47
Inuktitut_(Canadian_Aboriginal_Syllabics): 16
Blackfoot_(Canadian_Aboriginal_Syllabics): 14
N_Ko: 33
Mkhedruli_(Georgian): 41
Japanese_(hiragana): 52
Latin: 26
Tifinagh: 55
Korean: 40
Ojibwe_(Canadian_Aboriginal_Syllabics): 14
Grantha: 43
Early_Aramaic: 22
Burmese_(Myanmar): 34
Anglo-Saxon_Futhorc: 29
Asomtavruli_(Georgian): 40
Armenian: 41
Braille: 26
Balinese: 24
Hebrew: 22
Malay_(Jawi_-_Arabic): 40
Sanskrit: 42
Bengali: 46
Greek: 24

Total number of images: 964


In [7]:
num_classes_evaluation = {}
sum = 0
dir = os.listdir('/content/images_evaluation')
print(f"Number of classes: {len(dir)} \n")
for filenames in dir:
  num_classes_evaluation[filenames] = len(os.listdir(f"{'/content/images_evaluation'}/{filenames}"))
  sum = num_classes_evaluation[filenames]+sum
  print(f"{filenames}: {(num_classes_evaluation[filenames])}")
print(f"\nTotal number of images: {sum}")

Number of classes: 20 

Atlantean: 26
Manipuri: 40
Malayalam: 47
Old_Church_Slavonic_(Cyrillic): 45
Avesta: 26
Tengwar: 25
Sylheti: 28
Ge_ez: 26
Angelic: 20
Keble: 26
Tibetan: 42
Gurmukhi: 45
Glagolitic: 45
Mongolian: 30
ULOG: 26
Kannada: 41
Atemayar_Qelisayer: 26
Oriya: 46
Syriac_(Serto): 23
Aurek-Besh: 26

Total number of images: 659


In [8]:
train_path = os.path.join('/content/images_background')
val_path = os.path.join('/content/images_evaluation')

In [12]:
def aug(img):
  img = tf.image.resize(img, size=[105, 105])
  #img = tf.image.rgb_to_grayscale(img)
  img = tf.image.stateless_random_brightness(img, max_delta=0.02, seed=(1,2))
  img = tf.image.stateless_random_contrast(img, lower=0.6, upper=1, seed=(1,3))
  # img = tf.image.stateless_random_crop(img, size=(20,20,3), seed=(1,2))
  img = tf.image.stateless_random_flip_left_right(img, seed=(np.random.randint(100),np.random.randint(100)))
  #img = tf.image.stateless_random_jpeg_quality(img, min_jpeg_quality=90, max_jpeg_quality=100, seed=(np.random.randint(100),np.random.randint(100)))
  #img = tf.image.stateless_random_saturation(img, lower=0.9,upper=1, seed=(np.random.randint(100),np.random.randint(100)))
  
  return img


In [17]:
def loading(path,n = 0, verbose = 0):
    '''
    path => Path of train directory or test directory
    '''
    X=[]
    y = []
    cat_dict = {}
    lang_dict = {}
    curr_y = n
    # we load every alphabet seperately so we can isolate them later
    for alphabet in os.listdir(path):
        print("loading alphabet: " + alphabet)
        lang_dict[alphabet] = [curr_y,None]
        alphabet_path = os.path.join(path,alphabet)
        # every letter/category has it's own column in the array, so  load seperately
        for letter in os.listdir(alphabet_path):
            cat_dict[curr_y] = (alphabet, letter)
            category_images=[]
            letter_path = os.path.join(alphabet_path, letter)
            # read all the images in the current category
            for filename in os.listdir(letter_path):
                image_path = os.path.join(letter_path, filename)
                image = imread(image_path)
                category_images.append(image)
                y.append(curr_y)
            try:
                X.append(np.stack(category_images))
            # edge case  - last one
            except ValueError as e:
                print(e)
                print("error - category_images:", category_images)
            curr_y += 1
            lang_dict[alphabet][1] = curr_y - 1
    y = np.vstack(y)
    X = np.stack(X)
    return X,y,lang_dict

In [18]:
xtrain, y_train, c_train = loading(train_path, verbose = True)

loading alphabet: Futurama
loading alphabet: Arcadian
loading alphabet: Syriac_(Estrangelo)
loading alphabet: Alphabet_of_the_Magi
loading alphabet: Tagalog
loading alphabet: Cyrillic
loading alphabet: Gujarati
loading alphabet: Japanese_(katakana)
loading alphabet: Inuktitut_(Canadian_Aboriginal_Syllabics)
loading alphabet: Blackfoot_(Canadian_Aboriginal_Syllabics)
loading alphabet: N_Ko
loading alphabet: Mkhedruli_(Georgian)
loading alphabet: Japanese_(hiragana)
loading alphabet: Latin
loading alphabet: Tifinagh
loading alphabet: Korean
loading alphabet: Ojibwe_(Canadian_Aboriginal_Syllabics)
loading alphabet: Grantha
loading alphabet: Early_Aramaic
loading alphabet: Burmese_(Myanmar)
loading alphabet: Anglo-Saxon_Futhorc
loading alphabet: Asomtavruli_(Georgian)
loading alphabet: Armenian
loading alphabet: Braille
loading alphabet: Balinese
loading alphabet: Hebrew
loading alphabet: Malay_(Jawi_-_Arabic)
loading alphabet: Sanskrit
loading alphabet: Bengali
loading alphabet: Greek


In [19]:
# loading the validation set
xval, y_val, c_val = loading(val_path)

loading alphabet: Atlantean
loading alphabet: Manipuri
loading alphabet: Malayalam
loading alphabet: Old_Church_Slavonic_(Cyrillic)
loading alphabet: Avesta
loading alphabet: Tengwar
loading alphabet: Sylheti
loading alphabet: Ge_ez
loading alphabet: Angelic
loading alphabet: Keble
loading alphabet: Tibetan
loading alphabet: Gurmukhi
loading alphabet: Glagolitic
loading alphabet: Mongolian
loading alphabet: ULOG
loading alphabet: Kannada
loading alphabet: Atemayar_Qelisayer
loading alphabet: Oriya
loading alphabet: Syriac_(Serto)
loading alphabet: Aurek-Besh


In [20]:
xval.shape

(659, 20, 105, 105)

In [23]:
xtrain.shape

(964, 20, 105, 105)

In [24]:
def get_batches(batch_size, data=xtrain):
  #tf.config.run_functions_eagerly(True)
  n_classes, n_examples, w, h = data.shape

  # initialize the data we return in memory
  pairs = [np.zeros((batch_size, h, w, 1)) for i in range(2)]

  # make the target vector with half same half other category
  targets = np.zeros((batch_size,))
  targets[batch_size//2:] = 1

  # pick the categories of characters we will return 
  categories = np.random.choice(n_classes, size=(batch_size,), replace = False)

  for i in range(batch_size):
    category = categories[i]

    ## choose two indices from the amount of examples 
    id1 = np.random.randint(0, n_examples)
    id2 = np.random.randint(0, n_examples)

    if targets[i] == 0:
      category_2 = category # if target is set pick from same class 
    else:
      # pick new classes by picking random number 
      category_2 = (category + np.random.randint(1, n_classes)) % n_classes

    pairs[0][i,:,:,:] = data[category, id1].reshape(w, h, 1)
    pairs[1][i,:,:,:] = data[category_2, id2].reshape(w, h, 1)

  return pairs, targets

In [26]:
from sklearn.utils import shuffle

In [27]:
def make_oneshot_tasks(N, data, language=None):
    """Create pairs of test image, support set for testing N way one-shot learning. """
    if data == xtrain:
        X = xtrain
        categories = c_train
    else:
        X = xval
        categories = c_val
    n_classes, n_examples, w, h = X.shape
    
    indices = np.random.randint(0, n_examples,size=(N,))
    if language is not None: # if language is specified, select characters for that language
        low, high = categories[language]
        if N > high - low:
            raise ValueError("This language ({}) has less than {} letters".format(language, N))
        categories = np.random.choice(range(low,high),size=(N,),replace=False)

    else: # if no language specified just pick a bunch of random letters
        categories = np.random.choice(range(n_classes),size=(N,),replace=False)            
    true_category = categories[0]
    ex1, ex2 = np.random.choice(n_examples,replace=False,size=(2,))
    test_image = np.asarray([X[true_category,ex1,:,:]]*N).reshape(N, w, h,1)
    support_set = X[categories,indices,:,:]
    support_set[0,:,:] = X[true_category,ex2]
    support_set = support_set.reshape(N, w, h,1)
    targets = np.zeros((N,))
    targets[0] = 1
    targets, test_image, support_set = shuffle(targets, test_image, support_set)
    pairs = [test_image,support_set]

    return pairs, targets

In [28]:
def test_oneshots(model, N, k, s = xval, verbose = 0):
    """Test average N way oneshot learning accuracy of a siamese neural net over k one-shot tasks"""
    n_correct = 0
    if verbose:
        print("Evaluating model on {} random {} way one-shot learning tasks ... \n".format(k,N))
    for i in range(k):
        inputs, targets = make_oneshot_tasks(N,s)
        probs = model.predict(inputs)
        if np.argmax(probs) == np.argmax(targets):
            n_correct = n_correct+1
    percent_correct = (100.0 * n_correct / k)
    if verbose:
        print("Got an average of {}% {} way one-shot learning accuracy \n".format(percent_correct,N))
    return percent_correct

In [29]:
def W_init(shape, dtype=None):
    """Initialize weights as in paper"""
    values = np.random.normal(loc=0,scale=1e-2,size=shape)
    return K.variable(values, dtype=dtype)
#//TODO: figure out how to initialize layer biases in keras.
def b_init(shape, dtype=None):
    """Initialize bias as in paper"""
    values=np.random.normal(loc=0.5,scale=1e-2,size=shape)
    return K.variable(values, dtype=dtype)

In [30]:
'''
input_shape = (105, 105, 1)
left_input = Input(input_shape)
right_input = Input(input_shape)
#build convnet to use in each siamese 'leg'
convnet = models.Sequential()
convnet.add(Conv2D(64,(10,10),activation='relu',input_shape=input_shape, kernel_initializer=W_init,kernel_regularizer=l2(2e-4)))
convnet.add(MaxPooling2D())
convnet.add(Conv2D(128,(7,7),activation='relu',
                   kernel_regularizer=l2(2e-4),kernel_initializer=W_init,bias_initializer=b_init))
convnet.add(MaxPooling2D())
convnet.add(Conv2D(128,(4,4),activation='relu',kernel_initializer=W_init,kernel_regularizer=l2(2e-4),bias_initializer=b_init))
convnet.add(MaxPooling2D())
convnet.add(Conv2D(256,(4,4),activation='relu',kernel_initializer=W_init,kernel_regularizer=l2(2e-4),bias_initializer=b_init))
convnet.add(Flatten())
convnet.add(Dense(4096,activation="sigmoid",kernel_regularizer=l2(1e-3),kernel_initializer=W_init,bias_initializer=b_init))

#call the convnet Sequential model on each of the input tensors so params will be shared
encoded_l = convnet(left_input)
encoded_r = convnet(right_input)
#layer to merge two encoded inputs with the l1 distance between them
L1_layer = Lambda(lambda tensors:K.abs(tensors[0] - tensors[1]))
#call this layer on list of two input tensors.
L1_distance = L1_layer([encoded_l, encoded_r])
prediction = Dense(1,activation='sigmoid',bias_initializer=b_init)(L1_distance)
siamese_net = models.Model(inputs=[left_input,right_input],outputs=prediction)

optimizer = Adam(0.00006)
'''

In [70]:
def make_embeddings():
  inp = Input(shape=(105,105,1), name='input_image')
  c1 = Conv2D(64,(10,10),activation='relu',input_shape=input_shape, kernel_initializer=W_init,kernel_regularizer=l2(2e-4))(inp)
  m1 = MaxPooling2D(64, (2,2), padding='same')(c1)

  c2 = Conv2D(128,(7,7),activation='relu',
                   kernel_regularizer=l2(2e-4),kernel_initializer=W_init,bias_initializer=b_init)(m1)
  m2 = MaxPooling2D(64, (2,2), padding='same')(c2)

  c3 = Conv2D(128,(4,4),activation='relu',kernel_initializer=W_init,kernel_regularizer=l2(2e-4),bias_initializer=b_init)(m2)
  m3 = MaxPooling2D(64, (2,2), padding='same')(c3)

  c4 = Conv2D(256,(4,4),activation='relu',kernel_initializer=W_init,kernel_regularizer=l2(2e-4),bias_initializer=b_init)(m3)

  f1 = Flatten()(c4)

  d1 = Dense(4096,activation="sigmoid",kernel_regularizer=l2(1e-3),kernel_initializer=W_init,bias_initializer=b_init)(f1)

  return models.Model(inputs = [inp], outputs = [d1], name = 'embedding')

#call the convnet Sequential model on each of the input tensors so params will be shared
embedding = make_embeddings()
'''
#layer to merge two encoded inputs with the l1 distance between them
L1_layer = Lambda(lambda tensors:K.abs(tensors[0] - tensors[1]))
#call this layer on list of two input tensors.
L1_distance = L1_layer([embedding_left, embedding_right])
prediction = Dense(1,activation='sigmoid',bias_initializer=b_init)(L1_distance)
siamese_net = models.Model(inputs=[left_input,right_input],outputs=prediction)
'''

"\n#layer to merge two encoded inputs with the l1 distance between them\nL1_layer = Lambda(lambda tensors:K.abs(tensors[0] - tensors[1]))\n#call this layer on list of two input tensors.\nL1_distance = L1_layer([embedding_left, embedding_right])\nprediction = Dense(1,activation='sigmoid',bias_initializer=b_init)(L1_distance)\nsiamese_net = models.Model(inputs=[left_input,right_input],outputs=prediction)\n"

In [71]:
embedding.summary()

Model: "embedding"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_image (InputLayer)    [(None, 105, 105, 1)]     0         
                                                                 
 conv2d_99 (Conv2D)          (None, 96, 96, 64)        6464      
                                                                 
 max_pooling2d_72 (MaxPoolin  (None, 48, 48, 64)       0         
 g2D)                                                            
                                                                 
 conv2d_100 (Conv2D)         (None, 42, 42, 128)       401536    
                                                                 
 max_pooling2d_73 (MaxPoolin  (None, 21, 21, 128)      0         
 g2D)                                                            
                                                                 
 conv2d_101 (Conv2D)         (None, 18, 18, 128)       26

In [68]:
# Siamese L1 Distance class
class L1Dist(Layer):

    def __init__(self, **kwargs):
        super().__init__()
       
    # similarity calculation
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)

In [72]:
def siamese_model():
  input_shape = (105, 105, 1)
  left_input = Input(input_shape)
  right_input = Input(input_shape)
  #embedding_left = make_embeddings(left_input)
  #embedding_right = make_embeddings(right_input)

  siamese_layer = L1Dist()
  #siamese_layer._name = 'distance'

  distances = siamese_layer(embedding(left_input), embedding(right_input))

  # Classification layer
  classifier = Dense(1,activation='sigmoid',bias_initializer=b_init)(distances)

  return models.Model(inputs=[left_input,right_input],outputs=classifier)


In [73]:
siamese_net = siamese_model()

In [74]:
siamese_net.summary()

Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_34 (InputLayer)          [(None, 105, 105, 1  0           []                               
                                )]                                                                
                                                                                                  
 input_35 (InputLayer)          [(None, 105, 105, 1  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         38947648    ['input_34[0][0]',               
                                                                  'input_35[0][0]']         

In [75]:
#//TODO: get layerwise learning rates and momentum annealing scheme described in paperworking
siamese_net.compile(loss="binary_crossentropy",optimizer=Adam(0.00006))

siamese_net.count_params()

38951745

In [76]:
siamese_net.summary()

Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_34 (InputLayer)          [(None, 105, 105, 1  0           []                               
                                )]                                                                
                                                                                                  
 input_35 (InputLayer)          [(None, 105, 105, 1  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         38947648    ['input_34[0][0]',               
                                                                  'input_35[0][0]']         

In [79]:
# Hyper parameters
evaluate_every = 200 # interval for evaluating on one-shot tasks
batch_size = 32
n_iter = 8000 # No. of training iterations
N_way = 20 # how many classes for testing one-shot tasks
n_val = 250 # how many one-shot tasks to validate on
best = []

In [80]:
best.append(0)

In [81]:
model_path = '/content/drive/MyDrive/Siamese/Weights/Weights_1'

In [82]:
print("Starting training process!")
print("-------------------------------------")
t_start = time.time()
for i in range(1, n_iter+1):
    inputs,targets = get_batches(batch_size)
    loss = siamese_net.train_on_batch(inputs, targets)
    if i % evaluate_every == 0:
        print("\n ------------- \n")
        print("Time for {0} iterations: {1} mins".format(i, (time.time()-t_start)/60.0))
        print("Train Loss: {0}".format(loss)) 
        val_acc = test_oneshots(siamese_net, N_way, n_val, verbose=True)
        #siamese_net.save_weights(os.path.join(model_path, 'weights.{}.h5'.format(i)))
        '''
        if val_acc >= best:
            print("Current best: {0}, previous best: {1}".format(val_acc, best))
            best = val_acc
        '''
        best.append(val_acc)
        print(f"Current best:{val_acc}")
siamese_net.save_weights(os.path.join(model_path, 'weights.{}.h5'.format(i)))


Starting training process!
-------------------------------------

 ------------- 

Time for 200 iterations: 1.583556071917216 mins
Train Loss: 1.9527766704559326
Evaluating model on 250 random 20 way one-shot learning tasks ... 



  if data == xtrain:


Got an average of 0.0% 20 way one-shot learning accuracy 

Current best:0.0

 ------------- 

Time for 400 iterations: 3.664175756772359 mins
Train Loss: 1.233680009841919
Evaluating model on 250 random 20 way one-shot learning tasks ... 

Got an average of 0.0% 20 way one-shot learning accuracy 

Current best:0.0

 ------------- 

Time for 600 iterations: 5.779332494735717 mins
Train Loss: 1.0253713130950928
Evaluating model on 250 random 20 way one-shot learning tasks ... 

Got an average of 0.0% 20 way one-shot learning accuracy 

Current best:0.0

 ------------- 

Time for 800 iterations: 7.906153412659963 mins
Train Loss: 0.811917781829834
Evaluating model on 250 random 20 way one-shot learning tasks ... 

Got an average of 0.0% 20 way one-shot learning accuracy 

Current best:0.0

 ------------- 

Time for 1000 iterations: 10.031447398662568 mins
Train Loss: 0.731614887714386
Evaluating model on 250 random 20 way one-shot learning tasks ... 

Got an average of 0.4% 20 way one-sho

KeyboardInterrupt: ignored

In [89]:
best

[0, 0.0, 0.0, 0.0, 0.0, 0.4, 0.0, 0.8, 0.0, 0.0, 0.4]

In [None]:
n_correct = 0
for i in range(20):
  inputs, targets = make_oneshot_tasks(N=20,data=xval)
  probs = siamese_net.predict(inputs)
print(n_correct)
print(np.argmax(targets))
print(targets)
print("--------")
print(np.argmax(probs))
print(probs)
#percent_correct = (100.0 * n_correct / 1)
#percent_correct



  if data == xtrain:


0
7
[0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
--------
10
[[0.999951  ]
 [0.89835787]
 [0.7864284 ]
 [0.99066496]
 [0.99849224]
 [0.99998987]
 [0.9998016 ]
 [0.00374138]
 [0.09852473]
 [0.9845558 ]
 [0.9999999 ]
 [0.9998964 ]
 [0.99996877]
 [0.8905903 ]
 [0.98736095]
 [0.99999976]
 [0.9999901 ]
 [0.9724901 ]
 [0.99999475]
 [0.99870944]]
