# Variational Autoencoder in TensorFlow

code directly taken from: https://jmetzen.github.io/2015-11-27/vae.html

In [1]:
import numpy as np
import tensorflow as tf

import matplotlib.pyplot as plt
%matplotlib inline

np.random.seed(0)
tf.set_random_seed(0)


### train only on cpu
import os
os.environ['CUDA_VISIBLE_DEVICES'] = ''

  from ._conv import register_converters as _register_converters


In [2]:
from datetime import datetime
timestamp = datetime.now().strftime("%Y-%m-%d_%H:%M")
print(timestamp, "")

2018-07-29_20:06 


- Wrapper for the dataset

In [3]:
### load dataset

class KDD99Dataset:
    def __init__(self, filename_train, filename_test):
        with np.load(filename_train) as data: 
            self.x_train = data["x_train"]
            
        with np.load(filename_test) as data: 
            self.x_test = data["x_test"]
            
            
    def train_num_examples(self):
        return len(self.x_train)

    def train_next_batch(self,batch_size):
        choices = np.random.choice(len(self.x_train), size=batch_size, replace=False)

        batch_x_train = self.x_train[choices]

        batch_train = (batch_x_train, None)
        return batch_train
    
    
    
    def test_next_batch(self,batch_size):
        choices = np.random.choice(len(self.x_test), size=batch_size, replace=False)

        batch_x_test = self.x_test[choices]
        
        batch_test = (batch_x_test, None) 
        return batch_test
        

In [4]:
kdd99 = KDD99Dataset("../datasets/kddcup/kdd99_train-randomState_None.npz", 
                     "../datasets/kddcup/kdd99_test-randomState_None.npz")

n_samples = kdd99.train_num_examples()

In [5]:
def xavier_init(fan_in, fan_out, constant=1): 
    """ Xavier initialization of network weights"""
    # https://stackoverflow.com/questions/33640581/how-to-do-xavier-initialization-on-tensorflow
    low = -constant*np.sqrt(6.0/(fan_in + fan_out)) 
    high = constant*np.sqrt(6.0/(fan_in + fan_out))
    return tf.random_uniform((fan_in, fan_out), 
                             minval=low, maxval=high, 
                             dtype=tf.float32)

