In [None]:
%tensorflow_version 2.x
from __future__ import absolute_import, division, print_function, unicode_literals

import tensorflow as tf

import os
import time
import numpy as np
import glob
import matplotlib.pyplot as plt
import PIL
import imageio

from IPython import display
from imgaug import augmenters 
from numpy import loadtxt
import numpy as np
from pylab import rcParams
import numpy 

import tensorflow as tf
from tensorflow.keras.layers import InputLayer, Conv2D, Flatten, Dense, Conv2DTranspose, Reshape

import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import matplotlib
import numpy as np
import os
import tensorflow as tf

# PLOT

In [None]:
def plot_reconstructed_test_data(model, test_dataset, n=5, figsize=(5, 2)):
    """Plot test images (top row) above their reconstructions (bottom row).

    The first batch yielded by ``test_dataset`` is used. Each image is encoded
    and the first value returned by ``model.encode`` (the mean, no sampling) is
    decoded back with the sigmoid applied.

    Args:
        model: object exposing ``encode(x, y)`` and
            ``decode(z, y, apply_sigmoid=...)``.
        test_dataset: iterable of ``(images, labels)`` batches whose images can
            be reshaped to 28x28 per sample.
        n: maximum number of images to show (default 5).
        figsize: matplotlib figure size (default ``(5, 2)``).
    """
    x_batch, y_batch = next(iter(test_dataset))

    # Never request more images than the batch holds; the reshapes below
    # would otherwise fail for small batches.
    n = min(n, int(x_batch.shape[0]))
    if n < 1:
        return
    x_sample, y_sample = x_batch[:n], y_batch[:n]

    z_mean = model.encode(x_sample, y_sample)[0].numpy()
    originals = x_sample.numpy().reshape([n, 28, 28])
    reconstructions = (
        model.decode(z_mean, y_sample, apply_sigmoid=True).numpy().reshape([n, 28, 28])
    )

    # squeeze=False keeps `axarr` two-dimensional even when n == 1.
    fig, axarr = plt.subplots(2, n, figsize=figsize, squeeze=False)
    for i in range(n):
        axarr[0, i].axis('off')
        axarr[1, i].axis('off')
        axarr[0, i].imshow(originals[i], cmap='binary')
        axarr[1, i].imshow(reconstructions[i], cmap='binary')

In [None]:
def generated_images(model, test_dataset):
    """Plot a grid of images decoded from hand-picked latent vectors.

    One row per class label, 16 columns per row. Along a row the first latent
    coordinate sweeps linearly from -2 to 2 while every other latent
    coordinate is held at 0.

    Args:
        model: object exposing ``latent_dim``, ``num_classes`` and
            ``decode(z, y, apply_sigmoid=...)``.
        test_dataset: unused; kept so the signature matches the other plotting
            helpers.
    """
    n = 16
    num_classes = model.num_classes
    # The latent vector must match the model's latent size; a fixed 2-D vector
    # only works for latent_dim == 2 and breaks the decoder otherwise.
    latent_dim = model.latent_dim

    z = np.zeros((n, latent_dim), dtype=np.float32)
    z[:, 0] = np.linspace(-2, 2, n)
    z = tf.convert_to_tensor(z, dtype=tf.float32)

    f, axarr = plt.subplots(num_classes, n, figsize=(n, num_classes), squeeze=False)
    f.subplots_adjust(hspace=0., wspace=0.)
    for i in range(num_classes):
        # Decode the whole row in one call instead of one call per image.
        row_labels = [i] * n
        row_images = model.decode(z, row_labels, apply_sigmoid=True).numpy().reshape([n, 28, 28])
        for j in range(n):
            axarr[i, j].axis('off')
            axarr[i, j].imshow(row_images[j], cmap='binary')




In [None]:

def plot_VAE(model, test_dataset):
    """Draw both diagnostic figures for a model.

    The reconstruction comparison is drawn first, then the grid of generated
    images; both plotting functions receive the same ``model`` and
    ``test_dataset``.
    """
    for draw in (plot_reconstructed_test_data, generated_images):
        draw(model, test_dataset)



