In [1]:
import tensorflow as tf
import numpy as np
from tensorflow.examples.tutorials.mnist import input_data
import matplotlib.pyplot as plt
import time

In [2]:
mnist = input_data.read_data_sets('MNIST_data', one_hot=True)

Extracting MNIST_data\train-images-idx3-ubyte.gz
Extracting MNIST_data\train-labels-idx1-ubyte.gz
Extracting MNIST_data\t10k-images-idx3-ubyte.gz
Extracting MNIST_data\t10k-labels-idx1-ubyte.gz


In [21]:
class VariationalAutoencoder(object):
    def __init__(self, name,
                 n_inputs=784,
                 n_neurons_encoder = [2048, 256],
                 n_latent=2,
                 n_neurons_decoder = [256, 2048],
                 batch_size = 128,
                 activation = tf.nn.tanh):
        
        self.name = name
        
        # ATTRIBUTES
        self.N = n_inputs
        self.n_encoder = n_neurons_encoder
        self.n_decoder = n_neurons_decoder
        self.n_latent = n_latent
        self.length_encoder = len(n_neurons_encoder)
        self.length_decoder = len(n_neurons_decoder)
        self.layers = self.length_encoder + self.length_decoder + 1 
        self.activ = activation
        
        ## DATA PLACEHOLDERS (BATCHES)
        with tf.name_scope('input'):
            self.X = tf.placeholder(tf.float32, shape=[None, self.N], name='x-input')
            
        with tf.name_scope('input_reshape'):
            image_shaped_input = tf.reshape(self.X, [-1, 28, 28, 1])
            tf.summary.image('input', image_shaped_input, 10)
        
        # INITIALIZE WEIGHTS & BIASES
        self.W_enc, self.W_z_mu, self.W_z_log_sigma, self.W_dec = self.initialize_W()
        self.b_enc, self.b_z_mu, self.b_z_log_sigma, self.b_dec = self.initialize_b()
        
        for w, b in zip(self.W_enc, self.b_enc):
            print(w.get_shape(), b.get_shape())
          
        print(self.W_z_mu.get_shape(), self.b_z_mu.get_shape())
        print(self.W_z_log_sigma.get_shape(), self.b_z_log_sigma.get_shape())
        
        for w, b in zip(self.W_dec, self.b_dec):
            print(w.get_shape(), b.get_shape())
            
        ## COMPUTATIONAL GRAPH
        self.Y, self.z_mu, self.z_log_sigma = self.feedforward(self.X)
        self.loss, self.kl, self.ell = self.get_nelbo(self.z_mu, self.z_log_o, self.X, self.Y)

        ## Initialize the session
        self.session = tf.InteractiveSession()
    
    def variable_summaries(self, var):
        """Attach a lot of summaries to a Tensor (for TensorBoard visualization)."""
        with tf.name_scope('summaries'):
            mean = tf.reduce_mean(var)
            tf.summary.scalar('mean', mean)
            with tf.name_scope('stddev'):
                stddev = tf.sqrt(tf.reduce_mean(tf.square(var - mean)))
                tf.summary.scalar('stddev', stddev)
                tf.summary.scalar('max', tf.reduce_max(var))
                tf.summary.scalar('min', tf.reduce_min(var))
                tf.summary.histogram('histogram', var)

    ## ---------------------------------------------------------------------            
    ## --------------- TF WEIGHTS & BIASES INITIALIZATION ------------------
    ## ---------------------------------------------------------------------
    
    def initialize_W(self):
        """
        Define all the weights for the network.
        We initialize them to standard normal iid using Xavier Initializer
        """
        
        W_encoder = []
        W_latent_mu = []
        W_latent_log_sigma = []
        W_decoder = []
        
        with tf.name_scope("Encoder_layer_weights"):
            W_encoder.append(w_xavier(shape=[self.N, self.n_encoder[0]]))
            for i in range(1, self.length_encoder):
                W_encoder.append(w_xavier(shape=[self.n_encoder[i-1], self.n_encoder[i]]))
        
        with tf.name_scope("Latent_layer_weights"):
            W_latent_mu = w_xavier(shape =[self.n_encoder[-1], self.n_latent]) 
            W_latent_log_sigma = w_xavier(shape =[self.n_encoder[-1], self.n_latent])      
            
        with tf.name_scope("Decoder_layer_weights"):
            W_decoder.append(w_xavier(shape=[self.n_latent, self.n_decoder[0]]))
            for i in range(2, self.length_encoder):
                W_decoder.append(w_xavier(shape=[self.n_decoder[i-1], self.n_decoder[i]]))
            W_decoder.append(w_xavier(shape=[self.n_decoder[-1], self.N]))
        
        return W_encoder, W_latent_mu, W_latent_log_sigma, W_decoder

    
    def initialize_b(self):
        """
        Define all the biases for the network.
        We initialize them to standard normal iid using Xavier Initializer
        """
        
        b_encoder = []
        b_latent_mu = []
        b_latent_log_sigma = []
        b_decoder = []
        
        with tf.name_scope("Encoder_layer_biases"):
            b_encoder.append(w_xavier(shape=[self.n_encoder[0]]))
            for i in range(1, self.length_encoder):
                b_encoder.append(w_xavier(shape=[self.n_encoder[i]]))
        
        with tf.name_scope("Latent_layer_biases"):
            b_latent_mu = w_xavier(shape =[self.n_latent]) 
            b_latent_log_sigma = w_xavier(shape =[self.n_latent])      
            
        with tf.name_scope("Decoder_layer_biases"):
            b_decoder.append(w_xavier(shape=[self.n_decoder[0]]))
            for i in range(2, self.length_encoder):
                b_decoder.append(w_xavier(shape=[self.n_decoder[i]]))
            b_decoder.append(w_xavier(shape=[self.N]))
            
        return b_encoder, b_latent_mu, b_latent_log_sigma, b_decoder

    
    ## ---------------------------------------------------------------------            
    ## ----------------- SAMPLING FROM THE LATENT SPACE --------------------
    ## ---------------------------------------------------------------------
    
    def get_samples(self, d_in, d_out):
        """
        Sample from noise distribution p(eps) ~ N(0, 1)
        a matrix of samples of dimension [d_in, d_out].
        """
        return tf.random_normal(shape=[d_in, d_out])

    def sample_from_Z(self, z_mu, z_log_o):
        """
        Samples from the posterior of the variational latent space.
        We draw samples using the reparameterization trick.
        """
        return z_mu + tf.exp(z_log_o) * self.get_samples(tf.shape(self.X)[0], self.n_latent)
    
        
        
    ## ---------------------------------------------------------------------            
    ## --------------------------- FEEDFORWARD -----------------------------
    ## ---------------------------------------------------------------------
    
    
    def feedforward(self, X):
        """
        Feedforward pass excluding last layer's transfer function.
        intermediate : index of intermediate layer for output generation
        """
        net = X

        # ENCODER
        for i in range(self.length_encoder):
            net = self.activ(tf.matmul(net, self.W_enc[i]) + self.b_enc[i])
        
        # LATENT
        z_mu = tf.matmul(net, self.W_z_mu) + self.b_z_mu
        z_log_sigma = 0.5 * (tf.matmul(net, self.W_z_log_sigma) + self.b_z_log_sigma)

        # Sample from posterior
        z = self.sample_from_Z(z_mu, z_log_sigma)

        # DECODER
        net = self.activ(tf.matmul(z, self.W_dec[0]) + self.b_dec[0])
        for i in range(1, self.length_decoder-1):
            net = self.activ(tf.matmul(net, self.W_dec[i]) + self.b_dec[i])
        
        Y = tf.nn.softmax(tf.matmul(net, self.W_dec[-1]) + self.b_dec[-1])        
                
        return Y, z_mu, z_log_sigma
    
    
    ## ---------------------------------------------------------------------            
    ## ------------------- LOSS: EXPECTED LOWER BOUND ----------------------
    ## ---------------------------------------------------------------------    
    
    def get_ell(self, x, y):
        """
        Returns the expected log-likelihood of the lower bound.
        For this we use a bernouilli LL.
        """
        # p(x|z)        
        return -tf.reduce_sum(x * tf.log(y + 1e-10) + (1 - x) * tf.log(1 - y + 1e-10), 1)

    
    def get_kl(self, z_mu, z_log_o):
        """
        d_kl(q(z|x)||p(z)) returns the KL-divergence between the prior p and the variational posterior q.
        :return: KL divergence between q and p
        """        
        # Formula: 0.5 * sum(1 + log(sigma^2) - mu^2 - sigma^2)
        return 0.5 * tf.reduce_sum( 1.0 + 2.0 * z_log_o - tf.square(z_mu) - tf.exp(2.0 * z_log_o), 1)
        
        
    def get_nelbo(self, z_mu, z_log_o, x, y):
        """ Returns the negative ELBOW, which allows us to minimize instead of maximize. """
        kl = self.get_kl(z_mu, z_log_o)
        ell = self.get_ell(x, y)
        nelbo = tf.reduce_mean(kl + ell)
        return nelbo, kl, ell
    
    
    ## ---------------------------------------------------------------------            
    ## --------------------------- LEARNING --------------------------------
    ## ---------------------------------------------------------------------  
    
    def learn(self, learning_rate=0.001, batch_size=128,
                    epochs=100, display_step=1):
        """ Our learning procedure """
        optimizer = tf.train.AdamOptimizer(learning_rate)

        ## Set all_variables to contain the complete set of TF variables to optimize
        all_variables = tf.trainable_variables()

        ## Define the optimizer
        train_step = optimizer.minimize(self.loss, var_list=all_variables)

        tf.summary.scalar('negative_elbo', self.loss)
        tf.summary.scalar('kl_div', self.kl)
        tf.summary.scalar('ell', self.ell)
        
        merged = tf.summary.merge_all()
        
        train_writer = tf.summary.FileWriter('logs/train', self.session.graph)
        test_writer = tf.summary.FileWriter('logs/test')        
        
        ## Initialize all variables
        self.session.run(tf.global_variables_initializer())
        
        # Initial model print
        print("*MODEL [", self.name,"] {l_r: %.4f; n_iter: %d; batch: %d}"%\
              (self.l_r, training_epochs, batch_size))
        print (" -> START Training!")
        
        t = time.time()
        total_batch = int(mnist.train.num_examples/batch_size)
        
        for epoch in range(epochs):
            train_cost = 0.
            for batch_i in range(total_batch):
                batch_xs, _ = mnist.train.next_batch(batch_size)

                _, loss, summary = self.session.run([train_step, self.loss, merged],
                                    feed_dict={self.X: batch_xs, self.Y: batch_xs})
                
                train_writer.add_summary(summary, epoch * total_batch + batc_i)
                train_cost += loss / total_batch
            
            print("   [%.1f] Epoch: %02d | NELBO: %.6f"%(time.time()-t,
                                                         epoch, train_cost))
        
        print (" -> Training FINISHED in %.1f seconds."%(time.time()-t))
        self.serialize('model/')
        train_writer.close()
        test_writer.close()
        
    def benchmark(self, validation=False, batch_size = 128):
        if validation:
            benchmark_data = mnist.validation
            title = 'Validation loss:'
        else:
            benchmark_data = mnist.test
            title = 'Test loss:'
        
        total_batch = benchmark_data.num_examples // batch_size
        cost = 0
        for batch_i in range(total_batch):
            batch_xs, _ = benchmark_data.next_batch(batch_size)
            c = self.session.run(self.loss,
                   feed_dict={self.X: batch_xs, self.Y: batch_xs})
            cost+= c/total_batch
        print(title, cost)
        return cost
        
    def serialize(self, path):
        saver = tf.train.Saver()
        save_path = saver.save(self.session, path)
        print("Model saved in file: %s" % save_path+self.name)
        
    def restore(self, path):
        saver = tf.train.Saver()   
        sess = tf.InteractiveSession()
        saver.restore(sess, save_path=path)
        self.session = sess
    
    def encode(self, input_vector):
        _, z = self.session.run(self.feedforward,
                                feed_dict={self.X: input_vector})
        return z