Based on this, we define now a class "VariationalAutoencoder" with a [sklearn](http://scikit-learn.org)-like interface that can be trained incrementally with mini-batches using partial_fit. The trained model can be used to reconstruct unseen input, to generate new samples, and to map inputs to the latent space.

In [6]:
class VariationalAutoencoder(object):
    """ Variation Autoencoder (VAE) with an sklearn-like interface implemented using TensorFlow.
    
    This implementation uses probabilistic encoders and decoders using Gaussian 
    distributions and  realized by multi-layer perceptrons. The VAE can be learned
    end-to-end.
    
    See "Auto-Encoding Variational Bayes" by Kingma and Welling for more details.
    """
    def __init__(self, network_architecture, transfer_fct=tf.nn.softplus, 
                 learning_rate=0.001, batch_size=100):
        self.network_architecture = network_architecture
        self.transfer_fct = transfer_fct
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        
        # tf Graph input
        self.x = tf.placeholder(tf.float32, [None, network_architecture["n_input"]])
        
        # Create autoencoder network
        self._create_network()
        # Define loss function based variational upper-bound and 
        # corresponding optimizer
        self._create_loss_optimizer()
        
        # Initializing the tensor flow variables
        init = tf.global_variables_initializer()

        # Launch the session
        self.sess = tf.InteractiveSession()
        self.sess.run(init)
    
    def _create_network(self):
        # Initialize autoencode network weights and biases
        network_weights = self._initialize_weights(**self.network_architecture)

        # Use recognition network to determine mean and 
        # (log) variance of Gaussian distribution in latent
        # space
        self.z_mean, self.z_log_sigma_sq = \
            self._recognition_network(network_weights["weights_recog"], 
                                      network_weights["biases_recog"])

        # Draw one sample z from Gaussian distribution
        n_z = self.network_architecture["n_z"]
        eps = tf.random_normal((self.batch_size, n_z), 0, 1, 
                               dtype=tf.float32)
        # z = mu + sigma*epsilon
        self.z = tf.add(self.z_mean, 
                        tf.multiply(tf.sqrt(tf.exp(self.z_log_sigma_sq)), eps))

        # Use generator to determine mean of
        # Bernoulli distribution of reconstructed input
        self.x_reconstr_mean = \
            self._generator_network(network_weights["weights_gener"],
                                    network_weights["biases_gener"])
    def save_weights(self, filename = "./" + timestamp + "-tfsave"):  
        ### save model weights
        saver = tf.train.Saver()
        saver.save(self.sess, filename)
        print("saved to:",filename)
        
        
        
    def load_weights(self, filename = "./" + timestamp + "-tfsave.meta"):
        ### load weights for test notebook
        saver = tf.train.import_meta_graph(filename)
        saver.restore(self.sess, tf.train.latest_checkpoint("./"))
            
            
    def _initialize_weights(self, n_hidden_recog_1, n_hidden_recog_2, n_hidden_recog_3, n_hidden_recog_4,
                            n_hidden_gener_1,  n_hidden_gener_2, n_hidden_gener_3, n_hidden_gener_4,
                            n_input, n_z):
        all_weights = dict()
        all_weights['weights_recog'] = {
            'h1': tf.Variable(xavier_init(n_input, n_hidden_recog_1)),
            'h2': tf.Variable(xavier_init(n_hidden_recog_1, n_hidden_recog_2)),
            'h3': tf.Variable(xavier_init(n_hidden_recog_2, n_hidden_recog_3)),
            'h4': tf.Variable(xavier_init(n_hidden_recog_3, n_hidden_recog_4)),
            'out_mean': tf.Variable(xavier_init(n_hidden_recog_4, n_z)),
            'out_log_sigma': tf.Variable(xavier_init(n_hidden_recog_4, n_z))}
        all_weights['biases_recog'] = {
            'b1': tf.Variable(tf.zeros([n_hidden_recog_1], dtype=tf.float32)),
            'b2': tf.Variable(tf.zeros([n_hidden_recog_2], dtype=tf.float32)),
            'b3': tf.Variable(tf.zeros([n_hidden_recog_3], dtype=tf.float32)),
            'b4': tf.Variable(tf.zeros([n_hidden_recog_4], dtype=tf.float32)),
            'out_mean': tf.Variable(tf.zeros([n_z], dtype=tf.float32)),
            'out_log_sigma': tf.Variable(tf.zeros([n_z], dtype=tf.float32))}
        all_weights['weights_gener'] = {
            'h1': tf.Variable(xavier_init(n_z, n_hidden_gener_1)),
            'h2': tf.Variable(xavier_init(n_hidden_gener_1, n_hidden_gener_2)),
            'h3': tf.Variable(xavier_init(n_hidden_gener_2, n_hidden_gener_3)),
            'h4': tf.Variable(xavier_init(n_hidden_gener_3, n_hidden_gener_4)),
            'out_mean': tf.Variable(xavier_init(n_hidden_gener_4, n_input)),
            'out_log_sigma': tf.Variable(xavier_init(n_hidden_gener_4, n_input))}
        all_weights['biases_gener'] = {
            'b1': tf.Variable(tf.zeros([n_hidden_gener_1], dtype=tf.float32)),
            'b2': tf.Variable(tf.zeros([n_hidden_gener_2], dtype=tf.float32)),
            'b3': tf.Variable(tf.zeros([n_hidden_gener_3], dtype=tf.float32)),
            'b4': tf.Variable(tf.zeros([n_hidden_gener_4], dtype=tf.float32)),
            'out_mean': tf.Variable(tf.zeros([n_input], dtype=tf.float32)),
            'out_log_sigma': tf.Variable(tf.zeros([n_input], dtype=tf.float32))}
        return all_weights
            
    def _recognition_network(self, weights, biases):
        # Generate probabilistic encoder (recognition network), which
        # maps inputs onto a normal distribution in latent space.
        # The transformation is parametrized and can be learned.
        
        layer_1 = self.transfer_fct(tf.add(tf.matmul(self.x, weights['h1']), 
                                           biases['b1'])) 
        batch_normed = tf.keras.layers.BatchNormalization()(layer_1)
        layer_2 = self.transfer_fct(tf.add(tf.matmul(batch_normed, weights['h2']), 
                                           biases['b2'])) 
        batch_normed = tf.keras.layers.BatchNormalization()(layer_2)
        layer_3 = self.transfer_fct(tf.add(tf.matmul(batch_normed, weights['h3']), 
                                           biases['b3'])) 
        batch_normed = tf.keras.layers.BatchNormalization()(layer_3)
        layer_4 = self.transfer_fct(tf.add(tf.matmul(batch_normed, weights['h4']), 
                                           biases['b4'])) 
        batch_normed = tf.keras.layers.BatchNormalization()(layer_4)
        z_mean = tf.add(tf.matmul(batch_normed, weights['out_mean']),
                        biases['out_mean'])
        z_log_sigma_sq = \
            tf.add(tf.matmul(batch_normed, weights['out_log_sigma']), 
                   biases['out_log_sigma'])
        return (z_mean, z_log_sigma_sq)

    def _generator_network(self, weights, biases):
        # Generate probabilistic decoder (decoder network), which
        # maps points in latent space onto a Bernoulli distribution in data space.
        # The transformation is parametrized and can be learned.
        layer_1 = self.transfer_fct(tf.add(tf.matmul(self.z, weights['h1']), 
                                           biases['b1'])) 
        batch_normed = tf.keras.layers.BatchNormalization()(layer_1)
        layer_2 = self.transfer_fct(tf.add(tf.matmul(batch_normed, weights['h2']), 
                                           biases['b2'])) 
        batch_normed = tf.keras.layers.BatchNormalization()(layer_2)
        layer_3 = self.transfer_fct(tf.add(tf.matmul(batch_normed, weights['h3']), 
                                           biases['b3'])) 
        batch_normed = tf.keras.layers.BatchNormalization()(layer_3)
        layer_4 = self.transfer_fct(tf.add(tf.matmul(batch_normed, weights['h4']), 
                                           biases['b4'])) 
        batch_normed = tf.keras.layers.BatchNormalization()(layer_4)
        x_reconstr_mean = \
            tf.nn.sigmoid(tf.add(tf.matmul(batch_normed, weights['out_mean']), 
                                 biases['out_mean']))
        return x_reconstr_mean
            
    def _create_loss_optimizer(self):
        # The loss is composed of two terms:
        # 1.) The reconstruction loss (the negative log probability
        #     of the input under the reconstructed Bernoulli distribution 
        #     induced by the decoder in the data space).
        #     This can be interpreted as the number of "nats" required
        #     for reconstructing the input when the activation in latent
        #     is given.
        # Adding 1e-10 to avoid evaluation of log(0.0)
        self.reconstr_loss = \
            -tf.reduce_sum(self.x * tf.log(1e-6 + self.x_reconstr_mean)
                           + (1-self.x) * tf.log(1e-6 + 1 - self.x_reconstr_mean),
                           1)
        # 2.) The latent loss, which is defined as the Kullback Leibler divergence 
        ##    between the distribution in latent space induced by the encoder on 
        #     the data and some prior. This acts as a kind of regularizer.
        #     This can be interpreted as the number of "nats" required
        #     for transmitting the the latent space distribution given
        #     the prior.
        self.latent_loss = -0.5 * tf.reduce_sum(1 + self.z_log_sigma_sq 
                                           - tf.square(self.z_mean) 
                                           - tf.exp(self.z_log_sigma_sq), 1)
        self.cost = tf.reduce_mean(self.reconstr_loss + self.latent_loss)   # average over batch
        # Use ADAM optimizer
        self.optimizer = \
            tf.train.AdamOptimizer(learning_rate=self.learning_rate).minimize(self.cost)
        
    def partial_fit(self, X):
        """Train model based on mini-batch of input data.
        
        Return cost of mini-batch.
        """
        opt, cost = self.sess.run((self.optimizer, self.cost), 
                                  feed_dict={self.x: X})
        return cost
    
    def transform(self, X):
        """Transform data by mapping it into the latent space."""
        # Note: This maps to mean of distribution, we could alternatively
        # sample from Gaussian distribution
        mu = self.sess.run(self.z_mean, feed_dict={self.x: X})
        sigma = self.sess.run(self.z_log_sigma_sq, feed_dict={self.x: X}) ###!!! (doğru mu/gerekli mi??)
        return mu, sigma
    
    def generate(self, z_mu=None):
        """ Generate data by sampling from latent space.
        
        If z_mu is not None, data for this point in latent space is
        generated. Otherwise, z_mu is drawn from prior in latent 
        space.        
        """
        if z_mu is None:
            z_mu = np.random.normal(size=self.network_architecture["n_z"])
        # Note: This maps to mean of distribution, we could alternatively
        # sample from Gaussian distribution
        return self.sess.run(self.x_reconstr_mean, 
                             feed_dict={self.z: z_mu})
    
    def reconstruct(self, X):
        """ Use VAE to reconstruct given data. """
        return self.sess.run(self.x_reconstr_mean, 
                             feed_dict={self.x: X})
    
    
    def reconstruct_error(self, X):
        """ Use VAE to reconstruct given data. """
        return self.sess.run((self.reconstr_loss, self.latent_loss), 
                             feed_dict={self.x: X})

In general, implementing a VAE in tensorflow is relatively straightforward (in particular since we don not need to code the gradient computation). A bit confusing is potentially that all the logic happens at initialization of the class (where the graph is generated), while the actual sklearn interface methods are very simple one-liners.

We can now define a simple fuction which trains the VAE using mini-batches:

In [7]:
import time

def train(network_architecture, 
          learning_rate=0.0001,
          batch_size=1024, 
          training_epochs=70, 
          display_step=1):
    
    vae = VariationalAutoencoder(network_architecture, 
                                 learning_rate=learning_rate, 
                                 batch_size=batch_size)
    history_loss = dict()
    

    # Training cycle
    for epoch in range(training_epochs):
        avg_cost = 0.
        total_batch = int(n_samples / batch_size)
        # Loop over all batches
        
        loss_batch = []
        
        start = time.time()
        for i in range(total_batch):
            batch_xs, _ = kdd99.train_next_batch(batch_size) #mnist.train.next_batch(batch_size)

            # Fit training using batch data
            cost = vae.partial_fit(batch_xs)
            
            loss_batch.append(cost)
            
            #print(" batch:",i,"cost:",cost)
            # Compute average loss
            avg_cost += cost / n_samples * batch_size

        end = time.time()
        
        # Display logs per epoch step
        if epoch % display_step == 0:
            print("--- end of epoch:", '%04d' % (epoch+1), 
                  "avg cost=", "{:.9f}".format(avg_cost),
                  "time=", "{:.3f} seconds".format(end-start), )
            
        ### append loss to history
        history_loss[str(epoch)] = loss_batch
        history_loss[str(epoch)+"-avg"] = avg_cost
            
    return vae, history_loss

## Illustrating reconstruction quality

We can now train a VAE on MNIST by just specifying the network topology. We start with training a VAE with a 20-dimensional latent space.

In [None]:

network_architecture = \
    dict(n_hidden_recog_1=60, # 1st layer encoder neurons
         n_hidden_recog_2=40, # 2nd layer encoder neurons
         n_hidden_recog_3=20, # 2nd layer encoder neurons
         n_hidden_recog_4=10, # 2nd layer encoder neurons
         n_hidden_gener_1=10, # 1st layer decoder neurons
         n_hidden_gener_2=20, # 2nd layer decoder neurons
         n_hidden_gener_3=40, # 1st layer decoder neurons
         n_hidden_gener_4=60, # 2nd layer decoder neurons
         n_input=120,  # kdd99 data input dimension
         n_z=5,
        )  # dimensionality of latent space

vae, history_loss = train(network_architecture, 
            training_epochs=100,
            learning_rate = 0.0001,
            display_step=1, 
            batch_size=1024)

--- end of epoch: 0001 avg cost= 70.265295904 time= 4.162 seconds
--- end of epoch: 0002 avg cost= 27.693592305 time= 3.488 seconds
--- end of epoch: 0003 avg cost= 12.843173820 time= 3.593 seconds
--- end of epoch: 0004 avg cost= 11.207451733 time= 3.536 seconds
--- end of epoch: 0005 avg cost= 10.790388352 time= 3.322 seconds
--- end of epoch: 0006 avg cost= 10.424556433 time= 3.352 seconds
--- end of epoch: 0007 avg cost= 9.282161063 time= 3.417 seconds
--- end of epoch: 0008 avg cost= 6.719495321 time= 3.504 seconds
--- end of epoch: 0009 avg cost= 5.126751181 time= 3.353 seconds
--- end of epoch: 0010 avg cost= 4.696079080 time= 3.352 seconds
--- end of epoch: 0011 avg cost= 4.553808110 time= 3.478 seconds
--- end of epoch: 0012 avg cost= 4.496733281 time= 3.674 seconds
--- end of epoch: 0013 avg cost= 4.415130458 time= 3.473 seconds
--- end of epoch: 0014 avg cost= 4.395545683 time= 3.527 seconds
--- end of epoch: 0015 avg cost= 4.291409600 time= 3.598 seconds
--- end of epoch: 0

### Save network params

In [None]:
vae.save_weights()

In [None]:
vae.load_weights(filename = "./" + timestamp + "-tfsave.meta")

### Save loss history

In [None]:
filename = timestamp + "_history"
np.savez_compressed(filename, history_loss=history_loss)

Based on this we can sample some test inputs and visualize how well the VAE can reconstruct those. In general the VAE does really well.

In [None]:
x_sample =  kdd99.test_next_batch(1024)[0]  #mnist.test.next_batch(100)[0]
x_reconstruct = vae.reconstruct(x_sample)

cmap = plt.cm.jet

plt.figure(figsize=(8, 12))
for i in range(5):
    plt.subplot(5, 2, 2*i + 1)
    plt.imshow(x_sample[i].reshape(10, 12), vmin=0, vmax=1, cmap=cmap)
    plt.title("Test input")
    plt.colorbar()
    plt.subplot(5, 2, 2*i + 2)
    plt.imshow(x_reconstruct[i].reshape(10, 12), vmin=0, vmax=1, cmap=cmap)
    plt.title("Reconstruction")
    plt.colorbar()
plt.tight_layout()

filename = timestamp + "-reconstruction.png".format(hist_bins)
plt.savefig(filename)

# Test

- ### Load dataset

In [None]:
import numpy as np

with np.load("../datasets/kddcup/kdd99_test-randomState_None.npz") as data:  ### kdd99_test
    x_test = data['x_test']

In [None]:
normal_data = x_test[np.where(x_test[:,-1] == 1)]

anomaly_data = x_test[np.where(x_test[:,-1] == 0)]

In [None]:
normal_data.shape

In [None]:
anomaly_data.shape

### Obtain sample energies

- Energies from normal class

In [None]:
batch_size = 1024

In [None]:
losses_normal = []

### from first batch until last batch (not processing last batch)
for i in range(0, len(normal_data)//batch_size -1):
    start_idx = i * batch_size
      
    batch = normal_data[start_idx : start_idx+batch_size]

    print("running step:",i)
        
    losses_normal.append(vae.reconstruct_error(batch))
    
    
### TODO: process last batch

losses_normal = np.asarray(losses_normal)

loss_reconstr_normal =  losses_normal[:,0,:]
loss_latent_normal =  losses_normal[:,1,:]

loss_reconstr_normal = loss_reconstr_normal.ravel()
loss_latent_normal = loss_latent_normal.ravel() 


In [None]:
### print the range in losses
print(loss_latent_normal.min())
print(loss_latent_normal.max())

print(loss_reconstr_normal.min())
print(loss_reconstr_normal.max())

In [None]:
hist_bins = 500

In [None]:
### plot losses

plt.hist(loss_latent_normal, bins=hist_bins, log=True)

plt.title("Latent losses for \"normal\" class")
plt.xlabel("loss")
plt.ylabel("counts")
plt.tight_layout()

filename = timestamp + "-latent-normalHist_bins{}.png".format(hist_bins)
plt.savefig(filename)

In [None]:
plt.hist(loss_reconstr_normal, bins=hist_bins, log=True)

plt.title("Reconstruction losses for \"normal\" class")
plt.xlabel("loss")
plt.ylabel("counts")
plt.tight_layout()

filename = timestamp + "-reconst-normalHist_bins{}.png".format(hist_bins)
plt.savefig(filename)

- Energies from anomaly class

In [None]:
losses_anomaly = []

### from first batch until last batch (not processing last batch)
for i in range(0, len(anomaly_data)//batch_size - 1):
    start_idx = i * batch_size
      
    batch = anomaly_data[start_idx : start_idx+batch_size]

    print("running step:",i)
    
    losses_anomaly.append(vae.reconstruct_error(batch))
    
    
### TODO: process last batch


losses_anomaly = np.asarray(losses_anomaly)

loss_reconstr_anomaly =  losses_anomaly[:,0,:]
loss_latent_anomaly =  losses_anomaly[:,1,:]

loss_reconstr_anomaly = loss_reconstr_anomaly.ravel()
loss_latent_anomaly = loss_latent_anomaly.ravel() 


In [None]:
### print the range in losses
print(loss_latent_anomaly.min())
print(loss_latent_anomaly.max())

print(loss_reconstr_anomaly.min())
print(loss_reconstr_anomaly.max())

In [None]:
### plot losses

plt.hist(loss_latent_anomaly, bins=hist_bins, color="red", log=True)

plt.title("Latent losses for \"anomaly\" class")
plt.xlabel("loss")
plt.ylabel("counts")
plt.tight_layout()

filename = timestamp + "-latent-anomalyHist_bins{}.png".format(hist_bins)
plt.savefig(filename)

In [None]:
plt.hist(loss_reconstr_anomaly, bins=hist_bins, color="red", log=True)

plt.title("Reconstruction losses for \"anomaly\" class")
plt.xlabel("loss")
plt.ylabel("counts")
plt.tight_layout()

filename = timestamp + "-reconst-anomalyHist_bins{}.png".format(hist_bins)
plt.savefig(filename)

### Show both normal and anomaly in same graph

In [None]:
hist_bins = 120

plt.hist((loss_reconstr_normal, loss_reconstr_anomaly), 
         bins=hist_bins, 
         color=["blue", "red"], 
         histtype="bar", 
         label=["normal", "anomaly"], 
         rwidth=1.0,
         stacked=False,
         log=True)

plt.title("Reconstruction losses")
plt.xlabel("loss")
plt.ylabel("counts")
plt.tight_layout()

filename = timestamp + "-reconstr-allHist_bins{}.png".format(hist_bins)
plt.savefig(filename)


In [None]:
hist_bins = 120

plt.hist((loss_latent_normal, loss_latent_anomaly), 
         bins=hist_bins, 
         color=["blue", "red"], 
         histtype="bar", 
         label=["normal", "anomaly"], 
         rwidth=1.0,
         stacked=False,
         log=True)

plt.title("Latent losses")
plt.xlabel("loss")
plt.ylabel("counts")
plt.tight_layout()

filename = timestamp + "-latent-allHist_bins{}.png".format(hist_bins)
plt.savefig(filename)


### Find the threshold for classifying as anomaly: "top %20 of highest energy"

In [None]:
energies_all = np.concatenate((energies_normal, energies_anomaly), axis=0)

In [None]:
energies_all.shape

- Sort calculated energies descending

In [None]:
sorted_energies = np.sort(energies_all)[::-1]

In [None]:
sorted_energies

- Top %20 percent is the first %20 part of it

In [None]:
threshold_index = int(np.floor(len(sorted_energies) * 0.2))

In [None]:
sorted_energies[:threshold_index]

In [None]:
threshold = sorted_energies[threshold_index]

- Below this threshold, samples are classified as normal

In [None]:
threshold

### Calculate metrics

- "Anomaly class is positive" (from paper)

|                   | anomaly | normal |   |   |
|-------------------|---------|--------|---|---|
| predicted anomaly | TP      | FP     |   |   |
| predicted normal  | FN      | TN     |   |   |
|                   |         |        |   |   |

In [None]:
tp = len(energies_anomaly[energies_anomaly > threshold])
fp = len(energies_normal[energies_normal > threshold])

tn = len(energies_normal[energies_normal < threshold])
fn = len(energies_anomaly[energies_anomaly < threshold])

print('tp', tp)
print('fp', fp)
print('tn', tn)
print('fn', fn)

In [None]:
precision = tp / (tp + fp)

In [None]:
precision

In [None]:
recall = tp / (tp + fn)

In [None]:
recall

In [None]:
f1 = 2*tp / (2*tp + fp + fn)

In [None]:
f1

## Illustrating latent space (2D)

In [None]:
stop!!!

Next, we train a VAE with 2d latent space and illustrates how the encoder (the recognition network) encodes some of the labeled inputs (collapsing the Gaussian distribution in latent space to its mean). This gives us some insights into the structure of the learned manifold (latent space)

In [None]:

network_architecture = \
    dict(n_hidden_recog_1=60, # 1st layer encoder neurons
         n_hidden_recog_2=40, # 2nd layer encoder neurons
         n_hidden_recog_3=20, # 2nd layer encoder neurons
         n_hidden_recog_4=10, # 2nd layer encoder neurons
         n_hidden_gener_1=10, # 1st layer decoder neurons
         n_hidden_gener_2=20, # 2nd layer decoder neurons
         n_hidden_gener_3=40, # 1st layer decoder neurons
         n_hidden_gener_4=60, # 2nd layer decoder neurons
         n_input=120,  # kdd99 data input dimension
         n_z=2,
        )  # dimensionality of latent space

vae, history_loss = train(network_architecture, 
            training_epochs=20,
            learning_rate = 0.0001,
            display_step=1, 
            batch_size=1024)

In [None]:
x_sample, y_sample = kdd99.test_next_batch(5000) #mnist.test.next_batch(5000)
z_mu,_ = vae_2d.transform(x_sample)  #### TODO:also try z_sig?
plt.figure(figsize=(8, 6)) 
plt.scatter(z_mu[:, 0], z_mu[:, 1], s=20, c=x_sample[:,-1])   ###last column has the label info
plt.colorbar()
plt.grid()

An other way of getting insights into the latent space is to use the generator network to plot reconstrunctions at the positions in the latent space for which they have been generated:

In [None]:
nx = ny = 20
x_values = np.linspace(-3, 3, nx)
y_values = np.linspace(-3, 3, ny)

cmap = plt.cm.jet

canvas = np.empty((10*ny, 12*nx))
for i, yi in enumerate(x_values):
    for j, xi in enumerate(y_values):
        z_mu = np.array([[xi, yi]]*vae_2d.batch_size)
        x_mean = vae_2d.generate(z_mu)
        canvas[(nx-i-1)*10:(nx-i)*10, j*12:(j+1)*12] = x_mean[0].reshape(10, 12)

plt.figure(figsize=(12, 14))        
Xi, Yi = np.meshgrid(x_values, y_values)
plt.imshow(canvas, origin="upper", cmap=cmap)
plt.tight_layout()