# **RESNET34**

In [None]:
import datetime
import os

# Silence TensorFlow's native (C++) logging. The variable has to be in the
# environment before TensorFlow is imported, otherwise messages emitted while
# the library loads are still printed.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

import numpy as np
import tensorflow as tf
from tensorflow.keras import layers,optimizers,datasets,Sequential

# Number of GPUs visible to TensorFlow (displayed as the notebook cell output).
len(tf.config.experimental.list_physical_devices('GPU'))

## 数据读取

In [None]:
def preprocess(x,y):
    """Rescale an image and cast its label for training.

    The image is cast to float32 and mapped linearly so that 0 becomes -1
    and 255 becomes 1; the label is cast to int32.

    Returns:
        The ``(image, label)`` pair after conversion.
    """
    pixels = tf.cast(x, dtype=tf.float32)
    scaled = 2 * pixels / 255. - 1
    label = tf.cast(y, dtype=tf.int32)
    return scaled, label

# Load CIFAR-100 as NumPy arrays and convert the images to float32.
(x,y),(x_test,y_test) = datasets.cifar100.load_data()
x = x.astype(np.float32)
x_test = x_test.astype(np.float32)
# Drop the size-1 second axis of the label arrays so labels are 1-D.
y=tf.squeeze(y,axis=1)
y_test=tf.squeeze(y_test,axis=1)

batch_size = 50

train_db=tf.data.Dataset.from_tensor_slices((x,y))
# Shuffle before batching so each epoch sees differently composed batches;
# without it every epoch replays the exact same batches in the same order
# (which also means the in-batch mixing augmentations always pair the same
# samples).
train_db=train_db.shuffle(10000).map(preprocess).batch(batch_size)

# The evaluation set is deliberately left in its original order.
test_db=tf.data.Dataset.from_tensor_slices((x_test,y_test))
test_db=test_db.map(preprocess).batch(batch_size)

## 数据增强

In [None]:
def cutout_mask(img,label,length=16):
    """Apply Cutout to every second sample of a batch.

    For samples 0, 2, 4, ... a square window centred on a uniformly random
    pixel is set to zero. The window is clipped at the image border, so it
    can be smaller than ``length`` x ``length``. Odd-indexed samples and all
    labels are returned unchanged.

    Args:
        img: Object exposing ``.numpy()`` that yields an array of rank 4
            indexed as (batch, height, width, channel).
        label: Object exposing ``.numpy()``; returned as a NumPy array,
            otherwise untouched.
        length: Side length in pixels of the erased square (before
            clipping). Defaults to 16, the previously hard-coded value.

    Returns:
        Tuple ``(img, label)`` of NumPy arrays.
    """
    img = img.numpy()
    label = label.numpy()

    batch_size,h,w,_ = img.shape
    half = length // 2
    for i in range(0,batch_size,2):
        # Random window centre; the row is drawn first, then the column.
        y = np.random.randint(h)
        x = np.random.randint(w)

        # Clip the window to the image bounds.
        y1 = np.clip(y - half, 0, h)
        y2 = np.clip(y + half, 0, h)
        x1 = np.clip(x - half, 0, w)
        x2 = np.clip(x + half, 0, w)

        img[i, y1: y2, x1: x2, :] = 0

    return img, label

def mixup_mask(img,label):
    """Apply Mixup to every second sample of a batch.

    Samples 0, 2, 4, ... are replaced by a convex combination of themselves
    and a randomly chosen sample of the same batch; the label rows are
    blended with the same weight. The weight is drawn uniformly from [0, 1).
    The arrays are edited in place while iterating, so a partner that was
    already blended contributes its blended image and label.

    Args:
        img: Object exposing ``.numpy()`` that yields an array of rank 4.
        label: Object exposing ``.numpy()`` that yields per-sample label
            rows (e.g. one-hot vectors).

    Returns:
        Tuple ``(img, label)`` of NumPy arrays.
    """
    images = img.numpy()
    targets = label.numpy()
    n_samples, _height, _width, _channels = images.shape

    idx = 0
    while idx < n_samples:
        # Draw the partner first and the blend weight second.
        partner = np.random.randint(0, n_samples)
        weight = np.random.uniform()

        blended_image = weight * images[idx, :, :, :] + (1 - weight) * images[partner, :, :, :]
        blended_target = weight * targets[idx, :] + (1 - weight) * targets[partner, :]
        images[idx, :, :, :] = blended_image
        targets[idx, :] = blended_target

        idx += 2
    return images, targets

