In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
from tensorflow.contrib import autograph as ag
# Load the MNIST dataset through tf.contrib.learn; later cells read
# mnist.train (training batches) and mnist.test (labels) from this object.
mnist = tf.contrib.learn.datasets.load_dataset("mnist")


# **Defining our input and output data**

MNIST images have a dimension of 28 * 28 pixels with one color channel. Our inputs `X_in` will be batches of MNIST characters, while our network will learn to reconstruct them and output them in a placeholder `Y`, which thus has the same dimensions. `Y_flat` will be used later, when computing losses. `keep_prob` will be used when applying dropout as a means of regularization. During training, it will have a value of 0.8. When generating new data, we won't apply dropout, so the value will be 1. The function `lrelu` is defined here because older TensorFlow releases don't come with a predefined leaky ReLU (newer releases provide `tf.nn.leaky_relu`).





In [None]:
# Start from an empty default graph so re-running this cell does not pile up
# duplicate placeholders and variables.
tf.reset_default_graph()

batch_size = 64
dec_in_channels = 1
n_latent = 8

# Graph inputs: image batches, latent codes fed to the decoder, and the
# dropout keep probability (a scalar).
X_in = tf.placeholder(dtype=tf.float32, shape=[None, 28, 28], name='X')
Z_in = tf.placeholder(dtype=tf.float32, shape=[None, n_latent], name = 'Z')
keep_prob = tf.placeholder(dtype=tf.float32, shape=(), name='keep_prob')

# Only used in VAE

# Y    = tf.placeholder(dtype=tf.float32, shape=[None, 28, 28], name='Y')
# mean = tf.placeholder(dtype=tf.float32, shape=[None, n_latent], name = 'mean')
# std = tf.placeholder(dtype=tf.float32, shape=[None, n_latent], name = 'std')
# Y_flat = tf.reshape(Y, shape=[-1, 28 * 28])

# Shape the decoder reshapes its dense output to before the transpose convs.
reshaped_dim = [-1, 7, 7, dec_in_channels]
# Width of the decoder's first dense layer (half of 7*7*dec_in_channels,
# rounded down). Integer floor division keeps this a plain int; np.floor
# returned a float, but layer unit counts are integers.
inputs_decoder = (7 * 7 * dec_in_channels) // 2

def lrelu(x, alpha=0.3):
    """Leaky ReLU: element-wise maximum of ``x`` and ``alpha * x``.

    Args:
        x: Input tensor.
        alpha: Factor applied to ``x`` for the second operand of the
            maximum (default 0.3).

    Returns:
        The result of ``tf.maximum(x, tf.multiply(x, alpha))``.
    """
    leak = tf.multiply(x, alpha)
    return tf.maximum(x, leak)


# **Defining the encoder**

As our inputs are images, it's most reasonable to apply some convolutional transformations to them. What's most noteworthy is the fact that we are creating two vectors in our encoder, as the encoder is supposed to create objects following a Gaussian Distribution:

*   A vector of means
*   A vector of standard deviations (the code below keeps these in log form and exponentiates them when sampling)

You will see later how we "force" the encoder to make sure it really creates values following a Normal Distribution. The returned values that will be fed to the decoder are the z-values. We will need the mean and standard deviation of our distributions later, when computing losses.

