<a href="https://colab.research.google.com/github/aashishpiitk/pclub-model-zoo/blob/master/Resnet_Keras.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [0]:
from keras.callbacks import *
filepath="/content/gdrive/My Drive/epochs:{epoch:03d}-val_accuracy:{val_accuracy:.4f}.hdf5"

In [0]:
import sys
sys.path.append('/content/gdrive/My Drive')

In [0]:
from __future__ import division
import six

from keras.models import Model
from keras.layers import(
    Activation,
    Input,
    Dense,
    Flatten
)
from keras.layers.convolutional import(
    Conv2D,
    MaxPooling2D,
    AveragePooling2D
)
from keras.layers.merge import add
from keras.layers.normalization import BatchNormalization
from keras.regularizers import l2
from keras import backend as K

In [0]:
K.backend()

'tensorflow'

In [0]:
def _bn_relu(input):
  """Helper to build a BN -> relu block
  """
  norm=BatchNormalization(axis=CHANNEL_AXIS)(input)
  return Activation('relu')(norm)

In [0]:
def _conv_bn_relu(**conv_params):
  strides=conv_params.setdefault("strides",(1,1))
  padding=conv_params.setdefault('padding',"same")
  kernel_size=conv_params['kernel_size']
  filters=conv_params['filters']
  kernel_regularizer=conv_params.setdefault("kernel_regularizer",l2(1.e-4))
  kernel_initializer=conv_params.setdefault("kernel_initializer","he_normal")


  def accept(input):
    conv=Conv2D(filters=filters,
                kernel_size=kernel_size,
                strides=strides,
                padding=padding,
                kernel_initializer=kernel_initializer,
                kernel_regularizer=kernel_regularizer)(input)
    return _bn_relu(conv)
  
  return accept               #closure concept

In [0]:
def _bn_relu_conv(**conv_params):
  strides=conv_params.setdefault("strides",(1,1))
  padding=conv_params.setdefault('padding',"same")
  kernel_size=conv_params['kernel_size']
  filters=conv_params['filters']
  kernel_regularizer=conv_params.setdefault("kernel_regularizer",l2(1.e-4))
  kernel_initializer=conv_params.setdefault("kernel_initializer","he_normal")

  def accept(input):
    activation=_bn_relu(input)
    conv=Conv2D(filters=filters,
                kernel_size=kernel_size,
                strides=strides,
                padding=padding,
                kernel_initializer=kernel_initializer,
                kernel_regularizer=kernel_regularizer)(activation)
    
    return conv
  
  return accept


In [0]:
def _shortcut(input,residual):
  """Adds a shortcut between input and residual block and merges them with "sum"
  """
  #input and residual are tensors
  #if the no.of channels do not match then equalize
  #if height or width does not match then 
  #that identity layer is converted to convulational layer with kernel_size=1,stride=as required,filter=to match with residual block,no paddin
  input_shape=K.int_shape(input)
  residual_shape=K.int_shape(residual)
  stride_height=int(input_shape[ROW_AXIS]/residual_shape[ROW_AXIS])
  stride_width=int(input_shape[COLUMN_AXIS]/residual_shape[COLUMN_AXIS])
  equal_channels=int(input_shape[CHANNEL_AXIS])==int(residual_shape[CHANNEL_AXIS])

  shortcut = input
  #print(input_shape,residual_shape)
  if stride_height>1 or stride_width>1 or (not equal_channels):
    shortcut=Conv2D(strides=(stride_height,stride_width),
                filters=int(residual_shape[CHANNEL_AXIS]),
                kernel_size=(1,1),
                padding="valid",
                kernel_initializer="he_normal",
                kernel_regularizer=l2(1.e-4))(input)
  #print(K.int_shape())
  return add([shortcut,residual])


In [0]:
def _residual_block(block_func,filters,repetitions,is_first_layer=False):
  """Builds a NN with basic/bottleneck blocks stacked together 
  """
  def accept(input):
    for i in range(repetitions):
      init_strides=(1,1)
      if i==0 and not is_first_layer:
        init_strides=(2,2)
      input=block_func(strides=init_strides,
                       filters=filters,
                       is_first_block_of_first_layer=(is_first_layer and i==0))(input)
    return input
  return accept
  

