# Анализ модели вариационного автокодировщика

## Предварительная работа

### Библиотеки

In [1]:
import io
import math
import requests
import torch

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from copy                    import deepcopy
from matplotlib.image        import imread
from mpl_toolkits            import mplot3d
from matplotlib              import gridspec
from nerus                   import load_nerus
from nltk.tokenize           import RegexpTokenizer
from skimage.segmentation    import mark_boundaries
from sklearn.metrics         import classification_report
from sklearn.model_selection import ParameterGrid
from torch.utils             import data
from torch.utils.tensorboard import SummaryWriter
from torchvision             import datasets, transforms
from tqdm.autonotebook       import tqdm
from PIL                     import Image
from urllib.request          import urlopen


  from tqdm.autonotebook       import tqdm


In [2]:
import warnings
# Suppress every warning for the rest of the session so that cell output
# stays readable.
warnings.filterwarnings("ignore")


### Установка вычислительного устройства

In [3]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
device


device(type='cuda')

## Обучение

In [4]:
def train_on_batch(model, x_batch, y_batch, optimizer, loss_function):
    """Perform one optimisation step on a single mini-batch.

    Args:
        model: module exposing ``train()``, a ``device`` attribute and a
            ``loss(x, y)`` method returning a scalar tensor.
        x_batch: input tensor of the batch.
        y_batch: target tensor of the batch.
        optimizer: optimizer whose gradients are cleared and stepped here.
        loss_function: not used; the loss is taken from ``model.loss``.

    Returns:
        The batch loss as a Python number.
    """
    model.train()
    optimizer.zero_grad()

    # Both tensors are moved to wherever the model currently lives.
    inputs = x_batch.to(model.device)
    targets = y_batch.to(model.device)

    batch_loss = model.loss(inputs, targets)
    batch_loss.backward()
    optimizer.step()

    return batch_loss.cpu().item()


In [5]:
def train_epoch(train_generator,
                model,
                loss_function,
                optimizer,
                callback = None):
    """Train the model for one pass over ``train_generator``.

    Args:
        train_generator: iterable yielding ``(inputs, targets)`` batches.
        model: model forwarded to ``train_on_batch``.
        loss_function: forwarded to ``train_on_batch`` unchanged.
        optimizer: optimizer forwarded to ``train_on_batch``.
        callback: optional callable invoked as ``callback(model, batch_loss)``
            after every batch, inside ``torch.no_grad()``.

    Returns:
        The mean loss over the epoch, weighted by batch size.

    Raises:
        ZeroDivisionError: if ``train_generator`` yields no samples.
    """
    weighted_loss_sum = 0
    samples_seen = 0

    for inputs, targets in train_generator:
        loss_value = train_on_batch(model, inputs, targets,
                                    optimizer, loss_function)

        if callback is not None:
            with torch.no_grad():
                callback(model, loss_value)

        # Weight by batch size so a shorter final batch does not skew the mean.
        n_items = len(inputs)
        weighted_loss_sum += loss_value * n_items
        samples_seen += n_items

    return weighted_loss_sum / samples_seen


In [6]:
def trainer(count_of_epoch,
            batch_size,
            model,
            dataset,
            loss_function,
            optimizer,
            lr = 0.001,
            callback = None):
    """Train ``model`` for several epochs while showing progress bars.

    Args:
        count_of_epoch: number of epochs to run.
        batch_size: mini-batch size handed to the ``DataLoader``.
        model: model to train.
        dataset: dataset supporting ``len()``; a shuffled loader is rebuilt
            over it every epoch.
        loss_function: forwarded to ``train_epoch`` unchanged.
        optimizer: optimizer forwarded to ``train_epoch``.
        lr: not used by this function.
        callback: optional per-batch callback forwarded to ``train_epoch``.

    Returns:
        None.
    """
    postfix_key = 'train epoch loss'
    epoch_bar = tqdm(range(count_of_epoch), desc='epoch')
    epoch_bar.set_postfix({postfix_key: np.nan})

    # Batches per epoch, counting a trailing partial batch.
    n_samples = len(dataset)
    whole_batches, leftover = divmod(n_samples, batch_size)
    number_of_batch = whole_batches + (leftover > 0)

    for _ in epoch_bar:
        loader = torch.utils.data.DataLoader(dataset=dataset,
                                             batch_size=batch_size,
                                             shuffle=True)
        batch_bar = tqdm(loader, leave=False, total=number_of_batch)

        mean_loss = train_epoch(train_generator=batch_bar,
                                model=model,
                                loss_function=loss_function,
                                optimizer=optimizer,
                                callback=callback)

        epoch_bar.set_postfix({postfix_key: mean_loss})


