In [1]:
"""Variational autoencoder."""

import numpy as np
import tensorflow as tf

from tensorflow import contrib
from tensorflow.contrib import layers
from tensorflow.contrib.slim import fully_connected


In [10]:
class VariationalAutoencoder(object):
    """Variational autoencoder (VAE) built with the TensorFlow 1.x graph API.

    Constructing an instance resets the default graph, builds the encoder,
    the reparameterized sampling step, the decoder, the loss and the Adam
    update op, and initializes all variables in ``self.session``.

    After construction the following attributes are available:
        session: The ``tf.Session`` used to run the graph.
        x_placeholder: Float placeholder of shape (None, ndims) for inputs.
        learning_rate_placeholder: Scalar float placeholder.
        z_mean, z_log_var: Encoder outputs.
        z: Latent sample produced by ``_sample_z``.
        outputs_tensor: Decoder output computed from ``z``.
        loss_tensor: Scalar total loss.
        update_op_tensor: Training op returned by ``update_op``.
    """
    def __init__(self, ndims=784, nlatent=2):
        """Initializes a VAE. (**Do not change this function**)

        Args:
            ndims(int): Number of dimensions in the feature.
            nlatent(int): Number of dimensions in the latent space.
        """
        tf.reset_default_graph()
        self._ndims = ndims
        self._nlatent = nlatent

        # Create session
        self.session = tf.Session()
        self.x_placeholder = tf.placeholder(tf.float32, [None, ndims])
        self.learning_rate_placeholder = tf.placeholder(tf.float32, [])

        # Build graph.
        self.z_mean, self.z_log_var = self._encoder(self.x_placeholder)
        self.z = self._sample_z(self.z_mean, self.z_log_var)
        self.outputs_tensor = self._decoder(self.z)

        # Setup loss tensor, predict_tensor, update_op_tensor
        self.loss_tensor = self.loss(self.outputs_tensor, self.x_placeholder,
                                     self.z_mean, self.z_log_var)

        self.update_op_tensor = self.update_op(self.loss_tensor,
                                               self.learning_rate_placeholder)

        # Initialize all variables.
        self.session.run(tf.global_variables_initializer())

    def _sample_z(self, z_mean, z_log_var):
        """Samples z using the reparametrization trick.

        Computes ``z = z_mean + sigma * epsilon`` with ``epsilon ~ N(0, 1)``
        and ``sigma = exp(0.5 * z_log_var)``.

        Args:
            z_mean (tf.Tensor): The latent mean,
                tensor of dimension (None, _nlatent)
            z_log_var (tf.Tensor): The latent log variance,
                tensor of dimension (None, _nlatent)
        Returns:
            z (tf.Tensor): Random sampled z of dimension (None, _nlatent)
        """
        # Standard-normal noise with the same (dynamic) shape as the latents.
        epsilon = tf.random_normal(tf.shape(z_log_var), 0, 1, dtype=tf.float32)
        # exp(0.5 * log_var) equals sqrt(exp(log_var)) mathematically, but it
        # does not overflow the intermediate exp as early and avoids the
        # infinite gradient of sqrt when exp(log_var) underflows to zero.
        z_std = tf.exp(0.5 * z_log_var)
        z = tf.add(z_mean, z_std * epsilon)
        return z

    def _encoder(self, x):
        """Encoder block of the network.

        Builds a two layer network of fully connected layers, with 100 nodes,
        then 50 nodes, and outputs two branches each with _nlatent nodes
        representing z_mean and z_log_var. Network illustrated below:

                             |-> _nlatent (z_mean)
        Input --> 100 --> 50 -
                             |-> _nlatent (z_log_var)

        The hidden layers use tf.nn.softplus; both output branches are
        linear (no activation).

        Args:
            x (tf.Tensor): The input tensor of dimension (None, _ndims).
        Returns:
            z_mean(tf.Tensor): The latent mean, tensor of dimension
                (None, _nlatent).
            z_log_var(tf.Tensor): The latent log variance, tensor of dimension
                (None, _nlatent).
        """
        hidden_1 = tf.contrib.layers.fully_connected(
            x, 100, activation_fn=tf.nn.softplus)
        hidden_2 = tf.contrib.layers.fully_connected(
            hidden_1, 50, activation_fn=tf.nn.softplus)
        # Two independent linear heads on top of the shared hidden features.
        z_mean = tf.contrib.layers.fully_connected(
            hidden_2, self._nlatent, activation_fn=None)
        z_log_var = tf.contrib.layers.fully_connected(
            hidden_2, self._nlatent, activation_fn=None)
        return z_mean, z_log_var

    def _decoder(self, z):
        """From a sampled z, decode back into image.

        Builds a three layer network of fully connected layers,
        with 50, 100, _ndims nodes.

        z (_nlatent) --> 50 --> 100 --> _ndims.

        The hidden layers use tf.nn.softplus; the output layer uses
        tf.nn.sigmoid, so every decoded feature lies in (0, 1).

        Args:
            z(tf.Tensor): z from _sample_z of dimension (None, _nlatent).
        Returns:
            f(tf.Tensor): Decoded features, tensor of dimension (None, _ndims).
        """
        hidden_1 = tf.contrib.layers.fully_connected(
            z, 50, activation_fn=tf.nn.softplus)
        hidden_2 = tf.contrib.layers.fully_connected(
            hidden_1, 100, activation_fn=tf.nn.softplus)
        f = tf.contrib.layers.fully_connected(
            hidden_2, self._ndims, activation_fn=tf.nn.sigmoid)
        return f

    def _latent_loss(self, z_mean, z_log_var):
        """Constructs the latent loss.

        Per example this is
        ``0.5 * sum(z_mean^2 + exp(z_log_var) - z_log_var - 1)`` over the
        latent dimensions; the result is averaged over the batch.

        Args:
            z_mean(tf.Tensor): Tensor of dimension (None, _nlatent)
            z_log_var(tf.Tensor): Tensor of dimension (None, _nlatent)

        Returns:
            latent_loss(tf.Tensor): A scalar Tensor of dimension ()
                containing the latent loss.
        """
        per_dim = tf.square(z_mean) + tf.exp(z_log_var) - z_log_var - 1.0
        # Sum over latent dimensions (axis 1), then average over the batch.
        per_example = 0.5 * tf.reduce_sum(per_dim, 1)
        return tf.reduce_mean(per_example)

    def _reconstruction_loss(self, f, x_gt):
        """Constructs the reconstruction loss, assuming Gaussian distribution.

        Sums the squared error over the feature axis for each example and
        averages the result over the batch.

        Args:
            f(tf.Tensor): Predicted score for each example, dimension (None,
                _ndims).
            x_gt(tf.Tensor): Ground truth for each example, dimension (None,
                _ndims).
        Returns:
            recon_loss(tf.Tensor): A scalar Tensor for dimension ()
                containing the reconstruction loss.
        """
        squared_error = tf.square(tf.subtract(f, x_gt))
        per_example = tf.reduce_sum(squared_error, 1)
        return tf.reduce_mean(per_example)

    def loss(self, f, x_gt, z_mean, z_var):
        """Computes the total loss.

        Computes the sum of latent and reconstruction loss.

        Args:
            f (tf.Tensor): Decoded image for each example, dimension (None,
                _ndims).
            x_gt (tf.Tensor): Ground truth for each example, dimension (None,
                _ndims)
            z_mean (tf.Tensor): The latent mean,
                tensor of dimension (None, _nlatent)
            z_var (tf.Tensor): The latent *log* variance, tensor of dimension
                (None, _nlatent). Despite the parameter name, the value is
                forwarded to _latent_loss as z_log_var, and __init__ passes
                self.z_log_var here.

        Returns:
            total_loss: Tensor for dimension (). Sum of
                latent_loss and reconstruction loss.
        """
        latent_loss = self._latent_loss(z_mean, z_var)
        recon_loss = self._reconstruction_loss(f, x_gt)
        total_loss = tf.add(latent_loss, recon_loss)
        return total_loss

    def update_op(self, loss, learning_rate):
        """Creates the update optimizer.

        Uses tf.train.AdamOptimizer to obtain the update op.

        Args:
            loss(tf.Tensor): Tensor of shape () containing the loss function.
            learning_rate(tf.Tensor): Tensor of shape (). Learning rate for
                gradient descent.
        Returns:
            train_op(tf.Operation): Update opt tensorflow operation.
        """
        optimizer = tf.train.AdamOptimizer(learning_rate, name='Adam')
        train_op = optimizer.minimize(loss)
        return train_op

    def generate_samples(self, z_np):
        """Decodes the provided latent vectors into samples.

        The values in z_np are fed directly for the ``self.z`` tensor, so the
        decoder output is evaluated on them and ``x_placeholder`` does not
        need to be fed.

        Args:
            z_np(numpy.ndarray): Numpy array of dimension
                (batch_size, _nlatent).

        Returns:
            out(numpy.ndarray): The sampled images (numpy.ndarray) of
                dimension (batch_size, _ndims).
        """
        out = self.session.run(self.outputs_tensor, feed_dict={self.z: z_np})
        # asarray avoids copying the array that session.run already returned.
        return np.asarray(out)



