## This notebook builds ResNet and DenseNet models, using CIFAR10 as the practice dataset.


#### Core concepts:

* Build models with the ```subclassing API```.
* Learn to construct and train ResNet and DenseNet.

---

CIFAR10 dataset: https://www.cs.toronto.edu/~kriz/cifar.html

# <a name=00>Index</a>

* [Load the images into memory](#01)
* [Resize and normalize the images](#02)
* [Build a ```conv block``` and stack it into a CNN model](#03)
* [Build a ```residual block``` and stack it into a ResNet](#04)
* [Tweak the ```residual block``` into a ```dense block``` and build a DenseNet](#05)

---

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

import pandas as pd

from sklearn.metrics import classification_report
import json
import pickle

import os

sns.set()
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
import tensorflow as tf

## <a id='01'>Load the images into memory </a>

First, let's look at what the dataset folder contains:

In [None]:
! ls -hl ../datasets/cifar-10-batches-py/

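data_batch_1, data_batch_2, ..., data_batch_5 and test_batch are stored on disk as pickled binary files. Below we write a few functions that load these binary files into memory and store the images as arrays. The image arrays have shape (Number of figures, Height, Width, Channel).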
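
To make that layout concrete, the sketch below builds a tiny synthetic batch in the same flattened form and reshapes it the way the loader does. It assumes each row stores the red, green, and blue planes one after another in row-major order, as described on the CIFAR-10 page; the array `fake_rows` and its contents are made up for illustration.

```python
import numpy as np

# Two made-up "images", each flattened to 3 * 32 * 32 = 3072 values.
fake_rows = np.arange(2 * 3072, dtype=np.int64).reshape(2, 3072)

# Step 1: split each row into (channel, height, width).
chw = fake_rows.reshape(fake_rows.shape[0], 3, 32, 32)

# Step 2: move the channel axis to the end, giving (N, H, W, C).
hwc = chw.transpose(0, 2, 3, 1)

print(hwc.shape)  # (2, 32, 32, 3)

# The pixel at row 0, column 1 of image 0 takes its red, green, and blue
# values from offsets 1, 1024 + 1, and 2048 + 1 of the flattened row.
print(hwc[0, 0, 1].tolist())  # [1, 1025, 2049]
```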

In [None]:
def load_batch(fpath):
    """Extract a batch of CIFAR10 data from the chosen binary file.

       This function is a simplified version of
       https://github.com/keras-team/keras/blob/master/keras/datasets/cifar.py
    """
    with open(fpath, 'rb') as f:
        d = pickle.load(f, encoding='bytes')
        # Keys are in the "byte" format. Let's decode them into utf8 strings.
        d_decoded = {}
        for k, v in d.items():
            d_decoded[k.decode('utf8')] = v
        d = d_decoded
    data = d['data']
    labels = d['labels']
    data = data.reshape(data.shape[0], 3, 32, 32)
    data = data.transpose(0,2,3,1)
    return data,labels

def load_data(path):
    '''
    Load the images stored in binary format into memory.
    '''
    num_train_samples = 50000

    x_train = np.zeros((num_train_samples, 32,32,3), dtype='uint8')
    y_train = np.zeros((num_train_samples,), dtype='uint8')

    for i in range(1, 6):
        fpath = os.path.join(path, 'data_batch_') + str(i)
        data, labels = load_batch(fpath)
        x_train[(i - 1) * 10000:i * 10000, :, :, :] = data
        y_train[(i - 1) * 10000:i * 10000] = labels

    fpath = os.path.join(path, 'test_batch')
    x_test, y_test = load_batch(fpath)

    return (x_train, y_train), (np.array(x_test), np.array(y_test,dtype="uint8"))

In [None]:
(x_train, y_train), (x_test, y_test) = load_data('../datasets/cifar-10-batches-py')

In [None]:
print(x_train.shape,y_train.shape)
print(x_test.shape,y_test.shape)

With that, we have four arrays. x_train and x_test hold the images, with shape (Number of figures, Height, Width, Channel); y_train and y_test hold the labels, with shape (Number of figures,).

Next, we pull out a few images to get a rough sense of what the data looks like:

In [None]:
with open("../datasets/cifar-10-batches-py/labels.txt") as reader:
    fig_labels = reader.read()
fig_labels = fig_labels.split("\n")[:-1]

In [None]:
idx_to_label = {}
# Use a separate loop variable so the `fig_labels` list is not overwritten.
for idx, label in enumerate(fig_labels):
    idx_to_label[idx] = label

In [None]:
# Randomly draw 12 images to take a look
num_figures_display = 12
fig_indexes = np.random.choice(x_train.shape[0], num_figures_display)

fig,axes = plt.subplots(2,6)
for fig_idx,axis in zip(fig_indexes,axes.reshape(-1) ):
    axis.axis('off')
    axis.imshow(x_train[fig_idx])
    axis.set_title(idx_to_label[ y_train[fig_idx] ])

[Back to index](#00)

## <a id='02'> Resize and normalize the images </a>

In [None]:
import cv2
from tensorflow.keras.utils import to_categorical

# Normalize.
# A simple approach is to divide x by 255 so that all values in x lie in [0,1].
x_train = x_train / 255.
x_test = x_test / 255.

# Resize each image in x_train from (32,32) to three times that size, (96,96).
x_train_resized = np.zeros((50000, 96, 96, 3), dtype=np.float32)
for idx,img in enumerate(x_train):
    if idx%10000==0:
        print(idx)
    x_train_resized[idx,:] = cv2.resize(img, None, fx=3, fy=3, interpolation = cv2.INTER_AREA)

# Resize each image in x_test from (32,32) to three times that size, (96,96).
x_test_resized = np.zeros((10000, 96, 96, 3), dtype=np.float32)
for idx,img in enumerate(x_test):
    if idx%1000==0:
        print(idx)
    x_test_resized[idx,:] = cv2.resize(img, None, fx=3, fy=3, interpolation = cv2.INTER_AREA)

# Convert y to one-hot form
y_train_one_hot = to_categorical(y_train, num_classes=10)
y_test_one_hot = to_categorical(y_test, num_classes=10)
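
For readers curious what the one-hot step produces, here is a NumPy-only sketch of the same idea, alongside the division by 255. It is an illustration, not a description of how `to_categorical` is implemented internally; `labels` and `pixels` are made-up values.

```python
import numpy as np

labels = np.array([3, 0, 9], dtype="uint8")  # made-up class indices
num_classes = 10

# Row i of the identity matrix is the one-hot vector for class i.
one_hot = np.eye(num_classes, dtype=np.float32)[labels]
print(one_hot.shape)                     # (3, 10)
print(one_hot.argmax(axis=1).tolist())   # [3, 0, 9]

# Dividing uint8 pixels by 255 maps them into [0, 1].
pixels = np.array([0, 128, 255], dtype="uint8")  # made-up pixel values
scaled = pixels / 255.
print(scaled.min(), scaled.max())        # 0.0 1.0
```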

[Back to index](#00)

---

## <a id='03'> Build a ```conv block``` and stack it into a CNN model </a>

In [None]:
class ConvBlock(tf.keras.layers.Layer):
    """Class for the `BN-ReLU-Conv` block.
    """
    def __init__(self, filters, kernel_size=3, strides=1, use_bias=False, axis=-1,
                 epsilon=1e-3, l2_strength=1.E-5, **kwargs):
        """Initialize all the necessary ingredients of a `Conv` block.

        Args:
            filters: Integer, number of filters of the Conv layer.
            kernel_size: Integer, kernel size of the Conv layer.
            strides: Integer, number of strides of the Conv layer.
            use_bias: Boolean, if `True`, the constructed Conv layer will have a trainable bias.
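            axis: Int, if `axis=-1 or 3`, the input tensor has the format of (N,H,W,C);
             if `axis=1 or -3`, the input tensor has the format of (N,C,H,W).
            epsilon: Float, small constant added to the variance in batch normalization.
            l2_strength: Float, strength of the L2 regularization on the Conv kernel.
        """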
        super(ConvBlock, self).__init__(**kwargs)

        if axis == -1 or axis == 3:
            data_format = "channels_last"
        elif axis == 1 or axis == -3:
            data_format = "channels_first"
        else:
            raise ValueError("Data format invalid.")

        self.conv = tf.keras.layers.Conv2D(filters, 
                                           kernel_size=kernel_size, 
                                           strides=strides,
                                           use_bias=use_bias, 
                                           data_format=data_format,
                                           kernel_regularizer=tf.keras.regularizers.l2(l2_strength),
                                           padding="SAME")

        norm_params = {'epsilon': epsilon, "axis": axis}
        self.bn = tf.keras.layers.BatchNormalization(**norm_params)
        
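    def build(self, input_shape):
        # The sub-layers create their own weights on first call;
        # here we only mark this layer as built.
        self.built = True

    def compute_output_shape(self, input_shape):
        # The output shape is determined by the convolution (filters, strides);
        # batch normalization and ReLU leave the shape unchanged.
        return self.conv.compute_output_shape(input_shape)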

    def call(self, x, training=None):

        return self.conv(
            tf.nn.relu(
                self.bn(x, training=training)))
    
# Test: forward propagation
fake_data = np.random.normal(0, 1, (5,32,32,3)).astype(np.float32)
cv_blk = ConvBlock(filters=128)
print(cv_blk(fake_data).shape)
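
As a cross-check for the shape printed by the test above and for the parameter counts that `model.summary()` reports, the following sketch works the numbers out by hand. It assumes the usual conventions: `SAME` padding gives a spatial output size of ceil(input / stride), and a Conv2D kernel holds kernel_size × kernel_size × in_channels × filters weights, plus one bias per filter when `use_bias=True`. The helper names are hypothetical.

```python
import math


def same_padding_output_size(size, stride):
    """Spatial output size of a convolution with SAME padding."""
    return math.ceil(size / stride)


def conv2d_param_count(kernel_size, in_channels, filters, use_bias=False):
    """Number of trainable weights in a square-kernel Conv2D layer."""
    weights = kernel_size * kernel_size * in_channels * filters
    return weights + (filters if use_bias else 0)


# The test case: a (5, 32, 32, 3) input through a block with 128 filters.
height = same_padding_output_size(32, stride=1)
print((5, height, height, 128))                            # (5, 32, 32, 128)
print(conv2d_param_count(3, in_channels=3, filters=128))   # 3456

# With strides=2 the spatial size halves.
print(same_padding_output_size(32, stride=2))              # 16
```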

In [None]:
# Build the model
model = tf.keras.models.Sequential()
model.add(ConvBlock(filters=32, input_shape=(32,32,3)))
model.add(ConvBlock(filters=32))
model.add(ConvBlock(filters=64))
model.add(ConvBlock(filters=64))
model.add(tf.keras.layers.Flatten())
model.add(tf.keras.layers.Dense(10, activation="softmax"))

# Inspect the model summary
model.summary()

In [None]:
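# Compile the model: specify the training objective (loss) and the optimizer.
model.compile(loss="categorical_crossentropy", optimizer="Adam", metrics=["accuracy"])

# Train the model
history = model.fit(x=x_train, y=y_train_one_hot, validation_data=(x_test,y_test_one_hot),
                    epochs=10, batch_size=128)

# Plot the training history.
# The metric key is 'accuracy' in recent TensorFlow releases and 'acc' in older ones.
acc_key = 'accuracy' if 'accuracy' in history.history else 'acc'
plt.plot(history.history[acc_key], ms=5, marker='o', label='accuracy')
plt.plot(history.history['val_' + acc_key], ms=5, marker='o', label='val accuracy')
plt.legend()
plt.show()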

[Back to index](#00)

---

## <a id='04'> Build a ```residual block``` and stack it into a ResNet </a>

In [None]:
class Residual(tf.keras.layers.Layer):
    """Class for the building block of ResNet V2.

    Reference:
      "Identity Mappings in Deep Residual Networks"; https://arxiv.org/abs/1603.05027
    Code in principle follows:
      https://github.com/apache/incubator-mxnet/blob/master/python/mxnet/gluon/model_zoo/vision/resnet.py
    """
    def __init__(self, filters, downsample=False, strides=1, use_bias=True,
                 epsilon=1e-3, axis=-1, l2_strength=1.E-5,  **kwargs):
        """Initialize all the necessary layers and parameters for the `Residual` block.

        Args:
            filters: Number of filters of all the used Conv layers.
            downsample: If to reduce size and/or channels of the input feature map.
            strides: Strides of the first Conv layers. If >1, the input feature map will be down-sampled.
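            use_bias: If `True`, all the Conv layers will have trainable biases.
            epsilon: Small constant added to the variance in batch normalization.
            axis: Channel axis; `-1 or 3` for (N,H,W,C), `1 or -3` for (N,C,H,W).
            l2_strength: Strength of the L2 regularization on the Conv kernels.
        """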
        super(Residual, self).__init__(**kwargs)

        
        if axis == -1 or axis == 3:
            data_format = "channels_last"
        elif axis == 1 or axis == -3:
            data_format = "channels_first"
        else:
            raise ValueError("Data format invalid.")

        self.conv1 = tf.keras.layers.Conv2D(filters, 
                                            kernel_size=3, 
                                            strides=strides,
                                            use_bias=use_bias, 
                                            data_format=data_format,
                                            kernel_regularizer=tf.keras.regularizers.l2(l2_strength),
                                            padding="SAME")

        self.conv2 = tf.keras.layers.Conv2D(filters, 
                                            kernel_size=3, 
                                            strides=1,
                                            use_bias=use_bias, 
                                            data_format=data_format,
                                            kernel_regularizer=tf.keras.regularizers.l2(l2_strength),
                                            padding="SAME")

        self.downsample = downsample
        if self.downsample:
            self.conv_down = tf.keras.layers.Conv2D(filters, 
                                                    kernel_size=1, 
                                                    strides=strides,
                                                    use_bias=use_bias, 
                                                    data_format=data_format,
                                                    kernel_regularizer=tf.keras.regularizers.l2(l2_strength),
                                                    padding="SAME")

        norm_params = {'epsilon': epsilon, "axis": axis}
        self.norm1 = tf.keras.layers.BatchNormalization(**norm_params)
        self.norm2 = tf.keras.layers.BatchNormalization(**norm_params)

    def call(self, x, training=None):

        # Exercise: implement the forward pass here so that `residual` is defined.
        # ...
        # ...
        # ...

        return x + residual
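
One way the exercise above could be completed is sketched here, following the pre-activation ordering (BN, ReLU, Conv) of the ResNet V2 reference cited in the docstring. `ResidualSketch` is a hypothetical, stripped-down stand-in for `Residual` (no L2 regularization, channels-last only), so treat it as one possible reading of the exercise rather than the intended solution.

```python
import tensorflow as tf


class ResidualSketch(tf.keras.layers.Layer):
    """Hypothetical pre-activation residual block (BN-ReLU-Conv, twice)."""

    def __init__(self, filters, downsample=False, strides=1, **kwargs):
        super().__init__(**kwargs)
        self.norm1 = tf.keras.layers.BatchNormalization()
        self.norm2 = tf.keras.layers.BatchNormalization()
        self.conv1 = tf.keras.layers.Conv2D(filters, 3, strides=strides, padding="same")
        self.conv2 = tf.keras.layers.Conv2D(filters, 3, strides=1, padding="same")
        self.downsample = downsample
        if downsample:
            # 1x1 convolution that matches the shortcut to the residual's shape.
            self.conv_down = tf.keras.layers.Conv2D(filters, 1, strides=strides, padding="same")

    def call(self, x, training=None):
        # Pre-activation shared by the residual branch and the projection shortcut.
        pre = tf.nn.relu(self.norm1(x, training=training))
        residual = self.conv1(pre)
        residual = self.conv2(tf.nn.relu(self.norm2(residual, training=training)))
        if self.downsample:
            x = self.conv_down(pre)
        return x + residual


x = tf.random.normal((2, 8, 8, 16))
print(tuple(ResidualSketch(filters=16)(x).shape))                              # (2, 8, 8, 16)
print(tuple(ResidualSketch(filters=8, downsample=True)(x).shape))              # (2, 8, 8, 8)
print(tuple(ResidualSketch(filters=16, downsample=True, strides=2)(x).shape))  # (2, 4, 4, 16)
```

Without `downsample=True`, the addition only works when `filters` equals the input channel count and `strides` is 1, because the shortcut is then the unmodified input.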

#### Test: forward propagation

In [None]:
# Keep the tensor size unchanged
# input tensor shape = (N,H,W,C) = output tensor shape

fake_data = np.random.normal(0, 1, (5,32,32,128)).astype(np.float32)
res_blk = Residual(filters=128)
print(res_blk(fake_data).shape)

In [None]:
# Halve the number of channels
# input tensor shape = (N,H,W,C)
# output tensor shape = (N,H,W,C/2)

fake_data = np.random.normal(0, 1, (5,32,32,128)).astype(np.float32)
res_blk = Residual(filters=64, downsample=True)
print(res_blk(fake_data).shape)

In [None]:
# Halve both the height and the width
# input tensor shape = (N,H,W,C)
# output tensor shape = (N,H/2,W/2,C)

fake_data = np.random.normal(0, 1, (5,32,32,128)).astype(np.float32)
res_blk = Residual(filters=128, downsample=True, strides=2)
print(res_blk(fake_data).shape)

#### Build and train the model

In [None]:
# Build the model
model = tf.keras.models.Sequential()
model.add(ConvBlock(filters=32, input_shape=(32,32,3)))
model.add(Residual(filters=32))
model.add(Residual(filters=32))
model.add(Residual(filters=64, downsample=True, strides=2))
model.add(Residual(filters=64))
model.add(tf.keras.layers.Flatten())
model.add(tf.keras.layers.Dense(10, activation="softmax"))

# Inspect the model summary
model.summary()

# Compile the model: specify the training objective (loss) and the optimizer.
model.compile(loss="categorical_crossentropy", optimizer="Adam", metrics=["accuracy"])

# Train the model
history = model.fit(x=x_train, y=y_train_one_hot, validation_data=(x_test,y_test_one_hot),
                    epochs=10, batch_size=128)

# Plot the training history.
# The metric key is 'accuracy' in recent TensorFlow releases and 'acc' in older ones.
acc_key = 'accuracy' if 'accuracy' in history.history else 'acc'
plt.plot(history.history[acc_key], ms=5, marker='o', label='accuracy')
plt.plot(history.history['val_' + acc_key], ms=5, marker='o', label='val accuracy')
plt.legend()
plt.show()

[Back to index](#00)

---

## <a id='05'> Tweak the ```residual block``` into a ```dense block``` and build a DenseNet </a>

In [None]:
# class DenseBlock(...):
#     ...
#     ...
#     ...
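
A dense block differs from a residual block mainly in how the shortcut is merged: the new feature maps are concatenated to the input along the channel axis instead of being added to it. The sketch below is one hypothetical way to fill in the placeholder above. `DenseBlockSketch` is an assumed name, `filters` is taken to be the number of channels each block adds (the growth rate), and the input is assumed to be channels-last.

```python
import tensorflow as tf


class DenseBlockSketch(tf.keras.layers.Layer):
    """Hypothetical BN-ReLU-Conv block whose output is concatenated to its input."""

    def __init__(self, filters, **kwargs):
        super().__init__(**kwargs)
        self.norm = tf.keras.layers.BatchNormalization()
        self.conv = tf.keras.layers.Conv2D(filters, kernel_size=3, padding="same")

    def call(self, x, training=None):
        new_features = self.conv(tf.nn.relu(self.norm(x, training=training)))
        # Concatenate on the channel axis (assumes channels-last input).
        return tf.concat([x, new_features], axis=-1)


x = tf.random.normal((2, 8, 8, 32))
for _ in range(4):
    x = DenseBlockSketch(filters=32)(x)
    print(tuple(x.shape))
# The channel count grows as 64, 96, 128, 160.
```

Under that assumption, stacking four such blocks with `filters=32` on a 32-channel input leaves the spatial size unchanged and grows the channel count to 160, which is what the `Flatten` layer in the model would then receive.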

In [None]:
# Build the model
model = tf.keras.models.Sequential()
model.add(ConvBlock(filters=32, input_shape=(32,32,3)))
model.add(DenseBlock(filters=32))
model.add(DenseBlock(filters=32))
model.add(DenseBlock(filters=32))
model.add(DenseBlock(filters=32))
model.add(tf.keras.layers.Flatten())
model.add(tf.keras.layers.Dense(10, activation="softmax"))

# Inspect the model summary
model.summary()

# Compile the model: specify the training objective (loss) and the optimizer.
model.compile(loss="categorical_crossentropy", optimizer="Adam", metrics=["accuracy"])

# Train the model
history = model.fit(x=x_train, y=y_train_one_hot, validation_data=(x_test,y_test_one_hot),
                    epochs=10, batch_size=128)

# Plot the training history.
# The metric key is 'accuracy' in recent TensorFlow releases and 'acc' in older ones.
acc_key = 'accuracy' if 'accuracy' in history.history else 'acc'
plt.plot(history.history[acc_key], ms=5, marker='o', label='accuracy')
plt.plot(history.history['val_' + acc_key], ms=5, marker='o', label='val accuracy')
plt.legend()
plt.show()

[Back to index](#00)