In [10]:
import tensorflow as tf
from keras import layers,activations
import numpy as np
from keras.datasets import mnist


In [11]:
class Residual(tf.keras.Model):
    """Residual unit: two 3x3 convolutions with batch norm and a skip connection.

    Args:
        num_channels: Number of filters used by every convolution in the unit.
        use_1x1conv: When True, the skip path goes through a 1x1 convolution
            (using the same ``strides``) before it is added to the main path.
        strides: Stride of the first 3x3 convolution and of the optional
            1x1 convolution on the skip path.
    """

    # Define the layers of the unit
    def __init__(self,num_channels,use_1x1conv=False,strides=1):
        super(Residual,self).__init__()
        # First 3x3 convolution; the only one on the main path that applies `strides`
        self.conv1 = layers.Conv2D(num_channels,padding='same',kernel_size=3,strides=strides)
        # Second 3x3 convolution (default stride)
        self.conv2 = layers.Conv2D(num_channels,kernel_size=3,padding='same')
        # Optional 1x1 convolution applied to the skip path
        if use_1x1conv:
            self.conv3 = layers.Conv2D(num_channels,kernel_size=1,strides=strides)
        else:
            self.conv3 = None
        # One batch-normalization layer after each 3x3 convolution
        self.bn1 = layers.BatchNormalization()
        self.bn2 = layers.BatchNormalization()

    # Define the forward pass
    def call(self,x):
        """Return ``relu(main_path(x) + skip(x))``.

        The addition requires both paths to produce compatible shapes, so the
        1x1 projection presumably has to be enabled whenever ``strides`` or
        ``num_channels`` changes the shape of the main path.
        """
        Y = activations.relu(self.bn1(self.conv1(x)))
        Y = self.bn2(self.conv2(Y))
        # Explicit identity check: "no projection" is encoded as None, so do not
        # rely on the truthiness of a layer object.
        if self.conv3 is not None:
            x = self.conv3(x)
        outputs = activations.relu(Y+x)
        return outputs

In [12]:
class ResnetBlock(tf.keras.layers.Layer):
    """A stage of ``num_res`` consecutive ``Residual`` units.

    Args:
        num_channels: Number of filters passed to every ``Residual`` unit.
        num_res: Number of ``Residual`` units in the stage.
        first_block: When False (the default), the first unit of the stage is
            built with ``use_1x1conv=True`` and ``strides=2``; when True, every
            unit is built with the ``Residual`` defaults.
    """

    # Define the layers of the stage
    def __init__(self,num_channels,num_res,first_block=False):
        super(ResnetBlock,self).__init__()
        # Holds the residual units in execution order
        self.listLayers=[]
        # Create one residual unit per requested repetition
        for i in range(num_res):
            # First unit of a stage that is not the first stage of the network
            if i ==0 and not first_block:
                self.listLayers.append(Residual(num_channels,use_1x1conv=True,strides=2))
            else:
                self.listLayers.append(Residual(num_channels))

    # Define the forward pass
    def call(self,X):
        """Feed ``X`` through every residual unit in order and return the result."""
        # Iterate the list itself. The previous `.layers` attribute exists only
        # on the wrapper object that some Keras versions substitute for list
        # attributes; it is not part of the list interface.
        for layer in self.listLayers:
            X = layer(X)
        return X

