<a href="https://colab.research.google.com/github/Pulsar-kkaturi/DL-Education/blob/master/VisionDL_Lecture/Lecture6_Classification_TF.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Classification

* Library Import

In [None]:
import os, matplotlib, csv, shutil, json, random
import numpy as np
from matplotlib import pyplot as plt
import matplotlib.cm as cm
import pandas as pd
from IPython.display import Image
import skimage
from skimage import io as skio
from skimage import transform as skit

### Tensorflow 2.0 ###
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras import Input
from tensorflow.keras import layers
from tensorflow.keras import models
from tensorflow.keras.preprocessing import image
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras import losses
from tensorflow.keras import optimizers
from tensorflow.keras import metrics
from tensorflow.keras import regularizers
from tensorflow.keras import utils
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler
from keras import backend as K

# scikit-learn
import sklearn.metrics
from sklearn.metrics import roc_curve, auc
from sklearn.metrics import confusion_matrix
import seaborn as sns

In [None]:
# GPU 확인
import tensorflow as tf
tf.config.list_physical_devices()

## 1. ResNet

### 1.1. Data Loading
* Kaggle Garbage Classification Dataset
  - Reference link: https://www.kaggle.com/datasets/asdasdasasdas/garbage-classification

In [None]:
# Dataset Download
!git clone https://github.com/Pulsar-kkaturi/DL-Education.git

In [None]:
# 데이터 압축 풀기
!tar -zxf ./DL-Education/dataset/garbage_cls_2d.tar.gz

In [None]:
data_path = './garbage_cls_2d'
label_list = list(sorted(os.listdir(data_path)))
print(label_list)

In [None]:
# data number check
for c in label_list:
  cn = 0
  cls_path = os.path.join(data_path, c)
  for f in os.listdir(cls_path):
    cn += 1
  print(f'{c} number = {cn}')

In [None]:
# Data visulization
plt.figure(figsize=(12,6))
for i, c in enumerate(label_list):
  cls_path = os.path.join(data_path, c)
  sam_path = os.path.join(cls_path, os.listdir(cls_path)[0])
  sam_arr = skio.imread(sam_path)
  plt.subplot(2,3,i+1)
  plt.title(c)
  plt.imshow(sam_arr)

### 1.2. Data Pre-processing

#### 1.2.1. 데이터 변환

In [None]:
# one-hot encoding
for c in label_list:
  onehot = [0]*len(label_list)
  onehot[label_list.index(c)] = 1
  print(c, label_list.index(c), onehot)

In [None]:
# Numpy array로 데이터 변환
data_dic, label_dic = {}, {}
for key in label_list:
  data_dic[key] = []
  label_dic[key] = []

for i, c in enumerate(label_list):
  cls_path = os.path.join(data_path, c)
  for f in sorted(os.listdir(cls_path)):
    file_path = os.path.join(cls_path, f)
    data_arr = skio.imread(file_path)
    onehot = [0]*len(label_list)
    onehot[label_list.index(c)] = 1
    data_dic[c].append(data_arr)
    label_dic[c].append(onehot)

In [None]:
plt.figure(figsize=(12,6))
for i, k in enumerate(data_dic.keys()):
  print(f'{k}: data_number = {len(data_dic[k])}, label_number = {len(label_dic[k])}')
  plt.subplot(2,3,i+1)
  plt.title(f'{k}: {label_dic[k][0]}')
  plt.imshow(data_dic[k][0])

#### 1.2.2. 데이터셋 세팅

In [None]:
# Train-Validation-Test settig & resizing (256, 256)
img_size = (128, 128)
val_num = 50
test_num = 5

train_x_list, train_y_list = [], []
val_x_list, val_y_list = [], []
test_x_list, test_y_list = [], []

for k in label_list:
  vn, tn = 0, 0
  for i, arr in enumerate(data_dic[k]):
    rs_arr = skit.resize(arr, img_size, anti_aliasing=True)
    if tn < test_num:
      test_x_list.append(rs_arr)
      test_y_list.append(label_dic[k][i])
      tn += 1
    elif vn < val_num:
      val_x_list.append(rs_arr)
      val_y_list.append(label_dic[k][i])
      vn += 1
    else:
      train_x_list.append(rs_arr)
      train_y_list.append(label_dic[k][i])

