In [1]:
# 导⼊相关的⼯具包
import tensorflow as tf
from tensorflow.keras import layers, activations

# 残差块

In [6]:
# 定义ResNet的残差块
class Residual(tf.keras.Model):
    # 指明残差块的通道数，是否使⽤1*1卷积，步⻓
    def __init__(self, num_channels, use_1x1conv=False, strides=1):
        super(Residual, self).__init__()
        # 卷积层：指明卷积核个数，padding,卷积核⼤⼩，步⻓
        self.conv1 = layers.Conv2D(num_channels,padding='same',kernel_size=3,strides=strides)
        # 卷积层：指明卷积核个数，padding,卷积核⼤⼩，步⻓
        self.conv2 = layers.Conv2D(num_channels, kernel_size=3,padding='same')
        if use_1x1conv:
            self.conv3 = layers.Conv2D(num_channels,kernel_size=1,strides=strides)
        else:
            self.conv3 = None
        # 指明BN层
        self.bn1 = layers.BatchNormalization()
        self.bn2 = layers.BatchNormalization()

    # 定义前向传播过程
    def call(self, X):
        # 卷积，BN，激活
        Y = activations.relu(self.bn1(self.conv1(X)))
        # 卷积，BN
        Y = self.bn2(self.conv2(Y))
        # 对输⼊数据进⾏1*1卷积保证通道数相同
        if self.conv3:
            X = self.conv3(X)
        # 返回与输⼊相加后激活的结果
        return activations.relu(Y + X)

# 残差模块

In [7]:
# ResNet⽹络中模块的构成
class ResnetBlock(tf.keras.layers.Layer):
    # ⽹络层的定义：输出通道数（卷积核个数），模块中包含的残差块个数，
    def __init__(self,num_channels, num_residuals, first_block=False):
        super(ResnetBlock, self).__init__()
        # 存储残差块
        self.listLayers=[]
        # 遍历模块中所有的层
        for i in range(num_residuals):
            # 若为第⼀个残差块并且不是第⼀个模块，则使⽤1*1卷积，步长为2
            if i == 0 and not first_block:
                self.listLayers.append(Residual(num_channels,use_1x1conv=True,strides=2))
            # 否则不使⽤1*1卷积，步⻓为1
            else:
                self.listLayers.append(Residual(num_channels))

    # 定义前向传播过程
    def call(self, X):
        # 所有层依次向前传播即可
        for layer in self.listLayers.layers:
            X = layer(X)
        return X

# ResNet网络

In [8]:
# 构建ResNet⽹络
class ResNet(tf.keras.Model):
    # 初始化：指定每个模块中的残差快的个数
    def __init__(self,num_blocks):
        super(ResNet, self).__init__()
        # 输⼊层：7*7卷积，步⻓为2
        self.conv=layers.Conv2D(64, kernel_size=7, strides=2,padding='same')
        # BN层
        self.bn=layers.BatchNormalization()
        # 激活层
        self.relu=layers.Activation('relu')
        # 最⼤池化层
        self.mp=layers.MaxPool2D(pool_size=3, strides=2, padding='same')
        # 第⼀个block，通道数为64
        self.resnet_block1=ResnetBlock(64,num_blocks[0], first_block=True)
        # 第⼆个block，通道数为128
        self.resnet_block2=ResnetBlock(128,num_blocks[1])
        # 第三个block，通道数为256
        self.resnet_block3=ResnetBlock(256,num_blocks[2])
        # 第四个block，通道数为512
        self.resnet_block4=ResnetBlock(512,num_blocks[3])
        # 全局平均池化
        self.gap=layers.GlobalAvgPool2D()
        # 全连接层：分类
        self.fc=layers.Dense(units=10,activation=tf.keras.activations.softmax)

    # 前向传播过程
    def call(self, x):
        # 卷积
        x=self.conv(x)
        # BN
        x=self.bn(x)
        # 激活
        x=self.relu(x)
        # 最⼤池化
        x=self.mp(x)
        # 残差模块
        x=self.resnet_block1(x)
        x=self.resnet_block2(x)
        x=self.resnet_block3(x)
        x=self.resnet_block4(x)
        # 全局平均池化
        x=self.gap(x)
        # 全链接层
        x=self.fc(x)
        return x


In [9]:
# 模型实例化：指定每个block中的残差块个数
mynet = ResNet([2,2,2,2])
X = tf.random.uniform(shape=(1, 224, 224 , 1))
y = mynet(X)
mynet.summary()

Model: "res_net_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_20 (Conv2D)           multiple                  3200      
_________________________________________________________________
batch_normalization_17 (Batc multiple                  256       
_________________________________________________________________
activation_1 (Activation)    multiple                  0         
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 multiple                  0         
_________________________________________________________________
resnet_block_4 (ResnetBlock) multiple                  148736    
_________________________________________________________________
resnet_block_5 (ResnetBlock) multiple                  526976    
_________________________________________________________________
resnet_block_6 (ResnetBlock) multiple                  21

# 手写数字识别

In [14]:
import numpy as np
# 数据集
from tensorflow.keras.datasets import mnist

# 获取⼿写数字数据集
(train_images, train_labels), (test_images, test_labels) = mnist.load_data()
# 训练集数据维度的调整：N H W C
train_images = np.reshape(train_images,(train_images.shape[0],train_images.shape[1],train_images.shape[2],1))
# 测试集数据维度的调整：N H W C
test_images = np.reshape(test_images,(test_images.shape[0],test_images.shape[1],test_images.shape[2],1))

In [15]:
# 定义两个⽅法随机抽取部分样本演示
# 获取训练集数据
def get_train(size):
 # 随机⽣成要抽样的样本的索引
 index = np.random.randint(0, np.shape(train_images)[0],size)
 # 将这些数据resize成22*227⼤⼩
 resized_images = tf.image.resize_with_pad(train_images[index],224,224,)
 # 返回抽取的
 return resized_images.numpy(), train_labels[index]

# 获取测试集数据
def get_test(size):
 # 随机⽣成要抽样的样本的索引
 index = np.random.randint(0, np.shape(test_images)[0], size)
 # 将这些数据resize成224*224⼤⼩
 resized_images = tf.image.resize_with_pad(test_images[index],224,224,)
 # 返回抽样的测试样本
 return resized_images.numpy(), test_labels[index]

In [16]:
# 获取训练样本和测试样本
train_images,train_labels = get_train(256)
test_images,test_labels = get_test(128)

In [None]:
# 模型编译
# 指定优化器，损失函数和评价指标
optimizer = tf.keras.optimizers.SGD(learning_rate=0.01, momentum=0.0)

mynet.compile(optimizer=optimizer,
                loss='sparse_categorical_crossentropy',
                metrics=['accuracy'])

In [None]:
# 模型训练：指定训练数据，batchsize,epoch,验证集
mynet.fit(train_images,train_labels,batch_size=128,epochs=3,verbose=1,validation_split=0.1)

In [None]:
# 指定测试数据
mynet.evaluate(test_images,test_labels,verbose=1)