**SOW-MKI49: Neural Information Processing Systems**  
*Weeks 4 and 5: Assignment (100 points + 20 bonus points)  
Author: Luca and Umut

In [1]:
# Group number: 24
# Franka Buytenhuijs, s4356845
# Hugo Chateau-Laurent, s1023970
# Maria Tsfasman, s1021505

In [39]:
gpu = True

In [51]:
from chainer import cuda, datasets, serializers
from chainer.dataset import DatasetMixin, concat_examples
from chainer.iterators import MultithreadIterator
import chainer
import chainer.functions as F
import chainer.links as L
import cupy as np


if gpu:
    import cupy as np
else:
    import numpy as np
import tqdm
import os

This is the decoder class. It transforms latents (features) to observables (images). It corresponds to p(x | z) in the context of variational inference (and the slides), where x is observables and y is latents.

Task: (10 points)

- Implement the decoder class for a variational autoencoder. Note that the decoder should output the Gaussian distribution parameters (mean and variance per pixel) of images rather than images themselves.

In [38]:
class Decoder(chainer.ChainList):
    def __init__(self, n_hlay, n_z, n_h, n_x): # <= you might want to pass some architecture parameters (e.g., #i/o units, etc.) here
        
        # Parameters:
        #  n_hlay: number of hidden layers
        #  n_z: dimension of latent space
        #  n_h: dimension of hidden layers (if n_hlay>0)
        #  n_x: dimension of x_out (=dimension of x_in)
        
        links=()
        
        if(n_hlay>0):
            links += (L.Linear(n_z, n_h),)
            for _ in range(n_hlay-1):
                links += (L.Linear(n_h, n_h),)
            n_z = n_h
        
        links += (L.Linear(n_z, n_x),) # mean
        links += (L.Linear(n_z, n_x),) # variance
                
        super(Decoder, self).__init__(*links)

    def __call__(self, z):         
        for layer in self[:-2]:
            z = F.relu(layer(z))       
        mean = F.relu(self[-2](z))
        var = F.relu(self[-1](z))
        
        return mean, var


This is the encoder class. It transforms observables (images) to latents (features). It corresponds to q(z | x) in the context of variational inference (and the slides), where z is latents and x is observables.

Task: (10 points)

- Implement the encoder class for a variational autoencoder. Note that the encoder should output the Gaussian distribution parameters (mean and variance per feature) of features rather than features themselves.

In [22]:
class Encoder(chainer.ChainList):
    def __init__(self, n_hlay, n_x, n_h, n_z): # <= you might want to pass some architecture parameters (e.g., #i/o units, etc.) here
        
        # Parameters:
        #  n_hlay: number of hidden layers
        #  n_x: dimension of x_out (=dimension of x_in)
        #  n_h: dimension of hidden layers (if n_hlay>0)
        #  n_z: dimension of latent space

        links=()
        
        if(n_hlay>0):
            links += (L.Linear(n_x, n_h),)
            for _ in range(n_hlay-1):
                links += (L.Linear(n_h, n_h),)
            n_x = n_h
        
        links += (L.Linear(n_x, n_z),) # mean
        links += (L.Linear(n_x, n_z),) # variance
                
        super(Decoder, self).__init__(*links)

    def __call__(self, x):         
        for layer in self[:-2]:
            x = F.relu(layer(x))       
        mean = F.relu(self[-2](x))
        var = F.relu(self[-1](x))
        
        return mean, var


This is the loss class. The loss of encoder and decoder of a variational autoencoder is the evidence lower bound as follows:

$L = D_{KL}(q(z | x), p(z)) -  E_{z\sim q}[log p(x | z)]$

The first term above is the KL divergence between the approximate posterior (q) and the prior (p), which can be interpreted as a form of regularization. You can assume that the prior is unit Gaussian. It can be implemented with the F.gaussian_kl_divergence function in Chainer.

The second term above is the Gaussian negative log likelihood. This is the term that fits the data, which is very similar to the usual loss functions that you use in deep learning. It can be implemented with the F.gaussian_nll function in Chainer.

Task: 

- Implement the loss class. (10 points)

As input, it gets the following arguments:

mean_y => mean of the encoded features (output of the encoder)  
ln_var_y => log variance of the encoded features (output of the encoder)  
x => input images (mini batch)  
mean_x => mean of the decoded images (output of the decoder)  
ln_var_x => mean of the decoded images (output of the decoder)  

As output, it gives the loss.

- Explain why we use log variance instead of variance. (5 points)

    <span style="color:blue">tractability of posterior? (I need to read the slides again)</span>
    

In [35]:
class Loss(object):
    def __call__(self, mean_z, ln_var_z, x, mean_x, ln_var_x):
        return F.gaussian_kl_divergence(mean_z, ln_var_z) - F.gaussian_nll(x, mean_x, ln_var_x)