In [0]:
def basic_block(filters,strides=(1,1),is_first_block_of_first_layer=False):
  """Builds a basic residual block with two conv nets with a shortcut using 3 x 3 conv blocks
  """
  def accept(input):
    if is_first_block_of_first_layer:
      #don't do Bn -> relu because the network starts with conv2D -> bn -> relu -> max-pooling
      conv1=Conv2D(kernel_size=(3,3),
                   filters=filters,
                   strides=strides,
                   kernel_initializer="he_normal",
                   kernel_regularizer=l2(1.e-4))(input)
    else:
      conv1=_bn_relu_conv(kernel_size=(3,3), 
                          filters=filters,
                          strides=strides)(input)


    residual=_bn_relu_conv(kernel_size=(3,3),
                           filters=filters,
                           strides=strides)(conv1)


    return _shortcut(input,residual)
  return accept

In [0]:
def bottleneck_block(filters,strides=(1,1),is_first_block_of_first_layer=False):
  """
  Builds a bootleneck block with three conv layers with kernel of 3 x 3
  """
  def accept(input):
    if is_first_block_of_first_layer:
      #don't do Bn -> relu because the network starts with conv2D -> bn -> relu -> max-pooling
      conv1=Conv2D(kernel_size=(1,1),
                   filters=filters,
                   strides=strides,
                   kernel_initializer="he_normal",
                   kernel_regularizer=l2(1.e-4))(input)
    else:
      conv1=_bn_relu_conv(kernel_size=(1,1), 
                          filters=filters,
                          strides=strides)(input)

    conv2=_bn_relu_conv(kernel_size=(3,3), 
                        filters=filters,
                        strides=strides)(conv1)
    residual=_bn_relu_conv(kernel_size=(1,1),
                           filters=filters*4,
                           strides=strides)(conv2)


    return _shortcut(input,residual)
  return accept

In [0]:
K.common.image_dim_ordering()

'tf'

In [0]:
def _handle_dim_ordering():
  global ROW_AXIS
  global COLUMN_AXIS
  global CHANNEL_AXIS

  if K.common.image_dim_ordering()=='tf':
    ROW_AXIS=1
    COLUMN_AXIS=2
    CHANNEL_AXIS=3
  else:
    ROW_AXIS=2
    COLUMN_AXIS=3
    CHANNEL_AXIS=1


In [0]:
print(globals().get('yo'))

None


In [0]:
def _get_block(identifier):
  if isinstance(identifier,six.string_types):
    res=globals().get(identifier)
    if not res:
      raise ValueError('Invalid {}'.format(identifier))
    return res
  return identifier

In [0]:
class ResnetBuilder(object):

  @staticmethod
  def build(input_shape,num_outputs,block_func,repetitions):
    """builds a resnet like structure

    Args:
      input_shape: The input shape in the form (nb_channels, nb_rows, nb_cols)
      repetitions:a list of repetitions of various block units
      block_func:whether to use bottleneck or basic residual block
      
    Returns:
      keras `model`
    
    """
    _handle_dim_ordering()

    if len(input_shape) != 3:
      raise Exception("Input shape should be a tuple (nb_channels, nb_rows, nb_cols)")
    
    if K.common.image_dim_ordering() == 'tf':###understand more about this,suspected
            input_shape = (input_shape[1], input_shape[2], input_shape[0])
    
    #Load function from str if needed
    block_func=_get_block(block_func)

    input=Input(shape=input_shape)
    conv1=_conv_bn_relu(strides=(2,2),filters=64,kernel_size=(7,7))(input)
    pool1=MaxPooling2D(strides=(2,2),pool_size=(3,3),padding='same')(conv1)


    block=pool1
    filters=64
    for i,r in enumerate(repetitions):
      block=_residual_block(block_func=block_func,
                            filters=filters,
                            repetitions=r,
                            is_first_layer=(i==0))(block)
      filters*=2
    
    activation=_bn_relu(block)
    act_shape=K.int_shape(activation)

    pool2=AveragePooling2D(strides=(1,1),
                           padding='valid',
                           pool_size=(act_shape[ROW_AXIS],
                                      act_shape[COLUMN_AXIS])
                           )(activation)
    flatten1=Flatten()(pool2)
    dense1=Dense(units=num_outputs,
                 activation="softmax",
                 kernel_initializer="he_normal")(flatten1)
    model=Model(inputs=input,output=dense1)
    return model
  

  @staticmethod
  def build_resnet_18(input_shape,num_outputs):
    return ResnetBuilder.build(input_shape,num_outputs,basic_block,[2,2,2,2])
  
  @staticmethod
  def build_resnet_34(input_shape,num_outputs):
    return ResnetBuilder.build(input_shape,num_outputs,basic_block,[3,4,6,3])
  
  @staticmethod
  def build_resnet_50(input_shape,num_outputs):
    return ResnetBuilder.build(input_shape,num_outputs,bottleneck_block,[3,4,6,3])
  
  @staticmethod
  def build_resnet_101(input_shape,num_outputs):
    return ResnetBuilder.build(input_shape,num_outputs,bottleneck_block,[3,4,6,3])
  
  @staticmethod
  def build_resnet_18(input_shape,num_outputs):
    return ResnetBuilder.build(input_shape,num_outputs,bottleneck_block,[3,8,36,3])

