# DSME Bonus Point Assignment I Part A
<div style="text-align: right;font-size: 0.8em">Document Version 1.0.1, released 2021-01-04</div>
For task instructions, refer to the assignment PDF.

* The parts of the code you are to implement are indicated via `# TODO` comments.
* You can use the `# Test code` cells to verify your implementation. However note that these are not the unit tests used for grading.
* Some cells create export file in the `solution/` folder. _Include whole `solution/` folder in your submission_.
* DO NOT CLEAR THE OUTPUT of the notebook you are submitting!

In [5]:
# General imports
from pathlib import Path
import pickle
import numpy as np
import matplotlib.pyplot as plt
from tqdm.auto import tqdm
import torch
import torchvision

# Create solution folder
Path("solution/").mkdir(exist_ok=True)

## Question A1 - Neural Network Class
In this task you will implement the building blocks of a simple neural network class.

For simplicity's sake, this class will only allow feed forward networks.
All individual operations, including activation functions, will be represented as layers in this model.
The abstract `Layer` interface defines three methods:

In [6]:
class Layer:
    def forward(self, x):
        """Forward pass of the layer.
        For convenience, input and output are stored in the layer. 

        Args:
            x: Input for this layer. Shape is (batch_size, num_inputs).

        Returns:
            x: Output of this layer. Shape is (batch_size, num_outputs).
        """
        raise NotImplementedError

    def backward(self, gradient):
        """Backward pass of the layer.
        The incoming gradients are stored in the layer. 

        Args:
            gradient: Incoming gradient from the next layer. Shape is (batch_size, num_outputs).

        Returns:
            gradient: Gradient passed to previous layer. Shape is (batch_size, num_inputs).
        """
        raise NotImplementedError

    def update(self, learn_rate):
        """Perform weight update based on previously stored input and gradients.

        Args:
            learn_rate: Learn rate to use for the update.
        """

### a) Linear Layer

In [7]:
class LinearLayer(Layer):
    def __init__(self, input_dim, output_dim, seed=None):
        """Initialize the layer with random weights."""
        # Initialize weights with the He initializer
        rnd = torch.random.RandomState(seed).randn(input_dim, output_dim)
        self.w = rnd * (2 / input_dim) 
        
        # Initialize bias with zeros
        self.b = np.zeros(output_dim)
        
    def forward(self, x):
        """Forward pass of the layer."""
        self.input = x # Store input
        
        # ********************
        # TODO Compute output
        
        x = x@self.w + self.b
        
        # ********************
        
        self.output = x # Store output
        return x
    
    def backward(self, gradient):
        """Backward pass of the layer."""
        self.gradient = gradient # Store incoming gradient
        
        # ********************
        # TODO Apply transfer gradient
        
        gradient = gradient@self.w.T #local_gradient*upstream_gradient 
        
        # ********************
        
        return gradient
    
    def update(self, learn_rate):
        """Perform weight update"""
        # ********************
        # TODO Update weights and bias
        
        self.w = self.w - learn_rate*(self.input.T@self.gradient)
        self.b = self.b - learn_rate*np.sum(self.gradient, axis=0) 
        
        
        # ********************

In [8]:
# Test code
_test_layer = LinearLayer(4, 5, seed=42)

# Test forward pass
np.testing.assert_array_almost_equal(_test_layer.forward(np.ones((1, 4))), [[-0.38156401, -0.01880618,  0.98566644, -0.88387442, -1.41440743]])

# Test backward pass
np.testing.assert_array_almost_equal(_test_layer.backward(np.ones((1, 5))), [[1.147507,  1.092798, -2.162692, -1.7906 ]])

# Test update
_test_layer.update(0.1)
np.testing.assert_array_almost_equal(_test_layer.w, [[ 0.148357, -0.169132,  0.223844,  0.661515, -0.217077], [-0.217068,  0.689606,  0.283717, -0.334737,  0.17128 ], [-0.331709, -0.332865,  0.020981, -1.05664 , -0.962459], [-0.381144, -0.606416,  0.057124, -0.554012, -0.806152]])
np.testing.assert_array_almost_equal(_test_layer.b, [-0.1, -0.1, -0.1, -0.1, -0.1])
"ok"

AttributeError: module 'torch.random' has no attribute 'RandomState'

### b) Implementing ReLU

In [None]:
class ReLULayer(Layer):      
    def forward(self, x):
        """Forward pass of the ReLU layer."""
        self.input = x # Store input
        
        # ********************
        # TODO Compute output
        
        x = x*(x > 0)
        
        
        # ********************
        
        self.output = x # Store output
        return x
    
    def backward(self, gradient):
        """Backward pass of the ReLU layer."""
        self.gradient = gradient # Store incoming gradient
        
        # ********************
        # TODO Apply transfer gradient
        
        gradient = np.multiply(self.gradient,((self.input > 0)*1))
        
        # ********************
        
        return gradient