print(len(train_x_list), len(val_x_list), len(test_x_list))
print(len(train_y_list), len(val_y_list), len(test_y_list))

In [None]:
random.Random(1814).shuffle(train_x_list)
random.Random(1814).shuffle(train_y_list)
random.Random(1814).shuffle(val_x_list)
random.Random(1814).shuffle(val_y_list)
random.Random(1814).shuffle(test_x_list)
random.Random(1814).shuffle(test_y_list)

train_x = np.array(train_x_list, dtype=np.float32)
train_y = np.array(train_y_list, dtype=np.uint8)
val_x = np.array(val_x_list, dtype=np.float32)
val_y = np.array(val_y_list, dtype=np.uint8)
test_x = np.array(test_x_list, dtype=np.float32)
test_y = np.array(test_y_list, dtype=np.uint8)

print(train_x.shape, train_y.shape)
print(val_x.shape, val_y.shape)
print(test_x.shape, test_y.shape)

In [None]:
plt.figure(figsize=(10,10))
for i in range(9):
  plt.subplot(3,3,i+1)
  plt.title(f'{label_list[train_y[i].argmax()]}: {train_y[i]}')
  plt.imshow(train_x[i])

### 1.3. Model Build

#### 1.3.1. Keras application

In [None]:
from tensorflow.keras.applications import ResNet50
model = ResNet50(weights='imagenet', include_top=False, input_shape=(32, 32, 3))
model.summary()

#### 1.3.2. ResNet Block

In [None]:
class ResNetBlock2D:
    def conv1_block(self, lr_conv):
        # layer blcok
        lr_conv = layers.ZeroPadding2D(3)(lr_conv)
        lr_conv = layers.Conv2D(64, 7, 2, activation=None,
                                    padding='valid', kernel_initializer='he_normal')(lr_conv)
        lr_conv = layers.BatchNormalization(axis=-1)(lr_conv)
        lr_conv = layers.Activation('relu')(lr_conv)
        lr_conv = layers.ZeroPadding2D(1)(lr_conv)
        lr_conv = layers.MaxPool2D(3, 2, padding='valid')(lr_conv)
        return lr_conv
    def res_conv_block(self, lr_io, ker_size, block_num, reg_weight, act_func, mode=None):
        for i in range(block_num):
            if mode == 'hold':
                fstr = 1
            else:
                fstr = 2
            # layer block
            if i == 0:
                lr_conv1 = layers.Conv2D(ker_size, 1, fstr, padding='same', kernel_initializer='he_normal',
                                         kernel_regularizer=regularizers.l2(reg_weight))(lr_io)
            else:
                lr_conv1 = layers.Conv2D(ker_size, 1, 1, padding='same', kernel_initializer='he_normal',
                                         kernel_regularizer=regularizers.l2(reg_weight))(lr_io)
            lr_conv1 = layers.BatchNormalization(axis=-1)(lr_conv1)
            lr_conv1 = layers.Activation(act_func)(lr_conv1)
            lr_conv2 = layers.Conv2D(ker_size, 3, 1, padding='same', kernel_initializer='he_normal',
                                         kernel_regularizer=regularizers.l2(reg_weight))(lr_conv1)
            lr_conv2 = layers.BatchNormalization(axis=-1)(lr_conv2)
            lr_conv2 = layers.Activation(act_func)(lr_conv2)
            lr_conv3 = layers.Conv2D(4*ker_size, 1, 1, padding='same', kernel_initializer='he_normal',
                                         kernel_regularizer=regularizers.l2(reg_weight))(lr_conv2)
            lr_conv3 = layers.BatchNormalization(axis=-1)(lr_conv3)
            if i == 0:
                lr_conv0 = layers.Conv2D(4*ker_size, 1, fstr, padding='same', kernel_initializer='he_normal',
                                         kernel_regularizer=regularizers.l2(reg_weight))(lr_io)
                lr_conv0 = layers.BatchNormalization(axis=-1)(lr_conv0)
                lr_add = layers.Add()([lr_conv0, lr_conv3])
            else:
                lr_add = layers.Add()([lr_io, lr_conv3])
            lr_io = layers.Activation(act_func)(lr_add)
        return lr_io
    def gap_block(self, lr_dense, act_func, drop_rate):
        lr_dense = layers.GlobalAveragePooling2D()(lr_dense)
        lr_dense = layers.Dropout(drop_rate)(lr_dense)
        lr_dense = layers.Dense(1000, activation=act_func)(lr_dense)
        lr_dense = layers.Dropout(drop_rate)(lr_dense)
        return lr_dense


