# **Batch Normalization**

In [1]:
import torch
import torch.nn as nn
import time
import torchvision.datasets as datasets
from torchvision import transforms

def batch_norm(X, gamma, beta, moving_mean, moving_var, eps, momentum):
    # Use is_grad_enabled to determine whether the current mode is training mode
    # or prediction mode.
    if not torch.is_grad_enabled():
        # use the moving average
        X_hat = (X - moving_mean) / torch.sqrt(moving_var + eps)
    else:
        assert len(X.shape) in (2, 4)
        if len(X.shape) == 2: # fully connected layer
            mean = X.mean(axis=0)
            var = ((X - mean) ** 2).mean(axis=0)
        else: # convolution, hence per layer
            mean = X.mean(axis=(0, 2, 3), keepdims=True)
            var = ((X - mean) ** 2).mean(axis=(0, 2, 3), keepdims=True)
        # In training mode, the current mean and variance are used for the 
        #standardization.
        X_hat = (X - mean) / torch.sqrt(var + eps)
        # Update the mean and variance of the moving average.
        moving_mean = momentum * moving_mean + (1.0 - momentum) * mean
        moving_var = momentum * moving_var + (1.0 - momentum) * var
    Y = gamma * X_hat + beta # Scale and shift.
    return Y, moving_mean, moving_var

## **BatchNorm Layer**

In [2]:
class BatchNorm(nn.Module):
    def __init__(self, num_features, num_dims):
        super(BatchNorm, self).__init__()
        if num_dims == 2:
            shape = (1, num_features)
        else:
            shape = (1, num_features, 1, 1)
        # The scale parameter and the shift parameter involved in gradient finding
        # and iteration are initialized to 0 and 1 respectively.
        self.gamma = nn.Parameter(torch.ones(shape))
        self.beta = nn.Parameter(torch.zeros(shape))
        # All the variables not involved in gradient finding and iteration are 
        # initialized to 0 on the CPU.
        self.moving_mean = torch.zeros(shape)
        self.moving_var = torch.zeros(shape)
    def forward(self, X):
        # If X is not on the CPU, copy moving_mean and moving_var to the device 
        # where X is located.
        if self.moving_mean.device != X.device:
            self.moving_mean = self.moving_mean.to(X.device)
            self.moving_var = self.moving_var.to(X.device)
        # Save the updated moving_mean and moving_var.
        Y, self.moving_mean, self.moving_var = batch_norm(
            X, self.gamma, self.beta, self.moving_mean, 
            self.moving_var, eps=1e-5, momentum=0.9)
        return Y

## **LeNet with Batch Norm**

In [3]:
class flatten(nn.Module):
    def forward(self, input):
        return input.view(input.size(0), -1)

net = nn.Sequential(nn.Conv2d(1, 6, kernel_size=5),
        BatchNorm(6, num_dims=4),
        nn.Sigmoid(),
        nn.MaxPool2d(kernel_size=2, stride=2),
        nn.Conv2d(6, 16, kernel_size=5),
        BatchNorm(16, num_dims=4),
        nn.Sigmoid(),
        nn.MaxPool2d(kernel_size=2, stride=2),
        flatten(),
        nn.Linear(256, 120),
        BatchNorm(120, num_dims=2),
        nn.Sigmoid(),
        nn.Linear(120, 84),
        BatchNorm(84, num_dims=2),
        nn.Sigmoid(),
        nn.Linear(84, 10))

def weights_init(m):
    classname = m.__class__.__name__
    if classname.find('Conv') != -1:
        torch.nn.init.xavier_uniform_(m.weight)
        m.bias.data.normal_(0.0, 0.01)
    elif classname.find('Linear') != -1:
        torch.nn.init.xavier_uniform_(m.weight)
        m.bias.data.normal_(0.0, 0.01)


def evaluate_accuracy(data_iter, net):
    """Evaluate accuracy of a model on the given data set."""
    acc_sum,n = 0,0
    for (imgs, labels) in data_iter:
        # send data to the GPU if cuda is availabel
        if torch.cuda.is_available():
            imgs = imgs.cuda()
            labels = labels.cuda()
        net.eval()
        with torch.no_grad():
            labels = labels.long()
            acc_sum += torch.sum((torch.argmax(net(imgs), dim=1) == labels)).float()
            n += labels.shape[0]
    return acc_sum.item()/n

In [4]:
if torch.cuda.is_available():
    print('Training using GPU.')
    net.cuda()
else:
    print('Training using CPU.')

#Initialize network parameters.
net.apply(weights_init)

lr, num_epochs, batch_size = 1.0, 5, 256
optimizer = torch.optim.SGD(net.parameters(), lr=lr)

transform = transforms.Compose([transforms.ToTensor()]) 
mnist_trainset = datasets.FashionMNIST(root='./data', train=True, download=True, transform=transform)
mnist_testset = datasets.FashionMNIST(root='./data', train=False, download=True, transform=transform)
# Loading training set and test set using DataLoader.
train_loader = torch.utils.data.DataLoader(mnist_trainset, batch_size=batch_size,
    shuffle=True, num_workers=0)
test_loader = torch.utils.data.DataLoader(mnist_testset, batch_size=batch_size,
    shuffle=True, num_workers=0)