In [13]:
class ResNet(tf.keras.Model):
    """ResNet-style classifier: conv stem, four residual stages, softmax head.

    Args:
        num_blocks: Sequence whose first four entries give the number of
            residual units in stages 1-4 (64, 128, 256 and 512 filters).
        num_classes: Number of units in the final softmax layer. Defaults to
            10, the value that was previously hard-coded.
    """

    # Define the building blocks of the network
    def __init__(self, num_blocks, num_classes=10):
        super(ResNet, self).__init__()
        # Stem: 7x7 convolution with stride 2 and 64 filters
        self.conv = layers.Conv2D(64, kernel_size=7, strides=2, padding='same')
        # Batch normalization
        self.bn = layers.BatchNormalization()
        # ReLU activation
        self.relu = layers.Activation('relu')
        # 3x3 max pooling with stride 2
        self.mp = layers.MaxPool2D(pool_size=3, strides=2, padding="same")
        # Residual stages; only the first one is built with first_block=True
        self.res_block1 = ResnetBlock(64, num_blocks[0], first_block=True)
        self.res_block2 = ResnetBlock(128, num_blocks[1])
        self.res_block3 = ResnetBlock(256, num_blocks[2])
        self.res_block4 = ResnetBlock(512, num_blocks[3])
        # Global average pooling
        self.gap = layers.GlobalAvgPool2D()
        # Fully connected softmax classifier
        self.fc = layers.Dense(
            units=num_classes, activation=tf.keras.activations.softmax)

    # Define the forward pass
    def call(self, x):
        """Return the softmax output of the classifier head for input ``x``."""
        # Stem
        x = self.conv(x)
        x = self.bn(x)
        x = self.relu(x)
        x = self.mp(x)
        # Residual stages
        x = self.res_block1(x)
        x = self.res_block2(x)
        x = self.res_block3(x)
        x = self.res_block4(x)
        # Head
        x = self.gap(x)
        x = self.fc(x)
        return x

In [14]:
# Four residual stages with two residual units each.
mynet = ResNet(num_blocks=[2, 2, 2, 2])
# Run one random single-channel 224x224 input through the model before asking
# for the summary, so the layers have been called at least once.
X = tf.random.uniform(shape=(1, 224, 224, 1))
y = mynet(X)
mynet.summary()

Model: "res_net_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d_20 (Conv2D)          multiple                  3200      
                                                                 
 batch_normalization_17 (Bat  multiple                 256       
 chNormalization)                                                
                                                                 
 activation_1 (Activation)   multiple                  0         
                                                                 
 max_pooling2d_1 (MaxPooling  multiple                 0         
 2D)                                                             
                                                                 
 resnet_block_4 (ResnetBlock  multiple                 148736    
 )                                                               
                                                         

In [15]:
(train_images, train_labels), (test_images, test_labels) = mnist.load_data()
# Append a trailing channel axis of size 1 so the arrays are laid out as N, H, W, C.
train_images = np.expand_dims(train_images, axis=-1)
test_images = np.expand_dims(test_images, axis=-1)

In [16]:
def _sample_and_resize(images, labels, size, target_size=224):
    """Draw ``size`` random samples and resize them to a square with padding.

    Indices come from ``np.random.randint``, so sampling is with replacement
    and the same sample may appear more than once.

    Args:
        images: Array of images indexed along its first axis.
        labels: Array of labels aligned with ``images``.
        size: Number of samples to draw.
        target_size: Height and width passed to ``tf.image.resize_with_pad``.

    Returns:
        Tuple ``(resized_images, sampled_labels)``; the images are returned as
        a NumPy array.
    """
    index = np.random.randint(0, np.shape(images)[0], size)
    resized_images = tf.image.resize_with_pad(images[index], target_size, target_size)
    return resized_images.numpy(), labels[index]


def get_train(size):
    """Return ``size`` random, 224x224-resized samples from the global training set."""
    return _sample_and_resize(train_images, train_labels, size)


def get_test(size):
    """Return ``size`` random, 224x224-resized samples from the global test set."""
    return _sample_and_resize(test_images, test_labels, size)

In [17]:
# Draw the working subsets: 256 training samples and 128 test samples.
# NOTE: this rebinds the dataset names to the sampled subsets, so re-running
# this cell samples from the subsets rather than from the full dataset.
train_images, train_labels = get_train(size=256)
test_images, test_labels = get_test(size=128)

In [18]:
# SGD with momentum explicitly set to zero.
optimizer = tf.keras.optimizers.SGD(learning_rate=0.01, momentum=0.0)

mynet.compile(
    optimizer=optimizer,
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

# Train for three epochs, holding out 10% of the training data for validation.
mynet.fit(
    x=train_images,
    y=train_labels,
    batch_size=128,
    epochs=3,
    verbose=1,
    validation_split=0.1,
)

# Evaluate on the sampled test set and report loss and accuracy.
results = mynet.evaluate(x=test_images, y=test_labels, verbose=1)
loss, accuracy = results
print("损失值",loss)
print("准确率", accuracy)

Epoch 1/3
Epoch 2/3
Epoch 3/3
损失值 4.913150787353516
准确率 0.1015625
