### 二维互相关运算
![image.png](image/5.1.jpg)

In [42]:
import torch
from torch import nn

def corr2d(X, K):
    """Compute the 2-D cross-correlation of input ``X`` with kernel ``K``.

    The kernel is placed at every position where it fits entirely inside
    ``X``; each output element is the sum of the element-wise product of the
    kernel and the input window underneath it.

    Args:
        X: 2-D input tensor.
        K: 2-D kernel tensor.

    Returns:
        A tensor allocated with ``torch.zeros`` whose shape is
        ``(X.shape[0] - K.shape[0] + 1, X.shape[1] - K.shape[1] + 1)``.
    """
    k_rows, k_cols = K.shape
    out_rows = X.shape[0] - k_rows + 1
    out_cols = X.shape[1] - k_cols + 1
    Y = torch.zeros(out_rows, out_cols)
    for r in range(out_rows):
        for c in range(out_cols):
            # Input patch currently covered by the kernel.
            window = X[r:r + k_rows, c:c + k_cols]
            Y[r, c] = (window * K).sum()
    return Y

构造图中的输入数组X和核数组K来验证二维互相关运算的输出

In [43]:
X = torch.tensor([[0, 1, 2], [3, 4, 5], [6, 7, 8]])
K = torch.tensor([[0, 1], [2, 3]])
corr2d(X, K)

tensor([[19., 25.],
        [37., 43.]])

构造一个二维卷积层

In [44]:
class Conv2D(nn.Module):
    """Layer that cross-correlates a 2-D input with a learnable kernel.

    Both ``weight`` (sampled with ``torch.randn(kernel_size)``) and ``bias``
    (sampled with ``torch.randn(1)``) are registered as ``nn.Parameter``.
    """

    def __init__(self, kernel_size) -> None:
        super().__init__()
        # The kernel is sampled before the bias.
        kernel_init = torch.randn(kernel_size)
        bias_init = torch.randn(1)
        self.weight = nn.Parameter(kernel_init)
        self.bias = nn.Parameter(bias_init)

    def forward(self, x):
        """Return ``corr2d(x, self.weight)`` with the bias added."""
        response = corr2d(x, self.weight)
        return response + self.bias

卷积层的一个简单应用是检测图像中物体的边缘，即像素变化的位置

In [45]:
X = torch.ones(6, 8)
X[:, 2:6] = 0
X

tensor([[1., 1., 0., 0., 0., 0., 1., 1.],
        [1., 1., 0., 0., 0., 0., 1., 1.],
        [1., 1., 0., 0., 0., 0., 1., 1.],
        [1., 1., 0., 0., 0., 0., 1., 1.],
        [1., 1., 0., 0., 0., 0., 1., 1.],
        [1., 1., 0., 0., 0., 0., 1., 1.]])

构造一个1*2的卷积核K，如果横向相邻元素相同，输出为0，否则输出为非0

In [46]:
K = torch.tensor([[1, -1]])

In [47]:
Y = corr2d(X, K)
Y

tensor([[ 0.,  1.,  0.,  0.,  0., -1.,  0.],
        [ 0.,  1.,  0.,  0.,  0., -1.,  0.],
        [ 0.,  1.,  0.,  0.,  0., -1.,  0.],
        [ 0.,  1.,  0.,  0.,  0., -1.,  0.],
        [ 0.,  1.,  0.,  0.,  0., -1.,  0.],
        [ 0.,  1.,  0.,  0.,  0., -1.,  0.]])

### 通过数据学习核数组
构造一个卷积层，卷积核将被初始化成随机数组，在接下来的每一次迭代中，利用平方误差来比较Y和卷积层的输出，然后计算梯度来更新卷积核

In [48]:
# Build a 2-D convolutional layer whose kernel has shape 1*2
conv2d = Conv2D(kernel_size=(1, 2))

