In [1]:
import numpy as np
import tensorflow as tf

In [2]:
# Fetch the MNIST train/test split through the Keras dataset helper.
(X_train, Y_train), (X_test, Y_test) = tf.keras.datasets.mnist.load_data()

# Raw shapes, one per line (images and labels for train, then test).
print(X_train.shape, Y_train.shape, X_test.shape, Y_test.shape, sep='\n')

# Dataset dimensions: sample counts and per-image height / width.
num_data_train, size_height, size_width = X_train.shape[:3]
num_data_test = X_test.shape[0]

# Cast to float32, append a trailing channel axis, and divide by 255
# (presumably mapping 0-255 pixel intensities into [0, 1] - confirm the raw range).
X_train = X_train.astype(np.float32)[..., np.newaxis] / 255.0
X_test = X_test.astype(np.float32)[..., np.newaxis] / 255.0

# Shapes after preprocessing: the images gain a trailing axis of size 1.
print(X_train.shape, Y_train.shape, X_test.shape, Y_test.shape, sep='\n')

(60000, 28, 28)
(60000,)
(10000, 28, 28)
(10000,)
(60000, 28, 28, 1)
(60000,)
(10000, 28, 28, 1)
(10000,)


In [3]:
# Training schedule: passes over the training set and samples per mini-batch.
num_epochs = 10
size_batch = 64

# Number of distinct label values found in the training labels.
num_classes = len(np.unique(Y_train))

# Step size handed to the optimizer.
rate_learning = 1e-3
# NOTE(review): rate_dropout is not referenced by any cell visible here - confirm it is still needed.
rate_dropout = 0.5

In [4]:
# Training pipeline: (image, label) pairs, shuffled through a 10000-element
# buffer and then grouped into mini-batches.
dataset_train = (
    tf.data.Dataset.from_tensor_slices((X_train, Y_train))
    .shuffle(buffer_size=10000)
    .batch(batch_size=size_batch)
)

# Evaluation pipeline: same batching, original order (no shuffling).
dataset_test = (
    tf.data.Dataset.from_tensor_slices((X_test, Y_test))
    .batch(batch_size=size_batch)
)

In [9]:
class ResidualBlock(tf.keras.layers.Layer):
    """Bottleneck residual block: 1x1 -> 3x3 -> 1x1 convolutions plus a projection shortcut.

    Each convolution on the main path is followed by batch normalization; ReLU
    is applied after the first two and again after the shortcut is added.
    The shortcut is always a 1x1 convolution + batch normalization so that its
    output has the same ``num_filters * 4`` channels as the main path.

    Args:
        num_filters: Number of filters of the first two convolutions; the
            block outputs ``num_filters * 4`` channels.
        downsampling: If truthy, the block halves the spatial resolution by
            using stride 2 in the 3x3 convolution and in the shortcut
            projection. If falsy, every stride is 1.
    """

    def __init__(self, num_filters, downsampling):
        super(ResidualBlock, self).__init__()

        # `downsampling` used to be ignored (all strides were hard-coded to 1),
        # so stages requested with downsampling=True never reduced resolution.
        strides = 2 if downsampling else 1

        ### begin: fill-in section ###
        self.conv2a = tf.keras.layers.Conv2D(filters = num_filters,
                                             kernel_size = (1, 1),
                                             strides = 1,
                                             padding = 'same')
        self.bn2a = tf.keras.layers.BatchNormalization()
        # The stride sits on the 3x3 convolution rather than the first 1x1 one,
        # so no input position is skipped before it has been seen by a kernel.
        self.conv2b = tf.keras.layers.Conv2D(filters = num_filters,
                                             kernel_size = (3, 3),
                                             strides = strides,
                                             padding = 'same')
        self.bn2b = tf.keras.layers.BatchNormalization()
        self.conv2c = tf.keras.layers.Conv2D(filters = num_filters * 4,
                                             kernel_size = (1, 1),
                                             strides = 1,
                                             padding = 'same')
        self.bn2c = tf.keras.layers.BatchNormalization()

        # Projection shortcut. It uses the same stride as the main path so the
        # two tensors have matching height / width when they are added.
        self.downsample = tf.keras.Sequential()
        self.downsample.add(tf.keras.layers.Conv2D(filters = num_filters * 4,
                                                   kernel_size = (1, 1),
                                                   strides = strides))
        self.downsample.add(tf.keras.layers.BatchNormalization())

        ### end: fill-in section ###

    def call(self, inputs, training=None):
        """Run the block on ``inputs``.

        Args:
            inputs: Input feature map tensor.
            training: Forwarded to every batch-normalization layer, including
                the one inside the shortcut projection.

        Returns:
            ReLU of the sum of the shortcut projection and the main path.
        """
        ### begin: fill-in section ###
        # Pass `training` explicitly so the shortcut's batch normalization
        # does not depend on implicit propagation from the outer call.
        residual = self.downsample(inputs, training = training)

        outputs = self.conv2a(inputs)
        outputs = self.bn2a(outputs, training = training)
        outputs = tf.nn.relu(outputs)
        outputs = self.conv2b(outputs)
        outputs = self.bn2b(outputs, training = training)
        outputs = tf.nn.relu(outputs)
        outputs = self.conv2c(outputs)
        outputs = self.bn2c(outputs, training = training)

        outputs = tf.nn.relu(tf.keras.layers.add([residual, outputs]))

        ### end: fill-in section ###

        return outputs

