# Homework 2 - Unsupervised Deep Learning
<center><b>Student</b>: Francesco Manzali (1234428)</center>

## General overview
In this homework you will learn how to implement and test neural network models for solving unsupervised problems. For simplicity and to allow continuity with the kind of data you have seen before, the homework will be based on images of handwritten digits (MNIST). However, you can optionally explore different image collections (e.g., [Caltech](http://www.vision.caltech.edu/Image_Datasets/Caltech101/) or [Cifar](https://www.cs.toronto.edu/~kriz/cifar.html)) or other datasets based on your interests. The basic tasks for the homework will require to test and analyze the convolutional autoencoder implemented during the Lab practice. If you prefer, you can opt for a fully-connected autoencoder, which should achieve similar performance considering the relatively small size of the MNIST images. As for the previous homework, you should explore the use of advanced optimizers and regularization methods. Learning hyperparameters should be tuned using appropriate search procedures, and final accuracy should be evaluated using a cross-validation setup. More advanced tasks will require the exploration of denoising and variational architectures.



## Technical notes
The homework should be implemented in Python using the PyTorch framework. The student can explore additional libraries and tools to implement the models; however, please make sure you understand the code you are writing because during the exam you might receive specific questions related to your implementation. The entire source code required to run the homework must be uploaded as a compressed archive in a Moodle section dedicated to the homework. If your code will be entirely included in a single Python notebook, just upload the notebook file.




## Final report
Along with the source code, you must separately upload a PDF file containing a brief report of your homework. The report should include a brief Introduction on which you explain the homework goals and the main implementation strategies you choose, a brief Method section where you describe your model architectures and hyperparameters, and a Result section where you present the simulation results. Total length must not exceed 6 pages, though you can include additional tables and figures in a final Appendix (optional).




## Grade
The maximum grade for this homework will be **8 points**. Points will be assigned based on the correct implementation of the following items:
*	1 pt: implement and test (convolutional) autoencoder, reporting the trend of reconstruction loss and some examples of image reconstruction
*	1 pt: explore advanced optimizers and regularization methods 
*	1 pt: optimize hyperparameters using grid/random search and cross-validation
*	1 pt: implement and test denoising (convolutional) autoencoder
*	1 pt: fine-tune the (convolutional) autoencoder using a supervised classification task (you can compare classification accuracy and learning speed with results achieved in homework 1)
*	1 pt: explore the latent space structure (e.g., PCA, t-SNE) and generate new samples from latent codes
*	2 pt: implement variational (convolutional) autoencoder or GAN




## Deadline
The complete homework (source code + report) must be submitted through Moodle at least 10 days before the chosen exam date.

In [None]:
import sys

#Install all the required packages with the correct versions in the current environment.
#Note: this notebook has been run with Python 3.9.0 on a 64-bit Windows 10 machine, with a AMD Ryzen 7 1700 8-core CPU, GTX 970 GPU, and 32GB of DDR4 RAM. 
!{sys.executable} -m pip install numpy~=1.20.1 pandas~=1.2.3 matplotlib~=3.3.4 hiplot~=0.1.24 scipy~=1.6.0 tqdm~=4.59.0 torch~=1.8.0 torchvision~=0.9.0 optuna~=2.7.0 pytorch_lightning~=1.3.4 torchmetrics~=0.3.2 torchsummaryX psutil

In [38]:
#Autoreload imported functions
%load_ext autoreload
%autoreload 2

%matplotlib widget 
#Or use %matplotlib notebook
#I'm running jupyter inside of Visual Studio Code, so %matplotlib widget is needed for me.

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [39]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from mpl2latex import mpl2latex, latex_figsize
from plotting import COLUMNWIDTH

from pathlib import Path
import json
import random
import joblib

import logging 
logging.basicConfig(filename='01_Conv_Autoencoder.log', encoding='utf-8', level=logging.INFO, filemode='w')

In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, TensorDataset, random_split
import torchvision

import pytorch_lightning as pl

import optuna
from optuna.integration import PyTorchLightningPruningCallback

print("Torch version:", torch.__version__)

#Select device for training
#device = "cpu" #For this very simple dataset it is actually faster
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu") #Uncomment for GPU 

logging.info(f"Using {device} for training")
print(f'Using "{device}" for training')

Torch version: 1.8.0
Using "cuda" for training


In [4]:
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.callbacks.early_stopping import EarlyStopping 
from callbacks import MetricsCallback, LitProgressBar, ReconstructedImage

# 1. Data

In [47]:
from data import MNISTDataModule

#Note: the main difference from the previous homework is that samples here are not normalized to have mean=0 and std=1, but just to be in [0, 1]

mnist = MNISTDataModule(batch_size=64) #No data augmentation (to make training faster)
mnist.setup()

In [6]:
from plotting import show_sample

sample, label = mnist.train_dataset[1]
show_sample(sample, label)
print(f"Min: {torch.min(sample)}, Max: {torch.max(sample)}") #Check the normalization (should be about [0,1])

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

Min: 0.0, Max: 1.0


# 2. Models
## 2.1 Convolutional AutoEncoder

In [41]:
class ConvAutoEncoder(pl.LightningModule):
    """Convolutional AutoEncoder for MNIST"""
    
    #Output shape of convolutional layers:
    #x_out = floor([x_in + 2 * x_pad - (x_ker - 1) - 1] / x_stride + 1)

    def __init__(self, hyper_parameters : dict = None, *args, **kwargs):
        super().__init__()

        if hyper_parameters is None:
            self.hyper_parameters = { #Default values
                'optimizer' : 'Adam',
                'encoded_space_dim' : 10,
                'learning_rate' : 1e-3,
                'dropout_rate' : 0
            }
            self.hyper_parameters.update(**kwargs)
        else:
            self.hyper_parameters = hyper_parameters

        self.save_hyperparameters() #store hyper_parameters in checkpoints

        self.dropout_rate = self.hyper_parameters['dropout_rate']
        self.encoded_space_dim = self.hyper_parameters['encoded_space_dim']

        #in = (1, 28, 28)
        self.encoder_cnn = nn.Sequential(
            nn.Conv2d(1, 8, kernel_size=3, stride=2, padding=1), #out = (8, 14, 14)
            nn.ReLU(),
            nn.Dropout2d(self.dropout_rate),
            nn.Conv2d(8, 16, kernel_size=3, stride=2, padding=1), #out = (16, 7, 7)
            nn.ReLU(),
            nn.Dropout(self.dropout_rate),
            nn.Conv2d(16, 32, kernel_size=3, stride=2, padding=0), #out = (32, 3, 3)
            nn.ReLU(),
            nn.Dropout(self.dropout_rate),
            nn.Flatten(start_dim=1),
            nn.Linear(3 * 3 * 32, 64),
            nn.ReLU(),
            nn.Dropout(self.dropout_rate),
            nn.Linear(64, self.encoded_space_dim)
        )

        self.decoder_cnn = nn.Sequential( #Specular
            nn.Linear(self.encoded_space_dim, 64),
            nn.ReLU(),
            nn.Dropout(self.dropout_rate),
            nn.Linear(64, 3 * 3 * 32),
            nn.ReLU(),
            nn.Dropout(self.dropout_rate),
            nn.Unflatten(dim=1, unflattened_size=(32, 3, 3)),
            nn.ConvTranspose2d(32, 16, kernel_size=3, stride=2, output_padding=0),
            nn.ReLU(),
            nn.Dropout(self.dropout_rate),
            nn.ConvTranspose2d(16, 8, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.Dropout2d(self.dropout_rate),
            nn.ConvTranspose2d(8, 1, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.Sigmoid() #Force output to be in [0, 1] (valid pixel values)
        )
    
    def forward(self, x : "torch.tensor"):
        embedding = self.encoder_cnn(x)

        return embedding
    
    def training_step(self, batch, batch_idx):
        x, _ = batch #ignore labels

        internal_repr = self.encoder_cnn(x)
        
        x_hat = self.decoder_cnn(internal_repr)

        loss = F.mse_loss(x_hat, x, reduction='mean')
        self.log('train_loss', loss)

        return loss

    def configure_optimizers(self):
        optimizer = getattr(optim, self.hyper_parameters['optimizer'])(self.parameters(), lr=self.hyper_parameters['learning_rate']) #, weight_decay=1e-5)
        return optimizer

    def validation_step(self, batch, batch_idx, log_name='val_loss'):
        x, _ = batch

        internal_repr = self.encoder_cnn(x)
        
        x_hat = self.decoder_cnn(internal_repr)

        loss = F.mse_loss(x_hat, x)
        self.log(log_name, loss, prog_bar=True)

        return loss

    def test_step(self, batch, batch_idx, log_name='test_loss'):
        
        return self.validation_step(batch, batch_idx, log_name='test_loss')

### 2.1.1 Testing Cross-Validation
Is cross-validation necessary for autoencoders trained on MNIST? One way to find out is to use it, and see how much the losses computed on each fold are different. 

If they are really close, then any fixed train/val split would suffice, since the size of the validation dataset would be enough for estimating the generalization performance.

In [8]:
#---Cross Validation Test---# #Takes about 2 min to complete.

from data import MNIST_CV

mnist_cv = MNIST_CV()

fold_metrics = []

for fold_idx, (train_dataloader, val_dataloader) in enumerate(mnist_cv.get_fold()):
    logging.info(f"Cross-validation - Fold {fold_idx} - Started")
    
    bar = LitProgressBar() #Fixes a tqdm bug in Jupyter by disabling the validation progressbar during training
    metrics_callback = MetricsCallback()
    cnn_autoencoder = ConvAutoEncoder()
    trainer = pl.Trainer(gpus=1, max_epochs=3, callbacks=[metrics_callback, bar], checkpoint_callback=False)
    trainer.fit(cnn_autoencoder, train_dataloader, val_dataloader)

    fold_metrics.append(metrics_callback.metrics)

Validation sanity check: 0it [00:00, ?it/s]



Training: 0it [00:00, ?it/s]

Validation sanity check: 0it [00:00, ?it/s]

Training: 0it [00:00, ?it/s]

Validation sanity check: 0it [00:00, ?it/s]

Training: 0it [00:00, ?it/s]

Validation sanity check: 0it [00:00, ?it/s]

Training: 0it [00:00, ?it/s]

In [9]:
final_val_losses = [metric[-1]['val_loss'] for metric in fold_metrics]
print(f"Average final loss: {np.mean(final_val_losses):.4f}, std: {np.std(final_val_losses):.4f}")

Average final loss: 0.0224, std: 0.0004


Note that the losses are very similar in all cross-validation folds (the standard deviation is very low).
This suggests that the dataset is sufficiently big that any of its k-parts (with $k=4$ here) suffices for reaching similar results. Thus, a full k-fold cross-validation can be neglected, and a fixed validation set may be used instead, reducing the amount of computation needed.

### 2.1.2 Training

A CNN autoencoder is trained to compress MNIST samples to just two numbers. This allows to test if the model works, while also being able to plot directly the latent representations. 

In [42]:
from typing import Tuple

MODEL_SAVE_FOLDER = Path("SavedModels")

#---Helper functions for managing checkpoints---#
def checkpoint_path(name : str):
    """Given a model `name`, return a path to its checkpoint"""
    return MODEL_SAVE_FOLDER / Path(name + '.ckpt')

def learn_curve_path(name : str):
    """Given a model `name`, return a path to its saved learning curves data"""
    return MODEL_SAVE_FOLDER / Path(name + '.csv') 

def save_state(name : str, trainer : "pl.Trainer", metrics : "MetricsCallback"):
    """Save the model as a checkpoint, and also its learning curves data, under the specified `name`."""
    trainer.save_checkpoint(checkpoint_path(name))
    
    df = pd.DataFrame(metrics_callback.metrics)
    df.to_csv(learn_curve_path(name), index=False)

def load_state(model_class : "pl.Module", name : str) -> Tuple["pl.Module", "pd.DataFrame"]:
    """Load the state named `name` for the model with class `model_class`.
    
    Returns
    -------
    instance : pl.Module
        Model instantiated from the checkpoint
    metrics : pd.DataFrame
        DataFrame with the learning curves from the loaded training process.
    """
    
    instance = model_class.load_from_checkpoint(checkpoint_path(name))
    
    metrics = pd.read_csv(learn_curve_path(name))
    
    return instance, metrics

In [43]:
bar = LitProgressBar()
metrics_callback = MetricsCallback()
early_stopping_callback = EarlyStopping(monitor='val_loss', patience=4, verbose=True)
reco_callback = ReconstructedImage(dataset = mnist.val_dataset, every_n_epochs=1, directory="features/autoencoder/enc2")
checkpoint_callback = ModelCheckpoint(monitor='val_loss', dirpath='SavedModels/cnn_autoencoder_first', filename='cnn_autoencoder-{epoch:02d}-{val_loss:.2f}', save_top_k=5, mode='min')

#Define training
trainer = pl.Trainer(gpus=1, max_epochs=50, callbacks=[checkpoint_callback, metrics_callback, reco_callback, bar, early_stopping_callback])

#Define model
cnn_autoencoder = ConvAutoEncoder(encoded_space_dim=2, learning_rate=1e-3, dropout=.4) #Start with a 2D encoding space, so that it can be easily plotted

#Train
#trainer.fit(cnn_autoencoder, mnist) 
# [UNCOMMENT] the previous line to repeat the training. Otherwise, the next cell will load a previous checkpoint.

# Note: samples of reconstructed images from the validation dataset
# are plotted in the folder "features/autoencoder/enc2" by the `reco_callback`.

#Save model & learning curve
#save_state("cnn_autoencoder_first", trainer, metrics_callback)


Checkpoint directory SavedModels/cnn_autoencoder_first exists and is not empty.



In [71]:
#Load saved model
cnn_autoencoder, metrics = load_state(ConvAutoEncoder, "cnn_autoencoder_first")

In [11]:
#---Plot train/val reconstruction error at each epoch---#
from plotting import plot_reconstruction_error

plot_reconstruction_error(metrics)

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

In [12]:
#---Explore the 2D representation space---#
from plotting import plot_2D_representation_space

n_samples = 500
images, labels = mnist.val_dataset[:n_samples]
encoded = cnn_autoencoder.encoder_cnn(images).data.numpy()

x, y = encoded[:, 0], encoded[:, 1]
images = images.squeeze(1).numpy()

with mpl2latex(True):
    fig = plt.figure(figsize=latex_figsize(wf=1., columnwidth=COLUMNWIDTH))
    plot_2D_representation_space(x, y, images, labels, fig=fig)
    
    fig.savefig("Plots/2d_encoded_space.pdf", transparent=True, bbox_inches='tight')
    
# NOTE: Move the mouse over the dots to see which digit is encoded there! 

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

In [13]:
plt.close() #Close the previous plot when finished inspecting, so that no RAM is wasted

In [72]:
encoded_batches = []
label_batches = []

#---Compute the latent representations of samples from the validation dataset---#
cnn_autoencoder.eval()
for (batch_x, y) in mnist.val_dataloader():
    encoded_batches.append(cnn_autoencoder(batch_x).data.numpy())
    label_batches.append(y.data.numpy())

encoded_batches = np.vstack(encoded_batches)
label_batches = np.hstack(label_batches)    

from sklearn.metrics import davies_bouldin_score #https://www.wikiwand.com/en/Davies%E2%80%93Bouldin_index

davies_bouldin_score(encoded_batches, label_batches)
#The "similarity" of two clusters is defined as the ratio between their "homogeneity" (average distance of points that belong to a cluster to the centroid of that cluster) and their "separation" (distance between the two centroids). 
#The Davies Bouldin score measures the average similarity of each cluster with its most similar cluster. If clusters are well separated, the score is lower, down to a minimum value of 0 for "perfect separation". 

1.9650963377201613

In [73]:
from scipy.spatial.distance import cdist, pdist

#---Compare the average distance between representations of the same digit, and representations of different digits---#
outer_distances = []
inner_distances = []
for digit in range(10):
    outer_distances.append(cdist(encoded_batches[label_batches == digit], encoded_batches[label_batches != digit]).mean())
    inner_distances.append(pdist(encoded_batches[label_batches == digit]).mean())
    
print(f"Outer: {np.mean(outer_distances):.3f}, Inner: {np.mean(inner_distances):.3f}") 

Outer: 17.934, Inner: 8.340


In [14]:
#Generate a sample based on position
x, y = [0, -5]
custom_encoded_value = torch.tensor([x, y]).float().unsqueeze(0)

cnn_autoencoder.eval()
with torch.no_grad():
    generated_img = cnn_autoencoder.decoder_cnn(custom_encoded_value)

plt.figure(figsize=(5, 5))
plt.imshow(generated_img.squeeze().cpu().numpy(), cmap='gist_gray')

plt.title(f"Image from ({x:.2f}, {y:.2f})")
plt.show()    

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

In [15]:
#Show a random reconstructed image from the validation set
reco_callback = ReconstructedImage(dataset=mnist.val_dataset, every_n_epochs=1, show=True, directory="features/autoencoder/enc2/samples")
reco_callback.on_train_epoch_end(trainer, cnn_autoencoder)

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

### 2.1.3 Hyperparameter optimization with Optuna

In [16]:
#---Hyperparameter Optimization with Optuna---#

def objective(trial: optuna.trial.Trial) -> float:
    optimizer = trial.suggest_categorical("optimizer", ["SGD", "Adam"])

    encoded_space_dim = trial.suggest_int("encoded_space_dim", 2, 50)

    learning_rate = trial.suggest_float("learning_rate", 1e-5, 1e-2, log=True)

    dropout_rate = trial.suggest_float("dropout_rate", 0.0, 1.0)

    batch_size = trial.suggest_categorical("batch_size", [64, 128, 256])

    #Convert to a dict 
    hyper_parameters = {
        'optimizer' : optimizer,
        'encoded_space_dim' : encoded_space_dim,
        'learning_rate' : learning_rate,
        'dropout_rate' : dropout_rate,
        'batch_size' : batch_size
    }

    mnist = MNISTDataModule(batch_size=batch_size)
    model = ConvAutoEncoder(hyper_parameters = hyper_parameters)

    bar = LitProgressBar()

    trainer = pl.Trainer(
        logger = True,
        limit_val_batches=1., #percentage of validation batches to be used
        checkpoint_callback=False, #Do not save models during hyperparams opt.
        max_epochs=10,
        gpus = 1 if torch.cuda.is_available() else None,
        callbacks = [PyTorchLightningPruningCallback(trial, monitor="val_loss"), bar]
    )

    trainer.logger.log_hyperparams(hyper_parameters)
    trainer.fit(model, datamodule=mnist)

    return trainer.callback_metrics["val_loss"].item() #Minimize "val_loss"

In [17]:
#---Execute Hyper Parameter Optimization---#

pruner = optuna.pruners.MedianPruner(n_startup_trials=5, n_warmup_steps=30) #Prune (=terminate a trial early) if the trial's best intermediate result is worse than the median of intermediate results of previous trials at the same step. It is used to avoid wasting time evaluating hyperparameter choices that are "really bad".

study = optuna.create_study(study_name="cnn_autoencoder", storage="sqlite:///cnn_autoencoder.db", direction="minimize", pruner=pruner, load_if_exists=True)

#[UNCOMMENT] this line if you want to execute the hyperparameters optimization (it can take a really long time)
#Otherwise, the next cell will load the results
#study.optimize(objective, n_trials=10, timeout=None) #timeout = stop after this many seconds (set to None to proceed without time limitation)

[32m[I 2021-06-21 00:49:33,935][0m Using an existing study with name 'cnn_autoencoder' instead of creating a new one.[0m


In [26]:
study = optuna.create_study(study_name="cnn_autoencoder", storage="sqlite:///cnn_autoencoder.db", direction="minimize", load_if_exists=True)

print("Number of finished trials: {}".format(len(study.trials)))

print("Best trial:")
best_trial = study.best_trial

print("  Value (val_loss): {}".format(best_trial.value))
print("  Params: ")
for key, value in best_trial.params.items():
    print("    {}: {}".format(key, value))

[32m[I 2021-06-21 22:53:41,118][0m Using an existing study with name 'cnn_autoencoder' instead of creating a new one.[0m


Number of finished trials: 100
Best trial:
  Value (val_loss): 0.010333129204809666
  Params: 
    batch_size: 128
    dropout_rate: 0.0011920463762665756
    encoded_space_dim: 33
    learning_rate: 0.005410311458704212
    optimizer: Adam


In [27]:
# from optuna.visualization import plot_optimization_history
from optuna.visualization import plot_parallel_coordinate

# #plot_optimization_history(study)
fig = plot_parallel_coordinate(study)
fig.write_image("Plots/hyperparameters.pdf")
#this may require plotly-orca to be installed with conda:
#conda install -c plotly plotly-orca

In [44]:
#---Train with best hyperparameters---#
best_hyperparameters = best_trial.params

bar = LitProgressBar()
metrics_callback = MetricsCallback()
early_stopping_callback = EarlyStopping(monitor='val_loss', patience=3, verbose=True)
reco_callback = ReconstructedImage(dataset = mnist.val_dataset, every_n_epochs=1, directory="features/autoencoder/best")
checkpoint_callback = ModelCheckpoint(monitor='val_loss', dirpath='SavedModels/experiments', filename='cnn_autoencoder-{epoch:02d}-{val_loss:.2f}', save_top_k=5, mode='min')

#Define training
trainer = pl.Trainer(gpus=1, max_epochs=100, callbacks=[checkpoint_callback, metrics_callback, reco_callback, bar, early_stopping_callback])

#Define data (with the correct batch_size)
mnist = MNISTDataModule(batch_size=best_hyperparameters['batch_size'])

#Define model
cnn_autoencoder = ConvAutoEncoder(hyper_parameters=best_hyperparameters) #Start with a 2D encoding space, so that it can be easily plotted

#Train
#trainer.fit(cnn_autoencoder, mnist)
# [UNCOMMENT] the previous line to repeat the training. Otherwise, the next cell will load a previous checkpoint.

# Note: samples of reconstructed images from the validation dataset
# are plotted in the folder "features/autoencoder/best" by the `reco_callback`.

#save_state("cnn_autoencoder_best", trainer, metrics_callback)

In [65]:
#--Load results from previous training--#
cnn_autoencoder, metrics = load_state(ConvAutoEncoder, "cnn_autoencoder_best")

In [23]:
#---Plot train/val reconstruction error at each epoch---#
from plotting import plot_reconstruction_error

with mpl2latex(True):
    fig, ax = plt.subplots(figsize=latex_figsize(wf=1, columnwidth=COLUMNWIDTH))

    plot_reconstruction_error(metrics, ax=ax)

    fig.savefig("Plots/best_hyperparams_learning_curve.pdf", transparent=True, bbox_inches='tight')

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

In [24]:
trainer.validate(model=cnn_autoencoder, datamodule=mnist)


The dataloader, val dataloader 0, does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` (try 16 which is the number of cpus on this machine) in the `DataLoader` init to improve performance.



--------------------------------------------------------------------------------
DATALOADER:0 VALIDATE RESULTS
{'val_loss': 0.008700749836862087}
--------------------------------------------------------------------------------


[{'val_loss': 0.008700749836862087}]

In [25]:
#Estimate val/test loss
trainer.test(model=cnn_autoencoder, datamodule=mnist)


The dataloader, test dataloader 0, does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` (try 16 which is the number of cpus on this machine) in the `DataLoader` init to improve performance.



Testing: 0it [00:00, ?it/s]

--------------------------------------------------------------------------------
DATALOADER:0 TEST RESULTS
{'test_loss': 0.00853609200567007, 'val_loss': 0.008700749836862087}
--------------------------------------------------------------------------------


[{'val_loss': 0.008700749836862087, 'test_loss': 0.00853609200567007}]

In [26]:
#---Plot reconstructed samples---#
import math
import matplotlib.gridspec as gridspec

n_samples = 9
MAX_PER_ROW = 3

n_rows = math.ceil(n_samples / MAX_PER_ROW)
n_cols = min(n_samples, MAX_PER_ROW)

cnn_autoencoder.eval()

with mpl2latex(True):
    fig = plt.figure(figsize=latex_figsize(wf=1., columnwidth=COLUMNWIDTH))

    ext_grid = gridspec.GridSpec(n_rows, n_cols, figure=fig)

    for place in ext_grid:
        in_grid = place.subgridspec(1, 2)

        ax_orig = fig.add_subplot(in_grid[0])
        ax_reco = fig.add_subplot(in_grid[1])

        ax_orig.axis('off')
        ax_reco.axis('off')

        x, _ = random.choice(mnist.val_dataset)
        
        with torch.no_grad():
            encoded = cnn_autoencoder.encoder_cnn(x.unsqueeze(0))
            reco = cnn_autoencoder.decoder_cnn(encoded)
            
        ax_orig.set_title('Orig.', fontsize=10)
        ax_reco.set_title('Reco.', fontsize=10)

        ax_orig.imshow(x.numpy().reshape(28, 28), cmap='gist_gray')
        ax_reco.imshow(reco.numpy().reshape(28, 28), cmap='gist_gray')

    fig.add_subplot(111, frameon=False)
    plt.tick_params(labelcolor='none', which='both', top=False, bottom=False, left=False, right=False)
    plt.suptitle("(Original, Reconstructed) samples")

    plt.tight_layout(w_pad=1.5, h_pad=3)
    fig.savefig("Plots/orig_reconstructed.pdf", transparent=True, bbox_inches='tight')

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

### 2.1.4 Latent Space Visualization

In [63]:
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE

encoded_batches = []
label_batches = []

#---Compute the latent representations of samples from the validation dataset---#
cnn_autoencoder.eval()
for (batch_x, y) in mnist.val_dataloader():
    encoded_batches.append(cnn_autoencoder.encoder_cnn(batch_x).data.numpy())
    label_batches.append(y.data.numpy())

encoded_batches = np.vstack(encoded_batches)
label_batches = np.hstack(label_batches)    

In [28]:
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from matplotlib.cm import get_cmap
from matplotlib.lines import Line2D

#---PCA projection---#
pca = PCA(n_components=2)
projected_features = pca.fit_transform(encoded_batches)

print(f"Explained variance: {pca.explained_variance_ratio_}")

colors = get_cmap('tab10').colors
point_colors = np.array(colors)[label_batches]

with mpl2latex(True):
    fig, ax = plt.subplots(figsize=latex_figsize(1., columnwidth=COLUMNWIDTH))
    ax.scatter(projected_features[:, 0], projected_features[:, 1], c=point_colors, s=1)
    ax.set_title("PCA (Validation Dataset)") #Analysis on the autoencoder with best hyperparameters

    legend_elements = [Line2D([0], [0], marker='o', color='w', label=f'{i}',
                            markerfacecolor=colors[i]) for i in range(10)]
    ax.legend(handles=legend_elements, ncol=5, columnspacing=.5, handletextpad=-.1) #Save this image

    ax.patch.set_facecolor("white")
    
    ax.set_xlabel("var0")
    ax.set_ylabel("var1")
    
    fig.savefig("Plots/best_hyperparams_PCA.pdf", transparent=True, bbox_inches='tight')

Explained variance: [0.20324522 0.11365472]


Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

In [29]:
from sklearn.metrics import davies_bouldin_score #https://www.wikiwand.com/en/Davies%E2%80%93Bouldin_index

davies_bouldin_score(encoded_batches, label_batches)
#The "similarity" of two clusters is defined as the ratio between their "homogeneity" (average distance of points that belong to a cluster to the centroid of that cluster) and their "separation" (distance between the two centroids). 
#The Davies Bouldin score measures the average similarity of each cluster with its most similar cluster. If clusters are well separated, the score is lower, down to a minimum value of 0 for "perfect separation". 

2.764029853970084

In [30]:
davies_bouldin_score(projected_features, label_batches) #Note that in the projected space points are not separated very well!

8.243625314312094

In [31]:
#---t-SNE---#
tsne_projected = TSNE(n_components=2).fit_transform(encoded_batches) #Warning: this computation takes a bit of time!

davies_bouldin_score(tsne_projected, label_batches)

0.9393163735679441

In [32]:
#---Plot t-SNE projection---#
from matplotlib.cm import get_cmap
from matplotlib.lines import Line2D

colors = get_cmap('tab10').colors
point_colors = np.array(colors)[label_batches]

with mpl2latex(True):
    fig, ax = plt.subplots(figsize=latex_figsize(1., columnwidth=COLUMNWIDTH))
    ax.scatter(tsne_projected[:, 0], projected_features[:, 1], c=point_colors, s=1)
    ax.set_title("t-SNE (Validation Dataset)") 
    
    ax.set_xlabel("var0")
    ax.set_ylabel("var1")

    legend_elements = [Line2D([0], [0], marker='o', color='w', label=f'{i}',
                            markerfacecolor=colors[i]) for i in range(10)]
    ax.legend(handles=legend_elements, ncol=5, columnspacing=.5, handletextpad=-.1)
    ax.patch.set_facecolor("white")

    fig.savefig("Plots/best_hyperparams_tSNE.pdf", transparent=True, bbox_inches='tight')

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

# 2.2 Denoising Convolutional AutoEncoder

In [29]:
#---Noise types---#
from data import AddGaussianNoise, AddSaltPepperNoise, AddSpeckleNoise

x_clean, y = mnist.train_dataset[10]

x_gaussian   = AddGaussianNoise(0.3, .05)(x_clean)
x_saltpepper = AddSaltPepperNoise(salt_vs_pepper=.5, amount=.05)(x_clean)
x_speckle    = AddSpeckleNoise(0., .5)(x_clean)

all_noises = torchvision.transforms.Compose([
    AddSpeckleNoise(),
    AddSaltPepperNoise(salt_vs_pepper=.5, amount=.1),
    AddGaussianNoise(.5, .3)
])

x_all_noise = all_noises(x_clean)

with mpl2latex(True):
    fig, axes = plt.subplots(nrows=1, ncols=5, figsize=latex_figsize(wf=1., hf=.28, columnwidth=COLUMNWIDTH))
    plt.subplots_adjust(wspace=.05)

    tensors = [x_clean, x_gaussian, x_saltpepper, x_speckle, x_all_noise]
    labels  = ["Clean", "+ Gauss", "+ S\&P", "+ Speckle", "+ All"]

    for i, ax in enumerate(axes):
        ax.imshow(tensors[i].squeeze(0).data.numpy(), cmap='gist_gray')
        ax.axis('off')
        ax.set_title(f"{labels[i]}")
        ax.set_xticklabels([])
        ax.set_yticklabels([])
        
        ax.patch.set_facecolor("white")
    
    fig.add_subplot(111, frameon=False)
    plt.tick_params(labelcolor='none', which='both', top=False, bottom=False, left=False, right=False)
    plt.title("Noise Types")
    fig.savefig("Plots/noise_types.pdf", transparent=True, bbox_inches='tight')

AttributeError: 'MNISTDataModule' object has no attribute 'train_dataset'

In [30]:
#---Create noisy dataset---#
from data import MNISTNoisy

mnist_noisy = MNISTNoisy(noise_transform=all_noises, batch_size=best_hyperparameters['batch_size'])
mnist_noisy.setup()

In [35]:
class DenoisingAutoEncoder(pl.LightningModule):
    """Denoising Convolutional AutoEncoder for MNIST"""
    
    #x_out = floor([x_in + 2 * x_pad - (x_ker - 1) - 1] / x_stride + 1)

    def __init__(self, hyper_parameters : dict = None, *args, **kwargs):
        super().__init__()

        if hyper_parameters is None:
            self.hyper_parameters = { #Default values
                'optimizer' : 'Adam',
                'encoded_space_dim' : 10,
                'learning_rate' : 1e-3,
                'dropout_rate' : .1
            }
            self.hyper_parameters.update(**kwargs)
        else:
            self.hyper_parameters = hyper_parameters

        self.save_hyperparameters()

        self.autoencoder = ConvAutoEncoder(hyper_parameters=hyper_parameters) #Copy the same architecture used for the initial AutoEncoder

        self.dropout_rate = self.hyper_parameters['dropout_rate']
        self.encoded_space_dim = self.hyper_parameters['encoded_space_dim']

        self.encoder_cnn = self.autoencoder.encoder_cnn
        self.decoder_cnn = self.autoencoder.decoder_cnn
    
    def forward(self, x : "torch.tensor"):
        embedding = self.encoder_cnn(x)

        return embedding
    
    def training_step(self, batch, batch_idx, log_name='train_loss', prog_bar=False):
        x_clean, x_noisy, _ = batch #ignore labels

        internal_repr = self.encoder_cnn(x_noisy)
        x_hat = self.decoder_cnn(internal_repr)

        loss = F.mse_loss(x_hat, x_clean)
        self.log(log_name, loss, prog_bar=prog_bar)

        return loss

    def configure_optimizers(self):
        optimizer = getattr(optim, self.hyper_parameters['optimizer'])(self.parameters(), lr=self.hyper_parameters['learning_rate'])
        return optimizer

    def validation_step(self, batch, batch_idx, log_name='val_loss', prog_bar=True):
        return self.training_step(batch, batch_idx, log_name=log_name, prog_bar=prog_bar)

    def test_step(self, batch, batch_idx, log_name='test_loss'):
        
        return self.validation_step(batch, batch_idx, log_name=log_name, prog_bar=True)

In [32]:
metrics_callback = MetricsCallback()
reco_callback = ReconstructedImage(dataset = mnist_noisy.val_dataset, every_n_epochs=1, directory = "features/denoising/", pick_noisy=True)
bar = LitProgressBar()
checkpoint_callback = ModelCheckpoint(monitor='val_loss', dirpath='SavedModels/denoising', filename='denoising_autoencoder-{epoch:02d}-{val_loss:.2f}', save_top_k=5, mode='min')
early_stopping_callback = EarlyStopping(monitor='val_loss', patience=4, verbose=True)

trainer = pl.Trainer(gpus=1, max_epochs=50, callbacks=[metrics_callback, reco_callback, bar, checkpoint_callback, early_stopping_callback]) #reco_callback (show noisy image and reconstructed)

denoise_autoencoder = DenoisingAutoEncoder(hyper_parameters=best_hyperparameters) #encoded_space_dim

#trainer.fit(denoise_autoencoder, mnist_noisy) 
# [UNCOMMENT] the previous line to repeat the training. Otherwise, the next cell will load a previous checkpoint.

# Note: samples of reconstructed images from the validation dataset
# are plotted in the folder "features/denoising" by the `reco_callback`.

#Save model
#save_state("cnn_denoiser", trainer, metrics_callback)


Checkpoint directory SavedModels/denoising exists and is not empty.



In [36]:
cnn_denoiser, metrics = load_state(DenoisingAutoEncoder, "cnn_denoiser")

In [37]:
trainer.test(cnn_denoiser, mnist_noisy.test_dataloader())

Testing: 0it [00:00, ?it/s]

--------------------------------------------------------------------------------
DATALOADER:0 TEST RESULTS
{'test_loss': 0.02999124675989151}
--------------------------------------------------------------------------------


[{'test_loss': 0.02999124675989151}]

In [38]:
#---Plot denoised samples---#
import math
import matplotlib.gridspec as gridspec

n_samples = 9
MAX_PER_ROW = 3

n_rows = math.ceil(n_samples / MAX_PER_ROW)
n_cols = min(n_samples, MAX_PER_ROW)

cnn_denoiser.eval()

with mpl2latex(True):
    fig = plt.figure(figsize=latex_figsize(wf=1., columnwidth=COLUMNWIDTH))

    ext_grid = gridspec.GridSpec(n_rows, n_cols, figure=fig)

    for place in ext_grid:
        in_grid = place.subgridspec(1, 2)

        ax_noisy = fig.add_subplot(in_grid[0])
        ax_denoisy = fig.add_subplot(in_grid[1])

        ax_noisy.axis('off')
        ax_denoisy.axis('off')

        x, x_noisy, y = random.choice(mnist_noisy.val_dataset)
        
        with torch.no_grad():
            encoded = cnn_denoiser.encoder_cnn(x_noisy.unsqueeze(0))
            reco = cnn_denoiser.decoder_cnn(encoded)

        ax_noisy.set_title(f'"{y}"')

        ax_noisy.imshow(x_noisy.numpy().reshape(28, 28), cmap='gist_gray')
        ax_denoisy.imshow(reco.numpy().reshape(28, 28), cmap='gist_gray')

    fig.add_subplot(111, frameon=False)
    plt.tick_params(labelcolor='none', which='both', top=False, bottom=False, left=False, right=False)
    plt.suptitle("(Noisy, Denoised) samples")

    plt.tight_layout(w_pad=1.5, h_pad=3)
    fig.savefig("Plots/noisy_denoised.pdf", transparent=True, bbox_inches='tight')

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

In [39]:
from sklearn.metrics import davies_bouldin_score #https://www.wikiwand.com/en/Davies%E2%80%93Bouldin_index
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE

encoded_batches = []
label_batches = []

#---Compute the latent representations of samples from the validation dataset---#
cnn_denoiser.eval()
for (batch_x, y) in mnist.val_dataloader():
    encoded_batches.append(cnn_denoiser.encoder_cnn(batch_x).data.numpy())
    label_batches.append(y.data.numpy())

encoded_batches = np.vstack(encoded_batches)
label_batches = np.hstack(label_batches)    

davies_bouldin_score(encoded_batches, label_batches)
#The "similarity" of two clusters is defined as the ratio between their "homogeneity" (average distance of points that belong to a cluster to the centroid of that cluster) and their "separation" (distance between the two centroids). 
#The Davies Bouldin score measures the average similarity of each cluster with its most similar cluster. If clusters are well separated, the score is lower, down to a minimum value of 0 for "perfect separation". 

3.052746754689131

## 2.3 Supervised FineTuning

In [8]:
#5. Fine-tune the (convolutional) autoencoder using a supervised classification task 
#You can compare the classification accuracy and learning speed with results achieved in homework 1.

import torchmetrics

class SupervisedFineTuner(pl.LightningModule):
    """Denoising Convolutional AutoEncoder for MNIST"""
    
    #x_out = floor([x_in + 2 * x_pad - (x_ker - 1) - 1] / x_stride + 1)

    def __init__(self):
        super().__init__()

        self.autoencoder = ConvAutoEncoder.load_from_checkpoint(checkpoint_path("cnn_autoencoder_best"))
        self.autoencoder.freeze() #Do not train this part
        
        self.encoder = self.autoencoder.encoder_cnn
        
        self.encoded_space_dim = self.autoencoder.encoded_space_dim

        self.supervised_segment = nn.Sequential(
            nn.Linear(self.encoded_space_dim, 128),
            nn.ReLU(),
            #nn.Dropout(.1),
            nn.Linear(128, 10),
            #nn.ReLU(),
            #nn.Dropout(.1),
            #nn.Linear(64, 10)
        )

        #Accuracy metric
        self.metric = torchmetrics.Accuracy()
    
    def forward(self, x : "torch.tensor"):
        x = self.encoder(x)
        x = self.supervised_segment(x)

        return x
    
    def training_step(self, batch, batch_idx, log_name='train'):
        x, y = batch #ignore labels

        out = self.forward(x)
        probas = F.softmax(out)

        loss = F.cross_entropy(out, y)
        acc  = self.metric(probas, y)

        self.log(log_name + '_loss', loss)
        self.log(log_name + '_acc', acc, prog_bar=True)

        return loss

    def configure_optimizers(self):
        optimizer = optim.Adam(self.parameters(), lr=5e-3)

        return optimizer

    def validation_step(self, batch, batch_idx, log_name='val'):

        return self.training_step(batch, batch_idx, log_name=log_name)

    def test_step(self, batch, batch_idx, log_name='test'):
        
        return self.validation_step(batch, batch_idx, log_name=log_name)

In [11]:
from torchsummaryX import summary

metrics_callback = MetricsCallback()
early_stopping_callback = EarlyStopping(monitor='val_loss', patience=10, verbose=True)

trainer = pl.Trainer(gpus=1, max_epochs=100, callbacks=[metrics_callback, early_stopping_callback]) #reco_callback (show noisy image and reconstructed)

supervised_fine_tuned = SupervisedFineTuner()

supervised_summary = summary(supervised_fine_tuned, torch.zeros((1, 1, 28, 28)))
#Note that only (relatively) few parameters are trainable

             Kernel Shape    Output Shape  Params Mult-Adds
Layer                                                      
0_metric     [1, 8, 3, 3]  [1, 8, 14, 14]       -         -
1_metric     [1, 8, 3, 3]  [1, 8, 14, 14]       -   14.112k
2_metric                -  [1, 8, 14, 14]       -         -
3_metric                -  [1, 8, 14, 14]       -         -
4_metric                -  [1, 8, 14, 14]       -         -
5_metric                -  [1, 8, 14, 14]       -         -
6_metric    [8, 16, 3, 3]   [1, 16, 7, 7]       -         -
7_metric    [8, 16, 3, 3]   [1, 16, 7, 7]       -   56.448k
8_metric                -   [1, 16, 7, 7]       -         -
9_metric                -   [1, 16, 7, 7]       -         -
10_metric               -   [1, 16, 7, 7]       -         -
11_metric               -   [1, 16, 7, 7]       -         -
12_metric  [16, 32, 3, 3]   [1, 32, 3, 3]       -         -
13_metric  [16, 32, 3, 3]   [1, 32, 3, 3]       -   41.472k
14_metric               -   [1, 32, 3, 3

In [12]:
#trainer.fit(supervised_fine_tuned, mnist)
# [UNCOMMENT] the previous line to repeat the training. Otherwise, the next cell will load a previous checkpoint.

#Save model & learning curve
#save_state("cnn_autoencoder_supervised", trainer, metrics_callback)

In [13]:
supervised_fine_tuned, metrics = load_state(SupervisedFineTuner, "cnn_autoencoder_supervised")

In [14]:
trainer.test(model=supervised_fine_tuned, test_dataloaders=mnist.train_dataloader()) #Train error



Testing: 0it [00:00, ?it/s]

  probas = F.softmax(out)


--------------------------------------------------------------------------------
DATALOADER:0 TEST RESULTS
{'test_acc': 0.9887708425521851, 'test_loss': 0.03757133707404137}
--------------------------------------------------------------------------------


[{'test_acc': 0.9887708425521851, 'test_loss': 0.03757133707404137}]

In [46]:
trainer.test(model=supervised_fine_tuned, test_dataloaders=mnist.val_dataloader()) #Validation error

Testing: 0it [00:00, ?it/s]


Implicit dimension choice for softmax has been deprecated. Change the call to include dim=X as an argument.



--------------------------------------------------------------------------------
DATALOADER:0 TEST RESULTS
{'test_acc': 0.9889166951179504, 'test_loss': 0.035658761858940125}
--------------------------------------------------------------------------------


[{'test_acc': 0.9889166951179504, 'test_loss': 0.035658761858940125}]

In [47]:
trainer.test(model=supervised_fine_tuned, test_dataloaders=mnist.test_dataloader()) #Test error

Testing: 0it [00:00, ?it/s]


Implicit dimension choice for softmax has been deprecated. Change the call to include dim=X as an argument.



--------------------------------------------------------------------------------
DATALOADER:0 TEST RESULTS
{'test_acc': 0.979200005531311, 'test_loss': 0.08013071119785309}
--------------------------------------------------------------------------------


[{'test_acc': 0.979200005531311, 'test_loss': 0.08013071119785309}]

In [15]:
from matplotlib import ticker

#---Plot learning curves---#
with mpl2latex(True):
    
    fig, ax = plt.subplots(figsize=latex_figsize(wf=1., columnwidth=COLUMNWIDTH))
    
    ax.plot(metrics["train_loss"], label="train", c='k')
    ax.plot(metrics["val_loss"], label="val", c='r')
    
    ax.set_yscale('log')
    
    ax.set_title("Learning curves - Autoencoder + Supervised")
    ax.set_ylabel("Loss")
    ax.set_xlabel("Epoch")
    ax.legend()

    formatter = ticker.ScalarFormatter(useMathText=True) #Put the multiplier (e.g. *1e-2) at the top left side, above the plot, making everything more "compact"
    formatter.set_scientific(True) 
    formatter.set_powerlimits((-1,1)) 

    ax.yaxis.set_major_formatter(formatter)

    ax.patch.set_facecolor('white')
    plt.tight_layout()
    
    fig.savefig("Plots/learning_curves_supervised.pdf", transparent=True, bbox_inches='tight')

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

## 2.4 Variational Autoencoder

In [66]:
class VariationalAutoEncoder(pl.LightningModule):
    def __init__(self, encoded_space_dim : int = 10):
        super().__init__()

        self.encoded_space_dim = encoded_space_dim

        self.save_hyperparameters()

        self.encoder_cnn = nn.Sequential(
            nn.Conv2d(1, 8, kernel_size=3, stride=2, padding=1), #out = (8, 14, 14)
            nn.ReLU(),
            nn.Conv2d(8, 16, kernel_size=3, stride=2, padding=1), #out = (16, 7, 7)
            nn.ReLU(),
            nn.Conv2d(16, 32, kernel_size=3, stride=2, padding=0), #out = (32, 3, 3)
            nn.ReLU(),
            nn.Flatten(start_dim=1)
        )

        self.decoder_cnn = nn.Sequential( #Specular
            nn.Linear(self.encoded_space_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 3 * 3 * 32),
            nn.ReLU(),
            nn.Unflatten(dim=1, unflattened_size=(32, 3, 3)),
            nn.ConvTranspose2d(32, 16, kernel_size=3, stride=2, output_padding=0),
            nn.ReLU(),
            nn.ConvTranspose2d(16, 8, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(8, 1, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.Sigmoid() #-> [0,1] output
        )

        self.avg = nn.Sequential( #Predicts the means of a multi-variate gaussian
            nn.Linear(3 * 3 * 32, 64),
            nn.ReLU(),
            nn.Linear(64, self.encoded_space_dim)
        )

        self.log_var = nn.Sequential( #Predicts the (log) variances of a multi-variate gaussian (diagonal of the covariance matrix)
            nn.Linear(3 * 3 * 32, 64),
            nn.ReLU(),
            nn.Linear(64, self.encoded_space_dim)
        )

    @staticmethod
    def _variational_loss(x_hat, x_true, mu, log_var):
        """
        Loss for a variational autoencoder, from https://arxiv.org/pdf/1312.6114.pdf
        """

        reco_loss = F.mse_loss(x_hat, x_true, reduction='sum') #Both losses should be aggregated in the same way over a batch
        #otherwise, if one is averaged and the other summed, they will be of different magnitude, and one will be minimized preferentially
        #making convergence difficult
     
        kl_div    = -0.5 * torch.sum(1. + log_var - mu**2 - torch.exp(log_var))

        return reco_loss + kl_div

    @staticmethod
    def _sample(mu, log_var):
        """
        Generate samples from a multi-variate Normal distribution with diagonal covariance matrix, given a vector of means `mu` and 
        the vector of log(variances) `log_var`.
        """

        sigma = torch.exp(0.5 * log_var) #log_var is log(variance) = log(sigma**2) = 2 * log(sigma)

        return mu + torch.randn_like(mu) * sigma
        #Var(aX) = a**2 Var(X), so we need to multiply pred_sqrt_var = sigma (square root of the variance) by the standard normal distribution

    def forward(self, x):
        internal_repr = self.encoder_cnn(x)

        pred_means   = self.avg(internal_repr)
        pred_log_var = self.log_var(internal_repr) 

        #Produce a sample
        sample = self._sample(mu=pred_means, log_var=pred_log_var)
        
        return sample, pred_means, pred_log_var

    def training_step(self, batch, batch_idx, log_name='train', prog_bar=False):
        x, _ = batch
        sample, pred_means, pred_log_var = self.forward(x)

        reconstructed = self.decoder_cnn(sample)
        loss = self._variational_loss(x_hat=reconstructed, x_true=x, mu=pred_means, log_var=pred_log_var)

        self.log(log_name + '_loss', loss, prog_bar=prog_bar)
        
        return loss

    def configure_optimizers(self):
        optimizer = optim.Adam(self.parameters(), lr=1e-3)

        return optimizer

    def validation_step(self, batch, batch_idx, log_name='val'):

        return self.training_step(batch, batch_idx, log_name=log_name, prog_bar=True)

    def test_step(self, batch, batch_idx, log_name='test'):
        
        return self.training_step(batch, batch_idx, log_name=log_name, prog_bar=True)

In [76]:
metrics_callback = MetricsCallback()
bar = LitProgressBar()
early_stopping_callback = EarlyStopping(monitor='val_loss', patience=10, verbose=True)

trainer = pl.Trainer(gpus=1, max_epochs=100, callbacks=[metrics_callback, bar, early_stopping_callback]) #reco_callback (show noisy image and reconstructed)

variational_autoencoder = VariationalAutoEncoder(encoded_space_dim=33) #2D latent space, so that it can be easily visualized

trainer.fit(variational_autoencoder, mnist) 
# [UNCOMMENT] the previous line to repeat the training. Otherwise, the next cell will load a previous checkpoint.

save_state("vae_autoencoder", trainer, metrics_callback)

Validation sanity check: 0it [00:00, ?it/s]


The dataloader, val dataloader 0, does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` (try 16 which is the number of cpus on this machine) in the `DataLoader` init to improve performance.


The dataloader, train dataloader, does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` (try 16 which is the number of cpus on this machine) in the `DataLoader` init to improve performance.



Training: 0it [00:00, ?it/s]

In [74]:
variational_autoencoder, metrics = load_state(VariationalAutoEncoder, "vae_autoencoder")

In [77]:
trainer.test(variational_autoencoder, mnist.test_dataloader())

Testing: 0it [00:00, ?it/s]

--------------------------------------------------------------------------------
DATALOADER:0 TEST RESULTS
{'test_loss': 1788.4122314453125}
--------------------------------------------------------------------------------


[{'test_loss': 1788.4122314453125}]

In [78]:
#---Plot reconstructed samples---#
import math
import matplotlib.gridspec as gridspec

n_samples = 9
MAX_PER_ROW = 3

n_rows = math.ceil(n_samples / MAX_PER_ROW)
n_cols = min(n_samples, MAX_PER_ROW)

variational_autoencoder.eval()

with mpl2latex(True):
    fig = plt.figure(figsize=latex_figsize(wf=1., columnwidth=COLUMNWIDTH))

    ext_grid = gridspec.GridSpec(n_rows, n_cols, figure=fig)

    for place in ext_grid:
        in_grid = place.subgridspec(1, 2)

        ax_orig = fig.add_subplot(in_grid[0])
        ax_reco = fig.add_subplot(in_grid[1])

        ax_orig.axis('off')
        ax_reco.axis('off')

        x, _ = random.choice(mnist.val_dataset)
        
        with torch.no_grad():
            encoded, _, _ = variational_autoencoder(x.unsqueeze(0))
            reco = variational_autoencoder.decoder_cnn(encoded)
            
        ax_orig.set_title('Orig.', fontsize=10)
        ax_reco.set_title('Reco.', fontsize=10)

        ax_orig.imshow(x.numpy().reshape(28, 28), cmap='gist_gray')
        ax_reco.imshow(reco.numpy().reshape(28, 28), cmap='gist_gray')

    fig.add_subplot(111, frameon=False)
    plt.tick_params(labelcolor='none', which='both', top=False, bottom=False, left=False, right=False)
    plt.suptitle("(Original, Reconstructed) samples")

    plt.tight_layout(w_pad=1.5, h_pad=3)
    fig.savefig("Plots/orig_reconstructed_vae.pdf", transparent=True, bbox_inches='tight')

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

In [103]:
variational_autoencoder.eval()

start_img, y1 = mnist.val_dataset[103]
end_img, y2 = mnist.val_dataset[18]

assert y1 != y2, "Please change the indices for start_img and end_img"

with torch.no_grad():
    start_pos = variational_autoencoder.avg(variational_autoencoder.encoder_cnn(start_img.unsqueeze(0))).data.numpy().flatten()
    end_pos = variational_autoencoder.avg(variational_autoencoder.encoder_cnn(end_img.unsqueeze(0))).data.numpy().flatten()
    
steps = 2 + 5
line = np.linspace(0, 1, 7)
points = [start_pos + val * (end_pos - start_pos) for val in line]

with mpl2latex(True):
    fig, axs = plt.subplots(1, steps, figsize=latex_figsize(wf=1., hf=.2, columnwidth=COLUMNWIDTH))
    
    for i, (ax, point) in enumerate(zip(axs.ravel(), points)):
        with torch.no_grad():
            img = variational_autoencoder.decoder_cnn(torch.tensor(point).unsqueeze(0))
        
        if i == 0:
            ax.set_title(f"Start: {y1}")
        if i == steps-1:
            ax.set_title(f"End: {y2}")
            
        ax.imshow(img.squeeze().numpy(), cmap='gist_gray')
        ax.axis('off')
        
    fig.add_subplot(111, frameon=False)
    plt.tick_params(labelcolor='none', which='both', top=False, bottom=False, left=False, right=False)
    plt.suptitle('"Transition" between two samples')

    plt.tight_layout()
    fig.savefig("Plots/vae_transition.pdf", transparent=True, bbox_inches='tight')
    

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

In [69]:
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE

encoded_batches = []
label_batches = []

#---Compute the latent representations of samples from the validation dataset---#
variational_autoencoder.eval()
for (batch_x, y) in mnist.val_dataloader():
    encoded_batches.append(variational_autoencoder.avg(variational_autoencoder.encoder_cnn(batch_x)).data.numpy())
    label_batches.append(y.data.numpy())

encoded_batches = np.vstack(encoded_batches)
label_batches = np.hstack(label_batches)    

In [67]:
from sklearn.metrics import davies_bouldin_score #https://www.wikiwand.com/en/Davies%E2%80%93Bouldin_index

davies_bouldin_score(encoded_batches, label_batches)
#The "similarity" of two clusters is defined as the ratio between their "homogeneity" (average distance of points that belong to a cluster to the centroid of that cluster) and their "separation" (distance between the two centroids). 
#The Davies Bouldin score measures the average similarity of each cluster with its most similar cluster. If clusters are well separated, the score is lower, down to a minimum value of 0 for "perfect separation". 

#It is higher here, since the clusters are "less dense" (as it can be seen from the 2d plot). 

6.3851832225609

In [70]:
from scipy.spatial.distance import cdist, pdist

#---Compare the average distance between representations of the same digit, and representations of different digits---#
outer_distances = []
inner_distances = []
for digit in range(10):
    outer_distances.append(cdist(encoded_batches[label_batches == digit], encoded_batches[label_batches != digit]).mean())
    inner_distances.append(pdist(encoded_batches[label_batches == digit]).mean())
    
print(f"Outer: {np.mean(outer_distances):.3f}, Inner: {np.mean(inner_distances):.3f}")

Outer: 1.836, Inner: 1.051


In [68]:
#---Plot 2D representation space---#
variational_autoencoder, metrics = load_state(VariationalAutoEncoder, "vae_autoencoder2d") 
#load the vae trained with encoded_space_dim=2

from plotting import plot_2D_representation_space

n_samples = 500
variational_autoencoder.eval()
images, labels = mnist.val_dataset[:n_samples]
encoded = variational_autoencoder.avg(variational_autoencoder.encoder_cnn(images)).data.numpy()

x, y = encoded[:, 0], encoded[:, 1]
images = images.squeeze(1).numpy()

with mpl2latex(True):
    fig = plt.figure(figsize=latex_figsize(wf=1., columnwidth=COLUMNWIDTH))
    plot_2D_representation_space(x, y, images, labels, fig=fig)
    
    fig.savefig("Plots/2d_encoded_space_vae.pdf", transparent=True, bbox_inches='tight')

# NOTE: Move your mouse on the dots to see which image is encoded at that position!

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

In [63]:
plt.close() #Close plot to not waste RAM