def cutmix_mask(img,label,length=16):
    """Apply CutMix to every second sample of a batch.

    For samples 0, 2, 4, ... a square window centred on a random pixel is
    overwritten with the same window of a randomly chosen donor sample from
    the batch. The label row becomes a blend of the two label rows, weighted
    by the fraction of pixels each sample contributes.

    Donor pixels and labels are read from an untouched copy of the batch.
    Reading them from the arrays being edited would let an already-mixed
    donor leak a third sample into the result and make the label weights
    disagree with the actual pixel areas.

    Args:
        img: Object exposing ``.numpy()`` that yields an array of rank 4
            indexed as (batch, height, width, channel).
        label: Object exposing ``.numpy()`` that yields per-sample label
            rows (e.g. one-hot vectors).
        length: Side length in pixels of the pasted square (before clipping
            at the image border). Defaults to 16, the previously hard-coded
            value.

    Returns:
        Tuple ``(img, label)`` of NumPy arrays.
    """
    img = img.numpy()
    label = label.numpy()
    # Pristine snapshots used as the donor source.
    src_img = img.copy()
    src_label = label.copy()

    batch_size,h,w,_ = img.shape
    half = length // 2
    for i in range(0,batch_size,2):
        # Draw order: donor index, window row, window column.
        mixup_idx = np.random.randint(0,batch_size)
        y = np.random.randint(h)
        x = np.random.randint(w)

        # Clip the window to the image bounds.
        y1 = np.clip(y - half, 0, h)
        y2 = np.clip(y + half, 0, h)
        x1 = np.clip(x - half, 0, w)
        x2 = np.clip(x + half, 0, w)

        # Fraction of the image that still belongs to sample i.
        lamda = 1 - (y2-y1)*(x2 - x1)/(h*w)
        img[i, y1: y2, x1: x2, :] = src_img[mixup_idx, y1: y2, x1: x2, :]
        label[i,:] = lamda*src_label[i,:] + (1-lamda) * src_label[mixup_idx,:]
    return img,label

## ResNet构建

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers,Sequential

class BasicBlock(layers.Layer):
    """Residual block made of two 3x3 convolutions plus a shortcut.

    Main path: conv -> batch norm -> ReLU -> conv -> batch norm. The result
    is added to the shortcut and passed through a final ReLU.

    Args:
        filter_num: Number of output channels of both convolutions.
        strides: Stride of the first convolution. With ``strides=1`` the
            spatial size is kept and the shortcut is the identity; any other
            value downsamples, and the shortcut becomes a 1x1 convolution
            with the same stride so that both branches have matching shapes.
    """
    def __init__(self,filter_num,strides=1):
        super(BasicBlock,self).__init__()
        # 'same' padding keeps the spatial size when strides == 1 and gives
        # ceil(size / strides) otherwise (e.g. 32x32 -> 16x16 for strides=2).
        self.conv1=layers.Conv2D(filter_num,(3,3),strides=strides,padding='same')
        self.bn1=layers.BatchNormalization()
        # Non-linearity applied after the first batch norm.
        self.relu=layers.Activation('relu')

        # The second convolution always uses stride 1, so it preserves the
        # shape produced by the first one.
        self.conv2=layers.Conv2D(filter_num,(3,3),strides=1,padding='same')
        self.bn2=layers.BatchNormalization()

        if strides != 1:
            # Projection shortcut: downsample the input to the main path's shape.
            self.downsample=Sequential()
            self.downsample.add(layers.Conv2D(filter_num,(1,1),strides=strides))
        else:
            # Identity shortcut.
            self.downsample=lambda x:x

    def call(self,inputs,training=None):
        """Run the block on ``inputs``.

        Args:
            inputs: Feature map fed to both the main path and the shortcut.
            training: Forwarded to the BatchNormalization layers so they pick
                batch statistics (training) or moving averages (inference).

        Returns:
            The activated sum of the main path and the shortcut.
        """
        out=self.conv1(inputs)
        # Pass the flag explicitly rather than relying on implicit
        # propagation; batch norm behaves differently in the two modes.
        out=self.bn1(out,training=training)
        out=self.relu(out)

        out=self.conv2(out)
        out=self.bn2(out,training=training)

        identity=self.downsample(inputs)
        output=layers.add([out,identity])
        output=tf.nn.relu(output)

        return output