This is the model class. It combines the encoder and the decoder.

Task: (20 points)

- Implement the reparameterziation trick for sampling latents. (10 points)
- Explain why we need to use this trick. (10 points)

    <span style="color:blue">answer here</span>

In [48]:
class Model(chainer.Chain):
    def __init__(self, decoder, encoder):
        super(Model, self).__init__()

        with self.init_scope():
            self.decoder = decoder
            self.encoder = encoder

    def __call__(self, x):
        ln_var_z, mean_z = self.encoder(x)
        
        # Sample latents (z) from the Gaussian with parameters ln_var_z, mean_z by using the reparameterization trick
        
        ln_var_x, mean_x = self.decoder(z)

        return mean_z, ln_var_z, x, mean_x, ln_var_x
    
    @classmethod
    def load(cls, directory):
        self = cls()
        self.log = np.load('{}/log.npy'.format(directory))
        serializers.load_npz('{}/weights.npz'.format(directory), self.model)
        serializers.load_npz('{}/optimizer.npz'.format(directory), self.optimizer)

        return self

    def save(self, directory):
        np.save('{}/log.npy'.format(directory), self.log)
        serializers.save_npz('{}/weights.npz'.format(directory), self.model)
        serializers.save_npz('{}/optimizer.npz'.format(directory), self.optimizer)

    

This is a helper class to use the Mnist dataset.

In [49]:
class Mnist(DatasetMixin):
    def __init__(self):
        self.dataset = datasets.get_mnist(False)[0 if chainer.config.train else 1]

    def __len__(self):
        return len(self.dataset)

    def get_example(self, i):
        return self.dataset[i]

Task: (50 points)

- Train the above defined variational autoencoder on the Mnist dataset. You can refer to the earlier assignments to implement your training loop. (25 points)

- How good are the samples? Randomy sample some digits and visualize them. (10 points)

- How good are the reconstructions? Draw an Mnist like digit, encode it, decode it and visualize the digits. How different is the reconstruction from the original. (10 points)

- Repeat the last task but by drawing something other than a digit (e.g., a face). How accuracte is the reconstructions? Explain the results. (5 points)

In [52]:
class Optimizer():
    def __init__(self, model):
        self.model = model
        self.log = {('test', 'loss'): (),
                    ('training', 'loss'): ()}        
        self.optimizer = optimizers.Adam()
        self.optimizer.setup(self.model)
        self.loss = Loss()
        
    def test(self, x):
        with chainer.using_config('train', False):
            err = self.loss(*self.model(x)) # Forward propagation            
            self.log['test', 'loss'] += (float(err.data),)
    
    def train(self, x):
        err = self.loss(*self.model(x)) # Forward propagation 
        self.model.cleargrads()
        err.backward() # backpropagation
        self.optimizer.update()        
        self.log['training', 'loss'] += (float(err.data),)
        
if True:
    # model from scratch
    encoder = Encoder(1,?,?,?)
    decoder = Decoder(1,?,?,?)
    vae = Model(decoder, encoder)
else:
    # model from trained weights
    vae = Model.load("./Models/100")

if gpu:
    vae.model.to_gpu()
optimizer = Optimizer(vae)    
if gpu:
    optimizer.model.to_gpu()
    

epochs = 100
batch_size = 10
training_iterator = iterators.SerialIterator(training_set, batch_size, False, True)
test_iterator = iterators.SerialIterator(test_set , batch_size, False, False)


SyntaxError: invalid syntax (<ipython-input-52-3c2332fb2940>, line 11)

In [46]:
# Create new folder to avoid FileExistsError
root = '.'
i=-1
path = '{}/Models'.format(root)
if not os.path.isdir(path):
    os.makedirs(path)
while i==-1 or os.path.isdir(path):
    i+=1
    path = '{}/Models/Weights_{}'.format(root, i)
os.makedirs('{}/Models/Weights_{}'.format(root, i))

for epoch in tqdm.tnrange(epochs):
    training_iterator.reset()
    test_iterator.reset()
    
    for j, batch in enumerate(training_iterator):
        t, x = concat_examples(batch, device = 0 if gpu else -1)
        optimizer.train(x)
    for j, batch in enumerate(test_iterator):
        t, x = concat_examples(batch, device = 0 if gpu else -1)
        optimizer.test(x)
        
    
    # store weights and loss
    os.makedirs('{}/{}'.format(path, epoch))
    vae.save('{}/{}'.format(path, epoch))
    
for i, key in enumerate(optimizer.log):
    plt.subplot(221 + i)
    plt.plot(numpy.array(optimizer.log[key]).reshape(epoch + 1, -1).mean(1))
    plt.xlabel('iteration')
    plt.ylabel(key)
plt.tight_layout()
plt.show()

NameError: name 'epochs' is not defined

Bonus task: Try the same experiments on a different dataset. (20 bonus points)