# 在预训练resnet18模型中加入CBAM注意力机制进而到数据增广的cifar10上微调实现精度较高的分类任务

In [1]:
! pip install d2l

In [2]:
%matplotlib inline
import os
import torch
import torchvision
from torch import nn
from d2l import torch as d2l
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"
import torch.nn.functional as F

In [3]:
# Start from the pretrained ResNet-18; its own forward() ends at the 1000-way
# `fc` layer.
finetune_net = torchvision.models.resnet18(pretrained=True)

# New classification head: 1000 -> 256 -> ReLU -> 10 outputs.
# The attribute names fc0/fc1/fc2 are kept because later cells address these
# layers by name when building optimizer parameter groups.
b0 = nn.Linear(1000, 256)
b1 = nn.ReLU()
b2 = nn.Linear(256, 10)
finetune_net.add_module("fc0", b0)
finetune_net.add_module("fc1", b1)
finetune_net.add_module("fc2", b2)
nn.init.xavier_uniform_(finetune_net.fc0.weight)
nn.init.xavier_uniform_(finetune_net.fc2.weight)


def _apply_head(module, inputs, output):
    """Forward hook: pass the backbone output through fc0 -> fc1 -> fc2.

    The layers are looked up on ``module`` (the instance the hook fires on)
    rather than on the global ``finetune_net`` so that copies of the model
    created by ``nn.DataParallel`` use their own per-device layers.
    """
    return module.fc2(module.fc1(module.fc0(output)))


# add_module() only *registers* submodules; ResNet.forward() never calls them,
# so without this hook the head would stay unused and the network would keep
# emitting 1000 logits. A forward hook that returns a value replaces the
# module's output, which chains the head after the pretrained `fc`.
finetune_net.register_forward_hook(_apply_head)
print(finetune_net)

- 加入CBAM注意力机制