rb = ResNetBlock2D()

#### 1.3.3. ResNet50

In [None]:
def ResNet50_2D(input_size, channel_num, par_dic):
    # parameters
    reg_weight = par_dic['reg_weight']
    act_func = par_dic['act_func']
    drop_rate = par_dic['drop_rate']
    output_count = par_dic['output_count']
    output_act = par_dic['output_act']
    # code block
    inputs = layers.Input(shape=(input_size, input_size, channel_num))
    conv1 = rb.conv1_block(inputs) # Block1
    conv2 = rb.res_conv_block(conv1, 64, 3, reg_weight, act_func, mode='hold') # Block2
    conv3 = rb.res_conv_block(conv2, 128, 4, reg_weight, act_func) # Block3
    conv4 = rb.res_conv_block(conv3, 256, 6, reg_weight, act_func) # Block4
    conv5 = rb.res_conv_block(conv4, 512, 3, reg_weight, act_func) # Block5
    dens = rb.gap_block(conv5, act_func, drop_rate)
    outputs = layers.Dense(output_count, activation=output_act)(dens)
    model = Model(inputs, outputs)
    return model

In [None]:
def ResNet101_2D(input_size, channel_num, par_dic):
    # parameters
    reg_weight = par_dic['reg_weight']
    act_func = par_dic['act_func']
    drop_rate = par_dic['drop_rate']
    output_count = par_dic['output_count']
    output_act = par_dic['output_act']
    # code block
    inputs = layers.Input(shape=(input_size, input_size, channel_num))
    conv1 = rb.conv1_block(inputs) # Blcok1
    conv2 = rb.res_conv_block(conv1, 64, 3, reg_weight, act_func, mode='hold') # Block2
    conv3 = rb.res_conv_block(conv2, 128, 4, reg_weight, act_func) # Block3
    conv4 = rb.res_conv_block(conv3, 256, 23, reg_weight, act_func) # Block4
    conv5 = rb.res_conv_block(conv4, 512, 3, reg_weight, act_func) # Block5
    dens = rb.gap_block(conv5, act_func, drop_rate)
    outputs = layers.Dense(output_count, activation=output_act)(dens)
    model = Model(inputs, outputs)
    return model

In [None]:
def ResNet152_2D(input_size, channel_num, par_dic):
    # parameters
    reg_weight = par_dic['reg_weight']
    act_func = par_dic['act_func']
    drop_rate = par_dic['drop_rate']
    output_count = par_dic['output_count']
    output_act = par_dic['output_act']
    # code block
    inputs = layers.Input(shape=(input_size, input_size, channel_num))
    conv1 = rb.conv1_block(inputs) # Blcok1
    conv2 = rb.res_conv_block(conv1, 64, 3, reg_weight, act_func, mode='hold') # Block2
    conv3 = rb.res_conv_block(conv2, 128, 8, reg_weight, act_func) # Block3
    conv4 = rb.res_conv_block(conv3, 256, 36, reg_weight, act_func) # Block4
    conv5 = rb.res_conv_block(conv4, 512, 3, reg_weight, act_func) # Block5
    dens = rb.gap_block(conv5, act_func, drop_rate)
    outputs = layers.Dense(output_count, activation=output_act)(dens)
    model = Model(inputs, outputs)
    return model

In [None]:
resnet_param = {'reg_weight': None,
               'act_func': 'relu',
               'drop_rate': 0.5,
               'output_count': 6,
               'output_act': 'softmax'}

