# ResNet

到最后还是忍不住想用ResNet试一下，因为ResNet给人的美感太强了。虽然自己已经交作业了，不过还是很想看看ResNet的效果。这个notebook打算这样做：先从头训练一个ResNet看看效果，再用预训练的模型试一下。

## 从头开始的ResNet

In [1]:
import torch
from torch import nn
from torch.nn import functional as F

### 定义残差块

这里借鉴了李沐老师《动手学深度学习》中的代码，架构如下：

![resnet](resnet-block.svg)


In [2]:
class Residual(nn.Module):  #@save
    """Residual block: two 3x3 conv + batch-norm layers plus a skip connection.

    Args:
        input_channels: Number of channels of the input tensor.
        num_channels: Number of channels produced by the block.
        use_1x1conv: If True, the skip path goes through a 1x1 convolution
            so that its channel count and stride match the main path.
        strides: Stride of the first 3x3 convolution (and of the 1x1
            convolution when it is used).
    """

    def __init__(self, input_channels, num_channels,
                 use_1x1conv=False, strides=1):
        super().__init__()
        self.conv1 = nn.Conv2d(input_channels, num_channels,
                               kernel_size=3, padding=1, stride=strides)
        self.conv2 = nn.Conv2d(num_channels, num_channels,
                               kernel_size=3, padding=1)
        if use_1x1conv:
            self.conv3 = nn.Conv2d(input_channels, num_channels,
                                   kernel_size=1, stride=strides)
        else:
            # No projection: the input is added to the main path unchanged.
            self.conv3 = None
        self.bn1 = nn.BatchNorm2d(num_channels)
        self.bn2 = nn.BatchNorm2d(num_channels)

    def forward(self, X):
        """Return ``relu(main_path(X) + skip(X))``."""
        Y = F.relu(self.bn1(self.conv1(X)))
        Y = self.bn2(self.conv2(Y))
        # Test for None explicitly rather than relying on the truthiness of
        # an nn.Module instance.
        if self.conv3 is not None:
            X = self.conv3(X)
        Y += X
        return F.relu(Y)

#### 残差块小测试

我们测试一下输入输出的维度是怎么样的：

In [3]:
# Sanity check: with the default arguments (no 1x1 conv, stride 1) the block
# returns a tensor with the same shape as its input.
X = torch.rand((4, 3, 6, 6))
resBlock = Residual(3, 3)
resBlock(X).shape # torch.Size([4, 3, 6, 6]) # same shape

torch.Size([4, 3, 6, 6])

In [4]:
# With a 1x1 conv on the skip path and stride 2, the channels go 3 -> 6 and
# the height/width are halved.
resBlock = Residual(3, 6, use_1x1conv=True, strides=2)
resBlock(X).shape # torch.Size([4, 6, 3, 3]) # 1/2 shape

torch.Size([4, 6, 3, 3])

复习一下`nn.Conv2d`的输入输出大小计算公式

$$
\text{out}(N_i, C_{\text{out}_j}) = \text{bias}(C_{\text{out}_j}) +
        \sum_{k = 0}^{C_{\text{in}} - 1} \text{weight}(C_{\text{out}_j}, k) \star \text{input}(N_i, k)
$$

$$
H_{out} = \left\lfloor\frac{H_{in}  + 2 \times \text{padding}[0] - \text{dilation}[0]
                        \times (\text{kernel\_size}[0] - 1) - 1}{\text{stride}[0]} + 1\right\rfloor 
$$

$$
W_{out} = \left\lfloor\frac{W_{in}  + 2 \times \text{padding}[1] - \text{dilation}[1]
                        \times (\text{kernel\_size}[1] - 1) - 1}{\text{stride}[1]} + 1\right\rfloor
$$

In [5]:
# 3x3 kernel, padding 1, stride 1: the spatial size is preserved.
nn.Conv2d(3, 3, kernel_size=3, padding=1, stride=1)(X).shape # torch.Size([4, 3, 6, 6]) # same shape

torch.Size([4, 3, 6, 6])

In [6]:
# Same kernel and padding with stride 2: the spatial size is halved (6 -> 3).
nn.Conv2d(3, 3, kernel_size=3, padding=1, stride=2)(X).shape # torch.Size([4, 3, 3, 3]) # 1/2 shape

torch.Size([4, 3, 3, 3])

### 定义ResNet的block

ResNet的第一个block和谷歌的GoogLeNet（Inception）的一样，不同的是ResNet在卷积层之后加了一个BatchNorm层。

In [19]:
# Stem of the network: 7x7 stride-2 convolution, batch norm, ReLU, then a
# 3x3 stride-2 max pool.
block1 = nn.Sequential(
    nn.Conv2d(in_channels=3, out_channels=64, kernel_size=7, stride=2, padding=3),
    nn.BatchNorm2d(num_features=64),
    nn.ReLU(),
    nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
)