In [4]:
class CALayer(nn.Module):  # Channel Attention (CA) Layer
    """Channel attention gate.

    Each selected global pooling of the input is passed through a shared
    bottleneck of two 1x1 convolutions; the results are summed, squashed with
    a sigmoid and used to rescale the input channel-wise.

    Args:
        in_channels: number of channels of the input feature map.
        reduction: bottleneck ratio; the hidden width is
            ``in_channels // reduction``, but never less than 1.
        pool_types: which global poolings to combine; any non-empty
            selection of ``'avg'`` and ``'max'``.

    Raises:
        ValueError: if ``pool_types`` is empty or names an unsupported pooling.
    """

    def __init__(self, in_channels, reduction=16, pool_types=('avg', 'max')):
        super().__init__()
        self.pool_list = ['avg', 'max']
        # Copy into a fresh list so a caller-owned sequence is never shared.
        self.pool_types = list(pool_types)
        if not self.pool_types:
            raise ValueError("pool_types must not be empty")
        unsupported = [p for p in self.pool_types if p not in self.pool_list]
        if unsupported:
            raise ValueError(
                f"unsupported pool types {unsupported}; expected a subset of {self.pool_list}")
        self.in_channels = in_channels
        # ModuleList (instead of a plain list) registers the pooling layers as
        # submodules. They hold no parameters, so parameter names are unchanged.
        self.Pool = nn.ModuleList([
            nn.AdaptiveAvgPool2d(1),
            nn.AdaptiveMaxPool2d(1, return_indices=False),
        ])
        # Guard against a zero-width bottleneck when in_channels < reduction.
        hidden_channels = max(1, in_channels // reduction)
        self.conv_ca = nn.Sequential(
            nn.Conv2d(in_channels, hidden_channels, 1, padding=0, bias=True),
            nn.ReLU(inplace=True),
            nn.Conv2d(hidden_channels, in_channels, 1, padding=0, bias=True)
        )

    def forward(self, x):
        """Return ``x`` scaled per channel by the attention weights."""
        channel_att_sum = None
        for pool_type in self.pool_types:
            pool = self.Pool[self.pool_list.index(pool_type)](x)
            channel_att_raw = self.conv_ca(pool)
            if channel_att_sum is None:
                channel_att_sum = channel_att_raw
            else:
                # Out-of-place add: avoids mutating a tensor in the autograd graph.
                channel_att_sum = channel_att_sum + channel_att_raw
        scale = F.sigmoid(channel_att_sum)
        return x * scale


class SALayer(nn.Module):  # Spatial Attention Layer
    """Spatial attention gate.

    The input is reduced along the channel axis to two maps (max and mean),
    which a 3x3 convolution + batch norm + ReLU turns into a single-channel
    map that multiplies the input.

    NOTE(review): the gate ends in ReLU, so the scale is non-negative but
    unbounded; the CBAM paper uses a sigmoid here — confirm this is intended.
    """

    def __init__(self):
        super().__init__()
        gate_conv = nn.Conv2d(2, 1, kernel_size=3, stride=1, padding=1, bias=False)
        gate_norm = nn.BatchNorm2d(1, momentum=0.01)
        self.conv_sa = nn.Sequential(gate_conv, gate_norm, nn.ReLU(inplace=True))

    def forward(self, x):
        """Return ``x`` multiplied by its spatial attention map."""
        channel_max, _ = torch.max(x, 1, keepdim=True)
        channel_mean = torch.mean(x, dim=1, keepdim=True)
        # Channel order matters for the learned conv weights: max first, then mean.
        pooled = torch.cat((channel_max, channel_mean), dim=1)
        return x * self.conv_sa(pooled)


class CBAM(nn.Module):
    """Channel attention followed by spatial attention.

    Args:
        in_channels: channel count of the input, forwarded to ``CALayer``.
        reduction: bottleneck ratio forwarded to ``CALayer``.
        pool_types: pooling selection forwarded to ``CALayer``.
    """

    def __init__(self, in_channels, reduction=2, pool_types=['avg', 'max']):
        super().__init__()
        self.CALayer = CALayer(in_channels, reduction, pool_types)
        self.SALayer = SALayer()

    def forward(self, x):
        """Apply the channel gate first, then the spatial gate."""
        return self.SALayer(self.CALayer(x))

In [5]:
# Register a CBAM block built for 10 input channels on the model as "CBAM".
# NOTE(review): add_module() only registers the submodule; nothing in this
# cell invokes it during the model's forward pass — confirm where (and on what
# tensor shape) CBAM is meant to be applied.
finetune_net.add_module("CBAM",CBAM(10))

In [6]:
# Standardize each RGB channel with a fixed mean and standard deviation:
#     output[c] = (input[c] - mean[c]) / std[c]
# Normalize works on tensors, not PIL images, so it must follow ToTensor().
# (The values are presumably the ImageNet statistics matching the pretrained
# backbone — confirm.)
normalize = torchvision.transforms.Normalize(
    [0.485, 0.456, 0.406],
    [0.229, 0.224, 0.225],
)

# Training pipeline: resize, random crop to 224, random horizontal flip,
# convert to tensor, normalize.
train_augs = torchvision.transforms.Compose([
    torchvision.transforms.Resize(256),
    torchvision.transforms.RandomResizedCrop(224),
    torchvision.transforms.RandomHorizontalFlip(),
    torchvision.transforms.ToTensor(),
    normalize,
])

# Evaluation pipeline: deterministic resize + center crop, no augmentation.
test_augs = torchvision.transforms.Compose([
    torchvision.transforms.Resize(256),
    torchvision.transforms.CenterCrop(224),
    torchvision.transforms.ToTensor(),
    normalize,
])

In [7]:
def load_cifar10(is_train, augs, batch_size):
    """Build a DataLoader over one CIFAR-10 split.

    Args:
        is_train: True for the training split, False for the test split;
            also decides whether batches are shuffled.
        augs: transform applied to every image.
        batch_size: number of examples per batch.

    Returns:
        A ``torch.utils.data.DataLoader`` over the dataset stored under
        ``./data`` (downloaded on first use).
    """
    cifar_split = torchvision.datasets.CIFAR10(
        root="./data", train=is_train, transform=augs, download=True)
    worker_count = d2l.get_dataloader_workers()
    return torch.utils.data.DataLoader(
        cifar_split,
        batch_size=batch_size,
        shuffle=is_train,
        num_workers=worker_count,
    )

In [8]:
#@save
def train_batch_ch13(net, X, y, loss, trainer, devices):
    """Run one optimization step on a minibatch.

    Inputs and labels are moved to ``devices[0]``; ``X`` may be a single
    tensor or a list of tensors.

    Returns:
        A pair ``(summed loss, d2l.accuracy(pred, y))`` for this batch.
    """
    target = devices[0]
    if not isinstance(X, list):
        X = X.to(target)
    else:
        # List inputs: move every element (kept from the d2l original,
        # where it serves BERT fine-tuning).
        X = [x.to(target) for x in X]
    y = y.to(target)

    net.train()
    trainer.zero_grad()
    pred = net(X)
    l = loss(pred, y)
    # The loss is per-example (unreduced), so sum it before backprop.
    l.sum().backward()
    trainer.step()

    return l.sum(), d2l.accuracy(pred, y)

#@save
def train_ch13(net, train_iter, test_iter, loss, trainer, num_epochs, devices=d2l.try_all_gpus()):
    """Train ``net`` with ``nn.DataParallel`` and a step learning-rate schedule.

    Args:
        net: model to train; wrapped in ``nn.DataParallel`` over ``devices``.
        train_iter: iterable of ``(features, labels)`` training batches.
        test_iter: evaluation batches, scored after every epoch.
        loss: loss function returning per-example losses.
        trainer: optimizer; its learning rates are decayed by ``StepLR``.
        num_epochs: number of passes over ``train_iter``.
        devices: devices to train on. The default is evaluated once, when the
            function is defined.

    Side effects:
        Plots training loss/accuracy and test accuracy, then prints the final
        metrics and the throughput.
    """
    timer, num_batches = d2l.Timer(), len(train_iter)
    # Plot about five times per epoch. max(1, ...) avoids a modulo-by-zero
    # when the loader has fewer than five batches.
    plot_every = max(1, num_batches // 5)
    animator = d2l.Animator(xlabel='epoch', xlim=[1, num_epochs], ylim=[0, 1],
                            legend=['train loss', 'train acc', 'test acc'])
    net = nn.DataParallel(net, device_ids=devices).to(devices[0])
    # Learning-rate schedule: multiply every group's LR by 0.9 every 4 epochs.
    scheduler = torch.optim.lr_scheduler.StepLR(trainer, 4, 0.9)
    for epoch in range(num_epochs):
        # Four slots: summed training loss, summed accuracy count,
        # number of examples, number of label elements.
        metric = d2l.Accumulator(4)
        for i, (features, labels) in enumerate(train_iter):
            timer.start()
            l, acc = train_batch_ch13(net, features, labels, loss, trainer, devices)
            metric.add(l, acc, labels.shape[0], labels.numel())
            timer.stop()
            if (i + 1) % plot_every == 0 or i == num_batches - 1:
                animator.add(epoch + (i + 1) / num_batches,
                             (metric[0] / metric[2], metric[1] / metric[3],
                              None))
        test_acc = d2l.evaluate_accuracy_gpu(net, test_iter)
        animator.add(epoch + 1, (None, None, test_acc))
        scheduler.step()
    print(f'loss {metric[0] / metric[2]:.3f}, train acc '
          f'{metric[1] / metric[3]:.3f}, test acc {test_acc:.3f}')
    print(f'{metric[2] * num_epochs / timer.sum():.1f} examples/sec on '
          f'{str(devices)}')

In [9]:
def train_fine_tuning(net, learning_rate, batch_size=512, num_epochs=25,
                      param_group=True):
    """Fine-tune ``net`` on CIFAR-10 with SGD.

    Args:
        net: model to train. With ``param_group=True`` it must expose the
            submodules ``fc0``, ``fc1``, ``fc2`` and ``CBAM``.
        learning_rate: base learning rate, used for all pretrained parameters.
        batch_size: batch size for both the training and the test loader.
        num_epochs: number of training epochs.
        param_group: if True, the newly added modules train with larger
            learning rates (fc0 x40, fc1 x30, fc2 x20, CBAM x10); otherwise
            every parameter uses ``learning_rate``.
    """
    train_iter = load_cifar10(True, train_augs, batch_size)
    test_iter = load_cifar10(False, test_augs, batch_size)
    devices = d2l.try_all_gpus()
    loss = nn.CrossEntropyLoss(reduction="none")
    if param_group:
        # Everything that does not belong to a newly added module keeps the
        # base learning rate. Matching by prefix keeps this list in sync with
        # whatever parameters those modules actually contain.
        new_module_prefixes = ("fc0.", "fc1.", "fc2.", "CBAM.")
        params_1x = [param for name, param in net.named_parameters()
                     if not name.startswith(new_module_prefixes)]
        trainer = torch.optim.SGD([{'params': params_1x},
                                   {'params': net.fc0.parameters(), 'lr': learning_rate * 40},
                                   # Empty group if fc1 has no parameters (e.g. a ReLU).
                                   {'params': net.fc1.parameters(), 'lr': learning_rate * 30},
                                   {'params': net.fc2.parameters(), 'lr': learning_rate * 20},
                                   {'params': net.CBAM.parameters(), 'lr': learning_rate * 10}
                                  ],
                                lr=learning_rate, weight_decay=0.001)
    else:
        trainer = torch.optim.SGD(net.parameters(), lr=learning_rate,
                                  weight_decay=0.001)
    # Use the train_ch13 defined in this notebook (it adds the StepLR
    # schedule); d2l.train_ch13 would silently skip the scheduler.
    train_ch13(net, train_iter, test_iter, loss, trainer, num_epochs,
               devices)

In [10]:
# Fine-tune with a base learning rate of 5e-5; all other arguments use the
# defaults of train_fine_tuning.
train_fine_tuning(finetune_net, 5e-5)

In [11]:
# scratch_net = torchvision.models.resnet18()
# scratch_net.fc = nn.Linear(scratch_net.fc.in_features, 10)
# train_fine_tuning(scratch_net, 5e-3, param_group=False)

# Running on Kaggle

- The first

![result1](Cifar10_Result1.png)
![result2](Cifar10_Result2.png)

- The second

![result3](Cifar10_Result3.png)