**Curso de Inteligencia Artificial y Aprendizaje Profundo**


# Autoencoder Variacional  Dense MNIST

## Autores


1. Alvaro Mauricio Montenegro Díaz, ammontenegrod@unal.edu.co
2. Daniel Mauricio Montenegro Reyes, dextronomo@gmail.com 

## Contenido

* [Introducción](#Introducción)
* [Estructura Matemática del modelo](#Estructura-Matemática-del-modelo)
* [Diseño del autoencoder](#Diseño-del-autoencoder)
* [Modelo API funcional de Keras](#Modelo-API-funcional-de-Keras)
* [Modelo Orientado a Objetos (sub-classing)](#Modelo-Orientado-a-Objetos ) 
* [Conceptos Teóricos](VAE_Introduction.ipynb#Contenido)

# Introducción

En este cuaderno se implementa un Autoencoder Clásico para el conjunto de datos MINIST. La implementación se base únicamente en capas densas. vea el ejemplo con capas convolucionales para una versión más avanzada.


Se muestran dos implementaciones:

1. Modelo API funcional de tf.keras
2. Modelo Orientado a objetos (subclassing)

Adicionalmente, al final se muestra la implementación de un algortimo de entrenamiento personalizado.

## Estructura Matemática del modelo


Nuestro autocodificador (autoencoder) variacional (VAE) tendrá variables latentes gaussianas y una distribución posterior gaussiana   $q_{\phi}(\boldsymbol{z}|\boldsymbol{x})$  con una matriz de covarianza diagonal.

Recordemos que  un VAE de cuatro elementos escenciales:

1. Una  variable  latente $\boldsymbol{z}$ con distribución  $p(\boldsymbol{z})$  que en nuestro casoserá una variable  aleatoria Gaussiana con media cero y varianza 1 y que denotamos   $\epsilon$.
2. Un decodificador(decoder)  $p(\boldsymbol{x}|\boldsymbol{z})$  que mapea las  variables latentes  $\boldsymbol{z}$  a variables observables $\boldsymbol{x}$. En este ejemplo este codificador implementa un perceptron multicapa (MLP), es decir una red neuronal con una capa oculta.
3. Un codificador (encoder)  $q_{\phi}(\boldsymbol{z}|\boldsymbol{x})$  que mapea ejemplos de entrada al espacio latente. Como se stá cosntruyendo un autoencoder variacional se tiene que este mapeo se hace generando muestras aleatorias de distribciones Gaussianas con medias y varianzas que dependen de  la entrada:   $q_{\phi}(\boldsymbol{z}|\boldsymbol{x})=N(\boldsymbol{z},\boldsymbol{\mu}(x),\text{diag}(\boldsymbol{\sigma}^2(\boldsymbol{x})))$. 
4. Una función de costo que tiene dos términos: el  error de construcción que corresponde al modelo generativo implementado en el decoder y un término adicional de regularización que minimiza la divergencia KL. El error de reconstrucción es medido por el error cuadrático medio y la divergencia por el término
$-D_{KL}(q_{\phi}(\boldsymbol{z}|\boldsymbol{x})|p(\boldsymbol{z}))=\tfrac{1}{2}\sum_{j=1}^{J}(1+\log \boldsymbol{\sigma}^2_j(\boldsymbol{x})-\boldsymbol{\mu}^2_j(\boldsymbol{x})-\boldsymbol{\sigma}^2_j(\boldsymbol{x}))$.

### Diseño del autoencoder



#### Clase Sampling

Para implementar el muestreo se implementa la clase Sampling, la cual se  derivada de tf.keras.layers.Layer

#### Entrada

Los datos de MNIST entran como una areglo numérico de tamaño $28\times 28 = 748$, de valores en el intervalo $[0,1]$. Nótese que se usa el tipo *float32*. Se hace esto porque la implementacion de tf.keras de Tensorflow 2.XX es basada en este tipo de datos.

#### Capas intermedias
Se asume una capa intermedia de dimension 64.

#### Espacio Latente
La dimensión del espacio latente será 32.

#### Entrenamiento
se implementa 2 epochs y lotes de tamaño 64 para ser pasados al optimizador

#### Encoder 

El encoder tiene la siguiente estructura de grafo.

<figure>
<center>
<img src="../Imagenes/vae_encoder_dense_minist.png" width="500" height="500" align="center"/>
</center>
<figcaption>
<p style="text-align:center">Autoencoder Variacional: Encoder</p>
</figcaption>
</figure>

#### Decoder

<figure>
<center>
<img src="../Imagenes/vae_decoder_dense_minist.png" width="400" height="400" align="center"/>
</center>
<figcaption>
<p style="text-align:center">Autoencoder Variacional: Decoder</p>
</figcaption>
</figure>

## Modelo API funcional de Keras

In [None]:
# Imported modules
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Layer
from tensorflow.keras.models import Model
from tensorflow.keras.utils import plot_model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import MeanSquaredError


# Net parameters
original_dim = 784
intermediate_dim = 64
latent_dim = 32


# Train parameters
epochs = 3
batch_size=64


#  Sampling layer
class Sampling(Layer):
    """Uses (z_mean, z_log_var) to sample z, the vector encoding a digit."""
    
    def __init__(self,name=None):
        super(Sampling,self).__init__(name=name)
    
    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.random.normal(shape=(batch,dim))
        return z_mean + tf.exp(0.5*z_log_var)*epsilon
   

original_inputs = Input(shape=(original_dim,),name='encoder_input')
x = Dense(intermediate_dim, activation ='relu',name='intermediate_layer')(original_inputs)
z_mean = Dense(latent_dim, name='z_mean')(x)
z_log_var = Dense(latent_dim, name='z_log_var')(x)
z = Sampling(name='z_sample')([z_mean, z_log_var])
encoder = Model(inputs=original_inputs, outputs=z, name='encoder')

encoder.summary()
plot_model(encoder, to_file='./Images/vae_encoder_dense_minist.png', 
           show_shapes=True)


# Define decoder model
latent_inputs = Input(shape=(latent_dim,), name='z_sample')
x = Dense(intermediate_dim,activation='relu')(latent_inputs)
outputs = Dense(original_dim, activation='sigmoid')(x)
decoder = Model(inputs=latent_inputs, outputs=outputs, name='decoder')

decoder.summary()
plot_model(decoder, to_file='./Images/vae_decoder_dense_minist.png', 
           show_shapes=True)


# Define VAE model
outputs = decoder(z)
vae = Model(inputs=original_inputs,outputs=outputs, name='vae_model')


# Add KL divergencia regularization loss
kl_loss = -0.5*tf.reduce_mean(z_log_var - tf.square(z_mean) - tf.exp(z_log_var) +1 )
vae.add_loss(kl_loss)


# Compile
optimizer = Adam(learning_rate=1e-3)
loss_fn = MeanSquaredError()
vae.compile(optimizer= optimizer, loss = loss_fn)


# Data
(x_train, _), _ = tf.keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype("float32") / 255


# Train
vae.fit(x_train, x_train, epochs = epochs, batch_size=batch_size)


# summary
vae.summary()
plot_model(vae, to_file='./Images/vae_minist.png', show_shapes=True)


## Modelo Orientado a Objetos 

En este caso Sampling Encoder y  Decoder son clases derivadas de tf.keras.layers.Layer. El modelo es una clase derivada  de f.keras.models.Model

In [None]:
# Imports
import tensorflow as tf
from tensorflow.keras.layers import Dense, Layer
from tensorflow.keras.models import Model
from tensorflow.keras.utils import plot_model

# Sampling
class Sampling(Layer):
    """Uses (z_mean, z_log_var) to sample z, the vector encoding a digit."""
    
    def __init__(self,name=None):
        super(Sampling,self).__init__(name=name)
    
    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.random.normal(shape=(batch,dim))
        return z_mean + tf.exp(0.5*z_log_var)*epsilon

# Encoder
class Encoder(Layer):
    """Maps MNIST digits to a triplet (z_mean, z_log_var, z)."""
    
    def __init__(self, latent_dim=32, intermediate_dim=64, name='encoder', **kwargs):
        super(Encoder, self).__init__(name=name, **kwargs) 
        self.dense_proj = Dense(intermediate_dim, activation='relu', name='intermediate_layer')
        self.dense_mean = Dense(latent_dim, name='z_mean')
        self.dense_log_var = Dense(latent_dim, name='z_var_log')
        self.sampling = Sampling('z_sample')
        
    def call(self, inputs):
        x = self.dense_proj(inputs)
        z_mean = self.dense_mean(x)
        z_log_var = self.dense_log_var(x)
        z = self.sampling([z_mean,z_log_var])
        return z_mean, z_log_var, z
    

# Decoder
class Decoder(Layer):
    """Converts z, the encoded digit vector, back into a readable digit."""
    
    def __init__(self, original_dim, intermediate_dim=64, name='decoder', **kwargs):
        super(Decoder, self).__init__(name=name, **kwargs)
        self.dense_proj = Dense(intermediate_dim, activation='relu', 
                                name='intermediate_layer')
        self.dense_output = Dense(original_dim, activation='sigmoid',
                                  name='reconstruction_layer')

    def call(self, inputs):
        x= self.dense_proj(inputs)
        return self.dense_output(x)
    

# Model
class VariationalAutoEncoder(Model):
    def __init__(
        self,
        original_dim, intermediate_dim=64, latent_dim=32,
        name='autoencoder', **kwargs):
        
        super(VariationalAutoEncoder, self).__init__(name=name, **kwargs)
        self.original_dim = original_dim
        self.encoder = Encoder(latent_dim = latent_dim, intermediate_dim=intermediate_dim)
        self.decoder = Decoder(original_dim, intermediate_dim=intermediate_dim)
            
    def call(self, inputs):
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstructed = self.decoder(z)
        kl_batch = -0.5 * tf.reduce_sum(1 + z_log_var -
                                 tf.square(z_mean) -
                                 tf.exp(z_log_var), axis=-1)
        self.add_loss(tf.reduce_mean(kl_batch))
        return reconstructed

    
# Compilation

#Instance the model
vae = VariationalAutoEncoder(784,64,32)

# Optimizer function
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

# loss function
loss = tf.keras.losses.MeanSquaredError()

# compile
vae.compile(optimizer=optimizer, loss = loss)  


# Training 

# Net parameters
original_dim = 784
intermediate_dim = 64
latent_dim = 32


# Train parameters
epochs = 3
batch_size=64

# Data
(x_train, _), _ = tf.keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype("float32") / 255

# 

# Run training
vae.fit(x_train, x_train, epochs=epochs, batch_size=batch_size)


# Extract latent vectors
z=vae.encoder(x_train[:10,]) # (z_mean, z_log_var, z)
z_sample = z[2]

#plots of latent vectors

# Simple training loop (additional)

In [None]:
original_dim = 784
vae = VariationalAutoEncoder(original_dim, 64, 32)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
mse_loss_fn = tf.keras.losses.MeanSquaredError()

loss_metric = tf.keras.metrics.Mean()

(x_train, _), _ = tf.keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype("float32") / 255

train_dataset = tf.data.Dataset.from_tensor_slices(x_train)
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)

epochs = 2

# Iterate over epochs
for epoch in range(epochs):
    print('Start of epoch %d' % (epoch,))
    
    # Iterate over the batches of the dataset
    for step, x_batch_train in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            reconstructed = vae(x_batch_train)
            # Compute reconstruction loss
            loss = mse_loss_fn(x_batch_train, reconstructed)
            loss +=sum(vae.losses) # Add KLD regularization loss
        
        grads = tape.gradient(loss, vae.trainable_weights)
        optimizer.apply_gradients(zip(grads, vae.trainable_weights))
    
        loss_metric(loss)
    
        if step % 100 == 0:
            print('step %d: mean loss = %4f' % (step, loss_metric.result()))
    