In [2]:
# 导入必需的模块

import numpy as np
import tensorflow as tf

from tensorflow.keras.layers import Input, Add, Dense, Activation, ZeroPadding2D, BatchNormalization, Flatten, \
    Conv2D, AveragePooling2D, MaxPooling2D, GlobalMaxPooling2D
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.preprocessing import image
from tensorflow.keras.applications.imagenet_utils import preprocess_input
import pydot
from IPython.display import SVG
from tensorflow.keras.utils import plot_model
from resnets_utils import *

import scipy.misc

from matplotlib.pyplot import imshow
%matplotlib inline

## 构建残差网络

残差网络是由残差块组成的。

左图是传统的网络块，右边的是残差网络块，其实就是在传统的网络块上添加一条小路，以便让激活值和梯度值可以跳层传播，以此来避免梯度消失和爆炸。

<img src="images/skip_connection_kiank.png" style="width:650px;height:250px;">

在实现残差块时，会有两种情况：

1. 跳跃传递的矩阵与目标层矩阵的维度一致
2. 跳跃传递的矩阵与目标层矩阵的维度不一致，此时需要先变换跳传矩阵的维度

### 维度一致时的残差块

小路的左边的网络层的激活值与小路右边的网络层的激活值的维度是一致的，所以可以直接跳过去。就是说 $a^{[l]}$ 与 $a^{[l+2]}$ 的维度是一致的。从下图中可以看出有两条路，一条直线主路，一条弧线小路:

<img src="images/idblock2_kiank.png" style="width:650px;height:150px;">

上图是跳了2层，其实还可以跳更多层，下图就跳了3层: 

<img src="images/idblock3_kiank.png" style="width:650px;height:150px;">
<caption><center> <u> <font> 图 4 </font> </u></center></caption>


In [3]:
### 维度相同时的残差块

def identity_block(X, f, filters, stage, block, debug=False):
    """
    Residual block whose shortcut is added back unchanged (Figure 4).

    Main path: Conv 1x1 -> BatchNorm -> ReLU -> Conv fxf ('same') -> BatchNorm
    -> ReLU -> Conv 1x1 -> BatchNorm. The block input is then added to the
    main-path output and the sum goes through a final ReLU.

    Arguments:
    X -- input activation tensor; it is also the tensor carried by the shortcut
    f -- integer, window size of the middle convolution
    filters -- sequence of three integers, the filter counts of the three
               main-path convolutions; the third should equal the channel
               count of X so that the shortcut can be added
    stage -- integer, used only to build layer names
    block -- string, used only to build layer names
    debug -- when truthy, print the tensor shape after each convolution

    Returns:
    X -- output tensor of the residual block
    """

    # Layer-name prefixes, e.g. 'res2a_branch' / 'bn2a_branch'
    conv_prefix = 'res' + str(stage) + block + '_branch'
    bn_prefix = 'bn' + str(stage) + block + '_branch'

    # Filter count of each main-path convolution
    n_first, n_middle, n_last = filters

    # Initializer class; every convolution gets its own instance seeded with 0
    make_init = tf.keras.initializers.GlorotUniform

    # Remember the input so it can be added back after the main path
    shortcut = X

    # Main path, first component: 1x1 convolution -> batch norm -> ReLU
    conv_a = Conv2D(filters=n_first, kernel_size=(1, 1), strides=(1, 1), padding='valid',
                    name=conv_prefix + '2a', kernel_initializer=make_init(seed=0))
    X = conv_a(X)
    if debug:
        print('#1 X.shape', X.shape)
    X = BatchNormalization(axis=3, name=bn_prefix + '2a')(X)
    X = Activation('relu')(X)

    # Main path, second component: fxf convolution ('same' padding) -> batch norm -> ReLU
    conv_b = Conv2D(filters=n_middle, kernel_size=(f, f), strides=(1, 1), padding='same',
                    name=conv_prefix + '2b', kernel_initializer=make_init(seed=0))
    X = conv_b(X)
    if debug:
        print('#2 X.shape', X.shape)
    X = BatchNormalization(axis=3, name=bn_prefix + '2b')(X)
    X = Activation('relu')(X)

    # Main path, third component: 1x1 convolution -> batch norm (no ReLU before the merge)
    conv_c = Conv2D(filters=n_last, kernel_size=(1, 1), strides=(1, 1), padding='valid',
                    name=conv_prefix + '2c', kernel_initializer=make_init(seed=0))
    X = conv_c(X)
    if debug:
        print('#3 X.shape', X.shape)
    X = BatchNormalization(axis=3, name=bn_prefix + '2c')(X)

    # Shortcut: add the saved input to the main-path output, then apply the final ReLU
    merged = Add()([X, shortcut])
    return Activation('relu')(merged)

