# ResNet 개요
- VGG이후 Network를 깊게 하는 것에 대한 연구가 진행. 하지만, Network의 깊이가 깊어질수록 성능이 저하되는 문제점이 존재.
    - Vanishing Gradient
    - Degradation 문제: 층이 깊어질수록 최적화가 어려워져 loss가 충분히 감소하지 않음(더 얕은 네트워크보다 오히려 학습 오차가 커짐)

- ResNet의 주요 특징
    - Shortcut
        - 이전 레이어의 출력값을 conv layer를 거치지 않고 전달
    - Identity block
        - 입력과 출력의 shape이 같아서 shortcut으로 전달된 입력을 그대로 더하는(Add) block

In [None]:
from tensorflow.keras.layers import Conv2D, Dense, BatchNormalization, Activation
from tensorflow.keras.layers import ZeroPadding2D, MaxPooling2D
from tensorflow.keras.layers import Dropout, GlobalAveragePooling2D
from tensorflow.keras.layers import Add
from tensorflow.keras.layers import Input
from tensorflow.keras.models import Model
import numpy as np

## Identity block 구현하기

In [None]:
# identity block 구현하기

def identity_block(input_tensor, middle_kernel_size, filters, stage, block):
    """Bottleneck residual block whose shortcut is the unmodified input.

    The main branch is 1x1 conv -> middle conv -> 1x1 conv, each followed by
    batch normalization; ReLU follows the first two. The branch output is
    added to ``input_tensor`` and passed through a final ReLU.

    Args:
        input_tensor: Keras tensor fed to both the main branch and the Add.
        middle_kernel_size: Kernel size of the second ('same'-padded) conv.
        filters: Sequence of three filter counts, one per convolution. The
            third is added to ``input_tensor`` directly, so it presumably has
            to match the input's channel count.
        stage: Stage identifier, used only in the layer names.
        block: Block identifier, used only in the layer names.

    Returns:
        The output tensor of the final ReLU.
    """
    n_reduce, n_middle, n_restore = filters

    # Name stems so every layer of this block is identifiable in the summary.
    conv_prefix = f'res_s{stage}_b{block}_branch_'
    bn_prefix = f'bn_s{stage}_b{block}_branch_'

    # (name tag, filters, kernel size, padding, apply ReLU afterwards?)
    branch_specs = (
        ('a', n_reduce, (1, 1), 'valid', True),
        ('b', n_middle, middle_kernel_size, 'same', True),
        # No ReLU after the last conv: activation happens after the Add.
        ('c', n_restore, (1, 1), 'valid', False),
    )

    branch = input_tensor
    for tag, n_filters, kernel, pad, with_relu in branch_specs:
        branch = Conv2D(
            filters=n_filters,
            kernel_size=kernel,
            padding=pad,
            kernel_initializer='he_normal',
            name=conv_prefix + tag,
        )(branch)
        branch = BatchNormalization(axis=3, name=bn_prefix + tag)(branch)
        if with_relu:
            branch = Activation('relu')(branch)

    # Residual connection: input + branch, then the block's output activation.
    merged = Add()([input_tensor, branch])
    return Activation('relu')(merged)


In [None]:
# Identity block practice: chain three identity blocks on a 56x56x256 input.
identity_kwargs = dict(middle_kernel_size=(3, 3), filters=[64, 64, 256], stage=2)

input_tensor = Input(shape=(56, 56, 256), name='test_input')
x = identity_block(input_tensor, block='a', **identity_kwargs)
x = identity_block(x, block='b', **identity_kwargs)
output = identity_block(x, block='c', **identity_kwargs)

identity_layers = Model(inputs=input_tensor, outputs=output)
identity_layers.summary()

Model: "model_4"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 test_input (InputLayer)        [(None, 56, 56, 256  0           []                               
                                )]                                                                
                                                                                                  
 res_2a_branch_a (Conv2D)       (None, 56, 56, 64)   16448       ['test_input[0][0]']             
                                                                                                  
 bn_2a_branch_a (BatchNormaliza  (None, 56, 56, 64)  256         ['res_2a_branch_a[0][0]']        
 tion)                                                                                            
                                                                                            

## reduce_block 구현하기
- feature map의 사이즈를 절반으로 줄이는 block 구현
- feature map의 사이즈가 절반으로 줄어드는 stage의 첫번째 block으로 사용

In [None]:
# feature map의 크기를 절반으로 줄이는 reduce_block()만들기

