
# Variational Autoencoder

In this notebook, you will implement a variational autoencoder (VAE) and its variations. You will learn how to encode images into latent vectors that follow a Gaussian distribution, and how to control the images decoded from the latent space with a conditional variational autoencoder (CVAE).

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import init
import torchvision
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.utils.data import sampler
from torch.autograd import Variable
from torch.nn import functional as F
from torchvision import datasets, transforms
from visualize_util import visualize_img_batch, one_hot

In [None]:
# Pick the compute device: the GPU when CUDA is available, otherwise the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(device)

## Load MNIST Dataset

In [None]:
# Model / training hyperparameters.
# These are reasonable starting values, not tuned results; adjust as needed.
input_dim = 28*28
hidden_dim = 400
latent_dim = 20
batch_size = 128  # must be an int: DataLoader rejects batch_size=None together with drop_last=True
n_epochs = 10
# convert data to torch.FloatTensor (ToTensor also scales pixel values to [0, 1])
transform = transforms.ToTensor()

num_classes = 10
portioned_img_size = 14*14

mnist_train = datasets.MNIST('./data', 
               transform=transform, 
               download=True, train=True)

loader_train = DataLoader(mnist_train, batch_size=batch_size,
                          shuffle=True, drop_last=True, num_workers=2)

# Use the same transform as the training set so both splits are preprocessed identically.
mnist_test = datasets.MNIST('./data', 
               transform=transform, 
               download=True, train=False)

loader_test = DataLoader(mnist_test, batch_size=batch_size,
                          shuffle=True, drop_last=True, num_workers=2)

