In [1]:
import tensorflow as tf

### 搭建残差网络

#### 搭建层

In [9]:
class RestNetLayer(tf.keras.layers.Layer):
    def __init__(self,fitler_num,stride=1):
        super(RestNetLayer,self).__init__()
        self.Conv1 = tf.keras.layers.Conv2D(fitler_num,(3,3),strides=stride,padding="same")
        self.Bn1 = tf.keras.layers.BatchNormalization()
        self.relu = tf.keras.layers.Activation('relu')
        
        self.Conv2 = tf.keras.layers.Conv2D(fitler_num,(3,3),strides=1,padding="same")
        self.Bn2 = tf.keras.layers.BatchNormalization()
        
        # 残差层
        if stride != 1:
            self.downSample = tf.keras.Sequential()
            self.downSample.add(tf.keras.layers.Conv2D(fitler_num,(1,1),strides = stride))
        else:
            self.downSample = lambda x:x
    
    def call(self,inputs,training = None):
        
        # [b ,h ,w ,c] # batch_size 大小的 h高度 w宽度 c通道的图片数据集
        out = self.Conv1(inputs)
        out = self.Bn1(out)
        out = self.relu(out)
        
        out = self.Conv2(out)
        out = self.Bn2(out)
        
        identity = self.downSample(inputs)
        
        output = tf.keras.layers.add([out,identity])
        output = tf.nn.relu(output)
        
        return output

#### 搭建网络

In [10]:
class RestNetModel(tf.keras.models.Model):
    def __init__(self,layer_dims,num_classser = 100): #卷积维度C chanels 和分类总数 
        super(RestNetModel,self).__init__()
        self.item = tf.keras.Sequential([
            tf.keras.layers.Conv2D(64,kernel_size=(3,3),strides=(1,1)),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Activation("relu"),
            tf.keras.layers.MaxPool2D(pool_size=(2,2),strides=(1,1),padding="same")
        ]) # 预处理层
        
        self.unit1 = self.build_resblock(64,layer_dims[0])
        self.unit2 = self.build_resblock(128,layer_dims[1],stride = 2)
        self.unit3= self.build_resblock(256,layer_dims[2],stride = 2)
        self.unit4= self.build_resblock(512,layer_dims[3],stride = 2)
        
        # 全连接层
        # 先处理【B,C,H,W】
        self.avgpool = tf.keras.layers.GlobalAvgPool2D()
        self.fc = tf.keras.layers.Dense(num_classser) # 最后转换成100类
        
        
    def call(self,inputs,training =None):
        # 前向传播
        # 【b,w,h,c】 =>[b,64.h.w]
        x = self.item(inputs)
        x = self.unit1(x)
        x = self.unit2(x)
        x = self.unit3(x)
        x = self.unit4(x)
        
        # =>[b,c]
        x = self.avgpool(x)
        
        # =>[b,100]
        x = self.fc(x)
        
        return x
    
    def build_resblock(self,fitler_num,blocks,stride=1):
        res_block = tf.keras.Sequential()
        res_block.add(RestNetLayer(fitler_num,stride))
        
        for _ in range(1,blocks):
            res_block.add(RestNetLayer(fitler_num,1))
            
        return res_block
    
def RestNet18():
    return RestNetModel([2,2,2,2])

In [11]:
# 数据准备
(x,y),(x_test,y_test) = tf.keras.datasets.cifar100.load_data()
def preprocess(x,y):
    x = tf.cast(x,tf.float32)/255.
    y = tf.cast(y,tf.int32)
    return x,y
batchsize = 128
train_db = tf.data.Dataset.from_tensor_slices((x,y))
train_db = train_db.map(preprocess).shuffle(10000).batch(batchsize)
test_db = tf.data.Dataset.from_tensor_slices((x_test,y_test))
test_db = test_db.map(preprocess).batch(batchsize)

sample = next(iter(train_db))
print(sample[0].shape,sample[1].shape)

(128, 32, 32, 3) (128, 1)


### 训练神经网络

In [12]:
# 模型搭建
model = RestNet18()

# 可以先加载文件在继续训练
optimizer = tf.keras.optimizers.Adam(1e-3)

In [17]:
def train(echops):
    for echops in range(echops):
        for step,(x,y) in enumerate(train_db):
            # 维度变换
            with tf.GradientTape() as tape:
                # [b,h,w,c] =>[b,c_,h_,w_]
                out = model(x)

                # 损失函数
                y = tf.reshape(y,[y.shape[0]])
                y_hot = tf.one_hot(y,depth=100)

                loss = tf.keras.losses.categorical_crossentropy(y_hot,out,from_logits=True)
                loss = tf.reduce_mean(loss)

            grads = tape.gradient(loss,model.trainable_variables)
            # 优化器
            optimizer.apply_gradients(zip(grads,model.trainable_variables))

            if step % 10 == 0:
                print(step)
                print("loss:",loss)
            

In [18]:
train(10)

0
loss: tf.Tensor(4.6140394, shape=(), dtype=float32)
10
loss: tf.Tensor(4.6096735, shape=(), dtype=float32)



KeyboardInterrupt



In [19]:
def test():
    correct_num,total_num = 0,0
    for _,(x,y) in enumerate(test_db):
        logits = model(x)
        prob = tf.nn.softmax(logits,axis=1)
        pred = tf.argmax(prob,axis=1)
        pred = tf.cast(pred,dtype=tf.int32)
        
        y = tf.reshape(y,[y.shape[0]])
        correct = tf.cast(tf.equal(y,pred),dtype=tf.int32)
        correct = tf.reduce_sum(correct)
        
        correct_num+=int(correct)
        total_num+=x.shape[0]
    acc = correct_num/total_num
            