In [20]:
# Shape check for block1 on a batch of four 3x224x224 inputs: the spatial
# size drops from 224 to 56.
X = torch.rand((4, 3, 224, 224))
block1(X).shape # torch.Size([4, 64, 56, 56])

torch.Size([4, 64, 56, 56])

In [21]:
# Builds the residual blocks of every stage other than block1.
def resnet_block(input_channels, num_channels, num_residuals,
                 first_block=False):
    """Return the list of ``Residual`` blocks that make up one stage.

    Args:
        input_channels: Number of channels entering the stage.
        num_channels: Number of channels produced by every block of the stage.
        num_residuals: Number of ``Residual`` blocks in the stage.
        first_block: If True, the first block keeps the spatial size
            (stride 1); otherwise the first block uses a 1x1 convolution on
            the skip path and stride 2, halving the height and width.

    Returns:
        A list of ``Residual`` modules, ready to be unpacked into
        ``nn.Sequential``.
    """
    blk = []
    for i in range(num_residuals):
        if i > 0:
            # Later blocks of a stage keep both channels and spatial size.
            blk.append(Residual(num_channels, num_channels))
        elif first_block:
            # Honour input_channels here too: project the skip path with a
            # 1x1 conv only when the channel counts differ. When they are
            # equal this is exactly Residual(num_channels, num_channels).
            blk.append(Residual(input_channels, num_channels,
                                use_1x1conv=input_channels != num_channels))
        else:
            blk.append(Residual(input_channels, num_channels,
                                use_1x1conv=True, strides=2))
    return blk

In [22]:
# Add the residual stages. Each stage uses 2 residual blocks, i.e. four 3x3
# conv layers per stage and 16 across the four stages.
block2 = nn.Sequential(
    *resnet_block(input_channels=64, num_channels=64, num_residuals=2, first_block=True)
)
block3 = nn.Sequential(
    *resnet_block(input_channels=64, num_channels=128, num_residuals=2)
)
block4 = nn.Sequential(
    *resnet_block(input_channels=128, num_channels=256, num_residuals=2)
)
block5 = nn.Sequential(
    *resnet_block(input_channels=256, num_channels=512, num_residuals=2)
)

#### 残差block小测试

In [23]:
# block2 keeps both the channel count and the spatial size.
X = torch.rand((4, 64, 56, 56))
block2(X).shape # torch.Size([4, 64, 56, 56])

torch.Size([4, 64, 56, 56])

In [24]:
# block3 on the same (4, 64, 56, 56) input: channels double, height/width halve.
block3(X).shape # torch.Size([4, 128, 28, 28])

torch.Size([4, 128, 28, 28])

In [25]:
# block4: channels 128 -> 256, spatial size 28 -> 14.
X = torch.rand((4, 128, 28, 28))
block4(X).shape # torch.Size([4, 256, 14, 14])

torch.Size([4, 256, 14, 14])

In [26]:
# block5: channels 256 -> 512, spatial size 14 -> 7.
X = torch.rand((4, 256, 14, 14))
block5(X).shape # torch.Size([4, 512, 7, 7])

torch.Size([4, 512, 7, 7])

In [27]:
# Adaptive average pooling to (1, 1) collapses each 7x7 feature map to a single value.
X = torch.rand((4, 512, 7, 7))
nn.AdaptiveAvgPool2d((1, 1))(X).shape # torch.Size([4, 512, 1, 1])

torch.Size([4, 512, 1, 1])

### 定义ResNet18

In [28]:
# Full model: the stem, the four residual stages, global average pooling,
# flattening, and a linear layer with 10 outputs.
ResNet18 = nn.Sequential(
    block1,
    block2,
    block3,
    block4,
    block5,
    nn.AdaptiveAvgPool2d(output_size=(1, 1)),
    nn.Flatten(),
    nn.Linear(in_features=512, out_features=10),
)

In [29]:
# Push one random 3x224x224 input through the model one top-level layer at a
# time and print the output shape after each layer.
X = torch.rand(size=(1, 3, 224, 224))
for layer in ResNet18:
    X = layer(X)
    print(layer.__class__.__name__,'output shape:\t', X.shape)

Sequential output shape:	 torch.Size([1, 64, 56, 56])
Sequential output shape:	 torch.Size([1, 64, 56, 56])
Sequential output shape:	 torch.Size([1, 128, 28, 28])
Sequential output shape:	 torch.Size([1, 256, 14, 14])
Sequential output shape:	 torch.Size([1, 512, 7, 7])
AdaptiveAvgPool2d output shape:	 torch.Size([1, 512, 1, 1])
Flatten output shape:	 torch.Size([1, 512])
Linear output shape:	 torch.Size([1, 10])