In [6]:
def make_residual_block_layer(num_filters, num_blocks, downsampling):
    """Stack residual blocks into one sequential stage.

    Args:
        num_filters: Passed unchanged to every ``ResidualBlock``.
        num_blocks: Total number of blocks in the stage. The first block is
            always created, even if this value is below 1.
        downsampling: Passed to the first block only; all following blocks
            are created with ``False``.

    Returns:
        A ``tf.keras.Sequential`` containing the blocks in order.
    """
    # Create the container before the blocks, then fill it front to back.
    stage = tf.keras.Sequential()

    # One flag per block: the caller's choice first, then False for the rest.
    flags_downsampling = [downsampling] + [False] * (num_blocks - 1)
    for flag in flags_downsampling:
        stage.add(ResidualBlock(num_filters, flag))

    return stage

In [7]:
class ResNet(tf.keras.Model):
    """ResNet-style classifier built from four residual stages.

    Layout: 7x7 stride-2 convolution, batch normalization, ReLU, 3x3 stride-2
    max pooling, four residual stages (64 / 128 / 256 / 512 filters), global
    average pooling, and a softmax dense layer.

    Args:
        num_classes: Number of units of the final softmax layer.
        list_num_filters: Indexable with at least four entries; entry ``i`` is
            used as the number of blocks of stage ``i + 1`` (despite the
            name, it is passed as ``num_blocks``, not as a filter count).
    """

    def __init__(self, num_classes, list_num_filters):
        super().__init__()

        keras_layers = tf.keras.layers

        # Stem: convolution, normalization, activation, pooling.
        self.layer_conv_1 = keras_layers.Conv2D(64, (7, 7), strides=2, padding='same')
        self.layer_bn_1 = keras_layers.BatchNormalization()
        self.layer_act_1 = keras_layers.ReLU()
        self.layer_pool_1 = keras_layers.MaxPool2D((3, 3), strides=2, padding='same')

        # Residual stages; only the first one is requested without downsampling.
        self.layer_block_1 = make_residual_block_layer(
            num_filters=64, num_blocks=list_num_filters[0], downsampling=False)
        self.layer_block_2 = make_residual_block_layer(
            num_filters=128, num_blocks=list_num_filters[1], downsampling=True)
        self.layer_block_3 = make_residual_block_layer(
            num_filters=256, num_blocks=list_num_filters[2], downsampling=True)
        self.layer_block_4 = make_residual_block_layer(
            num_filters=512, num_blocks=list_num_filters[3], downsampling=True)

        # Classification head.
        self.layer_pool_avg = keras_layers.GlobalAveragePooling2D()
        self.layer_fc = keras_layers.Dense(num_classes, activation=tf.nn.softmax)

    def call(self, inputs, training):
        """Compute class probabilities for a batch of inputs.

        Args:
            inputs: Batch of input images.
            training: Forwarded to the stem batch normalization and to every
                residual stage.

        Returns:
            Output of the final softmax dense layer.
        """
        features = self.layer_conv_1(inputs)
        features = self.layer_bn_1(features, training=training)
        features = self.layer_pool_1(self.layer_act_1(features))

        stages = (
            self.layer_block_1,
            self.layer_block_2,
            self.layer_block_3,
            self.layer_block_4,
        )
        for stage in stages:
            features = stage(features, training=training)

        return self.layer_fc(self.layer_pool_avg(features))

