# **Pokémon Diffusion<a id="top"></a>**

> #### ``02-Diffusion-Model.ipynb``

<i><small>**Alumno:** Alejandro Pequeño Lizcano<br>Última actualización: 18/03/2024</small></i></div>

TODO: introducir el modelo

# 0. Imports

Una vez introducido el proyecto, se importan las librerías necesarias para el desarrollo de este apartado.

---

In [1]:
# Import necessary libraries
# =====================================================================

# Import libraries for data preprocessing
import configparser
import tensorflow as tf
from tensorflow.keras.utils import plot_model
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras import layers

# Import src code
from src.data import path_loader as pl
from src.data import create_dataset
from src.visualization import visualize
from src.model.diffusion import DiffusionModel
from src.utils.utils import PROJECT_DIR

2024-03-22 22:03:11.361995: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-03-22 22:03:11.393988: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [11]:
# Hyperparameters
# =====================================================================
config = configparser.ConfigParser()
config.read(PROJECT_DIR / "config.ini")
config_hp = config["hyperparameters"]

IMG_SIZE = int(config_hp["img_size"])
NUM_CLASSES = int(config_hp["num_classes"])

# Modelo de Difusión

TODO: Introducir mejor

El modelo usado para la predicción del ruido. Según muchos papers, se podría usar cualquier red neuronal, ya que no existe una arquitectura específica y depende del conjunto de datos con el que se entrene, no obstante, la más usada para la síntesis de imágenes y la que se ha usado en este proyecto es la arquitectura **U-Net** por sus características de recuperación de la información manteniendo la dimensionalidad de la imagen.

Esta arquitectura se caracteriza por tener una parte de codificación y una parte de decodificación. La parte de codificación se encarga de reducir la dimensionalidad de la imagen de entrada y la parte de decodificación se encarga de reconstruir la imagen original a partir de la imagen codificada.

Podemos encontrar dos funciones, por una parte, ``block()``, que se encarga de definir el bloque de difusión y, por otra parte, ``build_ddpm_model()``, que se encarga de definir la arquitectura **U-Net**.

- ``block()``: El bloque de difusión contiene tres parámetros:

    - ``x_parameter``: es el tensor de entrada que contiene la imagen original o la imagen con ruido en cada paso de difusión.
    - ``time_parameter`` es el tensor que indica el paso de difusión en el que nos encontramos. Se usa para calcular el valor de **$\beta$** según el *scheduler* que hayamos elegido.

    Dentro del bloque de difusión, se aplican transformaciones a cada uno de estos parámetros, lo que puede incluir capas densas, normalización y activación ReLU. Estas transformaciones capturan las relaciones y dependencias entre los diferentes aspectos de la entrada (imagen y tiempo). Finalmente, se calcula la imagen nueva con el ruido añadido.

-  El proceso de difusión utiliza una arquitectura de tipo **U-Net** modificada con bloques ``block`` que toman en cuenta la imagen (``x_parameter``) y el tensor tiempo (``time_parameter``). Posteriormente, se realizan operaciones de convolución y pooling para reducir la resolución de la imagen mientras se procesa la información temporal. Luego, se realiza un proceso de decodificación utilizando operaciones de upsampling y concatenación para generar una imagen de salida que tiene la misma resolución que la imagen de entrada. Después de este proceso, se añade una capa **MLP** para procesar la información temporal y generar una imagen de salida. Finalmente, se devuelve la imagen de salida.


