## Setup

In [None]:
!pip install tensorflow-probability

# to generate gifs
!pip install imageio
!pip install git+https://github.com/tensorflow/docs

Collecting git+https://github.com/tensorflow/docs
  Cloning https://github.com/tensorflow/docs to /tmp/pip-req-build-vhpzgw7b
  Running command git clone -q https://github.com/tensorflow/docs /tmp/pip-req-build-vhpzgw7b


In [None]:
from IPython import display

import glob
import imageio
import matplotlib.pyplot as plt
import numpy as np
import PIL
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_probability as tfp
import time

## Load the MNIST dataset



In [None]:
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

# tf.one_hot takes integer class indices, so keep the labels integral.
# With no explicit dtype, the one-hot tensors come out as float32.
num_classes = 10
y_train = tf.one_hot(y_train.astype('int32'), num_classes)
y_test = tf.one_hot(y_test.astype('int32'), num_classes)

In [None]:
def preprocess_images(images):
  """Flatten each image to a vector and binarize it.

  Pixel values are divided by 255 and thresholded at 0.5: entries strictly
  above the threshold become 1.0, all others 0.0.

  Args:
    images: numpy array whose first axis indexes the images.

  Returns:
    float32 array of shape (num_images, pixels_per_image) holding 0.0 / 1.0.
  """
  num_images = images.shape[0]
  scaled = images.reshape((num_images, -1)) / 255.
  # A boolean mask cast to float32 is already the 0.0 / 1.0 encoding.
  return (scaled > .5).astype('float32')

# Flatten and binarize both splits in place of the raw uint8 images.
# NOTE: this cell is not idempotent. Running it again on the already
# binarized arrays divides 0/1 values by 255, which all fall below the 0.5
# threshold, so every pixel becomes 0. Reload the data before re-running.
x_train, x_test = (preprocess_images(split) for split in (x_train, x_test))

print(x_train.shape)

(60000, 784)


In [None]:
# Notebook-level constants: split sizes and a batch size.
train_size, test_size = 60000, 10000
batch_size = 32

print(y_train)
# Width of the one-hot label vector, read from the labels themselves.
y_dim = y_train.shape[1]


tf.Tensor(
[[0. 0. 0. ... 0. 0. 0.]
 [1. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 1. 0.]], shape=(60000, 10), dtype=float32)


In [None]:
class Sampling(tf.keras.layers.Layer):
    """Reparameterization layer: turns (z_mean, z_log_var) into a sample z."""

    def call(self, inputs):
        """Return z_mean + exp(0.5 * z_log_var) * noise for random normal noise.

        Args:
            inputs: pair (z_mean, z_log_var) of rank-2 tensors.
        """
        mean, log_var = inputs
        # Draw noise with the same (batch, dim) shape as the mean.
        mean_shape = tf.shape(mean)
        noise = tf.keras.backend.random_normal(
            shape=(mean_shape[0], mean_shape[1]))
        std = tf.exp(0.5 * log_var)
        return mean + std * noise