In [5]:
# %%
def w_xavier(shape):
    initial = tf.random_normal(shape, mean=0.0,
                               stddev=tf.sqrt(3./sum(shape)))
    return tf.Variable(initial)


# %%
def b_xavier(shape):
    initial = tf.random_normal(shape, mean=0.0,
                               stddev=sqrt(3./sum(shape)))
    return tf.Variable(initial)

In [22]:
VA =  VariationalAutoencoder('VA_1',
                 n_inputs=784,
                 n_neurons_encoder = [256, 128],
                 n_latent=2,
                 n_neurons_decoder = [128, 256],
                 batch_size = 128)

(784, 256) (256,)
(256, 128) (128,)
(128, 2) (2,)
(128, 2) (2,)
(2, 128) (128,)
(256, 784) (784,)


ValueError: Dimensions must be equal, but are 128 and 256 for 'MatMul_35' (op: 'MatMul') with input shapes: [?,128], [256,784].

In [4]:
# %%
def weight_variable(shape):
    initial = tf.random_normal(shape, mean=0.0, stddev=0.01)
    return tf.Variable(initial)


# %%
def bias_variable(shape):
    initial = tf.random_normal(shape, mean=0.0, stddev=0.01)
    return tf.Variable(initial)

In [5]:
"""Variational autoencoder with 2 layer fully-connected
encoder/decoders and gaussian noise distribution.
"""

