Conv Block Part

In [3]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import keras

  from ._conv import register_converters as _register_converters
Using TensorFlow backend.


In [4]:
# filter initializer
def init_filter(d, mi, mo, stride):
    return (np.random.randn(d, d, mi, mo) * np.sqrt(2.0 / (d * d * mi))).astype(np.float32)

In [5]:
class ConvLayer():
    """
    Description:
        Creates convolutional layer
    
    Variables:
        d <- height and width of a filter
        mi <- number of input filters
        mo <- number of output filters
        stride <- by how many pixels filter moves each time 
        padding <- what shape will image have after convolution
    """
    def __init__(self,d,mi,mo,stride=2,padding='VALID'):
        self.W = tf.Variable(init_filter(d,mi,mo,stride))
        self.b = tf.Variable(np.zeros(mo, dtype=np.float32))
        self.stride = stride
        self.padding = padding
    def forward(self,X):
        X = tf.nn.conv2d(
            X,
            self.W,
            strides=[1,self.stride,self.stride,1],
            padding=self.padding
        )
        X = X + self.b
        return X
    
    def copyFromKerasLayers(self, layer):
        W, b = layer.get_weights()
        op1 = self.W.assign(W)
        op2 = self.b.assign(b)
        self.session.run((op1,op2))


    def get_params(self):
        return [self.W, self.b]

In [18]:
class BatchNormLayer():
    """
    Description:
        Creates batch normalization step. We are not doing traning because we copy the already trained weights from
        keras implementation. Then we only have test mode in batch norm.
    
    Variables:
        running_mean <- current global mean
        running_var <- current global variance
        gamma <- parameter that decides how important is 
        beta <-
    """
    def __init__(self, D):
        self.running_mean = tf.Variable(np.zeros(D, dtype=np.float32), trainable=False)
        self.running_var  = tf.Variable(np.ones(D, dtype=np.float32), trainable=False)
        self.gamma        = tf.Variable(np.ones(D, dtype=np.float32))
        self.beta         = tf.Variable(np.zeros(D, dtype=np.float32))


    def forward(self,X):
        return tf.nn.batch_normalization(
        X,
        self.running_mean,
        self.running_var,
        self.beta, # offset: An offset Tensor
        self.gamma, # scale: A scale Tensor
        1e-3 # variance_epsilon: A small float number to avoid dividing by 0
        )
    
    def copyFromKerasLayers(self, layer):
        gamma, beta, running_mean, running_var = layer.get_weights()
        op1 = self.running_mean.assign(running_mean)
        op2 = self.running_var.assign(running_var)
        op3 = self.gamma.assign(gamma)
        op4 = self.beta.assign(beta)
        self.session.run((op1, op2, op3, op4))

    def get_params(self):
        return [self.running_mean, self.running_var, self.gamma, self.beta]

