In [1]:
# 几个用来绘制灰度和 RGB 图像的函数
def plot_image(image):
    # "nearest" 分辨率不匹配时,只显示图像而不尝试在像素之间进行插值
    plt.imshow(image, cmap="gray", interpolation="nearest")
    plt.axis("off")


def plot_color_image(image):
    plt.imshow(image, interpolation="nearest")
    plt.axis("off")

In [2]:
import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow.keras as keras
import os
import time
import matplotlib.pyplot as plt
import matplotlib as mpl
import seaborn as sns

from functools import partial


def initialization():
    keras.backend.clear_session()
    np.random.seed(42)
    tf.random.set_seed(42)

In [3]:
from matplotlib import font_manager
my_font = font_manager.FontProperties(fname='./Fonts/SourceHanSerifSC-Medium.otf', size=12)

# CNN架构 CNN Architectures

<img src="./images/other/14-6.png" width="500">

## ResNet

#### 实现残差结构

-  架构形式应用于`18-layer`和`34-layer`:基础残差模块
    <img src="./images/other/14-37.png">

In [4]:
# 定义残差模块-基本形式
class Residual_Basic(keras.layers.Layer):
    expansion = 1  # 扩展系数  -> 默认不使用虚线结构

    def __init__(self, out_channel, strides=1, downsample=None, **kwargs):
        super(Residual_Basic, self).__init__(**kwargs)
        self.conv1 = Conv2D(filters=out_channel,
                            kernel_size=3,
                            strides=strides,
                            padding="SAME",
                            use_bias=False)
        # 卷积之后，如果要接BN操作，最好是不设置偏置，因为不起作用，
        self.bn1 = BatchNormalization(momentum=0.9, epsilon=1e-5)

        self.conv2 = Conv2D(filters=out_channel,
                            kernel_size=3,
                            strides=1,
                            padding="SAME",
                            use_bias=False)
        self.bn2 = BatchNormalization(momentum=0.9, epsilon=1e-5)
        # down_sample：使用改变特征图大小核深度的跳过连接
        self.downsample = downsample
        self.relu = ReLU()
        self.add = Add()

    def call(self, inputs, training=False):
        # 跳过连接分支
        skip_Z = inputs

        # 使用改变特征图大小核深度的跳过连接分支
        if self.downsample is not None:
            skip_Z = self.downsample(inputs)

        # 主分支
        Z = self.conv1(inputs)
        Z = self.bn1(Z, training=training)
        Z = self.relu(Z)

        Z = self.conv2(Z)
        Z = self.bn2(Z, training=training)

        Z = self.add([Z, skip_Z])
        Z = self.relu(Z)

        return Z

-  架构形式应用于`50-layer`,`101-layer`和`152-layer`:使用瓶颈层的残差模块
    <img src="./images/other/14-38.png" width=700px>

In [5]:
# 定义残差模块-瓶颈层形式
class Residual_Bottleneck(keras.layers.Layer):
    expansion = 4  # 扩展系数

    def __init__(self, out_channel, strides=1, downsample=None, **kwargs):
        super(Residual_Bottleneck, self).__init__(**kwargs)
        # 1×1卷积核:降低特征维度
        self.conv1 = Conv2D(filters=out_channel,
                            kernel_size=1,
                            use_bias=False,
                            name="conv1")
        # 名字用于在迁移学习中与预训练模型的层进行匹配
        self.bn1 = BatchNormalization(momentum=0.9,
                                      epsilon=1e-5,
                                      name="conv1/BatchNorm")

        self.conv2 = Conv2D(filters=out_channel,
                            kernel_size=3,
                            strides=strides,
                            padding="SAME",
                            use_bias=False,
                            name="conv2")
        self.bn2 = BatchNormalization(momentum=0.9,
                                      epsilon=1e-5,
                                      name="conv2/BatchNorm")

        # 1×1卷积核:升高特征维度
        self.conv3 = Conv2D(filters=out_channel * self.expansion,
                            # 64->256  128->512  ...
                            kernel_size=1,
                            use_bias=False,
                            name="conv3")
        self.bn3 = BatchNormalization(momentum=0.9,
                                      epsilon=1e-5,
                                      name="conv3/BatchNorm")
        # down_sample：使用改变特征图大小核深度的跳过连接
        self.downsample = downsample
        self.relu = ReLU()
        self.add = Add()

    def call(self, inputs, training=False):
        # 跳过连接分支
        skip_Z = inputs

        # 使用改变特征图大小核深度的跳过连接分支
        if self.downsample is not None:
            skip_Z = self.downsample(inputs)

        # 主分支
        Z = self.conv1(inputs)
        Z = self.bn1(Z, training=training)
        Z = self.relu(Z)

        Z = self.conv2(Z)
        Z = self.bn2(Z, training=training)
        Z = self.relu(Z)

        Z = self.conv3(Z)
        Z = self.bn3(Z, training=training)

        Z = self.add([Z, skip_Z])
        Z = self.relu(Z)

        return Z