In [None]:
class myVAE(nn.Module):
    """Fully connected variational autoencoder, optionally conditional (CVAE).

    Images are flattened to ``input_dim`` values, encoded to the mean and
    log-variance of a diagonal Gaussian over a ``latent_dim``-dimensional
    latent vector, and decoded back to a ``(N, 1, H, W)`` image in ``[0, 1]``.

    When ``conditional_vec_dim > 0`` the conditional vector is concatenated to
    the flattened image before encoding and to the latent vector before
    decoding (latent first, condition second), so ``decoder`` expects inputs
    of width ``latent_dim + conditional_vec_dim``.

    Args:
        input_dim: Number of pixels per image (H*W); must be a perfect square.
        hidden_dim: Width of the hidden fully connected layers.
        latent_dim: Dimensionality Z of the latent vector.
        conditional_vec_dim: Width of the conditional vector (0 = plain VAE).

    Raises:
        ValueError: If ``input_dim`` is not a perfect square.
    """

    def __init__(self, input_dim, hidden_dim, latent_dim, conditional_vec_dim=0):
        super(myVAE, self).__init__()
        self.input_dim = input_dim  # H*W
        self.latent_dim = latent_dim  # Z
        self.hidden_dim = hidden_dim
        self.conditional_vec_dim = conditional_vec_dim  # only used in CVAE

        # The decoder reshapes its flat output back into a square image.
        side = int(round(math.sqrt(input_dim)))
        if side * side != input_dim:
            raise ValueError(
                'input_dim must be a perfect square (H*W with H == W), got {}'.format(input_dim))

        # input: (N,1,H,W) [or (N, H*W + C) for the CVAE] -> output: (N,hidden_dim)
        # nn.Flatten leaves an already 2-D input unchanged.
        self.encoder = nn.Sequential(
            nn.Flatten(),
            nn.Linear(input_dim + conditional_vec_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
        )
        self.mean_layer = nn.Linear(hidden_dim, latent_dim)  # input: (N,hidden_dim) -> output: (N, Z)
        self.logvar_layer = nn.Linear(hidden_dim, latent_dim)  # input: (N,hidden_dim) -> output: (N, Z)
        # input: (N, Z) [or (N, Z + C) for the CVAE] -> output: (N,1,H,W)
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim + conditional_vec_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim),
            nn.Sigmoid(),  # pixel intensities in [0, 1], as required by the BCE loss
            nn.Unflatten(1, (1, side, side)),
        )

    def forward(self, x, c=None):
        """Encode ``x``, sample a latent vector and decode it.

        Args:
            x: Image batch of shape (N, 1, H, W) (any shape that flattens to
                (N, input_dim) works).
            c: Conditional vectors, flattened to (N, conditional_vec_dim).
                Required iff the model was built with ``conditional_vec_dim > 0``.

        Returns:
            Tuple ``(x_hat, mu, logvar)``: the reconstruction (N, 1, H, W) and
            the posterior mean and log-variance, each (N, Z).

        Raises:
            ValueError: If ``c`` is missing for a conditional model, or given
                to an unconditional one.
        """
        flat = torch.flatten(x, start_dim=1)
        use_condition = self.conditional_vec_dim > 0
        if use_condition:
            if c is None:
                raise ValueError('this model is conditional: forward() needs the conditional vector c')
            # Match dtype/device of the image so the two can be concatenated.
            c = c.reshape(c.size(0), -1).to(flat)
            flat = torch.cat((flat, c), dim=-1)
        elif c is not None:
            raise ValueError('conditional vector given but conditional_vec_dim is 0')

        hidden = self.encoder(flat)
        mu = self.mean_layer(hidden)
        logvar = self.logvar_layer(hidden)

        z = self.reparametrize(mu, logvar)
        if use_condition:
            z = torch.cat((z, c), dim=-1)  # latent first, then condition
        x_hat = self.decoder(z)

        return x_hat, mu, logvar

    @staticmethod
    def reparametrize(mu, logvar):
        """Sample ``z ~ N(mu, exp(logvar))`` with the reparametrization trick.

        The randomness is isolated in ``eps ~ N(0, I)`` so that gradients can
        flow through ``mu`` and ``logvar``.
        """
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        z = mu + eps * std
        return z

    @staticmethod
    def loss_function(x_hat, x, mu, logvar):
        """Negative variational lower bound (reconstruction + KL), averaged over the batch.

        Args:
            x_hat: Reconstruction with values in [0, 1].
            x: Target images with values in [0, 1], same number of elements as ``x_hat``.
            mu: Posterior mean, (N, Z).
            logvar: Posterior log-variance, (N, Z).

        Returns:
            Scalar tensor: (summed binary cross-entropy + KL(q(z|x) || N(0, I))) / N.
        """
        # NOTE(review): the loss is averaged over the batch dimension; confirm
        # this matches the normalisation used in the handout.
        batch = mu.size(0)
        reconstruction_loss = F.binary_cross_entropy(
            x_hat.reshape(batch, -1), x.reshape(batch, -1), reduction='sum')
        # Closed-form KL divergence between N(mu, sigma^2) and N(0, I).
        KL_loss = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
        loss = (reconstruction_loss + KL_loss) / batch

        return loss

In [None]:
from torchsummary import summary

# Build the plain (unconditional) VAE, move it to the selected device and
# print a layer-by-layer summary for a single-channel 28x28 input.
vae = myVAE(input_dim, hidden_dim, latent_dim)
vae = vae.to(device)
summary(vae, (1, 28, 28))

## Train

In [None]:
# Train the plain VAE.
# Adam with lr=1e-3 is a starting point, not a tuned choice.
optimizer = optim.Adam(vae.parameters(), lr=1e-3)
for epoch in range(n_epochs):
  vae.train()
  train_loss = 0
  loss = None
  for batch_idx, (data, labels) in enumerate(loader_train):
      # labels are unused: the plain VAE is trained without supervision
      data = data.to(device)
      # clear the gradients of all optimized variables
      optimizer.zero_grad()
      # forward pass:
      x_hat, mu, logvar = vae(data)
      # calculate the loss using the loss function defined above
      loss = myVAE.loss_function(x_hat, data, mu, logvar)
      # backward pass: compute gradient of the loss with respect to model parameters
      loss.backward()
      # perform a single optimization step (parameter update)
      optimizer.step()
      # update running training loss (.item() keeps a float, not the autograd graph)
      train_loss += loss.item()
  train_loss = train_loss/len(loader_train)
  print('Epoch: {} \tTraining Loss: {:.6f}'.format(epoch, train_loss))