In [8]:
# Unit test: run identity_block on a seeded random batch and check the output shape
np.random.seed(1)
X = tf.cast(np.random.randn(3, 4, 4, 6),tf.float32)
A = identity_block(X, f=2, filters=[2, 4, 6], stage=1, block='a', debug=True)
# The block adds the unchanged input to the main-path output, so the output
# must keep the input's shape.
assert tuple(A.shape) == (3, 4, 4, 6), A.shape
tf.print("out = ", A[1][1][0])


#1 X.shape (3, 4, 4, 2)
#2 X.shape (3, 4, 4, 4)
#3 X.shape (3, 4, 4, 6)
out =  [0 0 1.34544396 2.03178668 0 1.32464457]


### 维度不同时的情况

当维度不同时，我们就不能直接将前面的激活值和后面的激活值矩阵合并在一起，所以需要在小路上加个卷积层来改变前面的激活矩阵的维度。如下图所示,小路上加了一个conv2d卷积层: 

<img src="images/convblock_kiank.png" style="width:650px;height:200px;">
<caption><center> <u> <font> 图 5 </font> </u></center></caption>

In [5]:
# 实现图 5 中的残差块

def convolutional_block(X, f, filters, stage, block, s=2, debug=False):
    """
    Residual block whose shortcut goes through its own convolution (Figure 5).

    Main path: Conv 1x1 (stride s) -> BatchNorm -> ReLU -> Conv fxf ('same')
    -> BatchNorm -> ReLU -> Conv 1x1 -> BatchNorm. The shortcut applies a
    1x1 convolution with stride s and as many filters as the last main-path
    convolution, followed by BatchNorm. Both branches are added and passed
    through a final ReLU.

    Arguments:
    X -- input activation tensor
    f -- integer, window size of the middle convolution
    filters -- sequence of three integers, the filter counts of the three
               main-path convolutions (the third is also used by the shortcut)
    stage -- integer, used only to build layer names
    block -- string, used only to build layer names
    s -- integer, stride of the first main-path convolution and of the
         shortcut convolution
    debug -- when truthy, print the tensor shape after each main-path convolution

    Returns:
    X -- output tensor of the residual block
    """
    # Layer-name prefixes, e.g. 'res2a_branch' / 'bn2a_branch'
    conv_prefix = 'res' + str(stage) + block + '_branch'
    bn_prefix = 'bn' + str(stage) + block + '_branch'

    n_first, n_middle, n_last = filters

    # Initializer class; every convolution gets its own instance seeded with 0
    make_init = tf.keras.initializers.GlorotUniform

    # Keep the block input; the shortcut branch is built from it further down
    shortcut = X

    # Main path, first component: strided 1x1 convolution -> batch norm -> ReLU
    conv_a = Conv2D(filters=n_first, kernel_size=(1, 1), strides=(s, s), padding='valid',
                    name=conv_prefix + '2a', kernel_initializer=make_init(seed=0))
    X = conv_a(X)
    if debug:
        print('#1 X.shape', X.shape)
    X = BatchNormalization(axis=3, name=bn_prefix + '2a')(X)
    X = Activation('relu')(X)

    # Main path, second component: fxf convolution ('same' padding) -> batch norm -> ReLU
    conv_b = Conv2D(filters=n_middle, kernel_size=(f, f), strides=(1, 1), padding='same',
                    name=conv_prefix + '2b', kernel_initializer=make_init(seed=0))
    X = conv_b(X)
    if debug:
        print('#2 X.shape', X.shape)
    X = BatchNormalization(axis=3, name=bn_prefix + '2b')(X)
    X = Activation('relu')(X)

    # Main path, third component: 1x1 convolution -> batch norm (no ReLU before the merge)
    conv_c = Conv2D(filters=n_last, kernel_size=(1, 1), strides=(1, 1), padding='valid',
                    name=conv_prefix + '2c', kernel_initializer=make_init(seed=0))
    X = conv_c(X)
    if debug:
        print('#3 X.shape', X.shape)
    X = BatchNormalization(axis=3, name=bn_prefix + '2c')(X)

    # Shortcut branch: a strided 1x1 convolution plus batch norm reshapes the
    # saved input so it can be added to the main-path output
    conv_shortcut = Conv2D(filters=n_last, kernel_size=(1, 1), strides=(s, s), padding='valid',
                           name=conv_prefix + '1', kernel_initializer=make_init(seed=0))
    shortcut = conv_shortcut(shortcut)
    shortcut = BatchNormalization(axis=3, name=bn_prefix + '1')(shortcut)

    # Merge both branches, then apply the final ReLU
    merged = Add()([X, shortcut])
    return Activation('relu')(merged)