class ResNet(keras.Model):
    """ResNet-style image classifier assembled from ``BasicBlock`` layers.

    Structure: a 3x3 convolution stem (64 filters, batch norm, ReLU), four
    stages with 64/128/256/512 filters, global average pooling and a dense
    softmax head.

    Args:
        layer_dims: Four integers giving the number of ``BasicBlock`` layers
            in each stage, e.g. ``[2, 2, 2, 2]`` or ``[3, 4, 6, 3]``.
        num_classes: Size of the softmax output.
    """
    def __init__(self,layer_dims,num_classes=100):
        super(ResNet, self).__init__()

        # Stem. No padding argument is given, so Conv2D's default applies.
        self.stem=Sequential([layers.Conv2D(64,(3,3),strides=(1,1)),
                              layers.BatchNormalization(),
                              layers.Activation('relu'),
                              #layers.MaxPool2D(pool_size=(2,2),strides=(1,1),padding='same')
                              ])
        # Four stages, one per entry of layer_dims. Stages 2-4 start with a
        # stride-2 block, so the feature map shrinks as the channels grow.
        self.layer1=self.build_resblock(64,layer_dims[0])
        self.layer2=self.build_resblock(128,layer_dims[1],strides=2)
        self.layer3=self.build_resblock(256,layer_dims[2],strides=2)
        self.layer4=self.build_resblock(512,layer_dims[3],strides=2)

        # The last stage outputs [b, h, w, 512]. Global average pooling
        # averages over h and w whatever their size, giving [b, 512].
        self.avgpool=layers.GlobalAveragePooling2D()
        # Classification head; it outputs probabilities, not raw logits.
        self.fc=layers.Dense(num_classes,activation = 'softmax')

    def call(self,inputs,training=None):
        """Forward pass.

        Args:
            inputs: Batch of images.
            training: Forwarded to the stem and every stage so that the
                BatchNormalization layers run in the requested mode.

        Returns:
            Class probabilities of shape [b, num_classes].
        """
        x = self.stem(inputs,training=training)
        x = self.layer1(x,training=training)
        x = self.layer2(x,training=training)
        x = self.layer3(x,training=training)
        x = self.layer4(x,training=training)
        # Already [b, c] after pooling, so no reshape/flatten is needed.
        x=self.avgpool(x)
        x=self.fc(x)

        return x

    def build_resblock(self,filter_num,blocks,strides=1):
        """Build one stage as a ``Sequential`` of ``BasicBlock`` layers.

        Args:
            filter_num: Number of filters used by every block of the stage.
            blocks: Number of blocks in the stage.
            strides: Stride of the first block only; the remaining blocks
                use stride 1 and therefore keep the shape.

        Returns:
            The ``Sequential`` holding the stage.
        """
        res_blocks=Sequential()
        # Only the first block may downsample.
        res_blocks.add(BasicBlock(filter_num,strides))
        for _ in range(1,blocks):
            res_blocks.add(BasicBlock(filter_num,strides=1))
        return res_blocks

def resnet18():
    """Return a ``ResNet`` with two basic blocks in each of its four stages."""
    blocks_per_stage = [2,2,2,2]
    return ResNet(blocks_per_stage)

def resnet34():
    """Return a ``ResNet`` whose four stages hold 3, 4, 6 and 3 basic blocks."""
    blocks_per_stage = [3, 4, 6, 3]
    return ResNet(blocks_per_stage)


## 模型构建

In [None]:
# Baseline run: a fresh ResNet-34 built for 32x32 RGB inputs.
model = resnet34()
model.build(input_shape=(None,32,32,3))