In [None]:
# Test Code
_test_layer = ReLULayer()

# Test forward pass
np.testing.assert_array_almost_equal(_test_layer.forward(np.arange(5)-2), [0, 0,  0, 1, 2])

# Test backward pass
assert np.array_equal(_test_layer.backward(np.ones(5)), [0, 0, 0, 1, 1]) or np.array_equal(_test_layer.backward(np.ones(5)), [0, 0, 1, 1, 1]), f"backward expected {[0, 0, 1, 1, 1]} or {[0, 0, 0, 1, 1]}, but got {_test_layer.backward(np.ones(5))}"

"ok"

### c) Softmax Layer

In [None]:
class SoftmaxLayer(Layer):      
    def forward(self, x):
        """Forward pass of the softmax layer."""
        self.input = x # Store input
        
        # ********************
        # TODO Compute output
        
        x = x - np.max(x)
        exp_x = np.exp(x)
        sum_exp_x = np.sum(exp_x)
        x = exp_x/sum_exp_x
        
        
        # ********************
        
        self.output = x # Store output
        return x

In [None]:
# Test code
_test_layer = SoftmaxLayer()

# Test forward pass
np.testing.assert_array_almost_equal(_test_layer.forward([[1, 1]]), [[0.5, 0.5]])
np.testing.assert_array_almost_equal(_test_layer.forward([[1, 2, 3]]), [[0.09003057, 0.24472847, 0.66524096]])
np.testing.assert_array_almost_equal(_test_layer.forward([[0, 1, 2]]), [[0.09003057, 0.24472847, 0.66524096]])
np.testing.assert_array_almost_equal(_test_layer.forward([[0]]), [[1]])
np.testing.assert_array_almost_equal(_test_layer.forward([[0, 10, 100, 1e6]]), [[0, 0, 0, 1]], err_msg="Overflow")

"ok"

### d) Completing the Network

In [None]:
class FeedForwardNet:
    def __init__(self, layers):
        self.layers = layers
    
    def forward(self, x):
        """Forwar pass through the entire network."""
        # ********************
        # TODO Compute output
        # x = ...
        
        for l in self.layers:
            x = l.forward(x)
        
        
        # ********************
        return x
    
    def backward(self, gradient):
        """Backward pass through the entire network."""
        # ********************
        # TODO Back propagate gradients through all layers
        
        for l in reversed(self.layers):
            gradient = l.backward(gradient)
        
        # ********************

    def train(self, x, target, learn_rate):
        """Train one batch."""
        gradient = self.forward(x) - target  # Assumes quadratic loss function
        self.backward(gradient)  # Backprop
        
        # Update weights in all layers
        for layer in self.layers:
            layer.update(learn_rate)
        

In [None]:
# Test Code
_test_net = FeedForwardNet([LinearLayer(4, 10, seed=0),
                      ReLULayer(),
                      LinearLayer(10, 3, seed=1),
                      SoftmaxLayer(),
                     ])
np.testing.assert_array_almost_equal(_test_net.forward([1,2,3,4]), [0.27760362, 0.64002456, 0.08237182])
"ok"

## Question A2 - Autoencoder on MNIST
For more information about MNIST, refer to the assignment PDF.

The next cell will download the dataset and show some examples images.

In [None]:
import torch
import torchvision
from torchvision import transforms

# Load MNIST dataset
mnist = torchvision.datasets.MNIST(root='data/', download=True, transform=transforms.ToTensor())
mnist_test = np.array([x.numpy() for x, y in torchvision.datasets.MNIST(root='data/', train=False, transform=transforms.ToTensor())]).reshape(-1, 28, 28)
data_loader = torch.utils.data.DataLoader(mnist, batch_size=32, shuffle=True)

# Show examples
plt.figure(figsize=(16,2))
for i in range(10):
    plt.subplot(1, 10, i+1)
    
    # Choose first example with the corresponding digit
    example = next(x for x, y in mnist if y == i).reshape(28, 28)
    plt.imshow(example, cmap="gray", vmin=0, vmax=1)
    plt.axis("off")
plt.tight_layout()

### a) Build the Autoencoder

In [None]:
# TODO Build autoencoder
autoencoder = FeedForwardNet([LinearLayer(784,175), ReLULayer(),LinearLayer(175,16), LinearLayer(16,300), ReLULayer(), LinearLayer(300,784)])