In [None]:
class CVAE(tf.keras.Model):
  """Conditional variational autoencoder built from dense layers.

  The encoder and the decoder both receive the label vector `y` next to
  their main input, so decoding can be conditioned on a class.

  Args:
    latent_dim: size of the latent vector z.
    y_dim: width of the conditioning (label) vector.
  """

  def __init__(self, latent_dim, y_dim):
    super().__init__()
    self.latent_dim = latent_dim
    self.y_dim = y_dim
    self.input_dim = 784  # length of one flattened image vector

    # Encoder: [x, y] -> (z_mean, z_log_var, z).
    # Shapes are written as real tuples; `(n)` is just the int n.
    x_input = keras.Input(shape=(self.input_dim,))
    y_input = keras.Input(shape=(self.y_dim,))
    encoder_inputs = keras.layers.Concatenate(axis=1)([x_input, y_input])
    x = layers.Dense(128, activation="relu")(encoder_inputs)
    x = layers.Dense(32, activation="relu")(x)
    z_mean = layers.Dense(latent_dim, name="z_mean")(x)
    z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
    z = Sampling()([z_mean, z_log_var])
    self.encoder = keras.Model(
        inputs=[x_input, y_input],
        outputs=[z_mean, z_log_var, z],
        name="encoder")

    # Decoder: [z, y] -> per-pixel probabilities (sigmoid output).
    decode_inputs = keras.Input(shape=(latent_dim,))
    yd_input = keras.Input(shape=(self.y_dim,))
    latent_inputs = keras.layers.Concatenate(axis=1)([decode_inputs, yd_input])
    x = layers.Dense(32, activation="relu")(latent_inputs)
    x = layers.Dense(128, activation="relu")(x)
    decoder_outputs = layers.Dense(self.input_dim, activation="sigmoid")(x)
    self.decoder = keras.Model(
        [decode_inputs, yd_input], decoder_outputs, name="decoder")

    # Running means reported by train_step.
    self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
    self.reconstruction_loss_tracker = keras.metrics.Mean(
        name="reconstruction_loss")
    self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

  @property
  def metrics(self):
    """Loss trackers, listed so Keras resets them at the start of each epoch."""
    return [
        self.total_loss_tracker,
        self.reconstruction_loss_tracker,
        self.kl_loss_tracker,
    ]

  def train_step(self, data):
    """Run one optimization step on a batch.

    Args:
      data: pair `((x, y), target)`. The target is ignored because the
        input image `x` itself is the reconstruction target.

    Returns:
      Dict with the running means of "loss", "reconstruction_loss" and
      "kl_loss".
    """
    inputs, _ = data
    x, y = inputs[0], inputs[1]
    with tf.GradientTape() as tape:
      z_mean, z_log_var, z = self.encoder([x, y])
      reconstruction = self.decoder([z, y])
      # binary_crossentropy averages over the last (pixel) axis. Scaling by
      # input_dim recovers the per-sample sum over pixels, which is then
      # averaged over the batch. This keeps the term independent of batch
      # size and on the same footing as the KL term below.
      reconstruction_loss = tf.reduce_mean(
          self.input_dim
          * keras.losses.binary_crossentropy(x, reconstruction))
      # KL term: summed over latent dimensions, averaged over the batch.
      kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
      kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
      total_loss = reconstruction_loss + kl_loss
    grads = tape.gradient(total_loss, self.trainable_weights)
    self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
    self.total_loss_tracker.update_state(total_loss)
    self.reconstruction_loss_tracker.update_state(reconstruction_loss)
    self.kl_loss_tracker.update_state(kl_loss)
    return {
        "loss": self.total_loss_tracker.result(),
        "reconstruction_loss": self.reconstruction_loss_tracker.result(),
        "kl_loss": self.kl_loss_tracker.result(),
    }

In [None]:
latent_dim = 20

# Build the conditional VAE on the existing y_dim and train it. Images and
# one-hot labels are fed together; the images double as the target.
vae = CVAE(latent_dim, y_dim)
vae.compile(optimizer=tf.keras.optimizers.Adam())
print(x_train.shape, y_train.shape)
vae.fit(x=[x_train, y_train], y=x_train, batch_size=128, epochs=50)


2 10
(60000, 784) (60000, 10)
Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
 69/469 [===>..........................] - ETA: 4s - loss: 24.1499 - reconstruction_loss: 21.7723 - kl_loss: 2.2720

KeyboardInterrupt: ignored

In [None]:
digit_size = 28
n_samples = 10

# One set of latent draws shared by every class, so each figure shows the
# same z vectors decoded under a different label. The width is read from the
# model so it always matches the decoder input.
z_sample = np.random.normal(0, 1, (n_samples, vae.latent_dim)).astype('float32')
for j in range(y_dim):
    # One-hot label j repeated for every sample: shape (n_samples, y_dim).
    # tf.one_hot replaces keras.utils.np_utils.to_categorical, which is not
    # available in recent Keras releases.
    labels = tf.one_hot([j] * n_samples, y_dim)
    x_decoded = vae.decoder([z_sample, labels])
    digit = tf.reshape(x_decoded, (n_samples, digit_size, digit_size))
    plt.figure(figsize=(20, 2))
    for i in range(n_samples):
        plt.subplot(1, n_samples, i + 1)
        plt.axis('off')
        plt.imshow(digit[i], cmap='Greys_r')

plt.show()


### Display a generated image from the last training epoch