In [None]:
# Para realizar el Varitional Autoencoder es necesario modificar el Autoencoder
# en el Encoder y en la loss function

# En un autoencoder, cada imagen es mapeada directamente a un punto en el latent space.
# En un Varitional Autoencoder, cada imagen es mapeada hacia una distribución
# normal multivariable al rededor del punto del latent space.

# El Encoder solo necesita mapear acada entrada a un vector de media y a un vector de vacrianza
# El Varitional Autoencoder asume que no hay correlación entre las dimensiones en
# el espacio latente.

# El encoder tomará cada imagen de entrada y la codificara en dos vectores que
# juntos definen a distribución normal multivariable en latent space.
# 1. z_mean     ---> El punto medio de la distribución.
# 2. z_log_var  ---> El logaritmo de la varianza de cada dimensión

# En el Autoencoder, que una imagen en el punto (2,2) sea bien decodificada, no
# quiere decir que el punto (2.1,2.1) muestre algo similar.
# Al realizar una distribución normal, nos aseguramos que los puntos vecinos
# produzcan imágenes similares cuando son decodificadas.

In [22]:
import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow.keras import layers, models,datasets,callbacks,losses,optimizers,metrics
import numpy as np
import matplotlib.pyplot as plt

from scipy.stats import norm

In [23]:
# Cargar los datos
(x_train, y_train), (x_test, y_test) = datasets.fashion_mnist.load_data()

# Preprocess the data

def preprocess(imgs):
  # Normalizar y reajustar las imágenes
  igs= imgs.astype("float32")/255.0
  imgs = np.pad(imgs, ((0,0),(2,2),(2,2)), constant_values=0.0)
  imgs = np.expand_dims(imgs, -1)
  return imgs

x_train = preprocess(x_train)
x_test = preprocess(x_test)

display(x_train)

