In [20]:
from constants import img_size
import numpy as np
from keras import backend as K
from keras.layers import Input, Dense, Flatten, subtract
from keras.models import Model
from keras.applications.resnet50 import ResNet50
import pandas as pd
import numpy as np
import tensorflow as tf
from tqdm import tqdm

## Model

In [2]:
class Seamese_Model:
    """Builder for the trainable head of a siamese network.

    (The class name keeps its original spelling so existing references still
    resolve.)

    Parameters
    ----------
    base_model
        Stored on the instance as ``self.base_model``; no method of this
        class reads it.
    opts : dict
        Configuration dictionary. ``get_top_model`` reads only
        ``opts['features_shape']`` (the shape of one feature input, without
        the batch axis).
    kernel_opts, bias_opts : dict or None
        When given, must contain ``'loc'`` and ``'scale'``: the mean and
        standard deviation of the normal distribution used to initialise the
        kernel / bias of the final ``Dense`` layer. When ``None`` the layer
        keeps the Keras default initializer for that weight.
    """

    def __init__(self, base_model, opts, kernel_opts=None, bias_opts=None):
        self.base_model = base_model
        self.opts = opts
        self.kernel_opts = kernel_opts
        self.bias_opts = bias_opts

    @staticmethod
    def _normal_variable(init_opts, shape, name, dtype):
        """Return a backend variable sampled from N(loc, scale), or None without opts."""
        if init_opts is None:
            return None
        values = np.random.normal(loc=init_opts['loc'], scale=init_opts['scale'], size=shape)
        return K.variable(values, dtype=dtype, name=name)

    def kernel_initializer(self, shape, name=None, dtype=None):
        """Initial kernel values drawn from the normal described by ``kernel_opts``.

        ``dtype`` is accepted as a keyword because newer Keras releases call
        initializers as ``initializer(shape, dtype=...)``.
        Returns ``None`` when ``kernel_opts`` is ``None``.
        """
        return self._normal_variable(self.kernel_opts, shape, name, dtype)

    def bias_initializer(self, shape, name=None, dtype=None):
        """Initial bias values drawn from the normal described by ``bias_opts``.

        Same calling convention as ``kernel_initializer``.
        Returns ``None`` when ``bias_opts`` is ``None``.
        """
        return self._normal_variable(self.bias_opts, shape, name, dtype)

    def get_top_model(self):
        """Build the two-input model: flatten both inputs, subtract, sigmoid unit.

        Returns
        -------
        keras.models.Model
            Takes ``[input_1, input_2]`` (each of shape
            ``opts['features_shape']``) and outputs one sigmoid activation
            per pair.
        """
        input_1 = Input(self.opts['features_shape'])
        input_2 = Input(self.opts['features_shape'])

        X_1 = Flatten()(input_1)
        X_2 = Flatten()(input_2)
        # Signed element-wise difference of the two flattened inputs.
        distance = subtract([X_1, X_2])

        # Only hand a custom initializer to Dense when its options exist:
        # the initializer methods return None without options, which Keras
        # cannot turn into a weight.
        initializers = {}
        if self.kernel_opts is not None:
            initializers['kernel_initializer'] = self.kernel_initializer
        if self.bias_opts is not None:
            initializers['bias_initializer'] = self.bias_initializer

        prediction = Dense(1, activation='sigmoid', name='log_reg', **initializers)(distance)
        # `inputs`/`outputs` are the Keras 2 argument names.
        return Model(inputs=[input_1, input_2], outputs=prediction)

In [3]:
# Configure the builder. Of the `opts` keys, get_top_model() reads only
# 'features_shape'; the remaining keys are just stored on the builder.
siamese_builder = Seamese_Model(
    ResNet50,
    opts={
        'weights': 'imagenet',
        'input_shape': (img_size, img_size, 3),
        'features_shape': (1, 2048),
        'pooling': 'avg',
    },
    kernel_opts={
        'loc': 0,
        'scale': 1e-2,
    },
    bias_opts={
        'loc': 0.5,
        'scale': 1e-2,
    },
)

# `model` is bound once, to the Keras model, so the name never refers to
# two different kinds of object within the notebook.
model = siamese_builder.get_top_model()



In [4]:
# Print the layer-by-layer summary of the two-input top model built above.
model.summary()

__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            (None, 1, 2048)      0                                            
__________________________________________________________________________________________________
input_2 (InputLayer)            (None, 1, 2048)      0                                            
__________________________________________________________________________________________________
flatten_1 (Flatten)             (None, 2048)         0           input_1[0][0]                    
__________________________________________________________________________________________________
flatten_2 (Flatten)             (None, 2048)         0           input_2[0][0]                    
__________________________________________________________________________________________________
subtract_1

## Supervisor