In [10]:
# Network with two residual blocks in each of the four stages.
model = ResNet(num_classes=num_classes, list_num_filters=[2, 2, 2, 2])

# Adam optimizer and a cross-entropy loss that takes integer labels.
optimizer = tf.keras.optimizers.Adam(learning_rate=rate_learning)
loss = tf.keras.losses.SparseCategoricalCrossentropy()

# Separate accuracy trackers so train and test statistics never mix.
metric_train = tf.keras.metrics.SparseCategoricalAccuracy()
metric_test = tf.keras.metrics.SparseCategoricalAccuracy()

# Create the weights for single-channel images of the dataset's size, then
# print the layer / parameter overview.
model.build(input_shape=(None, size_height, size_width, 1))
model.summary()

Model: "res_net_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_4 (Conv2D)            multiple                  3200      
_________________________________________________________________
batch_normalization_4 (Batch multiple                  256       
_________________________________________________________________
re_lu_1 (ReLU)               multiple                  0         
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 multiple                  0         
_________________________________________________________________
sequential_2 (Sequential)    multiple                  215296    
_________________________________________________________________
sequential_5 (Sequential)    multiple                  930304    
_________________________________________________________________
sequential_8 (Sequential)    multiple                  36

In [11]:
@tf.function
def step_train(X, by):
    """Run one optimization step on a mini-batch.

    Uses the notebook-level ``model``, ``loss``, ``optimizer`` and
    ``metric_train`` objects.

    Args:
        X: Batch of inputs fed to the model.
        by: Batch of target labels.

    Returns:
        The loss value computed for this batch.
    """
    # Record the forward pass (model called with training=True) for differentiation.
    with tf.GradientTape() as tape:
        preds_ = model(X, training=True)
        loss_ = loss(by, preds_)

    # Differentiate w.r.t. the trainable weights and apply the update.
    weights_ = model.trainable_weights
    grads_ = tape.gradient(loss_, weights_)
    optimizer.apply_gradients(zip(grads_, weights_))

    # Accumulate the training metric with this batch's predictions.
    metric_train.update_state(by, preds_)

    return loss_

@tf.function
def step_test(X, by):
    """Evaluate one mini-batch without updating any weights.

    Uses the notebook-level ``model``, ``loss`` and ``metric_test`` objects.

    Args:
        X: Batch of inputs fed to the model.
        by: Batch of target labels.

    Returns:
        The loss value computed for this batch.
    """
    # Forward pass with training=False.
    preds_ = model(X, training=False)

    # Accumulate the test metric, then compute the batch loss.
    metric_test.update_state(by, preds_)
    loss_ = loss(by, preds_)

    return loss_

In [None]:
# One pass over the training data followed by one pass over the test data per
# epoch. Batch losses are weighted by batch size so the reported value is a
# per-sample average even when the final batch is smaller.
for ind_epoch in range(num_epochs):
    loss_train, loss_test = 0.0, 0.0
    num_train, num_test = 0.0, 0.0

    # --- training pass ---
    for ind_iter, (X_batch, by_batch) in enumerate(dataset_train):
        loss_ = step_train(X_batch, by_batch)
        num_train += X_batch.shape[0]
        loss_train += X_batch.shape[0] * loss_

    loss_train /= num_train

    # Read the accumulated accuracy before clearing it for the next epoch.
    acc_train = metric_train.result()
    metric_train.reset_states()

    # --- evaluation pass ---
    for X_batch, by_batch in dataset_test:
        loss_ = step_test(X_batch, by_batch)
        num_test += X_batch.shape[0]
        loss_test += X_batch.shape[0] * loss_

    loss_test /= num_test

    acc_test = metric_test.result()
    metric_test.reset_states()

    print(
        '{} EPOCH: loss_train {:.4f} acc_train {:.4f} loss_test {:.4f} acc_test {:.4f}'.format(
            ind_epoch + 1, loss_train, acc_train, loss_test, acc_test
        )
    )