<a href="https://colab.research.google.com/github/SeongilHeo/hufs_ai_camp/blob/master/Day6_2_resnet_fashionmnist_classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Fashion MNIST Classification 문제 (ResNet)

### Tensorboard Extension 로드

In [None]:
%load_ext tensorboard

### 텐서플로와 다른 라이브러리 임포트

In [None]:
try:
  # %tensorflow_version only exists in Colab.
  %tensorflow_version 2.x
except Exception:
  pass

### 텐서플로와 다른 라이브러리 임포트

In [None]:
# tensorflow와 tf.keras를 임포트합니다
import tensorflow as tf
from tensorflow import keras

# 헬퍼(helper) 라이브러리를 임포트합니다
import numpy as np
import matplotlib.pyplot as plt

print(tf.__version__)

### GPU 메모리 설정 (프로그램에서 필요한만큼만 할당하는 방식)

In [None]:
# 런타임에서 할당하는데 필요한 양만큼의 GPU 메모리를 할당
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
  try:
    tf.config.experimental.set_memory_growth(gpus[0], True)
  except RuntimeError as e:
    # 프로그램 시작시에 메모리 증가가 설정되어야만 합니다
    print(e)

### 패션 MNIST 데이터셋 임포트하기

load_data() 함수를 호출해서 Fashion MNIST 로딩 : 네 개의 넘파이(NumPy) 배열이 반환

In [None]:
fashion_mnist = keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

In [None]:
train_images = train_images.reshape(train_images.shape[0], 28, 28, 1).astype('float32')
test_images = test_images.reshape(test_images.shape[0], 28, 28, 1).astype('float32')

In [None]:
class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
               'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']

### 데이터 전처리

In [None]:
plt.figure()
plt.imshow(np.reshape(train_images[0],(28,28)))
plt.colorbar()
plt.grid(False)
plt.show()

픽셀 값을 [0,255]에서 [0,1]로 조정

In [None]:
train_images = train_images / 255.0
test_images = test_images / 255.0

Traning Set 25개 이미지, 클래스 이름 출력

In [None]:
plt.figure(figsize=(10,10))
for i in range(25):
    plt.subplot(5,5,i+1)
    plt.xticks([])
    plt.yticks([])
    plt.grid(False)
    plt.imshow(np.reshape(train_images[i],(28,28)), cmap=plt.cm.binary)
    plt.xlabel(class_names[train_labels[i]])
plt.show()

### ResNet 모델 구성

In [None]:
class ResNet_MNIST():
    def build(self, input, num_classes=10):

        x = input
        # conv1 3x3 (relu, padding=‘same’), (28, 28, 16) batch normalization
        x = keras.layers.Conv2D(16, (3, 3), padding='same', activation='relu')(x)
        x = keras.layers.BatchNormalization()(x)
        assert x.shape[1:] == [28, 28, 16]

        # conv2 (28, 28, 16) x 3 conv
        x = self.make_layer(x, channels=16, num_blocks=3)
        assert x.shape[1:] == [28, 28, 16]
        
        # conv3 (14, 14, 32) x 3 conv
        x = self.make_layer(x, channels=32, num_blocks=3, subsampling=True)
        assert x.shape[1:] == [14, 14, 32]
        
        # conv4 (7, 7, 64) x 3 conv
        x = self.make_layer(x, channels=64, num_blocks=3, subsampling=True)
        assert x.shape[1:] == [7, 7, 64]
        
        # dense average pool, fc, softmax
        x = keras.layers.GlobalAveragePooling2D()(x)
        x = keras.layers.Dense(num_classes, activation='softmax')(x)
        assert x.shape[1:] == [10]

        return x

    # subsampling is performed by conv3_1, conv4_1, and conv5_1 with a stride of 2.
    def make_layer(self, x, channels, num_blocks, subsampling=False):
        for block_no in range(num_blocks):
            # residual block 호출
            x = self.residual_block(x, channels, subsampling)
            
            # 첫번째 residual block에서만 down sampling 수행
            if subsampling : subsampling = False
                
        return x
            
    def residual_block(self, input, channels, subsampling=False, kernel_size=3):
        x = input
        
        # down sampling 수행 시에는 stride를 2로 설정
        first_conv_stride = (2, 2) if subsampling else (1, 1)
        
        # first 3x3 conv
        x = tf.keras.layers.Conv2D(channels, kernel_size, strides=first_conv_stride, padding='SAME')(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.ReLU()(x)

        # second 3x3 conv
        x = tf.keras.layers.Conv2D(channels, kernel_size, strides=(1, 1), padding='SAME')(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = x + self.residual_connection(subsampling, input,channels)
        
        return tf.keras.layers.ReLU()(x)
    
    def residual_connection(self, subsampling, input, channels):
        skip_x = input
        if subsampling : # 첫번째 residual block인 경우 크기를 줄여줌
            stride = (2, 2)
            skip_x = tf.keras.layers.Conv2D(channels, (1, 1), strides=stride, padding='SAME')(skip_x)
            skip_x = tf.keras.layers.BatchNormalization()(skip_x)
            
        return skip_x

In [None]:
input = tf.keras.Input(shape=(28, 28, 1))
output = ResNet_MNIST().build(input)

model = tf.keras.Model(inputs=input, outputs=output)
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])


