# Deep Learning with TensorFlow
## Summative assessment 2

#### Instructions

There are 3 questions in this assessment. **You should attempt to answer all questions.** 

You can make imports as and when you need them throughout the notebook, and add code cells where necessary. Make sure your notebook executes correctly in sequence before submitting.

You have 2 hours and 30 minutes to complete this assessment.

#### How to submit

When you have finished and are happy with your code, make sure all cells are executed and their outputs printed, and then save as an html file. You should upload and submit the following files to Turnitin on Blackboard **in a single zip file**:

* Your completed jupyter notebook file (`.ipynb` file format)
* The executed notebook saved as an `.html` file

You are also required to name your zip file as _'SurnameCID.zip'_, e.g. _Smith1234567.zip_. Do not submit multiple files. The submitted ipynb file must produce the output that appears in your html file.

Make sure you submit your files before the exam deadline of **Thursday 12th May 11.40am** (an extra 10 minutes is included for preparing and uploading the files).

_Important:_ As this is assessed work you need to work on it individually. It must be your own and unaided work. You are not allowed to discuss the assessment with your fellow students or anybody else. All rules regarding academic integrity and plagiarism apply. Violations of this will be treated as an examination offence. In particular, letting somebody else copy your work constitutes an examination offence. 

In [None]:
import tensorflow as tf
import tensorflow_probability as tfp
tfd = tfp.distributions
tfb = tfp.bijectors
tfpl = tfp.layers

### Question 1 (Total 30 marks)

In this question you will work with the Human Activity Recognition (HAR) Using Smartphones dataset.

The dataset is also available to download from Blackboard, under Course Content -> Assessments and Mark Schemes -> Summative Assessment 2 - HAR dataset.

You should download this dataset and store it in the current working directory, so that the data files are available inside the folder `'./data/HAR/'`. 

The dataset consists of the readings from an accelerometer (which measures acceleration) carried by a human doing different activities. The six activities are walking horizontally, walking upstairs, walking downstairs, sitting, standing and laying down. The accelerometer is inside a smartphone, and, every 0.02 seconds (50 times per second), it takes six readings: linear and gyroscopic acceleration in the x, y and z directions. Each example in the dataset consists of these six readings recorded at 128 time steps.

The dataset can be loaded by running the following cell.

In [5]:
import numpy as np
from pathlib import Path

har_x_train = np.load(Path("./data/HAR/x_train.npy"))
har_y_train = np.load(Path("./data/HAR/y_train.npy"))
har_x_test = np.load(Path("./data/HAR/x_test.npy"))
har_y_test = np.load(Path("./data/HAR/y_test.npy"))


In [None]:
# These label names correspond with the integer labels loaded above

class_names = [
    'walking horizontally', 
    'walking upstairs', 
    'walking downstairs', 
    'sitting', 
    'standing', 
    'laying'
]

Define a convolutional autoencoder for the HAR dataset with the specifications given in parts a) and b). This autoencoder model should use 1-dimensional convolutions and maxpooling layers, as well as transposed convolutions and upsampling layers. 

a) Create the encoder network which maps the input data to a latent space of 2 dimensions. It should use the following sequence of layers:

* A convolutional layer with 16 filters, a kernel width of 5, a stride of 1, 'VALID' padding, and a ReLU activation
* A max pooling layer with a pooling window width and stride of 2
* A convolutional layer with 8 filters, a kernel width of 5, a stride of 1, 'VALID' padding, and a ReLU activation
* A `Flatten` layer, followed by two `Dense` layers with 64 and 16 units respectively, each with a ReLU activation
* A final `Dense` layer with 2 neurons and no activation function

Create the encoder network using the `Sequential` API and print the model summary. **(10 marks)** 
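A minimal sketch of the encoder in part a), assuming each HAR example is a `(128, 6)` array (128 time steps by 6 readings, as described above); the function name `build_har_encoder` is illustrative only. The comments trace the sequence length through the 'VALID' layers, which is where the 464 = 58 × 8 units used by the decoder come from.

```python
import tensorflow as tf
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense


def build_har_encoder(input_shape=(128, 6), latent_dim=2):
    """Sequential 1D-convolutional encoder mapping a HAR example to a 2D code.

    Assumes inputs of shape (128, 6); build_har_encoder is a hypothetical name.
    """
    return Sequential([
        # (128, 6) -> (124, 16): 'VALID' padding shortens the sequence by kernel_size - 1
        Conv1D(16, kernel_size=5, strides=1, padding='valid', activation='relu',
               input_shape=input_shape),
        # (124, 16) -> (62, 16)
        MaxPooling1D(pool_size=2, strides=2),
        # (62, 16) -> (58, 8)
        Conv1D(8, kernel_size=5, strides=1, padding='valid', activation='relu'),
        # (58, 8) -> (464,)
        Flatten(),
        Dense(64, activation='relu'),
        Dense(16, activation='relu'),
        # Linear output layer: the 2D latent encoding
        Dense(latent_dim),
    ], name='encoder')


encoder = build_har_encoder()
encoder.summary()
```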

b) Create the decoder network which maps the latent space encoding back into the data space to reconstruct the original data example. It is structured to be (approximately) the mirror image of the encoder network. It should use the following sequence of layers:

* Three `Dense` layers with 16, 64 and 464 units respectively, each with a ReLU activation
* A layer that reshapes the incoming Tensor to have shape `(58, 8)`
* A transposed convolution with 8 filters, a kernel width of 5, a stride of 1, 'VALID' padding, and a ReLU activation
* A layer that upsamples the incoming Tensor with an upsampling factor of 2
* A transposed convolution with 6 filters, a kernel width of 5, a stride of 1, 'VALID' padding, and no activation function

Create the decoder network using the `Sequential` API and print the model summary. **(10 marks)**
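A matching sketch of the decoder in part b), under the same `(128, 6)` assumption; `build_har_decoder` is again an illustrative name. With stride 1 and 'VALID' padding, a transposed convolution lengthens the sequence by `kernel_size - 1`, so 58 → 62, upsampling doubles it to 124, and the final layer restores the original length of 128 with 6 channels.

```python
import tensorflow as tf
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Reshape, Conv1DTranspose, UpSampling1D


def build_har_decoder(latent_dim=2):
    """Sequential decoder mapping a 2D code back to a (128, 6) HAR example.

    build_har_decoder is a hypothetical name used for illustration.
    """
    return Sequential([
        Dense(16, activation='relu', input_shape=(latent_dim,)),
        Dense(64, activation='relu'),
        Dense(464, activation='relu'),          # 464 = 58 * 8
        Reshape((58, 8)),
        # (58, 8) -> (62, 8)
        Conv1DTranspose(8, kernel_size=5, strides=1, padding='valid',
                        activation='relu'),
        # (62, 8) -> (124, 8)
        UpSampling1D(size=2),
        # (124, 8) -> (128, 6); no activation, since sensor readings are real-valued
        Conv1DTranspose(6, kernel_size=5, strides=1, padding='valid'),
    ], name='decoder')


decoder = build_har_decoder()
decoder.summary()
```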

c) You will now build and train the end-to-end autoencoder model, and inspect the latent space encodings before and after training.

* Compute the latent space encodings from the freshly initialised (untrained) encoder network on the first 1000 examples of the test dataset.
* Define a `Model` object for the end-to-end autoencoder that passes the input through the encoder network and then the decoder network. Train this model for 15 epochs on the training data, using a batch size of 64, the RMSprop optimizer with a learning rate of 0.01, and the mean squared error loss function.
* Now compute the latent space encodings from the trained encoder network on the same first 1000 examples of the test dataset.
* Make two scatter plots, one for the untrained encodings and one for the trained encodings of the encoder network. The points in your scatter plots should be coloured according to the different classes in the dataset. 

**(10 marks)**
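One way to organise part c), written as a hypothetical helper `train_and_compare_encodings`. It assumes the encoder and decoder from parts a) and b), training arrays of shape `(N, 128, 6)`, and 0-based integer labels in the same order as `class_names` (if the labels are stored 1-based, subtract 1 first). Because the end-to-end model shares its weights with `encoder`, calling `encoder.predict` after `fit` returns the trained encodings.

```python
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Input


def train_and_compare_encodings(encoder, decoder, x_train, x_test, y_test,
                                class_names, epochs=15, n_plot=1000):
    """Train encoder -> decoder as an autoencoder; plot test encodings before/after."""
    x_plot = x_test[:n_plot]
    labels = np.squeeze(y_test[:n_plot]).astype(int)   # assumes 0-based labels

    # Encodings from the freshly initialised (untrained) encoder
    untrained_z = encoder.predict(x_plot, verbose=0)

    # End-to-end autoencoder: input -> encoder -> decoder
    inputs = Input(shape=x_train.shape[1:])
    autoencoder = Model(inputs=inputs, outputs=decoder(encoder(inputs)))
    autoencoder.compile(optimizer=tf.keras.optimizers.RMSprop(learning_rate=0.01),
                        loss='mse')
    # The input is also the reconstruction target
    autoencoder.fit(x_train, x_train, epochs=epochs, batch_size=64, verbose=0)

    # Same test examples, now through the trained encoder
    trained_z = encoder.predict(x_plot, verbose=0)

    fig, axes = plt.subplots(1, 2, figsize=(14, 6))
    for ax, z, title in zip(axes, [untrained_z, trained_z],
                            ['Untrained encoder', 'Trained encoder']):
        # One scatter call per class so each class gets its own colour
        for k, name in enumerate(class_names):
            mask = labels == k
            ax.scatter(z[mask, 0], z[mask, 1], s=8, label=name)
        ax.set_title(title)
        ax.set_xlabel('$z_1$')
        ax.set_ylabel('$z_2$')
    axes[1].legend()
    plt.show()
    return untrained_z, trained_z
```

Usage, with the arrays and models defined earlier: `train_and_compare_encodings(encoder, decoder, har_x_train, har_x_test, har_y_test, class_names)`.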

### Question 2 (Total 30 marks)

a) The 3D Hénon map is the bijective map $f:\mathbb{R}^3\to\mathbb{R}^3$ defined as:

$$
\begin{align}
y_1 &= a_1x_1 + e^{a_2}(x_3 + x_2^2)\\
y_2 &= x_1\\
y_3 &= x_2
\end{align}
$$

where $a_1$ and $a_2$ are parameters.
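Solving the three equations for $\mathbf{x}$ and expanding the Jacobian determinant gives the following worked sketch, which is worth checking term by term against the map above:

$$
\begin{align}
x_1 &= y_2\\
x_2 &= y_3\\
x_3 &= e^{-a_2}(y_1 - a_1 y_2) - y_3^2
\end{align}
$$

$$
J_f(\mathbf{x}) = \begin{pmatrix} a_1 & 2e^{a_2}x_2 & e^{a_2}\\ 1 & 0 & 0\\ 0 & 1 & 0 \end{pmatrix},
\qquad
\det J_f(\mathbf{x}) = e^{a_2}
$$

Expanding along the third column, only the $(1,3)$ entry contributes, with cofactor $\det\begin{pmatrix}1 & 0\\ 0 & 1\end{pmatrix} = 1$. Hence $\log\left|\det J_f(\mathbf{x})\right| = a_2$ and $\log\left|\det J_{f^{-1}}(\mathbf{y})\right| = -a_2$, both independent of the input, and $f$ is invertible for all real $a_1, a_2$.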

Implement this mapping as a custom bijector `Henon3d` by subclassing the `tfb.Bijector` class. Your class should implement the following methods:

* `__init__`: the initialiser should take arguments `a1` and `a2` and set them as class attributes. It should call the base class initialiser with `forward_min_event_ndims=1`
* `_forward`: this should take an argument `x`, and implement the transformation $f: (x_1, x_2, x_3)\mapsto (y_1, y_2, y_3)$ above
* `_inverse`: this should take an argument `y`, and implement the inverse transformation $f^{-1}: (y_1, y_2, y_3)\mapsto (x_1, x_2, x_3)$. You should calculate this from the equations above
* `_forward_log_det_jacobian`: this should take an argument `x`, and return the log of the absolute value of the Jacobian determinant $\log \hspace{0.1ex}\left|\det J_f(\mathbf{x}) \right|$. You should calculate this from the equations above.
* `_inverse_log_det_jacobian`: this should take an argument `y`, and return the log of the absolute value of the Jacobian determinant of the inverse, $\log \hspace{0.1ex}\left|\det J_{f^{-1}}(\mathbf{y}) \right|$.

The `_forward` and `_inverse` methods should account for any additional batch or sample dimensions.

_Hint: in the `_forward` method, you will need to extract the components $x_1$, $x_2$ and $x_3$ by indexing the argument `x` before computing the transformation. You will need to recombine the output components $y_1$, $y_2$ and $y_3$ in the returned Tensor. Similarly for the `_inverse` method._

**(20 marks)**

b) You will now use your `Henon3d` custom bijector to construct a normalising flow model.

* Define a Distribution object called `uniform` for the uniform distribution $U([-1, 1]^3)$. This Distribution object should have an event shape of `[3]` and an empty batch shape.
* Create three instances of the `Henon3d` bijector, called `f1`, `f2` and `f3`. You should pass scalar trainable Variables (initialised randomly) for the initialiser arguments `a1` and `a2` for each one (so you will create 6 `tf.Variable` objects in all).
* Define a bijector object `permutation` that performs the permutation of dimensions 
$$(x_1, x_2, x_3) \mapsto (x_2, x_3, x_1)$$
* Create a `TransformedDistribution` object that transforms samples from the `uniform` distribution through the sequence of bijections:

`f1` $\rightarrow$ `permutation` $\rightarrow$ `f2` $\rightarrow$ `permutation` $\rightarrow$ `f3` $\rightarrow$ `permutation`

* Draw 5 samples from your `TransformedDistribution` object and print out the result. **(10 marks)**

_NB: there is no training required in this question._

### Question 3 (Total 40 marks)

In this question you will implement a conditional VAE algorithm (C-VAE) for the [MNIST](https://keras.io/api/datasets/mnist/) dataset. This dataset can be downloaded by running the following cell.

In [13]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow_probability as tfp

# Import Keras components from tf.keras only: mixing standalone `keras` objects
# with tf.keras ones can fail, and `keras.objectives` and
# `keras.layers.advanced_activations` are not available in recent Keras releases.
from tensorflow.keras import Model, Sequential
from tensorflow.keras import backend as K
from tensorflow.keras.layers import (Input, Dense, Flatten, Reshape, Lambda,
                                     BatchNormalization, Dropout, LeakyReLU,
                                     concatenate)
from tensorflow.keras.losses import binary_crossentropy

%matplotlib inline

from tensorflow.keras.datasets import mnist
(mnist_x_train, mnist_y_train), (mnist_x_val, mnist_y_val) = mnist.load_data()


The C-VAE algorithm makes use of the class labels in the MNIST dataset. In particular, the class labels are provided as additional inputs to the encoder and decoder networks. So the approximate posterior $q_\phi(z\mid x, c)$ and the distribution $p_\theta(x \mid z, c)$ are now both conditioned on the class label $c$.

a) Load the training and validation data into `tf.data.Dataset` objects. Both the training and validation Datasets should return tuples of image and label Tensors.

Process the training and validation Datasets as follows, using the `map` method:

* Rescale the image pixel values to the range $[0, 1]$
* Convert the integer labels to one-hot vectors (_hint: see [`tf.one_hot`](https://www.tensorflow.org/api_docs/python/tf/one_hot)_)
* Return a nested tuple of image and label Tensors of the form `((image, label), image)`

The training Dataset should be shuffled (with buffer size 500) and both Datasets should be batched with batch size 64. Print the `element_spec` for one of the Datasets. **(8 marks)**

In [21]:
mnist_train_ds = tf.data.Dataset.from_tensor_slices((mnist_x_train, mnist_y_train))

mnist_train_ds


<TensorSliceDataset element_spec=(TensorSpec(shape=(28, 28), dtype=tf.uint8, name=None), TensorSpec(shape=(), dtype=tf.uint8, name=None))>

In [23]:
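# Build both Datasets from scratch so that re-running this cell does not batch twice
mnist_train_ds = tf.data.Dataset.from_tensor_slices((mnist_x_train, mnist_y_train))
mnist_val_ds = tf.data.Dataset.from_tensor_slices((mnist_x_val, mnist_y_val))

def process_mnist(image, label):
    # Rescale pixel values from [0, 255] to [0, 1]
    image = tf.cast(image, tf.float32) / 255.
    # Convert the integer label to a length-10 one-hot vector
    label = tf.one_hot(tf.cast(label, tf.int32), depth=10)
    # The image is both an encoder input and the reconstruction target
    return (image, label), image

mnist_train_ds = mnist_train_ds.map(process_mnist).shuffle(500).batch(64)
mnist_val_ds = mnist_val_ds.map(process_mnist).batch(64)

print(mnist_train_ds.element_spec)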

Your C-VAE algorithm will be trained to optimise the following objective:

$$
\hat{\mathcal{L}}^A(\theta,\phi;x) := \log p_\theta(x \mid \hat{z}, c) + \frac{1}{L} \sum_{j=1}^L \left[\log p(z^{(j)}) - \log q_\phi(z^{(j)}\mid x, c)\right]
$$

where $p(z)$ is the prior distribution, and $\hat{z}, z^{(j)} \sim q_\phi(z \mid x, c)$.

b) Define the prior distribution as a zero-mean Gaussian with identity covariance over a 2-dimensional latent space (event shape `[2]`).

Build the encoder network as an MLP that uses a probabilistic layer to output a `MultivariateNormalTriL` Distribution as the approximate posterior $q_\phi(z \mid x, c)$. The structure of the encoder should be as follows:

* The encoder should flatten the input image, and concatenate it with the one-hot label vector.
* It should then pass this concatenated Tensor through two `Dense` layers with 512 and 32 units respectively, both with a ReLU activation
* The resulting Tensor should be passed through another `Dense` layer with enough units to parameterise the following `MultivariateNormalTriL` probabilistic layer. This `Dense` layer should not use an activation function.
* The following `MultivariateNormalTriL` probabilistic layer should output a `MultivariateNormalTriL` distribution over an event space of shape `[2]`
* The encoder network should also use a `KLDivergenceAddLoss` layer to add a Monte Carlo estimate of the KL divergence, $\frac{1}{L} \sum_{j=1}^L \left[\log q_\phi(z^{(j)}\mid x, c) - \log p(z^{(j)})\right]$ (the negative of the second term in the objective above), to the loss, using $L=5$

You should create the encoder using the functional API as a multi-input model, where the inputs are the image and the one-hot vector label. Print the encoder summary. 

_Hint: You should use two `Input` layers for the image and labels Tensor respectively, and pass both of these inputs in a list `[image_input, label_input]` to the `inputs` argument of the `Model` class._ **(13 marks)**

In [26]:
from tensorflow.keras.layers import Input, Flatten, Dense
from tensorflow.keras import Model
inputs = Input(shape=(28, 28))
h = Flatten()(inputs)
h = Dense(100, activation='relu')(h)
outputs = Dense(10, activation='softmax')(h)

model = Model(inputs=inputs, outputs=outputs)
model.compile(loss='sparse_categorical_crossentropy', optimizer='sgd')

model.summary()


Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_2 (InputLayer)        [(None, 28, 28)]          0         
                                                                 
 flatten_1 (Flatten)         (None, 784)               0         
                                                                 
 dense_2 (Dense)             (None, 100)               78500     
                                                                 
 dense_3 (Dense)             (None, 10)                1010      
                                                                 
Total params: 79,510
Trainable params: 79,510
Non-trainable params: 0
_________________________________________________________________


c) Build the decoder network as an MLP according to the following specifications:

* The inputs to the decoder are the 2-dimensional variable $z$ and the one-hot class label $c$
* The decoder should concatenate the latent variable and the class label to make a length-12 Tensor
* This should then be passed through two `Dense` layers with 32 and 512 units respectively, each with a ReLU activation function
* The resulting Tensor should be passed through another `Dense` layer with enough units to parameterise the following `IndependentBernoulli` probabilistic layer. This `Dense` layer should not use an activation function.
* The following `IndependentBernoulli` probabilistic layer should output a `Bernoulli` Distribution object over an event space of shape `[28, 28]`

You should create the decoder using the functional API as a multi-input model, where the inputs are the latent variable and the one-hot vector label. Print the decoder summary. 

_Hint: Use the same `Input` Tensor for the class label that you used for the encoder network. You will only need to create one new `Input` Tensor for the latent variable $z$._ **(9 marks)**

In [None]:

import tensorflow_probability as tfp
from tensorflow.keras import Model
from tensorflow.keras.layers import Input, Dense, Concatenate
tfpl = tfp.layers

latent_dim = 2
num_classes = 10

# Inputs: the latent variable z and the one-hot class label c. If the encoder's
# label Input is in scope, it can be reused here in place of input_lbl_d.
z = Input(shape=(latent_dim,))
input_lbl_d = Input(shape=(num_classes,), dtype='float32')
x = Concatenate()([z, input_lbl_d])                 # length 2 + 10 = 12
x = Dense(32, activation='relu')(x)
x = Dense(512, activation='relu')(x)
# No activation: one Bernoulli logit per pixel (28 * 28 = 784 units)
x = Dense(tfpl.IndependentBernoulli.params_size((28, 28)))(x)
decoded = tfpl.IndependentBernoulli((28, 28))(x)

decoder = Model([z, input_lbl_d], decoded, name='Decoder')
decoder.summary()

d) Now define a Model object `cvae` for the end-to-end architecture. The model should take the `[image, label]` Tensors as inputs, and return the Bernoulli Distribution object output by the decoder. Print the model summary. **(5 marks)**

In [None]:
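image_in, label_in = encoder.inputs   # assumes encoder inputs are [image, label]
cvae = Model(inputs=[image_in, label_in], outputs=decoder([encoder.outputs[0], label_in]))
cvae.summary()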

e) Define the negative log-likelihood loss function that takes the arguments `y_true` and `y_pred`, where `y_pred` is a Distribution object output by the `cvae` model, and `y_true` is the ground truth Tensor object.

Compile the `cvae` model using the Adam optimizer and the negative log-likelihood loss function. Train the model on the training dataset for 5 epochs. **(5 marks)**

_NB: The rescaled pixel values are continuous in $[0, 1]$ rather than binary, so a Bernoulli distribution in the output of the decoder is not mathematically correct. It is nevertheless commonly used in practice, and often works better than e.g. a Gaussian distribution._

In [3]:
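def get_compiled_model(encoder, decoder):
    # Route the same label Input to both networks: encoder([image, label]) -> z,
    # then decoder([z, label]) -> Bernoulli distribution over images
    image_in, label_in = encoder.inputs
    cvae = Model(inputs=[image_in, label_in],
                 outputs=decoder([encoder.outputs[0], label_in]))

    def reconstruction_loss(y_true, y_pred):
        # Negative log-likelihood of the images under the decoder distribution
        return -tf.reduce_mean(y_pred.log_prob(y_true))

    optimizer = tf.keras.optimizers.Adam(learning_rate=0.0005)
    cvae.compile(optimizer=optimizer, loss=reconstruction_loss, metrics=['mae'])
    return cvae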