In [30]:
# Print a per-layer summary (output shapes and parameter counts) of the model
# for a single 3x224x224 input.
from torchinfo import summary
summary(ResNet18, input_size=(1, 3, 224, 224))

Layer (type:depth-idx)                   Output Shape              Param #
Sequential                               [1, 10]                   --
├─Sequential: 1-1                        [1, 64, 56, 56]           --
│    └─Conv2d: 2-1                       [1, 64, 112, 112]         9,472
│    └─BatchNorm2d: 2-2                  [1, 64, 112, 112]         128
│    └─ReLU: 2-3                         [1, 64, 112, 112]         --
│    └─MaxPool2d: 2-4                    [1, 64, 56, 56]           --
├─Sequential: 1-2                        [1, 64, 56, 56]           --
│    └─Residual: 2-5                     [1, 64, 56, 56]           --
│    │    └─Conv2d: 3-1                  [1, 64, 56, 56]           36,928
│    │    └─BatchNorm2d: 3-2             [1, 64, 56, 56]           128
│    │    └─Conv2d: 3-3                  [1, 64, 56, 56]           36,928
│    │    └─BatchNorm2d: 3-4             [1, 64, 56, 56]           128
│    └─Residual: 2-6                     [1, 64, 56, 56]           --
│

## 训练过程

In [None]:
# Training hyperparameters used by the cells below.
learning_rate = 0.01  # learning rate given to the Adam optimizer
batch_size = 128  # batch size of the train/test DataLoaders
num_epochs = 5  # number of epochs for the train() run

### 加载数据集及可视化

In [None]:
import torch
import torchvision
import torchvision.transforms as transforms

# Training transform: resize, center crop, random horizontal flip, tensor
# conversion, and per-channel normalization.
transform_train = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
])

# Test transform: the same pipeline without the random flip.
transform_test = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
])

# Load the CIFAR10 train and test splits (downloaded to ./data if needed).
trainset = torchvision.datasets.CIFAR10(
    root='./data',
    train=True,
    download=True,
    transform=transform_train,
)
testset = torchvision.datasets.CIFAR10(
    root='./data',
    train=False,
    download=True,
    transform=transform_test,
)

# Only the training loader shuffles its samples.
trainloader = torch.utils.data.DataLoader(
    trainset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=2,
)
testloader = torch.utils.data.DataLoader(
    testset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=2,
)

### 定义训练函数

In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'

### The training and validation code below is partly adapted from Microsoft's PyTorch tutorial:
# https://learn.microsoft.com/en-us/training/modules/intro-computer-vision-pytorch/6-transfer-learning
def train_epoch(net,dataloader,lr=0.01,optimizer=None,loss_fn = nn.NLLLoss()):
    """Train ``net`` for one pass over ``dataloader``.

    Args:
        net: Model to train; it is switched to training mode.
        dataloader: Iterable yielding ``(features, labels)`` batches.
        lr: Learning rate of the Adam optimizer that is created when
            ``optimizer`` is None.
        optimizer: Optimizer to step. Defaults to
            ``torch.optim.Adam(net.parameters(), lr=lr)``.
        loss_fn: Callable ``loss_fn(output, labels)`` returning the loss
            tensor that is back-propagated.

    Returns:
        A ``(loss, accuracy)`` tuple of floats: the sum of the per-batch
        losses divided by the number of samples seen, and the fraction of
        samples whose arg-max prediction equals the label.

    Raises:
        ValueError: If ``dataloader`` yields no samples.
    """
    if optimizer is None:
        optimizer = torch.optim.Adam(net.parameters(), lr=lr)
    net.train()
    total_loss, acc, count = 0, 0, 0
    for features, labels in dataloader:
        optimizer.zero_grad()
        lbls = labels.to(device)
        out = net(features.to(device))
        loss = loss_fn(out, lbls)
        loss.backward()
        optimizer.step()
        # Detach before accumulating so the running total does not keep the
        # autograd history of every batch alive for the whole epoch.
        total_loss += loss.detach()
        _, predicted = torch.max(out, 1)
        acc += (predicted == lbls).sum()
        count += len(labels)
    if count == 0:
        raise ValueError("dataloader yielded no samples")
    return total_loss.item()/count, acc.item()/count

def validate(net, dataloader, loss_fn=nn.NLLLoss()):
    """Evaluate ``net`` on ``dataloader`` without tracking gradients.

    The model is put in eval mode. Returns ``(loss, accuracy)``: the sum of
    the per-batch losses divided by the number of samples, and the fraction
    of samples whose arg-max prediction equals the label.
    """
    net.eval()
    seen, correct, running_loss = 0, 0, 0
    with torch.no_grad():
        for batch, targets in dataloader:
            targets_dev = targets.to(device)
            scores = net(batch.to(device))
            running_loss = running_loss + loss_fn(scores, targets_dev)
            _, guesses = torch.max(scores, 1)
            correct = correct + (guesses == targets_dev).sum()
            seen = seen + len(targets)
    mean_loss = running_loss.item() / seen
    accuracy = correct.item() / seen
    return mean_loss, accuracy

