# Deep Learning

# Tutorial 12: Batch Normalization

In this tutorial, we will cover:

- Batch Normalization, Normalization Layers

Prerequisites:

- Python, Tensor basics, DNN Training

My contact:

- Niklas Beuter (niklas.beuter@th-luebeck.de)

Course:

- Slides and notebooks will be available at https://lernraum.th-luebeck.de/course/view.php?id=5383

## Expected Outcomes
* Understand why, where and when we could use Batch normalization or other normalization layers

## The following example demonstrates the problem of covariate shift

In this code, the training data is generated with a mean of 0 and standard deviation of 1, while the test data has a mean of 2 and standard deviation of 1.5. This difference in distributions between the training and test sets is a classic example of covariate shift.

In [None]:
import numpy as np
import matplotlib.pyplot as plt

def generate_data(mean, std, num_samples=1000):
    """Draw `num_samples` values from a normal distribution.

    `mean` is the centre and `std` the standard deviation of the distribution.
    Sampling goes through NumPy's global random state.
    """
    samples = np.random.normal(loc=mean, scale=std, size=num_samples)
    return samples

def plot_data(train_data, test_data):
    """Show overlaid histograms of the training and the test samples."""
    plt.figure(figsize=(10, 6))

    # Both sets are drawn semi-transparently on the same axes so that a shift
    # between the two distributions is directly visible.
    labelled_sets = (
        (train_data, 'Training Data'),
        (test_data, 'Test Data'),
    )
    for samples, legend_label in labelled_sets:
        plt.hist(samples, bins=30, alpha=0.5, label=legend_label)

    plt.title('Distribution of Training and Test Data')
    plt.xlabel('Feature Value')
    plt.ylabel('Frequency')
    plt.legend()
    plt.show()

# Training distribution: standard normal.
mean_train = 0
std_train = 1

# Test distribution: shifted mean and wider spread, which introduces covariate shift.
mean_test = 2
std_test = 1.5

# Sample both sets (training first, then test).
train_data = generate_data(mean_train, std_train)
test_data = generate_data(mean_test, std_test)

# Visualize the two distributions side by side.
plot_data(train_data, test_data)


The idea is to normalize the data so that the distribution shift is removed. The same applies to different batches of data: we want to remove any shifts between them as well.

## Implementation of Batchnorm

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

# Prefer the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

We create our own batch norm function, which takes the batch *X*, the batch norm parameters $\gamma, \beta$, and the *moving_mean* and *moving_var*, which are used at test time. The parameter *eps* provides numerical stability and avoids a division by 0. The *momentum* controls how strongly each new batch updates the moving mean and variance.

In [None]:
def batch_norm(X, gamma, beta, moving_mean, moving_var, eps=1e-5, momentum=0.1):
    """Apply batch normalization to `X` and update the moving statistics.

    The mode is derived from the autograd state: when gradients are disabled
    (e.g. inside `torch.no_grad()`), `X` is normalized with the supplied moving
    statistics and these are returned unchanged. Otherwise the statistics of
    the current batch are used and the moving statistics are updated.

    Args:
        X: Input batch. In training mode it must be 2-D (reduced over dim 0)
            or 4-D (reduced over dims 0, 2 and 3).
        gamma: Scale applied to the normalized input.
        beta: Shift added after scaling.
        moving_mean: Running mean used when gradients are disabled.
        moving_var: Running variance used when gradients are disabled.
        eps: Small constant added to the variance to avoid division by zero.
        momentum: Weight of the current batch statistics in the running update.

    Returns:
        Tuple `(Y, moving_mean, moving_var)` with the normalized, scaled and
        shifted output and the (possibly updated) moving statistics.

    Raises:
        ValueError: In training mode, if `X` is neither 2-D nor 4-D.
    """
    if not torch.is_grad_enabled():
        # Inference: normalize with the statistics accumulated during training.
        X_hat = (X - moving_mean) / torch.sqrt(moving_var + eps)
    else:
        if X.dim() == 2:
            # One statistic per feature column.
            mean = X.mean(dim=0)
            var = ((X - mean) ** 2).mean(dim=0)
        elif X.dim() == 4:
            # One statistic per channel (dim 1); keepdim so they broadcast against X.
            mean = X.mean(dim=(0, 2, 3), keepdim=True)
            var = ((X - mean) ** 2).mean(dim=(0, 2, 3), keepdim=True)
        else:
            raise ValueError(
                f"batch_norm expects a 2-D or 4-D input, got {X.dim()}-D")

        X_hat = (X - mean) / torch.sqrt(var + eps)
        # The moving statistics are bookkeeping only and must not carry autograd
        # history; otherwise every step would extend a graph that is kept alive
        # across iterations.
        moving_mean = ((1.0 - momentum) * moving_mean + momentum * mean).detach()
        moving_var = ((1.0 - momentum) * moving_var + momentum * var).detach()

    return gamma * X_hat + beta, moving_mean, moving_var