## Отслеживание обучения модели

In [2]:
%load_ext tensorboard
%tensorboard --logdir experiment/


The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard


Reusing TensorBoard on port 6008 (pid 18029), started 0:00:02 ago. (Use '!kill 18029' to kill it.)

In [8]:
class callback():
    """Per-batch logging hook that writes losses to a summary writer.

    Each call logs the training loss under ``LOSS/train``. On every
    ``delimeter``-th call the model is switched to eval mode, the mean loss
    over ``dataset`` is computed and logged under ``LOSS/test``.
    """

    def __init__(self, writer, dataset,
                 loss_function, delimeter = 100, batch_size=64):
        """Store the logging configuration.

        Args:
            writer: object with an ``add_scalar(tag, value, step)`` method.
            dataset: dataset evaluated for the ``LOSS/test`` scalar.
            loss_function: stored on the instance; not used by ``forward``.
            delimeter: evaluate on ``dataset`` every this many calls.
            batch_size: batch size used while evaluating ``dataset``.
        """
        self.writer = writer
        self.dataset = dataset
        self.loss_function = loss_function
        self.delimeter = delimeter
        self.batch_size = batch_size
        self.step = 0

    def __call__(self, model, loss):
        """Delegate to ``forward``."""
        return self.forward(model, loss)

    def forward(self, model, loss):
        """Log the training loss and, periodically, the held-out loss.

        Args:
            model: model exposing ``eval()``, ``device`` and ``loss(x, y)``.
            loss: training loss of the batch that was just processed.
        """
        self.step += 1
        self.writer.add_scalar('LOSS/train', loss, self.step)

        if self.step % self.delimeter != 0:
            return

        loader = torch.utils.data.DataLoader(dataset=self.dataset,
                                             batch_size=self.batch_size)
        model.eval()

        # Sum of per-batch losses weighted by batch size; only the inputs are
        # moved to the model device here.
        weighted_total = sum(
            model.loss(inputs.to(model.device), targets).cpu().item() * len(inputs)
            for inputs, targets in loader
        )
        mean_test_loss = weighted_total / len(self.dataset)

        self.writer.add_scalar('LOSS/test', mean_test_loss, self.step)


## Модель вариационного автокодировщика 