def reduce_block(input_tensor, middle_kernel_size, filters, stage, block, strides=(2,2)):
    """Bottleneck residual block with a projection (1x1 conv) shortcut.

    The first 1x1 conv of the main branch and the shortcut conv both use
    ``strides``, so the two paths are downsampled together before the Add.
    With ``strides=(1, 1)`` the block only changes the channel count.

    Args:
        input_tensor: Keras tensor fed to both the main branch and the shortcut.
        middle_kernel_size: Kernel size of the second ('same'-padded) conv.
        filters: Sequence of three filter counts; the third is also used for
            the shortcut convolution.
        stage: Stage identifier, used only in the layer names.
        block: Block identifier, used only in the layer names.
        strides: Strides of the first main-branch conv and the shortcut conv.

    Returns:
        The output tensor of the final ReLU.
    """
    n_reduce, n_middle, n_expand = filters

    conv_prefix = f'res_s{stage}_b{block}_branch_'
    bn_prefix = f'bn_s{stage}_b{block}_branch_'

    def conv_bn(tensor, n_filters, kernel, tag, **conv_options):
        # One convolution followed by its batch normalization, named by tag.
        tensor = Conv2D(
            filters=n_filters,
            kernel_size=kernel,
            kernel_initializer='he_normal',
            name=conv_prefix + tag,
            **conv_options,
        )(tensor)
        return BatchNormalization(axis=3, name=bn_prefix + tag)(tensor)

    # Main branch: strided 1x1 -> middle conv -> 1x1 (no ReLU before the Add).
    main = conv_bn(input_tensor, n_reduce, (1, 1), 'a', strides=strides)
    main = Activation('relu')(main)
    main = conv_bn(main, n_middle, middle_kernel_size, 'b', padding='same')
    main = Activation('relu')(main)
    main = conv_bn(main, n_expand, (1, 1), 'c')

    # Projection shortcut so the input matches the main branch at the Add.
    shortcut = conv_bn(input_tensor, n_expand, (1, 1), 'shortcut', strides=strides)

    merged = Add()([main, shortcut])
    return Activation('relu')(merged)



In [None]:
# reduce_block practice: one downsampling block followed by two identity blocks.
stage2_filters = [64, 64, 256]

input_tensor = Input(shape=(56, 56, 256), name='test_input')
x = reduce_block(
    input_tensor,
    middle_kernel_size=(3, 3),
    filters=stage2_filters,
    stage=2,
    block=1,
    strides=(2, 2),
)
x = identity_block(x, middle_kernel_size=3, filters=stage2_filters, stage=2, block=2)
output = identity_block(x, middle_kernel_size=3, filters=stage2_filters, stage=2, block=3)

identity_layers = Model(inputs=input_tensor, outputs=output)
identity_layers.summary()


Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 test_input (InputLayer)        [(None, 56, 56, 256  0           []                               
                                )]                                                                
                                                                                                  
 res_s2_b1_branch_a (Conv2D)    (None, 28, 28, 64)   16448       ['test_input[0][0]']             
                                                                                                  
 bn_s2_b1_branch_a (BatchNormal  (None, 28, 28, 64)  256         ['res_s2_b1_branch_a[0][0]']     
 ization)                                                                                         
                                                                                            

## 첫번째 stage 구현

In [None]:
# stage1 구현
# input image에 7x7 conv, strides = 2, filters = 64
# 3x3 maxpooling, strides = 2

from tensorflow.keras.layers import ZeroPadding2D, MaxPooling2D

def first_conv(input_tensor):
    """Apply the network stem: padded 7x7/2 conv, BN, ReLU, padded 3x3/2 max pool.

    Args:
        input_tensor: Keras tensor holding the input images.

    Returns:
        The tensor produced by the max-pooling layer.
    """
    # Layers are created in application order so auto-generated names are stable.
    stem_layers = [
        # Explicit zero padding + 'valid' conv instead of padding='same'.
        ZeroPadding2D(padding=(3, 3), name='conv1_pad'),
        Conv2D(
            filters=64,
            kernel_size=(7, 7),
            strides=2,
            padding='valid',
            kernel_initializer='he_normal',
            name='conv1',
        ),
        BatchNormalization(axis=3, name='conv1_nm'),
        Activation('relu'),
        # Despite its name, 'pool1' is the padding layer that precedes the pooling.
        ZeroPadding2D(padding=(1, 1), name='pool1'),
        MaxPooling2D(pool_size=(3, 3), strides=2),
    ]

    out = input_tensor
    for layer in stem_layers:
        out = layer(out)
    return out


In [None]:
# Run the stem alone on a 224x224 RGB input and inspect the layer shapes.
input_tensor = Input(name='test_input', shape=(224, 224, 3))
output = first_conv(input_tensor)