In [7]:
class ConvBlock:
    def __init__(self, mi, fm_sizes, stride=2):
        """
        Description:
            Initialize layers and creates architecture of Conv Block
        
        Variables:
            fm_sizes <- feature map sizes in a list like [64, 64, 256]
        """
        assert(len(fm_sizes) == 3)
        
        self.session = None
        self.f = tf.nn.relu
        
        
        #
        #                    init MAIN BRANCH
        # conv1 -> bn1 -> f() -> ... -> bn3
        self.conv1 = ConvLayer(1, mi, fm_sizes[0], stride)# only 1st layer has stride
        self.bn1   = BatchNorm(fm_sizes[0])

        # this layer always has filter_size = 3 so to have the same size as output it needs padding='SAME'
        self.conv2 = ConvLayer(3,fm_sizes[0],fm_sizes[1],1,'SAME') 
        self.bn2   = BatchNorm(fm_sizes[1])

        self.conv3 = ConvLayer(1,fm_sizes[1],fm_sizes[2],1)
        self.bn3   = BatchNorm(fm_sizes[2])
        
        #
        #                  init SKIP/SHORTCUT BRANCH
        # convS -> bnS
        self.convS =  ConvLayer(1,mi,fm_sizes[2],stride)
        self.bnS   =  BatchNorm(fm_sizes[2])
        
        self.layers = [
            self.conv1, self.bn1,
            self.conv2, self.bn2,
            self.conv3, self.bn3,
            self.convS, self.bnS,
        ]
        
        # a placeholder for data 
        # will not be used when input passed in from previous layer
        self.input_ = tf.placeholder(tf.float32, shape=(1,224,224,mi))
        self.output = self.forward(self.input_)
        
    def forward(self, X):
        """
        Description:
            Propagate data throught the Conv Block
        
        Variables:
            
        """
        # propagate data throught the main branch 
        FX = self.conv1.forward(X)
        FX = self.bn1.forward(FX)
        FX = self.f(FX)
        FX = self.conv2.forward(FX)
        FX = self.bn2.forward(FX)
        FX = self.f(FX)
        FX = self.conv3.forward(FX)
        FX = self.bn3.forward(FX)
        
        # propagate data throught the shortcut branch
        SX = self.convS.forward(X)
        SX = self.bnS.forward(SX)
        
        # add output and pass it throught activation function
        FX = self.f(FX + SX)
        return FX

    def predict(self, X):
        assert(self.session is not None)
        return self.session.run(
            self.output,
            feed_dict={self.input_: X}
        )
    
    def set_session(self, session):
        self.session = session
        self.conv1.session = session
        self.bn1.session = session
        self.conv2.session = session
        self.bn2.session = session
        self.conv3.session = session
        self.bn3.session = session
        self.convS.session = session
        self.bnS.session = session
    
    def copyFromKerasLayers(self, layer):
        self.conv1.copyFromKerasLayers(layer[0])
        self.bn1.copyFromKerasLayers(layer[1])
        self.conv2.copyFromKerasLayers(layer[3])
        self.bn2.copyFromKerasLayers(layer[4])
        self.conv3.copyFromKerasLayers(layer[6])
        self.bn3.copyFromKerasLayers(layer[8])
        self.convS.copyFromKerasLayers(layer[7])
        self.bnS.copyFromKerasLayers(layer[9])
        
    
    
    def get_params(self):
        params = []
        for layer in self.layers:
            params += layer.get_params()
        return params

In [8]:
from keras.applications.resnet50 import ResNet50
from keras.layers import Dense, Flatten
from keras.models import Model
from keras.preprocessing import image
from keras.applications.resnet50 import preprocess_input, decode_predictions

resnet = ResNet50(input_shape=[200,200,3], weights='imagenet', include_top=False)

# our layer = you can add more if u want
x = Flatten()(resnet.output)
# x = Dense(1000, activation='relu')(x)
prediction = Dense(10, activation='softmax')(x)

# create model object
model = Model(inputs=resnet.input, outputs=prediction)

model.summary()

____________________________________________________________________________________________________
Layer (type)                     Output Shape          Param #     Connected to                     
input_1 (InputLayer)             (None, 200, 200, 3)   0                                            
____________________________________________________________________________________________________
conv1 (Conv2D)                   (None, 100, 100, 64)  9472        input_1[0][0]                    
____________________________________________________________________________________________________
bn_conv1 (BatchNormalization)    (None, 100, 100, 64)  256         conv1[0][0]                      
____________________________________________________________________________________________________
activation_1 (Activation)        (None, 100, 100, 64)  0           bn_conv1[0][0]                   
___________________________________________________________________________________________

In [None]:
# Main to check if Conv Block is working

In [54]:
if __name__ == '__main__':
    conv_block = ConvBlock(3,fm_sizes=[64,64,256],stride=1)
    # make a fake image
    X = np.random.random((1, 224, 224, 3))

    init = tf.global_variables_initializer()
    with tf.Session() as session:
        conv_block.session = session
        session.run(init)

        output = conv_block.predict(X)
        print("output.shape:", output.shape)

