In [1]:
import numpy as np
import pandas as pd
import os

### CIFAR10 Dataset Preprocessing and Split

In [2]:
from tensorflow.keras.datasets import cifar10

# Load the CIFAR-10 train and test sets.
(train_images, train_labels), (test_images, test_labels) = cifar10.load_data()

# Show the (images, labels) shapes of each set.
print('train dataset shape: ', train_images.shape, train_labels.shape)
print('test dataset shape: ', test_images.shape, test_labels.shape)

Downloading data from https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz
train dataset shape:  (50000, 32, 32, 3) (50000, 1)
test dataset shape:  (10000, 32, 32, 3) (10000, 1)


In [3]:
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from tensorflow.keras.datasets import cifar10

def zero_one_scaler(image):
    """Divide every value of ``image`` by 255.0 and return the result.

    Intended for mapping pixel intensities into the [0, 1] range
    (assumes the input uses the 0-255 scale -- TODO confirm at call sites).
    """
    scaled = image / 255.0
    return scaled

def get_preprocessed_ohe(images, labels, pre_func=None):
    """Optionally preprocess images and one-hot encode their labels.

    Args:
        images: Image data. Passed as a whole to ``pre_func`` when one is given.
        labels: Class labels, handed to ``to_categorical``.
        pre_func: Optional callable applied to ``images`` (for example
            ``zero_one_scaler``). ``None`` leaves the images untouched.

    Returns:
        Tuple ``(images, oh_labels)``: the (possibly preprocessed) images and
        the one-hot encoded labels.
    """
    if pre_func is not None:
        # Rebind `images` itself. Storing the result under a different local
        # name would discard it and return the unprocessed input.
        images = pre_func(images)
    oh_labels = to_categorical(labels)
    return images, oh_labels

def get_train_valid_test_set(train_images, train_labels, test_images, test_labels, valid_size=0.15, random_state=2021):
    """One-hot encode the labels and carve a validation set out of the training data.

    Args:
        train_images, train_labels: Training images and their labels.
        test_images, test_labels: Test images and their labels.
        valid_size: Forwarded to ``train_test_split`` as ``test_size``.
        random_state: Forwarded to ``train_test_split`` as ``random_state``.

    Returns:
        ``(tr_images, tr_oh_labels), (val_images, val_oh_labels), (test_images, test_oh_labels)``.
    """
    # No pre_func is passed, so only the labels are transformed here.
    train_images, train_oh_labels = get_preprocessed_ohe(train_images, train_labels)
    test_images, test_oh_labels = get_preprocessed_ohe(test_images, test_labels)

    split = train_test_split(
        train_images,
        train_oh_labels,
        test_size=valid_size,
        random_state=random_state,
    )
    tr_images, val_images, tr_oh_labels, val_oh_labels = split

    train_set = (tr_images, tr_oh_labels)
    valid_set = (val_images, val_oh_labels)
    test_set = (test_images, test_oh_labels)
    return train_set, valid_set, test_set

In [4]:
# Build the train/validation/test sets, holding out 20% of the training data for validation.
(tr_images, tr_oh_labels), (val_images, val_oh_labels), (test_images, test_oh_labels) = get_train_valid_test_set(
    train_images,
    train_labels,
    test_images,
    test_labels,
    valid_size=0.2,
    random_state=2021,
)

# Show the (images, one-hot labels) shapes of each set.
print('train dataset shape: ', tr_images.shape, tr_oh_labels.shape)
print('validation dataset shape: ', val_images.shape, val_oh_labels.shape)
print('test dataset shape: ', test_images.shape, test_oh_labels.shape)

train dataset shape:  (40000, 32, 32, 3) (40000, 10)
validation dataset shape:  (10000, 32, 32, 3) (10000, 10)
test dataset shape:  (10000, 32, 32, 3) (10000, 10)


