In [6]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm
from keras.layers import Input, Dense, Lambda, Add
from keras.models import Model
from keras import backend as K
from keras import metrics
from keras.datasets import mnist
from distributions import *

In [7]:
# --- Training configuration ---
batch_size = 100          # samples per step; also baked into the fixed Input batch shapes
epochs = 50               # passes over the training set

# --- Network sizes ---
original_dim = 28 * 28    # length of one flattened input image (784)
intermediate_dim = 256    # width of the encoder / decoder hidden layer
latent_dim = 2            # dimensionality of the latent code

# --- Sampling ---
epsilon_std = 1.0         # stddev of the base Gaussian noise fed to the sampler

## Encoder

In [8]:
# Encoder: a single ReLU hidden layer shared by three output heads.
x = Input(batch_shape=(batch_size, original_dim))
h = Dense(units=intermediate_dim, activation='relu')(x)

# Heads (created in this order): mean and softplus-activated stddev of the
# initial Gaussian, plus an extra linear vector `hp` that is fed to every
# flow step further down.
z_mean_0 = Dense(units=latent_dim)(h)
z_std_0 = Dense(units=latent_dim, activation="softplus")(h)
hp = Dense(units=latent_dim)(h)

encoder = Model(inputs=x, outputs=[z_mean_0, z_std_0, hp])

## Latent models

In [None]:
def _mask_matrix_made(dim):
    """
    Build the two random binary masks of a one-hidden-layer MADE.

    see https://arxiv.org/pdf/1502.03509.pdf
    Section 4 explains how to define the masks

    Every hidden unit k gets a random degree m(k) in [1, dim - 1]. With the
    1-based input/output index d used in the paper:

    * ``mask_matrix0[k, d-1] = 1`` iff ``m(k) >= d``  (input d -> hidden k)
    * ``mask_matrix1[d-1, k] = 1`` iff ``d > m(k)``   (hidden k -> output d)

    so output d can only be reached from inputs with a strictly smaller index.

    The degrees are 1-based, therefore the 0-based array positions must be
    shifted by one before comparing. Comparing against the raw positions
    (as an earlier version did) makes the output mask all zeros for dim == 2.

    NOTE(review): whether the consuming MaskedDense layer expects its mask as
    (units, inputs) or (inputs, units) is not visible here; the index order of
    the original implementation is kept unchanged -- confirm against the layer.

    :param dim: number of inputs == hidden units == outputs; must be >= 2.
    :return: tuple ``(mask_matrix0, mask_matrix1)`` of float32 arrays of
        shape (dim, dim) containing only 0.0 and 1.0.
    :raises ValueError: if ``dim < 2`` (no valid hidden degree exists).
    """
    if dim < 2:
        raise ValueError("MADE masks need dim >= 2, got %r" % (dim,))

    # Hidden-unit degrees m(k), drawn from {1, ..., dim - 1}.
    mask_vector = np.random.randint(1, dim, dim)
    # 1-based indices d = 1..dim of the inputs / outputs.
    degrees = np.arange(1, dim + 1)

    mask_matrix0 = (mask_vector[:, np.newaxis] >= degrees[np.newaxis, :]).astype(np.float32)
    mask_matrix1 = (degrees[:, np.newaxis] > mask_vector[np.newaxis, :]).astype(np.float32)
    return mask_matrix0, mask_matrix1


def MADE(mask_matrix0, mask_matrix1, latent_dim, activation=None):
    """
    Factory for a 2-layered MADE block.
    see https://arxiv.org/pdf/1502.03509.pdf

    :param mask_matrix0: mask handed to the first MaskedDense layer.
    :param mask_matrix1: mask handed to the second MaskedDense layer.
    :param latent_dim: number of units of both masked layers.
    :param activation: activation of the second (output) masked layer.
    :return: a callable mapping an input tensor to the block's output.
        New layers are instantiated on every call of that callable.
    """
    def apply_made(inputs):
        # masked layer -> PReLU -> masked output layer
        hidden = MaskedDense(latent_dim, mask=mask_matrix0)(inputs)
        hidden_act = PReLU()(hidden)
        output_layer = MaskedDense(latent_dim, mask=mask_matrix1, activation=activation)
        return output_layer(hidden_act)

    return apply_made