output.shape: (1, 224, 224, 256)


In [31]:
class IdentityBlock():
    def __init__(self, mi, fm_sizes):
        """
        Description:
            Initialize layers and creates architecture of Identity Block

        Variables:
            fm_sizes <- feature map sizes in a list like [64, 64, 256]
        """
        assert(len(fm_sizes) == 3)
        # start new session or sth
        self.session = None
        
        self.f = tf.nn.relu
        
        #
        #           MAIN BRANCH
        # conv1 -> bn1 -> f() -> ...
        self.conv1 = ConvLayer(1,mi,fm_sizes[0],1) 
        self.bn1   = BatchNorm(fm_sizes[0])
        
        self.conv2 = ConvLayer(3,fm_sizes[0],fm_sizes[1],1,'SAME') 
        self.bn2   = BatchNorm(fm_sizes[1])
        
        self.conv3 = ConvLayer(1,fm_sizes[1],fm_sizes[2],1)
        self.bn3   = BatchNorm(fm_sizes[2])
        
        self.layers = [
            self.conv1, self.bn1,
            self.conv2, self.bn2,
            self.conv3, self.bn3
        ]
                                        
        # placeholder for data
        self.input_ = tf.placeholder(tf.float32,shape=(1,224,224,mi))
        self.output = self.forward(self.input_)
        pass                            
                                        
    def forward(self, X):
        
        FX = self.conv1.forward(X)
        FX = self.bn1.forward(FX)
        FX = self.f(FX)
        FX = self.conv2.forward(FX)
        FX = self.bn2.forward(FX)
        FX = self.f(FX)
        FX = self.conv3.forward(FX)
        FX = self.bn3.forward(FX)
        
        # combine main branch and shortcut branch
        FX = self.f(FX + X)
        return FX
        
    def predict(self, X):
        assert(self.session is not None)
        return self.session.run(
            self.output,
            feed_dict={self.input_: X}
        )
        
    
    def set_session(self, session):
        self.session = session
        self.conv1.session = session
        self.bn1.session = session
        self.conv2.session = session
        self.bn2.session = session
        self.conv3.session = session
        self.bn3.session = session
            
    def copyFromKerasLayers(self, layers):
        assert(len(layers) == 10)
        # <keras.layers.convolutional.Conv2D at 0x7fa44255ff28>,
        # <keras.layers.normalization.BatchNormalization at 0x7fa44250e7b8>,
        # <keras.layers.core.Activation at 0x7fa44252d9e8>,
        # <keras.layers.convolutional.Conv2D at 0x7fa44253af60>,
        # <keras.layers.normalization.BatchNormalization at 0x7fa4424e4f60>,
        # <keras.layers.core.Activation at 0x7fa442494828>,
        # <keras.layers.convolutional.Conv2D at 0x7fa4424a2da0>,
        # <keras.layers.normalization.BatchNormalization at 0x7fa44244eda0>,
        # <keras.layers.merge.Add at 0x7fa44245d5c0>,
        # <keras.layers.core.Activation at 0x7fa44240aba8>
        self.conv1.copyFromKerasLayers(layers[0])
        self.bn1.copyFromKerasLayers(layers[1])
        self.conv2.copyFromKerasLayers(layers[3])
        self.bn2.copyFromKerasLayers(layers[4])
        self.conv3.copyFromKerasLayers(layers[6])
        self.bn3.copyFromKerasLayers(layers[7])

    def get_params(self):
        params = []
        for layer in self.layers:
            params += layer.get_params()
        return params
                                        
                                        

In [None]:
# Main to check if Identity Block is working correclty

In [73]:
if __name__ == '__main__':
    identity_block = IdentityBlock(mi=256,fm_sizes=[64,64,256]) # why 256? before i thnik it was 3 cuz 3 color channles and not its 256 cuz thats the output of the 1st conv block

    # make a fake image
    X = np.random.random((1, 224, 224, 256))

    init = tf.global_variables_initializer()
    with tf.Session() as session:
        identity_block.set_session(session)
        session.run(init)

        output = identity_block.predict(X)
        print("output.shape:", output.shape)