step = 30
lr = 0.01
print('---------StartTraining-----------')
for i in range(step):
    Y_hat = conv2d(X)
    # Sum of squared errors between the layer output and the target Y
    l = ((Y_hat - Y) ** 2).sum()
    l.backward()

    # Gradient descent step
    conv2d.weight.data -= lr * conv2d.weight.grad
    conv2d.bias.data -= lr * conv2d.bias.grad

    # Reset the gradients to zero before the next backward pass
    conv2d.weight.grad.fill_(0)
    conv2d.bias.grad.fill_(0)
    
    # Report the loss and the current parameters every 5 steps
    if (i + 1) % 5 == 0:
        print('Step %d, loss %.3f' % (i + 1, l.item()))
        print('weight:', conv2d.weight.data)
        print('bias:', conv2d.bias.data)
        print('---------------------------------')

---------StartTraining-----------
Step 5, loss 15.599
weight: tensor([[-0.0059,  0.0006]])
bias: tensor([0.0033])
---------------------------------
Step 10, loss 4.344
weight: tensor([[ 0.4696, -0.4715]])
bias: tensor([0.0010])
---------------------------------
Step 15, loss 1.210
weight: tensor([[ 0.7203, -0.7209]])
bias: tensor([0.0003])
---------------------------------
Step 20, loss 0.337
weight: tensor([[ 0.8524, -0.8526]])
bias: tensor([0.0001])
---------------------------------
Step 25, loss 0.094
weight: tensor([[ 0.9221, -0.9222]])
bias: tensor([3.8585e-05])
---------------------------------
Step 30, loss 0.026
weight: tensor([[ 0.9589, -0.9589]])
bias: tensor([1.2874e-05])
---------------------------------


事实上，为了得到卷积运算的输出，我们只需要将核数组左右翻转并上下翻转，再与输入数组做互相关运算即可。由于深度学习中的核数组是通过学习得到的，卷积层无论使用互相关运算还是卷积运算都不影响模型预测时的输出，所以在实践中二者是等效的

二维卷积层输出的二维数组叫做特征图，影响特征图中某个元素的计算结果的对应的输入区域叫做这个元素的感受野，我们可以通过更深的卷积神经网络让特征图中的单个元素的感受野变得更加广阔，从而捕捉输入上更大尺寸的特征

通过填充和步幅，我们可以改变输出形状，即便输入和卷积核形状已经确定
![image.png](image/5.2.jpg)

### 填充和步幅
通常我们会让输入和输出拥有相同的高和宽

In [49]:
def comp_conv2d(conv2d, X):
    """Apply ``conv2d`` to a 2-D tensor and return a 2-D result.

    The input is viewed with two leading singleton dimensions (batch size
    and channel count both equal to 1) before the call, and the first two
    dimensions of the result are removed afterwards.

    Args:
        conv2d: Callable applied to the 4-D view of ``X``.
        X: 2-D input tensor.

    Returns:
        The output of ``conv2d`` viewed with its last two dimensions only.
    """
    # Prepend the batch and channel dimensions, both of size 1.
    batched = X.view(1, 1, *X.shape)
    out = conv2d(batched)
    # Drop the leading batch and channel dimensions we do not care about.
    spatial_shape = out.shape[2:]
    return out.view(spatial_shape)

# Note: padding=1 pads 1 row/column on each side, so 2 rows/columns are added in total
conv2d = nn.Conv2d(in_channels=1, out_channels=1, kernel_size=3, padding=1)

X = torch.rand(8, 8)
comp_conv2d(conv2d, X).shape


torch.Size([8, 8])

#### 填充
当卷积核的高和宽不同时，我们可以设置高和宽上不同的填充数让输出和输入有相同的高和宽

In [50]:
# Use a kernel of height 5 and width 3; the padding on each side is 2 along the height and 1 along the width
conv2d = nn.Conv2d(in_channels=1, out_channels=1, kernel_size=(5, 3), padding=(2, 1))
comp_conv2d(conv2d, X).shape