In [None]:
def encoder(X_in, keep_prob):
    """Build the convolutional encoder and sample a latent code.

    The input is reshaped to [-1, 28, 28, 1] and passed through three
    64-filter, 4x4, 'same'-padded convolutions (strides 2, 2, 1) with
    ``lrelu`` activation, each followed by dropout. The flattened features
    feed two dense heads of width ``n_latent``.

    Args:
        X_in: Image tensor that can be reshaped to [-1, 28, 28, 1].
        keep_prob: Keep probability passed to ``tf.nn.dropout``.

    Returns:
        Tuple ``(z, mean, std)``. ``std`` is 0.5 times a dense output and is
        exponentiated when sampling, so it acts as a log-scale standard
        deviation; ``z = mean + epsilon * exp(std)`` with ``epsilon`` drawn
        from ``tf.random_normal``.
    """
    with tf.variable_scope("encoder", reuse=None):
        features = tf.reshape(X_in, shape=[-1, 28, 28, 1])

        # Three conv + dropout stages; only the stride differs between them.
        for stride in (2, 2, 1):
            features = tf.layers.conv2d(features,
                                        filters=64,
                                        kernel_size=4,
                                        strides=stride,
                                        padding='same',
                                        activation=lrelu)
            features = tf.nn.dropout(features, keep_prob)

        features = tf.contrib.layers.flatten(features)

        # Two dense heads parameterise the latent distribution.
        mean = tf.layers.dense(features, units=n_latent)
        std = 0.5 * tf.layers.dense(features, units=n_latent)

        # Reparameterisation: one noise vector per example in the batch.
        noise_shape = tf.stack([tf.shape(features)[0], n_latent])
        epsilon = tf.random_normal(noise_shape)
        z = mean + tf.multiply(epsilon, tf.exp(std))

        return z, mean, std

# **Interpolation**
For any two latent vectors, we can use the difference between them to compute evenly spaced interpolates. Given two latent vectors $L_1$ and $L_2$, the interpolate $L_n$ is:


>  $L_n = L_1 + \frac{n}{N}(L_2 - L_1)$

where $N$ is the total number of segments and $n$ indexes the $n$-th segment along the vector $L_2 - L_1$.

# **Defining the decoder**
The decoder does not care whether its input values are sampled from the specific distribution we defined. It simply tries to reconstruct the input images. To this end, we use a series of transpose convolutions.

In [None]:
def decoder(Z_in, keep_prob):
    """Build the decoder that maps latent codes to 28x28 images.

    Two dense layers expand the latent code, the result is reshaped to
    ``reshaped_dim`` and passed through three 64-filter, 4x4, 'same'-padded
    transpose convolutions (strides 2, 1, 1; ReLU), with dropout after the
    first two. A final sigmoid dense layer produces 28*28 values per example.

    Args:
        Z_in: Latent code tensor fed to the first dense layer.
        keep_prob: Keep probability passed to ``tf.nn.dropout``.

    Returns:
        Tensor of shape [-1, 28, 28] with values produced by a sigmoid.
    """
    # The second dense layer must emit exactly as many values per example as
    # the reshape target holds. Deriving the width from reshaped_dim (instead
    # of ``inputs_decoder * 2 + 1``) keeps the reshape valid for any
    # dec_in_channels, not only when 7*7*dec_in_channels is odd.
    reshape_units = int(np.prod(reshaped_dim[1:]))

    with tf.variable_scope("decoder", reuse=None):
        x = tf.layers.dense(Z_in, units=int(inputs_decoder), activation=lrelu)
        x = tf.layers.dense(x, units=reshape_units, activation=lrelu)
        x = tf.reshape(x, reshaped_dim)
        x = tf.layers.conv2d_transpose(x,
                                       filters=64,
                                       kernel_size=4,
                                       strides=2,
                                       padding='same',
                                       activation=tf.nn.relu)
        x = tf.nn.dropout(x, keep_prob)
        x = tf.layers.conv2d_transpose(x,
                                       filters=64,
                                       kernel_size=4,
                                       strides=1,
                                       padding='same',
                                       activation=tf.nn.relu)
        x = tf.nn.dropout(x, keep_prob)
        x = tf.layers.conv2d_transpose(x,
                                       filters=64,
                                       kernel_size=4,
                                       strides=1,
                                       padding='same',
                                       activation=tf.nn.relu)
        x = tf.contrib.layers.flatten(x)
        x = tf.layers.dense(x, units=28*28, activation=tf.nn.sigmoid)
        img = tf.reshape(x, shape=[-1, 28, 28])
        return img