output.shape: (1, 224, 224, 256)


In [10]:
"""
Create support classes
"""

class ReLULayer:
    def forward(self, X):
        return tf.nn.relu(X)
    
    def get_params(self):
        return []
    
class MaxPoolLayer:
    def __init__(self, dim):
        self.dim = dim
        
    def forward(self, X):
        return tf.nn.max_pool(
        X,
        ksize=[1, self.dim, self.dim, 1],
        strides=[1,2,2,1],
        padding='VALID'
    )
    
    def get_params(self):
        return []
    

In [11]:

class PartialResNet:
    """
    Create part of the ResNet network
    """
    def __init__(self):

        self.layers = [
          # before conv block
          ConvLayer(d=7, mi=3, mo=64, stride=2, padding='SAME'),
          BatchNorm(64),
          ReLULayer(),
          MaxPoolLayer(dim=3),
          # conv block
          ConvBlock(mi=64, fm_sizes=[64, 64, 256], stride=1),
        ]
        self.input_ = tf.placeholder(tf.float32, shape=[None,224,224,3])
        self.output = self.forward(self.input_)
    
    def copyFromKerasLayers(self, layers):
        self.layers[0].copyFromKerasLayers(layers[1])
        self.layers[1].copyFromKerasLayers(layers[2])
        self.layers[4].copyFromKerasLayers(layers[5:])
    
    def forward(self, X):
        for layer in self.layers:
            X = layer.forward(X)
        return X
    
    def predict(self, X):
        assert(self.session is not None)
        return self.session.run(
            self.output,
            feed_dict={self.input_:X}
        )
    
    def set_session(self, session):
        self.session = session
        self.layers[0].session = session
        self.layers[1].session = session
        self.layers[4].set_session(session)
        
    def get_params(self):
        params = []
        for layer in self.layers:
            params += layer.get_params()
        

In [168]:
if __name__ == '__main__':
    # you can also set weights to None, it doesn't matter
    resnet = ResNet50(weights='imagenet')

    # you can determine the correct layer
    # by looking at resnet.layers in the console
    partial_model = Model(
    inputs=resnet.input,
    outputs=resnet.layers[16].output
    )
    print(partial_model.summary())
    # for layer in partial_model.layers:
    #   layer.trainable = False

    my_partial_resnet = PartialResNet()

    # make a fake image
    X = np.random.random((1, 224, 224, 3))

    # get keras output
    keras_output = partial_model.predict(X)

    # get my model output
    init = tf.variables_initializer(my_partial_resnet.get_params())

    # note: starting a new session messes up the Keras model
    session = keras.backend.get_session()
    my_partial_resnet.set_session(session)
    session.run(init)

    # first, just make sure we can get any output
    first_output = my_partial_resnet.predict(X)
    print("first_output.shape:", first_output.shape)

    # copy params from Keras model
    my_partial_resnet.copyFromKerasLayers(partial_model.layers)
#     print(partial_model.layers)
#     print(partial_model.layers[3])
#     print(partial_model.layers[3][0])
    
    # compare the 2 models
    output = my_partial_resnet.predict(X)
    diff = np.abs(output - keras_output).sum()
    if diff < 1e-10:
        print("Everything's great!")
    else:
        print("diff = %s" % diff)

____________________________________________________________________________________________________
Layer (type)                     Output Shape          Param #     Connected to                     
input_40 (InputLayer)            (None, 224, 224, 3)   0                                            
____________________________________________________________________________________________________
conv1 (Conv2D)                   (None, 112, 112, 64)  9472        input_40[0][0]                   
____________________________________________________________________________________________________
bn_conv1 (BatchNormalization)    (None, 112, 112, 64)  256         conv1[0][0]                      
____________________________________________________________________________________________________
activation_1912 (Activation)     (None, 112, 112, 64)  0           bn_conv1[0][0]                   
___________________________________________________________________________________________