torch.Size([8, 8])

#### 步幅
![image.png](image/5.3.jpg)
具体而言，输出尺寸可以用以下公式计算
$$output=\lfloor\frac{input+2\times padding-kernel}{stride}\rfloor+1$$

### 多输入通道和多输出通道
多输入通道\
当输入图片为彩色的时候，图像在高和宽之外还有RGB三个颜色通道，其可以表示成一个3\*h\*w的多维数组\
要实现多个输入通道的互相关运算，只需要对每个通道做互相关运算，然后将各通道的结果按元素累加（MXNet中可使用`add_n`函数，下面的PyTorch实现直接用`+=`逐通道累加）

In [51]:
import torch
from torch import nn
import sys
sys.path.append('..')
import d2lzh_pytorch as d2l

def corr2d_multi_in(X, K):
    """Cross-correlate a multi-channel input with a multi-channel kernel.

    ``X`` and ``K`` are indexed along dimension 0 (the channel dimension);
    ``d2l.corr2d`` is applied to each channel pair and the per-channel
    results are added together.

    Args:
        X: Input tensor indexed as ``X[channel, :, :]``.
        K: Kernel tensor indexed as ``K[channel, :, :]``.

    Returns:
        The sum over channels of the per-channel ``d2l.corr2d`` results.
    """
    num_channels = X.shape[0]
    # Start from channel 0 and accumulate the remaining channels in place.
    total = d2l.corr2d(X[0, :, :], K[0, :, :])
    for channel in range(1, num_channels):
        total += d2l.corr2d(X[channel, :, :], K[channel, :, :])
    return total

In [52]:
X = torch.tensor([[[0, 1, 2], [3, 4, 5], [6, 7, 8]],
              [[1, 2, 3], [4, 5, 6], [7, 8, 9]]])
K = torch.tensor([[[0, 1], [2, 3]], [[1, 2], [3, 4]]])

corr2d_multi_in(X, K)

tensor([[ 56.,  72.],
        [104., 120.]])

多输出通道

In [53]:
def corr2d_multi_in_out(X, K):
    """Cross-correlate ``X`` with every output-channel kernel in ``K``.

    Iterates over dimension 0 of ``K``, applies ``corr2d_multi_in`` to ``X``
    with each slice, and combines the results with ``torch.stack``.

    Args:
        X: Multi-channel input passed unchanged to ``corr2d_multi_in``.
        K: Kernel tensor whose dimension 0 enumerates output channels.

    Returns:
        The per-output-channel results stacked along a new dimension 0.
    """
    per_output_channel = []
    for kernel in K:
        per_output_channel.append(corr2d_multi_in(X, kernel))
    return torch.stack(per_output_channel)

In [54]:
# Stack K, K + 1 and K + 2 to build a kernel with 3 output channels
K = torch.stack([K, K + 1, K + 2])
K.shape

torch.Size([3, 2, 2, 2])

In [55]:
corr2d_multi_in_out(X, K)

tensor([[[ 56.,  72.],
         [104., 120.]],

        [[ 76., 100.],
         [148., 172.]],

        [[ 96., 128.],
         [192., 224.]]])

1 $\times$ 1卷积层\
这种卷积运算失去了可以识别高和宽维度上相邻元素构成的模式的功能，主要使用在通道维上，输出中的每个元素来自输入中相同位置不同通道间按权重累加的结果，假设我们将通道维当作特征维，将高和宽维度上的元素当作数据样本，那么该卷积层的作用和全连接层等价

