<a href="https://colab.research.google.com/github/Kwanikaze/vpandas/blob/master/VAE_OHE_8digits.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import matplotlib.pyplot as plt
import numpy as np

## Generate Data

In [2]:
def generate_data(num=8):
    """ Generate 'num' number of one-hot encoded integers. """ 
    x_train = np.eye(num)[np.arange(num)]                       # This is a simple way to one-hot encode integers
    
    # Repeat x_train multiple times for training
    x_train = np.repeat(x_train, 100, axis=0)
    
    # The target is x_train itself!
    x_target = x_train.copy()
    return x_train, x_target

In [3]:
num = 8
np.random.seed(10)
x_train, x_target = generate_data(num=num)

In [4]:
print(x_train)
print(np.shape(x_train))
print(np.shape(x_target))

[[1. 0. 0. ... 0. 0. 0.]
 [1. 0. 0. ... 0. 0. 0.]
 [1. 0. 0. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 1.]
 [0. 0. 0. ... 0. 0. 1.]
 [0. 0. 0. ... 0. 0. 1.]]
(800, 8)
(800, 8)


## Variational Autoencoder Parameters

In [5]:
#Parameters
latent_dims = 3
num_epochs = 2000
batch_size = 64
learning_rate = 1e-3
use_gpu = True

## VAE Definition
https://colab.research.google.com/github/smartgeometry-ucl/dl4g/blob/master/variational_autoencoder.ipynb#scrollTo=0psoODlF9S_Y

https://stats.stackexchange.com/questions/361643/sampling-z-in-vae

https://stats.stackexchange.com/questions/318748/deriving-the-kl-divergence-loss-for-vaes

In [6]:
class VariationalAutoencoder(nn.Module):
    def __init__(self, latent_dims):
        super().__init__()
        self.fc1 = nn.Linear(num, latent_dims) # why have this additional fc layer?
        self.fc_mu = nn.Linear(latent_dims, latent_dims)
        self.fc_logvar = nn.Linear(latent_dims, latent_dims)
        self.fc_out = nn.Linear(latent_dims,num)

    def encode(self, x):
        h1 = torch.sigmoid(self.fc1(x))
        return self.fc_mu(h1), self.fc_logvar(h1)

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5*logvar)
        eps = torch.randn_like(std)
        return mu + eps*std

    def decode(self, z):
        if z.size()[0] == latent_dims: #resize from [3] to [1,3]
          z = z.view(1, latent_dims)
        softmax = nn.Softmax(dim=1)
        recon = softmax(self.fc_out(z))
        return recon

    def forward(self, x, latent_dims):
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        return self.decode(z), mu, logvar

def vae_loss(batch_recon, x_batch_targets, mu, logvar):
  criterion = nn.CrossEntropyLoss()
  CE = criterion(batch_recon, x_batch_targets)
  #print(CE)
  #BCE = F.binary_cross_entropy(recon, x, reduction='sum') #one hot encoded input
  KLd = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp()) # https://stats.stackexchange.com/questions/318748/deriving-the-kl-divergence-loss-for-vaes
  #print(KLd)
  return CE + 0.001*KLd