KeyboardInterrupt: 

In [29]:
class AvgPool():
    def __init__(self, ksize):
        self.ksize = ksize

    def forward(self, X):
        return tf.nn.avg_pool(
          X,
          ksize=[1, self.ksize, self.ksize, 1],
          strides=[1, 1, 1, 1],
          padding='VALID'
        )

    def get_params(self):
        return []

class Flatten:
    def forward(self, X):
        return tf.contrib.layers.flatten(X)

    def get_params(self):
        return []


    def custom_softmax(x):
        m = tf.reduce_max(x, 1)
        x = x - m
        e = tf.exp(x)
        return e / tf.reduce_sum(e, -1)

class DenseLayer:
    def __init__(self,mi,mo):
        self.W = tf.Variable((np.random.randn(mi, mo) * np.sqrt(2.0 / mi)).astype(np.float32))
        self.b = tf.Variable(np.zeros(mo, dtype=np.float32))
        
    def forward(self, X):
        # softmax gives different results 
        # return tf.nn.softmax(tf.matmul(X,self.W)+self.b)
        # return keras.activations.softmax(tf.matmul(X,self.W)+self.b)
        return tf.matmul(X,self.W)+self.b
    
    def copyFromKerasLayers(self, layer):
        W, b = layer.get_weights()
        op1 = self.W.assign(W)
        op2 = self.b.assign(b)
        self.session.run((op1,op2))
    
    def get_params(self):
        return [self.W, self.b]
        

In [34]:
class TFResNet:
    """
    Define Resnet architecture 
    """
    def __init__(self):
        self.layers = [
            ConvLayer(d=7,mi=3,mo=64,stride=2,padding='SAME'),
            BatchNorm(64),
            ReLULayer(),
            MaxPoolLayer(dim=3), # maxpool does not need a copy

            ConvBlock(mi=64,fm_sizes=[64,64,256],stride=1),
            IdentityBlock(mi=256,fm_sizes=[64,64,256]),
            IdentityBlock(mi=256,fm_sizes=[64,64,256]),

            ConvBlock(mi=256,fm_sizes=[128,128,512],stride=2),
            IdentityBlock(mi=512,fm_sizes=[128,128,512]),
            IdentityBlock(mi=512,fm_sizes=[128,128,512]),
            IdentityBlock(mi=512,fm_sizes=[128,128,512]),

            ConvBlock(mi=512,fm_sizes=[256,256,1024],stride=2),
            IdentityBlock(mi=1024,fm_sizes=[256,256,1024]),
                IdentityBlock(mi=1024,fm_sizes=[256,256,1024]),
            IdentityBlock(mi=1024,fm_sizes=[256,256,1024]),
            IdentityBlock(mi=1024,fm_sizes=[256,256,1024]),
            IdentityBlock(mi=1024,fm_sizes=[256,256,1024]),

            ConvBlock(mi=1024,fm_sizes=[512,512,2048],stride=2),
            IdentityBlock(mi=2048,fm_sizes=[512,512,2048]),
            IdentityBlock(mi=2048,fm_sizes=[512,512,2048]),

            #AveragePooling2D(),
            GlobalAveragePool(ksize=7),
            Flatten(),
            DenseLayer(mi=2048,mo=1000)
        ]
        
        self.input_ = tf.placeholder(tf.float32, shape=(None,224,224,3))
        self.output = self.forward(self.input_)
        

    def copyFromKerasLayers(self, layers):
        # conv
        self.layers[0].copyFromKerasLayers(layers[1])
        # bn
        self.layers[1].copyFromKerasLayers(layers[2])
        # cb size 12
        self.layers[4].copyFromKerasLayers(layers[5:17])
        # ib x 2 size 10
        self.layers[5].copyFromKerasLayers(layers[17:27])
        self.layers[6].copyFromKerasLayers(layers[27:37])
        # cb size 12
        self.layers[7].copyFromKerasLayers(layers[37:49])
        # ib x 3 size 10
        self.layers[8].copyFromKerasLayers(layers[49:59])
        self.layers[9].copyFromKerasLayers(layers[59:69])
        
        self.layers[10].copyFromKerasLayers(layers[69:79])
        # cb size 12
        self.layers[11].copyFromKerasLayers(layers[79:91])
        # ib x 5 size 10
        self.layers[12].copyFromKerasLayers(layers[91:101])
        self.layers[13].copyFromKerasLayers(layers[101:111])
        self.layers[14].copyFromKerasLayers(layers[111:121])
        self.layers[15].copyFromKerasLayers(layers[121:131])
        self.layers[16].copyFromKerasLayers(layers[131:141])
        # cb size 12
        self.layers[17].copyFromKerasLayers(layers[141:153])
        # ib x 2 size 10
        self.layers[18].copyFromKerasLayers(layers[153:163])
        self.layers[19].copyFromKerasLayers(layers[163:173])
        # dense layer
        self.layers[22].copyFromKerasLayers(layers[175])


    def forward(self, X):
        for layer in self.layers:
            X = layer.forward(X)
        return X

    def predict(self, X):
        assert(self.session is not None)
        return self.session.run(
            self.output,
            feed_dict={self.input_:X})

    def set_session(self, session):
        # set sessions according to the copied layers
        self.session = session
        for layer in self.layers:
            if isinstance(layer, ConvBlock) or isinstance(layer, IdentityBlock):
                layer.set_session(session)
            else:
                layer.session = session

    def get_params(self):
        params = []
        for layer in self.layers:
            params += layer.get_params()