# Each run logs into its own timestamped directory, with separate
# sub-directories (and writers) for the training and test curves.
current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") + '_baseline'
_run_dir = 'logs/gradient_tape/' + current_time
train_log_dir = _run_dir + '/train'
test_log_dir = _run_dir + '/test'
train_summary_writer = tf.summary.create_file_writer(train_log_dir)
test_summary_writer = tf.summary.create_file_writer(test_log_dir)

## 训练

In [None]:
lr = 1e-3
# `learning_rate` is the supported keyword; the `lr` alias is deprecated.
optimizer=optimizers.Adam(learning_rate=lr)

# Running means over one epoch; they are reset at the end of every epoch.
train_loss = tf.keras.metrics.Mean('train_loss', dtype=tf.float32)
train_accuracy = tf.keras.metrics.Mean('train_accuracy', dtype=tf.float32)
test_loss = tf.keras.metrics.Mean('test_loss', dtype=tf.float32)
test_accuracy = tf.keras.metrics.Mean('test_accuracy', dtype=tf.float32)

for epoch in range(50):
    # Optional step decay of the learning rate (currently disabled):
#     if epoch % 5 == 4:
#         lr/=10
#         optimizer.lr = lr
    for step,(x,y) in enumerate(train_db):
        # Record the forward pass so gradients can be taken afterwards.
        with tf.GradientTape() as tape:
            y_onehot=tf.one_hot(y,depth=100)
            # training=True makes BatchNormalization normalise with batch
            # statistics and update its moving averages; without the flag
            # the layers run in inference mode while the model is fitted.
            logits=model(x,training=True)
            # The model ends in a softmax, so `logits` already holds
            # probabilities, matching the loss's default from_logits=False.
            loss=tf.losses.categorical_crossentropy(y_onehot,logits)
            loss=tf.reduce_mean(loss)

        # Batch accuracy.
        pred=tf.argmax(logits,axis=1)
        pred=tf.cast(pred,dtype=tf.int32)
        correct=tf.cast(tf.equal(pred,y),dtype=tf.int32)
        correct=tf.reduce_sum(correct)
        acc = correct/x.shape[0]

        train_loss(loss)
        train_accuracy(acc)
        # Compute the gradients and let the optimizer update the weights.
        grads=tape.gradient(loss,model.trainable_variables)
        optimizer.apply_gradients(zip(grads,model.trainable_variables))

        if step % 100 == 0:
            print(epoch,step,'losses:',float(loss))


    with train_summary_writer.as_default():
        tf.summary.scalar('loss', train_loss.result(), step=epoch)
        tf.summary.scalar('accuracy', train_accuracy.result(), step=epoch)

    # Evaluate on the test set after every epoch.
    total_num=0
    total_correct=0
    for x,y in test_db:
        # Inference mode: batch norm uses its moving averages.
        logits=model(x,training=False)
        y_onehot = tf.one_hot(y,depth = 100)
        loss=tf.losses.categorical_crossentropy(y_onehot,logits)
        loss=tf.reduce_mean(loss)

        pred=tf.argmax(logits,axis=1)
        pred=tf.cast(pred,dtype=tf.int32)
        correct=tf.cast(tf.equal(pred,y),dtype=tf.int32)
        correct=tf.reduce_sum(correct)

        total_num += x.shape[0]
        total_correct += int(correct)

        test_accuracy(correct/x.shape[0])
        test_loss(loss)

    with test_summary_writer.as_default():
        tf.summary.scalar('loss', test_loss.result(), step=epoch)
        tf.summary.scalar('accuracy', test_accuracy.result(), step=epoch)


    template = 'Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, Test Accuracy: {}'
    print(template.format(epoch+1,
                         train_loss.result(), 
                         train_accuracy.result()*100,
                         test_loss.result(), 
                         test_accuracy.result()*100))


    train_loss.reset_states()
    test_loss.reset_states()
    train_accuracy.reset_states()
    test_accuracy.reset_states()

model.save_weights('baseline.h5')

## CUTOUT

In [None]:
# Cutout run: a fresh ResNet-34 trained with cutout_mask applied per batch.
model = resnet34()
model.build(input_shape=(None,32,32,3))
current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")+'_cutout'
train_log_dir = 'logs/gradient_tape/' + current_time + '/train'
test_log_dir = 'logs/gradient_tape/' + current_time + '/test'
train_summary_writer = tf.summary.create_file_writer(train_log_dir)
test_summary_writer = tf.summary.create_file_writer(test_log_dir)