criterion = nn.CrossEntropyLoss()

for epoch in range(num_epochs):
    net.train() # Switch to training mode
    n, start = 0, time.time()
    train_l_sum = torch.tensor([0.0], dtype=torch.float32)
    train_acc_sum = torch.tensor([0.0], dtype=torch.float32)
    train_iter = iter(train_loader)
    for X, y in train_iter:
        optimizer.zero_grad()
        if torch.cuda.is_available():
            X = X.cuda()
            y = y.cuda()
            train_l_sum = train_l_sum.cuda()
            train_acc_sum = train_acc_sum.cuda()
        y_hat = net(X)
        loss = criterion(y_hat, y)
        loss.backward()
        optimizer.step()
        with torch.no_grad():
            y = y.long()
            train_l_sum += loss.float()
            train_acc_sum += (torch.sum((torch.argmax(y_hat, dim=1) == y))).float()
            n += y.shape[0]

    test_acc = evaluate_accuracy(iter(test_loader), net) 
    print('epoch %d, loss %.4f, train acc %.3f, test acc %.3f, time %.1f sec'\
        % (epoch + 1, train_l_sum/n, train_acc_sum/n, test_acc, time.time() - start))

Training using GPU.
epoch 1, loss 0.0025, train acc 0.766, test acc 0.821, time 5.3 sec
epoch 2, loss 0.0015, train acc 0.858, test acc 0.797, time 5.3 sec
epoch 3, loss 0.0013, train acc 0.877, test acc 0.734, time 5.1 sec
epoch 4, loss 0.0012, train acc 0.886, test acc 0.784, time 5.3 sec
epoch 5, loss 0.0012, train acc 0.893, test acc 0.805, time 5.2 sec


# **BatchNorm with PyTorch Built-in Function**

The performance of two types of implementation are quite similar, while the PyToch built-in function is around 10% faster. This is due to the PyTorch build-in function was written and compiled in C++.

In [5]:
net = nn.Sequential(
        nn.Conv2d(1, 6, kernel_size=5),
        nn.BatchNorm2d(6),
        nn.Sigmoid(),
        nn.MaxPool2d(kernel_size=2, stride=2),
        nn.Conv2d(6, 16, kernel_size=5),
        nn.BatchNorm2d(16),
        nn.Sigmoid(),
        nn.MaxPool2d(kernel_size=2, stride=2),
        flatten(),
        nn.Linear(256, 120),
        nn.BatchNorm1d(120),
        nn.Sigmoid(),
        nn.Linear(120, 84),
        nn.BatchNorm1d(84),
        nn.Sigmoid(),
        nn.Linear(84, 10))


if torch.cuda.is_available():
    print('Training using GPU.')
    net.cuda()
else:
    print('Training using CPU.')

#Initialize network parameters.
net.apply(weights_init)

lr, num_epochs, batch_size = 1.0, 5, 256
optimizer = torch.optim.SGD(net.parameters(), lr=lr)

transform = transforms.Compose([transforms.ToTensor()]) 
mnist_trainset = datasets.FashionMNIST(root='./data', train=True, download=True, transform=transform)
mnist_testset = datasets.FashionMNIST(root='./data', train=False, download=True, transform=transform)
# Loading training set and test set using DataLoader.
train_loader = torch.utils.data.DataLoader(mnist_trainset, batch_size=batch_size,
    shuffle=True, num_workers=0)
test_loader = torch.utils.data.DataLoader(mnist_testset, batch_size=batch_size,
    shuffle=True, num_workers=0)

criterion = nn.CrossEntropyLoss()

for epoch in range(num_epochs):
    net.train() # Switch to training mode
    n, start = 0, time.time()
    train_l_sum = torch.tensor([0.0], dtype=torch.float32)
    train_acc_sum = torch.tensor([0.0], dtype=torch.float32)
    train_iter = iter(train_loader)
    for X, y in train_iter:
        optimizer.zero_grad()
        if torch.cuda.is_available():
            X = X.cuda()
            y = y.cuda()
            train_l_sum = train_l_sum.cuda()
            train_acc_sum = train_acc_sum.cuda()
        y_hat = net(X)
        loss = criterion(y_hat, y)
        loss.backward()
        optimizer.step()
        with torch.no_grad():
            y = y.long()
            train_l_sum += loss.float()
            train_acc_sum += (torch.sum((torch.argmax(y_hat, dim=1) == y))).float()
            n += y.shape[0]

    test_acc = evaluate_accuracy(iter(test_loader), net) 
    print('epoch %d, loss %.4f, train acc %.3f, test acc %.3f, time %.1f sec'\
        % (epoch + 1, train_l_sum/n, train_acc_sum/n, test_acc, time.time() - start))

Training using GPU.
epoch 1, loss 0.0025, train acc 0.771, test acc 0.476, time 4.7 sec
epoch 2, loss 0.0015, train acc 0.860, test acc 0.622, time 4.7 sec
epoch 3, loss 0.0014, train acc 0.874, test acc 0.766, time 4.7 sec
epoch 4, loss 0.0013, train acc 0.884, test acc 0.734, time 4.6 sec
epoch 5, loss 0.0012, train acc 0.891, test acc 0.824, time 4.7 sec
