In [2]:
# -*- coding: utf-8 -*-
# !/usr/bin/python
# Author: Selvaria

# 残差网络

import tensorflow as tf
import numpy as np
print(tf.__version__)

2.1.0


In [4]:
#残差块的实现如下。它可以设定输出通道数、是否使用额外的1×1卷积层来修改通道数以及卷积层的步幅。

from tensorflow.keras import layers,activations

class Residual(tf.keras.Model):
    """Residual block: two 3x3 convolutions with batch norm plus a skip connection.

    The main path is conv3x3 -> BN -> ReLU -> conv3x3 -> BN. Its result is
    added to the (optionally 1x1-convolved) input and passed through ReLU.

    Args:
        num_channels: Number of filters of both 3x3 convolutions and of the
            optional 1x1 convolution.
        use_1x1conv: If True, the skip path goes through a 1x1 convolution
            (with the same ``strides``) so that its channel count and spatial
            size match the main path.
        strides: Stride of the first 3x3 convolution and of the 1x1 convolution.
        **kwargs: Forwarded to ``tf.keras.Model``.
    """

    def __init__(self, num_channels, use_1x1conv=False, strides=1, **kwargs):
        super(Residual, self).__init__(**kwargs)
        # Layers are created in the same order as before so that the order of
        # the model's weights does not change.
        self.conv1 = layers.Conv2D(num_channels,
                                   padding='same',
                                   kernel_size=3,
                                   strides=strides)
        self.conv2 = layers.Conv2D(num_channels, kernel_size=3, padding='same')
        if use_1x1conv:
            # Extra 1x1 convolution on the skip path: adapts the input's
            # channel count and stride to those of the main path.
            self.conv3 = layers.Conv2D(num_channels,
                                       kernel_size=(1, 1),
                                       strides=strides)
        else:
            self.conv3 = None
        self.bn1 = layers.BatchNormalization()
        self.bn2 = layers.BatchNormalization()

    def call(self, X, training=None):
        """Apply the block to ``X``.

        Args:
            X: Input tensor.
            training: Keras training flag, forwarded to the batch
                normalization layers. ``None`` lets Keras decide.

        Returns:
            ``relu(main_path(X) + skip(X))``.
        """
        Y = activations.relu(self.bn1(self.conv1(X), training=training))
        Y = self.bn2(self.conv2(Y), training=training)
        # Explicit None check: do not rely on the truthiness of a layer object.
        if self.conv3 is not None:
            X = self.conv3(X)
        return activations.relu(Y + X)

# With the default arguments the output has the same shape as the input.
blk = Residual(num_channels=3)
X = tf.random.uniform(shape=(4, 6, 6, 3))
blk(X).shape  # TensorShape([4, 6, 6, 3])


TensorShape([4, 6, 6, 3])

In [7]:
# Here the output height and width are halved (because of the stride of 2)
# and the number of output channels changes as well.
blk = Residual(num_channels=6, strides=2, use_1x1conv=True)
blk(X).shape

TensorShape([4, 3, 3, 6])

In [29]:
# The first layers of ResNet: a 7x7 stride-2 convolution (with ReLU), batch
# normalization, then a 3x3 stride-2 max pooling.

net = tf.keras.models.Sequential()
net.add(layers.Conv2D(filters=64, kernel_size=7, strides=2, padding='same',
                      activation='relu', input_shape=(224, 224, 1)))
net.add(layers.BatchNormalization())
net.add(layers.MaxPool2D(pool_size=3, strides=2, padding='same'))

net.summary()