In [9]:
n_latent = 10  # number of flow steps (one latent model each)
latent_models = []

# One independent pair of random MADE masks per flow step.
masks = [_mask_matrix_made(latent_dim) for _ in range(n_latent)]

for mask0, mask1 in masks:
    # `batch_shape` already fixes the complete input shape. Passing `shape`
    # as well is redundant (ignored by Keras 2) and rejected by Keras
    # versions that allow only one of the two arguments.
    latent_input = Input(batch_shape=(batch_size, latent_dim))

    # Mean and gate networks of one step share the same masks but have
    # separate layers; the gate is squashed into (0, 1) by the sigmoid.
    mean = MADE(mask0, mask1, latent_dim)(latent_input)
    std = MADE(mask0, mask1, latent_dim, activation="sigmoid")(latent_input)

    latent_model = Model(latent_input, [mean, std])
    latent_models.append(latent_model)

## Decoder

In [None]:
def sample_eps(batch_size, latent_dim, epsilon_std):
    """Build a sampler of zero-mean Gaussian noise with stddev `epsilon_std`.

    The returned callable takes one argument (so it can be wrapped in a
    Lambda layer) but ignores it; it always draws a fresh tensor of shape
    (batch_size, latent_dim).
    """
    noise_shape = (batch_size, latent_dim)

    def draw_noise(args):
        # `args` is unused: the noise does not depend on the layer inputs.
        return K.random_normal(shape=noise_shape, mean=0., stddev=epsilon_std)

    return draw_noise
def sample_z0(args):
    """Sample from N(mu, sigma) where sigma is the stddev !!!

    :param args: sequence ``(z_mean, z_std, epsilon)``; `epsilon` is the base
        noise, `z_std` a standard deviation (not a variance / log-variance).
    :return: ``z_mean + (z_std + 1e-8) * epsilon``
    """
    z_mean, z_std, epsilon = args
    # exp(log(s)) is just s: scale the noise directly instead of taking a
    # round trip through log/exp, which only adds ops and rounding error.
    return z_mean + (z_std + 1e-8) * epsilon
def iaf_transform_z(args):
    """Apply the IAF transform to input z

    :param args: sequence ``(z, mean, std)``.
    :return: ``std * z + (1 - std) * mean``, i.e. `std` acts as a gate that
        blends the incoming `z` with `mean`.

    The result is built as a new expression. The inputs are never modified:
    the previous in-place ``*=`` / ``+=`` on an alias of `z` would overwrite
    the caller's object for array types that support in-place arithmetic.
    """
    z, mean, std = args
    return z * std + (1 - std) * mean

In [10]:
# Draw the base noise, then the initial latent sample z0 from the encoder's
# mean / stddev heads. The inputs given to the 'sample_eps' layer are ignored
# by the sampler itself.
eps = Lambda(sample_eps(batch_size, latent_dim, epsilon_std), name='sample_eps')([z_mean_0, z_std_0])
z0 = Lambda(sample_z0, name='sample_z0')([z_mean_0, z_std_0, eps])

# Per-step bookkeeping; index 0 holds the encoder outputs and z0, every flow
# step appends one entry. vae_loss reads z_means / z_stds.
z_means = [z_mean_0]
z_stds = [z_std_0]
zs = [z0]
for latent_model in latent_models:
    # Input of the step: the current sample plus a ReLU projection of `hp`
    # (a new Dense layer is created for every step).
    zz = Add()([Dense(latent_dim, activation='relu')(hp), zs[-1]])
    z_mean, z_std = latent_model(zz)
    z_means.append(z_mean)
    z_stds.append(z_std)
    # The transform is applied to the previous sample itself, not to `zz`.
    z = Lambda(iaf_transform_z)([zs[-1], z_mean, z_std])
    zs.append(z)