lr = 1e-4
# `learning_rate` is the supported keyword; the `lr` alias is deprecated.
optimizer=optimizers.Adam(learning_rate=lr)

# Running means over one epoch; they are reset at the end of every epoch.
train_loss = tf.keras.metrics.Mean('train_loss', dtype=tf.float32)
train_accuracy = tf.keras.metrics.Mean('train_accuracy', dtype=tf.float32)
test_loss = tf.keras.metrics.Mean('test_loss', dtype=tf.float32)
test_accuracy = tf.keras.metrics.Mean('test_accuracy', dtype=tf.float32)

for epoch in range(50):
    # Optional step decay of the learning rate (currently disabled):
    # if epoch % 5 == 4:
    #     lr/=10
    #     optimizer.lr = lr
    for step,(x,y) in enumerate(train_db):
        # Record the forward pass so gradients can be taken afterwards.
        with tf.GradientTape() as tape:
            y_onehot=tf.one_hot(y,depth=100)
            # Augment the batch; the helper returns NumPy arrays.
            x, y_onehot = cutout_mask(x,y_onehot)
            # [b,32,32,3] => [b,100]
            # training=True makes BatchNormalization normalise with batch
            # statistics and update its moving averages; without the flag
            # the layers run in inference mode while the model is fitted.
            logits=model(x,training=True)
            # The model ends in a softmax, so `logits` already holds
            # probabilities, matching the loss's default from_logits=False.
            loss=tf.losses.categorical_crossentropy(y_onehot,logits)

            loss=tf.reduce_mean(loss)

        # Batch accuracy against the original integer labels.
        pred=tf.argmax(logits,axis=1)
        pred=tf.cast(pred,dtype=tf.int32)
        correct=tf.cast(tf.equal(pred,y),dtype=tf.int32)
        correct=tf.reduce_sum(correct)
        acc = correct/x.shape[0]

        train_loss(loss)
        train_accuracy(acc)
        # Compute the gradients and let the optimizer update the weights.
        grads=tape.gradient(loss,model.trainable_variables)
        optimizer.apply_gradients(zip(grads,model.trainable_variables))

        if step % 100 == 0:
            print(epoch,step,'losses:',float(loss))


    with train_summary_writer.as_default():
        tf.summary.scalar('loss', train_loss.result(), step=epoch)
        tf.summary.scalar('accuracy', train_accuracy.result(), step=epoch)

    # Evaluate on the (un-augmented) test set after every epoch.
    total_num=0
    total_correct=0
    for x,y in test_db:
        # Inference mode: batch norm uses its moving averages.
        logits=model(x,training=False)
        y_onehot = tf.one_hot(y,depth = 100)
        loss=tf.losses.categorical_crossentropy(y_onehot,logits)
        loss=tf.reduce_mean(loss)

        pred=tf.argmax(logits,axis=1)
        pred=tf.cast(pred,dtype=tf.int32)
        correct=tf.cast(tf.equal(pred,y),dtype=tf.int32)
        correct=tf.reduce_sum(correct)

        total_num += x.shape[0]
        total_correct += int(correct)

        test_accuracy(correct/x.shape[0])
        test_loss(loss)

    with test_summary_writer.as_default():
        tf.summary.scalar('loss', test_loss.result(), step=epoch)
        tf.summary.scalar('accuracy', test_accuracy.result(), step=epoch)


    template = 'Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, Test Accuracy: {}'
    print(template.format(epoch+1,
                         train_loss.result(), 
                         train_accuracy.result()*100,
                         test_loss.result(), 
                         test_accuracy.result()*100))


    train_loss.reset_states()
    test_loss.reset_states()
    train_accuracy.reset_states()
    test_accuracy.reset_states()

model.save_weights('cutout.h5')

## MIXUP

In [None]:
# Mixup run: a fresh ResNet-34 trained with mixup_mask applied per batch.
model = resnet34()
model.build(input_shape=(None,32,32,3))
current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")+'_mixup'
train_log_dir = 'logs/gradient_tape/' + current_time + '/train'
test_log_dir = 'logs/gradient_tape/' + current_time + '/test'
train_summary_writer = tf.summary.create_file_writer(train_log_dir)
test_summary_writer = tf.summary.create_file_writer(test_log_dir)

