# Residual Network Implementation

In [1]:
import tensorflow as tf
import numpy as np

In [2]:
#1. hyperparameter
Epochs = 10

In [3]:
#2. build model #Residual Network

#Residual unit : Residual Network를 구성하는 가장 작은 단위로, Gradient highway가 발생하는 Pre-Activation 부분입니다. 
#Pre-activation Residual Unit : Batch normalization-ReLu-Conv layer구조
class ResUnit(tf.keras.Model):
    def __init__(self, filter_in, filter_out, kernel_size):
        super(ResUnit, self).__init__()
        self.batchnormalization1 = tf.keras.layers.BatchNormalization()
        #relu => 변수가 없는 layer이기 때문에 initialize 안해줘도 되어서 그냥 call에서 바로 쓴다. 
        self.conv1 = tf.keras.layers.Conv2D(filter_out, kernel_size, padding='SAME')
        
        self.batchnormalization2 = tf.keras.layers.BatchNormalization()
        #relu
        self.conv2 = tf.keras.layers.Conv2D(filter_out, kernel_size, padding='SAME')
        
        #identity: dimension(channel의 개수가 같아야한다.)
        if filter_in == filter_out: #개수가 같으면 그대로 사용하지만  
            self.identity = lambda x: x
        else: #같지 않으면 1x1 conv를 사용하여 크기를 맞춰줍니다. 
            self.identity = tf.keras.layers.Conv2D(filter_out, (1, 1), padding = 'SAME')
    
    def call(self, x, training = False, mask = None):
        h = self.batchnormalization1(x, training = training)
        h = tf.nn.relu(h)
        h = self.conv1(h)
        
        h = self.batchnormalization2(h, training = training)
        h = tf.nn.relu(h)
        h = self.conv2(h)
        
        #call해서 Pre-activation인 ResUnit을 통과했으면 이 결과를 input이랑 다시 더 해 주어야 한다(이를 identity함수로 만들었다. ). 따라서
        #input과 결과를 크기 맞춰주기 위해
        return self.identity(x) + h


In [4]:
#Residual layer: Residual Unit을 여러개 연결 한 것

class ResLayer(tf.keras.Model):
    def __init__(self, filter_in, filters, kernel_size):
        super(ResLayer, self).__init__()
        self.sequence = list()
        
        #filter_in[16] + filters[32, 32, 32] = zip([16, 32, 32, 32], [32, 32, 32])
        for f_in, f_out in zip([filter_in]+list(filters), filters):
            self.sequence.append(ResUnit(f_in, f_out, kernel_size))
        
    def call(self, x, training = False, mask = None):
        for unit in self.sequence:
            x = unit(x, training = training)
        return x

In [5]:
#ResNet 모델 정의
class ResNet(tf.keras.Model):
    def __init__(self):
        super(ResNet, self).__init__()
        self.conv1 = tf.keras.layers.Conv2D(8, (3, 3), padding = 'same', activation = 'relu')#28x28 #처음엔 영상하나ㅏ 뽑고 시작
        
        self.res1 = ResLayer(8, (16, 16), (3, 3)) #28x28s16
        self.pool1 = tf.keras.layers.MaxPool2D((2, 2)) #14x14x16
        
        self.res2 = ResLayer(16, (32, 32), (3, 3)) #14x14x32
        self.pool2 = tf.keras.layers.MaxPool2D((2, 2))#7x7x32
        
        self.res3 = ResLayer(32, (64, 64), (3, 3)) #7x7x64
        self.flatten = tf.keras.layers.Flatten() #7x7x64 = 3136
        self.dense1 = tf.keras.layers.Dense(128, activation='relu') #3136 -> 128
        self.dense2 = tf.keras.layers.Dense(10, activation = 'softmax') #128 -> 10(class_num)
    
    def call(self, x, training = False, mask = None): #call에서 모든 layer 연결
        x = self.conv1(x)
        
        x = self.res1(x, training=training)
        x = self.pool1(x) 
                  
        x = self.res2(x, training=training)
        x = self.pool2(x) 
                  
        x = self.res3(x, training=training)
        x = self.flatten(x)
        x = self.dense1(x)
        output = self.dense2(x)
        return output        
        

In [6]:
#3. Implement training loop

