# **数据增强、正则化、学习率衰减**
**以resnet为例**

数据增强

数据增强与数字图像处理有着相似之处，不妨以数字图像处理领域常用的美女蕾娜照片来研究

In [1]:
from PIL import Image
import requests
from io import BytesIO
response = requests.get('https://github.com/1083639122/pictures_data/blob/main/lenna.png?raw=true')
im = Image.open(BytesIO(response.content))
im

In [2]:
from torchvision import transforms as tfs

In [3]:
# 比例缩放
print('before scale, shape: {}'.format(im.size))
new_im = tfs.Resize((200, 200))(im)
print('after scale, shape: {}'.format(new_im.size))
new_im

In [4]:
# 随机裁剪出 100 x 100 的区域
random_im1 = tfs.RandomCrop(100)(im)
random_im1

In [5]:
# 中心裁剪出 100 x 100 的区域
center_im = tfs.CenterCrop(100)(im)
center_im

In [6]:
# 随机水平翻转
h_filp = tfs.RandomHorizontalFlip()(im)
h_filp

In [7]:
# 随机竖直翻转
v_flip = tfs.RandomVerticalFlip()(im)
v_flip

In [8]:
rot_im = tfs.RandomRotation(45)(im)
rot_im

In [9]:
# 亮度
bright_im = tfs.ColorJitter(brightness=1)(im) # 随机从 0 ~ 2 之间亮度变化，1 表示原图
bright_im

In [10]:
# 对比度
contrast_im = tfs.ColorJitter(contrast=1)(im) # 随机从 0 ~ 2 之间对比度变化，1 表示原图
contrast_im

In [11]:
# 颜色
color_im = tfs.ColorJitter(hue=0.5)(im) # 随机从 -0.5 ~ 0.5 之间对颜色变化
color_im

上面我们讲了这么图片增强的方法，其实这些方法都不是孤立起来用的，可以联合起来用，比如先做随机翻转，然后随机截取，再做对比度增强等等，torchvision 里面有个非常方便的函数能够将这些变化合起来，就是 torchvision.transforms.Compose()

In [12]:
im_aug = tfs.Compose([
    tfs.Resize(120),
    tfs.RandomHorizontalFlip(),
    tfs.RandomCrop(96),
    tfs.ColorJitter(brightness=0.5, contrast=0.5, hue=0.5)
])

In [13]:
import matplotlib.pyplot as plt
%matplotlib inline

In [14]:
nrows = 3
ncols = 3
figsize = (8, 8)
_, figs = plt.subplots(nrows, ncols, figsize=figsize)
for i in range(nrows):
    for j in range(ncols):
        figs[i][j].imshow(im_aug(im))
        figs[i][j].axes.get_xaxis().set_visible(False)
        figs[i][j].axes.get_yaxis().set_visible(False)
plt.show()

可以看到每次做完增强之后的图片都有一些变化，所以这就是我们前面讲的，增加了一些'新'数据

下面我们使用图像增强进行训练网络

In [15]:
import numpy as np
import torch
from torch import nn
from torch.autograd import Variable
import torch.nn.functional as F
from torchvision.datasets import mnist
from torch.utils.data import DataLoader

In [16]:
from datetime import datetime

def get_acc(output, label):
    total = output.shape[0]
    _, pred_label = output.max(1)
    num_correct = (pred_label == label).sum().data
    return num_correct / total


def train(net, train_data, valid_data, num_epochs, optimizer, criterion):
    if torch.cuda.is_available():
        net = net.cuda()
    prev_time = datetime.now()
    for epoch in range(num_epochs):
        train_loss = 0
        train_acc = 0
        net = net.train()
        for im, label in train_data:
            if torch.cuda.is_available():
                im = Variable(im.cuda())  # (bs, 3, h, w)
                label = Variable(label.cuda())  # (bs, h, w)
            else:
                im = Variable(im)
                label = Variable(label)
            # forward
            output = net(im)
            loss = criterion(output, label)
            # backward
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            train_loss += loss.data
            train_acc += get_acc(output, label)

        cur_time = datetime.now()
        h, remainder = divmod((cur_time - prev_time).seconds, 3600)
        m, s = divmod(remainder, 60)
        time_str = "Time %02d:%02d:%02d" % (h, m, s)
        if valid_data is not None:
            valid_loss = 0
            valid_acc = 0
            net = net.eval()
            for im, label in valid_data:
                if torch.cuda.is_available():
                    im = Variable(im.cuda())
                    label = Variable(label.cuda())
                else:
                    im = Variable(im)
                    label = Variable(label)
                output = net(im)
                loss = criterion(output, label)
                valid_loss += loss.data
                valid_acc += get_acc(output, label)
            epoch_str = (
                "Epoch %d. Train Loss: %f, Train Acc: %f, Valid Loss: %f, Valid Acc: %f, "
                % (epoch, train_loss / len(train_data),
                   train_acc / len(train_data), valid_loss / len(valid_data),
                   valid_acc / len(valid_data)))
        else:
            epoch_str = ("Epoch %d. Train Loss: %f, Train Acc: %f, " %
                         (epoch, train_loss / len(train_data),
                          train_acc / len(train_data)))
        prev_time = cur_time
        print(epoch_str + time_str)