In [56]:
def corr2d_multi_in_out_1x1(X, K):
    """Compute a 1x1 multi-input, multi-output cross-correlation as a matmul.

    The input is flattened to ``(c_i, h * w)`` and the kernel to
    ``(c_o, c_i)``; their matrix product is then laid out as
    ``(c_o, h, w)``.

    Args:
        X: Input tensor of shape ``(c_i, h, w)``.
        K: Kernel tensor with ``c_o * c_i`` elements and ``K.shape[0] == c_o``
            (the notebook passes shape ``(c_o, c_i, 1, 1)``).

    Returns:
        Tensor of shape ``(c_o, h, w)``.
    """
    c_i, h, w = X.shape
    c_o = K.shape[0]
    # reshape (rather than view) also accepts non-contiguous tensors, e.g.
    # the result of permute/transpose; contiguous inputs behave as before.
    X = X.reshape(c_i, h * w)
    K = K.reshape(c_o, c_i)
    Y = torch.mm(K, X)
    return Y.view(c_o, h, w)

做1 $\times$ 1卷积时，以上函数和之前实现的`corr2d_multi_in_out`等价

In [57]:
X = torch.rand(3, 3, 3)
K = torch.rand(2, 3, 1, 1)

Y1 = corr2d_multi_in_out_1x1(X, K)
Y2 = corr2d_multi_in_out(X, K)

(Y1 - Y2).norm().item() < 1e-6

True

在之后的模型里我们将会看到1×1卷积层被当作保持高和宽维度形状不变的全连接层使用。于是，我们可以通过调整网络层之间的通道数来控制模型复杂度

### 池化层
池化(pooling)层，它的提出是为了缓解卷积层对位置的过度敏感性\
不同于卷积层里计算输入和核的互相关性，池化层直接计算池化窗口内元素的最大值或者平均值

In [60]:
def pool2d(X, pool_size, mode='max'):
    """Apply 2-D pooling with stride 1 and no padding.

    Args:
        X: 2-D input tensor; it is converted with ``X.float()`` first.
        pool_size: Pair ``(p_h, p_w)`` giving the pooling window shape.
        mode: ``'max'`` for max pooling or ``'avg'`` for average pooling.

    Returns:
        Tensor of shape ``(X.shape[0] - p_h + 1, X.shape[1] - p_w + 1)``
        holding the maximum or mean of each window.

    Raises:
        ValueError: If ``mode`` is neither ``'max'`` nor ``'avg'``.
    """
    # Reject unknown modes up front instead of silently returning zeros.
    if mode not in ('max', 'avg'):
        raise ValueError(f"unsupported mode {mode!r}; expected 'max' or 'avg'")
    X = X.float()
    p_h, p_w = pool_size
    Y = torch.zeros(X.shape[0] - p_h + 1, X.shape[1] - p_w + 1)
    for i in range(Y.shape[0]):
        for j in range(Y.shape[1]):
            window = X[i:i + p_h, j:j + p_w]
            Y[i, j] = window.max() if mode == 'max' else window.mean()
    return Y

In [61]:
X = torch.tensor([[0, 1, 2], [3, 4, 5], [6, 7, 8]])
pool2d(X, (2, 2))

tensor([[4., 5.],
        [7., 8.]])

In [62]:
pool2d(X, (2, 2), 'avg')

tensor([[2., 3.],
        [5., 6.]])

和卷积层一样，池化层也可以通过在输入的高和宽两侧填充并调整窗口的移动步幅来改变输出形状

In [63]:
X = torch.arange(16, dtype=torch.float).view((1, 1, 4, 4)) # 前两个维度分别是批量和通道
X

tensor([[[[ 0.,  1.,  2.,  3.],
          [ 4.,  5.,  6.,  7.],
          [ 8.,  9., 10., 11.],
          [12., 13., 14., 15.]]]])

默认状态下，`MaxPool2d`实例里的步幅和池化窗口形状相同

In [64]:
pool2d = nn.MaxPool2d(3)
pool2d(X)

tensor([[[[10.]]]])

可以手动指定步幅和填充

In [65]:
pool2d = nn.MaxPool2d(3, padding=1, stride=2)
pool2d(X)

tensor([[[[ 5.,  7.],
          [13., 15.]]]])

也可以指定非正方形的池化窗口

