In [1]:
from sklearn.preprocessing import OneHotEncoder

import tensorflow as tf

from keras.models import Model
from keras.layers import Input
from keras.layers import Dense
from keras.layers import Flatten
from keras.layers.convolutional import Conv2D
from keras.layers.convolutional import AveragePooling2D
from keras import backend as K

from keras.datasets import cifar10

# Prepare Data

In [2]:
# Load the CIFAR-10 train/test splits. The "_org" suffix marks the arrays
# exactly as returned by the loader; the following cells derive the encoded
# labels (y_*) and rescaled images (X_*) from them without modifying these.
(X_train_org, y_train_org), (X_test_org, y_test_org) = cifar10.load_data()

In [3]:
# Learn the label categories from the training labels only, then reuse that
# single fitted encoder for both splits so they share one column layout.
ohe = OneHotEncoder().fit(y_train_org)

# toarray() converts the encoder's sparse output into a dense array.
y_train, y_test = (
    ohe.transform(labels).toarray()
    for labels in (y_train_org, y_test_org)
)

In [4]:
# Cast to float32 and rescale with x / 127.5 - 1, which maps 0 -> -1 and
# 255 -> 1 (assuming 8-bit pixel values in [0, 255] -- TODO confirm).
X_train, X_test = (
    raw_images.astype('float32') / 127.5 - 1
    for raw_images in (X_train_org, X_test_org)
)

# Define ResNet

