In [1]:
import keras.backend as k
from keras import backend as K
from keras import initializers, regularizers
from keras.engine import InputSpec
from keras.engine.topology import Layer
from keras.engine.topology import get_source_inputs
from keras.layers import Flatten, Dense, Input, Conv2D, MaxPooling2D, Dropout
from keras.layers import GlobalAveragePooling2D, GlobalMaxPooling2D, TimeDistributed
from keras.models import Model
from keras.utils import layer_utils
from keras.utils.data_utils import get_file

Using TensorFlow backend.


In [2]:
# TensorFlow is imported directly (as `tf`) only when it is the active Keras
# backend; under any other backend the name `tf` is left undefined.
if k.backend() == 'tensorflow':
    import tensorflow as tf

In [3]:
class FixedBatchNormalization(Layer):
    """Batch normalization whose statistics and affine terms are frozen.

    All four weights (gamma, beta, running mean, running std) are created
    with ``trainable=False`` and ``call`` always normalizes with the stored
    running statistics; batch statistics are never computed.

    # Arguments
        epsilon: small float passed to ``K.batch_normalization``.
        axis: integer, the axis of the input that carries the features
            (the weights have length ``input_shape[axis]``).
        weights: optional list of arrays handed to ``set_weights`` once the
            layer has been built.
        beta_init: initializer identifier for beta.
        gamma_init: initializer identifier for gamma.
        gamma_regularizer: regularizer identifier for gamma (or None).
        beta_regularizer: regularizer identifier for beta (or None).
    """

    def __init__(self, epsilon=1e-3, axis=-1,
                 weights=None, beta_init='zero', gamma_init='one',
                 gamma_regularizer=None, beta_regularizer=None, **kwargs):
        super(FixedBatchNormalization, self).__init__(**kwargs)

        # Set after the base constructor, which initialises this flag itself.
        self.supports_masking = True
        self.beta_init = initializers.get(beta_init)
        self.gamma_init = initializers.get(gamma_init)
        self.epsilon = epsilon
        self.axis = axis
        self.gamma_regularizer = regularizers.get(gamma_regularizer)
        self.beta_regularizer = regularizers.get(beta_regularizer)
        self.initial_weights = weights

    def build(self, input_shape):
        """Create the four frozen weight vectors for the given input shape."""
        # `input_shape` is a read-only property on Layer; the attribute that
        # records the expected input is `input_spec`.
        self.input_spec = [InputSpec(shape=input_shape)]
        shape = (input_shape[self.axis],)

        self.gamma = self.add_weight(shape=shape,
                                     initializer=self.gamma_init,
                                     regularizer=self.gamma_regularizer,
                                     name='{}_gamma'.format(self.name),
                                     trainable=False)
        self.beta = self.add_weight(shape=shape,
                                    initializer=self.beta_init,
                                    regularizer=self.beta_regularizer,
                                    name='{}_beta'.format(self.name),
                                    trainable=False)
        self.running_mean = self.add_weight(shape=shape,
                                            initializer='zero',
                                            name='{}_running_mean'.format(self.name),
                                            trainable=False)
        # Initialised to one (not zero) so that a layer without loaded
        # weights does not divide by sqrt(epsilon) alone.
        self.running_std = self.add_weight(shape=shape,
                                           initializer='one',
                                           name='{}_running_std'.format(self.name),
                                           trainable=False)

        if self.initial_weights is not None:
            self.set_weights(self.initial_weights)
            del self.initial_weights

        self.built = True

    def call(self, x, mask=None):
        """Normalize `x` with the stored running statistics, gamma and beta."""
        assert self.built, 'Layer must be built before being called'
        input_shape = K.int_shape(x)

        reduction_axes = list(range(len(input_shape)))
        del reduction_axes[self.axis]

        # Materialise the range as a list: on Python 3 a list never compares
        # equal to a range object, which would make this branch unreachable.
        if sorted(reduction_axes) == list(range(K.ndim(x)))[:-1]:
            # Features are on the last axis: the 1-D weights broadcast as-is.
            return K.batch_normalization(
                x, self.running_mean, self.running_std,
                self.beta, self.gamma, epsilon=self.epsilon)

        # Features are on another axis: reshape every weight so that it lines
        # up with `self.axis` and broadcasts over the remaining axes.
        broadcast_shape = [1] * len(input_shape)
        broadcast_shape[self.axis] = input_shape[self.axis]

        broadcast_running_mean = K.reshape(self.running_mean, broadcast_shape)
        broadcast_running_std = K.reshape(self.running_std, broadcast_shape)
        broadcast_beta = K.reshape(self.beta, broadcast_shape)
        broadcast_gamma = K.reshape(self.gamma, broadcast_shape)
        return K.batch_normalization(
            x, broadcast_running_mean, broadcast_running_std,
            broadcast_beta, broadcast_gamma, epsilon=self.epsilon)

    def get_config(self):
        """Return the base layer config extended with epsilon, axis and regularizers."""
        config = {'epsilon': self.epsilon,
                  'axis': self.axis,
                  'gamma_regularizer': self.gamma_regularizer.get_config() if self.gamma_regularizer else None,
                  'beta_regularizer': self.beta_regularizer.get_config() if self.beta_regularizer else None}

        base_config = super(FixedBatchNormalization, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))