Model: "sequential_7"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_140 (Conv2D)          (None, 112, 112, 64)      3200      
_________________________________________________________________
batch_normalization_119 (Bat (None, 112, 112, 64)      256       
_________________________________________________________________
max_pooling2d_13 (MaxPooling (None, 56, 56, 64)        0         
Total params: 3,456
Trainable params: 3,328
Non-trainable params: 128
_________________________________________________________________


In [16]:
# 之后是残差块，每个模块在第一个残差块里将上一个模块的通道数翻倍，并将高和宽减半。
# 注意，这里对第一个模块做了特别处理。

class ResnetBlock(tf.keras.layers.Layer):
    """A stage of ``num_residuals`` consecutive ``Residual`` blocks.

    Args:
        num_channels: Number of output channels of every residual block in
            the stage.
        num_residuals: How many residual blocks the stage contains.
        first_block: If False (default), the first residual block of the
            stage uses a 1x1 convolution on the skip path and a stride of 2.
            If True, every block uses the default stride and no 1x1 convolution.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, num_channels, num_residuals, first_block=False, **kwargs):
        super(ResnetBlock, self).__init__(**kwargs)
        self.listLayers = []
        for i in range(num_residuals):
            if i == 0 and not first_block:
                # First block of a non-initial stage: stride 2 on the main
                # path, with a 1x1 convolution so the skip path matches it.
                self.listLayers.append(
                    Residual(num_channels, use_1x1conv=True, strides=2))
            else:
                # All other blocks use the default stride and a plain skip path.
                self.listLayers.append(Residual(num_channels))

    def call(self, X):
        """Run ``X`` through the residual blocks in order and return the result."""
        # Iterate the list itself: a plain Python list has no `.layers`
        # attribute, so `self.listLayers.layers` depends on Keras' internal
        # list-wrapper type.
        for layer in self.listLayers:
            X = layer(X)
        return X

In [22]:
# 接着我们为ResNet加入所有残差块。这里每个模块使用两个残差块。

class ResNet(tf.keras.Model):
    """ResNet classifier built from a convolutional stem and four ``ResnetBlock`` stages.

    Pipeline: 7x7 stride-2 convolution (with ReLU) -> batch normalization ->
    3x3 stride-2 max pooling -> four residual stages with 64, 128, 256 and
    512 channels -> global average pooling -> softmax dense layer.

    Args:
        num_blocks: Sequence of at least four ints; ``num_blocks[k]`` is the
            number of residual blocks in stage ``k``.
        i_shape: Passed as ``input_shape`` to the first convolution.
        num_classes: Number of units of the final softmax layer. Defaults to
            10, the value that used to be hard-coded.
        **kwargs: Forwarded to ``tf.keras.Model``.
    """

    def __init__(self, num_blocks, i_shape, num_classes=10, **kwargs):
        super(ResNet, self).__init__(**kwargs)
        # The ReLU is applied inside the convolution, i.e. before batch norm.
        self.conv = layers.Conv2D(64, kernel_size=7, strides=2, padding='same',
                                  input_shape=i_shape, activation='relu')
        self.bn = layers.BatchNormalization()
        self.mp = layers.MaxPool2D(pool_size=3, strides=2, padding='same')
        # The first stage is built with first_block=True; the later stages
        # use the default, so each starts with a stride-2 residual block.
        self.resnet_block1 = ResnetBlock(64, num_blocks[0], first_block=True)
        self.resnet_block2 = ResnetBlock(128, num_blocks[1])
        self.resnet_block3 = ResnetBlock(256, num_blocks[2])
        self.resnet_block4 = ResnetBlock(512, num_blocks[3])
        self.gap = layers.GlobalAvgPool2D()
        self.fc = layers.Dense(units=num_classes,
                               activation=tf.keras.activations.softmax)

    def call(self, x):
        """Return the softmax output of the network for input ``x``."""
        x = self.conv(x)
        x = self.bn(x)
        x = self.mp(x)
        x = self.resnet_block1(x)
        x = self.resnet_block2(x)
        x = self.resnet_block3(x)
        x = self.resnet_block4(x)
        x = self.gap(x)
        x = self.fc(x)
        return x

# Two residual blocks in each of the four stages.
mynet = ResNet(num_blocks=[2, 2, 2, 2], i_shape=(224, 224, 1))

In [24]:
# The same architecture as a Sequential model for 224x224 single-channel
# inputs: stem, four residual stages of two blocks each, classifier head.
mynet = tf.keras.models.Sequential([
    layers.Conv2D(filters=64, kernel_size=7, strides=2, padding='same',
                  activation='relu', input_shape=(224, 224, 1)),
    layers.BatchNormalization(),
    layers.MaxPool2D(pool_size=3, strides=2, padding='same'),
    ResnetBlock(64, 2, first_block=True),
    *[ResnetBlock(channels, 2) for channels in (128, 256, 512)],
    layers.GlobalAvgPool2D(),
    layers.Dense(units=10, activation='softmax'),
])

mynet.summary()

Model: "sequential_4"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_99 (Conv2D)           (None, 112, 112, 64)      3200      
_________________________________________________________________
batch_normalization_84 (Batc (None, 112, 112, 64)      256       
_________________________________________________________________
max_pooling2d_10 (MaxPooling (None, 56, 56, 64)        0         
_________________________________________________________________
resnet_block_16 (ResnetBlock (None, 56, 56, 64)        148736    
_________________________________________________________________
resnet_block_17 (ResnetBlock (None, 28, 28, 128)       526976    
_________________________________________________________________
resnet_block_18 (ResnetBlock (None, 14, 14, 256)       2102528   
_________________________________________________________________
resnet_block_19 (ResnetBlock (None, 7, 7, 512)        

In [26]:
# The same Sequential architecture, this time for 28x28 single-channel inputs
# (the image size used by the training cell).
mynet = tf.keras.models.Sequential([
    layers.Conv2D(filters=64, kernel_size=7, strides=2, padding='same',
                  activation='relu', input_shape=(28, 28, 1)),
    layers.BatchNormalization(),
    layers.MaxPool2D(pool_size=3, strides=2, padding='same'),
    ResnetBlock(64, 2, first_block=True),
    *[ResnetBlock(channels, 2) for channels in (128, 256, 512)],
    layers.GlobalAvgPool2D(),
    layers.Dense(units=10, activation='softmax'),
])

mynet.summary()

Model: "sequential_5"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_119 (Conv2D)          (None, 14, 14, 64)        3200      
_________________________________________________________________
batch_normalization_101 (Bat (None, 14, 14, 64)        256       
_________________________________________________________________
max_pooling2d_11 (MaxPooling (None, 7, 7, 64)          0         
_________________________________________________________________
resnet_block_20 (ResnetBlock (None, 7, 7, 64)          148736    
_________________________________________________________________
resnet_block_21 (ResnetBlock (None, 4, 4, 128)         526976    
_________________________________________________________________
resnet_block_22 (ResnetBlock (None, 2, 2, 256)         2102528   
_________________________________________________________________
resnet_block_23 (ResnetBlock (None, 1, 1, 512)        

In [27]:
# Load Fashion-MNIST, convert the images to float32, add a trailing channel
# axis and divide the pixel values by 255.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.fashion_mnist.load_data()
x_train = x_train.astype('float32').reshape((60000, 28, 28, 1)) / 255
x_test = x_test.astype('float32').reshape((10000, 28, 28, 1)) / 255

# Labels are integer class ids, hence the sparse categorical cross-entropy.
mynet.compile(optimizer=tf.keras.optimizers.Adam(),
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# Train for 5 epochs, holding out 20% of the training data for validation.
history = mynet.fit(x=x_train,
                    y=y_train,
                    batch_size=64,
                    epochs=5,
                    validation_split=0.2)
test_scores = mynet.evaluate(x=x_test, y=y_test, verbose=2)

Train on 48000 samples, validate on 12000 samples
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
10000/10000 - 2s - loss: 0.2979 - accuracy: 0.9007