In [17]:
# 使用数据增强
def train_tf(x):
    im_aug = tfs.Compose([
        tfs.Resize(120),
        tfs.RandomHorizontalFlip(),
        tfs.RandomCrop(96),
        tfs.ToTensor(),
        tfs.Normalize(0.5, 0.5)
    ])
    x = im_aug(x)
    return x

def test_tf(x):
    im_aug = tfs.Compose([
        tfs.Resize(96),
        tfs.ToTensor(),
        tfs.Normalize(0.5,  0.5)
    ])
    x = im_aug(x)
    return x


from torchvision.datasets import mnist # 导入 pytorch 内置的 mnist 数据
train_set = mnist.MNIST('./data', train=True, transform=train_tf,download=True) # 重新载入数据集，申明定义的数据变换
test_set = mnist.MNIST('./data', train=False, transform=test_tf,download=True)
train_data = torch.utils.data.DataLoader(train_set, batch_size=64, shuffle=True)
test_data = torch.utils.data.DataLoader(test_set, batch_size=128, shuffle=False)

查看一个batch数据的尺寸，便于网络的设计

In [18]:
x=torch.tensor([])
for i,j in train_data:
#     print(i[0].shape) 
#     print(i[0])
    x=i
    break
print(x.shape)

接下来设计实现一个 residual block

In [19]:
# 利用pytorch的卷积函数，固定kernel_size参数，以定义一个3*3的卷积函数
def conv3x3(in_channel, out_channel, stride=1):
    return nn.Conv2d(in_channel, out_channel, 3, stride=stride, padding=1, bias=False)

In [20]:
class residual_block(nn.Module):
    def __init__(self, in_channel, out_channel, same_shape=True):
        super(residual_block, self).__init__()
        
#         如果要保持tensor的shape不变，则卷积核的滑动步长改变为2，可达到一个通道数倍增，图像长宽减半的效果
        self.same_shape = same_shape
        stride=1 if self.same_shape else 2
        
        self.conv1 = conv3x3(in_channel, out_channel, stride=stride)
#         每次卷积后进行一次批标准化
        self.bn1 = nn.BatchNorm2d(out_channel)
        
        self.conv2 = conv3x3(out_channel, out_channel)
        self.bn2 = nn.BatchNorm2d(out_channel)
        if not self.same_shape:
            self.conv3 = nn.Conv2d(in_channel, out_channel, 1, stride=stride)
        
    def forward(self, x):
        out = self.conv1(x)
        out = F.relu(self.bn1(out), True)
        out = self.conv2(out)
        out = F.relu(self.bn2(out), True)
        
        if not self.same_shape:
            x = self.conv3(x)
        return F.relu(x+out, True)

ResNet是 residual block 模块的堆叠

In [21]:
class resnet(nn.Module):
    def __init__(self, in_channel, num_classes, verbose=False):
        super(resnet, self).__init__()
        self.verbose = verbose
        
        self.block1 = nn.Conv2d(in_channel, 32, 3,1)
        
        self.block2 = nn.Sequential(
            nn.MaxPool2d(2, 2),
            residual_block(32, 32),
            residual_block(32, 32)
        )
        
        self.block3 = nn.Sequential(
            residual_block(32, 64, False),
            residual_block(64, 64)
        )
        
        self.block4 = nn.Sequential(
            residual_block(64, 128, False),
            residual_block(128, 128)
        )
        
        self.block5 = nn.Sequential(
            residual_block(128, 256, False),
            residual_block(256, 256),
            nn.AvgPool2d(4)
        )
        
        self.classifier = nn.Linear(256, num_classes)
        
    def forward(self, x):
        x=x.float()
        x = self.block1(x)
        if self.verbose:
            print('block 1 output: {}'.format(x.shape))
        x = self.block2(x)
        if self.verbose:
            print('block 2 output: {}'.format(x.shape))
        x = self.block3(x)
        if self.verbose:
            print('block 3 output: {}'.format(x.shape))
        x = self.block4(x)
        if self.verbose:
            print('block 4 output: {}'.format(x.shape))
        x = self.block5(x)
        if self.verbose:
            print('block 5 output: {}'.format(x.shape))
        x = x.view(x.shape[0], -1)
        x = self.classifier(x)
        return x