In [4]:
class RoiPoolingConv(Layer):
    """ROI pooling layer implemented by cropping and resizing.

    The layer is called on a list ``[img, rois]``. For each of the
    ``num_rois`` regions it crops ``img`` along axes 1 (rows) and 2 (columns)
    and resizes the crop to ``pool_size x pool_size`` with
    ``tf.image.resize_images``.

    Each region is read as ``(x, y, w, h)`` from ``rois[0, i, 0:4]`` and is
    used directly as indices into ``img``. Only batch index 0 of ``rois`` is
    read, and the result is reshaped with a leading dimension of 1.

    # Arguments
        pool_size: int, side length of the square output for every region.
        num_rois: int, number of regions processed per call.

    # Output shape
        ``(1, num_rois, pool_size, pool_size, channels)`` where ``channels``
        is ``input_shape[0][3]``.
    """

    def __init__(self, pool_size, num_rois, **kwargs):
        self.dim_ordering = k.image_dim_ordering()
        self.pool_size = pool_size
        self.num_rois = num_rois
        super(RoiPoolingConv, self).__init__(**kwargs)

    def build(self, input_shape):
        """Record the channel count from the first input's shape (axis 3)."""
        self.nb_channels = input_shape[0][3]

    def compute_output_shape(self, input_shape):
        """Return the static output shape with an unknown batch dimension."""
        return (None, self.num_rois, self.pool_size, self.pool_size, self.nb_channels)

    def call(self, x, mask=None):
        """Crop every region out of the image tensor and resize it.

        # Arguments
            x: list of two tensors, ``[img, rois]``.
            mask: unused.

        # Returns
            Tensor of shape ``(1, num_rois, pool_size, pool_size, channels)``.
        """
        assert len(x) == 2, 'RoiPoolingConv expects [img, rois] as input'
        img, rois = x[0], x[1]

        outputs = []
        for roi_idx in range(self.num_rois):
            # Slice bounds must be integers, hence the casts.
            col = k.cast(rois[0, roi_idx, 0], 'int32')
            row = k.cast(rois[0, roi_idx, 1], 'int32')
            width = k.cast(rois[0, roi_idx, 2], 'int32')
            height = k.cast(rois[0, roi_idx, 3], 'int32')

            # Rows span [row, row + height), columns span [col, col + width).
            crop = img[:, row:row + height, col:col + width, :]
            outputs.append(
                tf.image.resize_images(crop, (self.pool_size, self.pool_size)))

        final_output = k.concatenate(outputs, axis=0)
        return k.reshape(
            final_output,
            (1, self.num_rois, self.pool_size, self.pool_size, self.nb_channels))

    def get_config(self):
        """Return the base layer config extended with pool_size and num_rois."""
        config = {'pool_size': self.pool_size,
                  'num_rois': self.num_rois}
        base_config = super(RoiPoolingConv, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))
        
            
            
             
    

In [5]:
def get_img_output_length(width, height):
    """Return the (width, height) obtained by dividing each input length by 16.

    Floor division is used so that integer inputs give integer sizes on
    both Python 2 and Python 3 (true division would give e.g. 37.5 for 600).

    # Arguments
        width: input width.
        height: input height.

    # Returns
        Tuple ``(width // 16, height // 16)``.
    """

    def get_output_length(input_length):
        return input_length // 16

    return get_output_length(width), get_output_length(height)