# %%
def VAE(input_shape=[None, 784],
        n_components_encoder=2048,
        n_components_decoder=2048,
        n_hidden=2,
        debug=False):
    # %%
    # Input placeholder
    if debug:
        input_shape = [50, 784]
        x = tf.Variable(np.zeros((input_shape), dtype=np.float32))
    else:
        x = tf.placeholder(tf.float32, input_shape)

    activation = tf.nn.softplus

    dims = x.get_shape().as_list()
    n_features = dims[1]

    W_enc1 = weight_variable([n_features, n_components_encoder])
    b_enc1 = bias_variable([n_components_encoder])
    h_enc1 = activation(tf.matmul(x, W_enc1) + b_enc1)

    W_enc2 = weight_variable([n_components_encoder, n_components_encoder])
    b_enc2 = bias_variable([n_components_encoder])
    h_enc2 = activation(tf.matmul(h_enc1, W_enc2) + b_enc2)

    W_enc3 = weight_variable([n_components_encoder, n_components_encoder])
    b_enc3 = bias_variable([n_components_encoder])
    h_enc3 = activation(tf.matmul(h_enc2, W_enc3) + b_enc3)

    W_mu = weight_variable([n_components_encoder, n_hidden])
    b_mu = bias_variable([n_hidden])

    W_log_sigma = weight_variable([n_components_encoder, n_hidden])
    b_log_sigma = bias_variable([n_hidden])

    z_mu = tf.matmul(h_enc3, W_mu) + b_mu
    z_log_sigma = 0.5 * (tf.matmul(h_enc3, W_log_sigma) + b_log_sigma)

    # %%
    # Sample from noise distribution p(eps) ~ N(0, 1)
    if debug:
        epsilon = tf.random_normal(
            [dims[0], n_hidden])
    else:
        epsilon = tf.random_normal(
            tf.stack([tf.shape(x)[0], n_hidden]))

    # Sample from posterior
    z = z_mu + tf.exp(z_log_sigma) * epsilon

    W_dec1 = weight_variable([n_hidden, n_components_decoder])
    b_dec1 = bias_variable([n_components_decoder])
    h_dec1 = activation(tf.matmul(z, W_dec1) + b_dec1)

    W_dec2 = weight_variable([n_components_decoder, n_components_decoder])
    b_dec2 = bias_variable([n_components_decoder])
    h_dec2 = activation(tf.matmul(h_dec1, W_dec2) + b_dec2)

    W_dec3 = weight_variable([n_components_decoder, n_components_decoder])
    b_dec3 = bias_variable([n_components_decoder])
    h_dec3 = activation(tf.matmul(h_dec2, W_dec3) + b_dec3)

    W_mu_dec = weight_variable([n_components_decoder, n_features])
    b_mu_dec = bias_variable([n_features])
    y = tf.nn.sigmoid(tf.matmul(h_dec3, W_mu_dec) + b_mu_dec)

    # p(x|z)
    log_px_given_z = -tf.reduce_sum(
        x * tf.log(y + 1e-10) +
        (1 - x) * tf.log(1 - y + 1e-10), 1)

    # d_kl(q(z|x)||p(z))
    # Appendix B: 0.5 * sum(1 + log(sigma^2) - mu^2 - sigma^2)
    kl_div = -0.5 * tf.reduce_sum(
        1.0 + 2.0 * z_log_sigma - tf.square(z_mu) - tf.exp(2.0 * z_log_sigma),
        1)
    loss = tf.reduce_mean(log_px_given_z + kl_div)

    return {'cost': loss, 'x': x, 'z': z, 'y': y}