In [9]:
class VAE(torch.nn.Module):
    """Variational autoencoder built from fully connected layers.

    The encoder maps an input to the mean and standard deviation of a
    diagonal normal distribution over the latent space. The decoder maps a
    latent sample back to the input space through ``num_layers`` blocks whose
    width shrinks from ``num_layers * hidden_dim`` down to ``hidden_dim``.
    """

    @property
    def device(self):
        """Device of the first registered parameter of the model."""
        return next(self.parameters()).device

    def __init__(self, latent_dim, input_dim, num_layers, hidden_dim=200):
        """
        Standard VAE model trained by ELBO optimization.
        Args:
            latent_dim: int - the dimension of latent space.
            input_dim: int - the dimension of input space.
            num_layers: int - the number of decoder blocks before the output
                layer; the first block is num_layers * hidden_dim wide.
            hidden_dim: int - the base width of the hidden layers.
        Returns:
            None
        Example:
            >>> model = VAE(latent_dim=2, input_dim=10, num_layers=2)
        """
        super(VAE, self).__init__()
        self.latent_dim = latent_dim
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        # Encoder trunk shared by the mean and the scale heads.
        self.proposal_z = torch.nn.Sequential(
            torch.nn.Linear(self.input_dim, hidden_dim),
            torch.nn.LeakyReLU(),
        )

        self.proposal_mu = torch.nn.Linear(hidden_dim, self.latent_dim)
        self.proposal_sigma = torch.nn.Linear(hidden_dim, self.latent_dim)

        # NOTE: the attribute name (including its spelling) is part of the
        # state_dict keys, so it is kept as is.
        self.generative_layear = torch.nn.Sequential(
            torch.nn.BatchNorm1d(self.latent_dim),
            torch.nn.Linear(self.latent_dim, num_layers * hidden_dim),
            torch.nn.LeakyReLU(),
        )

        if num_layers > 1:
            self.middle_layer = torch.nn.Sequential()

            # Each block narrows the representation by one hidden_dim.
            for i in range(num_layers - 1):
                width_in = (num_layers - i) * hidden_dim
                width_out = (num_layers - i - 1) * hidden_dim
                self.middle_layer.add_module("Norm" + str(i),
                                             torch.nn.BatchNorm1d(width_in))
                self.middle_layer.add_module("Linear" + str(i),
                                             torch.nn.Linear(width_in, width_out))
                self.middle_layer.add_module("ReLu" + str(i),
                                             torch.nn.ReLU())

        self.output_layer = torch.nn.Sequential(
            torch.nn.BatchNorm1d(hidden_dim),
            torch.nn.Linear(hidden_dim, self.input_dim),
            torch.nn.LeakyReLU()
        )

    def q_z(self, x):
        """
        Computes the parameters of the proposal distribution q(z|x).
        Args:
            x: Tensor - the matrix of shape batch_size x input_dim.
        Returns:
            tuple(Tensor, Tensor) - the normal distribution parameters.
            mu: Tensor - the matrix of shape batch_size x latent_dim.
            sigma: Tensor - the matrix of shape batch_size x latent_dim;
                softplus is applied to the raw head output.
        """
        x = x.to(self.device)

        proposal = self.proposal_z(x)
        mu = self.proposal_mu(proposal)
        # Functional form avoids building a Softplus module on every call.
        sigma = torch.nn.functional.softplus(self.proposal_sigma(proposal))
        return mu, sigma

    def p_z(self, num_samples):
        """
        Generates the parameters of the standard normal prior over z.
        Args:
            num_samples: int - the number of samples.
        Returns:
            tuple(Tensor, Tensor) - the normal distribution parameters.
                mu: Tensor - zeros of shape num_samples x latent_dim.
                sigma: Tensor - ones of shape num_samples x latent_dim.
        """
        shape = [num_samples, self.latent_dim]
        mu = torch.zeros(shape, device=self.device)
        sigma = torch.ones(shape, device=self.device)
        return mu, sigma

    def sample_z(self, distr, num_samples=1):
        """
        Generates samples from a normal distribution with the
        reparameterization z = mu + epsilon * sigma.
        Args:
            distr = (mu, sigma): tuple(Tensor, Tensor) - the normal distribution parameters.
                mu: Tensor - the matrix of shape batch_size x latent_dim.
                sigma: Tensor - the matrix of shape batch_size x latent_dim.
            num_samples: int - the number of samples for each element.
        Returns:
            Tensor - the tensor of shape batch_size x num_samples x latent_dim - samples from normal distribution in latent space.
        """
        mu, sigma = distr
        mu = mu.to(self.device)
        sigma = sigma.to(self.device)

        batch_size = mu.shape[0]

        bias = mu.view([batch_size, 1, self.latent_dim])
        scale = sigma.view([batch_size, 1, self.latent_dim])

        # The noise is a plain random sample, not a trainable quantity, so it
        # does not track gradients; gradients still reach mu and sigma.
        epsilon = torch.randn([batch_size, num_samples, self.latent_dim],
                              device=self.device)

        return bias + epsilon * scale

    def sample_x(self, z):
        """
        Decodes latent vectors into the input space.
        Args:
            z: Tensor - latent samples; reshaped to z.shape[0] x latent_dim,
                so it must hold exactly one latent vector per batch element.
        Returns:
            Tensor - the matrix of shape z.shape[0] x input_dim.
        """
        z = z.to(self.device)

        out = self.generative_layear(z.view([z.shape[0], self.latent_dim]))

        # middle_layer only exists when the decoder has more than one block.
        if self.num_layers > 1:
            out = self.middle_layer(out)

        return self.output_layer(out)

    def loss(self, batch_x, batch_y):
        """
        Calculate the training loss for a batch: mean squared reconstruction
        error plus the batch mean of KL(q(z|x) || p(z)).
        Args:
            batch_x: FloatTensor - the matrix of shape batch_size x input_dim.
            batch_y: not used by this model.
        Returns:
            Tensor - scalar loss for the given batch.
        """
        batch_x = batch_x.to(self.device)

        batch_size = batch_x.shape[0]

        propos_distr = self.q_z(batch_x)
        pri_distr = self.p_z(batch_size)

        reconstruction = self.sample_x(self.sample_z(propos_distr))

        reconstruction_error = torch.nn.functional.mse_loss(reconstruction, batch_x)
        kl_term = torch.mean(self.divergence_KL_normal(propos_distr, pri_distr), dim=0)

        return reconstruction_error + kl_term

    @staticmethod
    def divergence_KL_normal(q_distr, p_distr):
        """
        Calculate KL-divergence KL(q||p) between n pairs of diagonal normal distributions.
        Args:
            q_distr=(mu, sigma): tuple(Tensor, Tensor) - the normal distribution parameters.
                mu: Tensor - the matrix of shape batch_size x latent_dim.
                sigma: Tensor - the matrix of shape batch_size x latent_dim.
            p_distr=(mu, sigma): tuple(Tensor, Tensor) - the normal distribution parameters.
                mu: Tensor - the matrix of shape batch_size x latent_dim.
                sigma: Tensor - the matrix of shape batch_size x latent_dim.
        Returns:
            Tensor - the vector of shape n, each value of which is a KL-divergence between pair of normal distribution.
        """
        q_mu, q_sigma = q_distr
        p_mu, p_sigma = p_distr

        variance_ratio = torch.sum((q_sigma / p_sigma) ** 2, dim=1)
        log_scale_gap = 2 * torch.sum(torch.log(p_sigma), dim=1) - \
            2 * torch.sum(torch.log(q_sigma), dim=1)
        mean_shift = torch.sum((p_mu - q_mu) * (p_mu - q_mu) / (p_sigma ** 2), dim=1)

        # p_mu.shape[1] is the latent dimensionality k of the closed form.
        return 0.5 * (variance_ratio - p_mu.shape[1] + log_scale_gap + mean_shift)

    def forward(self, x):
        """
        Generate decoded sample after encoding.
        Args:
            x: Tensor - the matrix of shape batch_size x input_dim.
        Returns:
            Tensor - the matrix of shape batch_size x input_dim.
        """
        z = self.sample_z(self.q_z(x))

        return self.sample_x(z)