In [None]:
# Test Code
assert all(type(l) in [LinearLayer, ReLULayer] for l in autoencoder.layers), "Only use Linear and ReLu layers"
assert autoencoder.layers[0].w.shape[0] == 784, "Wrong size of input"
assert autoencoder.layers[-1].w.shape[1] == 784, "Wrong size of output"
assert min(l.w.shape[0] for l in autoencoder.layers if type(l) == LinearLayer) == 16, "Wrong size of the bottleneck"

"ok"

In [None]:
epochs = 15
learn_rate = 0.0004

losses = np.empty((epochs, 2))
with tqdm(range(epochs)) as pbar:
    for epoch in pbar:
        running_loss = 0.0
        for batch, _ in data_loader:
            # ********************
            # TODO Reshape and train batch
            # batch =
            
            batch = batch.numpy() #np.float64(...)
            batch = batch.reshape(batch.shape[0],28*28) #batch.shape[0] -> -1
            
            autoencoder.train(batch, batch, learn_rate)
            
            
            # ********************

            running_loss += np.sum((autoencoder.layers[-1].output - batch)**2)
        
        # Log losses and update progress bar
        train_loss = running_loss/len(mnist)
        validation_loss = np.sum(np.mean((autoencoder.forward(mnist_test.reshape(-1, 28*28))-mnist_test.reshape(-1, 28*28))**2, axis=0))
        losses[epoch] = [train_loss, validation_loss]
        pbar.set_description(f"Loss {train_loss:.02f}/{validation_loss:.02f}")

# Save model
with open("solution/a2b.pickle", "wb") as f:
    pickle.dump(autoencoder, f)

# Visualize losses
losses = np.array(losses)
plt.plot(np.arange(len(losses)), losses[:,0], label="train")
plt.plot(np.arange(len(losses)), losses[:,1], label="validation")
plt.legend()
plt.xlabel("epoch")
plt.ylabel("MSE")
plt.tight_layout()
plt.savefig("solution/a2b-train.png")

In [None]:
# Show reconstruction
plt.figure(figsize=(12,6))
for i in range(16):
    # Show image
    plt.subplot(4,8,2*i+1)
    plt.imshow(mnist[i][0].reshape((28,28)), cmap="gray")
    plt.axis("off")

    # Show reconstruction
    plt.subplot(4,8, 2*i+2)
    plt.imshow(autoencoder.forward(mnist[i][0].reshape(28*28)).reshape(28,28), cmap="gray", vmin=0, vmax=1)
    plt.axis("off")
plt.tight_layout()
plt.savefig("solution/a2b-rec.png")

### c) Interpolation

In [None]:
# ********************
# TODO Split the autoencoder at the bottleneck into encoder and decoder 

for l in autoencoder.layers:
    if (type(l) == LinearLayer) and (l.w.shape[0] == 16):
        t = autoencoder.layers.index(l)
        
encoder = FeedForwardNet(autoencoder.layers[:t])
decoder = FeedForwardNet(autoencoder.layers[t:])

# ********************

#  Choose two images 
image_a = mnist[5][0]
image_b = mnist[4][0]

# Compute their latent representations
latent_a = encoder.forward(image_a.reshape(28*28))
latent_b = encoder.forward(image_b.reshape(28*28))


steps=10
plt.figure(figsize=(16,2))
for i, f in enumerate(np.linspace(0, 1, steps)):
    plt.subplot(1, steps, i+1)
    
    # ********************
    # TODO Interpolate between latent_a and latent_b with the mixing factor f
    
    latent = (1-f)*latent_a + f*latent_b
    
    # ********************
    
    plt.imshow(decoder.forward(latent).reshape(28, 28), cmap="gray", vmin=0, vmax=1)
    plt.axis("off")
plt.tight_layout()
plt.savefig("solution/a2c.png")

### d) Latent space sampling

In [None]:
# Compute mean and std of latent states
latent_space = encoder.forward(mnist_test.reshape(-1, 28*28))
latent_space_mean = np.mean(latent_space, axis=0)
latent_space_std = np.std(latent_space, axis=0)

# Sample from latent distribution
plt.figure(figsize=(16,2))
for i, f in enumerate(np.linspace(0,1,steps)):
    plt.subplot(1, steps, i+1)
    
    # ********************
    # TODO Sample from the normal distribution with latent_space_mean and latent_space_std
    
    latent = np.random.normal(latent_space_mean,latent_space_std)

    # ********************
    
    plt.imshow(decoder.forward(latent).reshape(28, 28), cmap="gray", vmin=0, vmax=1)
    plt.axis("off")
plt.tight_layout()
plt.savefig("solution/a2d.png")