In [1]:
# Switch on IPython's autoreload extension so that edited modules are re-imported automatically.
# Same effect as the two commented magics below, but harmless when executed outside IPython/Jupyter.
# %load_ext autoreload
# %autoreload 2

try:
    # get_ipython is only defined inside IPython/Jupyter
    for magic_name, magic_arg in (("load_ext", "autoreload"), ("autoreload", "2")):
        get_ipython().run_line_magic(magic_name, magic_arg)
except (NameError, AttributeError):
    # Not an IPython session → nothing to enable
    pass


In [2]:
# Empty container that the cells below fill with the values to be checked
answer = dict()


In [3]:
# Some libs that we will use
import torch
import random
import numpy as np
import json_tricks
import lovely_tensors as lt

# Making tensor printouts better
lt.monkey_patch()

# Adding sources to the pythonpath
# NOTE: root_path is a relative path, so it is resolved against the current working directory
import sys
root_path = '../../../..'
sys.path.append(root_path)

# Load environment variables from the .env file looked up via root_path
import dotenv
dotenv.load_dotenv(dotenv.find_dotenv(root_path + '/.env'))

# Importing sources of our project
# (kept after the sys.path.append above — presumably `src` lives under root_path; verify)
import src


# Task 0: Prepare the environment

(Take care of reproducibility)

In [4]:
# YOUR CODE HERE


# Task 1: Prepare the data

We will use the same MNIST dataset, so just import it

In [5]:
# Placeholders: replace the Ellipsis values with the MNIST train / validation datasets.
# The cells below index each dataset with [0] and read the 'image' and 'label' keys of the sample.
MNIST_train = ...
MNIST_valid = ...
## YOUR CODE HERE


Check that the data is prepared:

In [6]:
# Take the first sample of each split and separate the image from its label
train_sample, valid_sample = MNIST_train[0], MNIST_valid[0]

X_train, y_train = train_sample['image'], train_sample['label']
X_valid, y_valid = valid_sample['image'], valid_sample['label']


In [7]:
## These checks are for dataset verification: dtype, shape and mean of the first samples
answer.update({
    'X_train.dtype': str(X_train.dtype),
    'y_train.dtype': str(y_train.dtype),
    'X_valid.dtype': str(X_valid.dtype),
    'y_valid.dtype': str(y_valid.dtype),
    'X_train.shape': X_train.shape,
    'X_valid.shape': X_valid.shape,
    'y_train.shape': y_train.shape,
    'y_valid.shape': y_valid.shape,
    # Labels are cast to float before averaging; images are averaged as they are
    'X_train.mean': float(X_train.mean()),
    'y_train.mean': float(y_train.float().mean()),
    'X_valid.mean': float(X_valid.mean()),
    'y_valid.mean': float(y_valid.float().mean()),
})

print(X_train.dtype, X_valid.dtype, y_train.dtype, y_valid.dtype)
print(X_train.shape, X_valid.shape, y_train.shape, y_valid.shape)

# Show the samples themselves: images first, then labels
for tensor in (X_train, X_valid, y_train, y_valid):
    print(tensor)


In [8]:
import matplotlib.pyplot as plt

# Draw the first training image on an explicit axes, then print the label that goes with it
fig, ax = plt.subplots()
ax.imshow(X_train)
plt.show()
print(y_train)


# Task 2: Build the code for the Autoencoder

This time we will build an autoencoder network that contains two parts:
- Encoder
- Decoder

Your task is to build a model that contains both of these modules (each a fully-connected neural network), keeping in mind that the volume of the encoder should be exactly equal to the volume of the decoder.

Both encoder and decoder should have the same structure:
```bash
Linear
Activation
Linear     x N
Activation x N
Linear
```

The code should be done in `src/models/feedforward/autoencoders.py`

Number of linear and activation layers should be taken from the input list of channels.

For instance, if the list with channels is `[28*28, 128, 64]`, the encoder should look like:
```
28 * 28 -> Linear -> Activation -> 128 -> Linear -> Activation -> 64 -> Linear -> 64
```

The decoder should look like:
```
28 * 28 <- Linear <- 128 <- Activation <- Linear <- 64 <- Activation <- Linear <- 64
```

In this case the image is compressed into a vector of size 64

In [9]:
## Load your Autoencoder model
# NOTE(review): the task text names the file `src/models/feedforward/autoencoders.py`, while this
# attribute path uses `autoencoder` — confirm which module name is correct
model = src.models.feedforward.autoencoder.Autoencoder([28 * 28, 128, 64])

# Init the model for checking
src.utils.deterministic_init(model)