In [11]:
"""CS446 2018 Spring MP10.
   Implementation of a variational autoencoder for image generation.
"""

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
#from vae import VariationalAutoencoder
from tensorflow.contrib.learn.python.learn.datasets.mnist import read_data_sets


def train(model, mnist_dataset, learning_rate=0.0005, batch_size=16,
          num_steps=5000):
    """Runs the mini-batch training loop for a VAE model.

    On every step, one batch is drawn from ``mnist_dataset.train`` and the
    model's update op is run once on it with the given learning rate.

    Args:
        model(VariationalAutoencoder): Initialized VAE model.
        mnist_dataset: Mnist dataset.
        learning_rate(float): Learning rate.
        batch_size(int): Batch size used for training.
        num_steps(int): Number of steps to run the update ops.
    """
    for _ in range(num_steps):
        # Labels are returned alongside the images but are not used here.
        images, _labels = mnist_dataset.train.next_batch(batch_size)
        feed = {
            model.x_placeholder: images,
            model.learning_rate_placeholder: learning_rate,
        }
        model.session.run(model.update_op_tensor, feed_dict=feed)
        # To monitor progress, evaluate model.loss_tensor on this batch here.
mnist_dataset = read_data_sets('MNIST_data', one_hot=True)

# Build the model and train it with the default hyper-parameters.
model = VariationalAutoencoder()
train(model, mnist_dataset)