In [None]:
def discriminator(D_in):
    """Build the discriminator: flatten -> dense 512 -> dense 256 -> sigmoid.

    The variable scope uses ``tf.AUTO_REUSE`` so the function can be applied
    to more than one input (e.g. real images and decoder output) while
    sharing a single set of weights; with ``reuse=None`` the second call
    fails because the variables already exist.

    Args:
        D_in: Input tensor; it is flattened to one row per example.

    Returns:
        Tensor with one sigmoid output per example.
    """
    with tf.variable_scope("discriminator", reuse=tf.AUTO_REUSE):
        x = tf.contrib.layers.flatten(D_in)
        # ``activation`` was never defined at module scope; use the
        # notebook's leaky ReLU helper, as the encoder does.
        x = tf.layers.dense(x, units=512, activation=lrelu)
        x = tf.layers.dense(x, units=256, activation=lrelu)
        d = tf.layers.dense(x, units=1, activation=tf.nn.sigmoid)
        return d

In [None]:
def same(digit, labels):
    """Pick two distinct random positions whose label equals ``digit``.

    Args:
        digit: Label value to look for.
        labels: Array of labels to search.

    Returns:
        Tuple of two different indices into ``labels`` (along its first
        axis), both pointing at entries equal to ``digit``.

    Raises:
        ValueError: From ``np.random.choice`` when fewer than two entries
            match, since sampling is done without replacement.
    """
    matching = np.where(digit == labels)[0]
    first, second = np.random.choice(matching, size=2, replace=False)
    return first, second

def different(digit, labels):
    """Pick one random position labelled ``digit`` and one labelled otherwise.

    Args:
        digit: Label value for the first index.
        labels: Array of labels to search.

    Returns:
        Tuple ``(i, j)`` where ``labels[i]`` equals ``digit`` and
        ``labels[j]`` does not.
    """
    # Draw the matching index first, then the non-matching one.
    matching = np.where(digit == labels)[0]
    match_pick = np.random.choice(matching, 1, False)[0]

    non_matching = np.where(digit != labels)[0]
    other_pick = np.random.choice(non_matching, 1, False)[0]

    return match_pick, other_pick

In [None]:
# Build the encoder on the image placeholder; it returns the sampled latent
# code together with the mean and (log-scale) std tensors.
sampled_z, mn, sd = encoder(X_in, keep_prob)
# The decoder reads from the Z_in placeholder rather than from sampled_z, so
# latent codes have to be fetched from the encoder and fed back in via
# feed_dict.
dec = decoder(Z_in, keep_prob)

In [None]:
# VAE loss

# unreshaped = tf.reshape(dec, [-1, 28*28])
# img_loss = tf.reduce_sum(tf.squared_difference(unreshaped, Y_flat), 1)
# latent_loss = -0.5 * tf.reduce_sum(1.0 + 2.0 * std - tf.square(mean) - tf.exp(2.0 * std), 1)
# loss = tf.reduce_mean(img_loss + latent_loss)
# optimizer = tf.train.AdamOptimizer(0.0005).minimize(loss)
# sess = tf.Session()
# sess.run(tf.global_variables_initializer())

In [None]:
# VAE train

# labels = mnist.test.labels
# epochs = 30000
# for i in range(epochs):
#     batch = [np.reshape(b, [28, 28]) for b in mnist.train.next_batch(batch_size=batch_size)[0]]
#     s_z, mu, sigma = sess.run([sampled_z, mn, sd], feed_dict = {X_in: batch, keep_prob: 0.8})
#     sess.run(optimizer, feed_dict = {Z_in: s_z, mean: mu, std: sigma, Y: batch, keep_prob: 0.8})

In [None]:
# VAE+GAN loss

# Discriminator output on real images and on decoder samples.
data_prob = discriminator(X_in)
G_prob = discriminator(dec)

# Small constant keeps tf.log finite when a sigmoid output saturates at 0 or 1.
log_eps = 1e-8

