### ex11_2 Residual Model
本模型是残差网络（ResNet）的一个简化实现，同样是针对 MNIST 数据集的卷积神经网络。
主要内容是封装一个 Residual Block。

#### 初始化基本设置并建立数据集

In [1]:
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
import numpy as np
import random
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets, transforms
import torch.nn.functional as F

# 初始化并固定随机种子


def setup_seed(seed):
    """Seed every random number generator used in this notebook.

    Seeds PyTorch (CPU and all CUDA devices), NumPy and the stdlib ``random``
    module, and configures cuDNN for repeatable kernel selection.

    Args:
        seed: Integer seed shared by all generators.
    """
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
    # The cuDNN auto-tuner may pick a different convolution algorithm on each
    # run; switch it off so the deterministic flag above is not undermined.
    torch.backends.cudnn.benchmark = False


# Fix all random seeds before any model or data loader is created.
setup_seed(1012)

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"The current computing device is {device.type} ")
if torch.cuda.is_available():
    print(f'The current GPU is :{torch.cuda.get_device_name(0)}')

# prepare dataset

batch_size = 64
transform = transforms.Compose(
    [transforms.ToTensor(),
     transforms.Normalize((0.1307,), (0.3081,))])
# Image transform pipeline: convert the PIL image to a tensor, then normalize it.
# 0.1307 and 0.3081 are the mean and standard deviation of this dataset.
# Each image is 28x28 pixels.
# NOTE(review): ToTensor presumably yields a [1, 28, 28] tensor per sample — confirm.

train_dataset = datasets.MNIST(
    root='./dataset/mnist/', train=True, download=True, transform=transform)
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)
test_dataset = datasets.MNIST(
    root='./dataset/mnist/', train=False, download=True, transform=transform)
test_loader = DataLoader(test_dataset, shuffle=False,
                         batch_size=len(test_dataset))  # the whole test set is served as a single batch


print(f'[size of train_set/test_set]:{len(train_dataset)},{len(test_dataset)}')


The current computing device is cpu 
[size of train_set/test_set]:60000,10000


#### 定义模型

In [2]:
class ResidualBlock(nn.Module):
    """Two 3x3 convolutions wrapped by an identity skip connection.

    Both convolutions map ``channels`` to ``channels`` with ``padding=1``, so
    the branch output has the same shape as the input and the two can be
    added element-wise.
    """

    def __init__(self, channels) -> None:
        super().__init__()
        first_conv = nn.Conv2d(channels, channels, kernel_size=3, padding=1)
        second_conv = nn.Conv2d(channels, channels, kernel_size=3, padding=1)
        # Stored under ``layers`` so parameter names remain
        # ``layers.0.*`` and ``layers.2.*``.
        self.layers = nn.Sequential(first_conv, nn.ReLU(), second_conv)

    def forward(self, x):
        """Return ``relu(x + layers(x))``."""
        branch = self.layers(x)
        return torch.relu(x + branch)

class Net(nn.Module):
    """Small CNN classifier: two conv/pool stages, each followed by a
    ResidualBlock, then a linear layer producing 10 outputs.

    The shape comments below assume an input of shape [b, 1, 28, 28].
    """

    def __init__(self) -> None:
        super().__init__()
        feature_stages = [
            nn.Conv2d(1, 16, kernel_size=5),   # [b,1,28,28] -> [b,16,24,24]
            nn.ReLU(),
            nn.MaxPool2d(2),                   # -> [b,16,12,12]
            ResidualBlock(16),                 # spatial size and channels unchanged
            # NOTE(review): unlike the first stage, no ReLU follows this
            # convolution — confirm this is intended.
            nn.Conv2d(16, 32, kernel_size=5),  # -> [b,32,8,8]
            nn.MaxPool2d(2),                   # -> [b,32,4,4]
            ResidualBlock(32),                 # spatial size and channels unchanged
        ]
        self.layers = nn.Sequential(*feature_stages)
        # 32 * 4 * 4 = 512 features per sample after flattening.
        self.fc = nn.Linear(512, 10)

    def forward(self, x):
        """Run the feature stages, flatten per sample, and apply the classifier."""
        features = self.layers(x)
        flat = features.view(x.shape[0], -1)
        return self.fc(flat)