In [7]:
def trainVAE(VAE, latent_dims):
  VAE.train()
  x_train, x_target = generate_data(num=num)
  inds = list(range(x_train.shape[0]))
  N = x_train.shape[0] # 800
  freq = num_epochs // 10 # floor division

  loss_hist = []
  x_train = Variable(torch.from_numpy(x_train))
  x_target = Variable(torch.from_numpy(x_target))
  for epoch in range(num_epochs):
      inds = np.random.permutation(inds)
      x_train = x_train[inds]
      x_train = x_train.to(device)
      x_target = x_target[inds]
      x_target = x_target.to(device)
      
      loss = 0
      num_batches = N / batch_size
      for b in range(0, N, batch_size):
          #get the mini-batch
          x_batch = x_train[b: b+batch_size]
          x_target_batch = x_target[b: b+batch_size]
          
          #feed forward
          batch_recon,latent_mu,latent_logvar = VAE(x=x_batch.float(),latent_dims = latent_dims)
          
          # Error
          #Convert x_batch from OHE vectors to single scalar for target class, of each sample in batch 
          _, x_batch_targets = x_batch.max(dim=1)
          train_loss = vae_loss(batch_recon, x_batch_targets, latent_mu, latent_logvar)
          #print(batch_recon.size())
          #print(x_batch_targets.size())
          loss += train_loss.item() / N # update epoch loss
          
          #Backprop the error, compute the gradient
          optimizer.zero_grad()
          train_loss.backward()
          
          #update parameters based on gradient
          optimizer.step()
          
      #Record loss per epoch        
      loss_hist.append(loss)
      
      if epoch % freq == 0:
          print()
          print("Epoch %d/%d\tloss=%.5f" % (epoch + 1, num_epochs, loss), end='\t', flush=True)
          
          #Test with all training data
          VAE.eval()
          train_recon, train_mu, train_logvar = VAE(x = x_train.float(),latent_dims=latent_dims)
          _, x_targets = x_target.max(dim=1)
          l = vae_loss(train_recon, x_targets, train_mu, train_logvar)
          print("Test loss: {:.5f}".format(l.item()), end='')
      
  print("\nTraining finished!")

## Latent dimensions set to 3

In [8]:
#  use gpu if available
device = torch.device("cuda:0" if use_gpu and torch.cuda.is_available() else "cpu")
VAE = VariationalAutoencoder(latent_dims=3)
VAE = VAE.to(device)
num_params = sum(p.numel() for p in VAE.parameters() if p.requires_grad)
print(VAE.parameters)
print("Number of parameters: %d" % num_params) #8*3 + 3 = 27, 3*8 + 8 = 32, 27+32

# optimizer object
optimizer = torch.optim.Adam(params = VAE.parameters(), lr = learning_rate)
#criterion = nn.CrossEntropyLoss()    # for target, does not accept a OHE vector
#criterion = nn.NLLLoss()

trainVAE(VAE, latent_dims=3)

<bound method Module.parameters of VariationalAutoencoder(
  (fc1): Linear(in_features=8, out_features=3, bias=True)
  (fc_mu): Linear(in_features=3, out_features=3, bias=True)
  (fc_logvar): Linear(in_features=3, out_features=3, bias=True)
  (fc_out): Linear(in_features=3, out_features=8, bias=True)
)>
Number of parameters: 83

Epoch 1/2000	loss=0.03433	Test loss: 2.38692
Epoch 201/2000	loss=0.02827	Test loss: 3.77613
Epoch 401/2000	loss=0.02512	Test loss: 4.03435
Epoch 601/2000	loss=0.02460	Test loss: 3.83993
Epoch 801/2000	loss=0.02450	Test loss: 3.77158
Epoch 1001/2000	loss=0.02429	Test loss: 3.70782
Epoch 1201/2000	loss=0.02417	Test loss: 3.70442
Epoch 1401/2000	loss=0.02417	Test loss: 3.66809
Epoch 1601/2000	loss=0.02413	Test loss: 3.67263
Epoch 1801/2000	loss=0.02404	Test loss: 3.64308
Training finished!


In [9]:
print("Print prediction results:")
x_test = np.eye(num)[np.arange(num)]                        # Test data (one-hot encoded)
x_test = Variable(torch.from_numpy(x_test))
x_test = x_test.to(device)
#np.set_printoptions(2)
for x in x_test:
    print("\tInput: {} \t Output: {}".format(x.cpu().detach().numpy(), np.round(VAE(x=x.float(),latent_dims=3)[0].cpu().detach().numpy(),decimals=2)))