#### 生成一系列的残差结构

In [6]:
def make_conv_x(block, block_num, in_channel, unit1_channel, name, strides=1):
    """
    :param block: 可选择 Residual_Basic 或 Residual_Bottleneck
    :param block_num: 残差结构数量
    :param in_channel: 上一层输出特征矩阵的通道数
    :param unit1_channel: 本残差模块第一个单元的卷积层的的通道数
    """
    # 使用改变特征图大小核深度的跳过连接分支(虚线结构)
    # 1. 当strides大于1时需要：高宽/2,深度加深
    # 2. 对于18和34-layer: 第一层不需要虚线结构
    # 3. 对于50,101和152-layer: 第一层需要虚线结构：调整特征矩阵的深度，高宽不变.
    #                                           ->kernel_size=1
    skipLayer = None
    out_channel = unit1_channel * block.expansion  # conv3_channel
    if (strides != 1) or (in_channel != out_channel):
        skipLayer = Sequential([
            Conv2D(filters=out_channel, kernel_size=1, strides=strides,
                   use_bias=False, name="conv1"),
            BatchNormalization(momentum=0.9, epsilon=1.001e-5, name="BatchNorm")
        ], name="shortcut")  # 跳过层即捷径层

    layersList = []
    # 首先针对第一个单元进行处理
    layersList.append(block(out_channel=unit1_channel, strides=strides,
                            downsample=skipLayer,
                            name="unit_1"))
    # 然后针对其他单元进行处理
    for index in range(1, block_num):  # 3 -> 1, 2
        layersList.append(block(out_channel=unit1_channel, strides=1,
                                downsample=None,
                                name="unit_" + str(index + 1)))

    return Sequential(layersList, name=name)

因为`Conv1`中刚刚对网络输入进行了卷积和最大池化，还没有进行残差学习，此时直接下采样会损失大量信息；而后3个`ConvN_x`直接进行下采样时，前面的网络已经进行过残差学习了，所以可以直接进行下采样。

#### 定义ResNet网络结构

In [7]:
def resnet(block,
           block_num_list,
           height=224,
           width=224,
           num_classes=1000,
           include_top=True):
    """
    :param block: 可选择 Residual_Basic 或 Residual_Bottleneck
    :param block_num_list: 残差结构数量 输入为列表
    :param height: 输入高度像素
    :param width: 输入宽度像素
    :param num_classes:  标签的类别数量
    :param include_top: 
    :return: 
    """
    input = Input(shape=[height, width, 3], dtype="float32")
    # ---------------------
    Z = Conv2D(filters=64,
               kernel_size=7,
               strides=2,
               padding="SAME",
               use_bias=False,
               name="conv1")(input)
    Z = BatchNormalization(momentum=0.9, epsilon=1e-5,
                           name="conv1/BatchNorm")(Z)
    Z = ReLU()(Z)

    Z = MaxPool2D(pool_size=3, strides=2, padding="SAME")(Z)
    # ---------------------
    # 每调用一次make_layer()就生成对应`convN_x`的一系列残差结构
    # Z.shape对应上一层输出特征矩阵的shape对应[batch, height, weight, channel]
    # Z.shape[-1]代表 channel 深度
    Z = make_conv_x(block=block,
                    block_num=block_num_list[0],
                    in_channel=Z.shape[-1],
                    unit1_channel=64,
                    name="block1")(Z)
    Z = make_conv_x(block=block,
                    block_num=block_num_list[1],
                    in_channel=Z.shape[-1],
                    unit1_channel=128,
                    name="block2",
                    strides=2)(Z)
    Z = make_conv_x(block, block_num_list[2], Z.shape[-1], 256, "block3", 2)(Z)
    Z = make_conv_x(block, block_num_list[3], Z.shape[-1], 512, "block4", 2)(Z)
    # ---------------------
    if include_top:  # 不使用迁移学习
        Z = GlobalAvgPool2D()(Z)  # 全局平局池化:结合了pool和flatten的功能
        Z = Dense(units=num_classes, name="logits")(Z)
        predict = Softmax()(Z)
    else:  # 使用迁移学习 可以在后面自定义所需要的层
        predict = Z

    model = Model(inputs=input, outputs=predict)
    return model