In [6]:
def nn_base(input_tensor=None, trainable=False):
    """Build a VGG16-style convolutional trunk (blocks 1-5, no block-5 pooling).

    Five blocks of 3x3 'same' ReLU convolutions are stacked; blocks 1-4 are
    each followed by a 2x2, stride-2 max-pooling layer, block 5 is not.

    # Arguments
        input_tensor: optional image input. If None, a new
            ``Input(shape=(None, None, 3))`` is created; a non-Keras tensor
            is wrapped in an ``Input`` layer; a Keras tensor is used as-is.
        trainable: accepted for interface compatibility; it is not applied
            to any layer in this function.

    # Returns
        The output tensor of ``block5_conv3``.
    """
    input_shape = (None, None, 3)

    if input_tensor is None:
        img_input = Input(shape=input_shape)
    elif not k.is_keras_tensor(input_tensor):
        img_input = Input(tensor=input_tensor, shape=input_shape)
    else:
        img_input = input_tensor

    # (block number, filters, number of conv layers, followed by max-pool?)
    block_specs = [
        (1, 64, 2, True),
        (2, 128, 2, True),
        (3, 256, 3, True),
        (4, 512, 3, True),
        (5, 512, 3, False),  # block 5 is deliberately left un-pooled
    ]

    x = img_input
    for block, filters, num_convs, pooled in block_specs:
        for conv_idx in range(1, num_convs + 1):
            x = Conv2D(filters, (3, 3), activation='relu', padding='same',
                       name='block{}_conv{}'.format(block, conv_idx))(x)
        if pooled:
            x = MaxPooling2D((2, 2), strides=(2, 2),
                             name='block{}_pool'.format(block))(x)

    return x

In [7]:
def RPN(base_layers, num_anchors):
    """Attach the region-proposal head to a feature map.

    A shared 3x3 convolution (512 filters, ReLU) feeds two 1x1 convolutions:
    a sigmoid classifier with ``num_anchors`` channels and a linear regressor
    with ``4 * num_anchors`` channels.

    # Arguments
        base_layers: feature-map tensor the head is applied to.
        num_anchors: number of anchors per spatial location.

    # Returns
        ``[class_output, regression_output, base_layers]``.
    """
    shared_features = Conv2D(
        512, (3, 3),
        padding='same',
        activation='relu',
        kernel_initializer='normal',
        name='rpn_conv1',
    )(base_layers)

    # Classification branch: one sigmoid score per anchor.
    class_output = Conv2D(
        num_anchors, (1, 1),
        activation='sigmoid',
        kernel_initializer='uniform',
        name='rpn_out_class',
    )(shared_features)

    # Regression branch: four linear outputs per anchor.
    regression_output = Conv2D(
        num_anchors * 4, (1, 1),
        activation='linear',
        kernel_initializer='zero',
        name='rpn_out_regress',
    )(shared_features)

    return [class_output, regression_output, base_layers]


In [8]:
def classifier(base_layers, input_rois, num_rois, nb_classess=21, trainable=False):
    """Attach the per-ROI classification and box-regression head.

    ROIs are pooled to 7x7 with ``RoiPoolingConv`` and passed through two
    time-distributed 4096-unit ReLU dense layers (each followed by 0.5
    dropout) before the two output layers.

    # Arguments
        base_layers: feature-map tensor the ROIs are pooled from.
        input_rois: tensor of ROIs passed to ``RoiPoolingConv``.
        num_rois: number of ROIs processed per call.
        nb_classess: number of classes for the softmax output. The keyword
            keeps its existing spelling so current callers continue to work.
        trainable: accepted for interface compatibility; it is not applied
            to any layer in this function.

    # Returns
        ``[out_class, out_regr]``: a softmax over ``nb_classess`` classes and
        a linear output of size ``4 * (nb_classess - 1)``, both per ROI.
    """
    # The body previously read an undefined `nb_classes`; bind it explicitly
    # from the (differently spelled) parameter.
    nb_classes = nb_classess
    pooling_regions = 7

    out_roi_pool = RoiPoolingConv(pooling_regions, num_rois)([base_layers, input_rois])

    out = TimeDistributed(Flatten(name='flatten'))(out_roi_pool)
    out = TimeDistributed(Dense(4096, activation='relu', name='fc1'))(out)
    out = TimeDistributed(Dropout(0.5))(out)
    out = TimeDistributed(Dense(4096, activation='relu', name='fc2'))(out)
    out = TimeDistributed(Dropout(0.5))(out)

    out_class = TimeDistributed(
        Dense(nb_classes, activation='softmax', kernel_initializer='zero'),
        name='dense_class_{}'.format(nb_classes))(out)
    # note: no regression target for bg class
    out_regr = TimeDistributed(
        Dense(4 * (nb_classes - 1), activation='linear', kernel_initializer='zero'),
        name='dense_regress_{}'.format(nb_classes))(out)

    return [out_class, out_regr]