<a href="https://colab.research.google.com/github/GiovaniValdrighi/inferencia_causal/blob/master/vae.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
!pip3 install -q http://download.pytorch.org/whl/{accelerator}/torch-0.4.0-{platform}-linux_x86_64.whl
!pip3 install torchvision
!pip3 install pyro-ppl

In [0]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import pyro
import pyro.distributions
import pyro.infer
import pyro.optim
import torchvision.transforms as transforms
import torch
import torch.nn as nn
import networkx as nx

Estudo de variational autoencoder através do texto: https://towardsdatascience.com/understanding-variational-autoencoders-vaes-f70510919f73

Implementação de variational autoencoder com Pyro através do texto: https://pyro.ai/examples/vae.html

#Variational Autoencoder

De modo simples, um **autoencoder** é um método utilizado para diminuir a dimensão de informação. Utilizando de redes neurais criamos duas, um **encoder** e um **decoder**. Seja nossa informação **X** uma matriz de dimensão **(n m)**, o encoder é uma rede neural responsável por transformar **X** para uma dimensão menor, se tornará o que chamamos de **variável latente**, **Z**. Em seguida, o decoder é responsável por transformar **Z** para as dimensões originais, teremos a matriz **X'**. O aprendizado da rede neural ocorre atráves da otimização da distância entre **X** e **X'**. 

No entanto, em um autoencoder simples, o espaço latente não é estável, isto é, valores similhares de **Z** não são similares à **X'**. Para isto, utilizamos de um **variational autoencoder**. A diferença é que transformamos **X** em uma distribuição no espaço latente, e não um valor constante. Assim, para recuperarmos com o decoder, utilizamos de uma amostra aleatória da distribuição do espaço latente, e por consequência do método de treinamento, teremos um decoder que leva amostras aleatórias semlehantes para **X'** semelhantes.

Por conveniência, a distribuição do espaço latente é uma Gaussiana, sendo assim,o encoder ao receber **X** retorna a média e a variância da distribuição, geramos uma amostra aleatória de uma normal padrão e por fim repassamos a variável latente para o decoder.

In [3]:
from google.colab import files
uploaded = files.upload()

Saving dsprites_ndarray_co1sh3sc6or40x32y32_64x64.npz to dsprites_ndarray_co1sh3sc6or40x32y32_64x64 (1).npz


As propriedades latentes das imagens são: shape, scale, size, position X e position Y. Vou criar um DAG para inserir relações causais entre as variáveis e em seguida criar um SCM para este DAG.

In [10]:
dataset_zip = np.load('dsprites_ndarray_co1sh3sc6or40x32y32_64x64.npz', allow_pickle = True, encoding = 'bytes')
print('Keys in the dataset:', dataset_zip.files)
imgs = dataset_zip['imgs']
latents_values = dataset_zip['latents_values']
latents_classes = dataset_zip['latents_classes']
latents_sizes = dataset_zip['metadata'][()][b'latents_sizes']
latents_names = dataset_zip['metadata'][()][b'latents_names']

Keys in the dataset: ['metadata', 'imgs', 'latents_classes', 'latents_values']


In [0]:
#function that pick images from the dataset and return the batchs
#with test and training data
def setup_data_loader(batch_size = 128, size_test = 1/5):
  #create the dataframes for training and test from the dataset_zip['imgs]
  #test with 1/5 of the data
  index = np.random.permutation(imgs.shape[0])
  train_df = torch.utils.data.TensorDataset(torch.from_numpy(imgs[index[int(size_test*imgs.shape[0]):]].astype(np.float32).reshape(-1, 4096)), 
                                            torch.from_numpy(latents_classes[index[int(size_test*imgs.shape[0]):]].astype(np.float32)))
  test_df = torch.utils.data.TensorDataset(torch.from_numpy(imgs[:index[int(size_test*imgs.shape[0])]].astype(np.float32).reshape(-1, 4096)), 
                                           torch.from_numpy(latents_classes[:index[int(size_test*imgs.shape[0])]].astype(np.float32)))
  #creates a iterable dataset to train and test, each iteration have batch_size rows of data
  train_loader = torch.utils.data.DataLoader(train_df, batch_size, shuffle = False)
  test_loader = torch.utils.data.DataLoader(test_df, batch_size, shuffle = False)
  return train_loader, test_loader

In [0]:
class Encoder(nn.Module):
  '''This class receive the images data as vectors 1x4096
  and the labels of the figure in the image as a 1x116 vector (dummy variables) 
  and should encode it to the latent space as mean and variance of
  a normal distribution

  :param img_dim: dimension of image vector
  :param label_dim: dimension of label vector
  :param latent_dim: dimension of latent space, output
  '''
  def __init__(self, img_dim = 4096, label_dim = 116, latent_dim = 200):
    super(Encoder, self).__init__()
    self.img_dim = img_dim
    self.label_dim = label_dim
    self.latent_dim = latent_dim 
    #linear transformations used
    self.fc1 = nn.Linear(img_dim + label_dim, 1000)
    self.fc21 = nn.Linear(1000, latent_dim)
    self.fc22 = nn.Linear(1000, latent_dim)
    #non-linear transformation used
    self.softplus = nn.Softplus()

  def forward(self, img, label):
    #use the transformation to get the hidden variable
    data = torch.cat((img, label), -1)
    hidden = self.softplus(self.fc1(data))
    #use the transformation to get the mean and the variance
    mean_z = self.fc21(hidden)
    cov_z = torch.exp(self.fc22(hidden))
    return mean_z, cov_z