In [6]:
# Unit test: run convolutional_block on a seeded random batch and check the output shape
np.random.seed(1)
X = tf.cast(np.random.randn(3, 4, 4, 6),dtype=tf.float32)
A = convolutional_block(X, f=2, filters=[2, 4, 6], stage=1, block='a', debug=True)
# With the default stride s=2 the 4x4 spatial grid is reduced to 2x2, and the
# last filter count (6) sets the number of output channels.
assert tuple(A.shape) == (3, 2, 2, 6), A.shape
tf.print("out = ", (A[1][1][0]))

#1 X.shape (3, 2, 2, 2)
#2 X.shape (3, 2, 2, 4)
#3 X.shape (3, 2, 2, 6)
out =  [0 0 0 0.892983675 0 0.198875308]


## 构建 ResNet50 网络
> 一个 50 层的残差网络

 - ID BLOCK 是指维度相同时的残差块，ID BLOCK x3 表示有 3 组这样的残差块。
 - CONV BLOCK 是指维度不同时的残差块

<img src="images/resnet_kiank.png" style="width:850px;height:150px;">
<caption><center> <u> <font> 图 6: ResNet-50模型 </font> </u></center></caption>

In [9]:
# 实现 ResNet50

def ResNet50(input_shape=(64, 64, 3), classes=6):
    """
    Assemble the ResNet-50 model with the Keras functional API.

    Layout: zero padding -> stage 1 (7x7 convolution, batch norm, ReLU, max
    pooling) -> stages 2 to 5 (one convolutional block followed by 2, 3, 5
    and 2 identity blocks respectively) -> 2x2 average pooling -> flatten ->
    softmax dense layer.

    Arguments:
    input_shape -- shape of one input image, passed to Input
    classes -- number of output classes (units of the final softmax layer)

    Returns:
    model -- a Keras Model named 'ResNet50'
    """

    # Symbolic input tensor for the given image shape
    X_input = Input(input_shape)

    # Initializer class; each layer that needs one gets an instance seeded with 0
    make_init = tf.keras.initializers.GlorotUniform

    # Zero padding of 3 on each spatial side
    X = ZeroPadding2D((3, 3))(X_input)

    # Stage 1
    X = Conv2D(filters=64, kernel_size=(7, 7), strides=(2, 2), name='conv1',
               kernel_initializer=make_init(seed=0))(X)
    X = BatchNormalization(axis=3, name='bn_conv1')(X)
    X = Activation('relu')(X)
    X = MaxPooling2D(pool_size=(3, 3), strides=(2, 2))(X)

    # Stages 2-5: (stage number, filter counts, stride of the convolutional
    # block, names of the identity blocks that follow block 'a')
    stage_specs = (
        (2, [64, 64, 256], 1, 'bc'),
        (3, [128, 128, 512], 2, 'bcd'),
        (4, [256, 256, 1024], 2, 'bcdef'),
        (5, [512, 512, 2048], 2, 'bc'),
    )
    for stage_no, stage_filters, stride, identity_names in stage_specs:
        X = convolutional_block(X, f=3, filters=stage_filters, stage=stage_no, block='a', s=stride)
        for block_name in identity_names:
            X = identity_block(X, 3, stage_filters, stage=stage_no, block=block_name)

    # 2x2 average pooling with 'same' padding
    X = AveragePooling2D(pool_size=(2, 2), padding='same')(X)

    # Flatten to one vector per example
    X = Flatten()(X)

    # Fully connected softmax classifier, named 'fc<classes>'
    X = Dense(classes, activation='softmax', name='fc' + str(classes),
              kernel_initializer=make_init(seed=0))(X)

    # Wrap the graph from input to output in a Model
    return Model(inputs=X_input, outputs=X, name='ResNet50')