In [None]:
class BatchNorm(nn.Module):
    """Batch normalization layer built on the `batch_norm` function.

    Holds the learnable scale (`gamma`) and shift (`beta`) as parameters and
    the moving mean/variance as buffers, so the statistics are moved by
    `model.to(...)` and stored in `state_dict()` together with the parameters.

    Args:
        num_features: Number of features (2-D input) or channels (4-D input).
        num_dims: 4 for convolutional feature maps; any other value selects
            the 2-D (fully connected) parameter shape.
    """

    def __init__(self, num_features, num_dims):
        super().__init__()
        shape = (1, num_features, 1, 1) if num_dims == 4 else (1, num_features)
        # Learnable affine transform, initialized to the identity.
        self.gamma = nn.Parameter(torch.ones(shape))
        self.beta = nn.Parameter(torch.zeros(shape))
        # Non-learnable running statistics, registered as buffers so that they
        # follow the module across devices and are saved with the model.
        self.register_buffer("moving_mean", torch.zeros(shape))
        self.register_buffer("moving_var", torch.ones(shape))

    def forward(self, X):
        """Normalize `X` and store the moving statistics returned by `batch_norm`."""
        Y, new_mean, new_var = batch_norm(
            X, self.gamma, self.beta, self.moving_mean, self.moving_var)
        # Store the statistics without autograd history so no graph is kept
        # alive between iterations.
        self.moving_mean = new_mean.detach()
        self.moving_var = new_var.detach()
        return Y

Here, we use the LeNet architecture and add the batch norm to it. We add the batchnorm before the activation function.

In [None]:
class BNLeNetScratch(nn.Module):
    """LeNet-style network that uses the custom BatchNorm layer.

    Every convolutional and hidden fully connected layer is followed by
    BatchNorm, placed before the sigmoid activation.
    """

    def __init__(self):
        super().__init__()

        # Stage 1: 1 input channel (grayscale image) -> 6 feature maps.
        conv_stage_1 = [
            nn.Conv2d(1, 6, kernel_size=5),
            BatchNorm(6, num_dims=4),
            nn.Sigmoid(),
            nn.AvgPool2d(kernel_size=2, stride=2),
        ]
        # Stage 2: 6 -> 16 feature maps.
        conv_stage_2 = [
            nn.Conv2d(6, 16, kernel_size=5),
            BatchNorm(16, num_dims=4),
            nn.Sigmoid(),
            nn.AvgPool2d(kernel_size=2, stride=2),
        ]
        # Stage 3: first fully connected layer. The 256 inputs must match the
        # flattened output of the last pooling layer (16 x 4 x 4 for 28x28 images).
        dense_stage_1 = [
            nn.Linear(256, 120),
            BatchNorm(120, num_dims=2),
            nn.Sigmoid(),
        ]
        # Stage 4: second fully connected layer.
        dense_stage_2 = [
            nn.Linear(120, 84),
            BatchNorm(84, num_dims=2),
            nn.Sigmoid(),
        ]

        self.layers = nn.Sequential(
            *conv_stage_1,
            *conv_stage_2,
            # Flatten the feature maps for the fully connected layers.
            nn.Flatten(),
            *dense_stage_1,
            *dense_stage_2,
            # Output layer with 10 classes.
            nn.Linear(84, 10),
        )

    def forward(self, x):
        """Run `x` through the whole layer stack and return the class scores."""
        scores = self.layers(x)
        return scores

In [None]:
def train_and_evaluate(model, train_loader, test_loader, criterion, optimizer, num_epochs=5):
    """Train `model` and report its classification accuracy on `test_loader`.

    The model is moved to the module-level `device`. After every epoch the
    mean of the per-batch losses is printed, which is less noisy than the loss
    of the last mini-batch alone. Evaluation runs in eval mode with gradients
    disabled.

    Args:
        model: Network to train; called as `model(images)`.
        train_loader: Iterable of `(images, labels)` training batches.
        test_loader: Iterable of `(images, labels)` evaluation batches.
        criterion: Loss called as `criterion(outputs, labels)`.
        optimizer: Optimizer that updates the parameters of `model`.
        num_epochs: Number of passes over `train_loader`.

    Returns:
        Test accuracy in percent; 0.0 if `test_loader` yields no samples.
    """
    model.to(device)

    # Training
    for epoch in range(num_epochs):
        model.train()
        loss_sum = 0.0
        num_batches = 0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            # Accumulate the detached loss so no graph is retained.
            loss_sum = loss_sum + loss.detach()
            num_batches += 1
        # An empty loader has no loss to report; print NaN instead of failing.
        epoch_loss = float(loss_sum) / num_batches if num_batches else float('nan')
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}')

    # Evaluation
    model.eval()
    correct = total = 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            # The predicted class is the index of the largest score.
            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    # Guard against an empty test set.
    accuracy = 100 * correct / total if total else 0.0
    print(f'Accuracy: {accuracy:.2f}%')
    return accuracy