## Выборка

In [10]:
class Dataset(torch.utils.data.Dataset):
    """Dataset over an array indexed as ``[row, :]``; every row gets label 0."""

    def __init__(self, dataset):
        """Keep a reference to the underlying array."""
        self.dataset = dataset

    def __getitem__(self, idx):
        """Return the row at ``idx`` together with a constant label ``0``."""
        row = self.dataset[idx, :]
        placeholder_label = 0
        return row, placeholder_label

    def __len__(self):
        """Number of rows, taken from the first dimension of the array."""
        n_rows = self.dataset.shape[0]
        return n_rows


In [11]:

def CreateDataset(input_size, train_size = 10000, test_size = 2000, n_clusters = 5):
    """Generate a synthetic train/test set drawn from several Gaussian clusters.

    Each cluster has its own per-feature mean (uniform in [0, 10)) and
    per-feature standard deviation (uniform in [0, 1)). For every cluster,
    ``train_size`` training rows and ``test_size`` test rows are sampled.

    The global NumPy random state is re-seeded with 0 on every call, so the
    result is reproducible.

    Args:
        input_size: number of features per sample.
        train_size: number of training samples drawn per cluster.
        test_size: number of test samples drawn per cluster.
        n_clusters: number of clusters.

    Returns:
        tuple(ndarray, ndarray): float32 arrays of shape
        ``(n_clusters * train_size, input_size)`` and
        ``(n_clusters * test_size, input_size)``, ordered cluster by cluster.
    """
    np.random.seed(0)

    # Cluster parameters.
    means = np.random.rand(n_clusters, input_size) * 10
    std_devs = np.random.rand(n_clusters, input_size)

    # Start from zero-row arrays so the result holds only sampled rows and
    # concatenation still works when n_clusters is 0.
    train_chunks = [np.empty((0, input_size))]
    test_chunks = [np.empty((0, input_size))]

    # Draw train then test for each cluster to keep the random stream order.
    for mean, std_dev in zip(means, std_devs):
        train_chunks.append(
            np.random.normal(loc=mean, scale=std_dev, size=(train_size, input_size)))
        test_chunks.append(
            np.random.normal(loc=mean, scale=std_dev, size=(test_size, input_size)))

    dataset_train = np.concatenate(train_chunks, axis=0).astype(np.float32)
    dataset_test = np.concatenate(test_chunks, axis=0).astype(np.float32)

    return dataset_train, dataset_test