# NOTE: `z_mean` and `z_std` stay bound at module level after the loop and
# refer to the outputs of the LAST flow step, not to the encoder heads.
z = zs[-1]

# we instantiate these layers separately so as to reuse them later
decoder_h = Dense(intermediate_dim, activation='relu')
decoder_mean = Dense(original_dim, activation='sigmoid')
h_decoded = decoder_h(z)
x_decoded_mean = decoder_mean(h_decoded)

## VAE

In [None]:
def log_stdnormal(x):
    """log density of a standard gaussian

    Computed element-wise: ``-0.5 * log(2*pi) - x**2 / 2``.
    """
    # Use numpy (imported at the top of the notebook) for the constant: the
    # `math` module is never imported explicitly here. float() keeps the
    # constant a plain Python scalar.
    c = -0.5 * float(np.log(2 * np.pi))
    return c - K.square(x) / 2

def log_normal2(x, mean, log_var):
    """log density of N(mu, sigma)

    Element-wise Gaussian log density parameterised by the LOG-VARIANCE:
    ``-0.5*log(2*pi) - log_var/2 - (x - mean)**2 / (2*exp(log_var) + 1e-8)``.
    The 1e-8 keeps the denominator away from zero.
    """
    # Use numpy (imported at the top of the notebook) for the constant: the
    # `math` module is never imported explicitly here. float() keeps the
    # constant a plain Python scalar.
    c = -0.5 * float(np.log(2 * np.pi))
    squared_error = K.square(x - mean)
    return c - log_var / 2 - squared_error / (2 * K.exp(log_var) + 1e-8)

In [20]:
n_sample = 20  # Monte Carlo samples used to estimate the KL expectation
def vae_loss(x, x_decoded_mean):
    """Per-example loss: reconstruction cross-entropy plus a sampled KL term.

    The KL term is estimated as the average over `n_sample` draws of
    ``log q(z0 | x) - log p(z_T)`` minus the sum of ``log(std)`` over all flow
    steps (the change-of-variables correction of the flow).

    :param x: target batch.
    :param x_decoded_mean: decoder output for the batch.
    :return: tensor holding one loss value per example.

    NOTE(review): the flow parameters in `z_means[1:]` / `z_stds[1:]` are the
    tensors produced along the model's own forward sample; they are reused
    as-is for every freshly drawn `z0_` instead of being recomputed from it.
    Confirm that this approximation is intended.
    """
    xent_loss = original_dim * metrics.binary_crossentropy(x, x_decoded_mean)

    # Loop-invariant pieces of the initial Gaussian q(z0 | x).
    z_std_0_safe = z_std_0 + 1e-8
    log_var_0 = 2 * K.log(z_std_0_safe)
    flow_params = list(zip(z_means[1:], z_stds[1:]))

    # kl divergence
    # sampling for estimating the expectations
    loss = 0.  # explicit start value instead of relying on a caught NameError
    for _ in range(n_sample):
        epsilon = K.random_normal(shape=(batch_size, latent_dim), mean=0.,
                                  stddev=1.0)  # used for every z_i sampling
        z0_ = z_mean_0 + z_std_0_safe * epsilon
        z_ = z0_
        for flow_mean, flow_std in flow_params:
            z_ = iaf_transform_z([z_, flow_mean, flow_std])

        loss += K.sum(log_normal2(z0_, z_mean_0, log_var_0), -1)
        loss -= K.sum(log_stdnormal(z_), -1)

    # don't forget the log_std_sum.
    # BE CAUTIOUS!! THE LOG_STD_SUM_0 HAS ALREADY BEEN TAKEN INTO ACCOUNT IN LOSS
    kl_loss = loss / n_sample
    for flow_std in z_stds[1:]:
        kl_loss -= K.sum(K.log(1e-8 + flow_std), -1)

    return xent_loss + kl_loss