model = Model(outputs=output, inputs=input_tensor)
model.summary()

Model: "model_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 test_input (InputLayer)     [(None, 224, 224, 3)]     0         
                                                                 
 conv1_pad (ZeroPadding2D)   (None, 230, 230, 3)       0         
                                                                 
 conv1 (Conv2D)              (None, 112, 112, 64)      9472      
                                                                 
 conv1_nm (BatchNormalizatio  (None, 112, 112, 64)     256       
 n)                                                              
                                                                 
 activation_14 (Activation)  (None, 112, 112, 64)      0         
                                                                 
 pool1 (ZeroPadding2D)       (None, 114, 114, 64)      0         
                                                           

In [None]:
# first_conv practice.
# Caution: first_conv outputs (56, 56, 64) while an identity block with
# filters [64, 64, 256] produces (56, 56, 256). The channel counts differ, so
# the two tensors cannot be added directly. The stem output has to be projected
# from (56, 56, 64) to (56, 56, 256) first, which is why the first block of the
# stage is a reduce_block (with strides (1, 1)) rather than an identity_block.

input_tensor = Input(name='test_input', shape=(224, 224, 3))
stem_output = first_conv(input_tensor)
x = stem_output
output = reduce_block(
    x,
    3,
    filters=[64, 64, 256],
    stage=2,
    block=1,
    strides=(1, 1),
)

model = Model(inputs=input_tensor, outputs=output)
model.summary()