In [0]:
#Now we will be compiling and training the model

In [0]:
from __future__ import print_function
from keras.datasets import cifar10
from keras.preprocessing.image import ImageDataGenerator
from keras.utils import np_utils
from keras.callbacks import ReduceLROnPlateau,CSVLogger,EarlyStopping,ModelCheckpoint

In [0]:
import numpy as np


In [0]:
lr_reducer=ReduceLROnPlateau(factor=np.sqrt(0.1),cooldown=0,patience=5,min_lr=0.5e-6)
early_stopper=EarlyStopping(min_delta=0.001,patience=10)
csv_logger=CSVLogger('resnet_cifar10.csv')

In [0]:
batch_size=32
nb_classes=10
nb_epoch=5
data_augmentation=False

In [0]:
#input image dimension
img_rows,img_cols=32,32
img_channels=3

In [0]:
#making the train/test sets
(X_train,Y_train),(X_test,Y_test)=cifar10.load_data()

Downloading data from https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz


In [0]:
#making one-hot vectors/binary class matrices for y
Y_train=np_utils.to_categorical(Y_train)
Y_test=np_utils.to_categorical(Y_test)

In [0]:
#changing datatype
X_train=X_train.astype("float32")
X_test=X_test.astype("float32")

In [0]:
#normalising the data
mean_image_train=np.mean(X_train,axis=0)
mean_image_test=np.mean(X_test,axis=0)
X_train -= mean_image_train
X_test -= mean_image_test
X_train /= 128
X_test /= 128

In [0]:
checkpoint=ModelCheckpoint(filepath,monitor='val_accuracy',verbose=1)

In [36]:
model=ResnetBuilder.build_resnet_18((img_channels,img_rows,img_cols),
                                    nb_classes
                                    )#creating a model instance
model.compile(loss="categorical_crossentropy",
              optimizer="adam",
              metrics=['accuracy'])

if not data_augmentation:
  print("not using data augmentation")
  model.fit(X_train,Y_train,
            batch_size=batch_size,
            nb_epoch=nb_epoch,
            validation_data=(X_test,Y_test),
            shuffle=True,
            callbacks=[lr_reducer, early_stopper, csv_logger, checkpoint]
            )

  from ipykernel import kernelapp as app


not using data augmentation
Train on 50000 samples, validate on 10000 samples
Epoch 1/5

Epoch 00001: saving model to /content/gdrive/My Drive/epochs:001-val_accuracy:0.4609.hdf5
Epoch 2/5

Epoch 00002: saving model to /content/gdrive/My Drive/epochs:002-val_accuracy:0.5364.hdf5
Epoch 3/5

Epoch 00003: saving model to /content/gdrive/My Drive/epochs:003-val_accuracy:0.6082.hdf5
Epoch 4/5

Epoch 00004: saving model to /content/gdrive/My Drive/epochs:004-val_accuracy:0.5651.hdf5
Epoch 5/5

Epoch 00005: saving model to /content/gdrive/My Drive/epochs:005-val_accuracy:0.6466.hdf5
