#Variational AutoEncoder


Basado en https://keras.io/examples/generative/vae/

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt

In [None]:
# El primer paso será definir una función que reciba como parámetros las medias
# y los logaritmos de las varianzas de la distribución latente, y devuelva un tensor
# que representa las muestras obtenidas de la distribución latente.

def sampling(z_mean, z_log_var):
    # Obtiene el tamaño del lote (batch) y la dimensión de z_mean
    lote = tf.shape(z_mean)[0]
    dim = tf.shape(z_mean)[1]

    # Genera un tensor epsilon a partir de una distribución normal estándar
    epsilon = tf.keras.backend.random_normal(shape=(lote, dim))

    # Realiza el muestreo utilizando la fórmula z = z_mean + exp(0.5 * z_log_var) * epsilon
    # Esto forma parte del proceso de reparametrización para entrenar la red de manera diferenciable
    return z_mean + tf.exp(0.5 * z_log_var) * epsilon

In [None]:
#Construimos el módulo ENCODER
# Definimos la dimensión latente del VAE
latent_dim = 2

# Creamos las entradas del codificador, que son imágenes de tamaño 28x28 con un canal de escala de grises
encoder_inputs = keras.Input(shape=(28, 28, 1))
# Capa de convolución 2D con 32 filtros, tamaño del filtro 3x3, activación ReLU
x = layers.Conv2D(32, 3, activation="relu", strides=2, padding="same")(encoder_inputs)
# Capa de convolución 2D con 64 filtros, tamaño del filtro 3x3, activación ReLU
x = layers.Conv2D(64, 3, activation="relu", strides=2, padding="same")(x)
# Capa de convolución 2D con 64 filtros, tamaño del filtro 3x3, activación ReLU
x = layers.Conv2D(64, 3, activation="relu", strides=1, padding="same")(x)
# Aplicamos la capa Flatten para convertir el tensor a un vector 1D
x = layers.Flatten()(x)
# Capa totalmente conectada con 16 neuronas y activación ReLU
x = layers.Dense(16, activation="relu")(x)
# Capa de salida para las medias de la distribución latente
z_mean = layers.Dense(latent_dim, name="z_mean")(x)
# Capa de salida para los logaritmos de las varianzas de la distribución latente
z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
# Utilizamos la función de muestreo definida previamente para obtener la muestra z de la distribución latente
z = sampling(z_mean, z_log_var)
# Creamos el modelo del codificador con las entradas y salidas definidas
encoder = keras.Model(encoder_inputs, [z_mean, z_log_var, z], name="encoder")
encoder.summary()


In [None]:
#Construimos el módulo DECODER


# Creamos las entradas para el decodificador, que son muestras de la distribución latente
latent_inputs = keras.Input(shape=(latent_dim,))
# Capa totalmente conectada para expandir la dimensión
x = layers.Dense(7 * 7 * 64, activation="relu")(latent_inputs)
# Reshape para obtener un tensor 3D (7x7x64)
x = layers.Reshape((7, 7, 64))(x)
# Capa de convolución transpuesta para aumentar las dimensiones
x = layers.Conv2DTranspose(64, 3, activation="relu", strides=2, padding="same")(x)
# Otra capa de convolución transpuesta
x = layers.Conv2DTranspose(64, 3, activation="relu", strides=1, padding="same")(x)
# Capa de convolución transpuesta adicional
x = layers.Conv2DTranspose(32, 3, activation="relu", strides=2, padding="same")(x)
# Capa de salida con una sola dimensión y activación sigmoidal para obtener valores entre 0 y 1
decoder_outputs = layers.Conv2DTranspose(1, 3, activation="sigmoid", padding="same")(x)
# Creamos el modelo del decodificador
decoder = keras.Model(latent_inputs, decoder_outputs, name="decoder")
decoder.summary()


In [None]:
#Definimos qué hacer para cada paso de entrenamiento
def vae_train_step(data, encoder, decoder, optimizer):
    with tf.GradientTape() as tape:
        # Pasamos los datos por el modelo del codificador para obtener las estadísticas de la distribución latente
        z_mean, z_log_var, z = encoder(data)

        # Generamos una reconstrucción a partir de las estadísticas obtenidas con el codificador
        reconstruction = decoder(z)

        # Calculamos la pérdida de reconstrucción utilizando la binary cross-entropy
        reconstruction_loss = tf.reduce_mean(
            tf.reduce_sum(
                keras.losses.binary_crossentropy(data, reconstruction),
                axis=(1, 2)
            )
        )

        # Calculamos la pérdida de regularización KL (Kullback-Leibler)
        kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
        kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))


        # Calculamos la pérdida total sumando las dos pérdidas
        total_loss = reconstruction_loss + kl_loss

    # Obtenemos los gradientes respecto a los pesos de los modelos del codificador y decodificador
    grads = tape.gradient(total_loss, encoder.trainable_weights + decoder.trainable_weights)

    # Actualizamos los pesos de los modelos utilizando el optimizador
    optimizer.apply_gradients(zip(grads, encoder.trainable_weights + decoder.trainable_weights))

    # Retorna un diccionario con las pérdidas calculadas
    return {
        "loss": total_loss,
        "reconstruction_loss": reconstruction_loss,
        "kl_loss": kl_loss,
    }

