In [2]:
import torch
from torch import nn, optim
import torch.nn.functional as F
import d2lzh_pytorch as d2l
import time

### 5.1 二维卷积层
* convolution，cross-correlation运算
* 二维卷积层的核心计算是二维互相关运算
* 可以设计卷积核来检测图像中的边缘
* 可以通过数据来学习卷积核

In [3]:
def corr2d(X, K):
    h, w = K.shape
    Y = torch.zeros((X.shape[0] - h + 1,
                    X.shape[1] - w + 1))
    for i in range(Y.shape[0]):
        for j in range(Y.shape[1]):
            Y[i, j] = (X[i: i + h, j: j + w] * K).sum()
    return Y

In [4]:
X = torch.tensor([[0, 1, 2],
                 [3, 4, 5],
                 [6, 7, 8]])
K = torch.tensor([[0, 1], [2, 3]])
corr2d(X, K)

tensor([[19., 25.],
        [37., 43.]])

In [5]:
class Conv2D(nn.Module):
    def __init__(self, kernel_size):
        super(Conv2D, self).__init__()
        self.weight = nn.Parameter(torch.randn(
            kernel_size))
        self.bias = nn.Parameter(torch.randn(1))
    def forward(self, x):
        return corr2d(x, self.weight) + self.bias

In [6]:
X = torch.ones(6, 8)
X[:, 2:6] = 0
X

tensor([[1., 1., 0., 0., 0., 0., 1., 1.],
        [1., 1., 0., 0., 0., 0., 1., 1.],
        [1., 1., 0., 0., 0., 0., 1., 1.],
        [1., 1., 0., 0., 0., 0., 1., 1.],
        [1., 1., 0., 0., 0., 0., 1., 1.],
        [1., 1., 0., 0., 0., 0., 1., 1.]])

In [7]:
K = torch.tensor([[1, -1]])
K

tensor([[ 1, -1]])

In [8]:
K.shape

torch.Size([1, 2])

In [9]:
Y = corr2d(X, K)
Y

tensor([[ 0.,  1.,  0.,  0.,  0., -1.,  0.],
        [ 0.,  1.,  0.,  0.,  0., -1.,  0.],
        [ 0.,  1.,  0.,  0.,  0., -1.,  0.],
        [ 0.,  1.,  0.,  0.,  0., -1.,  0.],
        [ 0.,  1.,  0.,  0.,  0., -1.,  0.],
        [ 0.,  1.,  0.,  0.,  0., -1.,  0.]])

In [10]:
conv2d = Conv2D(kernel_size=(1, 2))
step = 20
lr = 0.01
for i in range(step):
    Y_hat = conv2d(X)
    l = ((Y_hat - Y) ** 2).sum()
    l.backward()
    conv2d.weight.data -= lr * conv2d.weight.grad
    conv2d.bias.data -= lr * conv2d.bias.grad
    conv2d.weight.grad.fill_(0)
    conv2d.bias.grad.fill_(0)
    if (i + 1) % 5 == 0:
        print('step %d, los %.3f' % (i + 1, l.item()))

step 5, los 15.816
step 10, los 3.951
step 15, los 1.050
step 20, los 0.287


In [11]:
print("weight:", conv2d.weight.data)
print("bias:", conv2d.bias.data)

weight: tensor([[ 0.8736, -0.8562]])
bias: tensor([-0.0098])


### 5.2 填充和步幅

In [12]:
# 填充
# 定义一个函数来计算卷积层。对输入和输出做相应的升维和降维
def comp_conv2d(conv2d, X):
    X = X.view((1, 1) + X.shape)
    Y = conv2d(X)
    return Y.view(Y.shape[2:])

conv2d = nn.Conv2d(in_channels=1, out_channels=1, 
                   kernel_size=3, padding=1)
x = torch.rand(8, 8)
comp_conv2d(conv2d, X).shape

torch.Size([6, 8])

In [13]:
conv2d = nn.Conv2d(in_channels=1, out_channels=1,
                  kernel_size=(5, 3), padding=(2, 1))
comp_conv2d(conv2d, X).shape

torch.Size([6, 8])

In [14]:
# 步幅
conv2d = nn.Conv2d(1, 1, kernel_size=3, 
                   padding=1, stride=2)
comp_conv2d(conv2d, X).shape

torch.Size([3, 4])