In [22]:
net = resnet(1, 10)
optimizer = torch.optim.SGD(net.parameters(), lr=1e-1)
criterion = nn.CrossEntropyLoss()

In [23]:
train(net, train_data, test_data, 20, optimizer, criterion)

数据增强使得学习器训练难度加大了，同时在训练轮数比较少时，测试集准确率就已经很高了

这里由于数据集本身过于简单，不能明显体现出数据增强的效果

正则化

正则化是机器学习中提出来的一种方法，有 L1 和 L2 正则化，目前使用较多的是 L2 正则化，引入正则化相当于在 loss 函数上面加上一项，比如

$$
f = loss + \lambda \sum_{p \in params} ||p||_2^2
$$

在 pytorch 中正则项就是通过这种方式来加入的，比如想在随机梯度下降法中使用正则项，或者说权重衰减，`torch.optim.SGD(net.parameters(), lr=0.1, weight_decay=1e-3)` 就可以了，这个 `weight_decay` 系数就是上面公式中的 $\lambda$，非常方便

In [24]:
net = resnet(1, 10)
optimizer = torch.optim.SGD(net.parameters(), lr=1e-1, weight_decay=1e-3)
criterion = nn.CrossEntropyLoss()

In [26]:
train(net, train_data, test_data,5, optimizer, criterion)

学习率衰减

对于基于一阶梯度进行优化的方法而言，开始的时候更新的幅度是比较大的，也就是说开始的学习率可以设置大一点，但是当训练集的 loss 下降到一定程度之后，，使用这个太大的学习率就会导致 loss 一直来回震荡

这个时候就需要对学习率进行衰减已达到 loss 的充分下降，而是用学习率衰减的办法能够解决这个矛盾，学习率衰减就是随着训练的进行不断的减小学习率。


In [27]:
def set_learning_rate(optimizer, lr):
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr

In [29]:
def train(net, train_data, valid_data, num_epochs, optimizer, criterion):
    if torch.cuda.is_available():
        net = net.cuda()
    prev_time = datetime.now()
    for epoch in range(num_epochs):
#         加入学习率衰减机制
        if epoch == 15:
            set_learning_rate(optimizer, 0.01) # 15次修改学习率为 0.01
        
        train_loss = 0
        train_acc = 0
        net = net.train()
        for im, label in train_data:
            if torch.cuda.is_available():
                im = Variable(im.cuda())  # (bs, 3, h, w)
                label = Variable(label.cuda())  # (bs, h, w)
            else:
                im = Variable(im)
                label = Variable(label)
            # forward
            output = net(im)
            loss = criterion(output, label)
            # backward
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            train_loss += loss.data
            train_acc += get_acc(output, label)

        cur_time = datetime.now()
        h, remainder = divmod((cur_time - prev_time).seconds, 3600)
        m, s = divmod(remainder, 60)
        time_str = "Time %02d:%02d:%02d" % (h, m, s)
        if valid_data is not None:
            valid_loss = 0
            valid_acc = 0
            net = net.eval()
            for im, label in valid_data:
                if torch.cuda.is_available():
                    im = Variable(im.cuda())
                    label = Variable(label.cuda())
                else:
                    im = Variable(im)
                    label = Variable(label)
                output = net(im)
                loss = criterion(output, label)
                valid_loss += loss.data
                valid_acc += get_acc(output, label)
            epoch_str = (
                "Epoch %d. Train Loss: %f, Train Acc: %f, Valid Loss: %f, Valid Acc: %f, "
                % (epoch, train_loss / len(train_data),
                   train_acc / len(train_data), valid_loss / len(valid_data),
                   valid_acc / len(valid_data)))
        else:
            epoch_str = ("Epoch %d. Train Loss: %f, Train Acc: %f, " %
                         (epoch, train_loss / len(train_data),
                          train_acc / len(train_data)))
        prev_time = cur_time
        print(epoch_str + time_str)

In [None]:
net = resnet(1, 10)
optimizer = torch.optim.SGD(net.parameters(), lr=1e-1, weight_decay=1e-3)
criterion = nn.CrossEntropyLoss()
train(net, train_data, test_data,20, optimizer, criterion)

在实际应用中，做学习率衰减之前应该经过充分的训练，然后再做学习率衰减得到更好的结果，有的时候甚至需要做多次学习率衰减