---
<i><small>**Más infromación** sobre el porqué matemático de la función de pérdida, aunque ya explicado, se puede encontrar en el paper [Denoising Diffusion Probabilistic Models](https://arxiv.org/abs/2006.11239) y una explicación más clara en la página [Diffusion Model Clearly Explained!](https://medium.com/@steinsfu/diffusion-model-clearly-explained-cd331bd41166).

<span style="color: red; font-size: 1.5em;">&#9888;</span> **NOTA:** El proceso matemático para llegar a esta fórmula es muy complejo para explicarlo en este notebook. Sin embargo, en el futuro informe se explicará con más detalle. 
</small></i>

In [None]:
# Create a tensorboard callback for monitoring training progress
tensorboard_callback = tf.keras.callbacks.TensorBoard(
    log_dir="./logs", histogram_freq=1
)

# Compile the model
model = build_ddpm_model()
model.compile(optimizer="adam", loss="mse", metrics=["accuracy"])

# Train the model
model.fit(x_train, y_train, epochs=10, callbacks=[tensorboard_callback])

# Evaluate the model
loss, accuracy = model.evaluate(x_test, y_test)
print("Test Loss:", loss)
print("Test Accuracy:", accuracy)

In [None]:
def block(x_img: tf.Tensor, x_ts: tf.Tensor, x_label: tf.Tensor) -> tf.Tensor:
    """The block of the diffusion model

    :param x_img: The image to process
    :param x_ts: The time steps to process
    :param x_label: The label to process
    :return: The processed image
    """

    x_parameter = layers.Conv2D(128, kernel_size=3, padding="same")(x_img)
    x_parameter = layers.Activation("relu")(x_parameter)

    time_parameter = layers.Dense(128)(x_ts)
    time_parameter = layers.Activation("relu")(time_parameter)
    time_parameter = layers.Reshape((1, 1, 128))(time_parameter)

    label_parameter = layers.Dense(128)(x_label)
    label_parameter = layers.Activation("relu")(label_parameter)
    label_parameter = layers.Reshape((1, 1, 128))(label_parameter)

    x_parameter = x_parameter * label_parameter + time_parameter

    # -----
    x_out = layers.Conv2D(128, kernel_size=3, padding="same")(x_img)
    x_out = x_out + x_parameter
    x_out = layers.LayerNormalization()(x_out)
    x_out = layers.Activation("relu")(x_out)

    return x_out

2. El proceso de difusión utiliza una arquitectura de tipo **U-Net** modificada con bloques ``block`` que toman en cuenta la imagen (``x_parameter``), el tensor tiempo (``time_parameter``) y la etiqueta (``label_parameter``). Posteriormente, se realizan operaciones de convolución y pooling para reducir la resolución de la imagen mientras se procesa la información temporal y de etiqueta. Luego, se realiza un proceso de decodificación utilizando operaciones de upsampling y concatenación para generar una imagen de salida que tiene la misma resolución que la imagen de entrada. Después de este proceso, se añade una capa **MLP** para procesar la información temporal y de etiqueta y generar una imagen de salida. Finalmente, se devuelve la imagen de salida.

In [None]:
def build_ddpm_model() -> tf.keras.models.Model:
    """Creates the diffusion model

    :return: The diffusion model
    """

    x = x_input = layers.Input(shape=(IMG_SIZE, IMG_SIZE, 3), name="x_input")

    x_ts = x_ts_input = layers.Input(shape=(1,), name="x_ts_input")
    x_ts = layers.Dense(192)(x_ts)
    x_ts = layers.LayerNormalization()(x_ts)
    x_ts = layers.Activation("relu")(x_ts)

    x_label = x_label_input = layers.Input(shape=(NUM_CLASSES,), name="x_label_input")
    x_label = layers.Dense(192)(x_label)
    x_label = layers.LayerNormalization()(x_label)
    x_label = layers.Activation("relu")(x_label)

    # ----- left ( down ) -----
    x = x64 = block(x, x_ts, x_label)
    x = layers.MaxPool2D(2)(x)

    x = x32 = block(x, x_ts, x_label)
    x = layers.MaxPool2D(2)(x)

    x = x16 = block(x, x_ts, x_label)
    x = layers.MaxPool2D(2)(x)

    x = x8 = block(x, x_ts, x_label)

    # ----- MLP -----
    x = layers.Flatten()(x)
    x = layers.Concatenate()([x, x_ts, x_label])
    x = layers.Dense(128)(x)
    x = layers.LayerNormalization()(x)
    x = layers.Activation("relu")(x)

    x = layers.Dense(8 * 8 * 32)(x)
    x = layers.LayerNormalization()(x)
    x = layers.Activation("relu")(x)
    x = layers.Reshape((8, 8, 32))(x)

    # ----- right ( up ) -----
    x = layers.Concatenate()([x, x8])
    x = block(x, x_ts, x_label)
    x = layers.UpSampling2D(2)(x)

    x = layers.Concatenate()([x, x16])
    x = block(x, x_ts, x_label)
    x = layers.UpSampling2D(2)(x)

    x = layers.Concatenate()([x, x32])
    x = block(x, x_ts, x_label)
    x = layers.UpSampling2D(2)(x)

    x = layers.Concatenate()([x, x64])
    x = block(x, x_ts, x_label)

    # ----- output -----
    x = layers.Conv2D(3, kernel_size=1, padding="same")(x)
    model = tf.keras.models.Model([x_input, x_ts_input, x_label_input], x)
    return model

In [None]:
# Create the model
# =====================================================================
model = build_ddpm_model()

# Compile the model
# =====================================================================
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001)
loss_fn = tf.keras.losses.MeanSquaredError()
model.compile(loss=loss_fn, optimizer=optimizer)

# Show the model summary
# =====================================================================
model.summary()

In [None]:
plot_model(model, show_shapes=True, show_layer_names=True)

[BACK TO TOP](#top)