# 批量归一化

后面的层训练较快

数据在底部，底部训练较慢

底部层变化，所有都跟着变（为什么？）

我们可以在学习底部层的时候避免变化顶部层吗？

### 批量归一化的思想


固定小批量中的均值和方差，然后学习出适合的偏移和方差

可以加速**收敛速度**，但一般不改变模型精度。

我觉得Batch Normalization 有用还是因为解决数值稳定性问题。

另外，有各种各样的normalization，可以看一下：

![normalization](./img/normalization.png)

# coding

In [1]:
import torch
from torch import nn

  from .autonotebook import tqdm as notebook_tqdm


In [11]:
# moving 和 momentum 用于做平滑处理，因为batch的算出来的mean和var噪声太大
def batch_norm(X, gamma, beta, moving_mean, moving_var, eps, momentum):
    if not torch.is_grad_enabled(): # predicting
        X_hat = (X-moving_mean)/torch.sqrt(moving_var + eps)
    else:
        assert len(X.shape) in (2, 4) # MLP or conv2D
        if len(X.shape) == 2: # 二维的话，第一维度是batch， 第二维度是全联接
            # dim = 0: 按行处理，处理特征
            mean = X.mean(dim=0)
            var = ((X-mean)**2).mean(dim=0)
        else:
            mean = X.mean(dim=(0, 2, 3), keepdim=True)
            var = ((X-mean)**2).mean(dim=(0,2,3),keepdim=True)
        X_hat = (X-mean) / torch.sqrt(var + eps)
        moving_mean = momentum * moving_mean + (1 - momentum) * mean
        moving_var = momentum * moving_var + (1-momentum) * var
    # 最后再使用学习的两个参数进行还原（这两个参数的范围我们可以自己控制）
    Y = gamma * X_hat + beta
    
    return Y, moving_mean.data, moving_var.data

In [12]:
class BatchNorm(nn.Module):
    def __init__(self, num_features, num_dims):
        super().__init__()
        if num_dims == 2:
            shape = (1, num_features)
        else:
            shape = (1, num_features, 1, 1)
            
        self.gamma = nn.Parameter(torch.ones(shape))
        self.beta = nn.Parameter(torch.zeros(shape))
        self.moving_mean = torch.zeros(shape)
        self.moving_var = torch.zeros(shape)
        
    def forward(self, X):
        if self.moving_mean.device != X.device:
            self.moving_mean = self.moving_mean.to(X.device)
            self.moving_var = self.moving_var.to(X.device)
        
        Y, self.moving_mean, self.moving_var = batch_norm(
        X, self.gamma, self.beta, self.moving_mean, self.moving_var, eps=1e-5, momentum=0.9)
        
        return Y

In [18]:
test_X = torch.rand(size=(100, 32))
Y = test_X.mean(dim=0,keepdim=True)
Y.shape

torch.Size([1, 32])

## 应用在LeNet

In [13]:
net = torch.nn.Sequential(
        nn.Conv2d(1, 6, kernel_size=5, padding=2), # 6 * 28 * 28
        BatchNorm(6, 4),
        nn.Sigmoid(), 
        nn.AvgPool2d(2, stride=2),# 6 * 14 * 14
        nn.Conv2d(6, 16, kernel_size=5),# 16 * 10 * 10
        BatchNorm(16, 4),
        nn.Sigmoid(),
        nn.AvgPool2d(2, stride=2),# 16 * 5 * 5
        nn.Flatten(),
        nn.Linear(16*5*5, 120),
        BatchNorm(120, 2),
        nn.Sigmoid(),
        nn.Linear(120, 84),
        BatchNorm(84, 2),
        nn.Sigmoid(),
        nn.Linear(84, 10)
        )

In [9]:
import torchvision
from torchvision import transforms
from torch.utils import data

batch_size = 256
def load_data_fashion_mnist(batch_size, resize=None):
    trans = [transforms.ToTensor()]
    # 如果有resize，先做resize，然后再转换到tensor
    if resize:
        trans.insert(0, transforms.Resize(resize))
    trans = transforms.Compose(trans)
    mnist_train = torchvision.datasets.FashionMNIST(root='./data/', train=True, transform=trans, download=False)
    mnist_test = torchvision.datasets.FashionMNIST("./data", train=False, transform=trans, download=False)
    return (data.DataLoader(mnist_train, batch_size, shuffle=True, num_workers=8), 
           data.DataLoader(mnist_test, batch_size, shuffle=True, num_workers=8))
  
train_iter, test_iter = load_data_fashion_mnist(batch_size)

In [14]:
def train(net, train_iter, test_iter, num_epochs, lr):
    def init_weights(m):
        if type(m)==nn.Linear or type(m) == nn.Conv2d:
            nn.init.xavier_uniform_(m.weight)
            
    net.apply(init_weights)
    print('begin training')
    
    optimizer = torch.optim.SGD(net.parameters(), lr=lr)
    loss = nn.CrossEntropyLoss()
    
    for epoch in range(num_epochs):
        net.train()
        for i, (X, y) in enumerate(train_iter):
            optimizer.zero_grad()
            y_hat = net(X)
            L = loss(y_hat, y)
            L.backward()
            optimizer.step()
            
        print(f'loss: {L.sum()}')

lr = 0.9
num_epochs = 10
train(net, train_iter, test_iter, num_epochs, lr)

begin training
loss: 0.47621700167655945
loss: 0.37128904461860657
loss: 0.32932472229003906
loss: 0.430637925863266
loss: 0.29687103629112244
loss: 0.5083425641059875
loss: 0.283053994178772
loss: 0.2738705575466156
loss: 0.36384066939353943
loss: 0.10750596970319748


In [15]:
def accurancy(y, y_hat):
    if len(y_hat) > 1 and len(y_hat[0]) > 1:
        y_hat = y_hat.argmax(axis=1)
    cmp = y_hat == y
    return cmp.sum().item()

# 测试集上的效果
net.eval()

sum_accu = 0
for X, y in test_iter:
    y_hat = net(X)
    sum_accu += accurancy(y, y_hat)
    
print(f'test data accurancy: {sum_accu / 10000}')



# 在训练集上的效果
sum_accu = 0
for X, y in train_iter:
    y_hat = net(X)
    sum_accu += accurancy(y, y_hat)
    

print(f'train data accurancy: {sum_accu / 60000}')


test data accurancy: 0.8999
train data accurancy: 0.9168333333333333


## 在LeNet上使用 group norm

In [20]:
net_use_group_normalization = torch.nn.Sequential(
        nn.Conv2d(1, 6, kernel_size=5, padding=2), # 6 * 28 * 28
        nn.GroupNorm(2, 6), # 6个channel 分成 2个group
        nn.Sigmoid(), 
        nn.AvgPool2d(2, stride=2),# 6 * 14 * 14
        nn.Conv2d(6, 16, kernel_size=5),# 16 * 10 * 10
        nn.GroupNorm(2, 16),
        nn.Sigmoid(),
        nn.AvgPool2d(2, stride=2),# 16 * 5 * 5
        nn.Flatten(),
        nn.Linear(16*5*5, 120),
        nn.Sigmoid(),
        nn.Linear(120, 84),
        nn.Sigmoid(),
        nn.Linear(84, 10)
        )