In [None]:
# Preprocessing pipeline: Compose chains the transformations, here converting
# each image to a tensor and then normalizing it with mean 0.5 and std 0.5.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5,), std=(0.5,)),
])

# FashionMNIST is used here; to train on MNIST instead, replace
# datasets.FashionMNIST with datasets.MNIST in the two lines below.
train_dataset = datasets.FashionMNIST(
    root='./data', train=True, download=True, transform=transform)
test_dataset = datasets.FashionMNIST(
    root='./data', train=False, download=True, transform=transform)

# Only the training batches are shuffled; the test order stays fixed.
train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=64, shuffle=False)

In [None]:
# Network with the hand-written BatchNorm layers.
model = BNLeNetScratch()

# Cross-entropy loss for the classification task.
criterion = nn.CrossEntropyLoss()

# SGD with momentum over all learnable parameters of the model.
optimizer = optim.SGD(params=model.parameters(), lr=0.03, momentum=0.9)

# Run training followed by evaluation on the test set.
train_and_evaluate(
    model,
    train_loader,
    test_loader,
    criterion,
    optimizer,
)

In [None]:
def print_bn_parameters(model):
    """Print gamma and beta of every custom BatchNorm layer found in `model`."""
    for name, module in model.named_modules():
        # Only the hand-written BatchNorm layers are of interest here.
        if not isinstance(module, BatchNorm):
            continue
        gamma, beta = module.gamma.data, module.beta.data
        print(f"{name} - Gamma: {gamma}, Beta: {beta}")


# Inspect the scale and shift values of the `model` instance trained above.
print_bn_parameters(model)

## Standard PyTorch Implementation

In [None]:
## Implementation from PyTorch (using BatchNorm2d for convolution layer and BatchNorm1D for fully connected layer)
class BNLeNet(nn.Module):
    """LeNet-style network using PyTorch's built-in batch normalization.

    Convolutional stages use `nn.BatchNorm2d`, fully connected stages use
    `nn.BatchNorm1d`; in both cases the normalization precedes the sigmoid.

    Args:
        num_classes: Number of output scores produced by the final layer.
    """

    def __init__(self, num_classes=10):
        super().__init__()

        # Stage 1: 1 input channel (grayscale image) -> 6 feature maps.
        conv_stage_1 = [
            nn.Conv2d(1, 6, kernel_size=5),
            nn.BatchNorm2d(6),
            nn.Sigmoid(),
            nn.AvgPool2d(kernel_size=2, stride=2),
        ]
        # Stage 2: 6 -> 16 feature maps.
        conv_stage_2 = [
            nn.Conv2d(6, 16, kernel_size=5),
            nn.BatchNorm2d(16),
            nn.Sigmoid(),
            nn.AvgPool2d(kernel_size=2, stride=2),
        ]
        # Stage 3: first fully connected layer. The 256 inputs must match the
        # flattened output of the last pooling layer.
        dense_stage_1 = [
            nn.Linear(256, 120),
            nn.BatchNorm1d(120),
            nn.Sigmoid(),
        ]
        # Stage 4: second fully connected layer.
        dense_stage_2 = [
            nn.Linear(120, 84),
            nn.BatchNorm1d(84),
            nn.Sigmoid(),
        ]

        self.layers = nn.Sequential(
            *conv_stage_1,
            *conv_stage_2,
            # Flatten the feature maps for the fully connected layers.
            nn.Flatten(),
            *dense_stage_1,
            *dense_stage_2,
            # Output layer.
            nn.Linear(84, num_classes),
        )

    def forward(self, x):
        """Run `x` through the whole layer stack and return the class scores."""
        scores = self.layers(x)
        return scores

In [None]:
# Train the variant that uses PyTorch's built-in batch norm layers. A fresh
# network and a matching optimizer are required: reusing `model` and `optimizer`
# would only continue training the scratch implementation from the cells above.
bn_lenet = BNLeNet()
bn_lenet_optimizer = optim.SGD(bn_lenet.parameters(), lr=0.03, momentum=0.9)
train_and_evaluate(bn_lenet, train_loader, test_loader, criterion, bn_lenet_optimizer)