In [5]:
class ResNet(object):
    """Residual convolutional classifier built with the Keras functional API.

    The network is an initial 3x3 convolution followed by stages of residual
    blocks, average pooling over the full spatial extent, and a softmax
    dense layer. The model is compiled in the constructor with the Adam
    optimizer, categorical cross-entropy loss and an accuracy metric.
    """

    def __init__(self, input_shape, num_residual_blocks_arr, num_filter_base, num_class):
        """Build and compile the underlying Keras model.

        Args:
            input_shape: shape of one input sample, forwarded to ``Input(shape=...)``.
            num_residual_blocks_arr: iterable of ints, one per stage, giving the
                number of residual blocks in that stage. The filter count is
                doubled at the start of every stage (including the first).
            num_filter_base: number of filters of the initial convolution.
            num_class: number of units of the final softmax layer.
        """
        self._model = self._res_net(input_shape, num_residual_blocks_arr, num_filter_base, num_class)

        self._model.compile(loss='categorical_crossentropy',
                            optimizer='adam',
                            metrics=['accuracy'])

    def fit(self, X_train, y_train, batch_size=32, epochs=100, X_test=None, y_test=None):
        """Train the model, optionally validating after every epoch.

        Args:
            X_train: training inputs.
            y_train: training targets.
            batch_size: number of samples per gradient update.
            epochs: number of passes over the training data.
            X_test: optional validation inputs.
            y_test: optional validation targets.

        Validation data is only passed on when BOTH ``X_test`` and ``y_test``
        are given; if either one is missing, training runs without validation.

        Returns:
            Whatever the underlying model's ``fit`` returns (for a Keras model,
            the training history object), so callers can inspect the metrics.
        """
        fit_kwargs = {
            'batch_size': batch_size,
            'epochs': epochs,
            'shuffle': True,
        }
        if X_test is not None and y_test is not None:
            fit_kwargs['validation_data'] = (X_test, y_test)

        return self._model.fit(X_train, y_train, **fit_kwargs)

    def predict(self, x):
        """Return the model's predictions for ``x`` (delegates to the Keras model)."""
        return self._model.predict(x)

    def _res_net(self, input_shape, num_residual_blocks_arr, num_filter_base, num_class):
        """Assemble the (uncompiled) Keras model.

        Args:
            input_shape: shape of one input sample.
            num_residual_blocks_arr: number of residual blocks per stage.
            num_filter_base: number of filters of the initial convolution.
            num_class: number of output classes.

        Returns:
            A Keras ``Model`` mapping the input tensor to softmax class scores.
        """
        # Named `inputs` rather than `input` to avoid shadowing the builtin.
        inputs = Input(shape=input_shape)
        conv0 = Conv2D(num_filter_base,
                       kernel_size=(3, 3),
                       strides=(1, 1),
                       padding='same',
                       activation='relu')(inputs)

        last_layer = conv0
        num_filters = num_filter_base

        for num_residual_blocks in num_residual_blocks_arr:
            # Each stage doubles the filter count; the first block of the stage
            # therefore sees "output == 2 * input" and performs the downsampling,
            # the remaining blocks of the stage keep the size unchanged.
            num_filters = num_filters * 2
            for _ in range(num_residual_blocks):
                last_layer = self._residual_block(last_layer, num_filters)

        shape = K.int_shape(last_layer)
        # Global average pooling along width and height: the pool window spans
        # the whole feature map, which requires statically known spatial dims.
        pool = AveragePooling2D(pool_size=(shape[1], shape[2]),
                                strides=(1, 1))(last_layer)
        flatten = Flatten()(pool)
        # Despite the variable name these are softmax probabilities, not raw logits.
        logits = Dense(units=num_class,
                       activation="softmax")(flatten)

        return Model(inputs=inputs, outputs=logits)

    @staticmethod
    def _channel_padding(num_input_channel, num_output_channel):
        """Return ``(before, after)`` zero-channel counts for the shortcut branch.

        The two counts always sum to ``num_output_channel - num_input_channel``,
        so the padded shortcut matches the convolution branch even when the
        channel difference is odd (the extra channel goes to the ``after`` side).
        """
        missing = num_output_channel - num_input_channel
        before = missing // 2
        return before, missing - before

    def _residual_block(self, x, num_output_channel):
        """Apply one residual block (two 3x3 convolutions plus a shortcut) to ``x``.

        Args:
            x: input feature map; its last axis is read as the channel axis.
            num_output_channel: number of output channels. Must be equal to, or
                exactly twice, the number of input channels.

        Returns:
            The sum of the convolution branch and the (possibly pooled and
            channel-padded) shortcut branch.

        Raises:
            ValueError: if ``num_output_channel`` is neither equal to nor twice
                the number of input channels.
        """
        num_input_channel = K.int_shape(x)[-1]

        if num_output_channel == num_input_channel * 2:
            # number of output channel doubled. 3 things will happen:
            # 1. The output image' width and length will be halved
            # 2. The input image will go through a pooling layer with its width and length will be halved too
            # 3. pad the number of channels of the input images for residual learning
            output_channels_doubled = True
            strides = (2, 2)
        elif num_input_channel == num_output_channel:
            # number of output channel remain the same
            # go through 2 convolution layers without changing images' size
            output_channels_doubled = False
            strides = (1, 1)
        else:
            raise ValueError(f"Invalid output channnel: {num_output_channel}")

        # NOTE(review): both convolutions apply ReLU before the addition and no
        # batch normalization is used, which differs from the reference ResNet
        # formulation -- confirm this is intended.
        conv1 = Conv2D(num_output_channel,
                       kernel_size=(3, 3),
                       strides=strides,
                       padding='same',
                       activation='relu')(x)

        conv2 = Conv2D(num_output_channel,
                       kernel_size=(3, 3),
                       strides=(1, 1),
                       padding='same',
                       activation='relu')(conv1)

        if output_channels_doubled:
            # Shortcut branch: shrink the spatial size the same way the strided
            # convolution does, then zero-pad the channel axis up to the output width.
            pooled_x = AveragePooling2D(pool_size=(2, 2),
                                        strides=(2, 2),
                                        padding='same')(x)

            pad_before, pad_after = self._channel_padding(num_input_channel,
                                                          num_output_channel)
            padded_x = tf.pad(pooled_x,
                              [[0, 0],
                               [0, 0],
                               [0, 0],
                               [pad_before, pad_after]])
        else:
            padded_x = x

        # residual learning
        output_x = conv2 + padded_x

        return output_x

# Training and Testing

In [6]:
# Training hyper-parameters used by this cell and the training cell below.
batch_size = 128
nb_classes = 10   # CIFAR-10 has ten target classes
nb_epoch = 5

# Four stages with 3, 4, 6 and 3 residual blocks; the class count comes from
# nb_classes instead of a second hard-coded literal so the two cannot drift.
res34 = ResNet(input_shape=(32,32,3),
               num_residual_blocks_arr=[3, 4, 6, 3], 
               num_filter_base=32,
               num_class=nb_classes)

In [7]:
# Train for nb_epoch epochs, using the test split as per-epoch validation data.
# The trailing ';' keeps the cell from echoing the call's return value, so the
# output stays limited to the training log.
res34.fit(
    X_train=X_train,
    y_train=y_train,
    X_test=X_test,
    y_test=y_test,
    epochs=nb_epoch,
    batch_size=batch_size,
);

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