# Random batch of 10 flattened 28x28 inputs; its values depend on the current torch RNG state
check_input = {'image': torch.randn(10, 28 * 28)}
with torch.no_grad():
    check_output = model(check_input['image'])

    # Stored under 'check_result' in the answer dict that is dumped to disk later
    answer['check_result'] = src.utils.detach_copy(check_output)


# Task 3: Build Variational Autoencoder

The main difference between standard AutoEncoder and Variational AutoEncoder is that in Variational Autoencoder there is a reparametrization block (we will call it `sampler` in our network)

The idea is that this block computes $\mu$ and $\sigma$ from the input signal, and then samples the result from $\mathcal N (\mu, \sigma)$

Your task is to code this block.

Use linear layers to predict a tensor of $\mu$ and a tensor used to calculate $\sigma$ (without activation): $\mathbf z_\mu$, $\mathbf z_{\sigma}$
- `mu_regressor`
- `logvar_regressor`

After that, calculate:
- $\mu = \mathbf z_{\mu}$
- $\sigma = \exp(\frac{1}{2} \mathbf z_{\sigma})$

In the end, you should calculate the result as
- In case of training mode:
    $\mathbf z_{out} = \mu + \mathbf \epsilon * \sigma,$ where $\epsilon \sim \mathcal N (0, 1)$
- In case of evaluation mode:
    $\mathbf z_{out} = \mu$

Note that all the tensors, $\mu$, $\sigma$ and $\epsilon$ should have the same shape as the input tensor.

Add the block created above in the middle of your Autoencoder to get a VAE.

In [None]:
## Load your VAE model
model = src.models.feedforward.autoencoder.VAE([28 * 28, 128, 64])

# Init the model for checking
src.utils.deterministic_init(model)


# Random batch of 10 flattened 28x28 inputs; its values depend on the current torch RNG state
# NOTE(review): model.eval() is not called here, and per the task text the sampler is stochastic
# in training mode — confirm which mode the check is meant to run in
check_input = {'image': torch.randn(10, 28 * 28)}
with torch.no_grad():
    check_output = model(check_input['image'])

    # NOTE(review): this reuses the 'check_result' key written by the Autoencoder check cell,
    # so the earlier value is overwritten — confirm that this is intended
    answer['check_result'] = src.utils.detach_copy(check_output)


# Task 3: Create loss function and optimizer



In this case we will use Adam as the optimizer, with `lr=1.0e-4` and weight decay `1.0e-4` (all other parameters left at their default values)

As a loss function we will use MAE loss:

$L(\mathbf x^*, \mathbf x) = \frac{1}{S}\sum_{s=1}^S |x^*_s - x_s|,$

where index $s$ runs through all the pixels of all images (predicted and input)

Also create yet another loss function for regularization of the Variational AutoEncoder (VAE)

It works like this:

$L_{reg} = \frac{1}{2S}\sum_{s=1}^{S} (\mu_s^2 + \sigma_s^2 - \log \sigma_s^2 - 1)$

(this value is the KL divergence between the normal distribution $\mathcal N (\mu_s, \sigma_s)$ and the standard normal distribution $\mathcal N (0, 1)$). In other words, we train the model so that it minimizes the loss above, but its latent distribution does not deviate too much from the standard normal distribution.

In [10]:
import torchmetrics


def loss(pred, targ):
    """Reconstruction loss between predicted and target images.

    Placeholder: until the body is filled in, this returns the constant 0.
    Per the task text it is meant to compute the MAE, i.e. the mean of
    |pred - targ| over all pixels of all images.

    Args:
        pred: predicted images (presumably a tensor shaped like `targ`; confirm).
        targ: target (input) images.

    Returns:
        The loss value (currently always 0).
    """
    val = 0
    ## YOUR CODE HERE
    return val

# NOTE(review): `epochs` is not read by any other visible cell (training uses `n_epochs`) — confirm it is needed
epochs = 10

# Placeholder: replace the Ellipsis with the optimizer described in the task text
optimizer = ...

## YOUR CODE HERE


In [None]:
# Evaluate the loss on two random batches of five 28x28 tensors and record the value
random_pred = torch.randn(5, 28, 28)
random_targ = torch.randn(5, 28, 28)
answer['loss_val'] = loss(random_pred, random_targ)

# Record the optimizer's string form for the checker, then display it
answer['optimizer'] = str(optimizer)
print(optimizer)


# Task 4: Create DataLoaders

In [12]:
from tqdm import trange, tqdm
import neptune
import os

# Batch size (presumably for the loaders below; confirm) and the epoch count passed to train_model
batch_size = 32
n_epochs = 10

# Placeholders: replace the Ellipsis values with loaders over the training and validation data
train_dl = ...
valid_dl = ...

## YOUR CODE HERE


