Libraries

In [None]:
import sys
import numpy as np
import pandas as pd
from imread import imread
import pickle
import os
import matplotlib.pyplot as plt
%matplotlib inline

import cv2
import time

import tensorflow as tf
from keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from keras.layers import Conv2D, ZeroPadding2D, Activation, Input, concatenate
from keras.models import Model

from tensorflow.keras.models import Sequential
#from keras.layers.normalization import BatchNormalization
from keras.layers.pooling import MaxPooling2D
from keras.layers.merge import Concatenate
from keras.layers.core import Lambda, Flatten, Dense
from keras.initializers import glorot_uniform

from keras.utils.layer_utils import get_source_inputs
#from keras.engine.topology import Layer
from keras.regularizers import l2
from keras import backend as K

from sklearn.utils import shuffle

import numpy.random as rng




import os
import cv2
import torch
import numpy as np
import torch.nn as nn
from torch import optim
from numpy import random as rng
from sklearn.utils import shuffle
import torch.nn.functional as F
import time

# Report CUDA availability and pick the compute device from it, so every later
# `.to(device)` call also works on a CPU-only machine.
cuda_available = torch.cuda.is_available()
print(cuda_available)
device = torch.device("cuda" if cuda_available else "cpu")
if cuda_available:
    torch.cuda.empty_cache()

cwd = os.getcwd()

# Dataset and output locations, resolved relative to the working directory.
train_folder = os.path.join(cwd, 'images_background')
val_folder = os.path.join(cwd, 'images_evaluation')
save_path = os.path.join(cwd, 'data')


In [None]:
def loadimgs(path,n = 0):
    '''
    Load every image under `path`, laid out as path/<alphabet>/<letter>/<image file>.

    path => Path of train directory or test directory
    n    => label given to the first category; following categories count up from it

    Returns (X, y, lang_dict):
      X         -> np.stack of the per-category image stacks (one row per category)
      y         -> column vector (np.vstack) with one category label per loaded image
      lang_dict -> {alphabet: [first_label, last_label]}; last_label stays None for
                   an alphabet from which no category could be loaded
    '''
    X = []
    y = []
    lang_dict = {}
    curr_y = n
    # we load every alphabet separately so we can isolate them later.
    # Listings are sorted because os.listdir returns entries in arbitrary order,
    # which would make the label numbering differ between runs/machines.
    for alphabet in sorted(os.listdir(path)):
        print("loading alphabet: " + alphabet)
        lang_dict[alphabet] = [curr_y,None]
        alphabet_path = os.path.join(path,alphabet)
        # every letter/category has its own row in the array, so load separately
        for letter in sorted(os.listdir(alphabet_path)):
            letter_path = os.path.join(alphabet_path, letter)
            # read all the images in the current category
            category_images = [imread(os.path.join(letter_path, filename))
                               for filename in sorted(os.listdir(letter_path))]
            try:
                stacked = np.stack(category_images)
            except ValueError as e:
                # np.stack rejects an empty category or images of differing shapes.
                # Report it and skip the category WITHOUT consuming a label or
                # adding labels to y, so row i of X always carries label n + i.
                print(e)
                print("error - category_images:", category_images)
                continue
            X.append(stacked)
            y.extend([curr_y] * len(category_images))
            lang_dict[alphabet][1] = curr_y
            curr_y += 1
    y = np.vstack(y)
    X = np.stack(X)
    return X,y,lang_dict

In [None]:
# Training split via loadimgs: image array, per-image labels, alphabet label ranges.
X, y, c = loadimgs(path=train_folder)

In [None]:
#with open(os.path.join(save_path,"train.pickle"), "wb") as f:
#    pickle.dump((X,c),f)

In [None]:
# Evaluation split via loadimgs: image array, per-image labels, alphabet label ranges.
Xval, yval, cval = loadimgs(path=val_folder)

In [None]:
#with open(os.path.join(save_path,"val.pickle"), "wb") as f:
#    pickle.dump((Xval,cval),f)

In [None]:
def initialize_weights(shape, name=None, dtype=None):
    """
        The paper, http://www.cs.utoronto.ca/~gkoch/files/msc-thesis.pdf
        suggests to initialize CNN layer weights with mean as 0.0 and standard deviation of 0.01

        shape: shape of the array to generate
        name:  unused, kept for backward compatibility
        dtype: optional dtype of the result. Keras invokes custom initializers
               with a `dtype=` keyword, so it must be accepted. A NumPy dtype/string
               or an object exposing `as_numpy_dtype` is supported.

        Returns a NumPy array of the requested shape (float64 when dtype is None).
    """
    values = np.random.normal(loc = 0.0, scale = 1e-2, size = shape)
    if dtype is not None:
        # Framework dtype objects expose their NumPy equivalent as `as_numpy_dtype`.
        values = values.astype(getattr(dtype, "as_numpy_dtype", dtype))
    return values

In [None]:
def initialize_bias(shape, name=None, dtype=None):
    """
        The paper, http://www.cs.utoronto.ca/~gkoch/files/msc-thesis.pdf
        suggests to initialize CNN layer bias with mean as 0.5 and standard deviation of 0.01

        shape: shape of the array to generate
        name:  unused, kept for backward compatibility
        dtype: optional dtype of the result. Keras invokes custom initializers
               with a `dtype=` keyword, so it must be accepted. A NumPy dtype/string
               or an object exposing `as_numpy_dtype` is supported.

        Returns a NumPy array of the requested shape (float64 when dtype is None).
    """
    values = np.random.normal(loc = 0.5, scale = 1e-2, size = shape)
    if dtype is not None:
        # Framework dtype objects expose their NumPy equivalent as `as_numpy_dtype`.
        values = values.astype(getattr(dtype, "as_numpy_dtype", dtype))
    return values

In [None]:
class SNN(nn.Module):
    """Convolutional siamese network: one shared encoder applied to both images.

    The encoder is four Conv2d+ReLU stages (max-pooled after the first three)
    followed by a 9216 -> 4096 linear layer with a sigmoid. The element-wise
    absolute difference of the two encodings is fed to a final linear layer
    that yields one raw score per pair (no sigmoid is applied here).
    The shape comments below assume 1x105x105 inputs.
    """
    def __init__(self):
        super(SNN, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(1, 64, 10),  # 64@96*96
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),  # 64@48*48
            nn.Conv2d(64, 128, 7),
            nn.ReLU(),    # 128@42*42
            nn.MaxPool2d(2),   # 128@21*21
            nn.Conv2d(128, 128, 4),
            nn.ReLU(), # 128@18*18
            nn.MaxPool2d(2), # 128@9*9
            nn.Conv2d(128, 256, 4),
            nn.ReLU(),   # 256@6*6
        )
        # 256 * 6 * 6 = 9216 flattened features
        self.liner = nn.Sequential(nn.Linear(9216, 4096), nn.Sigmoid())
        self.out = nn.Linear(4096, 1)

    def forward_one(self, x):
        """Encode one batch of images into the output of `self.liner`."""
        x = self.conv(x)
        x = x.view(x.size()[0], -1)  # flatten everything except the batch axis
        x = self.liner(x)
        return x

    def forward(self, x1, x2):
        """Score a batch of image pairs.

        Returns (out, out1, out2): the raw pair score followed by the two
        encodings. The callers in this notebook unpack exactly these three
        values, so returning only the score would break them.
        """
        out1 = self.forward_one(x1)
        out2 = self.forward_one(x2)
        dis = torch.abs(out1 - out2)
        out = self.out(dis)
        return out, out1, out2


def get_siamese_model(input_shape):
    """
        Build the Keras siamese network, with an architecture based on
        http://www.cs.utoronto.ca/~gkoch/files/msc-thesis.pdf

        input_shape: shape of a single input image, as expected by keras Input.

        Returns a keras Model taking [left_image, right_image] and producing a
        sigmoid similarity score.
    """

    # Placeholders for the two images being compared
    left_input = Input(input_shape)
    right_input = Input(input_shape)

    # Shared convolutional encoder; both images go through the same weights
    encoder_layers = [
        Conv2D(64, (10,10), activation='relu', input_shape=input_shape,
               kernel_initializer=initialize_weights, kernel_regularizer=l2(2e-4)),
        MaxPooling2D(),
        Conv2D(128, (7,7), activation='relu',
               kernel_initializer=initialize_weights,
               bias_initializer=initialize_bias, kernel_regularizer=l2(2e-4)),
        MaxPooling2D(),
        Conv2D(128, (4,4), activation='relu', kernel_initializer=initialize_weights,
               bias_initializer=initialize_bias, kernel_regularizer=l2(2e-4)),
        MaxPooling2D(),
        Conv2D(256, (4,4), activation='relu', kernel_initializer=initialize_weights,
               bias_initializer=initialize_bias, kernel_regularizer=l2(2e-4)),
        Flatten(),
        Dense(4096, activation='sigmoid',
              kernel_regularizer=l2(1e-3),
              kernel_initializer=initialize_weights, bias_initializer=initialize_bias),
    ]
    model = Sequential(encoder_layers)

    # Feature vectors of the two images
    encoded_l = model(left_input)
    encoded_r = model(right_input)

    # Element-wise absolute difference between the two encodings
    L1_layer = Lambda(lambda tensors: K.abs(tensors[0] - tensors[1]))
    L1_distance = L1_layer([encoded_l, encoded_r])

    # Single sigmoid unit turning the distance vector into a similarity score
    prediction = Dense(1, activation='sigmoid', bias_initializer=initialize_bias)(L1_distance)

    # Tie the two inputs to the prediction and hand the model back
    return Model(inputs=[left_input, right_input], outputs=prediction)

Data Load function

Returns: 

X: np.array of images

y: np.array with one integer class label per image (0 to 963 for the training set)

lang_dict: dictionary keyed by alphabet name; each value is the [first, last] range of class labels that belong to that alphabet


In [None]:
def _subdirs(parent):
    """Sorted names of the sub-directories of `parent`; stray files (e.g. .DS_Store) are ignored."""
    return sorted(name for name in os.listdir(parent)
                  if os.path.isdir(os.path.join(parent, name)))


def load_data(dir_path):
    """
    Load a dataset laid out as dir_path/<alphabet>/<character>/<image file>.

    Every image is read with cv2 and converted from BGR to grayscale.
    Alphabets, characters and images are visited in sorted order, and each
    character gets the next class number, starting at 0.

    Returns (X, y, lang_dict):
      X         -> np.array built from the per-character lists of images
      y         -> np.array with one class number per image
      lang_dict -> {alphabet: [first_class, last_class]}

    Raises ValueError if cv2 cannot read an image file (cv2.imread returns
    None in that case instead of raising).
    """
    X = []
    y = []
    lang_dict = {}
    classNo = 0

    for alphabet in _subdirs(dir_path):
        lang_dict[alphabet] = [classNo, None]
        alpha_path = os.path.join(dir_path, alphabet)

        for character in _subdirs(alpha_path):
            char_path = os.path.join(alpha_path, character)
            cat_images = []

            for img in sorted(os.listdir(char_path)):
                img_path = os.path.join(char_path, img)
                raw = cv2.imread(img_path)
                if raw is None:
                    # fail with the offending path rather than an opaque cvtColor error
                    raise ValueError("could not read image: " + img_path)
                cat_images.append(cv2.cvtColor(raw, cv2.COLOR_BGR2GRAY))
                y.append(classNo)

            classNo += 1
            X.append(cat_images)
        lang_dict[alphabet][1] = classNo-1

    X = np.array(X)
    y = np.array(y)

    return X, y, lang_dict

Load train DS made up of 30 alphabets and the test DS made up of 20 alphabets

In [None]:
# Background (training) split, then the evaluation split, both via load_data.
X_train, y_train, ld_train = load_data(dir_path=train_folder)
X_test, y_test, ld_test = load_data(dir_path=val_folder)

get_batch function

input: batch_size and DS selector

output:

pairs: pairs of images that are non matching in the first half and matching in the second

targets: np.array of 0s and 1s indicating whether each pair matches (1 = same character)

In [None]:
def get_batch(batch_size, ds = 'train', addi = 'norm'):
    """
    Build a batch of image pairs together with their match labels.

    batch_size: number of pairs; must not exceed the number of classes because
                the first image of every pair comes from a distinct class
    ds:         'train' reads the global X_train, anything else reads X_test
    addi:       'norm' -> first half of the batch non-matching, second half
                          matching (targets 0 / 1 accordingly)
                otherwise -> exactly one matching pair at a random position,
                             every other pair non-matching

    Returns (pairs, targets): pairs is a list of two arrays of shape
    (batch_size, w, h, 1); targets is a float array with 1 where the pair
    shows the same class and 0 elsewhere.
    """
    #DS selector
    X = X_train if ds == 'train' else X_test
    n_classes, n_examples, w, h = X.shape
    cat = rng.choice(n_classes, size = batch_size, replace = False) # Sampling category without replacement

    # Decide up front which rows hold a matching pair, so that targets and the
    # sampled images can never disagree.
    is_match = np.zeros(batch_size, dtype = bool)
    if addi == 'norm':
        is_match[batch_size // 2:] = True
    else:
        print("Not norm")
        if batch_size > 0:
            is_match[rng.randint(batch_size)] = True
    targets = is_match.astype(np.float64)

    # "pairs" holds the two sides of every comparison
    pairs = [np.zeros((batch_size, w, h, 1)) for _ in range(2)]
    for i in range(batch_size):
        # first image: a random example of the i-th sampled category
        ex = rng.randint(n_examples)
        pairs[0][i, :, :, :] = X[cat[i], ex, :, :].reshape(w, h, 1)

        if is_match[i]:
            cat2 = cat[i]
        else:
            # a non-zero offset modulo n_classes always lands on a different class
            cat2 = (cat[i] + rng.randint(1, n_classes)) % n_classes
        ex2 = rng.randint(n_examples)
        pairs[1][i, :, :, :] = X[cat2, ex2, :, :].reshape(w, h, 1)

    return pairs, targets

one_shot function.

Output: vector of pairs of characters, with only one matching.


In [None]:
def one_shot(N, ds = 'val'):
    """
    Build one N-way one-shot trial.

    N:  number of support images; must not exceed the number of classes,
        because every support image comes from a different class
    ds: 'train' reads the global X_train, anything else reads X_test

    Returns ([test_image, support_set], targets). test_image is the same query
    image repeated N times, support_set holds one image per sampled class, and
    targets is 1 for the single support image of the query's class and 0
    elsewhere. The three arrays are shuffled together before being returned.
    """
    X = X_train if ds == 'train' else X_test

    n_classes, n_examples, w, h = X.shape
    # Distinct classes: with repetition, several support images could share the
    # query's class while only one of them is labelled as the match.
    cats = rng.choice(n_classes, size = (N,), replace = False)
    indices = rng.choice(n_examples, size = (N,)) # sample N examples with repetition
    cat = cats[0] # class of the query image
    ex1 = rng.randint(n_examples) # example used as the query
    if n_examples > 1:
        # the matching support image must be a different drawing than the
        # query, otherwise the trial compares an image with itself
        indices[0] = (ex1 + rng.randint(1, n_examples)) % n_examples
    test_image = np.repeat(X[cat, ex1][np.newaxis], N, axis = 0).reshape(N, w, h, 1)
    support_set = X[cats, indices].reshape(N, w, h, 1)
    targets = np.zeros((N,))
    targets[0] = 1

    # shuffle jointly so that the matching pair is not always at position 0
    test_image, support_set, targets = shuffle(test_image, support_set, targets)

    return [test_image, support_set], targets

Tests accuracy of the one_shot model

In [None]:
def test_one_shot(model, N, k, ds = 'val'):
    
    # k is the number of examples we're testing
    n_correct = 0
    for l in range(k):
        inputs, outputs = one_shot(N, ds)
        
        input_1 = torch.from_numpy(inputs[0]).to(device)
        input_2 = torch.from_numpy(inputs[1]).to(device)
        input1_test = torch.reshape(input_1,(N, 1, 105, 105)).to(device)
        input2_test = torch.reshape(input_2,(N, 1, 105, 105)).to(device)
        outputs = outputs.astype(int)
        outputs = torch.from_numpy(outputs).to(device)

        y_hat, output1, output2 = model(input1_test.float(), input2_test.float())
        #loss = ContrastiveLoss()(output1, output2, outputs)
    
        
        #preds = model(input1_test, input2_test) # change
        #print (outputs.detach().numpy())
        #print (y_hat.detach().numpy())
        if np.argmax(outputs.cpu().detach().numpy()) == np.argmax(y_hat.cpu().detach().numpy()):
        #if np.argmax(outputs) == np.argmax(y_hat): #convert y_hat to int
        	#print (np.argmax(outputs.cpu().detach().numpy()))
        	#print (np.argmax(y_hat.cpu().detach().numpy()))
        	#print (outputs)
        	#print (y_hat)
        	n_correct += 1
        #print (n_correct)
        #print (l)
        #print("_____")
    return n_correct / k

In [None]:
def predict_model(batch_size, model, loss_fn, ds = 'train', addi = "norm"):
    """
    Draw one batch with get_batch, run the model on it and compute the loss.

    batch_size, ds, addi: forwarded to get_batch
    model:   callable taking two float tensors of shape (batch_size, 1, H, W)
             and returning (y_hat, output1, output2)
    loss_fn: callable applied to (y_hat, targets), both as float tensors

    Returns (y_hat, output1, output2, y, loss), where y holds the integer
    targets reshaped to (batch_size, 1).
    """
    inputs, y = get_batch(batch_size, ds, addi)

    # (B, H, W, 1) -> (B, 1, H, W); only size-1 axes move, so a reshape is
    # enough. The image size comes from the batch instead of a fixed 105.
    left, right = (
        torch.from_numpy(side).to(device)
             .reshape(batch_size, 1, side.shape[1], side.shape[2]).float()
        for side in inputs
    )
    y = torch.from_numpy(y.astype(int)).to(device).reshape(batch_size, 1)

    y_hat, output1, output2 = model(left, right)
    loss = loss_fn(y_hat.float(), y.float())
    return y_hat, output1, output2, y, loss

SNN class.

Contains: forward function 

In [None]:
class SNN(nn.Module):
    """Siamese CNN: a shared encoder for both images plus a linear scoring head.

    `forward` returns the raw pair score (no sigmoid applied) together with
    the two encodings. Shape notes below assume 1x105x105 inputs.
    """

    def __init__(self):
        super().__init__()
        stages = [
            nn.Conv2d(in_channels=1, out_channels=64, kernel_size=10),     # -> 64 x 96 x 96
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2),                                   # -> 64 x 48 x 48
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=7),    # -> 128 x 42 x 42
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),                                   # -> 128 x 21 x 21
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=4),   # -> 128 x 18 x 18
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),                                   # -> 128 x 9 x 9
            nn.Conv2d(in_channels=128, out_channels=256, kernel_size=4),   # -> 256 x 6 x 6
            nn.ReLU(),
        ]
        self.conv = nn.Sequential(*stages)
        # 256 * 6 * 6 = 9216 flattened features feed the embedding layer
        embedding = nn.Linear(in_features=9216, out_features=4096)
        self.liner = nn.Sequential(embedding, nn.Sigmoid())
        self.out = nn.Linear(in_features=4096, out_features=1)

    def forward_one(self, x):
        """Return the embedding produced by `self.liner` for one batch of images."""
        features = self.conv(x)
        flat = features.view(features.size(0), -1)
        return self.liner(flat)

    def forward(self, x1, x2):
        """Return (score, embedding_of_x1, embedding_of_x2) for a batch of pairs."""
        emb_first = self.forward_one(x1)
        emb_second = self.forward_one(x2)
        score = self.out((emb_first - emb_second).abs())
        return score, emb_first, emb_second


class SNN(nn.Module):
    """
    Minimal linear siamese baseline.

    Each image is flattened and projected to one value by a shared linear
    layer. The absolute difference of the two projections goes through a
    second linear layer that produces the raw pair score.

    in_features: number of values in one flattened image; the default 11025
                 corresponds to 1x105x105 inputs.
    """
    def __init__(self, in_features=11025):
        super().__init__()
        self.flat = torch.nn.Flatten()
        self.layer1 = torch.nn.Linear(in_features=in_features, out_features=1)
        # scoring head on the distance, so the score depends on BOTH inputs
        self.out = torch.nn.Linear(in_features=1, out_features=1)

    def forward(self, input1, input2):
        """
        Return (score, output1, output2).

        The callers in this notebook unpack three values from the model, so a
        single tensor cannot be returned here.
        """
        output1 = self.layer1(self.flat(input1))
        output2 = self.layer1(self.flat(input2))
        score = self.out(torch.abs(output1 - output2))
        return score, output1, output2


Loss function.

Input: SNN

Output: loss

test_eval function.

Used for testing and getting train/test accuracy.

It receives batches of n pairs with only one matching pair (TODO: write the function that creates such batches).

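If the index of the model's highest output equals the index of the matching pair, the trial counts as correct; the function returns the fraction of correct trials.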


In [None]:
def test_eval(model, n, k, loss_fn, ds):

    corr = 0
    
    for si in range(k):
        y_hat, output1, output2,  y, loss = predict_model(n, model, loss_fn, ds, addi = "test")
        
        if np.argmax(y.cpu().detach().numpy()) == np.argmax(y_hat.cpu().detach().numpy()):
            corr += 1
    
    return corr/k


In [None]:
def train_epoch(model, loss_fn, optimizer):
    """
    Run `num_iterations` optimisation steps on batches of `batch_size` pairs
    (both read from module-level globals).

    model:     network forwarded to predict_model
    loss_fn:   loss forwarded to predict_model
    optimizer: optimizer whose zero_grad()/step() are called around backward()

    Returns (loss_v, train_acc_v, val_acc_v). The periodic evaluation that
    fills these lists is currently disabled, so all three come back empty.
    """
    loss_v = []
    train_acc_v = []
    val_acc_v = []
    for i in range(num_iterations):

        optimizer.zero_grad()

        y_hat, output1, output2, y, loss = predict_model(batch_size, model, loss_fn)

        loss.backward()

        optimizer.step()

        if i % 10 == 0:
            print (f"Iteration: {i}")
            print(y_hat, y)

        # Periodic evaluation, disabled in the original notebook:
        # if i % evaluateEvery == 0:
        #     loss_v.append(loss.item())
        #     train_acc_v.append(test_eval(model, n, k, loss_fn, ds = 'train'))
        #     val_acc_v.append(test_eval(model, n, k, loss_fn, ds = 'val'))
        #
        #     print(loss_v)
        #     print(train_acc_v)
        #     print(val_acc_v)
        #     print("______")

    # Return the lists defined above; the previous names (lossArr, trainAccArr,
    # valAccArr) were never defined and raised NameError once training finished.
    return loss_v, train_acc_v, val_acc_v


In [None]:
# Training configuration
num_iterations = 3000  # optimisation steps run by train_epoch
batch_size = 32        # pairs per training batch
evaluateEvery = 2000   # in the visible code, only used by the evaluation block disabled in train_epoch
k = 10 # number of evaluation trials (test_eval's k)
n = 5 # pairs per evaluation trial (test_eval's n)

n_classes, n_examples, width, height = X_train.shape

# Move the model with the same `device` object that the batches are moved to,
# instead of an unconditional .cuda().
model = SNN().to(device)

#optimizer = optim.Adam(model.parameters(),lr = 0.0005 )
optimizer = optim.RMSprop(model.parameters(), lr = 1e-4, alpha = 0.99, eps = 1e-8, weight_decay = 0.0005, momentum = 0.9)

# BCEWithLogitsLoss applies the sigmoid itself, so the model must output raw scores.
l_2, t_2, v_2 = train_epoch(model, nn.BCEWithLogitsLoss(),  optimizer)