In [35]:
if __name__ == '__main__':
    # you can also set weights to None, it doesn't matter
    resnet_ = ResNet50(weights='imagenet')

    #make new resnet without softmax
    x = resnet_.layers[-2].output
    W, b = resnet_.layers[-1].get_weights()
    y = Dense(1000)(x)
    resnet = Model(resnet_.input, y)
    resnet.layers[-1].set_weights([W, b])
    
    
    # you can determine the correct layer
    # by looking at resnet.layers in the console
    partial_model = Model(
        inputs=resnet.input,
        outputs=resnet.layers[175].output
    )

    print(partial_model.summary())

    my_partial_resnet = TFResNet()

    # make a fake image
    X = np.random.random((1, 224, 224, 3))

    # get keras output
    keras_output = partial_model.predict(X)

    # get my model output
    init = tf.variables_initializer(my_partial_resnet.get_params() )
    
    # note: starting a new session messes up the Keras model
    session = keras.backend.get_session()
    my_partial_resnet.set_session(session)
    session.run(init)

    # first, just make sure we can get any output
    first_output = my_partial_resnet.predict(X)
    print("first_output.shape:", first_output.shape)

    # copy params from Keras model
    my_partial_resnet.copyFromKerasLayers(partial_model.layers)

    # compare the 2 models
    output = my_partial_resnet.predict(X)
    diff = np.abs(output - keras_output).sum()
    if diff < 1e-10:
        print("Everything's great!")
    else:
        print("diff = %s" % diff)

____________________________________________________________________________________________________
Layer (type)                     Output Shape          Param #     Connected to                     
input_11 (InputLayer)            (None, 224, 224, 3)   0                                            
____________________________________________________________________________________________________
conv1 (Conv2D)                   (None, 112, 112, 64)  9472        input_11[0][0]                   
____________________________________________________________________________________________________
bn_conv1 (BatchNormalization)    (None, 112, 112, 64)  256         conv1[0][0]                      
____________________________________________________________________________________________________
activation_491 (Activation)      (None, 112, 112, 64)  0           bn_conv1[0][0]                   
___________________________________________________________________________________________

first_output.shape: (1, 1000)


TypeError: copyFromKerasLayers() takes 0 positional arguments but 2 were given