##### Adapted from  'Copyright 2018 The TensorFlow Authors.'


In [None]:
# to generate gifs
!pip install imageio

In [None]:
!wget  https://storage.cloud.google.com/cartoonset_public_files/cartoonset10k.tgz

In [None]:
import os

import tensorflow as tf
import tensorflow_datasets as tfds

import os
import time
import numpy as np
import glob
import matplotlib.pyplot as plt
import PIL
import imageio
from PIL import Image

from IPython import display
from PIL import Image
import cv2

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Directory on the mounted Drive that is listed for image files.
path = '/content/drive/My Drive/datasets/cartoons_jpg'
# os.listdir returns entries in arbitrary order; sorting makes the subset of
# images that is loaded later (and therefore the train/test split) reproducible.
files = [os.path.join(path, f) for f in sorted(os.listdir(path))]

In [None]:
img = plt.imread(files[0])
print(img.shape)
plt.imshow(img)

In [None]:
# Upper bound on the number of images loaded into memory.
MAX_IMAGES = 2000
# Share of the loaded images used for training; the remainder is held out.
TRAIN_FRACTION = 0.8

images = []
for p in files[:MAX_IMAGES]:
  # Drop an 80-pixel border on every side, then resize the crop to 64x64.
  img = plt.imread(p)[80:-80,80:-80,:]
  images.append(cv2.resize(img, (64, 64)))

# Disjoint split: the first TRAIN_FRACTION of the images go to `train`, the
# rest to `test`. (Slicing [20%:] and [:80%] made the two sets share 60% of
# the data, so test metrics were computed mostly on training images.)
split = int(len(images) * TRAIN_FRACTION)
train = np.array(images[:split])
test = np.array(images[split:])


print(len(images))
print(len(train))
print(len(test))
plt.imshow(train[32])

In [None]:
print(len(train))

In [None]:
print(len(test))

In [None]:
# Cast to float32 and divide by 255 in one expression per split.
# (Yields values in [0, 1] assuming the inputs are 8-bit images - TODO confirm.)
train_images = train.astype('float32') / 255.
test_images = test.astype('float32') / 255.

# Binarization (thresholding at 0.5) is intentionally not applied here.

plt.imshow(train_images[8])

In [None]:
train_images[8].min()

In [None]:
TRAIN_BUF = 6000
BATCH_SIZE = 300

TEST_BUF = 1000

## Use *tf.data* to create batches and shuffle the dataset

In [None]:
# Build the input pipelines: slice each image array along its first axis,
# shuffle with the configured buffer size, and group into batches of BATCH_SIZE.
# No drop_remainder is set, so the last batch of each dataset may be smaller.
train_dataset = tf.data.Dataset.from_tensor_slices(train_images).shuffle(TRAIN_BUF).batch(BATCH_SIZE)
test_dataset = tf.data.Dataset.from_tensor_slices(test_images).shuffle(TEST_BUF).batch(BATCH_SIZE)

## Wire up the generative and inference network with *tf.keras.Sequential*

In our VAE example, we use two small ConvNets for the generative and inference network. Since these neural nets are small, we use `tf.keras.Sequential` to simplify our code. Let $x$ and $z$ denote the observation and latent variable respectively in the following descriptions.

### Generative Network
This defines the generative model which takes a latent encoding as input, and outputs the parameters for a conditional distribution of the observation, i.e. $p(x|z)$. Additionally, we use a unit Gaussian prior $p(z)$ for the latent variable.

### Inference Network
This defines an approximate posterior distribution $q(z|x)$, which takes as input an observation and outputs a set of parameters for the conditional distribution of the latent representation. In this example, we simply model this distribution as a diagonal Gaussian. In this case, the inference network outputs the mean and log-variance parameters of a factorized Gaussian (log-variance instead of the variance directly is for numerical stability).

### Reparameterization Trick
During optimization, we can sample from $q(z|x)$ by first sampling from a unit Gaussian, and then multiplying by the standard deviation and adding the mean. This ensures the gradients could pass through the sample to the inference network parameters.

### Network architecture
For the inference network, we use two convolutional layers followed by a fully-connected layer. In the generative network, we roughly mirror this architecture by using a fully-connected layer followed by three convolution transpose layers (a.k.a. deconvolutional layers in some contexts) and a final convolutional layer that maps the result to the three output channels. Note, it's common practice to avoid using batch normalization when training VAEs, since the additional stochasticity due to using mini-batches may aggravate instability on top of the stochasticity from sampling.

In [None]:
class CVAE(tf.keras.Model):
  """Convolutional variational autoencoder for 64x64 three-channel images.

  Holds two sub-networks:
    * `inference_net` (encoder): maps an image batch to a vector of size
      2 * latent_dim, interpreted as the mean and log-variance of q(z|x).
    * `generative_net` (decoder): maps a latent vector to 64x64x3 logits.
  """

  def __init__(self, latent_dim):
    """Builds the encoder and decoder.

    Args:
      latent_dim: size of the latent vector z.
    """
    super().__init__()
    self.latent_dim = latent_dim
    self.inference_net = tf.keras.Sequential(
      [
          tf.keras.layers.InputLayer(input_shape=(64, 64, 3)),
          tf.keras.layers.Conv2D(
              filters=32, kernel_size=3, strides=(2, 2), activation='relu'),
          tf.keras.layers.Conv2D(
              filters=64, kernel_size=3, strides=(2, 2), activation='relu'),
          tf.keras.layers.Flatten(),
          # No activation: the output is split into mean and log-variance,
          # both of which must be unconstrained.
          tf.keras.layers.Dense(latent_dim + latent_dim),
      ]
    )

    self.generative_net = tf.keras.Sequential(
        [
          tf.keras.layers.InputLayer(input_shape=(latent_dim,)),
          tf.keras.layers.Dense(units=16*16*32, activation=tf.nn.relu),
          tf.keras.layers.Reshape(target_shape=(16,16, 32)),
          # Two stride-2 transposed convolutions upsample 16x16 -> 32x32 -> 64x64.
          tf.keras.layers.Conv2DTranspose(
              filters=64,
              kernel_size=3,
              strides=(2, 2),
              padding="SAME",
              activation='relu'),
          tf.keras.layers.Conv2DTranspose(
              filters=32,
              kernel_size=3,
              strides=(2, 2),
              padding="SAME",
              activation='relu'),
          # No activation
          tf.keras.layers.Conv2DTranspose(
              filters=10, kernel_size=3, strides=(1, 1), padding="SAME"),
          # Final projection to 3 channels; outputs are logits (no activation).
          tf.keras.layers.Conv2D(
              filters=3, kernel_size=2, strides=(1, 1), padding="SAME")
        ]
    )

  @tf.function
  def sample(self, eps=None):
    """Decodes latent vectors into images with values in (0, 1).

    Args:
      eps: latent vectors with `latent_dim` columns. When None, 100 vectors
        are drawn from a standard normal distribution.

    Returns:
      Sigmoid-activated decoder output for `eps`.
    """
    if eps is None:
      eps = tf.random.normal(shape=(100, self.latent_dim))
    return self.decode(eps, apply_sigmoid=True)

  def encode(self, x):
    """Runs the encoder and splits its output in half along axis 1.

    Returns:
      (mean, logvar) of the approximate posterior q(z|x).
    """
    mean, logvar = tf.split(self.inference_net(x), num_or_size_splits=2, axis=1)
    return mean, logvar

  def reparameterize(self, mean, logvar, rep=True):
    """Computes z = eps * exp(logvar / 2) + mean.

    Args:
      mean: posterior mean.
      logvar: posterior log-variance.
      rep: when True, eps is drawn from a standard normal. When False, eps is
        fixed to 1, so the result is mean + std.
        NOTE(review): a deterministic mode would normally return `mean`
        alone (eps = 0) - confirm whether eps = 1 is intended.
    """
    eps = 1
    if rep:
      # tf.shape is resolved at run time, so this also works when the static
      # batch dimension is unknown while the function is being traced.
      eps = tf.random.normal(shape=tf.shape(mean))
    return eps * tf.exp(logvar * .5) + mean

  def decode(self, z, apply_sigmoid=False):
    """Runs the decoder on `z`.

    Returns:
      Raw logits, or their sigmoid when `apply_sigmoid` is True.
    """
    logits = self.generative_net(z)
    if apply_sigmoid:
      probs = tf.sigmoid(logits)
      return probs

    return logits

In [None]:
# test output image size
model = CVAE(100)
model.generative_net.build()
model.generative_net.summary()

## Define the loss function and the optimizer

VAEs train by maximizing the evidence lower bound (ELBO) on the marginal log-likelihood:

$$\log p(x) \ge \text{ELBO} = \mathbb{E}_{q(z|x)}\left[\log \frac{p(x, z)}{q(z|x)}\right].$$

In practice, we optimize the single sample Monte Carlo estimate of this expectation:

$$\log p(x| z) + \log p(z) - \log q(z|x),$$
where $z$ is sampled from $q(z|x)$.

**Note**: we could also analytically compute the KL term, but here we incorporate all three terms in the Monte Carlo estimator for simplicity.

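Equivalently, the ELBO can be written as a reconstruction term minus a KL term:

$$\mathbb{E}_{q(z|x)}[\log p(x|z)] - \mathrm{KL}(q(z|x)\,\|\,p(z))$$

$$\text{reconstruction term} - \text{distribution similarity loss}$$

$q(z|x)$ - approximate posterior: a normal distribution given $x$ <br>
$p(z)$ - prior: a unit normal distribution, which does not depend on $x$ <br>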

In [None]:


def log_normal_pdf(sample, mean, logvar, raxis=1):
  """Log-density of `sample` under a diagonal normal, summed over `raxis`.

  Args:
    sample: point(s) at which the density is evaluated.
    mean: mean of the normal distribution.
    logvar: log-variance of the normal distribution.
    raxis: axis over which the per-dimension log-densities are summed.
  """
  log2pi = tf.math.log(2. * np.pi)
  squared_error = (sample - mean) ** 2.
  per_dimension = -.5 * (squared_error * tf.exp(-logvar) + logvar + log2pi)
  return tf.reduce_sum(per_dimension, axis=raxis)
  
# def kl_div(log_one, log_two):
#   return tf.reduce_sum(tf.exp(log_one)*(log_one-log_two))

@tf.function
def compute_loss(model, x):
  """Computes the negative ELBO and diagnostic terms for one batch.

  Args:
    model: object exposing `encode`, `reparameterize` and `decode`.
    x: batch of images used both as input and as reconstruction target.

  Returns:
    A 4-tuple of scalars:
      * negative single-sample Monte Carlo ELBO (the training loss),
      * analytic KL(q(z|x) || p(z)), averaged over the batch,
      * Monte Carlo KL estimate log q(z|x) - log p(z), averaged over the batch,
      * reconstruction loss -log p(x|z), averaged over the batch.
  """
  mean, logvar = model.encode(x)
  # z = eps * exp(logvar / 2) + mean with eps ~ N(0, I)
  z = model.reparameterize(mean, logvar)
  x_logit = model.decode(z)

  cross_ent = tf.nn.sigmoid_cross_entropy_with_logits(logits=x_logit, labels=x)

  # Reconstruction log-likelihood: sum over all non-batch axes per item.
  logpx_z = -tf.reduce_sum(cross_ent, axis=[1, 2, 3])
  logpz = log_normal_pdf(z, 0., 0.)          # log-density under the unit normal prior
  logqz_x = log_normal_pdf(z, mean, logvar)  # log-density under the posterior

  # Closed-form KL between N(mean, exp(logvar)) and N(0, I), per latent dim:
  #   0.5 * (exp(logvar) + mean^2 - 1 - logvar)
  # `logvar` is a log-variance (reparameterize uses exp(logvar / 2) as the
  # standard deviation), so the log-std form with exp(2 * logvar) is wrong here.
  kl_per_dim = 0.5 * (tf.exp(logvar) + mean ** 2 - 1. - logvar)
  kl_loss_direct = tf.reduce_mean(tf.reduce_sum(kl_per_dim, axis=1))

  # Single-sample Monte Carlo estimate of the same KL.
  kl_loss_p = tf.reduce_mean(logqz_x - logpz)

  return (-tf.reduce_mean(logpx_z + logpz - logqz_x),
          kl_loss_direct,
          kl_loss_p,
          -tf.reduce_mean(logpx_z))

@tf.function
def compute_apply_gradients(model, x, optimizer):
  """Performs one optimization step on batch `x`.

  Only the first value returned by `compute_loss` (the negative ELBO) is
  differentiated; the remaining values are ignored here.
  """
  with tf.GradientTape() as tape:
    vlb = compute_loss(model, x)[0]
  variables = model.trainable_variables
  grads = tape.gradient(vlb, variables)
  optimizer.apply_gradients(zip(grads, variables))

optimizer = tf.keras.optimizers.Adam(1e-4)

## Training

* We start by iterating over the dataset
* During each iteration, we pass the image to the encoder to obtain a set of mean and log-variance parameters of the approximate posterior $q(z|x)$
* We then apply the *reparameterization trick* to sample from $q(z|x)$
* Finally, we pass the reparameterized samples to the decoder to obtain the logits of the generative distribution $p(x|z)$
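* **Note:** This remark is inherited from the original MNIST tutorial: since that tutorial uses the dataset loaded by keras with 60k datapoints in the training set and 10k datapoints in the test set, its resulting ELBO on the test set is slightly higher than reported results in the literature which uses dynamic binarization of Larochelle's MNIST. This notebook trains on cartoon images instead, so that comparison does not carry over.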

## Generate Images

* After training, it is time to generate some images
* We start by sampling a set of latent vectors from the unit Gaussian prior distribution $p(z)$
* The generator will then convert the latent sample $z$ to logits of the observation, giving a distribution $p(x|z)$
* Here we plot the probabilities of Bernoulli distributions


In [None]:
# Optimizer used by the training loop (Adam with learning rate 1e-3).
# This rebinds `optimizer`, replacing the Adam(1e-4) instance created earlier.
optimizer = tf.keras.optimizers.Adam(1e-3)

epochs = 100  # number of passes over train_dataset
latent_dim = 100  # size of the latent vector z
num_examples_to_generate = 16  # images drawn in each preview figure

# keeping the random vector constant for generation (prediction) so
# it will be easier to see the improvement.
random_vector_for_generation = tf.random.normal(
    shape=[num_examples_to_generate, latent_dim])
# Fresh, untrained model; rebinds `model` if an earlier cell already set it.
model = CVAE(latent_dim)

In [None]:
def generate_and_save_images(model, epoch, test_input):
  """Decodes `test_input`, plots the images in a square grid and saves it.

  Args:
    model: object whose `sample(test_input)` returns a batch of images.
    epoch: integer used in the output file name
      ('image_at_epoch_XXXX.png', zero-padded to four digits).
    test_input: latent vectors passed to `model.sample`.
  """
  predictions = model.sample(test_input)
  num_images = int(predictions.shape[0])
  # Smallest square grid that fits every image (4x4 for 16 images), instead
  # of a fixed 4x4 grid that fails for more than 16 samples.
  grid_side = max(1, int(np.ceil(np.sqrt(num_images))))
  fig = plt.figure(figsize=(10,10))

  for i in range(num_images):
    plt.subplot(grid_side, grid_side, i+1)
    plt.imshow(predictions[i])
    plt.axis('off')

  plt.savefig('image_at_epoch_{:04d}.png'.format(epoch))
  plt.show()
  # Release the figure; this function is called once per epoch.
  plt.close(fig)

In [None]:
a = tf.convert_to_tensor(2, tf.float32)

In [None]:
tf.math.exp(a)

In [None]:
#generate_and_save_images(model, 0, random_vector_for_generation)

# One dict of test-set metrics per epoch; every value is the mean of the
# per-batch values over the test batches, stored as a plain Python float.
history = []

for epoch in range(1, epochs + 1):
  start_time = time.time()
  for train_x in train_dataset:
    compute_apply_gradients(model, train_x, optimizer)
  end_time = time.time()

  # Running sums over the test batches for this epoch.
  vlb_sum = 0.
  kl_loss_p = 0.
  kl_loss_direct = 0.
  reconstruction_loss = 0.

  # `i` ends up as the number of test batches; starting it at 0 makes an
  # empty test set detectable instead of raising a NameError further down.
  i = 0
  for i, test_x in enumerate(test_dataset, start=1):
    vlb, kl_d, kl_p, rec = compute_loss(model, test_x)

    # float() turns the scalar tensors into Python numbers so the history is
    # cheap to store and plots without tensor conversion.
    vlb_sum += float(vlb)
    kl_loss_p += float(kl_p)
    kl_loss_direct += float(kl_d)
    reconstruction_loss += float(rec)

  if i == 0:
    raise ValueError('test_dataset produced no batches; test metrics cannot be averaged')

  display.clear_output(wait=False)
  history.append({'vlb_sum':vlb_sum/i,'kl_loss_p':kl_loss_p/i, 'kl_loss_direct':kl_loss_direct/i, 'reconstruction_loss':reconstruction_loss/i})
  print(f"""Epoch: {epoch}, Test set ELBO: {-vlb_sum/i} \nkl_loss_p {kl_loss_p/i} \nkl_loss_direct {kl_loss_direct/i}\nreconstruction_loss {reconstruction_loss/i}""")
  generate_and_save_images(
     model, epoch, random_vector_for_generation)

In [None]:
# Draw one curve per recorded metric against the index of the history entry.
keys = list(history[0].keys())
plt.figure(figsize=(10,5))

x_positions = range(len(history))
for key in keys:
  plt.plot(x_positions, [entry[key] for entry in history], label=key)
plt.legend()
plt.show()

In [None]:
tf.reduce_mean(kl_loss_p)

In [None]:
tf.reduce_mean(kl_loss_direct)

### Display an image using the epoch number

In [None]:
def display_image(epoch_no):
  """Opens the preview image that was saved for epoch `epoch_no`."""
  image_path = 'image_at_epoch_{:04d}.png'.format(epoch_no)
  return PIL.Image.open(image_path)

In [None]:
plt.imshow(display_image(epochs))
plt.axis('off')# Display images

### Generate a GIF of all the saved images.

In [None]:
anim_file = 'cvae.gif'

filenames = sorted(glob.glob('image*.png'))
if not filenames:
  # Without this guard the code below fails on an undefined `filename`
  # after already having created an empty GIF.
  raise FileNotFoundError("no 'image*.png' frames found; run the training cell first")

with imageio.get_writer(anim_file, mode='I') as writer:
  last = -1
  for i,filename in enumerate(filenames):
    # Keep a frame only when round(2 * sqrt(i)) advances, so frames are
    # sampled more sparsely as the index grows.
    frame = 2*(i**0.5)
    if round(frame) > round(last):
      last = frame
    else:
      continue
    image = imageio.imread(filename)
    writer.append_data(image)
  # Always append the last saved image as the closing frame.
  image = imageio.imread(filename)
  writer.append_data(image)

import IPython
if IPython.version_info >= (6,2,0,''):
  # An expression inside an `if` body is not the cell's result, so the image
  # has to be rendered explicitly.
  display.display(display.Image(filename=anim_file))

If you're working in Colab you can download the animation with the code below:

In [None]:
# Offer the GIF as a browser download when running in Colab; do nothing
# elsewhere. The module is imported under an alias so it does not overwrite
# the notebook's `files` list of image paths.
try:
  from google.colab import files as colab_files
except ImportError:
  pass
else:
  colab_files.download(anim_file)

In [None]:
random_vector_for_generation = tf.random.normal(
    shape=[num_examples_to_generate, latent_dim])
generate_and_save_images(
        model, epoch, random_vector_for_generation)