# Per-example objectives; both are quantities to be maximised.
D_loss = tf.reduce_sum(tf.log(data_prob + log_eps) + tf.log(1 - G_prob + log_eps), 1)
G_loss = tf.reduce_sum(tf.log(G_prob + log_eps), 1)

# Each optimizer may only touch its own network's weights; otherwise the
# generator step would also move the discriminator (and vice versa).
d_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope="discriminator")
g_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope="decoder")

# tf.train optimizers only expose minimize(), so maximise by minimising the
# negated batch-mean objective.
D_optimizer = tf.train.AdamOptimizer(0.0005).minimize(-tf.reduce_mean(D_loss), var_list=d_vars)
G_optimizer = tf.train.AdamOptimizer(0.0005).minimize(-tf.reduce_mean(G_loss), var_list=g_vars)

# Let TensorFlow grow GPU memory on demand instead of reserving it all up front.
session_config = tf.ConfigProto()
session_config.gpu_options.allow_growth = True
sess = tf.Session(config=session_config)
sess.run(tf.global_variables_initializer())

In [None]:
# VAE+GAN train

labels = mnist.test.labels
epochs = 10
for i in range(epochs):
    # Reshape the flat image vectors of one mini-batch to 28x28.
    batch = np.reshape(mnist.train.next_batch(batch_size=batch_size)[0], [-1, 28, 28])

    # Encode the batch; the sampled latent codes are what the decoder consumes.
    s_z, mu, sigma = sess.run([sampled_z, mn, sd], feed_dict={X_in: batch, keep_prob: 0.8})

    # Z_in is the latent placeholder (width n_latent), so it must be fed the
    # latent codes, not decoded images. The discriminator gets three updates
    # per generator update.
    for _ in range(3):
        sess.run(D_optimizer, feed_dict={X_in: batch, Z_in: s_z, keep_prob: 0.8})
    sess.run(G_optimizer, feed_dict={Z_in: s_z, keep_prob: 0.8})

In [None]:
# digit = np.random.choice(np.arange(10), 1, False)
# # interpolate with same digit
# idx1, idx2 = same(digit, labels)
# img1 = np.reshape(mnist.test.images[idx1], (-1, 28, 28))
# img2 = np.reshape(mnist.test.images[idx2], (-1, 28, 28))
# s_z1, mu1, sigma1 = sess.run([sampled_z, mn, sd], feed_dict = {X_in: img1, keep_prob: 1})
# s_z2, mu2, sigma2 = sess.run([sampled_z, mn, sd], feed_dict = {X_in: img2, keep_prob: 1})
# diff = s_z1 - s_z2
# plt.figure()
# plt.imshow(np.reshape(img1, (28, 28)), cmap='gray')
# for j in range(9):
#     s_z = s_z1 + (j/8) * diff
#     d, _ = sess.run([dec,optimizer], feed_dict = {Z_in: s_z, mean: mu1, std: sigma1, Y: img1, keep_prob: 1})
#     plt.figure()
#     plt.imshow(np.reshape(d, (28, 28)), cmap='gray')

# # interpolate with different digit
# idx1, idx2 = different(digit, labels)
# img1 = np.reshape(mnist.test.images[idx1], (-1, 28, 28))
# img2 = np.reshape(mnist.test.images[idx2], (-1, 28, 28))
# s_z1, mu1, sigma1 = sess.run([sampled_z, mn, sd], feed_dict = {X_in: img1, keep_prob: 1})
# s_z2, mu2, sigma2 = sess.run([sampled_z, mn, sd], feed_dict = {X_in: img2, keep_prob: 1})
# diff = s_z1 - s_z2
# plt.figure()
# plt.imshow(np.reshape(img1, (28, 28)), cmap='gray')
# for j in range(9):
#     s_z = s_z1 + (j/8) * diff
#     d, _ = sess.run([dec,optimizer], feed_dict = {Z_in: s_z, mean: mu1, std: sigma1, Y: img1, keep_prob: 1})
#     plt.figure()
#     plt.imshow(np.reshape(d, (28, 28)), cmap='gray')