In [4]:
import tensorflow as tf
from tensorflow.keras import layers, activations

# 1 残差块

In [5]:
class Residual(tf.keras.Model):
    def __init__(self, num_channels, use_1x1conv = False, strides = 1):
        super(Residual,self).__init__()
        self.conv1 = layers.Conv2D(num_channels,
                                   kernel_size = 3,
                                   padding = 'same',
                                   strides = strides)

        self.conv2 = layers.Conv2D(num_channels,
                                   strides = 1,
                                   kernel_size = 3,
                                   padding = 'same')
        if use_1x1conv:
            self.conv3 = layers.Conv2D(num_channels,
                                       kernel_size = 1,
                                       strides = strides)
        else:
            self.conv3 = None

        self.bn1 = layers.BatchNormalization()
        self.bn2 = layers.BatchNormalization()

    # 正向传播
    def call(self, x):
        Y = activations.relu(self.bn1(self.conv1(x)))
        Y = self.bn2(self.conv2(Y))
        if self.conv3:
            x = self.conv3(x)
        outputs = activations.relu(Y + x)

        return outputs

# 2 ResNet模型

## (1)定义残差模块
- 第一个模块需做特殊处理

In [6]:
class ResnetBlock(tf.keras.layers.Layer):
    def __init__(self, num_channels, num_res, first_block = False):
        super(ResnetBlock, self).__init__()
        # 存储残差块
        self.listLayers = []
        # 遍历残差数目生成模块
        for i in range(num_res):
            if i == 0 and not first_block:
                self.listLayers.append(Residual(num_channels, use_1x1conv= True, strides=2))
            else:
                self.listLayers.append(Residual(num_channels))
    # 前向传播
    def call(self, X):
        for layer in self.listLayers:
            X = layer(X)

        return X

## (2)构建Resnet网络

In [7]:
class Resnet(tf.keras.Model):
    def __init__(self, num_blocks):
        super(Resnet, self).__init__()
        # 输入层
        self.conv = layers.Conv2D(
            filters = 64,
            kernel_size = 7,
            padding = 'same',
            strides = 2
        )
        # BN层
        self.bn = layers.BatchNormalization()
        # 激活层
        self.relu = layers.Activation('relu')
        # 池化
        self.mp = layers.MaxPool2D(
            pool_size = 3,
            strides = 2,
            padding = 'same'
        )
        # 残差模块
        self.res_block1 = ResnetBlock(64, num_blocks[0], first_block=True)
        self.res_block2 = ResnetBlock(128, num_blocks[1])
        self.res_block3 = ResnetBlock(256, num_blocks[2])
        self.res_block4 = ResnetBlock(512, num_blocks[3])
        # GAP
        self.gap = layers.GlobalAvgPool2D()
        # 全连接层
        self.fc = layers.Dense(10, activation = 'softmax') # 也可写成（units = 10, activation = tf.keras.activations.softmax）

    # 定义前向传播过程
    def call(self, x):
        # 输入部分传输过程
        x = self.conv(x)
        x = self.bn(x)
        x = self.relu(x)
        x = self.mp(x)
        # block
        x = self.res_block1(x)
        x = self.res_block2(x)
        x = self.res_block3(x)
        x = self.res_block4(x)
        # 输出部分的传输
        x = self.gap(x)
        x = self.fc(x)

        return x

In [8]:
# 实例化
mynet = Resnet([2, 2, 2, 2])
x = tf.random.uniform((1, 224, 224, 1))
y = mynet(x)
mynet.summary()

# 3 手写数字识别
## （1）数据读取

In [9]:
import numpy as np
from tensorflow.keras.datasets import mnist

(train_images, train_labels), (test_images, test_labels) = mnist.load_data()

In [16]:
train_images.shape

(60000, 28, 28, 1)

In [17]:
test_images.shape

(10000, 28, 28, 1)

In [15]:
# N H W C
train_images = np.reshape(train_images, (train_images.shape[0], train_images.shape[1], train_images.shape[2], 1))

test_images = np.reshape(test_images, (test_images.shape[0], test_images.shape[1], test_images.shape[2], 1))

In [18]:
# 定义两个方法随机抽取部分样本演示

def get_train(size):
    index = np.random.randint(0, np.shape(train_images)[0], size)

    resize_images = tf.image.resize_with_pad(train_images[index], 224, 224, )

    return resize_images.numpy(), train_labels[index]

def get_test(size):
    index = np.random.randint(0, np.shape(test_images)[0], size)

    resize_images = tf.image.resize_with_pad(test_images[index], 224, 224, )

    return resize_images.numpy(), test_labels[index]

In [11]:
# 获取训练样本和测试样本
train_image, train_label = get_train(256)
test_image, test_label = get_test(128)

## (2)模型编译

In [19]:
# 指定优化器，损失函数和评价指标
optimizer = tf.keras.optimizers.SGD(learning_rate = 0.01, momentum = 0.0)

mynet.compile(
    optimizer = optimizer,
    loss = 'sparse_categorical_crossentropy',
    metrics = ['accuracy']
)

## (3)模型训练

In [20]:
# 模型训练：指定训练数据集，batchsize, epoch, 验证集
mynet.fit(train_images,
        train_labels,
        batch_size = 128,
        epochs = 3,
        verbose = 1, # 显示整个训练的log
        validation_split = 0.2) # 验证集

Epoch 1/3


TypeError: Exception encountered when calling Conv2D.call().

[1mValue passed to parameter 'input' has DataType uint8 not in list of allowed values: float16, bfloat16, float32, float64, int32[0m

Arguments received by Conv2D.call():
  • inputs=tf.Tensor(shape=(128, 28, 28, 1), dtype=uint8)

## (4)模型评估

In [14]:
mynet.evaluate(test_images, test_labels, verbose = 1)

TypeError: Exception encountered when calling Conv2D.call().

[1mValue passed to parameter 'input' has DataType uint8 not in list of allowed values: float16, bfloat16, float32, float64, int32[0m

Arguments received by Conv2D.call():
  • inputs=tf.Tensor(shape=(None, 28, 28, 1), dtype=uint8)