In [10]:
# Build the network for 64x64 inputs with 3 channels and 6 output classes.
model = ResNet50(input_shape=(64, 64, 3), classes=6)

In [11]:
# Compile the model: Adam optimizer, categorical cross-entropy loss
# (the labels are one-hot encoded), accuracy reported as the metric.
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

开始训练模型，首先加载数据集

<img src="images/signs_data_kiank.png" style="width:600px;height:300px;">
<caption><center> <u> <font> 图7: 手势数据集 </font> </u></center></caption>

In [12]:
# Load the hand-sign dataset (load_dataset presumably comes from the
# resnets_utils star import -- confirm there).
X_train_orig, Y_train_orig, X_test_orig, Y_test_orig, classes = load_dataset()

# Divide the pixel values by 255
X_train = X_train_orig / 255
X_test = X_test_orig / 255

# One-hot encode the labels over 6 classes; the transpose puts one example per row
Y_train = convert_to_one_hot(Y_train_orig, 6).T
Y_test = convert_to_one_hot(Y_test_orig, 6).T

# Report dataset sizes and array shapes
print(f"number of training examples = {X_train.shape[0]}")
print(f"number of test examples = {X_test.shape[0]}")
print(f"X_train shape: {X_train.shape}")
print(f"Y_train shape: {Y_train.shape}")
print(f"X_test shape: {X_test.shape}")
print(f"Y_test shape: {Y_test.shape}")

number of training examples = 1080
number of test examples = 120
X_train shape: (1080, 64, 64, 3)
Y_train shape: (1080, 6)
X_test shape: (120, 64, 64, 3)
Y_test shape: (120, 6)


In [13]:
# Train the model; only 2 epochs because this run uses the CPU
model.fit(X_train, Y_train, epochs = 2, batch_size = 32)

2022-07-02 17:48:04.159933: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:176] None of the MLIR Optimization Passes are enabled (registered 2)
2022-07-02 17:48:04.160823: I tensorflow/core/platform/profile_utils/cpu_utils.cc:114] CPU Frequency: 2500000000 Hz


Epoch 1/2
Epoch 2/2


<tensorflow.python.keras.callbacks.History at 0x7faf428fcf70>

In [17]:
# Train the model for 100 epochs (this run used a GPU).
# NOTE(review): this calls fit on the same `model` object as the 2-epoch cell,
# so when the notebook runs top to bottom training continues from those
# weights instead of restarting. The recorded execution count (In [17]) shows
# this cell was run after the 2-epoch evaluation (In [14]).
model.fit(X_train, Y_train, epochs = 100, batch_size = 32)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78

<tensorflow.python.keras.callbacks.History at 0x7fad5006a070>

In [14]:
# Test-set loss and accuracy after the 2-epoch run.
# NOTE(review): recorded as In [14], i.e. before the 100-epoch fit (In [17]);
# when the notebook is run top to bottom this cell executes after both fits.
preds = model.evaluate(X_test, Y_test)
print(f"Loss = {preds[0]}")
print(f"Test Accuracy = {preds[1]}")

Loss = 5.987757205963135
Test Accuracy = 0.1666666716337204


In [18]:
# Test-set loss and accuracy after the 100-epoch run
# (recorded as In [18], right after the 100-epoch fit in In [17]).
preds = model.evaluate(X_test, Y_test)
print(f"Loss = {preds[0]}")
print(f"Test Accuracy = {preds[1]}")

Loss = 0.1468881219625473
Test Accuracy = 0.949999988079071
