# Convolutional Neural Networks

Today we focus on basic operations for convolutional layers and on a simple neural network implementation with convolutional layers.

## Operations

In [None]:
import numpy as np
np.random.seed(42)

# Consider a 2d - window of size 12x12
window = np.random.randn(12, 12)

### Strides

In [None]:
# Implement the sliding window method used in convolutional layers
# Dont do the convolution itself, just let the window slide

# For that, use a stride of 2
# Use a kernel of size 3x3 and slide over the "window" variable defined in the previous block
# When sliding the kernel over the window with the stride of 2, set always the maximum number of that area to every single entry
# E. g. if the window you are currently slicing over looks like this:
#  3 4 5
#  5 2 6
#  1 7 2
# then it should be transformed into
#  7 7 7
#  7 7 7
#  7 7 7

# Make sure that you update the result always after one complete sliding over the complete window
# so that for every calculation you use the original values

stride = 2
kernel_size = 3

def sliding_stride_set_max(window, kernel_size, stride=1):
    # Your code here
    return None

max_slide_window = sliding_stride_set_max(window=window, kernel_size=kernel_size, stride=stride)

print(max_slide_window)

### Padding

In [None]:
# Implement a padding of zeros to the following numpy array
to_pad = np.random.randn(6, 6)
print(to_pad)

# The zero padding should pad the array on every side with size 2
# So that they output array of the 6x6 array is of shape 10x10


# Your code here
padding = None
print(padding)

### Pooling

In [None]:
# Implement max pooling with the following array (window)
to_pool = np.random.randn(6, 6)

# Implement the max pooling
# Use a stride of 2 and kernel size of 2x2

stride = 2
kernel_size = 2

def max_pooling(window, kernel_size, stride=1):
    assert kernel_size >= stride
    assert len(window.shape) == 2
    assert window.shape[0] % kernel_size == 0
    assert window.shape[1] % kernel_size == 0

    # Your code here
    return None

pooling = max_pooling(window=to_pool, kernel_size=kernel_size, stride=stride)
print(pooling)

## First CNN PyTorch implementation

In [None]:
# 1) Implement a small neural network with convolutional layers
# 2) Implement a small neural network with linear layers (as in previous practicals)
    # make sure that the networks from 1) and 2) have around the same amount of parameters, to make them comparable to each other
# 3) Compare the accuracy with the networks from previous practicals, can you improve the accuracy with conv layers?

### MNIST dataset

In [None]:
# Use the FashionMNIST dataset as in previous practicals

import torch
torch.manual_seed(42)
torch.cuda.manual_seed(42)
import torchvision
import torchvision.transforms as transforms
import tqdm
import matplotlib.pyplot as plt


def load_fashion_mnist_data(root_path='./data', batch_size=4):
    transform = transforms.Compose(
        [transforms.ToTensor(),
        transforms.Normalize((0.5), (0.5))]
    )

    train_dataset = torchvision.datasets.FashionMNIST(root=root_path, train=True, download=True, transform=transform)
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2)

    test_dataset = torchvision.datasets.FashionMNIST(root=root_path, train=False, download=True, transform=transform)
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=2)

    return train_loader, test_loader, train_dataset, test_dataset


def plot_fashsion_mnist_images():
    _, _, train_dataset, _ = load_fashion_mnist_data()

    # get first x items
    no = 5
    images = [train_dataset.__getitem__(i)[0].permute(1,2,0) for i in range(0, no)]

    start_pos = 0
    for i in range(no):
        plt.subplot(1, no, i+1)
        plt.tight_layout()
        plt.imshow(images[i], cmap='gray', interpolation='none')
        plt.xticks([])
        plt.yticks([])
    plt.show()

plot_fashsion_mnist_images()

### Training and evaluation methods

In [None]:
# the operate method calls the train and eval method
# so you only have to call the operate method and pass
# the parameters to train and evaluate your model in one line
import torch
from typing import Callable
from torch.optim import Optimizer
from torch.utils.data import DataLoader
import torch.nn as nn