In [66]:
pool2d = nn.MaxPool2d((2, 4), padding=(1, 2), stride=(2, 3))
pool2d(X)


tensor([[[[ 1.,  3.],
          [ 9., 11.],
          [13., 15.]]]])

在处理多通道的数据时，池化层对每个输入通道分别池化，而不是像卷积层一样将各通道的输入按通道相加

In [67]:
X = torch.cat((X, X + 1), dim=1)
X

tensor([[[[ 0.,  1.,  2.,  3.],
          [ 4.,  5.,  6.,  7.],
          [ 8.,  9., 10., 11.],
          [12., 13., 14., 15.]],

         [[ 1.,  2.,  3.,  4.],
          [ 5.,  6.,  7.,  8.],
          [ 9., 10., 11., 12.],
          [13., 14., 15., 16.]]]])

In [68]:
pool2d = nn.MaxPool2d(3, padding=1, stride=2)
pool2d(X)

tensor([[[[ 5.,  7.],
          [13., 15.]],

         [[ 6.,  8.],
          [14., 16.]]]])

在先前用多层感知机对Fashion-MNIST数据集中的图像进行分类的时候，我们将图像逐行展开，得到长度为784的向量，并输入到全连接层中，但是这种方法存在的局限性有
1. 在图像中临近的像素在向量中相距可能较远，其构成的模式可能难以被模型识别
2. 对于大尺度的输入图像，使用全连接层可能导致模型过大，模型参数会占用过高的存储开销

卷积层对这两个问题的解决方式为

1. 卷积层保留输入形状，让图像像素在高和宽两个方向上的相关性均有可能被识别
2. 通过滑动窗口将同一卷积核与不同位置的输入重复计算，从而避免参数尺寸过大

### LeNet卷积神经网络
LeNet的网络结构
![image.png](image/5.4.jpg)
整体上，LeNet分为卷积层块和全连接层块两个部分

卷积层块里的基本单位为卷积层+最大池化层，卷积层用来识别图像里的空间模式，最大池化层用来降低卷积层对位置的敏感性。在卷积层块中，每个卷积层使用5x5的窗口，并在输出上使用sigmoid激活函数，第一个卷积层输出通道数为6，第二个卷积层输出通道数为16（这是因为第二个卷积层的输入的高和宽比第一个卷积层的要小，所以增加输出通道数使两个卷积层的参数尺寸类似），卷积层块的两个最大池化层的窗口形状均为2x2，且步幅为2

卷积层块的输出形状为（批量大小，通道，高，宽），输入到全连接层块时，全连接层块会将小批量中的每个样本变平，将全连接层的输入变成二维，第一维是小批量中的样本，第二维是每个样本变平后的向量，长度为通道x高x宽，全连接层块包含三个全连接层，输出个数分别为120，84和10，其中10为输出的类别个数

下面通过`Sequential`类来实现LeNet模型

In [77]:
import time
from torch import optim
from torch.utils.tensorboard import SummaryWriter

import sys
sys.path.append('..')
import d2lzh_pytorch as d2l
device = torch.device('mps' if torch.backends.mps.is_available() else 'cpu')