## Results

In [None]:
# Put the model in evaluation mode before sampling.
vae.eval()

# First 10 test images (assume batch size >10), tiled 10 times so each image
# is reconstructed from 10 independent latent samples.
images = next(iter(loader_test))[0]
images = images[:10].repeat(10, 1, 1, 1)

samples, mu, logvar = vae(images.to(device))

# show vae sampled images
print('sampled image')
visualize_img_batch(samples.detach().cpu().numpy().squeeze())
plt.show()

# Re-sample around the same means with the log-variance shifted by +3,
# then decode the noisier latent vectors.
z = myVAE.reparametrize(mu, logvar + 3)
samples = vae.decoder(z)

# show vae sampled images with larger variance
print('sampled image with larger variance')
visualize_img_batch(samples.detach().cpu().numpy().squeeze())
plt.show()

# CVAE

In [None]:
# Class-conditional VAE: the conditional vector is the one-hot digit label.
# The keyword must be `conditional_vec_dim` (the name used by myVAE.__init__);
# `conditional_vec_size` raises TypeError: unexpected keyword argument.
cvae = myVAE(input_dim, latent_dim=latent_dim,hidden_dim=hidden_dim, conditional_vec_dim=num_classes).to(device)
optimizer = None #TODO

for epoch in range(n_epochs):
  cvae.train()
  train_loss = 0
  loss = None
  for batch_idx, (data, labels) in enumerate(loader_train):
      #TODO
      # Calculate the onehot vector based on the label, use the pre-defined function
      # clear the gradients of all optimized variables
      # Forward pass with the one-hot vector
      # calculate the loss using the loss function defined above
      # backward pass: compute gradient of the loss with respect to model parameters
      # perform a single optimization step (parameter update)
      # update running training loss
      pass
  train_loss = train_loss/len(loader_train)
  print('Epoch: {} \tTraining Loss: {:.6f}'.format(epoch, train_loss))

## Results

In [None]:
# Put the conditional model in evaluation mode before sampling.
cvae.eval()

# One random latent vector per digit, each paired with its one-hot label.
z = torch.randn(10, latent_dim)
c = torch.eye(10) # [one hot labels for 0-9]
# Decoder input layout: latent vector first, conditional vector second.
z = torch.cat([z, c], dim=1).to(device)

samples = cvae.decoder(z)
# show one cvae sample for each conditioned digit
print('sampled image')
visualize_img_batch(samples.detach().cpu().numpy().squeeze())
plt.show()

## CVAE + structured output prediction

In [None]:
# CVAE for structured output prediction: the conditional vector is the
# flattened 14x14 bottom-left quadrant of the image.
# The keyword must be `conditional_vec_dim` (the name used by myVAE.__init__);
# `conditional_vec_size` raises TypeError: unexpected keyword argument.
cvae = myVAE(input_dim, latent_dim=latent_dim, hidden_dim=hidden_dim, conditional_vec_dim = portioned_img_size)
cvae.to(device)
optimizer = None #TODO
for epoch in range(n_epochs):
  cvae.train()
  train_loss = 0
  loss = None
  for batch_idx, (data, labels) in enumerate(loader_train):
      #TODO
      # Slice the bottom left portion of the image
      # Flatten it/ reshape it to (-1,14*14)
      # clear the gradients of all optimized variables
      # Forward pass with the conditional vector
      # calculate the loss using the loss function defined above
      # backward pass: compute gradient of the loss with respect to model parameters
      # perform a single optimization step (parameter update)
      # update running training loss
      pass
  train_loss = train_loss/len(loader_train)
  print('Epoch: {} \tTraining Loss: {:.6f}'.format(epoch, train_loss))

