# A simple MLP VAE in TensorFlow

In [1]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import time

In [2]:
# X_train = np.load('total_numpys/total_spectrograms.npy')
X_train = np.load('spectrograms_fold_1.npy')
X_val = np.load('spectrograms_fold_2.npy')
# y_train = np.load('total_numpys/total_labels.npy')
y_train = np.load('labels_fold_1.npy')
y_val = np.load('labels_fold_2.npy')

n_train_samples = X_train.shape[0]
n_valid_samples = X_val.shape[0]
n_classes = 10
y_train_one_hot = np.eye(n_classes)[y_train]
y_valid_one_hot = np.eye(n_classes)[y_val]


print('Dataset sizes:')
print('X_train:', X_train.shape)
print('X_val:', X_val.shape)
print('y_train:', y_train_one_hot.shape)
print('y_val:', y_valid_one_hot.shape)

Dataset sizes:
X_train: (873, 257, 251, 1)
X_val: (888, 257, 251, 1)
y_train: (873, 10)
y_val: (888, 10)


In [3]:
# Input to the graph 
rows = 257
cols = 251
n_pixels = rows*cols
X = tf.placeholder(tf.float32, shape=([None, n_pixels]))

In [4]:
#layer creation functions
def weight_variable(shape, name):
    initial = tf.truncated_normal(shape, stddev=0.1)
    return tf.Variable(initial, name=name)

def bias_variable(shape, name):
    initial = tf.truncated_normal(shape, stddev=0.1)
    return tf.Variable(initial, name=name)

def FC_layer(X, W, b):
    return tf.matmul(X, W) + b

# Encoder
The encoder consists of a 2 layer, fully connected feed forward network that reduces the dimensionality from the original number of features (e.g. pixel) to the dimensionality of the latent space. This network has two outputs -- the mean and (log) standard deviation of a gaussian distribution.

In [5]:
# encoder
latent_dim = 20
h_dim = 500

W_enc = weight_variable([n_pixels, h_dim], 'W_enc')
b_enc = bias_variable([h_dim], 'b_enc')
# tanh activation function to replicate original model
h_enc = tf.nn.tanh(FC_layer(X, W_enc, b_enc))

W_mu = weight_variable([h_dim, latent_dim], 'W_mu')
b_mu = bias_variable([latent_dim], 'b_mu')
mu = FC_layer(h_enc, W_mu, b_mu)

W_logstd = weight_variable([h_dim, latent_dim], 'W_logstd')
b_logstd = bias_variable([latent_dim], 'b_logstd')
logstd = FC_layer(h_enc, W_logstd, b_logstd)

# reparameterization trick
noise = tf.random_normal([1, latent_dim])
z = mu + tf.multiply(noise, tf.exp(.5*logstd))

# Decoder
The decoder is also a feedforward, fully connected network -- however, it goes from the dimensionality of the latent space back to the original dimensionality of the data (e.g. pixels). The output of the network is squashed between 0 and 1 with a sigmoid function

In [6]:
# decoder
W_dec = weight_variable([latent_dim, h_dim], 'W_dec')
b_dec = bias_variable([h_dim], 'b_dec')
h_dec = tf.nn.tanh(FC_layer(z, W_dec, b_dec))

W_reconstruct = weight_variable([h_dim, n_pixels], 'W_reconstruct')
b_reconstruct = bias_variable([n_pixels], 'b_reconstruct')
reconstruction = tf.nn.sigmoid(FC_layer(h_dec, W_reconstruct, b_reconstruct))

# Loss function
Each of these outputs is taken to be the mean of a Bernoulli distribution (in this example, a Bernoulli distribution is appropriate because our data is binary). The variational lower bound is given by:

$$
\mathcal{L} = \mathbb{E}_{z\sim Q(z|X)}\log P(X|z) - D(Q(z|X)||P(z))
$$
When $P$ is a Bernouli distribution, the log likelihood is given by

$$
\log P(X|z) = \sum_i^N X^{(i)}\log y^{(i)} + (1 − X^{(i)}) \cdot \log(1 − y^{(i)})
$$
where $N$ is the number of training samples (in our case, the batchsize), and $y^{(i)}$ is the reconstruction from the latent code $z^{(i)}$. The KL divergence between a gaussian $Q$ with mean $\mu$ and standard deviation $\sigma$ and a standard normal distribution $P$ is given by:

$$
D(Q||P) = -\frac{1}{2}\sum_j^J \big(1 + \log((\sigma_j)^2) - (\mu_j)^2 - (\sigma_j)^2\big)
$$
We want to maximize this lower bound, but because tensorflow doesn't have a 'maximizing' optimizer, we minimize the negative lower bound.

In [7]:
# variational lower bound

# add epsilon to log to prevent numerical overflow
log_likelihood = tf.reduce_sum(X*tf.log(reconstruction + 1e-9)+(1 - X)*tf.log(1 - reconstruction + 1e-9), 
                               reduction_indices=1)
KL_term = -.5*tf.reduce_sum(1 + 2*logstd - tf.pow(mu,2) - tf.exp(2*logstd), reduction_indices=1)
variational_lower_bound = tf.reduce_mean(log_likelihood - KL_term)
optimizer = tf.train.AdadeltaOptimizer().minimize(-variational_lower_bound)

# Training
Now all we have to do is run the optimizer until convergence.

In [8]:
init = tf.global_variables_initializer()
sess = tf.InteractiveSession()
sess.run(init)
saver = tf.train.Saver()

batch_size = 16
num_iterations = 1000000
recording_interval = 1000
variational_lower_bound_array = []
log_likelihood_array = []
KL_term_array = []
iteration_array = [i*recording_interval for i in range(num_iterations//recording_interval)]

n_train_batches = n_train_samples // batch_size
for i in range(num_iterations):
    for i in range(n_train_batches):
        start = i * batch_size
        end = (i + 1) * batch_size

        x_batch = X_train[start:end]
        
        sess.run(optimizer, feed_dict={X: x_batch})
        if (i%recording_interval == 0):
            vlb_eval = variational_lower_bound.eval(feed_dict={X: x_batch})
            print("Iteration: {}, Loss: {}".format(i, vlb_eval))
            variational_lower_bound_array.append(vlb_eval)
            log_likelihood_array.append(np.mean(log_likelihood.eval(feed_dict={X: x_batch})))
            KL_term_array.append(np.mean(KL_term.eval(feed_dict={X: x_batch})))

ValueError: Cannot feed value of shape (16, 257, 251, 1) for Tensor 'Placeholder:0', which has shape '(?, 64507)'

In [None]:
plt.figure()
plt.plot(iteration_array, variational_lower_bound_array)
plt.plot(iteration_array, KL_term_array)
plt.plot(iteration_array, log_likelihood_array)
plt.legend(['Variational Lower Bound', 'KL divergence', 'Log Likelihood'], bbox_to_anchor=(1.05, 1), loc=2)
plt.title('Loss per iteration')

In [None]:
load_model = False
if load_model:
    saver.restore(sess, os.path.join(os.getcwd(), "Trained Bernoulli VAE"))

num_pairs = 10
image_indices = np.random.randint(0, 200, num_pairs)
for pair in range(num_pairs):
    x = np.reshape(mnist.test.images[image_indices[pair]], (1, n_pixels))
    plt.figure()
    x_image = np.reshape(x, (rows,cols))
    plt.subplot(121)
    plt.imshow(x_image)
    x_reconstruction = reconstruction.eval(feed_dict={X: x})
    x_reconstruction_image = (np.reshape(x_reconstruction, (rows,cols)))
    plt.subplot(122)
    plt.imshow(x_reconstruction_image)