# Task 5: Create training loop

The task is the same:
- load samples batch-by-batch
- send the samples through the model
- calculate loss function
- backpropagate the gradient
- step the optimizer
- perform validation to check overfitting

In [13]:
def train_model(model, n_epochs, train_dl, valid_dl, loss, optimizer):
    """Train `model` for `n_epochs` epochs and collect the per-epoch losses.

    A neptune run is opened for the duration of the training and is always
    stopped, even when the training is interrupted by an exception.

    Args:
        model: the network to train.
        n_epochs: number of passes over `train_dl`.
        train_dl: iterable yielding the training batches.
        valid_dl: iterable yielding the validation batches.
        loss: loss callable, for use by the training / evaluation code.
        optimizer: optimizer, for use by the training code.

    Returns:
        Tuple `(train_loss_history, valid_loss_history)`: two lists holding
        one averaged loss value per epoch.
    """
    train_loss_history = []
    valid_loss_history = []

    ## YOU SHOULD ADD THE neptune.ai TOKEN BEFORE RUNNING THE TRAINING
    run = neptune.init_run(
        ## YOUR CODE HERE
    )

    try:
        for epoch in range(n_epochs):
            # Running sums for the epoch; the tiny denominator avoids a division by zero
            train_loss = {'enumerator': 0.0, 'denominator': 1.0e-8}
            valid_loss = {'enumerator': 0.0, 'denominator': 1.0e-8}

            for batch in tqdm(train_dl):
                ...
                ## YOUR TRAINING CODE HERE

            # Validation does not need gradients
            with torch.no_grad():
                for valid_batch in tqdm(valid_dl):
                    ...
                    ## YOUR EVALUATION CODE HERE

                finalized_train_loss = train_loss['enumerator'] / train_loss['denominator']
                finalized_valid_loss = valid_loss['enumerator'] / valid_loss['denominator']

                # Logging the progress to neptune

                train_loss_history.append(finalized_train_loss)
                valid_loss_history.append(finalized_valid_loss)
    finally:
        # Close the neptune run even if an exception (or a manual interrupt) aborts the training
        run.stop()

    return train_loss_history, valid_loss_history

# Run the training and keep the per-epoch loss curves
train_loss_history, valid_loss_history = train_model(
    model=model,
    n_epochs=n_epochs,
    train_dl=train_dl,
    valid_dl=valid_dl,
    loss=loss,
    optimizer=optimizer,
)


In [14]:
# Attach both loss curves to the answers, then write everything to disk
answer.update(
    train_loss_history=train_loss_history,
    valid_loss_history=valid_loss_history,
)

json_tricks.dump(answer, '.answer.json')


# Task 7. Experiment time!

Create a new autoencoder model and train it for 100 epochs (or more if you like)

In [33]:
# Fresh autoencoder trained for longer, with its own Adam settings
n_epochs = 150
model = src.models.feedforward.autoencoder.Autoencoder([28 * 28, 128, 64])
optimizer = torch.optim.Adam(params=model.parameters(), lr=1.0e-3, weight_decay=1.0e-8)

train_loss_history, valid_loss_history = train_model(
    model=model,
    n_epochs=n_epochs,
    train_dl=train_dl,
    valid_dl=valid_dl,
    loss=loss,
    optimizer=optimizer,
)


In [34]:
with torch.no_grad():
    # First two validation images
    img1, img2 = MNIST_valid[0]['image'], MNIST_valid[1]['image']

    # Originals, one figure each
    for original in (img1, img2):
        plt.figure()
        plt.imshow(original.reshape([28, 28]))

    # Encode each image as a single-row batch, then decode the embeddings back
    emb1, emb2 = [model.encoder(picture.reshape([1, -1])) for picture in (img1, img2)]
    rec1, rec2 = [model.decoder(embedding) for embedding in (emb1, emb2)]

    # Reconstructions, one figure each
    for restored in (rec1, rec2):
        plt.figure()
        plt.imshow(restored.reshape([28, 28]))


In [None]:
# Decode points on the straight line between the two embeddings and show them side by side
figures = []

with torch.no_grad():
    for a in np.linspace(0, 1, 10):
        # A plain Python float avoids mixing numpy scalars with torch tensors
        weight = float(a)
        rec = model.decoder(weight * emb1 + (1 - weight) * emb2)
        # The decoder returns one flattened image per row; turn it back into a 28x28 tile,
        # otherwise the concatenation below yields a single one-pixel-high row
        figures.append(rec.reshape([28, 28]).cpu().numpy())

# Tiles are joined along the width: a = 0 (left) is the second image, a = 1 (right) is the first
plt.imshow(np.concatenate(figures, axis=1))