## Baseline NN for structured output prediction

In [None]:
class BaselineNN(nn.Module):
    """Deterministic MLP baseline that predicts a full image from an image portion.

    Args:
        input_dim: Number of values in the (flattened) input portion.
        hidden_dim: Width of the hidden fully connected layers.
        output_dim: Number of pixels of the predicted image; must be a perfect
            square. Defaults to 28*28.

    Raises:
        ValueError: If ``output_dim`` is not a perfect square.
    """

    def __init__(self, input_dim, hidden_dim=400, output_dim=28*28):
        super().__init__()
        # The flat prediction is reshaped back into a square single-channel image.
        side = int(round(math.sqrt(output_dim)))
        if side * side != output_dim:
            raise ValueError(
                'output_dim must be a perfect square (H*W with H == W), got {}'.format(output_dim))

        self.net = nn.Sequential(
            nn.Flatten(),  # accepts (N,1,h,w) as well as already flat (N,input_dim)
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, output_dim),
            nn.Sigmoid(),  # pixel intensities in [0, 1]
            nn.Unflatten(1, (1, side, side)),
        )

    def forward(self, z):
        """Map an input portion of shape (N, ...) with ``input_dim`` values per sample to (N, 1, H, W)."""
        return self.net(z)

In [None]:
# Baseline network for structured output prediction; its input is the
# flattened image portion.
baseline = BaselineNN(input_dim=portioned_img_size).to(device)
criterion = None #TODO
optimizer = None #TODO

for epoch in range(n_epochs):
  baseline.train()
  train_loss = 0
  loss = None
  for batch_idx, (data, labels) in enumerate(loader_train):
      # TODO, per batch:
      #   1. Slice the bottom left portion of the image
      #   2. Flatten it/ reshape it to (-1,14*14)
      #   3. clear the gradients of all optimized variables
      #   4. Forward pass with the conditional vector as input
      #   5. calculate the loss using the criterion function defined above
      #   6. backward pass: compute gradient of the loss with respect to model parameters
      #   7. perform a single optimization step (parameter update)
      #   8. update running training loss
      pass
  # Average the accumulated loss over the number of batches.
  train_loss /= len(loader_train)
  print('Epoch: {} \tTraining Loss: {:.6f}'.format(epoch, train_loss))

## Results

In [None]:
full_imgs = next(iter(loader_test))[0][0:10,:,:,:]
# Slice the bottom left portion of the image:
# rows 14.. (lower half) and columns ..13 (left half) of each 28x28 image.
quad_imgs = full_imgs[:, :, 14:, :14] #(10,1,14,14)

print('Ground truth')
visualize_img_batch(full_imgs.detach().cpu().numpy().squeeze())
plt.show()

print('Quarter image')
visualize_img_batch(quad_imgs.detach().cpu().numpy().squeeze())
plt.show()

print('Baseline results')
baseline.eval()
samples = baseline(quad_imgs.to(device))
visualize_img_batch(samples.detach().cpu().numpy().squeeze())
plt.show()

print('CVAE results')
cvae.eval()
num_random = 5
# Tile the 10 quadrants so each one is decoded with num_random latent samples.
quad_imgs = quad_imgs.repeat(num_random ,1,1,1)
quad_imgs = torch.reshape(quad_imgs,(-1, 14*14))

z = torch.randn(num_random*10, latent_dim)
# concat latent vector with the conditional_vec (quad_imgs): latent first, then
# condition, the same layout used when decoding one-hot labels earlier in this notebook
z = torch.cat((z, quad_imgs), dim=-1).to(device)
samples = cvae.decoder(z).data
visualize_img_batch(samples.detach().cpu().numpy().squeeze())
plt.show()