def train_model(
    model: nn.Module, loss_fn: Callable, optimizer: Optimizer,
    train_data_loader: DataLoader, epoch: int, batch_size: int = 4, epochs: int = 10, device: torch.device = 'cpu'
):
    # turn training mode on
    model.train()

    running_loss = []
    running_accuracy = []
    for imgs, targets in tqdm.tqdm(train_data_loader, desc=f'Training iteration {epoch + 1}'):
        imgs, targets = imgs.to(device=device), targets.to(device=device)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = model(imgs)
        loss = loss_fn(outputs, targets)
        loss.backward()
        optimizer.step()

        # print statistics
        running_loss.append(loss.item())

        # Calculate the Accuracy (how many of all samples are correctly classified?)
        max_outputs = torch.max(outputs, dim=1).indices
        accuracy = (max_outputs.detach() == targets.detach()).to(dtype=torch.float32).mean()
        running_accuracy.append(accuracy)
    
    return torch.mean(torch.as_tensor(running_loss)), torch.mean(torch.as_tensor(running_accuracy))



def eval_model(
    model: nn.Module, loss_fn: Callable, val_data_loader: DataLoader, epoch: int, batch_size: int = 4, device: torch.device = 'cpu'
):
    # turn evaluation mode on
    model.eval()

    with torch.no_grad():
        running_loss = []
        running_accuracy = []
        for imgs, targets in tqdm.tqdm(val_data_loader, desc=f'Evaluation iteration {epoch + 1}'):
            imgs, targets = imgs.to(device=device), targets.to(device=device)

            # forward + backward + optimize
            outputs = model(imgs)
            loss = loss_fn(outputs, targets)

            # print statistics
            running_loss.append(loss.item())

            # Calculate the Accuracy (how many of all samples are correctly classified?)
            max_outputs = torch.max(outputs, dim=1).indices
            accuracy = (max_outputs.detach() == targets.detach()).to(dtype=torch.float32).mean()
            running_accuracy.append(accuracy)
    
    return torch.mean(torch.as_tensor(running_loss)), torch.mean(torch.as_tensor(running_accuracy))


def operate(model: nn.Module, loss_fn: Callable, optimizer: Optimizer,
    train_data_loader: DataLoader, test_data_loader: DataLoader, batch_size: int = 4, epochs: int = 10
):
    t_losses, t_accs, e_losses, e_accs = [], [], [], []
    for epoch in range(epochs):
        t_loss, t_acc = train_model(model=model, loss_fn=loss_fn, optimizer=optimizer, train_data_loader=train_data_loader, epoch=epoch, batch_size=batch_size, epochs=epochs)
        t_losses.append(t_loss)
        t_accs.append(t_acc)

        e_loss, e_acc = eval_model(model=model, loss_fn=loss_fn, val_data_loader=test_data_loader, epoch=epoch, batch_size=batch_size)
        e_losses.append(e_loss)
        e_accs.append(e_accs)

        print(f'Training epoch {epoch + 1} finished with loss: {t_loss} and accuracy {t_acc}')
        print(f'Eval epoch {epoch + 1} finished with loss: {e_loss} and accuracy {e_acc}')
    
    return t_losses, t_accs, e_losses, e_accs

### Method to count the parameters of a model

In [None]:
# pass your model to this function to get the trainable parameters (weights) of your model
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

### Some hyperparameters

In [None]:
# Init the datasets
batch_size = 4
train_data_loader, test_data_loader, _, _ = load_fashion_mnist_data(batch_size=batch_size)

# set epochs
epochs = 5

## Linear and Conv model

In [None]:
import torch.nn as nn

### Use the implementation of the linear layer model
### and update the layers or extend the layers
### to learn the mnist data
class LinearLayerModel(nn.Module):
    def __init__(self) -> None:
        super().__init__()

        self.model = nn.Sequential(
            nn.Linear(784, X),
            # your layers
            nn.Linear(Y, 10)
        )
    
    def forward(self, imgs):
        imgs = imgs.reshape(imgs.shape[0], -1)

        return self.model(imgs)


# Implement your convolutional layer model
class ConvLayerModel(nn.Module):
    def __init__(self) -> None:
        super().__init__()

        self.model = nn.Sequential(
            nn.Conv2d(, ),
            nn.Linear(Y, 10)
        )
    
    def forward(self, imgs):
        return self.model(imgs)

## Train the convolutional model

In [None]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [None]:
from torch.optim import Adam
import torch.nn.functional as F