@tf.function
def train_step(model, images, labels, loss_object, optimizer, train_loss, train_accuracy):
    with tf.GradientTape() as tape:
        predictions = model(images)
        loss = loss_object(labels, predictions)
    gradients = tape.gradient(loss, model.trainable_variables)
    
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    train_loss(loss)
    train_accuracy(labels, predictions)

In [7]:
#4. Implement testing loop
@tf.function
def test_step(model, images, labels, loss_object, test_loss, test_accuracy):
    predictions = model(images)
    
    t_loss = loss_object(labels, predictions)
    test_loss(t_loss)
    test_accuracy(labels, predictions)

In [8]:
#5. Prepare Dataset : keras에서 제공하는 mnist dataset 사용

#call Data
mnist = tf.keras.datasets.mnist #이렇게 keras에서 mnist데이터를 가져옵니다. 
(X_train, y_train), (X_test, y_test) = mnist.load_data() #이 데이터의 구성이 train/test로 나뉘어져 있다. 

#Normalization
X_train, x_test = X_train/255.0, X_test/255.0 #0~255로 표현이 되어 있으니 0~1로 바꿔줄 수 있다. 

#(num_sample, height, weight)-> (num_sample, height, weight, channel in)추가
X_train = X_train[..., tf.newaxis].astype(np.float32)#casting도 잊지말고 해줍니다. 
X_test = X_test[..., tf.newaxis].astype(np.float32)

#dataset 만둘어주기
train_ds = tf.data.Dataset.from_tensor_slices((X_train, y_train)).shuffle(10000).batch(32) #tf.data.Dataset.from_tensor_slices() 이 함수를 이용하면 numpy나 tensor로부터 dataset을 구축할 수 있습니다. 
test_ds = tf.data.Dataset.from_tensor_slices((X_test, y_test)).batch(32)#test는 shuffle이 필요없습니다. 


In [9]:
#6. Define the training environment

#create model
model = ResNet()

#Define loss and optimizer
loss_object = tf.keras.losses.SparseCategoricalCrossentropy()
optimizer = tf.keras.optimizers.Adam()

#Define performance metrics
train_loss = tf.keras.metrics.Mean(name = 'train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name = 'train_accuracy')

test_loss = tf.keras.metrics.Mean(name = 'test_loss')
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name = 'test_accuracy')

In [10]:
# 7. Training

for epoch in range(Epochs):
    for images, labels in train_ds:
        train_step(model, images, labels, loss_object, optimizer, train_loss, train_accuracy)
        
    for test_images, test_labels in test_ds:
        test_step(model, test_images, test_labels, loss_object, test_loss, test_accuracy)
        
    template = 'Epoch {}, loss: {}, Accuracy: {}, Test Loss: {}, Test Accuracy: {}'
    print(template.format(epoch+1, 
                         train_loss.result(), #result() — 누적된 측정 결과를 얻습니다.
                         train_accuracy.result()*100, 
                         test_loss.result(), 
                         test_accuracy.result()*100))

Epoch 1, loss: 0.1103610247373581, Accuracy: 96.56166076660156, Test Loss: 4.02578067779541, Test Accuracy: 98.73999786376953
Epoch 2, loss: 0.07808578014373779, Accuracy: 97.56416320800781, Test Loss: 4.287351608276367, Test Accuracy: 98.86500549316406
Epoch 3, loss: 0.06364230811595917, Accuracy: 98.02777862548828, Test Loss: 4.632501602172852, Test Accuracy: 98.86333465576172
Epoch 4, loss: 0.05491233617067337, Accuracy: 98.29750061035156, Test Loss: 5.520465850830078, Test Accuracy: 98.79750061035156
Epoch 5, loss: 0.048532795161008835, Accuracy: 98.49766540527344, Test Loss: 6.2708024978637695, Test Accuracy: 98.802001953125
Epoch 6, loss: 0.04405029118061066, Accuracy: 98.6369400024414, Test Loss: 6.352503299713135, Test Accuracy: 98.80500030517578
Epoch 7, loss: 0.040740326046943665, Accuracy: 98.73475646972656, Test Loss: 6.281417369842529, Test Accuracy: 98.85428619384766
Epoch 8, loss: 0.03764975443482399, Accuracy: 98.82521057128906, Test Loss: 6.469360828399658, Test Accura

In [11]:
from IPython.core.display import display, HTML
display(HTML("<style>.container {width:90% !important;}</style>"))