def train(net, train_loader, test_loader, optimizer=None, lr=0.01, epochs=10, loss_fn=nn.NLLLoss()):
    """Train ``net`` for ``epochs`` epochs, validating after each one.

    An Adam optimizer with learning rate ``lr`` is created when ``optimizer``
    is not supplied. After every epoch one line with the train/validation
    accuracy and loss is printed.

    Returns:
        A dict with the keys ``'train_loss'``, ``'train_acc'``,
        ``'val_loss'`` and ``'val_acc'``, each mapping to a list with one
        entry per epoch.
    """
    optimizer = optimizer or torch.optim.Adam(net.parameters(), lr=lr)
    history = {key: [] for key in ('train_loss', 'train_acc', 'val_loss', 'val_acc')}
    for ep in range(epochs):
        tl, ta = train_epoch(net, train_loader, optimizer=optimizer, lr=lr, loss_fn=loss_fn)
        vl, va = validate(net, test_loader, loss_fn=loss_fn)
        print(f"Epoch {ep:2}, Train acc={ta:.3f}, Val acc={va:.3f}, Train loss={tl:.3f}, Val loss={vl:.3f}")
        # The dict preserves insertion order, so the keys line up with the tuple.
        for key, value in zip(history, (tl, ta, vl, va)):
            history[key].append(value)
    return history

In [None]:
RESNET18 = ResNet18.to(device)
loss_fn = nn.CrossEntropyLoss() # cross-entropy loss
optimizer = torch.optim.Adam(ResNet18.parameters(), lr=learning_rate) # Adam optimizer

# Train the model. Everything after the loaders is passed by keyword: train()
# takes (optimizer, lr, epochs, loss_fn) in that order, so passing num_epochs
# and loss_fn positionally would bind them to lr and epochs.
history = train(ResNet18, trainloader, testloader,
                optimizer=optimizer, epochs=num_epochs, loss_fn=loss_fn)

In [None]:
def train_long(net,train_loader,test_loader,epochs=5,lr=0.01,optimizer=None,loss_fn = nn.NLLLoss(),print_freq=10):
    """Train ``net`` and print running statistics while each epoch is in progress.

    Args:
        net: Model to train.
        train_loader: Iterable yielding ``(features, labels)`` training batches.
        test_loader: Loader passed to ``validate`` after every epoch.
        epochs: Number of passes over ``train_loader``.
        lr: Learning rate of the Adam optimizer that is created when
            ``optimizer`` is None.
        optimizer: Optimizer to step. Defaults to
            ``torch.optim.Adam(net.parameters(), lr=lr)``.
        loss_fn: Callable ``loss_fn(output, labels)`` returning the loss
            tensor that is back-propagated.
        print_freq: Running train accuracy and loss are printed every
            ``print_freq`` minibatches.

    Returns:
        None. Progress is reported through ``print``.
    """
    if optimizer is None:
        optimizer = torch.optim.Adam(net.parameters(), lr=lr)
    for epoch in range(epochs):
        net.train()
        total_loss, acc, count = 0, 0, 0
        for i, (features, labels) in enumerate(train_loader):
            lbls = labels.to(device)
            optimizer.zero_grad()
            out = net(features.to(device))
            loss = loss_fn(out, lbls)
            loss.backward()
            optimizer.step()
            # Detach before accumulating so the running total does not keep
            # the autograd history of every batch alive.
            total_loss += loss.detach()
            _, predicted = torch.max(out, 1)
            acc += (predicted == lbls).sum()
            count += len(labels)
            if i%print_freq==0:
                print("Epoch {}, minibatch {}: train acc = {}, train loss = {}".format(epoch,i,acc.item()/count,total_loss.item()/count))
        vl, va = validate(net, test_loader, loss_fn)
        print("Epoch {} done, validation acc = {}, validation loss = {}".format(epoch,va,vl))

In [None]:
# Recreate the loaders with a fixed batch size of 128.
trainloader = torch.utils.data.DataLoader(trainset, batch_size=128,shuffle=True, num_workers=2)
testloader = torch.utils.data.DataLoader(testset, batch_size=128,shuffle=False, num_workers=2)

# NOTE(review): the original call passed `net`, which is not defined anywhere
# in the visible notebook; ResNet18 is the only model built here, so it is
# assumed to be the intended one. Confirm.
train_long(ResNet18,trainloader,testloader,loss_fn=torch.nn.CrossEntropyLoss(),epochs=2,print_freq=10)