lr = 1e-4
# `learning_rate` is the supported keyword; the `lr` alias is deprecated.
optimizer=optimizers.Adam(learning_rate=lr)

# Running means over one epoch; they are reset at the end of every epoch.
train_loss = tf.keras.metrics.Mean('train_loss', dtype=tf.float32)
train_accuracy = tf.keras.metrics.Mean('train_accuracy', dtype=tf.float32)
test_loss = tf.keras.metrics.Mean('test_loss', dtype=tf.float32)
test_accuracy = tf.keras.metrics.Mean('test_accuracy', dtype=tf.float32)

for epoch in range(50):
    # Optional step decay of the learning rate (currently disabled):
    # if epoch % 5 == 4:
    #     lr/=10
    #     optimizer.lr = lr
    for step,(x,y) in enumerate(train_db):
        # Record the forward pass so gradients can be taken afterwards.
        with tf.GradientTape() as tape:
            y_onehot=tf.one_hot(y,depth=100)
            # Augment the batch; images and one-hot labels are blended
            # together and come back as NumPy arrays.
            x, y_onehot = mixup_mask(x,y_onehot)
            # [b,32,32,3] => [b,100]
            # training=True makes BatchNormalization normalise with batch
            # statistics and update its moving averages; without the flag
            # the layers run in inference mode while the model is fitted.
            logits=model(x,training=True)
            # The model ends in a softmax, so `logits` already holds
            # probabilities, matching the loss's default from_logits=False.
            loss=tf.losses.categorical_crossentropy(y_onehot,logits)

            loss=tf.reduce_mean(loss)

        # Batch accuracy is measured against the original integer labels,
        # not the blended targets used for the loss.
        pred=tf.argmax(logits,axis=1)
        pred=tf.cast(pred,dtype=tf.int32)
        correct=tf.cast(tf.equal(pred,y),dtype=tf.int32)
        correct=tf.reduce_sum(correct)
        acc = correct/x.shape[0]

        train_loss(loss)
        train_accuracy(acc)
        # Compute the gradients and let the optimizer update the weights.
        grads=tape.gradient(loss,model.trainable_variables)
        optimizer.apply_gradients(zip(grads,model.trainable_variables))

        if step % 100 == 0:
            print(epoch,step,'losses:',float(loss))


    with train_summary_writer.as_default():
        tf.summary.scalar('loss', train_loss.result(), step=epoch)
        tf.summary.scalar('accuracy', train_accuracy.result(), step=epoch)

    # Evaluate on the (un-augmented) test set after every epoch.
    total_num=0
    total_correct=0
    for x,y in test_db:
        # Inference mode: batch norm uses its moving averages.
        logits=model(x,training=False)
        y_onehot = tf.one_hot(y,depth = 100)
        loss=tf.losses.categorical_crossentropy(y_onehot,logits)
        loss=tf.reduce_mean(loss)

        pred=tf.argmax(logits,axis=1)
        pred=tf.cast(pred,dtype=tf.int32)
        correct=tf.cast(tf.equal(pred,y),dtype=tf.int32)
        correct=tf.reduce_sum(correct)

        total_num += x.shape[0]
        total_correct += int(correct)

        test_accuracy(correct/x.shape[0])
        test_loss(loss)

    with test_summary_writer.as_default():
        tf.summary.scalar('loss', test_loss.result(), step=epoch)
        tf.summary.scalar('accuracy', test_accuracy.result(), step=epoch)


    template = 'Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, Test Accuracy: {}'
    print(template.format(epoch+1,
                         train_loss.result(), 
                         train_accuracy.result()*100,
                         test_loss.result(), 
                         test_accuracy.result()*100))


    train_loss.reset_states()
    test_loss.reset_states()
    train_accuracy.reset_states()
    test_accuracy.reset_states()

model.save_weights('mixup.h5')

## CUTMIX