class Decoder(nn.Module):
  '''This class receive a sample of the latent variable
  and return the image as a data vector 1x4096 and
  the latent classes as a vector 1x5

  :param img_dim: dimension of image vector
  :param label_dim: dimension of label vector
  :param latent_dim: dimension of latent space, output
  '''
  def __init__(self, img_dim = 4096, label_dim = 116, latent_dim = 200):
    super(Decoder, self).__init__()
    self.img_dim = img_dim
    self.label_dim = label_dim
    self.latent_dim = latent_dim
    #linear transformations used
    self.fc1 = nn.Linear(latent_dim+label_dim, 1000)
    self.fc2 = nn.Linear(1000, img_dim)
    #non-linear transformations used
    self.softplus = nn.Softplus()
    self.sigmoid = nn.Sigmoid()

  def forward(self, latent, label):
    #use the transformation to get the hidden variable
    data = torch.cat((latent, label), -1)
    hidden = self.softplus(self.fc1(data))
    #use the transformation to get the image
    image = self.sigmoid(self.fc2(hidden))
    return image



In [0]:
class VAE(nn.Module):
  '''
  This class define the p(z|x) and the p(x|z)
  and use the scm model to call the encoder and
  decoder

  
  :param img_dim: dimension of image vector
  :param label_dim: dimension of label vector
  :param latent_dim: dimension of latent space, output
  '''
  def __init__(self, latents_sizes, latents_names, img_dim = 4096, label_dim = 116, latent_dim = 200):
    super(VAE, self).__init__()
    #creating networks
    self.encoder = Encoder(img_dim, label_dim, latent_dim)
    self.decoder = Decoder(img_dim, label_dim, latent_dim)
    self.img_dim = img_dim
    self.label_dim = label_dim
    self.latent_dim = latent_dim
    self.latents_sizes = latents_sizes
    self.latents_names = latents_names
  
  def label_variable(self, label):
    new_label = []
    for i, length in enumerate(self.latents_sizes):
      prior = label.new_ones(torch.Size((label.shape[0], length))) / (1.0 *length)
      new_label.append(pyro.sample("label_" + self.latents_names[i], 
                      pyro.distributions.OneHotCategorical(prior), 
                      obs = torch.nn.functional.one_hot(torch.tensor(label[:, i], dtype = torch.int64), int(length))))
    new_label = torch.cat(new_label, -1)
    return torch.tensor(new_label)

  def model(self, img, label):
    '''
    Function in the VAE that defines
    p(x|z)
    '''
    pyro.module("decoder", self.decoder)
    with pyro.plate("data", img.shape[0]):
      z_mean = data.new_zeros(torch.Size((data.shape[0], self.latent_dim)))
      z_variance = data.new_ones(torch.Size((data.shape[0], self.latent_dim)))
      z_sample = pyro.sample("latent", pyro.distributions.Normal(z_mean, z_variance).to_event(1))
      label = self.label_variable(label)
      image = self.decoder.forward(z_sample, label)
      pyro.sample("obs", pyro.distributions.Bernoulli(image).to_event(1), obs = img)


  def guide(self, img, label):
    '''
    Function that is the guide to the model
    shape, scale, orientation, posX, posY = g(img)
    the p(z|x) use on the encoder
    '''
    pyro.module("encoder", self.encoder)
    with pyro.plate("data", img.shape[0]):
      label = self.label_variable(label)
      z_mean, z_variance = self.encoder.forward(img, label)
      pyro.sample("latent", pyro.distributions.Normal(z_mean, z_variance).to_event(1))

In [0]:
#the training routine
train_loader, test_loader = setup_data_loader()
vae = VAE(latents_sizes, latents_names)

#optimizer
optimizer = pyro.optim.Adam({"lr" : 1.0e-3})

#inference algorithm
elbo = pyro.infer.Trace_ELBO()
svi = pyro.infer.SVI(vae.model, vae.guide, optimizer, elbo)

train_elbo = []
test_elbo = []
num_epochs = 5
test_freq = 1
for epoch in range(num_epochs):
  epoch_loss = 0
  for (img, label) in train_loader:
    epoch_loss += svi.step(img, label)
  total_epoch_loss_train = epoch_loss/len(train_loader)
  train_elbo.append(total_epoch_loss_train)
  print("epoch: " + str(epoch) + " average training loss: " + str(epoch_loss))

  if epoch % test_freq == 0:
    test_loss = 0
    for (img, label) in test_loader:
      test_loss += svi.evaluate_loss(img, label)
    total_epoch_loss_test  = epoch_loss/len(test_loader)
    test_elbo.append(total_epoch_loss_test)