#### 定义不同的ResNet架构

In [8]:
# 定义ResNet-18
def resnet18(height=224, width=224, num_classes=1000, include_top=True):
    model = resnet(block=Residual_Basic,
                   block_num_list=[2, 2, 2, 2],
                   height=height,
                   width=width,
                   num_classes=num_classes,
                   include_top=include_top)
    return model

In [9]:
# 定义ResNet-34
def resnet34(height=224, width=224, num_classes=1000, include_top=True):
    model = resnet(Residual_Basic, [3, 4, 6, 3], height, width, num_classes,
                   include_top)
    return model

In [10]:
# 定义ResNet-50
def resnet50(height=224, width=224, num_classes=1000, include_top=True):
    model = resnet(Residual_Bottleneck, [3, 4, 6, 3], height, width,
                   num_classes, include_top)
    return model

In [11]:
# 定义ResNet-101
def resnet101(height=224, width=224, num_classes=1000, include_top=True):
    model = resnet(Residual_Bottleneck, [3, 4, 23, 3], height, width,
                   num_classes, include_top)
    return model

In [12]:
# 定义ResNet-152
def resnet152(height=224, width=224, num_classes=1000, include_top=True):
    model = resnet(Residual_Bottleneck, [3, 8, 36, 3], height, width,
                   num_classes, include_top)
    return model

- 查看`ResNet-34`模型结构

In [13]:
model = resnet34(num_classes=10)
model.summary()

NameError: name 'Input' is not defined

#### 使用迁移学习训练ResNet-50

1. 加载数据集

In [None]:
import tensorflow_datasets as tfds

dataset, info = tfds.load("tf_flowers", as_supervised=True, with_info=True)

In [None]:
info

In [None]:
info.splits

In [None]:
class_names = info.features["label"].names
class_names

In [None]:
n_classes = info.features["label"].num_classes
n_classes

In [None]:
dataset_size = info.splits["train"].num_examples
dataset_size

`tf_flowers`数据集详细信息:https://tensorflow.google.cn/datasets/catalog/tf_flowers


<img src="./images/other/14-52.png" width="500">

2. 拆分数据集

    由于该数据集只有一个`train`数据集,没有验证集和测试集,因此需要拆分数据集.

In [None]:
test_set_raw, valid_set_raw, train_set_raw = tfds.load(
    "tf_flowers",
    split=["train[:10%]", "train[10%:25%]", "train[25%:]"],
    as_supervised=True)

In [None]:
plt.figure(figsize=(10, 8))
index = 0
for image, label in train_set_raw.take(9):
    index += 1
    plt.subplot(3, 3, index)
    plt.imshow(image)
    plt.title("Class: {}".format(class_names[label]))
    plt.axis("off")
plt.show()

3. 图像预处理

- 基本预处理:对训练集进行乱序，并为所有数据集添加批处理和预取

In [None]:
from keras.utils import np_utils


def preprocess(image, label):
    resized_image = tf.image.resize(image, [224, 224])
    label = tf.cast(label, dtype=tf.int32)
    label = tf.squeeze(label)  # tf.squeeze():用于从张量形状中移除大小为1的维度
    label = tf.one_hot(label, depth=10)
    return resized_image, label

- 进一步预处理:基本预处理的基础上,执行数据增强,即向训练图像添加一些随机变换

In [None]:
# 中央裁切
def central_crop(image):
    shape = tf.shape(image)  # height, width
    min_dim = tf.reduce_min([shape[0], shape[1]])

    # 从图像左上到右下
    top_crop = (shape[0] - min_dim) // 4
    bottom_crop = shape[0] - top_crop

    left_crop = (shape[1] - min_dim) // 4
    right_crop = shape[1] - left_crop

    return image[top_crop:bottom_crop, left_crop:right_crop]

In [None]:
# 随机裁切
def random_crop(image):
    shape = tf.shape(image)
    min_dim = tf.reduce_min([shape[0], shape[1]]) * 90 // 100

    # 如果一个维度不应该被裁剪，则传递该维度的完整大小
    cropped_image = tf.image.random_crop(image, size=[min_dim, min_dim, 3])
    return cropped_image