# Build the network and move its parameters to the selected device.
model = Net()
model.to(device)

Net(
  (layers): Sequential(
    (0): Conv2d(1, 16, kernel_size=(5, 5), stride=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): ResidualBlock(
      (layers): Sequential(
        (0): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (1): ReLU()
        (2): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      )
    )
    (4): Conv2d(16, 32, kernel_size=(5, 5), stride=(1, 1))
    (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): ResidualBlock(
      (layers): Sequential(
        (0): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (1): ReLU()
        (2): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      )
    )
  )
  (fc): Linear(in_features=512, out_features=10, bias=True)
)

#### 设置损失函数和优化器

In [3]:
# construct loss and optimizer

# CrossEntropyLoss already includes the softmax step and works directly on
# integer class labels (also for multi-class problems), so no manual one-hot
# encoding is needed; the labels must be a LongTensor.
criterion = nn.CrossEntropyLoss(reduction='mean')
optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.5)

#### 设置训练和测试函数

In [4]:
# training and test

loss_list = []      # average training loss, one entry per finished epoch
accuracy_list = []  # test accuracy, one entry per evaluation


def train(epoch):
    """Train the model for one pass over ``train_loader``.

    Args:
        epoch: Epoch number; used only in the progress message.

    Returns:
        The mean of the per-batch losses of this epoch (``nan`` when the
        loader yields no batches). The same value is appended to
        ``loss_list`` so it can be plotted later.
    """
    model.train()
    epoch_loss = []  # loss of every batch in this epoch
    for X, y_label in train_loader:
        X, y_label = X.to(device), y_label.to(device)
        y_pred = model(X)
        loss = criterion(y_pred, y_label)

        epoch_loss.append(loss.item())

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # An empty loader has no meaningful average; record nan instead of
    # raising ZeroDivisionError so loss_list stays aligned with the epochs.
    if epoch_loss:
        average_loss = sum(epoch_loss) / len(epoch_loss)
    else:
        average_loss = float("nan")
    loss_list.append(average_loss)
    print(f'[epoch]:{epoch},  [average_loss]: {average_loss}')
    return average_loss


def test():
    """Measure classification accuracy over ``test_loader``.

    The model is switched to evaluation mode for the duration of the pass and
    its previous mode is restored afterwards, so layers whose behaviour
    differs between training and evaluation do not distort the result.

    Returns:
        The fraction of correctly classified samples, relative to
        ``len(test_dataset)`` (``nan`` when the dataset is empty). The same
        value is appended to ``accuracy_list``.
    """
    correct_num = 0
    num = len(test_dataset)
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for X, y in test_loader:
                X, y = X.to(device), y.to(device)
                # The predicted class is the index of the largest output.
                y_pred = torch.argmax(model(X), dim=1)
                correct_num += torch.sum(y_pred == y).item()
    finally:
        # Leave the model in whatever mode the caller had it in.
        model.train(was_training)
    accuracy = correct_num / num if num else float("nan")
    accuracy_list.append(accuracy)
    print(f'Current accuracy on the whole set is {accuracy}')
    return accuracy




#### 训练与测试

In [5]:
# start training now!


num_epochs = 10


# Epochs are numbered from 1; the model is evaluated after every training epoch.
for epoch in range(1, num_epochs+1):
    train(epoch)
    test()
    


[epoch]:1,  [average_loss]: 0.25860325733561124
Current accuracy on the whole set is 0.976


KeyboardInterrupt: 

#### 作图

In [None]:
# Each x-axis is derived from the length of its own series, so x and y always
# have matching sizes, including when training was stopped before num_epochs.
plt.subplot(1, 2, 1)
loss_epochs = list(range(1, len(loss_list) + 1))
plt.plot(loss_epochs, loss_list, color='#e4007f', label="Train loss (average training loss over one epoch)")
plt.xlabel("epoch")
plt.legend()  # without a legend the label above is never displayed
plt.subplot(1, 2, 2)
accuracy_epochs = list(range(1, len(accuracy_list) + 1))
plt.plot(accuracy_epochs, accuracy_list, color='#f19ec2', label="test accuracy")
plt.xlabel("epoch")
plt.legend()
plt.show()