In [21]:
class Supervisor:
    """Samples labelled pairs from feature arrays and trains a two-input model.

    Parameters
    ----------
    model
        Object exposing ``compile``, ``load_weights`` and ``train_on_batch``;
        it is fed two arrays per step.
    data_train, classes_train
        Training samples (indexed along axis 0) and their class labels, one
        label per sample.
    data_dev, classes_dev
        Held-out samples and labels. ``data_dev`` is used by
        ``get_validation_task``; ``classes_dev`` is only stored.
    seed : int or None
        When given, the supervisor draws from its own
        ``np.random.RandomState(seed)``: the sequence of batches is
        repeatable and the global NumPy generator is left untouched.
        When ``None``, the global ``np.random`` state is used as-is.
    """

    def __init__(self, model, data_train, classes_train, data_dev, classes_dev, seed=None):
        self.model = model
        self.data_train = data_train
        self.classes_train = classes_train
        self.data_dev = data_dev
        self.classes_dev = classes_dev
        self.seed = seed
        # Seed once here rather than on every batch: reseeding per call
        # would make every batch identical and clobber the global generator.
        self._rng = np.random if seed is None else np.random.RandomState(seed)

    def get_pair(self, index_1, index_2):
        """Return ``[sample_1, sample_2]``, each keeping a leading axis of length 1."""
        el_1 = np.take(self.data_train, [index_1], axis=0)
        el_2 = np.take(self.data_train, [index_2], axis=0)
        return [el_1, el_2]

    def get_selection_index(self, index, indices):
        """Pick one element of ``indices`` uniformly at random, never ``index`` itself.

        Raises
        ------
        ValueError
            If ``indices`` holds no element other than ``index``.
        """
        candidates = np.asarray(indices)
        candidates = candidates[candidates != index]
        if candidates.size == 0:
            raise ValueError(
                'no candidate other than index {} to pair it with'.format(index))
        return self._rng.choice(candidates)

    def get_batch(self, n):
        """Draw ``n`` training pairs: first ``n // 2`` same-class, the rest different-class.

        Anchors of same-class pairs are drawn only from samples whose class
        has at least two members, so a single-sample class can never stall
        the sampler.

        Returns
        -------
        (pairs, y)
            ``pairs`` is ``np.array`` of the ``get_pair`` results (axis 0 is
            the pair, axis 1 selects the first/second element); ``y`` holds
            1 for same-class pairs and 0 for different-class pairs.

        Raises
        ------
        ValueError
            If data and labels differ in length, if a same-class pair is
            requested but no class has two samples, or if a different-class
            pair is requested but fewer than two classes exist.
        """
        classes = np.asarray(self.classes_train)
        n_samples = len(self.data_train)
        if len(classes) != n_samples:
            raise ValueError(
                'data_train has {} samples but classes_train has {} labels'.format(
                    n_samples, len(classes)))

        n_positive = n // 2
        n_negative = n - n_positive

        # Size of the class each sample belongs to.
        _, inverse, counts = np.unique(classes, return_inverse=True, return_counts=True)
        class_sizes = counts[inverse]

        pairs = []
        y = []

        if n_positive > 0:
            positive_pool = np.flatnonzero(class_sizes >= 2)
            if positive_pool.size == 0:
                raise ValueError('no class has two samples; cannot form a same-class pair')
            for anchor in self._rng.choice(positive_pool, size=n_positive):
                same_class = np.flatnonzero(classes == classes[anchor])
                partner = self.get_selection_index(anchor, same_class)
                pairs.append(self.get_pair(anchor, partner))
                y.append(1)

        if n_negative > 0:
            if counts.size < 2:
                raise ValueError('fewer than two classes; cannot form a different-class pair')
            for anchor in self._rng.choice(n_samples, size=n_negative):
                other_class = np.flatnonzero(classes != classes[anchor])
                partner = self.get_selection_index(anchor, other_class)
                pairs.append(self.get_pair(anchor, partner))
                y.append(0)

        return (np.array(pairs), np.array(y))

    def train(self, iterations, batch_size, learning_rate=0.0001, path=None):
        """Compile the model and run ``iterations`` single-batch training steps.

        The model is (re)compiled on every call with a
        ``tf.train.AdamOptimizer``, binary cross-entropy loss and the
        ``'acc'`` metric. When ``path`` is given, weights are loaded from it
        after compiling and before the first step.
        """
        self.model.compile(optimizer=tf.train.AdamOptimizer(learning_rate),
                           loss='binary_crossentropy',
                           metrics=['acc'])

        if path is not None:
            self.model.load_weights(path)

        for _ in tqdm(range(iterations)):
            inputs, targets = self.get_batch(batch_size)
            # Axis 1 of `inputs` selects the first / second element of each pair.
            self.model.train_on_batch([inputs[:, 0, :], inputs[:, 1, :]], targets)

    def get_validation_task(self, index):
        """Pair dev sample ``index`` with every other dev sample.

        Returns an array whose axis 0 runs over the other dev samples and
        whose axis 1 holds ``(target, other)``. Unlike the pairs produced by
        ``get_batch``, the elements here carry no extra length-1 axis.
        """
        n_others = self.data_dev.shape[0] - 1
        targets = np.repeat(np.take(self.data_dev, [index], axis=0), n_others, axis=0)
        support_set = np.delete(self.data_dev, index, axis=0)
        return np.stack([targets, support_set], axis=1)

In [7]:
# Directory that holds the saved feature / label arrays.
FEATURES_DIR = './data/features/res_net'

features_train = np.load(FEATURES_DIR + '/features_train.npy')
classes_train = np.load(FEATURES_DIR + '/classes_train.npy')
features_dev = np.load(FEATURES_DIR + '/features_dev.npy')
classes_dev = np.load(FEATURES_DIR + '/classes_dev.npy')

# Supervisor.get_batch indexes classes_train with positions drawn from the
# training features, so the two arrays must have one entry per sample.
assert len(features_train) == len(classes_train), 'train features/labels are misaligned'

In [22]:
# Fixed seed so that batch sampling is repeatable between runs.
supervisor = Supervisor(
    model=model,
    data_train=features_train,
    classes_train=classes_train,
    data_dev=features_dev,
    classes_dev=classes_dev,
    seed=1,
)

In [23]:
# 100 single-batch training steps with 128 pairs per batch.
supervisor.train(iterations=100, batch_size=128)

100%|██████████| 100/100 [00:01<00:00, 96.49it/s]