class LeNet(nn.Module):
    """LeNet: a convolutional block followed by a fully connected block.

    ``conv`` holds two (Conv2d 5x5 -> Sigmoid -> MaxPool2d 2x2, stride 2)
    stages with 6 and 16 output channels. ``fc`` holds three Linear layers
    with 120, 84 and 10 outputs, with Sigmoid between them. The first Linear
    layer expects 16*4*4 features, which is what the convolutional block
    produces for a 1x28x28 input.
    """

    def __init__(self):
        super().__init__()
        conv_layers = [
            nn.Conv2d(in_channels=1, out_channels=6, kernel_size=5),
            nn.Sigmoid(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5),
            nn.Sigmoid(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        ]
        fc_layers = [
            nn.Linear(in_features=16 * 4 * 4, out_features=120),
            nn.Sigmoid(),
            nn.Linear(in_features=120, out_features=84),
            nn.Sigmoid(),
            nn.Linear(in_features=84, out_features=10),
        ]
        self.conv = nn.Sequential(*conv_layers)
        self.fc = nn.Sequential(*fc_layers)

    def forward(self, img):
        """Return the 10 class scores for each sample in the batch ``img``."""
        batch = img.shape[0]
        features = self.conv(img)
        # Flatten each sample to a vector before the fully connected block.
        flat = features.view(batch, -1)
        return self.fc(flat)

In [78]:
net = LeNet()
print(net)

LeNet(
  (conv): Sequential(
    (0): Conv2d(1, 6, kernel_size=(5, 5), stride=(1, 1))
    (1): Sigmoid()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(6, 16, kernel_size=(5, 5), stride=(1, 1))
    (4): Sigmoid()
    (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (fc): Sequential(
    (0): Linear(in_features=256, out_features=120, bias=True)
    (1): Sigmoid()
    (2): Linear(in_features=120, out_features=84, bias=True)
    (3): Sigmoid()
    (4): Linear(in_features=84, out_features=10, bias=True)
  )
)


In [79]:
batch_size = 256
train_iter, test_iter = d2l.load_data_fashion_mnist(batch_size=batch_size)

# Writer used to visualize the training data in real time
writer = SummaryWriter()

因为我们要用GPU加速计算，所以需要对`evaluate_accuracy`函数作修改使其支持GPU计算

In [72]:
def evaluate_accuracy(data_iter, net, device=None):
    """Return the classification accuracy of ``net`` over ``data_iter``.

    Args:
        data_iter: Iterable yielding ``(X, y)`` batches.
        net: Either a ``torch.nn.Module`` or a plain function. A function
            whose code object lists ``is_training`` among its variable names
            is called with ``is_training=False``.
        device: Device the batches are moved to when ``net`` is a module.
            Defaults to the device of the module's first parameter.

    Returns:
        Number of correct predictions divided by the number of samples.

    Raises:
        ZeroDivisionError: If ``data_iter`` yields no samples.
    """
    is_module = isinstance(net, torch.nn.Module)
    if device is None and is_module:
        # No device given: use the device of the network's first parameter.
        device = list(net.parameters())[0].device
    # Remember the current mode so that evaluation does not force a network
    # that was already in eval mode back into training mode.
    was_training = net.training if is_module else False
    if is_module:
        net.eval()  # evaluation mode, which turns off dropout
    acc_sum, n = 0.0, 0
    try:
        with torch.no_grad():  # do not record gradients
            for X, y in data_iter:
                if is_module:
                    predictions = net(X.to(device)).argmax(dim=1)
                    acc_sum += (predictions == y.to(device)).float().sum().cpu().item()
                elif 'is_training' in net.__code__.co_varnames:
                    # The function exposes an is_training argument.
                    acc_sum += (net(X, is_training=False).argmax(dim=1) == y).float().sum().item()
                else:
                    acc_sum += (net(X).argmax(dim=1) == y).float().sum().item()
                n += y.shape[0]
    finally:
        if is_module:
            net.train(was_training)  # restore the mode the caller had set
    return acc_sum / n

同样的，训练的函数也需要作修改以适应GPU加速

In [80]:
def train_ch5(net, train_iter, test_iter, batch_size, optimizer, device, num_epochs, writer=None):
    """Train ``net`` with cross-entropy loss and report metrics per epoch.

    Args:
        net: Model to train; it is moved to ``device`` first.
        train_iter: Iterable yielding ``(X, y)`` training batches.
        test_iter: Iterable passed to ``evaluate_accuracy`` after each epoch.
        batch_size: Not used inside this function; kept in the signature for
            existing callers.
        optimizer: Optimizer whose ``zero_grad``/``step`` drive the updates.
        device: Device the model and every batch are moved to.
        num_epochs: Number of passes over ``train_iter``.
        writer: Optional object with an ``add_scalar(tag, value, step)``
            method. When given, the per-epoch metrics are written to it
            instead of being printed.
    """
    net = net.to(device)
    print('training on', device)
    loss = torch.nn.CrossEntropyLoss()
    for epoch in range(num_epochs):
        train_l_sum, train_acc_sum, n, batch_count, start = 0.0, 0.0, 0, 0, time.time()
        for X, y in train_iter:
            X = X.to(device)
            y = y.to(device)
            y_hat = net(X)
            l = loss(y_hat, y)
            optimizer.zero_grad()
            l.backward()
            optimizer.step()
            train_l_sum += l.cpu().item()
            train_acc_sum += (y_hat.argmax(dim=1) == y).sum().cpu().item()
            n += y.shape[0]
            batch_count += 1
        test_acc = evaluate_accuracy(test_iter, net)
        # Both reporting paths use the same epoch-level averages; the writer
        # used to receive only the loss of the last mini-batch.
        avg_loss = train_l_sum / batch_count
        train_acc = train_acc_sum / n
        if writer is None:
            print('epoch %d, loss %.4f, train acc %.3f, test acc %.3f, time %.1f sec'
                  % (epoch + 1, avg_loss, train_acc, test_acc, time.time() - start))
        else:
            writer.add_scalar('Train Loss', avg_loss, epoch + 1)
            writer.add_scalar('Train Accuracy', train_acc, epoch + 1)
            writer.add_scalar('Test Accuracy', test_acc, epoch + 1)

        

In [82]:
lr, num_epochs = 0.001, 50
optimizer = optim.Adam(net.parameters(), lr=lr)
train_ch5(net, train_iter, test_iter, batch_size, optimizer, device, num_epochs)

training on mps
epoch 1, loss 0.6487, train acc 0.749, test acc 0.747, time 7.1 sec
epoch 2, loss 0.6069, train acc 0.763, test acc 0.764, time 7.8 sec
epoch 3, loss 0.5742, train acc 0.775, test acc 0.778, time 7.2 sec
epoch 4, loss 0.5489, train acc 0.785, test acc 0.782, time 7.1 sec
epoch 5, loss 0.5265, train acc 0.795, test acc 0.794, time 6.8 sec
epoch 6, loss 0.5074, train acc 0.802, test acc 0.793, time 6.7 sec
epoch 7, loss 0.4927, train acc 0.809, test acc 0.802, time 6.7 sec
epoch 8, loss 0.4797, train acc 0.813, test acc 0.802, time 7.3 sec
epoch 9, loss 0.4659, train acc 0.819, test acc 0.817, time 7.0 sec
epoch 10, loss 0.4550, train acc 0.825, test acc 0.817, time 6.7 sec
epoch 11, loss 0.4435, train acc 0.831, test acc 0.821, time 6.7 sec
epoch 12, loss 0.4308, train acc 0.837, test acc 0.831, time 7.1 sec
epoch 13, loss 0.4228, train acc 0.840, test acc 0.835, time 7.2 sec
epoch 14, loss 0.4102, train acc 0.846, test acc 0.825, time 7.1 sec
epoch 15, loss 0.4042, trai

In [90]:
for name, param in net.named_parameters():
    print(name, param.size())

torch.save(net.state_dict(), 'test_data/MyFirstLeNet.pt')

conv.0.weight torch.Size([6, 1, 5, 5])
conv.0.bias torch.Size([6])
conv.3.weight torch.Size([16, 6, 5, 5])
conv.3.bias torch.Size([16])
fc.0.weight torch.Size([120, 256])
fc.0.bias torch.Size([120])
fc.2.weight torch.Size([84, 120])
fc.2.bias torch.Size([84])
fc.4.weight torch.Size([10, 84])
fc.4.bias torch.Size([10])