# ResNet 50, 101, 152 중에서 원하는 모델 사용
model = ResNet50_2D(128, 3, resnet_param)
model.summary()

### 1.4. Train Model

In [None]:
model.compile(loss=losses.CategoricalCrossentropy(), optimizer=optimizers.Adam(learning_rate=1e-4), metrics=['accuracy'])

In [None]:
callback_list = [keras.callbacks.EarlyStopping(monitor='val_loss', patience=10),
                         keras.callbacks.ModelCheckpoint(filepath='resnet_model.h5',
                                                         monitor='val_loss', save_best_only=True),
                         keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5)]

history = model.fit(train_x, train_y, epochs=50, batch_size=16,
                    validation_data=(val_x, val_y),
                    callbacks=callback_list, shuffle=True)

### 1.5. Train Result

#### 1.5.1. Loss & Accuracy

In [None]:
acc = history.history['accuracy']
val_acc = history.history['val_accuracy']
loss = history.history['loss']
val_loss = history.history['val_loss']
epochs = range(1,len(acc)+1)

In [None]:
plt.plot(epochs, acc, 'b', label='Training acc')
plt.plot(epochs, val_acc, 'r', label='Validation acc')
plt.title('Training and validation accuracy')
plt.legend()

plt.figure()

plt.plot(epochs, loss, 'b', label='Training loss')
plt.plot(epochs, val_loss, 'r', label='Validation loss')
plt.title('Training and validation loss')
plt.legend()

plt.show()

#### 1.5.2. Prediction

In [None]:
result = model.evaluate(test_x, test_y)
print('test loss = ', result[0])
print('test Accuracy = ', result[1])

In [None]:
scores = model.predict(test_x)
print(len(scores))
print(label_list[test_y[0].argmax()], test_y[0])
print(label_list[scores[0].argmax()], scores[0])

In [None]:
plt.figure(figsize=(10,10))
for i in range(9):
  gt = label_list[test_y[i].argmax()]
  pd = label_list[scores[i].argmax()]
  score = 'True' if gt == pd else 'False'
  plt.subplot(3,3,i+1)
  plt.title(f'{score}({pd}/{gt})')
  plt.imshow(test_x[i])

#### 1.5.3. Confusion Matrix

In [None]:
gt_list = [label_list[test_y[i].argmax()] for i in range(test_y.shape[0])]
pd_list = [label_list[scores[i].argmax()] for i in range(len(scores))]

print(gt_list)
print(pd_list)

conf = confusion_matrix(gt_list, pd_list, labels=label_list)
print(conf)

In [None]:
plt.figure(figsize=(10,10))
ax = sns.heatmap(conf, annot = True, cmap="coolwarm", vmax = 5,
                 annot_kws={"fontsize":20}, center=3, cbar=True,
                 xticklabels=label_list, yticklabels=label_list)

# labels, title and ticks
ax.set_xlabel('Predicted labels',fontsize=20)
ax.set_ylabel('True labels',fontsize=20)
ax.set_title('Confusion Matrix', fontsize=30)
ax.xaxis.set_ticklabels(label_list, fontsize=10)
ax.yaxis.set_ticklabels(label_list, fontsize=10)

#### 1.5.4. Case Analysis

In [None]:
# False Case
plt.figure(figsize=(10,10))
pn = 0
for i in range(len(scores)):
  gt = label_list[test_y[i].argmax()]
  pd = label_list[scores[i].argmax()]
  score = 'True' if gt == pd else 'False'
  if score == 'False' and pn < 9:
    pn += 1
    plt.subplot(3,3,pn)
    plt.title(f'{score}({pd}/{gt})')
    plt.imshow(test_x[i])

In [None]:
# True Case
plt.figure(figsize=(10,10))
pn = 0
for i in range(len(scores)):
  gt = label_list[test_y[i].argmax()]
  pd = label_list[scores[i].argmax()]
  score = 'True' if gt == pd else 'False'
  if score == 'True' and pn < 9:
    pn += 1
    plt.subplot(3,3,pn)
    plt.title(f'{score}({pd}/{gt})')
    plt.imshow(test_x[i])

수고하셨습니다!