In [None]:
# CutMix run: a fresh ResNet-34 trained with cutmix_mask applied per batch.
model = resnet34()
model.build(input_shape=(None,32,32,3))
current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")+'_cutmix'
train_log_dir = 'logs/gradient_tape/' + current_time + '/train'
test_log_dir = 'logs/gradient_tape/' + current_time + '/test'
train_summary_writer = tf.summary.create_file_writer(train_log_dir)
test_summary_writer = tf.summary.create_file_writer(test_log_dir)

lr = 1e-4
# `learning_rate` is the supported keyword; the `lr` alias is deprecated.
optimizer=optimizers.Adam(learning_rate=lr)

# Running means over one epoch; they are reset at the end of every epoch.
train_loss = tf.keras.metrics.Mean('train_loss', dtype=tf.float32)
train_accuracy = tf.keras.metrics.Mean('train_accuracy', dtype=tf.float32)
test_loss = tf.keras.metrics.Mean('test_loss', dtype=tf.float32)
test_accuracy = tf.keras.metrics.Mean('test_accuracy', dtype=tf.float32)

for epoch in range(50):
    # Optional step decay of the learning rate (currently disabled):
    # if epoch % 5 == 4:
    #     lr/=10
    #     optimizer.lr = lr
    for step,(x,y) in enumerate(train_db):
        # Record the forward pass so gradients can be taken afterwards.
        with tf.GradientTape() as tape:
            y_onehot=tf.one_hot(y,depth=100)
            # Augment the batch; patches are swapped between images and the
            # one-hot labels blended accordingly (NumPy arrays come back).
            x, y_onehot = cutmix_mask(x,y_onehot)
            # [b,32,32,3] => [b,100]
            # training=True makes BatchNormalization normalise with batch
            # statistics and update its moving averages; without the flag
            # the layers run in inference mode while the model is fitted.
            logits=model(x,training=True)
            # The model ends in a softmax, so `logits` already holds
            # probabilities, matching the loss's default from_logits=False.
            loss=tf.losses.categorical_crossentropy(y_onehot,logits)

            loss=tf.reduce_mean(loss)

        # Batch accuracy is measured against the original integer labels,
        # not the blended targets used for the loss.
        pred=tf.argmax(logits,axis=1)
        pred=tf.cast(pred,dtype=tf.int32)
        correct=tf.cast(tf.equal(pred,y),dtype=tf.int32)
        correct=tf.reduce_sum(correct)
        acc = correct/x.shape[0]

        train_loss(loss)
        train_accuracy(acc)
        # Compute the gradients and let the optimizer update the weights.
        grads=tape.gradient(loss,model.trainable_variables)
        optimizer.apply_gradients(zip(grads,model.trainable_variables))

        if step % 100 == 0:
            print(epoch,step,'losses:',float(loss))


    with train_summary_writer.as_default():
        tf.summary.scalar('loss', train_loss.result(), step=epoch)
        tf.summary.scalar('accuracy', train_accuracy.result(), step=epoch)

    # Evaluate on the (un-augmented) test set after every epoch.
    total_num=0
    total_correct=0
    for x,y in test_db:
        # Inference mode: batch norm uses its moving averages.
        logits=model(x,training=False)
        y_onehot = tf.one_hot(y,depth = 100)
        loss=tf.losses.categorical_crossentropy(y_onehot,logits)
        loss=tf.reduce_mean(loss)

        pred=tf.argmax(logits,axis=1)
        pred=tf.cast(pred,dtype=tf.int32)
        correct=tf.cast(tf.equal(pred,y),dtype=tf.int32)
        correct=tf.reduce_sum(correct)

        total_num += x.shape[0]
        total_correct += int(correct)

        test_accuracy(correct/x.shape[0])
        test_loss(loss)

    with test_summary_writer.as_default():
        tf.summary.scalar('loss', test_loss.result(), step=epoch)
        tf.summary.scalar('accuracy', test_accuracy.result(), step=epoch)


    template = 'Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, Test Accuracy: {}'
    print(template.format(epoch+1,
                         train_loss.result(), 
                         train_accuracy.result()*100,
                         test_loss.result(), 
                         test_accuracy.result()*100))


    train_loss.reset_states()
    test_loss.reset_states()
    train_accuracy.reset_states()
    test_accuracy.reset_states()

model.save_weights('cutmix.h5')