conv_model = ConvLayerModel()
print(f'Convolutional model has: {count_parameters(conv_model)} parameters to optimize.')
optimizer = 

loss_fn = 

conv_t_losses, conv_t_accs, conv_t_iter_losses, conv_t_iter_accs, conv_e_losses, conv_e_accs, conv_e_iter_losses, conv_e_iter_accs = operate(
    model=conv_model, loss_fn=loss_fn, optimizer=optimizer,
    train_data_loader=train_data_loader, test_data_loader=test_data_loader,
    batch_size=batch_size, epochs=epochs
)


## Train the linear layer model

In [None]:
linear_layer_model = LinearLayerModel()
print(f'Linear layer model has: {count_parameters(linear_layer_model)} parameters to optimize.')
optimizer = 

loss_fn = 

ll_t_losses, ll_t_accs, ll_t_iter_losses, ll_t_iter_accs, ll_e_losses, ll_e_accs, ll_e_iter_losses, ll_e_iter_accs = operate(
    model=linear_layer_model, loss_fn=loss_fn, optimizer=optimizer,
    train_data_loader=train_data_loader, test_data_loader=test_data_loader,
    batch_size=batch_size, epochs=epochs
)

## Train the convolutional network with batch normalization

In [None]:
bn_conv_model = ConvLayerModel(...)
print(f'Convolutional model with batch normalization has: {count_parameters(bn_conv_model)} parameters to optimize.')
optimizer =

loss_fn =

bn_conv_t_losses, bn_conv_t_accs, bn_conv_t_iter_losses, bn_conv_t_iter_accs, bn_conv_e_losses, bn_conv_e_accs, bn_conv_e_iter_losses, bn_conv_e_iter_accs = operate(
    model=bn_conv_model, loss_fn=loss_fn, optimizer=optimizer,
    train_data_loader=train_data_loader, test_data_loader=test_data_loader,
    batch_size=batch_size, epochs=epochs
)

## Train the linear layer network with batch normalization

In [None]:
bn_linear_layer_model = LinearLayerModel(batch_norm=True)
print(f'Linear layer model with batch normalization has: {count_parameters(bn_linear_layer_model)} parameters to optimize.')
optimizer =

loss_fn =

bn_ll_t_losses, bn_ll_t_accs, bn_ll_t_iter_losses, bn_ll_t_iter_accs, bn_ll_e_losses, bn_ll_e_accs, bn_ll_e_iter_losses, bn_ll_e_iter_accs = operate(
    model=bn_linear_layer_model, loss_fn=loss_fn, optimizer=optimizer,
    train_data_loader=train_data_loader, test_data_loader=test_data_loader,
    batch_size=batch_size, epochs=epochs
)

## Plot the results

In [None]:
# If you did not change the name of return values
# from the operate methods of the different models
# then you only have to execute the following
# cells to visualize the results

In [None]:
# number of elements
x_vals = np.arange(0, len(conv_t_iter_losses))
x_vals_test_data = [i for i in range(0, len(conv_t_iter_losses), len(x_vals) // len(ll_e_iter_losses))]

In [None]:
fig = plt.figure(figsize=(10,6))
plt.plot(x_vals, ll_t_iter_losses, label="Linear model (Train)", c="red")
plt.plot(x_vals, bn_ll_t_iter_losses, label="Linear model with batch norm (Train)", c="blue")
plt.plot(x_vals, conv_t_iter_losses, label="Conv model (Train)", c="yellow")
plt.plot(x_vals, bn_conv_t_iter_losses, label="Conv model with batch norm (Train)", c="green")

plt.plot(x_vals_test_data, ll_e_iter_losses, label="Linear model (Test)", linestyle = "dashed", c= "red")
plt.plot(x_vals_test_data, bn_ll_e_iter_losses, label="Linear model with batch norm (Test)", linestyle = "dashed", c="blue")
plt.plot(x_vals_test_data, conv_e_iter_losses, label="Conv model (Test)", linestyle = "dashed", c="yellow")
plt.plot(x_vals_test_data, bn_conv_e_iter_losses, label="Conv model with batch norm (Test)", linestyle = "dashed", c="green")

plt.xlabel('Iterationen * 100')
plt.ylabel('training loss')
plt.title("Training no batch norm vs. batch norm")
plt.legend()
plt.show()
plt.close()