In [7]:

# %%
def test_mnist():
    """Summary
    Returns
    -------
    name : TYPE
        Description
    """
    # %%


    # %%
    # load MNIST as before
    ae = VAE()

    # %%
    learning_rate = 0.001
    optimizer = tf.train.AdamOptimizer(learning_rate).minimize(ae['cost'])

    # %%
    # We create a session to use the graph
    sess = tf.Session()
    sess.run(tf.global_variables_initializer())

    # %%
    # Fit all training data
    t_i = 0
    batch_size = 100
    n_epochs = 50
    n_examples = 20
    test_xs, _ = mnist.test.next_batch(n_examples)
    xs, ys = mnist.test.images, mnist.test.labels
    fig_manifold, ax_manifold = plt.subplots(1, 1)
    fig_reconstruction, axs_reconstruction = plt.subplots(2, n_examples, figsize=(10, 2))
    fig_image_manifold, ax_image_manifold = plt.subplots(1, 1)
    for epoch_i in range(n_epochs):
        print('--- Epoch', epoch_i)
        train_cost = 0
        for batch_i in range(mnist.train.num_examples // batch_size):
            batch_xs, _ = mnist.train.next_batch(batch_size)
            train_cost += sess.run([ae['cost'], optimizer],
                                   feed_dict={ae['x']: batch_xs})[0]
            if batch_i == 1:
                # %%
                # Plot example reconstructions from latent layer
                imgs = []
                for img_i in np.linspace(-3, 3, n_examples):
                    for img_j in np.linspace(-3, 3, n_examples):
                        z = np.array([[img_i, img_j]], dtype=np.float32)
                        recon = sess.run(ae['y'], feed_dict={ae['z']: z})
                        imgs.append(np.reshape(recon, (1, 28, 28, 1)))
                imgs_cat = np.concatenate(imgs)
                ax_manifold.imshow(montage_batch(imgs_cat))
                fig_manifold.savefig('manifold_%08d.png' % t_i)

                # %%
                # Plot example reconstructions
                recon = sess.run(ae['y'], feed_dict={ae['x']: test_xs})
                print(recon.shape)
                for example_i in range(n_examples):
                    axs_reconstruction[0][example_i].imshow(
                        np.reshape(test_xs[example_i, :], (28, 28)),
                        cmap='gray')
                    axs_reconstruction[1][example_i].imshow(
                        np.reshape(
                            np.reshape(recon[example_i, ...], (784,)),
                            (28, 28)),
                        cmap='gray')
                    axs_reconstruction[0][example_i].axis('off')
                    axs_reconstruction[1][example_i].axis('off')
                fig_reconstruction.savefig('reconstruction_%08d.png' % t_i)

                # %%
                # Plot manifold of latent layer
                zs = sess.run(ae['z'], feed_dict={ae['x']: xs})
                ax_image_manifold.clear()
                ax_image_manifold.scatter(zs[:, 0], zs[:, 1],
                    c=np.argmax(ys, 1), alpha=0.2)
                ax_image_manifold.set_xlim([-6, 6])
                ax_image_manifold.set_ylim([-6, 6])
                ax_image_manifold.axis('off')
                fig_image_manifold.savefig('image_manifold_%08d.png' % t_i)

                t_i += 1


        print('Train cost:', train_cost /
              (mnist.train.num_examples // batch_size))

        valid_cost = 0
        for batch_i in range(mnist.validation.num_examples // batch_size):
            batch_xs, _ = mnist.validation.next_batch(batch_size)
            valid_cost += sess.run([ae['cost']],
                                   feed_dict={ae['x']: batch_xs})[0]
        print('Validation cost:', valid_cost /
              (mnist.validation.num_examples // batch_size))

In [None]:
def montage_batch(images):
    """Draws all filters (n_input * n_output filters) as a
    montage image separated by 1 pixel borders.
    Parameters
    ----------
    batch : numpy.ndarray
        Input array to create montage of.
    Returns
    -------
    m : numpy.ndarray
        Montage image.
    """
    img_h = images.shape[1]
    img_w = images.shape[2]
    n_plots = int(np.ceil(np.sqrt(images.shape[0])))
    m = np.ones(
        (images.shape[1] * n_plots + n_plots + 1,
         images.shape[2] * n_plots + n_plots + 1, 3)) * 0.5

    for i in range(n_plots):
        for j in range(n_plots):
            this_filter = i * n_plots + j
            if this_filter < images.shape[0]:
                this_img = images[this_filter, ...]
                m[1 + i + i * img_h:1 + i + (i + 1) * img_h,
                  1 + j + j * img_w:1 + j + (j + 1) * img_w, :] = this_img
    return m