### 모델 훈련

In [None]:
import datetime
import os

EPOCH = 30
batch_size = 32

# Tensorboard
log_dir= os.path.join(os.getcwd(), 'logs', datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))
tensorboard_callback = keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

# patience 매개변수는 성능 향상을 체크할 에포크 횟수입니다
early_stop = keras.callbacks.EarlyStopping(monitor='val_loss', patience=10)

model.fit(train_images, train_labels,
          batch_size=batch_size,
          epochs=EPOCH,
          validation_split=0.1,
          callbacks=[early_stop,tensorboard_callback])

### Tensorboard Viewer로 확인

In [None]:
%tensorboard --logdir logs

### 테스트 성능 측정 (정확도)

In [None]:
test_loss, test_acc = model.evaluate(test_images,  test_labels, verbose=2)

print('\n테스트 정확도:', test_acc)

### 예측 만들기

In [None]:
predictions = model.predict(test_images)

In [None]:
def plot_image(i, predictions_array, true_label, img):
  predictions_array, true_label, img = predictions_array[i], true_label[i], img[i]
  plt.grid(False)
  plt.xticks([])
  plt.yticks([])

  plt.imshow(np.reshape(img,(28,28)), cmap=plt.cm.binary)

  predicted_label = np.argmax(predictions_array)
  if predicted_label == true_label:
    color = 'blue'
  else:
    color = 'red'

  plt.xlabel("{} {:2.0f}% ({})".format(class_names[predicted_label],
                                100*np.max(predictions_array),
                                class_names[true_label]),
                                color=color)

def plot_value_array(i, predictions_array, true_label):
  predictions_array, true_label = predictions_array[i], true_label[i]
  plt.grid(False)
  plt.xticks([])
  plt.yticks([])
  thisplot = plt.bar(range(10), predictions_array, color="#777777")
  plt.ylim([0, 1])
  predicted_label = np.argmax(predictions_array)

  thisplot[predicted_label].set_color('red')
  thisplot[true_label].set_color('blue')

In [None]:
# 처음 X 개의 테스트 이미지와 예측 레이블, 진짜 레이블을 출력합니다
# 올바른 예측은 파랑색으로 잘못된 예측은 빨강색으로 나타냅니다
num_rows = 5
num_cols = 3
num_images = num_rows*num_cols
plt.figure(figsize=(2*2*num_cols, 2*num_rows))
for i in range(num_images):
  plt.subplot(num_rows, 2*num_cols, 2*i+1)
  plot_image(i, predictions, test_labels, test_images)
  plt.subplot(num_rows, 2*num_cols, 2*i+2)
  plot_value_array(i, predictions, test_labels)
plt.show()