如果使用`迁移学习`,需要在图像预处理部分减去`ImageNet`所有图像的均值,即**\[123.68, 116.78, 103.94\]** 如果使用别人的预训练模型参数,就必须和別人使用相同的预处理方法!

In [None]:
_R_MEAN = 123.68
_G_MEAN = 116.78
_B_MEAN = 103.94

num_classes = 5

In [None]:
def preprocess(image, label, randomize=False):
    if randomize:
        cropped_image = random_crop(image)
        # random_flip_left_right:随机水平翻转图像（从左到右）。
        cropped_image = tf.image.random_flip_left_right(cropped_image)
    else:
        cropped_image = central_crop(image)

    resized_image = tf.image.resize(image, [224, 224])
    final_image = resized_image - [_R_MEAN, _G_MEAN, _B_MEAN]
    return final_image, label

In [None]:
batch_size = 16

train_set = train_set_raw.shuffle(1000).repeat()
train_set = train_set.map(partial(
    preprocess, randomize=True)).batch(batch_size).prefetch(1)

valid_set = valid_set_raw.map(preprocess).batch(batch_size).prefetch(1)

test_set = test_set_raw.map(preprocess).batch(batch_size).prefetch(1)

In [None]:
plt.figure(figsize=(10, 8))
for X_batch, y_batch in train_set.take(1):
    for index in range(9):
        plt.subplot(3, 3, index + 1)
        plt.imshow(X_batch[index])

        plt.title("Class: {}".format(class_names[y_batch[index]]))
        plt.axis("off")

plt.show()

4. 使用迁移学习

In [None]:
feature = resnet50(num_classes=num_classes, include_top=False)

In [None]:
# 加载预训练模型的权重
pre_weight_path = './PTmodel/tf_resnet50_weights/pretrain_weights.ckpt'
feature.load_weights(pre_weight_path)
feature.trainable = False  # 冻结预训练模型的权重参数
feature.summary()

当我们将 `trainable`设置为 `False`时， `feature`的所有权重都会被冻结，训练过程中也无法在训练这些参数.

In [None]:
# 模型最后面添加全连接层
model = Sequential([
    feature,
    GlobalAvgPool2D(),
    Dropout(rate=0.5),
    Dense(1024, activation=keras.activations.relu),
    Dropout(rate=0.5),
    Dense(num_classes),
    Softmax()
])

model.summary()

5. 训练,评估模型

In [None]:
# 编译模型
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0002)
model.compile(loss="sparse_categorical_crossentropy",
              optimizer=optimizer,
              metrics=["accuracy"])
# 训练模型
history = model.fit(train_set,
                    steps_per_epoch=int(0.75 * dataset_size / batch_size),
                    validation_data=valid_set,
                    validation_steps=int(0.15 * dataset_size / batch_size),
                    epochs=25)

In [None]:
model.save("my_ResNet50.ckpt")

In [None]:
model = keras.models.load_model("./models/my_ResNet50.ckpt")
model.evaluate(test_set)

In [None]:
history_dict = history.history
train_loss = history_dict["loss"]
train_accuracy = history_dict["accuracy"]
val_loss = history_dict["val_loss"]
val_accuracy = history_dict["val_accuracy"]

In [None]:
epochs = 25
# figure 1
plt.figure()
plt.plot(range(epochs), train_loss, label='train_loss')
plt.plot(range(epochs), val_loss, label='val_loss')
plt.legend()
plt.xlabel('epochs')
plt.ylabel('loss')

# figure 2
plt.figure()
plt.plot(range(epochs), train_accuracy, label='train_accuracy')
plt.plot(range(epochs), val_accuracy, label='val_accuracy')
plt.legend()
plt.xlabel('epochs')
plt.ylabel('accuracy')
plt.show()

### 使用Keras的预训练模型 Using Pretrained Models from Keras

1. 获取预训练模型
    
    通常，您不必手动实现像 `GoogLeNet` 或 `ResNet` 这样的标准模型，因为预训练的网络可以通过 `keras.applications` 包中获得。

In [None]:
# 创建一个ResNet-50模型并下载ImageNet数据集上预训练的权重
model = keras.applications.resnet50.ResNet50(weights="imagenet")
model.summary()

In [None]:
from sklearn.datasets import load_sample_image