Model: "model_3"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 test_input (InputLayer)        [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 conv1_pad (ZeroPadding2D)      (None, 230, 230, 3)  0           ['test_input[0][0]']             
                                                                                                  
 conv1 (Conv2D)                 (None, 112, 112, 64  9472        ['conv1_pad[0][0]']              
                                )                                                                 
                                                                                            

## ResNet 구현하기

In [None]:
# resnet 구현
# cifar10 기준

def create_resnet(shape=(224,224,3), n_classes=10):
    """Build a bottleneck ResNet classifier and print its summary.

    The network is the stem (``first_conv``) followed by four stages of 3, 4,
    6 and 3 bottleneck blocks. Each stage starts with a ``reduce_block`` and
    continues with ``identity_block``s. The head is global average pooling,
    dropout, a 200-unit ReLU layer, dropout and a softmax classifier.

    Args:
        shape: Input image shape passed to ``Input``.
        n_classes: Number of output classes (units of the final Dense layer).

    Returns:
        The uncompiled Keras ``Model``. ``model.summary()`` is printed as a
        side effect.
    """
    input_tensor = Input(shape=shape)

    # Stage 1: stem convolution and max pooling.
    x = first_conv(input_tensor)

    # Stages 2-5: (stage number, bottleneck filters, blocks in stage, strides
    # of the first block). Stage 2 keeps strides (1, 1): its reduce_block only
    # widens the stem's 64 channels to 256 and does not downsample.
    stage_configs = (
        (2, [64, 64, 256], 3, (1, 1)),
        (3, [128, 128, 512], 4, (2, 2)),
        (4, [256, 256, 1024], 6, (2, 2)),
        (5, [512, 512, 2048], 3, (2, 2)),
    )
    for stage, filters, n_blocks, strides in stage_configs:
        x = reduce_block(x, middle_kernel_size=(3, 3), filters=filters, stage=stage, block=1, strides=strides)
        for block in range(2, n_blocks + 1):
            x = identity_block(x, middle_kernel_size=(3, 3), filters=filters, stage=stage, block=block)

    # Classification head.
    x = GlobalAveragePooling2D(name='GAP')(x)
    x = Dropout(rate=0.5)(x)
    x = Dense(200, activation='relu', name='fc_1')(x)
    x = Dropout(rate=0.5)(x)
    # Softmax (not ReLU): the model is trained with categorical cross-entropy,
    # which expects the outputs to form a probability distribution.
    output = Dense(n_classes, activation='softmax', name='fc_final')(x)

    model = Model(inputs=input_tensor, outputs=output)
    model.summary()

    return model




In [None]:
# Build the network for 224x224 RGB inputs and 10 classes; create_resnet also prints the summary.
model = create_resnet(shape=(224,224,3), n_classes=10)

Model: "model_4"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 conv1_pad (ZeroPadding2D)      (None, 230, 230, 3)  0           ['input_1[0][0]']                
                                                                                                  
 conv1 (Conv2D)                 (None, 112, 112, 64  9472        ['conv1_pad[0][0]']              
                                )                                                                 
                                                                                            

In [None]:
# Performance test on the CIFAR-10 dataset.

# Side length in pixels that CIFAR_Dataset resizes every image to.
IMAGE_SIZE = 128
# Default batch size of CIFAR_Dataset.
BATCH_SIZE = 64

In [None]:
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.utils import Sequence
import cv2
import sklearn


def get_preprocessed_ohe(images, labels, pre_func=None):
    """Optionally preprocess the images and one-hot encode the labels.

    Args:
        images: Image array; returned unchanged when ``pre_func`` is None.
        labels: Class labels passed to ``to_categorical``.
        pre_func: Optional callable applied to the whole ``images`` array.

    Returns:
        Tuple ``(images, oh_labels)`` of the (possibly preprocessed) images
        and the one-hot encoded labels.
    """
    if pre_func is not None:
        # The preprocessing function must receive the images, not the labels.
        images = pre_func(images)

    oh_labels = to_categorical(labels)
    return images, oh_labels

def get_train_valid_test_set(train_images, train_labels, test_images, test_labels, valid_size=0.5, random_state=2021):
    """One-hot encode the labels and split the training data into train/validation.

    Args:
        train_images: Training images.
        train_labels: Training labels (one-hot encoded before the split).
        test_images: Test images, returned without splitting.
        test_labels: Test labels (one-hot encoded).
        valid_size: Passed to ``train_test_split`` as ``test_size``.
        random_state: Seed passed to ``train_test_split``.

    Returns:
        ``(tr_images, tr_oh_labels), (val_images, val_oh_labels),
        (test_images, test_oh_labels)``.
    """
    train_images, train_oh_labels = get_preprocessed_ohe(train_images, train_labels)
    test_images, test_oh_labels = get_preprocessed_ohe(test_images, test_labels)

    # Honor the caller's seed instead of a hard-coded value.
    tr_images, val_images, tr_oh_labels, val_oh_labels = train_test_split(
        train_images,
        train_oh_labels,
        test_size=valid_size,
        random_state=random_state,
    )

    return (tr_images, tr_oh_labels), (val_images, val_oh_labels), (test_images, test_oh_labels)

class CIFAR_Dataset(Sequence):
    """Keras ``Sequence`` that yields batches of resized, preprocessed images.

    Each image is resized to ``IMAGE_SIZE`` x ``IMAGE_SIZE`` with ``cv2.resize``,
    then optionally augmented and preprocessed, and stored in a float32 batch.

    Args:
        images_array: Array of source images, sliced along the first axis.
        labels: Labels aligned with ``images_array``, or None for unlabeled data.
        batch_size: Number of samples per batch.
        augmentor: Optional callable invoked as ``augmentor(image=image)`` and
            returning a mapping with an ``'image'`` entry.
        shuffle: When True, the data is shuffled at construction and at the
            end of every epoch.
        pre_func: Optional per-image preprocessing callable.
    """

    def __init__(self, images_array, labels, batch_size=BATCH_SIZE, augmentor=None, shuffle=False, pre_func=None):
        super().__init__()
        self.images_array = images_array
        self.labels = labels
        self.batch_size = batch_size
        self.augmentor = augmentor
        self.pre_func = pre_func
        self.shuffle = shuffle
        if self.shuffle:
            self.on_epoch_end()

    def __len__(self):
        """Return the number of batches per epoch (the last one may be partial)."""
        # Count images rather than labels so unlabeled datasets work too.
        return int(np.ceil(len(self.images_array) / self.batch_size))

    def __getitem__(self, index):
        """Return batch ``index``.

        Returns:
            ``(image_batch, label_batch)`` when labels were given, otherwise
            just ``image_batch``.
        """
        start = index * self.batch_size
        stop = start + self.batch_size
        images_fetch = self.images_array[start:stop]

        image_batch = np.zeros((len(images_fetch), IMAGE_SIZE, IMAGE_SIZE, 3), dtype='float32')

        for image_index, raw_image in enumerate(images_fetch):
            image = cv2.resize(raw_image, (IMAGE_SIZE, IMAGE_SIZE))

            if self.augmentor is not None:
                image = self.augmentor(image=image)['image']

            if self.pre_func is not None:
                image = self.pre_func(image)

            image_batch[image_index] = image

        # Without labels there is nothing to pair with the images.
        if self.labels is None:
            return image_batch
        return image_batch, self.labels[start:stop]

    def on_epoch_end(self):
        """Reshuffle images (and labels, in unison) when shuffling is enabled."""
        if not self.shuffle:
            return
        if self.labels is None:
            self.images_array = sklearn.utils.shuffle(self.images_array)
        else:
            # Shuffle both arrays together so each image keeps its label.
            self.images_array, self.labels = sklearn.utils.shuffle(self.images_array, self.labels)


In [None]:
# Load the CIFAR-10 data.

(train_images, train_labels), (test_images, test_labels) = cifar10.load_data()
print(train_images.shape, train_labels.shape, test_images.shape, test_labels.shape)

# Unpack the test images into test_images, the name used by the rest of the
# notebook, instead of a stray test_image variable.
(tr_images, tr_oh_labels), (val_images, val_oh_labels), (test_images, test_oh_labels) = \
    get_train_valid_test_set(train_images, train_labels, test_images, test_labels, valid_size=0.5, random_state=2021)
print(tr_images.shape, tr_oh_labels.shape, val_images.shape, val_oh_labels.shape, test_images.shape, test_oh_labels.shape)

(50000, 32, 32, 3) (50000, 1) (10000, 32, 32, 3) (10000, 1)
(25000, 32, 32, 3) (25000, 10) (25000, 32, 32, 3) (25000, 10) (10000, 32, 32, 3) (10000, 10)


In [None]:
# 학습용, 검증용 데이터셋 제너레이터 생성

from tensorflow.keras.applications.resnet50 import preprocess_input as resnet_preprocess

tr_ds = CIFAR_Dataset(tr_images, tr_oh_labels, batch_size=BATCH_SIZE, augmentor=None, shuffle=True, pre_func=resnet_preprocess)
val_ds = CIFAR_Dataset(val_images, val_oh_labels, batch_size=BATCH_SIZE, augmentor=None, shuffle=False, pre_func=resnet_preprocess)

# Build the first batch of each dataset once: every fetch resizes and
# preprocesses a whole batch, so repeating it for each print is wasted work.
tr_first_images, tr_first_labels = next(iter(tr_ds))
val_first_images, val_first_labels = next(iter(val_ds))

print(tr_first_images.shape, val_first_images.shape)
print(tr_first_labels.shape, val_first_labels.shape)
print(tr_first_images[0])

(64, 128, 128, 3) (64, 128, 128, 3)
(64, 10) (64, 10)
[[[ -92.939 -104.779  -55.68 ]
  [ -92.939 -104.779  -55.68 ]
  [ -91.939 -103.779  -52.68 ]
  ...
  [ -97.939 -111.779 -116.68 ]
  [-100.939 -116.779 -123.68 ]
  [-100.939 -116.779 -123.68 ]]

 [[ -92.939 -104.779  -55.68 ]
  [ -92.939 -104.779  -55.68 ]
  [ -92.939 -103.779  -52.68 ]
  ...
  [ -97.939 -111.779 -116.68 ]
  [-100.939 -116.779 -123.68 ]
  [-100.939 -116.779 -123.68 ]]

 [[ -92.939 -103.779  -52.68 ]
  [ -92.939 -103.779  -52.68 ]
  [ -91.939 -102.779  -49.68 ]
  ...
  [ -97.939 -111.779 -117.68 ]
  [-100.939 -116.779 -123.68 ]
  [-100.939 -116.779 -123.68 ]]

 ...

 [[ -86.939  -90.779  -95.68 ]
  [ -86.939  -90.779  -95.68 ]
  [ -86.939  -88.779  -91.68 ]
  ...
  [ -85.939  -82.779  -58.68 ]
  [ -86.939  -83.779  -59.68 ]
  [ -86.939  -83.779  -59.68 ]]

 [[ -85.939  -90.779  -97.68 ]
  [ -85.939  -90.779  -97.68 ]
  [ -85.939  -89.779  -93.68 ]
  ...
  [ -84.939  -83.779  -61.68 ]
  [ -85.939  -83.779  -62.68 ]
  [

In [None]:
from tensorflow.keras.optimizers import Adam

# create_resnet takes the input shape through its `shape` parameter; the
# datasets resize every image to IMAGE_SIZE x IMAGE_SIZE.
model = create_resnet(shape=(IMAGE_SIZE, IMAGE_SIZE, 3), n_classes=10)

# Pass an optimizer instance (the Adam class itself is not a valid optimizer).
model.compile(optimizer=Adam(learning_rate=0.001), loss='categorical_crossentropy', metrics=['accuracy'])

history = model.fit(tr_ds, epochs=10, validation_data=val_ds)

In [None]:
# Evaluate on the test split with the same preprocessing as training, unshuffled.
test_ds = CIFAR_Dataset(
    test_images,
    test_oh_labels,
    batch_size=BATCH_SIZE,
    augmentor=None,
    shuffle=False,
    pre_func=resnet_preprocess,
)
model.evaluate(test_ds)