## Обучение

In [None]:
# Hyper-parameter grid; every combination is trained as a separate run.
grid = ParameterGrid({'latent_dim': [50, 300],
                      'hidden_dim': [100, 500],
                      'num_layers': [2, 5],
                      'input_dim':  [200, 400]})

for item in tqdm(grid):
    model = VAE(**item)
    model.to(device)

    # One log directory per configuration so runs can be told apart.
    name = 'experiment/latent{}_hidden{}_layers{}_input{}'.format(
            item['latent_dim'], item['hidden_dim'], item['num_layers'], item['input_dim'])

    writer = SummaryWriter(log_dir = name)
    try:
        dataset_train, dataset_test = CreateDataset(item['input_dim'])

        optimizer = torch.optim.Adam(list(model.parameters()), lr = 1e-3)

        # The test loss is logged every 10 training steps.
        call = callback(writer, Dataset(dataset_test), None, delimeter = 10)

        trainer(count_of_epoch = 5,
                batch_size = 64,
                model = model,
                dataset = Dataset(dataset_train),
                loss_function = None,
                optimizer = optimizer,
                callback = call)
    finally:
        # Flush pending events and release the event file even when the run
        # fails, so writers do not pile up across grid points.
        writer.close()


## Результаты

В данной работе был рассмотрен вариационный автокодировщик на примере синтетической выборки, порождённой смесью нормальных распределений нескольких кластеров.

Худшие результаты показали модели, у которых размерность латентного представления была больше размерности скрытого слоя. Такие модели также были неустойчивы к выбросам в выборке.

Наилучшие результаты показали модели, у которых отношение размерности латентного представления к размерности скрытого слоя много меньше 1. Число слоёв и входная размерность влияли на результат слабо.

Повышение размерности скрытого представления повышало точность.