In [None]:
#Leemos el dataset de dígitos (MNIST)
#Fijaros en que no estamos utilizando las labels, problema NO supervisado.
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
mnist_digits = np.concatenate([x_train, x_test], axis=0)
digits_escalados = np.expand_dims(mnist_digits, -1).astype("float32") / 255

#Entrenamos el modelo
from tqdm import tqdm

EPOCHS = 40
BATCH_SIZE = 2048
optimizer = keras.optimizers.Adam(learning_rate=0.0001)

# Creamos el bucle de épocas
for epoch in range(EPOCHS):
    # Inicializamos una barra de progreso para ver cómo va el entrenamiento
    pbar = tqdm(total=digits_escalados.shape[0], desc=f'Época {epoch + 1}/{EPOCHS}', unit='lote')

    # Inicializamos las pérdidas por época
    total_loss_epoch = 0.0
    reconstruction_loss_epoch = 0.0
    kl_loss_epoch = 0.0

    # Iteramos sobre los lotes
    for i in range(0, digits_escalados.shape[0], BATCH_SIZE):
        # Entrenamos el VAE y obtenemos resultados
        train_results = vae_train_step(digits_escalados[i:i + BATCH_SIZE], encoder, decoder, optimizer)

        # Actualizamos las pérdidas acumuladas por época
        total_loss_epoch += train_results["loss"]
        reconstruction_loss_epoch += train_results["reconstruction_loss"]
        kl_loss_epoch += train_results["kl_loss"]

        # Actualizamos la barra de progreso con el tamaño del lote
        pbar.update(BATCH_SIZE)

        # Mostramos las pérdidas actuales
        pbar.set_postfix(pérdida_total=f'{total_loss_epoch / ((i // BATCH_SIZE) + 1):.4f}',
                         pérdida_reconstrucción=f'{reconstruction_loss_epoch / ((i // BATCH_SIZE) + 1):.4f}',
                         pérdida_kl=f'{kl_loss_epoch / ((i // BATCH_SIZE) + 1):.4f}')

    # Cerramos la barra de progreso al final de la época
    pbar.close()



In [None]:
#Vamos a ver cómo han quedado distribuidas las muestras de nuestro dataset
#en el latent space.
def plot_label_clusters(data, labels):
    # display a 2D plot of the digit classes in the latent space
    z_mean, _, _ = encoder.predict(data)
    plt.figure(figsize=(12, 10))
    plt.scatter(z_mean[:, 0], z_mean[:, 1], c=labels)
    plt.colorbar()
    plt.xlabel("z[0]")
    plt.ylabel("z[1]")
    plt.show()


(x_train, y_train), _ = keras.datasets.mnist.load_data()
x_train = np.expand_dims(x_train, -1).astype("float32") / 255

plot_label_clusters(x_train, y_train)

In [None]:
#Vamos a provar de generar dígitos.
#Haremos un grid.
def plot_latent_space(n=10, figsize=15):
    # display a n*n 2D manifold of digits
    digit_size = 28
    scale = 3 #Para escoger este valor, miramos la figura anterior.
    figure = np.zeros((digit_size * n, digit_size * n))
    # linearly spaced coordinates corresponding to the 2D plot
    # of digit classes in the latent space
    #Cogeremos n puntos que van desde -scale hasta scale.
    #Crearemos un array con el eje horizontal y otro con el vertical.
    grid_x = np.linspace(-scale, scale, n)
    grid_y = np.linspace(-scale, scale, n)[::-1]

    for i, yi in enumerate(grid_y):
        for j, xi in enumerate(grid_x):
            z_sample = np.array([[xi, yi]])
            x_decoded = decoder.predict(z_sample)
            digit = x_decoded[0].reshape(digit_size, digit_size)
            figure[
                i * digit_size : (i + 1) * digit_size,
                j * digit_size : (j + 1) * digit_size,
            ] = digit

    plt.figure(figsize=(figsize, figsize))
    start_range = digit_size // 2
    end_range = n * digit_size + start_range
    pixel_range = np.arange(start_range, end_range, digit_size)
    sample_range_x = np.round(grid_x, 1)
    sample_range_y = np.round(grid_y, 1)
    plt.xticks(pixel_range, sample_range_x)
    plt.yticks(pixel_range, sample_range_y)
    plt.xlabel("z[0]")
    plt.ylabel("z[1]")
    plt.imshow(figure, cmap="Greys_r")
    plt.show()


plot_latent_space()

In [None]:
#Ahora probad el código anterior utilizando el dataset de fashion mnist (el de los vestidos).