array([[[[0],
         [0],
         [0],
         ...,
         [0],
         [0],
         [0]],

        [[0],
         [0],
         [0],
         ...,
         [0],
         [0],
         [0]],

        [[0],
         [0],
         [0],
         ...,
         [0],
         [0],
         [0]],

        ...,

        [[0],
         [0],
         [0],
         ...,
         [0],
         [0],
         [0]],

        [[0],
         [0],
         [0],
         ...,
         [0],
         [0],
         [0]],

        [[0],
         [0],
         [0],
         ...,
         [0],
         [0],
         [0]]],


       [[[0],
         [0],
         [0],
         ...,
         [0],
         [0],
         [0]],

        [[0],
         [0],
         [0],
         ...,
         [0],
         [0],
         [0]],

        [[0],
         [0],
         [0],
         ...,
         [0],
         [0],
         [0]],

        ...,

        [[0],
         [0],
         [0],
         ...,
         [0],


In [24]:
# 1. Neceistamos crear un nuevo tipo de Sampling layer (Muestreo) que nos permitirá
# muestrear desde la distribución definida por el z_mean y el z_log_var

# Creamos una nueva capa "Subclassing the Layer Class"
class Sampling(layers.Layer):
  def call(self, inputs):
    z_mean, z_log_var = inputs
    batch = tf.shape(z_mean)[0]
    dim = tf.shape(z_mean)[1]
    epsilon = K.random_normal(shape=(batch, dim))
    return z_mean + tf.exp(0.5*z_log_var)*epsilon # Usamos el truco de la
    # subparametrización para construir una muestra desde la distribución
    # normal parametrizada con z_mean y z_log_var

# --- Subclassig the layer class ---
# Se pueden crear nuevas capas de Keras subclassing the abstract Layer class y
# definiendo el método Call, que describe cómo un tensor es transformado por la capa

# Por ejemplo, en un AutoEncoder varicional, podemos crear un capara de muestreo
# que puede manejar el muestreo de z desde una distribución normal con
# parámetros definidos por z_mean y z_log_var.

# --- The Reparameterization Trick ---
# En lugar de muestrear directamente desde una distribución normal con parámetros
# z_mean y z_log_var, podemos tomar muestras epsilon desde una Normal Standar y
# entonces, ajustar manualmente la muestra para obtener la media y varainza correctas


In [25]:
# Para completar el encoder, incluímos la nueva Sampling Layer
encoder_input = layers.Input(
    shape=(32,32,1), name="encoder_input")
x = layers.Conv2D(32,(3,3),strides=2,activation="relu",padding='same')(encoder_input)
x = layers.Conv2D(64,(3,3),strides=2,activation='relu',padding='same')(x)
x = layers.Conv2D(128,(3,3),strides=2,activation='relu',padding='same')(x)
shape_before_flattening=K.int_shape(x)[1:]

x = layers.Flatten()(x)
# En lugar de conectar directamente la capa Flatten a el espacio latente 2D,
# la conectamos a las capas z_mean y z_log_var
z_mean = layers.Dense(2,name='z_mean')(x)
z_log_var = layers.Dense(2,name='z_log_var')(x)
# La capa Smapling muestrea un un punto z en el espacio latente desde la distribución
# normal definida por los parámetros z_mean y z_log_var
z = Sampling()([z_mean,z_log_var])

# El modelo de Kera que define el encoder: Un modelo que toma una imagen de entrada
# y las salidas z_mean, z_log_var, y los puntos muestreados de la distribución normmal
# definida con esos parámetros
encoder = models.Model(encoder_input, [z_mean, z_log_var,z], name='encoder')

encoder.summary()

Model: "encoder"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 encoder_input (InputLayer)     [(None, 32, 32, 1)]  0           []                               
                                                                                                  
 conv2d_3 (Conv2D)              (None, 16, 16, 32)   320         ['encoder_input[0][0]']          
                                                                                                  
 conv2d_4 (Conv2D)              (None, 8, 8, 64)     18496       ['conv2d_3[0][0]']               
                                                                                                  
 conv2d_5 (Conv2D)              (None, 4, 4, 128)    73856       ['conv2d_4[0][0]']               
                                                                                            

In [26]:
# DECODER
decoder_input = layers.Input(shape=(2,),name="decoder_input")
x = layers.Dense(np.prod(shape_before_flattening))(decoder_input)
x = layers.Reshape(shape_before_flattening)(x)
x = layers.Conv2DTranspose(128,(3,3),strides=2,activation="relu",padding="same")(x)
x = layers.Conv2DTranspose(64,(3,3),strides=2,activation="relu",padding="same")(x)
x = layers.Conv2DTranspose(32,(3,3),strides=2,activation="relu",padding="same")(x)
decoder_output=layers.Conv2D(1,(3,3),strides=1,activation="sigmoid",padding="same",
                             name="decoder_output")(x)
decoder=models.Model(decoder_input, decoder_output)
decoder.summary()

Model: "model_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 decoder_input (InputLayer)  [(None, 2)]               0         
                                                                 
 dense_4 (Dense)             (None, 2048)              6144      
                                                                 
 reshape_4 (Reshape)         (None, 4, 4, 128)         0         
                                                                 
 conv2d_transpose_10 (Conv2D  (None, 8, 8, 128)        147584    
 Transpose)                                                      
                                                                 
 conv2d_transpose_11 (Conv2D  (None, 16, 16, 64)       73792     
 Transpose)                                                      
                                                                 
 conv2d_transpose_12 (Conv2D  (None, 32, 32, 32)       1846

In [27]:
# La función de Pérdida

# Nuestra función de pérdida requiere un componente extra:
# the Kullback-Leibler (KL) divergence term

# KL divergence es una manera de medir qué tanto una distribución de probabilidad
# difiere de otra. En un VAE, queremos medir qué tanto nuestra distribución normal
# con parámetros z_mean y z_log_var difiere de una distribución normal estándar.
# En este casi, se puede mostrar de esta forma
# kl_loss = -0.5 * sum(1+z_log_var - z_mean^2 - exp(z_log_var))
# En resumen, la divergencia KL penaliza a la red por codificar observaciones a
# z_mean y z_log_var que difieren significativamente de los parámetros de la
# distribución estándar normal ---> z_mean=0 y z_log_var=0


In [28]:
class VAE(models.Model):
  def __init__(self,encoder,decoder, **kwargs):
    super(VAE,self).__init__(**kwargs)
    self.encoder = encoder
    self.decoder = decoder
    self.total_loss_tracker = metrics.Mean(name="total_loss")
    self.reconstruction_loss_tracker = metrics.Mean(
        name="reconstruction_loss"
    )
    self.kl_loss_tracker = metrics.Mean(name="kl_loss")

  @property
  def metrics(self):
    return[self.total_loss_tracker,
           self.reconstruction_loss_tracker,
           self.kl_loss_tracker]

  # Esta función describe lo que nos gustaría que devolviese lo que llamamos
  # VAE en una imagen de entrada en particular
  def call(self,inputs):
    z_mean,z_log_var,z=encoder(inputs)
    reconstruction=decoder(z)
    return z_mean, z_log_var, reconstruction

  # Esta función describe un paso de entrenamiento del VAE, incluyenndo el
  # cálculo de la loss function
  def train_step(self,data):
    with tf.GradientTape() as tape:
      z_mean, z_log_var, reconstruction = self(data)
      reconstruction_loss = tf.reduce_mean(
          500*losses.binary_crossentropy(data,reconstruction,axis=(1,2,3))
      )# Se utiliza un valor beta de 500 en la loss reconstruction
      kl_loss=tf.reduce_mean(
          tf.reduce_sum(-0.5*(1+z_log_var-tf.square(z_mean)-tf.exp(z_log_var)),
                        axis=1))
      # La pérdida total es la suma de la pérdida reconstrucción de pérdida y
      # la pérdida de divergencia KL
      total_loss = reconstruction_loss + kl_loss

    grads = tape.gradient(total_loss, self.trainable_weights)
    self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

    self.total_loss_tracker.update_state(total_loss)
    self.reconstruction_loss_tracker.update_state(reconstruction_loss)
    self.kl_loss_tracker.update_state(kl_loss)

    return {m.name:m.result() for m in self.metrics}
    # Gradient Tape es un mecanismo que permite el cómputo de gradiente de operación,
    # ejecutados durante el paso hacia delante del modelo.

def test_step(self,data):
  """Step run during validation."""
  if isinstance(data,tuple):
    data = data[0]

  z_mean, z_log_var, reconstruction = self(data)
  reconstruction_loss = tf.reduce_mean(500*losses.binary_crossentropy(data,
                                                                      reconstruction,axis=(1,2,3)))
  kl_loss = tf.reduce_mean(tf.reduce_sum(-0.5*(1+z_log_var-tf.square(z_mean)-tf.exp(z_log_var)),
                                         axis=1))
  total_loss = reconstruction_loss + kl_loss

  return{
      "loss":total_loss,
      "reconstruction_loss": reconstruction_loss,
      "kl_loss": kl_loss
  }



In [29]:
# Compilar el VAE
vae = VAE(encoder, decoder)
optimizer = optimizers.Adam(learning_rate=0.0005)
vae.compile(optimizer=optimizer)

In [30]:
# Crear un checkpoint de guardado del modelo
model_checkpoint_callback = callbacks.ModelCheckpoint(
    filepath="./checkpoint",
    save_weights_only=False,
    save_freq="epoch",
    monitor="loss",
    mode="min",
    save_best_only=True,
    verbose=0
)
tensorboard_callback = callbacks.TensorBoard(log_dir="./logs")

In [31]:
# Entrenar el VAE
vae.fit(x_train,
        epochs=10,
        batch_size=100,
        shuffle=True,
        validation_data=(x_test,x_test),
        callbacks=[model_checkpoint_callback, tensorboard_callback])

Epoch 1/10



Epoch 2/10



Epoch 3/10



Epoch 4/10



Epoch 5/10



Epoch 6/10



Epoch 7/10



Epoch 8/10



Epoch 9/10



Epoch 10/10





<keras.callbacks.History at 0x7acb56f1b130>

In [32]:
# Guardar el modelo final
vae.save("./models/vae")
encoder.save("./models/encoder")
decoder.save("./models/decoder")



In [33]:
# Reconstruir usando el VAE

# Seleccionar un subset del test set
n_to_predict = 5000
example_images = x_test[:n_to_predict]
example_labels = y_test[:n_to_predict]

In [35]:
# Crear predicciones del autoencoder y mostrarlas


z_mean, z_log_var, reconstructions = vae.predict(example_images)
print("Example real clothing items")
display(example_images)
print("Reconstructions")
display(reconstructions)

Example real clothing items


array([[[[ 0],
         [ 0],
         [ 0],
         ...,
         [ 0],
         [ 0],
         [ 0]],

        [[ 0],
         [ 0],
         [ 0],
         ...,
         [ 0],
         [ 0],
         [ 0]],

        [[ 0],
         [ 0],
         [ 0],
         ...,
         [ 0],
         [ 0],
         [ 0]],

        ...,

        [[ 0],
         [ 0],
         [ 0],
         ...,
         [ 0],
         [ 0],
         [ 0]],

        [[ 0],
         [ 0],
         [ 0],
         ...,
         [ 0],
         [ 0],
         [ 0]],

        [[ 0],
         [ 0],
         [ 0],
         ...,
         [ 0],
         [ 0],
         [ 0]]],


       [[[ 0],
         [ 0],
         [ 0],
         ...,
         [ 0],
         [ 0],
         [ 0]],

        [[ 0],
         [ 0],
         [ 0],
         ...,
         [ 0],
         [ 0],
         [ 0]],

        [[ 0],
         [ 0],
         [ 0],
         ...,
         [ 0],
         [ 0],
         [ 0]],

        ...,

        [[ 0],
 

Reconstructions


array([[[[nan],
         [nan],
         [nan],
         ...,
         [nan],
         [nan],
         [nan]],

        [[nan],
         [nan],
         [nan],
         ...,
         [nan],
         [nan],
         [nan]],

        [[nan],
         [nan],
         [nan],
         ...,
         [nan],
         [nan],
         [nan]],

        ...,

        [[nan],
         [nan],
         [nan],
         ...,
         [nan],
         [nan],
         [nan]],

        [[nan],
         [nan],
         [nan],
         ...,
         [nan],
         [nan],
         [nan]],

        [[nan],
         [nan],
         [nan],
         ...,
         [nan],
         [nan],
         [nan]]],


       [[[nan],
         [nan],
         [nan],
         ...,
         [nan],
         [nan],
         [nan]],

        [[nan],
         [nan],
         [nan],
         ...,
         [nan],
         [nan],
         [nan]],

        [[nan],
         [nan],
         [nan],
         ...,
         [nan],
         [