In [21]:
# End-to-end model from the input batch to its reconstruction, trained with
# the custom loss defined above.
vae = Model(inputs=x, outputs=x_decoded_mean)
vae.compile(loss=vae_loss, optimizer='rmsprop')

(<tf.Tensor 'dense_20_target_2:0' shape=(?, ?) dtype=float32>, <tf.Tensor 'dense_20/Sigmoid:0' shape=(100, 784) dtype=float32>)
(100,)


In [19]:
# train the VAE on MNIST digits
(x_train, y_train), (x_test, y_test) = mnist.load_data()

# Convert to float32, divide by 255 and flatten every image into one row.
x_train = (x_train.astype('float32') / 255.).reshape(len(x_train), -1)
x_test = (x_test.astype('float32') / 255.).reshape(len(x_test), -1)

In [14]:
# Sanity check: after flattening, the second dimension should equal original_dim.
x_train.shape

(60000, 784)

In [15]:
from keras.callbacks import TensorBoard

# TensorBoard callback for this run; it is passed to vae.fit below and logs under log_dir.
tb = TensorBoard(log_dir='./log/MNIST/10_layer-additional_var')

In [16]:
# Autoencoder training: the inputs double as the targets.
vae.fit(
    x_train, x_train,
    batch_size=batch_size,
    epochs=epochs,
    shuffle=True,
    validation_data=(x_test, x_test),
    callbacks=[tb],
    verbose=0,
)

<keras.callbacks.History at 0x7f622eb52ed0>

## Play with the trained model

In [None]:
# build a model to project inputs on the latent space
# `z` is the final latent code after all flow steps, i.e. exactly what the
# decoder consumes. The name `z_mean` must not be used here: at this point it
# is the variable left over from the flow loop (the mean output of the last
# latent model), not a latent code.
# NOTE(review): `z` is a stochastic sample, so repeated predictions differ.
encoder = Model(x, z)

In [None]:
# display a 2D plot of the digit classes in the latent space
x_test_encoded = encoder.predict(x_test, batch_size=batch_size)

latent_fig, latent_ax = plt.subplots(figsize=(6, 6))
# one point per test image, coloured by its digit label
points = latent_ax.scatter(x_test_encoded[:, 0], x_test_encoded[:, 1], c=y_test)
latent_fig.colorbar(points, ax=latent_ax)
plt.show()

# build a digit generator that can sample from the learned distribution
# (reuses the already trained decoder layers on a fresh latent input)
decoder_input = Input(shape=(latent_dim,))
_h_decoded = decoder_h(decoder_input)
_x_decoded_mean = decoder_mean(_h_decoded)
generator = Model(inputs=decoder_input, outputs=_x_decoded_mean)

# display a 2D manifold of the digits
n = 15  # figure with 15x15 digits
digit_size = 28
# linearly spaced coordinates on the unit square were transformed through the inverse CDF (ppf) of the Gaussian
# to produce values of the latent variables z, since the prior of the latent space is Gaussian
grid_x = norm.ppf(np.linspace(0.05, 0.95, n))
grid_y = norm.ppf(np.linspace(0.05, 0.95, n))

# One latent point per tile in row-major tile order: tile row i uses
# grid_y[i] as second coordinate, tile column j uses grid_x[j] as first.
z_grid = np.array([[xi, yi] for yi in grid_y for xi in grid_x])

# Decode the whole grid in a single batched call instead of n * n separate
# one-row predictions.
x_decoded = generator.predict(z_grid)

# (n*n, digit_size**2) -> (tile_row, tile_col, pixel_row, pixel_col)
# -> (tile_row, pixel_row, tile_col, pixel_col) -> one big 2D image.
figure = (x_decoded
          .reshape(n, n, digit_size, digit_size)
          .transpose(0, 2, 1, 3)
          .reshape(n * digit_size, n * digit_size))

plt.figure(figsize=(10, 10))
plt.imshow(figure, cmap='Greys_r')
plt.show()