### Identity Block Creation Function
![](https://raw.githubusercontent.com/chulminkw/CNN_PG/main/utils/images/residual_block_small.png)

In [5]:
from tensorflow.keras.layers import Conv2D, Dense, BatchNormalization, Activation
from tensorflow.keras.layers import add, Add

def identity_block(input_tensor, middle_kernel_size, filters, stage, block):
    """Build a bottleneck residual block whose shortcut is the unmodified input.

    The main branch is 1x1 conv -> BN -> ReLU -> middle conv -> BN -> ReLU ->
    1x1 conv -> BN. Its output is added to ``input_tensor`` and passed through
    a final ReLU.

    Args:
        input_tensor: Keras tensor entering the block. It is added directly to
            the branch output, so its shape must match what the last conv produces.
        middle_kernel_size: Kernel size of the middle convolution.
        filters: Three filter counts, one per convolution, in order.
        stage: Stage identifier used in the layer names.
        block: Block identifier string used in the layer names.

    Returns:
        Output tensor of the block.
    """
    n_first, n_middle, n_last = filters

    # Layer names follow the pattern res<stage><block>_branch2a/2b/2c (and bn... for batch norm).
    conv_prefix = 'res{}{}_branch'.format(stage, block)
    bn_prefix = 'bn{}{}_branch'.format(stage, block)

    # First 1x1 convolution.
    branch = Conv2D(
        filters=n_first,
        kernel_size=(1, 1),
        kernel_initializer='he_normal',
        name=conv_prefix + '2a',
    )(input_tensor)
    branch = BatchNormalization(axis=-1, name=bn_prefix + '2a')(branch)
    branch = Activation('relu')(branch)

    # Middle convolution; 'same' padding keeps the spatial size.
    branch = Conv2D(
        filters=n_middle,
        kernel_size=middle_kernel_size,
        padding='same',
        kernel_initializer='he_normal',
        name=conv_prefix + '2b',
    )(branch)
    branch = BatchNormalization(axis=-1, name=bn_prefix + '2b')(branch)
    branch = Activation('relu')(branch)

    # Last 1x1 convolution; no activation before the addition.
    branch = Conv2D(
        filters=n_last,
        kernel_size=(1, 1),
        kernel_initializer='he_normal',
        name=conv_prefix + '2c',
    )(branch)
    branch = BatchNormalization(axis=-1, name=bn_prefix + '2c')(branch)

    # Residual connection followed by the block's final activation.
    merged = Add()([input_tensor, branch])
    return Activation('relu')(merged)

### Verify the Identity Block Structure

In [6]:
from tensorflow.keras.layers import Input
from tensorflow.keras.models import Model

# Wrap a single identity block in a model so its layers can be inspected.
input_tensor = Input(shape=(56, 56, 256), name='test_input')
filters = [64, 64, 256]
kernel_size = (3, 3)
stage = 2
block = 'a'

output = identity_block(
    input_tensor,
    middle_kernel_size=kernel_size,
    filters=filters,
    stage=stage,
    block=block,
)
identity_layers = Model(inputs=input_tensor, outputs=output)
identity_layers.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
test_input (InputLayer)         [(None, 56, 56, 256) 0                                            
__________________________________________________________________________________________________
res2a_branch2a (Conv2D)         (None, 56, 56, 64)   16448       test_input[0][0]                 
__________________________________________________________________________________________________
bn2a_branch2a (BatchNormalizati (None, 56, 56, 64)   256         res2a_branch2a[0][0]             
__________________________________________________________________________________________________
activation (Activation)         (None, 56, 56, 64)   0           bn2a_branch2a[0][0]              
______________________________________________________________________________________________

### One Stage Consists of Consecutive Identity Blocks

In [7]:
# Chain three identity blocks (a -> b -> c) that share the same configuration.
input_tensor = Input(shape=(56, 56, 256), name='test_input')

x = input_tensor
for block_id in ('a', 'b'):
    x = identity_block(x, middle_kernel_size=3, filters=[64, 64, 256], stage=2, block=block_id)

# The last block's result is the model output.
output = identity_block(x, middle_kernel_size=3, filters=[64, 64, 256], stage=2, block='c')

identity_layers = Model(inputs=input_tensor, outputs=output)
identity_layers.summary()

Model: "model_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
test_input (InputLayer)         [(None, 56, 56, 256) 0                                            
__________________________________________________________________________________________________
res2a_branch2a (Conv2D)         (None, 56, 56, 64)   16448       test_input[0][0]                 
__________________________________________________________________________________________________
bn2a_branch2a (BatchNormalizati (None, 56, 56, 64)   256         res2a_branch2a[0][0]             
__________________________________________________________________________________________________
activation_3 (Activation)       (None, 56, 56, 64)   0           bn2a_branch2a[0][0]              
____________________________________________________________________________________________

### Conv Block Creation Function
- 각 stage 내의 첫 번째 identity block에서 입력 feature map의 크기를 1/2로 줄이는 block을 생성

In [13]:
def conv_block(input_tensor, middle_kernel_size, filters, stage, block, strides=(2, 2)):
    """Build a bottleneck residual block with a projection (1x1 conv) shortcut.

    The main branch is 1x1 conv (strided) -> BN -> ReLU -> middle conv -> BN ->
    ReLU -> 1x1 conv -> BN. The shortcut applies a strided 1x1 conv and BN to
    ``input_tensor`` so both paths have the same shape before they are added.
    A final ReLU follows the addition.

    Args:
        input_tensor: Keras tensor entering the block.
        middle_kernel_size: Kernel size of the middle convolution.
        filters: Three filter counts, one per main-branch convolution, in order.
            The last one is also used for the shortcut convolution.
        stage: Stage identifier used in the layer names.
        block: Block identifier string used in the layer names.
        strides: Strides of the first main-branch convolution and of the
            shortcut convolution. The default (2, 2) halves the spatial size.

    Returns:
        Output tensor of the block.
    """
    n_first, n_middle, n_last = filters

    conv_prefix = 'res{}{}_branch'.format(stage, block)
    bn_prefix = 'bn{}{}_branch'.format(stage, block)

    # First 1x1 convolution; the stride here performs the downsampling.
    branch = Conv2D(
        filters=n_first,
        kernel_size=(1, 1),
        strides=strides,
        kernel_initializer='he_normal',
        name=conv_prefix + '2a',
    )(input_tensor)
    branch = BatchNormalization(axis=-1, name=bn_prefix + '2a')(branch)
    branch = Activation('relu')(branch)

    # Middle convolution; 'same' padding keeps the spatial size.
    branch = Conv2D(
        filters=n_middle,
        kernel_size=middle_kernel_size,
        padding='same',
        kernel_initializer='he_normal',
        name=conv_prefix + '2b',
    )(branch)
    branch = BatchNormalization(axis=-1, name=bn_prefix + '2b')(branch)
    branch = Activation('relu')(branch)

    # Last 1x1 convolution; no activation before the addition.
    branch = Conv2D(
        filters=n_last,
        kernel_size=(1, 1),
        kernel_initializer='he_normal',
        name=conv_prefix + '2c',
    )(branch)
    branch = BatchNormalization(axis=-1, name=bn_prefix + '2c')(branch)

    # Projection shortcut: same stride and filter count as the main branch output.
    projected = Conv2D(
        n_last,
        (1, 1),
        strides=strides,
        kernel_initializer='he_normal',
        name=conv_prefix + '1',
    )(input_tensor)
    projected = BatchNormalization(axis=-1, name=bn_prefix + '1')(projected)

    # Residual addition followed by the block's final activation.
    merged = add([branch, projected])
    return Activation('relu')(merged)

### One stage consists of conv_block() and identity_block()

In [14]:
# One stage: a downsampling conv_block followed by two identity blocks.
input_tensor = Input(shape=(56, 56, 256), name='test_input')

x = conv_block(input_tensor, 3, [64, 64, 256], stage=2, block='a', strides=2)
x = identity_block(x, 3, [64, 64, 256], stage=2, block='b')
output = identity_block(x, 3, [64, 64, 256], stage=2, block='c')

identity_layers = Model(inputs=input_tensor, outputs=output)
identity_layers.summary()

Model: "model_2"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
test_input (InputLayer)         [(None, 56, 56, 256) 0                                            
__________________________________________________________________________________________________
res2a_branch2a (Conv2D)         (None, 28, 28, 64)   16448       test_input[0][0]                 
__________________________________________________________________________________________________
bn2a_branch2a (BatchNormalizati (None, 28, 28, 64)   256         res2a_branch2a[0][0]             
__________________________________________________________________________________________________
activation_16 (Activation)      (None, 28, 28, 64)   0           bn2a_branch2a[0][0]              
____________________________________________________________________________________________

### do_first_conv

In [15]:
from tensorflow.keras.layers import ZeroPadding2D, MaxPooling2D

def do_first_conv(input_tensor):
    """Apply the stem that precedes the residual stages.

    Layer order: zero-pad by 3 -> 7x7 conv (64 filters, stride 2, no padding)
    -> batch norm -> ReLU -> zero-pad by 1 -> 3x3 max pool (stride 2).

    Args:
        input_tensor: Keras tensor fed into the stem.

    Returns:
        Output tensor of the max-pooling layer.
    """
    # Explicit padding + 'valid' conv instead of 'same' padding on the conv itself.
    padded = ZeroPadding2D(padding=(3, 3), name='conv1_pad')(input_tensor)
    stem = Conv2D(
        filters=64,
        kernel_size=(7, 7),
        strides=(2, 2),
        padding='valid',
        kernel_initializer='he_normal',
        name='conv',
    )(padded)
    stem = BatchNormalization(axis=-1, name='bn_conv1')(stem)
    stem = Activation('relu')(stem)

    # Pad by one pixel so the 3x3, stride-2 pooling halves the spatial size.
    stem = ZeroPadding2D(padding=(1, 1), name='pool1_pad')(stem)
    return MaxPooling2D(pool_size=(3, 3), strides=(2, 2))(stem)

In [16]:
# Wrap the stem alone in a model to inspect its output shapes for a 224x224x3 input.
input_tensor = Input(shape=(224, 224, 3))
model = Model(inputs=input_tensor, outputs=do_first_conv(input_tensor))
model.summary()

Model: "model_3"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 224, 224, 3)]     0         
_________________________________________________________________
conv1_pad (ZeroPadding2D)    (None, 230, 230, 3)       0         
_________________________________________________________________
conv (Conv2D)                (None, 112, 112, 64)      9472      
_________________________________________________________________
bn_conv1 (BatchNormalization (None, 112, 112, 64)      256       
_________________________________________________________________
activation_25 (Activation)   (None, 112, 112, 64)      0         
_________________________________________________________________
pool1_pad (ZeroPadding2D)    (None, 114, 114, 64)      0         
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 56, 56, 64)        0   

### ResNet50 Model Creation

In [17]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense , Conv2D , Dropout , Flatten , Activation, MaxPooling2D , GlobalAveragePooling2D
from tensorflow.keras.optimizers import Adam , RMSprop 
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.callbacks import ReduceLROnPlateau , EarlyStopping , ModelCheckpoint , LearningRateScheduler

def create_resnet(in_shape=(224, 224, 3), n_classes=10):
    """Assemble the ResNet50-style classifier and print its summary.

    The network is the stem (``do_first_conv``), four residual stages built
    from ``conv_block`` / ``identity_block``, global average pooling, and a
    dense classification head with dropout.

    Args:
        in_shape: Shape of one input image (without the batch dimension).
        n_classes: Number of units in the final softmax layer.

    Returns:
        The Keras ``Model`` named ``'resnet50'`` (not compiled).
    """
    input_tensor = Input(shape=in_shape)

    # Initial 7x7 convolution and max pooling.
    x = do_first_conv(input_tensor)

    # (stage, filters, block ids, strides of the stage's conv_block).
    # Stage 2 uses stride 1 so its conv_block does not shrink the feature map;
    # stages 3-5 use stride 2, which halves it.
    stage_specs = (
        (2, [64, 64, 256], 'abc', (1, 1)),
        (3, [128, 128, 512], 'abcd', (2, 2)),
        (4, [256, 256, 1024], 'abcdef', (2, 2)),
        (5, [512, 512, 2048], 'abc', (2, 2)),
    )
    for stage, stage_filters, block_ids, first_strides in stage_specs:
        # Each stage opens with a conv_block and continues with identity blocks.
        x = conv_block(x, 3, stage_filters, stage=stage, block=block_ids[0], strides=first_strides)
        for block_id in block_ids[1:]:
            x = identity_block(x, 3, stage_filters, stage=stage, block=block_id)

    # Global average pooling before the dense classification layers.
    x = GlobalAveragePooling2D(name='avg_pool')(x)
    x = Dropout(rate=0.5)(x)
    x = Dense(200, activation='relu', name='fc_01')(x)
    x = Dropout(rate=0.5)(x)
    output = Dense(n_classes, activation='softmax', name='fc_final')(x)

    model = Model(inputs=input_tensor, outputs=output, name='resnet50')
    model.summary()

    return model

### CIFAR10 Sequence Dataset

In [18]:
# Side length (pixels) that CIFAR_Dataset resizes every image to.
IMAGE_SIZE = 128
# Default number of samples per batch for CIFAR_Dataset.
BATCH_SIZE = 64

In [19]:
from tensorflow.keras.utils import Sequence
import cv2
import sklearn

class CIFAR_Dataset(Sequence):
    """Keras ``Sequence`` that serves batches of resized images.

    Every image of a batch is resized to ``IMAGE_SIZE x IMAGE_SIZE`` with
    ``cv2.resize``, optionally augmented, optionally preprocessed, and stored
    in a float32 array.

    Args:
        images_array: Array of images; sliced along its first axis per batch.
        labels: Labels aligned with ``images_array``, or ``None`` when no
            labels are available (e.g. for prediction).
        batch_size: Number of samples per batch.
        augmentor: Optional callable invoked as ``augmentor(image=image)`` whose
            result is indexed with ``['image']`` (looks like the albumentations
            calling convention -- confirm against the augmentor actually used).
        shuffle: When True, the data order is reshuffled at the end of each
            epoch. The order is NOT shuffled before the first epoch.
        pre_func: Optional callable applied to each image after augmentation.
    """

    def __init__(self, images_array, labels, batch_size=BATCH_SIZE, augmentor=None, shuffle=False, pre_func=None):
        # Let the Keras base class initialise its own state.
        super().__init__()
        self.images_array = images_array
        self.labels = labels
        self.batch_size = batch_size
        self.augmentor = augmentor
        self.pre_func = pre_func
        self.shuffle = shuffle

    def __len__(self):
        """Return the number of batches per epoch (the last one may be smaller)."""
        # Count images rather than labels so the length is defined when labels is None.
        return int(np.ceil(len(self.images_array) / self.batch_size))

    def __getitem__(self, index):
        """Return ``(image_batch, label_batch)`` for batch number ``index``.

        ``label_batch`` is ``None`` when the dataset was created without labels.
        """
        start = index * self.batch_size
        stop = start + self.batch_size

        images_fetch = self.images_array[start:stop]

        # Bind label_batch unconditionally so the return below cannot hit an unbound name.
        label_batch = None
        if self.labels is not None:
            label_batch = self.labels[start:stop]

        image_batch = np.zeros((images_fetch.shape[0], IMAGE_SIZE, IMAGE_SIZE, 3), dtype='float32')

        for image_index in range(images_fetch.shape[0]):
            image = cv2.resize(images_fetch[image_index], (IMAGE_SIZE, IMAGE_SIZE))
            # Augment first, then preprocess.
            if self.augmentor is not None:
                image = self.augmentor(image=image)['image']
            if self.pre_func is not None:
                image = self.pre_func(image)

            image_batch[image_index] = image

        return image_batch, label_batch

    def on_epoch_end(self):
        """Reshuffle images (and labels, kept aligned) when shuffling is enabled."""
        if not self.shuffle:
            return
        if self.labels is None:
            # Nothing to keep aligned; shuffle the images alone.
            self.images_array = sklearn.utils.shuffle(self.images_array)
        else:
            self.images_array, self.labels = sklearn.utils.shuffle(self.images_array, self.labels)

In [21]:
from tensorflow.keras.applications.resnet50 import preprocess_input as resnet_preprocess

# Training batches are reshuffled after every epoch; validation batches keep a fixed order.
tr_ds = CIFAR_Dataset(
    tr_images,
    tr_oh_labels,
    batch_size=BATCH_SIZE,
    augmentor=None,
    shuffle=True,
    pre_func=resnet_preprocess,
)
val_ds = CIFAR_Dataset(
    val_images,
    val_oh_labels,
    batch_size=BATCH_SIZE,
    augmentor=None,
    shuffle=False,
    pre_func=resnet_preprocess,
)

In [22]:
# Fetch the first batch of each dataset once. Each fetch resizes and
# preprocesses a whole batch, so calling next(iter(...)) for the images and
# again for the labels would do that work twice.
tr_batch_images, tr_batch_labels = next(iter(tr_ds))
val_batch_images, val_batch_labels = next(iter(val_ds))

print('train batch shape: ', tr_batch_images.shape, tr_batch_labels.shape)
print('validation batch shape: ', val_batch_images.shape, val_batch_labels.shape)

train batch shape:  (64, 128, 128, 3) (64, 10)
validation batch shape:  (64, 128, 128, 3) (64, 10)


### ResNet50 Model Creation, Training, and Evaluation

In [23]:
from tensorflow.keras.callbacks import ReduceLROnPlateau , EarlyStopping , ModelCheckpoint , LearningRateScheduler


# Derive the input shape from IMAGE_SIZE so the model always matches the
# image size that CIFAR_Dataset produces.
resnet_model = create_resnet(in_shape=(IMAGE_SIZE, IMAGE_SIZE, 3), n_classes=10)

resnet_model.compile(
    # `learning_rate` is the supported argument name; `lr` is a deprecated alias.
    optimizer=Adam(learning_rate=0.001),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

# Multiply the learning rate by 0.2 after 3 epochs without val_loss improvement.
rlr_cb = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.2,
    patience=3,
    mode='min',
    verbose=1
)

# Stop training after 10 epochs without val_loss improvement.
ely_cb = EarlyStopping(
    monitor='val_loss',
    patience=10,
    mode='min',
    verbose=1
)

history = resnet_model.fit(
    tr_ds,
    epochs=30,
    validation_data=val_ds,
    callbacks=[rlr_cb, ely_cb]
)

Model: "resnet50"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_2 (InputLayer)            [(None, 128, 128, 3) 0                                            
__________________________________________________________________________________________________
conv1_pad (ZeroPadding2D)       (None, 134, 134, 3)  0           input_2[0][0]                    
__________________________________________________________________________________________________
conv (Conv2D)                   (None, 64, 64, 64)   9472        conv1_pad[0][0]                  
__________________________________________________________________________________________________
bn_conv1 (BatchNormalization)   (None, 64, 64, 64)   256         conv[0][0]                       
___________________________________________________________________________________________

In [None]:
# Evaluate on the held-out test set with the same preprocessing as training and no shuffling.
test_ds = CIFAR_Dataset(test_images, test_oh_labels, batch_size=BATCH_SIZE, augmentor=None, shuffle=False, pre_func=resnet_preprocess)

resnet_model.evaluate(test_ds)