In [15]:
conv2d = nn.Conv2d(1, 1, kernel_size=(3, 5),
                  padding=(0, 1), stride=(3, 4))
comp_conv2d(conv2d, X).shape

torch.Size([2, 2])

### 5.3 多输入通道和多输出通道

In [16]:
# 多输入通道
def corr2d_multi_in(X, K):
    res = corr2d(X[0, :, :], K[0, :, :])
    for i in range(1, X.shape[0]):
        res += corr2d(X[i, :, :], K[i, :, :])
    return res

In [17]:
X = torch.tensor([
    [[0, 1, 2], [3, 4, 5], [6, 7, 8]],
    [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
])
K = torch.tensor([
    [[0, 1], [2, 3]],
    [[1, 2], [3, 4]]
])
corr2d_multi_in(X, K)

tensor([[ 56.,  72.],
        [104., 120.]])

In [18]:
print(X.shape, K.shape)

torch.Size([2, 3, 3]) torch.Size([2, 2, 2])


In [19]:
# 多输出通道
def corr2d_multi_in_out(X, K):
    return torch.stack([
        corr2d_multi_in(X, k) for k in K
    ])

In [20]:
K = torch.stack([
    K, K + 1, K + 2
])
#print(K)
K.shape

torch.Size([3, 2, 2, 2])

In [21]:
corr2d_multi_in_out(X, K)

tensor([[[ 56.,  72.],
         [104., 120.]],

        [[ 76., 100.],
         [148., 172.]],

        [[ 96., 128.],
         [192., 224.]]])

In [22]:
# 1x1 卷积层
def corr2d_multi_in_out_1x1(X, K):
    c_i, h, w = X.shape
    c_o = K.shape[0]
    X = X.view(c_i, h * w)
    K = K.view(c_o, c_i)
    Y = torch.mm(K, X)
    return Y.view(c_o, h, w)

In [23]:
X = torch.rand(3, 3, 3)
K = torch.rand(2, 3, 1, 1)
Y1 = corr2d_multi_in_out_1x1(X, K)
Y2 = corr2d_multi_in_out(X, K)
(Y1 - Y2).norm().item() < 1e-6

True

### 5.4 池化层

In [24]:
# 二维最大池化层和平均池化层
def pool2d(X, pool_size, mode='max'):
    X = X.float()
    p_h, p_w = pool_size
    Y = torch.zeros(X.shape[0] - p_h + 1,
                   X.shape[1] - p_w + 1)
    for i in range(Y.shape[0]):
        for j in range(Y.shape[1]):
            if mode == 'max':
                Y[i, j] = X[i: i + p_h, j: j + p_w].max()
            elif mode == 'avg':
                Y[i, j] = X[i: i + p_h, j: j + p_w].mean()
    return Y

In [25]:
X = torch.tensor([
    [0, 1, 2], [3, 4, 5], [6, 7, 8]
])
pool2d(X, (2, 2))

tensor([[4., 5.],
        [7., 8.]])

In [26]:
pool2d(X, (2, 2), 'avg')

tensor([[2., 3.],
        [5., 6.]])

In [27]:
# 填充和步幅
X = torch.arange(16, dtype=torch.float).view(1, 1, 4, 4)
X

tensor([[[[ 0.,  1.,  2.,  3.],
          [ 4.,  5.,  6.,  7.],
          [ 8.,  9., 10., 11.],
          [12., 13., 14., 15.]]]])

In [28]:
# 使用（3，3）的池化窗口，默认获得（3，3）的步幅
pool2d = nn.MaxPool2d(3)
pool2d(X)

tensor([[[[10.]]]])

In [29]:
# 手动指定步幅和填充
pool2d = nn.MaxPool2d(3, padding=1, stride=2)
pool2d(X)

tensor([[[[ 5.,  7.],
          [13., 15.]]]])

In [30]:
# 多通道
X = torch.arange(16, dtype=torch.float).view(1, 1, 4, 4)
X = torch.cat([X, X + 1], dim=1)
X

tensor([[[[ 0.,  1.,  2.,  3.],
          [ 4.,  5.,  6.,  7.],
          [ 8.,  9., 10., 11.],
          [12., 13., 14., 15.]],

         [[ 1.,  2.,  3.,  4.],
          [ 5.,  6.,  7.,  8.],
          [ 9., 10., 11., 12.],
          [13., 14., 15., 16.]]]])

In [31]:
pool2d = nn.MaxPool2d(3, padding=1, stride=2)
pool2d(X)

tensor([[[[ 5.,  7.],
          [13., 15.]],

         [[ 6.,  8.],
          [14., 16.]]]])

### 5.5 卷积神经网络（LeNet）

In [42]:
device = torch.device('cuda' if torch.cuda.is_available()
                     else 'cpu')
class LeNet(nn.Module):
    def __init__(self):
        super(LeNet, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(1, 6, 5), # in, out, kernel
            nn.Sigmoid(),
            nn.MaxPool2d(2, 2), # kernel, stride
            nn.Conv2d(6, 16, 5),
            nn.Sigmoid(),
            nn.MaxPool2d(2, 2)
        )
        self.fc = nn.Sequential(
            nn.Linear(16*4*4, 120),
            nn.Sigmoid(),
            nn.Linear(120, 84),
            nn.Sigmoid(),
            nn.Linear(84, 10),
        )
    def forward(self, img):
        feature = self.conv(img)
        output = self.fc(feature.view(img.shape[0], -1))
        return output

In [43]:
net = LeNet()
print(net)

LeNet(
  (conv): Sequential(
    (0): Conv2d(1, 6, kernel_size=(5, 5), stride=(1, 1))
    (1): Sigmoid()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(6, 16, kernel_size=(5, 5), stride=(1, 1))
    (4): Sigmoid()
    (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (fc): Sequential(
    (0): Linear(in_features=256, out_features=120, bias=True)
    (1): Sigmoid()
    (2): Linear(in_features=120, out_features=84, bias=True)
    (3): Sigmoid()
    (4): Linear(in_features=84, out_features=10, bias=True)
  )
)


In [44]:
batch_size = 256
train_iter, test_iter = d2l.load_data_fashion_mnist(
    batch_size=batch_size)

In [45]:
def evaluate_accuracy(data_iter, net):
    acc_sum, n = 0.0, 0
    with torch.no_grad():
        for X, y in data_iter:
            if isinstance(net, torch.nn.Module):
                net.eval()
                acc_sum += ((net(X.to(device)).argmax(
                    dim=1) == y.to(device)).float()
                            .sum().cpu().item())
                net.train()
            else:
                if ('is_training' in 
                    net.__code__.co_varnames):
                    acc_sum += ((net(X, is_training=False)
                                .argmax(dim=1) == y)
                               .float().sum().item())
                else:
                    acc_sum += ((net(X).argmax(dim=1) 
                                 == y).float().sum()
                                .item())
            n += y.shape[0]
    print(acc_sum, n)
    return acc_sum / n 

In [46]:
def train_ch5(net, train_iter, test_iter, batch_size, 
             optimizer, device, num_epochs):
    net = net.to(device)
    print("train on ", device)
    loss = torch.nn.CrossEntropyLoss()
    batch_count = 0
    for epoch in range(num_epochs):
        train_l_sum, train_acc_sum = 0.0, 0.0
        n, start = 0, time.time()
        for X, y in train_iter:
            X = X.to(device)
            y = y.to(device)
            y_hat = net(X)
            l = loss(y_hat, y)
            optimizer.zero_grad()
            l.backward()
            optimizer.step()
            train_l_sum += l.cpu().item()
            train_acc_sum += ((y_hat.argmax(dim=1) == y)
                             .sum().cpu().item())
            n += y.shape[0]
            batch_count += 1
        test_acc = evaluate_accuracy(test_iter, net)
        print(('epoch %d, loss %.4f, train acc %.3f, ' + 
             'test acc %.3f, time %.1f') % (
                 epoch + 1, train_l_sum / batch_count, 
                 train_acc_sum / n, test_acc, 
                 time.time() - start))

In [47]:
lr, num_epochs = 0.001, 5
optimizer = torch.optim.Adam(net.parameters(), lr=lr)
train_ch5(net, train_iter, test_iter, batch_size, 
          optimizer, device, num_epochs)

train on  cpu
5748.0 10000
epoch 1, loss 1.8884, train acc 0.304, test acc 0.575, time 23.8
6681.0 10000
epoch 2, loss 0.4811, train acc 0.629, test acc 0.668, time 23.7
7287.0 10000
epoch 3, loss 0.2561, train acc 0.714, test acc 0.729, time 24.1
7427.0 10000
epoch 4, loss 0.1700, train acc 0.742, test acc 0.743, time 24.1
7543.0 10000
epoch 5, loss 0.1262, train acc 0.754, test acc 0.754, time 25.4


### 5.10 批量归一化

* 在模型训练时，批量归一化利用小批量上的均值和标准差，不断调整神经网络中间输出，从而使整个神经网络在各层的中间输出的数值更稳定

In [37]:
# 从零开始实现
def batch_norm(is_training, X, gamma, beta, moving_mean, 
              moving_var, eps, momentum):
    if not is_training:
        # 预测模式，直接使用传入的均值和方差
        X_hat = (X - moving_mean) / torch.sqrt(moving_var
                                      + eps)
    else:
        assert len(X.shape) in (2, 4)
        if len(X.shape) == 2:
            # 全连接层，计算特征维上的均值和方差
            mean = X.mean(dim=0)
            var = ((X - mean) ** 2).mean(dim=0)
        else:
            # 使用二维卷积层的情况，计算通道维上（axis=1）的
            # 均值和方差。保持X的形状以便做广播运算
            mean = X.mean(dim=0, keepdim=True).mean(
                dim=2, keepdim=True).mean(
                dim=3, keepdim=True)
            var = ((X - mean) ** 2).mean(
                dim=0, keepdim=True).mean(
                dim=2, keepdim=True).mean(
                dim=3, keepdim=True)
        # 训练模式下，使用当前的均值和方差做标准化
        X_hat = (X - mean) / torch.sqrt(var + eps)
        # 更新移动平均的均值和方差
        moving_mean = (momentum * moving_mean + 
                       (1.0 - momentum) * mean)
        moving_var = (momentum * moving_var + 
                      (1.0 - momentum) * var)
    # 拉伸和偏移
    Y = gamma * X_hat + beta
    return Y, moving_mean, moving_var

In [38]:
class BatchNorm(nn.Module):
    def __init__(self, num_features, num_dims):
        super(BatchNorm, self).__init__()
        if num_dims == 2:
            shape = (1, num_features)
        else:
            shape = (1, num_features, 1, 1)
        # 参与求梯度和迭代的拉伸和偏移参数
        self.gamma = nn.Parameter(torch.ones(shape))
        self.beta = nn.Parameter(torch.zeros(shape))
        # 不参与求梯度和迭代的变量
        self.moving_mean = torch.zeros(shape)
        self.moving_var = torch.zeros(shape)
    
    def forward(self, X):
        if self.moving_mean.device != X.device:
            self.moving_mean = self.moving_mean.to(
                X.device)
            self.moving_var = self.moving_var.to(
                X.device)
        Y, self.moving_mean, self.moving_var = batch_norm(
                self.training, X, self.gamma, self.beta, 
                self.moving_mean, self.moving_var, 
                eps=1e-5, momentum=0.9)
        return Y

In [39]:
# 使用批量归一化的LeNet
net = nn.Sequential(
    nn.Conv2d(1, 6, 5),
    BatchNorm(6, num_dims=4),
    nn.Sigmoid(),
    nn.MaxPool2d(2, 2),
    nn.Conv2d(6, 16, 5),
    BatchNorm(16, num_dims=4),
    nn.Sigmoid(),
    nn.MaxPool2d(2, 2),
    d2l.FlattenLayer(),
    nn.Linear(16*4*4, 120),
    BatchNorm(120, num_dims=2),
    nn.Sigmoid(),
    nn.Linear(120, 84),
    BatchNorm(84, num_dims=2),
    nn.Sigmoid(),
    nn.Linear(84, 10),
)

In [41]:
batch_size = 256
train_iter, test_iter = d2l.load_data_fashion_mnist(
    batch_size=batch_size)
lr, num_epochs = 0.001, 5
optimizer = torch.optim.Adam(net.parameters(), lr=lr)
train_ch5(net, train_iter, test_iter, batch_size, 
             optimizer, device, num_epochs)

train on  cpu
8063.0 10000
epoch 1, loss 0.9894, train acc 0.792, test acc 0.806, time 34.7
8414.0 10000
epoch 2, loss 0.2253, train acc 0.865, test acc 0.841, time 34.9
8547.0 10000
epoch 3, loss 0.1205, train acc 0.881, test acc 0.855, time 35.1
8598.0 10000
epoch 4, loss 0.0819, train acc 0.887, test acc 0.860, time 34.7
8510.0 10000
epoch 5, loss 0.0611, train acc 0.894, test acc 0.851, time 34.3