# Plot out latent space, for +/- 3 std: decode a 20 x 20 grid of latent
# points and tile the decoded 28 x 28 images into one canvas.
grid_steps = 20
tile_side = 28
std = 1
x_z = np.linspace(-3*std, 3*std, grid_steps)
y_z = np.linspace(-3*std, 3*std, grid_steps)

out = np.empty((tile_side*grid_steps, tile_side*grid_steps))
for x_idx, x in enumerate(x_z):
    rows = slice(x_idx*tile_side, (x_idx+1)*tile_side)
    for y_idx, y in enumerate(y_z):
        cols = slice(y_idx*tile_side, (y_idx+1)*tile_side)
        # The column coordinate (y) is the first latent dimension.
        z_mu = np.array([[y, x]])
        img = model.generate_samples(z_mu)
        out[rows, cols] = img[0].reshape(tile_side, tile_side)
plt.imsave('latent_space_vae.png', out, cmap="gray")

print("Done")


Extracting MNIST_data\train-images-idx3-ubyte.gz
Extracting MNIST_data\train-labels-idx1-ubyte.gz
Extracting MNIST_data\t10k-images-idx3-ubyte.gz
Extracting MNIST_data\t10k-labels-idx1-ubyte.gz
Done


In [7]:
# Show the module-level z_mu, which the latent-grid loop reassigns on every
# iteration; after the loop finishes it holds the last grid point.
print(z_mu)

[[ 3.  3.]]
