# **Computer Vision & Generative Models seminar**

## **Warm up**

In this seminar we will get our hands on the very tip of the iceberg of computer vision (CV) and generative models (GM). These fields are huge, and there are many things we unfortunately don't have time to try, so we will stick to the very basics. We start by implementing the **convolution and pooling operations from scratch** to see how they change a given image.

In [None]:
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image

In [None]:
!curl "https://cds.cern.ch/images/CMS-PHO-GEN-2008-028-1/file?size=large" --output images/cms.jpg
image_filename = "images/cms.jpg"
# Image.ANTIALIAS was removed in Pillow 10; Image.LANCZOS is the same filter
grayscale_image = Image.open(image_filename).resize((500, 500), Image.LANCZOS).convert("L")
image = np.asarray(grayscale_image).copy()
plt.imshow(image, cmap = 'gray', interpolation='nearest')

In [None]:
def visualization(init_image, image):
    plt.figure(figsize=(10,5))
    plt.subplot(121)
    plt.imshow(init_image,                #numpy array generating the image
               cmap = 'gray',             #color map used to specify colors
               interpolation='nearest'    #algorithm used to blend square colors; with 'nearest' colors will not be blended
              )
    plt.title('Initial image', y=1.02, fontsize=12)

    plt.subplot(122)
    plt.imshow(image, cmap = 'gray', interpolation='nearest')
    plt.title('Image after transformation', y=1.02, fontsize=12)
    
    plt.show()

### Implement convolution! 

![conv](images/conv.jpg)

In [None]:
# 2-D convolution (strictly speaking cross-correlation: the kernel is not flipped)
def convolution(matrix, kernel, padding=0):
    # zero-pad the borders so the output size below matches the padded input
    matrix = np.pad(matrix, padding, mode='constant')
    height, width = matrix.shape
    kernel_size, _ = kernel.shape  # a square kernel is assumed
    out_height = height + 1 - kernel_size
    out_width  = width  + 1 - kernel_size
    
    out = np.zeros([out_height, out_width])
    flat_kernel = kernel.reshape(-1)
    
    for y in range(out_height):
        for x in range(out_width):
            flat_matrix = matrix[y:y + kernel_size, x:x + kernel_size].reshape(-1)
            out[y, x] = flat_matrix.dot(flat_kernel)
    
    # keep the result in the valid range of 8-bit pixel intensities
    out = np.clip(out, 0, 255)
    
    return out

In [None]:
kernel = np.array([[1, 1, 1], 
                   [1, -9, 1], 
                   [1, 1, 1]])
conv_image = convolution(image, kernel)

visualization(image, conv_image)
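
As a sanity check, the sliding-window arithmetic can be reproduced on a toy matrix where the result is easy to verify by hand. The sketch below is not part of the seminar code: `conv2d_valid` is a hypothetical helper, it assumes NumPy 1.20 or newer (for `sliding_window_view`), and it mirrors `convolution` with `padding=0` but without the final clipping.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

def conv2d_valid(matrix, kernel):
    """Loop-free sliding-window product (no padding, no clipping)."""
    # windows has shape (H - k + 1, W - k + 1, k, k): one k x k patch per output pixel
    windows = sliding_window_view(matrix, kernel.shape)
    # multiply every patch by the kernel element-wise and sum over the patch
    return np.einsum("ijkl,kl->ij", windows, kernel)

toy = np.arange(16).reshape(4, 4)   # 4x4 matrix holding 0..15
box = np.ones((3, 3))               # 3x3 kernel that simply sums each patch
result = conv2d_valid(toy, box)

print(result.shape)  # (2, 2), i.e. 4 + 1 - 3 in each direction
print(result)        # the top-left entry is 0+1+2+4+5+6+8+9+10 = 45
```

A 4x4 input and a 3x3 kernel give a 2x2 output, in line with the `height + 1 - kernel_size` formula.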

### Implement maxpooling!

![maxpool](images/maxpool.png)

In [None]:
# max pooling for 2d matrix
def pooling(matrix, kernel_size, stride=1):
    height, width = matrix.shape

    out_height = (height - kernel_size) // stride + 1
    out_width  = (width  - kernel_size) // stride + 1

    out = np.zeros([out_height, out_width])

    for y in range(out_height):
        for x in range(out_width):
            out[y, x] = np.amax(matrix[y * stride:y * stride + kernel_size,
                                       x * stride:x * stride + kernel_size])
    
    return out

In [None]:
pool_image = pooling(image, 9, 9)

visualization(image, pool_image)
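
When the stride equals the kernel size, as in `pooling(image, 9, 9)`, the windows do not overlap and the same result can be obtained by reshaping the image into blocks. The following is a sketch under that assumption only; `max_pool_blocks` is a hypothetical helper name, not part of the seminar code.

```python
import numpy as np

def max_pool_blocks(matrix, k):
    """Non-overlapping max pooling (stride == kernel size) via reshape."""
    h, w = matrix.shape
    out_h, out_w = h // k, w // k
    # drop trailing rows/columns that do not fill a complete k x k block
    trimmed = matrix[:out_h * k, :out_w * k]
    # axes: (block row, row inside block, block column, column inside block)
    blocks = trimmed.reshape(out_h, k, out_w, k)
    return blocks.max(axis=(1, 3))

toy = np.arange(16).reshape(4, 4)
pooled = max_pool_blocks(toy, 2)
print(pooled)  # each entry is the maximum of one 2x2 block

# a 500x500 image pooled with 9x9 blocks shrinks to 55x55
big = max_pool_blocks(np.zeros((500, 500)), 9)
print(big.shape)
```

For the 500x500 image this matches the `(height - kernel_size) // stride + 1` formula: `(500 - 9) // 9 + 1 = 55`.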

## **CV vs MNIST**

_Credit:_ https://nextjournal.com/gkoehler/pytorch-mnist

OK, now that we have implemented the most basic CV operations ourselves, let's build something more complex: a simple convolutional neural network. Our task is to recognize handwritten digits from the **MNIST** dataset using [PyTorch](https://pytorch.org/tutorials/). Training a classifier on the MNIST dataset can be regarded as the *hello world* of image recognition. 

MNIST contains 70,000 images of handwritten digits: 60,000 for training and 10,000 for testing. The images are grayscale, 28x28 pixels, and centered to reduce preprocessing and get started quicker. 

In [None]:
import torch
import torchvision

### Preparing the dataset

Before we start, let's define the hyperparameters we'll be using for the experiment. Here the number of epochs defines how many times we'll loop over the complete training dataset, while `learning_rate` and `momentum` are hyperparameters for the optimizer we'll be using later on.

For repeatable experiments we have to set random seeds for everything that uses random number generation, which includes `numpy` and `random` as well! It's also worth mentioning that cuDNN uses nondeterministic algorithms, which can be disabled by setting `torch.backends.cudnn.enabled = False`.

In [None]:
n_epochs = 3
batch_size_train = 64
batch_size_test = 1000
learning_rate = 0.01
momentum = 0.5
log_interval = 10

random_seed = 1
torch.backends.cudnn.enabled = False
torch.manual_seed(random_seed)
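
The text mentions that `numpy` and `random` need seeding too. A minimal sketch of what that could look like is given below; `seed_python_and_numpy` is a hypothetical helper, and it deliberately leaves out PyTorch, which the cell above already seeds with `torch.manual_seed`.

```python
import random
import numpy as np

def seed_python_and_numpy(seed):
    """Seed the two generators that torch.manual_seed does not touch."""
    random.seed(seed)      # Python's built-in generator
    np.random.seed(seed)   # NumPy's legacy global generator

# drawing after re-seeding with the same value reproduces the same numbers
seed_python_and_numpy(1)
first = (random.random(), np.random.rand())
seed_python_and_numpy(1)
second = (random.random(), np.random.rand())

print(first == second)  # True
```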

Next we need [DataLoaders](https://pytorch.org/docs/stable/data.html) for the dataset. This is where TorchVision comes into play: it lets us load the MNIST dataset in a handy way. We'll use a `batch_size` of 64 for training and 1000 for testing. The values `0.1307` and `0.3081` used for the `Normalize()` transformation below are the global mean and standard deviation of the MNIST dataset; we'll take them as given here.

TorchVision offers a lot of handy transformations, such as cropping or normalization.

In [None]:
train_loader = torch.utils.data.DataLoader(
  torchvision.datasets.MNIST('./mnist_data/', train=True, download=True,
                             transform=torchvision.transforms.Compose([
                               torchvision.transforms.ToTensor(),
                               torchvision.transforms.Normalize(
                                 (0.1307,), (0.3081,))
                             ])),
  batch_size=batch_size_train, shuffle=True)

test_loader = torch.utils.data.DataLoader(
  torchvision.datasets.MNIST('./mnist_data/', train=False, download=True,
                             transform=torchvision.transforms.Compose([
                               torchvision.transforms.ToTensor(),
                               torchvision.transforms.Normalize(
                                 (0.1307,), (0.3081,))
                             ])),
  batch_size=batch_size_test, shuffle=True)
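
To see what the two transformations do to a pixel, the arithmetic can be written out by hand: `ToTensor()` scales 8-bit intensities to the range [0, 1], and `Normalize()` then subtracts the mean and divides by the standard deviation. The NumPy sketch below only illustrates that arithmetic; `normalize` is a hypothetical stand-in, not the TorchVision implementation.

```python
import numpy as np

MNIST_MEAN, MNIST_STD = 0.1307, 0.3081

def normalize(pixels, mean=MNIST_MEAN, std=MNIST_STD):
    """Shift and scale pixel values that are already in [0, 1]."""
    return (pixels - mean) / std

# a black pixel, a pixel at exactly the dataset mean, and a white pixel
pixels = np.array([0.0, 0.1307, 1.0])
normalized = normalize(pixels)
print(normalized)
```

Black pixels end up at roughly -0.42 and white pixels at roughly 2.82, so the background, which dominates MNIST images, sits slightly below zero.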

Now let's take a look at some examples. We'll use the `test_loader` for this.

In [None]:
examples = enumerate(test_loader)
batch_idx, (example_data, example_targets) = next(examples)
example_data.shape

In [None]:
fig = plt.figure()
for i in range(6):
    plt.subplot(2,3,i+1)
    plt.tight_layout()
    plt.imshow(example_data[i][0], cmap='gray', interpolation='none')
    plt.title("Ground Truth: {}".format(example_targets[i]))
    plt.xticks([])
    plt.yticks([])

### Building the Network

Now let's go ahead and build our network. We'll use two 2-D convolutional layers followed by two fully-connected (or linear) layers. As the activation function we'll choose rectified linear units (ReLUs for short), and as a means of regularization we'll use two dropout layers. In PyTorch a nice way to build a network is to create a new class for it. Let's import a few submodules here for more readable code.

In [None]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [None]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        # normalize over the class dimension; omitting dim is deprecated
        return F.log_softmax(x, dim=1)
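
The `320` input features of `fc1` follow from tracking the image size through the layers. The sketch below redoes that arithmetic in plain Python; `conv_out` and `pool_out` are hypothetical helpers, and it assumes the defaults used in `Net` (no padding, convolution stride 1, pooling stride equal to the pooling kernel size).

```python
def conv_out(size, kernel_size, padding=0, stride=1):
    """Side length after a convolution."""
    return (size + 2 * padding - kernel_size) // stride + 1

def pool_out(size, kernel_size):
    """Side length after max pooling whose stride equals its kernel size."""
    return size // kernel_size

size = 28                               # MNIST images are 28x28
size = pool_out(conv_out(size, 5), 2)   # conv1: 28 -> 24, pooling: 24 -> 12
size = pool_out(conv_out(size, 5), 2)   # conv2: 12 -> 8,  pooling: 8 -> 4
flat_features = 20 * size * size        # conv2 produces 20 channels

print(size, flat_features)  # 4 320
```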

Broadly speaking, we can think of the `torch.nn` layers as objects that contain trainable parameters, while the `torch.nn.functional` operations are purely functional. The `forward()` pass defines how we compute our output using the given layers and functions. It is perfectly fine to print out tensors somewhere in the forward pass for easier debugging, which comes in handy when experimenting with more complex models. Note that the forward pass can use, for example, a member variable or even the data itself to determine the execution path, and it can also take multiple arguments!

Now let's initialize the network and the optimizer.

In [None]:
network = Net()
optimizer = optim.SGD(network.parameters(), lr=learning_rate,
                      momentum=momentum)
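
To make the roles of `learning_rate` and `momentum` concrete, the update rule can be written out by hand. The NumPy sketch below follows the rule documented for `torch.optim.SGD` with its default settings (no dampening, no weight decay, no Nesterov); `sgd_momentum_step` is a hypothetical helper and the quadratic objective is a toy assumption.

```python
import numpy as np

def sgd_momentum_step(param, grad, velocity, lr=0.01, momentum=0.5):
    """One SGD step with momentum: accumulate the gradient, then move."""
    velocity = momentum * velocity + grad   # running, decayed sum of gradients
    param = param - lr * velocity           # step against the accumulated direction
    return param, velocity

# toy objective f(w) = w^2 with gradient 2w, starting from w = 1
w = np.array([1.0])
v = np.zeros_like(w)
for step in range(3):
    w, v = sgd_momentum_step(w, 2 * w, v)
    print(step, w, v)
```

With `momentum=0.5` each step reuses half of the previous update direction, so consecutive gradients pointing the same way speed up the descent.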

### Training the Model

Time to build our training loop. First we want to make sure our network is in training mode. Then we iterate over all training data once per epoch. Loading the individual batches is handled by the DataLoader. For each batch we first need to manually set the gradients to zero using `optimizer.zero_grad()`, since PyTorch accumulates gradients by default. We then produce the output of our network (forward pass) and compute a negative log-likelihood loss between the output and the ground truth label. With the `backward()` call we then compute a new set of gradients, which `optimizer.step()` uses to update each of the network's parameters.

We'll also keep track of the progress with some printouts. In order to create a nice training curve later on we also create two lists for saving training and testing losses. On the x-axis we want to display the number of training examples the network has seen during training. 

In [None]:
train_losses = []
train_counter = []
test_losses = []
test_counter = [i*len(train_loader.dataset) for i in range(n_epochs + 1)]

We'll run our test loop once before even starting the training to see what accuracy/loss we achieve just with randomly initialized network parameters. Can you guess what our accuracy might look like for this case?

In [None]:
def train(epoch):
    network.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        optimizer.zero_grad()
        output = network(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))
            train_losses.append(loss.item())
            # use the configured batch size instead of a hard-coded 64
            train_counter.append((batch_idx*batch_size_train) + ((epoch-1)*len(train_loader.dataset)))
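
The loss used in `train` is the negative log-likelihood applied to log-softmax outputs, which together amount to the usual cross-entropy. The NumPy sketch below spells out that computation on made-up logits; `log_softmax` and `nll_loss` here are hand-written stand-ins that assume the default mean reduction, not the PyTorch functions themselves.

```python
import numpy as np

def log_softmax(logits):
    """Numerically stable log-softmax over the class axis."""
    shifted = logits - logits.max(axis=1, keepdims=True)
    return shifted - np.log(np.exp(shifted).sum(axis=1, keepdims=True))

def nll_loss(log_probs, targets):
    """Mean of minus the log-probability assigned to the correct class."""
    return -log_probs[np.arange(len(targets)), targets].mean()

# two samples, three classes (illustrative numbers only)
logits = np.array([[2.0, 0.0, 0.0],    # fairly confident in class 0
                   [0.0, 0.0, 0.0]])   # completely undecided
targets = np.array([0, 2])

loss = nll_loss(log_softmax(logits), targets)
print(loss)
```

The undecided second sample contributes `log(3)` to the sum, while the confident and correct first sample contributes much less.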

Now for our test loop. Here we sum up the test loss and keep track of correctly classified digits to compute the accuracy of the network.

In [None]:
def test():
    network.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            output = network(data)
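            # size_average is deprecated; reduction='sum' adds up the per-sample losses
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            # index of the largest log-probability is the predicted digit
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()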
    test_loss /= len(test_loader.dataset)
    test_losses.append(test_loss)
    print('\nTest set: Avg. loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
    test_loss, correct, len(test_loader.dataset),
    100. * correct / len(test_loader.dataset)))

Using the context manager `no_grad()`, we avoid recording the computations that produce the network's output in the computation graph, which saves memory and time during evaluation.
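
The effect of `no_grad()` can be observed on a tiny tensor. This is a minimal illustration, independent of the network above; the tensor `w` is made up for the example.

```python
import torch

w = torch.ones(3, requires_grad=True)

# outside the context manager the result is tracked by autograd
tracked = (w * 2).sum()

# inside it, the same computation is not recorded in the graph
with torch.no_grad():
    untracked = (w * 2).sum()

print(tracked.requires_grad, untracked.requires_grad)  # True False
```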

Time to run the training! We'll manually add a `test()` call before we loop over n_epochs to evaluate our model with randomly initialized parameters.

In [None]:
test()
for epoch in range(1, n_epochs + 1):
    train(epoch)
    test()

### Evaluating model performance

And that's it. With just 3 epochs of training we already managed to achieve 97% accuracy on the test set! We started out with randomly initialized parameters and as expected only got about 10% accuracy on the test set before starting the training.
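
The 10% figure is what chance predicts for ten equally likely classes. Under the simplifying assumption that an untrained network spreads its probability roughly evenly over the digits, the expected loss can be estimated the same way:

```python
import math

num_classes = 10

# guessing uniformly is right one time in ten
chance_accuracy = 1 / num_classes
# negative log-likelihood of the true class under a uniform prediction
chance_loss = -math.log(1 / num_classes)

print(f"{chance_accuracy:.0%} accuracy, loss {chance_loss:.4f}")  # 10% accuracy, loss 2.3026
```

A first test loss close to 2.3 is therefore a sign that nothing is wrong with the setup before training starts.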

Let's plot our training curve.

In [None]:
fig = plt.figure()
plt.plot(train_counter, train_losses, color='blue')
plt.scatter(test_counter, test_losses, color='red')
plt.legend(['Train Loss', 'Test Loss'], loc='upper right')
plt.xlabel('number of training examples seen')
plt.ylabel('negative log likelihood loss')

Judging from the *training curve*, it looks like we could even continue training for a few more epochs.

But before that let's again look at a few examples as we did earlier and compare the model's output.

In [None]:
with torch.no_grad():
    output = network(example_data)

In [None]:
fig = plt.figure()
for i in range(20):
    plt.subplot(4,5,i+1)
    plt.tight_layout()
    plt.imshow(example_data[i][0], cmap='gray', interpolation='none')
    plt.title("Prediction: {}".format(
    output.data.max(1, keepdim=True)[1][i].item()))
    plt.xticks([])
    plt.yticks([])

## **VAE vs MNIST**

_Credit:_ https://github.com/lyeoni/pytorch-mnist-VAE

Now that we've learned to classify MNIST digits, let's step into the land of generative models, and the **Variational Autoencoder** (VAE) in particular. Below, we build one from scratch in PyTorch, this time to _generate_ MNIST images.

In [None]:
# prerequisites
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torchvision.utils import save_image

batch_size_train = 100
batch_size_test = 100

# MNIST Dataset
# no Normalize() here: the VAE needs pixel values in [0, 1] for the binary cross-entropy loss
train_dataset = datasets.MNIST('./mnist_data/', train=True, download=True,
                               transform=transforms.ToTensor())

test_dataset = datasets.MNIST('./mnist_data/', train=False, download=True,
                              transform=transforms.ToTensor())

# Data Loader (Input Pipeline)
train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size_train, shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset, batch_size=batch_size_test, shuffle=False)

### Implementing architecture

![vae](images/vae.png)

In [None]:
class VAE(nn.Module):
    def __init__(self, x_dim, h_dim1, h_dim2, z_dim):
        super(VAE, self).__init__()
        
        # encoder part
        self.fc1 = nn.Linear(x_dim, h_dim1)
        self.fc2 = nn.Linear(h_dim1, h_dim2)
        self.fc31 = nn.Linear(h_dim2, z_dim)
        self.fc32 = nn.Linear(h_dim2, z_dim)
        # decoder part
        self.fc4 = nn.Linear(z_dim, h_dim2)
        self.fc5 = nn.Linear(h_dim2, h_dim1)
        self.fc6 = nn.Linear(h_dim1, x_dim)
        
    def encoder(self, x):
        h = F.relu(self.fc1(x))
        h = F.relu(self.fc2(h))
        return self.fc31(h), self.fc32(h) # mu, log_var
    
    def sampling(self, mu, log_var):
        std = torch.exp(0.5*log_var)
        eps = torch.randn_like(std)
        return eps.mul(std).add_(mu) # return z sample
        
    def decoder(self, z):
        h = F.relu(self.fc4(z))
        h = F.relu(self.fc5(h))
        return torch.sigmoid(self.fc6(h))  # F.sigmoid is deprecated
    
    def forward(self, x):
        mu, log_var = self.encoder(x.view(-1, 784))
        z = self.sampling(mu, log_var)
        return self.decoder(z), mu, log_var
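
The `sampling` method is the reparameterization trick: instead of sampling `z` directly, it draws standard normal noise and then shifts and scales it, so that gradients can flow through `mu` and `log_var`. The NumPy sketch below checks the statistics of that construction; `sample_latent` is a hypothetical re-implementation and the values of `mu` and `log_var` are made up.

```python
import numpy as np

rng = np.random.default_rng(0)

def sample_latent(mu, log_var, rng):
    """z = mu + std * eps with eps ~ N(0, 1), mirroring VAE.sampling."""
    std = np.exp(0.5 * log_var)            # log-variance -> standard deviation
    eps = rng.standard_normal(std.shape)   # noise that does not depend on the parameters
    return mu + eps * std

# 100,000 draws from a latent with mean 1.5 and variance 0.25 (std 0.5)
mu = np.full(100_000, 1.5)
log_var = np.full(100_000, np.log(0.25))
z = sample_latent(mu, log_var, rng)

print(z.mean(), z.std())  # close to 1.5 and 0.5
```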

In [None]:
vae = VAE(x_dim=28*28, h_dim1=512, h_dim2=256, z_dim=2)
vae

Loss explanation: https://stats.stackexchange.com/questions/318748/deriving-the-kl-divergence-loss-for-vaes

### Training the Model

In [None]:
optimizer = optim.Adam(vae.parameters())
# return reconstruction error + KL divergence losses
def loss_function(recon_x, x, mu, log_var):
    BCE = F.binary_cross_entropy(recon_x, x.view(-1, 784), reduction='sum')
    KLD = -0.5 * torch.sum(1 + log_var - mu.pow(2) - log_var.exp())
    return BCE + KLD
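
The `KLD` term is the closed-form KL divergence between the encoder's Gaussian and a standard normal prior. As a rough check, the formula can be compared with a direct numerical integration in one dimension. In the sketch below `kl_closed_form` and `kl_numeric` are hypothetical helpers, and the test values of `mu` and `log_var` are arbitrary.

```python
import numpy as np

def kl_closed_form(mu, log_var):
    """Same expression as KLD in loss_function, written with NumPy."""
    return -0.5 * np.sum(1 + log_var - mu**2 - np.exp(log_var))

def kl_numeric(mu, log_var):
    """Riemann-sum approximation of KL(N(mu, var) || N(0, 1)) in 1-D."""
    var = np.exp(log_var)
    z = np.linspace(-12.0, 12.0, 200_001)
    dz = z[1] - z[0]
    log_q = -0.5 * (np.log(2 * np.pi * var) + (z - mu) ** 2 / var)  # encoder density
    log_p = -0.5 * (np.log(2 * np.pi) + z ** 2)                     # standard normal prior
    return np.sum(np.exp(log_q) * (log_q - log_p)) * dz

mu, log_var = 0.7, np.log(0.5)
closed = kl_closed_form(mu, log_var)
numeric = kl_numeric(mu, log_var)
print(closed, numeric)  # the two values should agree closely
```

For `mu = 0` and `log_var = 0` the encoder's distribution equals the prior and the term vanishes, which is why it acts as a regularizer pulling the latent codes towards a standard normal.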

In [None]:
def train(epoch):
    vae.train()
    train_loss = 0
    for batch_idx, (data, _) in enumerate(train_loader):
        optimizer.zero_grad()
        
        recon_batch, mu, log_var = vae(data)
        loss = loss_function(recon_batch, data, mu, log_var)
        
        loss.backward()
        train_loss += loss.item()
        optimizer.step()
        
        if batch_idx % 100 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item() / len(data)))
    print('====> Epoch: {} Average loss: {:.4f}'.format(epoch, train_loss / len(train_loader.dataset)))

In [None]:
def test():
    vae.eval()
    test_loss= 0
    with torch.no_grad():
        for data, _ in test_loader:
            recon, mu, log_var = vae(data)    
            # sum up batch loss
            test_loss += loss_function(recon, data, mu, log_var).item()
        
    test_loss /= len(test_loader.dataset)
    print('====> Test set loss: {:.4f}'.format(test_loss))

In [None]:
for epoch in range(1, 4):  # three epochs, numbered from 1 as in the CNN section
    train(epoch)
    test()

### Generating images

In [None]:
with torch.no_grad():
    z = torch.randn(64, 2)
    sample = vae.decoder(z)
    
images = sample.view(64, 1, 28, 28)
    
fig = plt.figure()
for i in range(12):
    plt.subplot(3,4,i+1)
    plt.tight_layout()
    plt.imshow(images[i][0], cmap='gray', interpolation='none')
    plt.xticks([])
    plt.yticks([])
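
Since the latent space here is two-dimensional (`z_dim=2`), one optional variation is to decode a regular grid of latent points instead of random ones, which shows how digits morph into each other across the latent plane. The sketch below only builds such a grid; `latent_grid` and its default range of [-2, 2] are assumptions made for illustration.

```python
import numpy as np

def latent_grid(n_per_axis=8, limit=2.0):
    """Regular grid of 2-D latent points covering [-limit, limit]^2."""
    axis = np.linspace(-limit, limit, n_per_axis)
    z1, z2 = np.meshgrid(axis, axis)
    # one row per grid point; float32 matches PyTorch's default dtype
    return np.stack([z1.ravel(), z2.ravel()], axis=1).astype(np.float32)

grid = latent_grid()
print(grid.shape)  # (64, 2)
```

Passing `torch.from_numpy(grid)` to `vae.decoder` in place of the random `z` above would give 64 images that can be plotted in an 8x8 layout.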