# LOAD DATASETS

In [None]:
def load_dataset(batch_size=1000):
    """Load MNIST, add salt-and-pepper noise and build batched tf.data pipelines.

    Args:
        batch_size: number of samples per batch (default 1000).

    Returns:
        ``(train_dataset, test_dataset)``; each yields ``(images, labels)``
        batches where images are float32 of shape ``(batch, 28, 28, 1)`` scaled
        to [0, 1] and labels are the MNIST digit labels.
    """
    (train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.mnist.load_data()

    # Salt-and-pepper noise applied to the raw images.
    seq_object = augmenters.Sequential([augmenters.SaltAndPepper(0.2)])

    def _noisy_unit_images(images):
        # MNIST pixels are uint8 in [0, 255]. Augment in that range and only
        # then scale to [0, 1]; multiplying the uint8 array by 255 first would
        # wrap around modulo 256 and corrupt the intensities.
        noisy = seq_object.augment_images(images) / 255
        return noisy.reshape(noisy.shape[0], 28, 28, 1).astype('float32')

    def _make_dataset(images, labels):
        # Shuffle individual samples (buffer = whole split) BEFORE batching so
        # every epoch sees differently composed batches, not just the same
        # batches in a different order.
        return (
            tf.data.Dataset.from_tensor_slices((images, labels))
            .shuffle(images.shape[0])
            .batch(batch_size)
        )

    # NOTE(review): only the noisy images are returned, so downstream code has
    # no clean targets available; a true denoising setup would need them too.
    train_dataset = _make_dataset(_noisy_unit_images(train_images), train_labels)
    test_dataset = _make_dataset(_noisy_unit_images(test_images), test_labels)

    return train_dataset, test_dataset

# MODEL Conditional Variational Autoencoder  

In [None]:
def encoder(latent_dim, num_classes):
    """Build the fully connected inference network.

    Args:
        latent_dim: size of the latent space.
        num_classes: length of the one-hot label appended to the image.

    Returns:
        A ``tf.keras.Sequential`` mapping a vector of ``28 * 28 * 1 +
        num_classes`` values (flattened image + label) to ``latent_dim * 2``
        un-activated outputs. ``CVAE.encode`` splits these into a mean half
        and a log-variance half.
    """
    layers = tf.keras.layers
    conditioned_input_size = 28 * 28 * 1 + num_classes
    stack = [
        # input layer : image + label
        layers.InputLayer(input_shape=[conditioned_input_size]),
        layers.Flatten(),
        layers.Dense(256, activation='relu'),
        layers.Dense(128, activation='relu'),
        # two values per latent dimension
        layers.Dense(latent_dim * 2),
    ]
    return tf.keras.Sequential(stack)

def decoder(latent_dim, num_classes):
    """Build the fully connected generative network.

    Args:
        latent_dim: size of the latent space.
        num_classes: length of the one-hot label appended to the latent vector.

    Returns:
        A ``tf.keras.Sequential`` mapping a vector of ``latent_dim +
        num_classes`` values to un-activated outputs reshaped to
        ``(28, 28, 1)`` per sample.
    """
    layers = tf.keras.layers
    conditioned_latent_size = latent_dim + num_classes
    stack = [
        layers.InputLayer(input_shape=[conditioned_latent_size]),
        layers.Dense(128, activation='relu'),
        layers.Dense(256, activation='relu'),
        # one output per pixel, no activation
        layers.Dense(28 * 28 * 1),
        layers.Reshape(target_shape=[28, 28, 1]),
    ]
    return tf.keras.Sequential(stack)


In [None]:



class CVAE(tf.keras.Model):
    """Conditional variational autoencoder built from ``encoder`` and ``decoder``.

    Both networks receive the class label as a one-hot vector concatenated to
    their main input (the flattened image for the encoder, the latent vector
    for the decoder).
    """

    def __init__(self, latent_dim: int, num_classes: int = 10):
        """Create the inference and generative networks.

        Args:
            latent_dim: size of the latent space.
            num_classes: number of distinct labels (default 10).
        """
        super().__init__()
        self.latent_dim = latent_dim
        self.num_classes = num_classes
        self.inference_net = encoder(self.latent_dim, self.num_classes)
        self.generative_net = decoder(self.latent_dim, self.num_classes)

    def _with_condition(self, features, y):
        """Flatten ``features`` per sample and append the one-hot encoding of ``y``."""
        features = tf.cast(features, tf.float32)
        # Plain reshape instead of instantiating a new Keras Flatten layer on
        # every call; tf.shape keeps this valid for an unknown batch size.
        flat = tf.reshape(features, [tf.shape(features)[0], -1])
        return tf.concat([flat, tf.one_hot(y, self.num_classes)], axis=1)

    def encode(self, x, y):
        """Return ``(mean, logvar)``, each of shape ``(batch, latent_dim)``.

        Args:
            x: batch of images.
            y: batch of integer class labels.
        """
        mean_logvar = self.inference_net(self._with_condition(x, y))
        # The network emits latent_dim * 2 values per sample: first half mean,
        # second half log-variance.
        mean, logvar = tf.split(mean_logvar, num_or_size_splits=2, axis=1)
        return mean, logvar

    def decode(self, z, y, apply_sigmoid=False):
        """Decode latent vectors conditioned on labels.

        Args:
            z: batch of latent vectors.
            y: batch of integer class labels.
            apply_sigmoid: when True return sigmoid probabilities, otherwise
                the raw logits.
        """
        logits = self.generative_net(self._with_condition(z, y))
        if apply_sigmoid:
            return tf.sigmoid(logits)
        return logits

    def reparameterize(self, mean, logvar):
        """Sample ``mean + eps * exp(logvar / 2)`` with ``eps`` standard normal."""
        # tf.shape (dynamic) rather than mean.shape (static) so sampling also
        # works when the batch dimension is not known statically.
        eps = tf.random.normal(shape=tf.shape(mean))
        return eps * tf.exp(logvar * .5) + mean



# TRAIN

In [None]:
# Training the model means maximizing the ELBO.
# Optimizers minimize, so we turn the maximization problem into a minimization
# problem by using -ELBO as the loss.

class CVAETrain:
    """Static helpers computing the CVAE loss and applying one optimizer step."""

    @staticmethod
    def compute_loss(model, x, y):
        """Return the negative ELBO averaged over the batch.

        Args:
            model: object exposing ``encode``, ``reparameterize`` and ``decode``.
            x: batch of images; used both as encoder input and as the
                reconstruction target.
            y: batch of integer class labels.

        Returns:
            Scalar tensor: mean reconstruction cross-entropy plus mean KL
            divergence.
        """
        mean, logvar = model.encode(x, y)
        z = model.reparameterize(mean, logvar)
        x_logits = model.decode(z, y)

        # cross_ent = - marginal likelihood, summed over all pixels of a sample
        cross_ent = tf.nn.sigmoid_cross_entropy_with_logits(logits=x_logits, labels=x)
        marginal_likelihood = - tf.reduce_sum(cross_ent, axis=[1, 2, 3])
        marginal_likelihood = tf.reduce_mean(marginal_likelihood)

        # Closed-form KL(N(mean, exp(logvar)) || N(0, I)) (Wiseodd, 2016):
        # https://wiseodd.github.io/techblog/2016/12/10/variational-autoencoder/
        # The 0.5 factor is part of the formula; without it the KL term is
        # weighted twice as strongly as the ELBO prescribes.
        KL_divergence = 0.5 * tf.reduce_sum(
            tf.square(mean) + tf.exp(logvar) - logvar - 1.0, axis=1
        )
        KL_divergence = tf.reduce_mean(KL_divergence)

        # Adam minimizes, so the ELBO (a maximization objective) is negated
        # and used as the loss.
        ELBO = marginal_likelihood - KL_divergence
        return -ELBO

    @staticmethod
    def compute_gradients(model, x, y, optimizer):
        """Run one training step and return the loss computed before the update.

        Despite its name, this both computes the gradients of the loss with
        respect to ``model.trainable_variables`` and applies them with
        ``optimizer``.
        """
        with tf.GradientTape() as tape:
            loss = CVAETrain.compute_loss(model, x, y)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        return loss

In [None]:

def train_CVAE(latent_dim=2, epochs=100, lr=1e-4, batch_size=1000, log_every=50):
    """Train a CVAE on the noisy dataset, plot the results and return the model.

    Args:
        latent_dim: size of the latent space (default 2).
        epochs: number of passes over the training dataset (default 100).
        lr: Adam learning rate (default 1e-4).
        batch_size: samples per batch, forwarded to ``load_dataset``.
        log_every: print a progress line every this many epochs (default 50);
            0 or None disables logging.

    Returns:
        The trained ``CVAE`` instance.
    """
    model = CVAE(latent_dim)
    # load and normalize data, split it into batches
    train_dataset, test_dataset = load_dataset(batch_size=batch_size)
    # Adam optimizer used to minimize the loss function
    optimizer = tf.keras.optimizers.Adam(lr)

    for epoch in range(1, epochs + 1):
        last_loss = 0.0
        epoch_start = time.time()
        # One epoch = one pass over every batch of the training set.
        for train_x, train_y in train_dataset:
            last_loss = CVAETrain.compute_gradients(model, train_x, train_y, optimizer)
        if log_every and epoch % log_every == 0:
            # The reported time is the time spent in this epoch (elapsed, not
            # remaining); the loss is that of the epoch's last batch.
            print('Epoch {}, Loss: {:.4f}, Elapsed Time at This Epoch: {:.2f}'.format(
                epoch, float(last_loss), time.time() - epoch_start
            ))

    plot_VAE(model, test_dataset)

    return model


# Denoising task Experiments 

In [None]:
# Experiment 1: default settings (latent_dim=2, lr=1e-4, batch_size=1000), 100 epochs.
train_CVAE(epochs=100)

In [None]:
# Experiment 2: same default settings, trained longer (150 epochs).
train_CVAE(epochs=150)

# Higher-dimensional latent space

In [None]:
def plot_CVAE(model, test_dataset, n=10, figsize=(10, 4)):
    """Plot test images (top row) above their reconstructions (bottom row).

    The first batch yielded by ``test_dataset`` is used. Each image is encoded
    and the first value returned by ``model.encode`` (the mean, no sampling) is
    decoded back with the sigmoid applied.

    Args:
        model: object exposing ``encode(x, y)`` and
            ``decode(z, y, apply_sigmoid=...)``.
        test_dataset: iterable of ``(images, labels)`` batches whose images can
            be reshaped to 28x28 per sample.
        n: maximum number of images to show (default 10).
        figsize: matplotlib figure size (default ``(10, 4)``).
    """
    x_batch, y_batch = next(iter(test_dataset))

    # Clamp to the batch size so the reshapes below cannot fail on a batch
    # holding fewer than `n` samples.
    n = min(n, int(x_batch.shape[0]))
    if n < 1:
        return
    x_sample, y_sample = x_batch[:n], y_batch[:n]

    z_mean = model.encode(x_sample, y_sample)[0].numpy()
    originals = x_sample.numpy().reshape([n, 28, 28])
    reconstructions = (
        model.decode(z_mean, y_sample, apply_sigmoid=True).numpy().reshape([n, 28, 28])
    )

    # squeeze=False keeps `axarr` two-dimensional even when n == 1.
    fig, axarr = plt.subplots(2, n, figsize=figsize, squeeze=False)
    for i in range(n):
        axarr[0, i].axis('off')
        axarr[1, i].axis('off')
        axarr[0, i].imshow(originals[i], cmap='binary')
        axarr[1, i].imshow(reconstructions[i], cmap='binary')



# Experiment 3: 10-dimensional latent space, 150 epochs.
# Note: train_CVAE draws its figures through plot_VAE; plot_CVAE defined above
# is not called by it.
train_CVAE(latent_dim=10,epochs=150)