china = load_sample_image("china.jpg") / 255.0
flower = load_sample_image("flower.jpg") / 255.0
images = np.array([china, flower])

plot_color_image(images[0])

2. 调整图像的大小

    要使用它，首先需要确保图像具有正确的大小。 `ResNet-50` 模型需要 224 × 224 像素的图像.

    使用`tf.image.resize()`函数来调整我们之前加载的图像的大小

In [None]:
images_resized = tf.image.resize(images, [224, 224])
plot_color_image(images_resized[0])

`tf.image.resize() `不保留宽高比,可以使用

- `tf.image.resize_with_pad()`:调整图像大小并将图像**填充**到目标宽度和高度。

In [None]:
images_resized = tf.image.resize_with_pad(image=images,
                                          target_height=224,
                                          target_width=224,
                                          antialias=True)  # 抗锯齿
plot_color_image(images_resized[0])

- `tf.image.resize_with_crop_or_pad()`:将图像**裁剪**/填充到目标宽度和高度。

In [None]:
images_resized = tf.image.resize_with_crop_or_pad(image=images,
                                                  target_height=224,
                                                  target_width=224)
plot_color_image(images_resized[0])

- `tf.image.crop_amd_resize()`:同时实现图像裁剪为适当宽高比和调整尺寸的操作.
```python
tf.image.crop_and_resize(
    image,     
    boxes,
    box_ind,
    crop_size,
    method='bilinear',
    extrapolation_value=0,
    name=None
)
```

    1. `image`:一个四维的张量,输入格式为`[batch, image_height, image_width, depth].`
    2. `boxes`:指需要划分的区域位置的百分比，输入格式为 `[[ymin,xmin,ymax,xmax]]` ,设crop的区域坐标为 `[ y1,x1,y2,x2 ]`,那么想得到相应正确的crop图形就一定要**归一化**，即图片长度为 [ W,H ],则实际输入的boxes为 [ $\frac{y_1}{H}$,$\frac{x_1}{W}$,$\frac{y_2}{H}$,$\frac{x_2}{W}$ ]。
        - 情况一:不超出1
        - 情况二:超出1--自动补齐,当全部超出1，那就是个黑色框了，因为超出1根据归一化截取不到图片任何内容.
    3. `box_ind`:用于索引
    4. `crop_size`:图像裁剪大小

        例:图像高度=427 , 图像宽度=640, 经过标准化后图像($\frac{427}{255.0}$,$\frac{640}{255.0}$).框选范围位置的百分比为`[0, 0.03, 1, 0.68]`,
            得 [ 0*427=0, 0.03*640=19.2, 1*427=427, 0.68*640=435.2 ],
        调整大小到`[224,224]`

In [None]:
china_box = [0, 0.03, 1, 0.68]
flower_box = [0.19, 0.26, 0.86, 0.7]

images_resized = tf.image.crop_and_resize(image=images,
                                          boxes=[china_box, flower_box],
                                          box_indices=[0, 1],
                                          crop_size=[224, 224])

In [None]:
plot_color_image(images_resized[0])

In [None]:
plot_color_image(images_resized[1])

3. 根据预训练模型来预处理图像
   
   每个模型都提供了一个 `preprocess_input()` 函数，可以使用它来预处理您的图像。**这些函数假定像素值的范围是 0 到 255**，由于之前的工作我们实现了归一化,因此我们必须将它们乘以 255.

In [None]:
inputs = keras.applications.resnet50.preprocess_input(images_resized * 255)

4. 预测
    
    输出 `y_proba` 是一个矩阵，每幅图像一行，每类一列。
    
    如果要显示前 K 个预测(包括类名和每个预测类的估计概率)使用 `decode_predictions()`。对于每个图像，它返回一个包含前 K 个预测的数组，其中每个预测都表示为一个**包含类标识符**（在 `ImageNet` 数据集中，每个图像都与 `WordNet `数据集中的一个词相关联：类ID是一个`WordNetID`）,它的名称，以及相应的置信度分数的数组.

In [None]:
y_proba = model.predict(inputs)
np.shape(y_proba)

In [None]:
top_K = keras.applications.resnet50.decode_predictions(y_proba, top=3)

for image_index in range(len(images)):  # 0,1
    print("Image #{}".format(image_index))
    for class_id, name, y_proba in top_K[image_index]:
        print("  {} - {:12s} {:.2f}%".format(class_id, name, y_proba * 100))
    print()

> Apotosome 01/25/22