Print prediction results:
	Input: [1. 0. 0. 0. 0. 0. 0. 0.] 	 Output: [[1. 0. 0. 0. 0. 0. 0. 0.]]
	Input: [0. 1. 0. 0. 0. 0. 0. 0.] 	 Output: [[0. 1. 0. 0. 0. 0. 0. 0.]]
	Input: [0. 0. 1. 0. 0. 0. 0. 0.] 	 Output: [[0. 0. 1. 0. 0. 0. 0. 0.]]
	Input: [0. 0. 0. 1. 0. 0. 0. 0.] 	 Output: [[0. 0. 0. 1. 0. 0. 0. 0.]]
	Input: [0. 0. 0. 0. 1. 0. 0. 0.] 	 Output: [[0. 0. 0. 0. 1. 0. 0. 0.]]
	Input: [0. 0. 0. 0. 0. 1. 0. 0.] 	 Output: [[0. 0. 0. 0. 0. 1. 0. 0.]]
	Input: [0. 0. 0. 0. 0. 0. 1. 0.] 	 Output: [[0. 0. 0. 0. 0. 0. 1. 0.]]
	Input: [0. 0. 0. 0. 0. 0. 0. 1.] 	 Output: [[0.   0.   0.   0.   0.   0.16 0.   0.84]]


## Extract intermediate features using Forward Hook

In [10]:
def printnorm_encoder(self, input1, output):
    # input is a tuple of packed inputs
    # output is a Tensor. output.data is the Tensor we are interested
    print('\tInside ' + self.__class__.__name__ + ' forward')
    #print('\t input:', input1.cpu().detach().numpy())
    print('\t output rounded to 2 decimals:', np.round(output.cpu().detach().numpy(),decimals=2))
    print('\t output rounded to integer:', np.round(output.cpu().detach().numpy(),decimals=0))

In [11]:
def inside_decoder(self, input1, output):
    # input is a tuple of packed inputs
    # output is a Tensor. output.data is the Tensor we are interested
    print('\tInside ' + self.__class__.__name__ + ' forward')
    #print(input1[0].cpu().detach().numpy())
    #print('\t input:', input1.cpu().detach().numpy())
    print('\t output:', input1[0].cpu().detach().numpy())
    print('\t output rounded to 2 decimals:', np.round(input1[0].cpu().detach().numpy(),2))

In [12]:
#AE.encoder_layer.register_forward_hook(printnorm_encoder)
decoder_hook = AE.decoder_layer.register_forward_hook(inside_decoder)
for x in x_test:
    print('INPUT: {}'.format(x.cpu().detach().numpy()))
    out = AE(features=x.float(),latent_dims=3)
    #print(out)
decoder_hook.remove() #remove hook after use

NameError: ignored

When the number of latent dimensions equals 3, each latent variable is binary.

## Latent dimensions set to 4

In [None]:
AE = Autoencoder(latent_dims=4)
AE = AE.to(device)

# optimizer object
optimizer = torch.optim.Adam(params = AE.parameters(), lr = learning_rate)
#criterion = nn.CrossEntropyLoss()    # for target, does not accept a OHE vector
criterion = nn.NLLLoss()

trainAE(AE)

In [None]:
print("Print prediction results:")
x_test = np.eye(num)[np.arange(num)]                        # Test data (one-hot encoded)
x_test = Variable(torch.from_numpy(x_test))
x_test = x_test.to(device)
#np.set_printoptions(2)
for x in x_test:
    print("\tInput: {} \t Output: {}".format(x.cpu().detach().numpy(), np.round(AE(features=x.float(),latent_dims=4).cpu().detach().numpy(),decimals=2)))

In [None]:
decoder_hook = AE.decoder_layer.register_forward_hook(inside_decoder)
for x in x_test:
    print('INPUT: {}'.format(x.cpu().detach().numpy()))
    out = AE(features=x.float(),latent_dims=4)
    #print(out)
decoder_hook.remove() #remove hook after use