In [None]:
import os, glob, random
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import load_img, img_to_array
import matplotlib.pyplot as plt
import wandb
from wandb.integration.keras import WandbCallback
import gc

# Decorator so that Hugging Face/Keras can recognize the function
@tf.keras.utils.register_keras_serializable()
def calc_mvd(real_images, gen_images):
    """
    Compute the Mean-Variance Distance (MVD) between two sets of images.

    Every sample is flattened into a feature vector. The per-feature mean and
    standard deviation are computed across each set, and the result is the L2
    norm of the mean difference plus the L2 norm of the std difference.

    Args:
        real_images: Array-like whose first axis indexes the real samples.
        gen_images: Array-like whose first axis indexes the generated samples.
            The two sets may hold a different number of samples, but every
            sample must flatten to the same number of values.

    Returns:
        The scalar distance ``||mean_real - mean_gen|| + ||std_real - std_gen||``.

    Raises:
        ValueError: If either set is empty or scalar, or if the flattened
            per-sample sizes differ.
    """
    flat_sets = []
    for arg_name, images in (("real_images", real_images), ("gen_images", gen_images)):
        # np.asarray lets callers pass lists (or any array-protocol object),
        # not only ndarrays exposing .reshape/.shape.
        arr = np.asarray(images)
        if arr.ndim == 0 or arr.size == 0:
            raise ValueError(f"{arg_name} must contain at least one non-empty sample")
        flat_sets.append(arr.reshape(arr.shape[0], -1))
    real_flat, gen_flat = flat_sets

    # Without this check a set with a single feature per sample would be
    # silently broadcast against the other set and yield a meaningless value.
    if real_flat.shape[1] != gen_flat.shape[1]:
        raise ValueError(
            "real_images and gen_images must have the same number of values per "
            f"sample, got {real_flat.shape[1]} and {gen_flat.shape[1]}"
        )

    mean_real = np.mean(real_flat, axis=0)
    std_real = np.std(real_flat, axis=0)
    mean_gen = np.mean(gen_flat, axis=0)
    std_gen = np.std(gen_flat, axis=0)

    mean_distance = np.linalg.norm(mean_real - mean_gen)
    std_distance = np.linalg.norm(std_real - std_gen)
    return mean_distance + std_distance

# Callback that logs the MVD metric at the end of every epoch
class MVDEpochCallback(tf.keras.callbacks.Callback):
    """Log the MVD between real images and freshly decoded samples each epoch.

    Args:
        real_data: Real images the generated samples are compared against.
        decoder: Model whose ``predict`` maps latent vectors to images.
        latent_dim: Size of each latent vector fed to the decoder.
        label_prefix: Suffix of the logged key, i.e. ``MVD_<label_prefix>``.
        n_samples: Number of latent vectors decoded per epoch. ``None``
            (default) keeps the original behaviour of decoding
            ``len(real_data)`` samples; a smaller value makes the per-epoch
            evaluation cheaper on large datasets.

    Raises:
        ValueError: If ``n_samples`` is given and is not positive.
    """

    def __init__(self, real_data, decoder, latent_dim, label_prefix="Train", n_samples=None):
        super().__init__()
        if n_samples is not None and n_samples < 1:
            raise ValueError(f"n_samples must be positive, got {n_samples}")
        self.real_data = real_data
        self.decoder = decoder
        self.latent_dim = latent_dim
        self.label_prefix = label_prefix
        self.n_samples = n_samples

    def on_epoch_end(self, epoch, logs=None):
        """Decode random latent vectors, compute the MVD and send it to W&B."""
        n_samples = len(self.real_data) if self.n_samples is None else self.n_samples
        # Draw random samples from the standard normal prior in latent space
        z_samples = np.random.normal(size=(n_samples, self.latent_dim))
        # verbose=0: keep predict's own progress bar out of fit's epoch output
        gen_images = self.decoder.predict(z_samples, verbose=0)
        # MVD between the real and the generated images
        mvd_value = calc_mvd(self.real_data, gen_images)
        # Log to W&B together with the epoch index
        wandb.log({f"MVD_{self.label_prefix}": mvd_value, "epoch": epoch})


In [None]:
# Base paths of the processed dataset splits
base_path = "/content/Deep_learning/Proyecto 1 - DAE + VAE/processed_dataset"
train_path = os.path.join(base_path, "train")
val_path   = os.path.join(base_path, "val")
test_path  = os.path.join(base_path, "test")


def _list_npy_files(split_dir, class_dir):
    """Return the .npy paths under ``split_dir/class_dir`` in sorted order.

    glob returns entries in arbitrary (filesystem) order; sorting makes the
    sample order, and everything derived from it, reproducible across runs.
    """
    return sorted(glob.glob(split_dir + "/" + class_dir + "/*.npy"))


def _load_npy_files(paths):
    """Load every .npy file in ``paths`` and collect them into one array."""
    return np.array([np.load(p) for p in paths])


# Image file paths for each category and split
train_paths_botellas = _list_npy_files(train_path, "botella_de_vidrio")
train_paths_relojes  = _list_npy_files(train_path, "reloj_de_pared_circular_clasico")

val_paths_botellas   = _list_npy_files(val_path, "botella_de_vidrio")
val_paths_relojes    = _list_npy_files(val_path, "reloj_de_pared_circular_clasico")

test_paths_botellas  = _list_npy_files(test_path, "botella_de_vidrio")
test_paths_relojes   = _list_npy_files(test_path, "reloj_de_pared_circular_clasico")

# Load the images for each category and split
X_train_botellas = _load_npy_files(train_paths_botellas)
X_train_relojes  = _load_npy_files(train_paths_relojes)

X_val_botellas   = _load_npy_files(val_paths_botellas)
X_val_relojes    = _load_npy_files(val_paths_relojes)

X_test_botellas  = _load_npy_files(test_paths_botellas)
X_test_relojes   = _load_npy_files(test_paths_relojes)

In [None]:
# Report the array shape of every split, one line per category
bottle_shapes = (X_train_botellas.shape, X_val_botellas.shape, X_test_botellas.shape)
clock_shapes = (X_train_relojes.shape, X_val_relojes.shape, X_test_relojes.shape)

print("Botellas - Train:", bottle_shapes[0], "Val:", bottle_shapes[1], "Test:", bottle_shapes[2])
print("Relojes - Train:", clock_shapes[0], "Val:", clock_shapes[1], "Test:", clock_shapes[2])

In [None]:
# Image size used by the models; adjust to match your dataset
IMG_SIZE = 256

# Seed the Python, NumPy and TensorFlow random generators for reproducibility
seed = 42
random.seed(seed)
np.random.seed(seed)
tf.random.set_seed(seed)

In [None]:
from tensorflow.keras import layers, Model, Input
import tensorflow.keras.backend as K

class VAE(Model):
    """Variational autoencoder built from an encoder and a decoder model.

    The encoder is expected to return ``(z_mean, z_log_var, z)`` and the
    decoder to map ``z`` to a reconstruction of the input. Training and
    evaluation are driven by the custom ``train_step`` / ``test_step`` below,
    which report ``loss``, ``reconstruction_loss`` and ``kl_loss``.

    Args:
        encoder: Model returning ``(z_mean, z_log_var, z)`` for a batch.
        decoder: Model mapping latent vectors to reconstructions.
        img_size: Side length used to scale the reconstruction loss
            (``img_size * img_size * 3`` values per image).
        **kwargs: Forwarded to ``Model.__init__``.
    """

    def __init__(self, encoder, decoder, img_size=IMG_SIZE, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.img_size = img_size
        self.loss_tracker = tf.keras.metrics.Mean(name="loss")
        self.reconstruction_loss_tracker = tf.keras.metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker = tf.keras.metrics.Mean(name="kl_loss")

    @property
    def metrics(self):
        """Trackers exposed to Keras so they are reset and reported by fit/evaluate."""
        return [self.loss_tracker, self.reconstruction_loss_tracker, self.kl_loss_tracker]

    def _loss_terms(self, y_true, y_pred, z_mean, z_log_var):
        """Return ``(reconstruction_loss, kl_loss)`` as two scalars."""
        # MSE over every value in the batch, rescaled by the values per image.
        reconstruction_loss = tf.keras.losses.mse(K.flatten(y_true), K.flatten(y_pred))
        reconstruction_loss *= self.img_size * self.img_size * 3
        # NOTE(review): reduce_mean averages over the latent dimensions as well
        # as over the batch, so the KL term carries 1/latent_dim of the weight
        # of the usual per-sample sum. Confirm this weighting is intentional.
        kl_loss = 1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var)
        kl_loss = -0.5 * tf.reduce_mean(kl_loss)
        return reconstruction_loss, kl_loss

    def vae_loss(self, y_true, y_pred, z_mean, z_log_var):
        """Return the total loss: reconstruction term plus KL term."""
        reconstruction_loss, kl_loss = self._loss_terms(y_true, y_pred, z_mean, z_log_var)
        return reconstruction_loss + kl_loss

    def _update_trackers(self, reconstruction_loss, kl_loss):
        """Feed one batch of loss values to the trackers and return their results."""
        self.loss_tracker.update_state(reconstruction_loss + kl_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        return {
            "loss": self.loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

    def train_step(self, data):
        """Run one optimisation step on a batch and return the running metrics."""
        # fit(x, y) delivers a tuple; only the inputs are needed.
        if isinstance(data, tuple):
            data = data[0]

        with tf.GradientTape() as tape:
            # training=True must be passed explicitly inside a custom
            # train_step so the BatchNormalization layers run in training mode.
            z_mean, z_log_var, z = self.encoder(data, training=True)
            reconstruction = self.decoder(z, training=True)
            reconstruction_loss, kl_loss = self._loss_terms(
                data, reconstruction, z_mean, z_log_var
            )
            loss = reconstruction_loss + kl_loss

        gradients = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
        return self._update_trackers(reconstruction_loss, kl_loss)

    def test_step(self, data):
        """Evaluate one batch (used for validation_data and evaluate()).

        Without this override nothing updates the custom trackers returned by
        ``metrics`` during validation, so the reported ``val_*`` values would
        not reflect the VAE loss.
        """
        if isinstance(data, tuple):
            data = data[0]

        z_mean, z_log_var, z = self.encoder(data, training=False)
        reconstruction = self.decoder(z, training=False)
        reconstruction_loss, kl_loss = self._loss_terms(
            data, reconstruction, z_mean, z_log_var
        )
        return self._update_trackers(reconstruction_loss, kl_loss)

    def call(self, inputs, training=None):
        """Encode ``inputs``, take the sampled latent ``z`` and decode it."""
        z = self.encoder(inputs, training=training)[2]
        reconstructed = self.decoder(z, training=training)
        return reconstructed

def build_vae(latent_dim, img_size=IMG_SIZE):
    """Build a convolutional VAE together with its encoder and decoder.

    The encoder applies four stride-2 convolutions, so the spatial size is
    reduced by a factor of 16; the decoder starts from an
    ``img_size // 16`` feature map and upsamples it four times.

    Args:
        latent_dim: Dimensionality of the latent space (positive integer).
        img_size: Side length of the square RGB input; must be a positive
            multiple of 16 so the decoder output matches the input size.

    Returns:
        Tuple ``(vae, encoder, decoder)``. The encoder outputs
        ``[z_mean, z_log_var, z]``.

    Raises:
        ValueError: If ``latent_dim`` is not positive or ``img_size`` is not a
            positive multiple of 16.
    """
    if latent_dim < 1:
        raise ValueError(f"latent_dim must be a positive integer, got {latent_dim}")
    # The decoder produces 16 * (img_size // 16) pixels per side; any other
    # size would make reconstructions differ in shape from the inputs.
    if img_size < 16 or img_size % 16 != 0:
        raise ValueError(f"img_size must be a positive multiple of 16, got {img_size}")

    # --- Encoder ---
    encoder_inputs = Input(shape=(img_size, img_size, 3))
    x = encoder_inputs
    for filters in (32, 64, 128, 256):
        x = layers.Conv2D(filters, 3, strides=2, padding='same', activation='relu')(x)
        x = layers.BatchNormalization()(x)
    x = layers.Flatten()(x)
    x = layers.Dense(1024, activation='relu')(x)
    z_mean = layers.Dense(latent_dim, name="z_mean")(x)
    z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)

    def sampling(args):
        """Reparameterisation: z = mean + exp(0.5 * log_var) * epsilon."""
        z_mean, z_log_var = args
        epsilon = tf.random.normal(shape=tf.shape(z_mean), seed=42)
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

    z = layers.Lambda(sampling, output_shape=(latent_dim,), name='z')([z_mean, z_log_var])
    encoder = Model(encoder_inputs, [z_mean, z_log_var, z], name="encoder")

    # --- Decoder ---
    feature_size = img_size // 16
    latent_inputs = Input(shape=(latent_dim,))
    x = layers.Dense(feature_size * feature_size * 256, activation='relu')(latent_inputs)
    x = layers.Reshape((feature_size, feature_size, 256))(x)
    for filters in (256, 128, 64, 32):
        x = layers.Conv2DTranspose(filters, 3, strides=2, padding='same', activation='relu')(x)
        x = layers.BatchNormalization()(x)
    # Sigmoid output: reconstructed pixel values lie in (0, 1)
    decoder_outputs = layers.Conv2D(3, 3, activation='sigmoid', padding='same')(x)

    decoder = Model(latent_inputs, decoder_outputs, name="decoder")
    vae = VAE(encoder, decoder, img_size=img_size)
    return vae, encoder, decoder

In [None]:
# Instantiate and compile one VAE per class
LATENT_DIM = 256

# VAE for bottles
vae_bottle, encoder_bottle, decoder_bottle = build_vae(LATENT_DIM, img_size=IMG_SIZE)
optimizer_bottle = tf.keras.optimizers.Adam(learning_rate=1e-3)
vae_bottle.compile(optimizer=optimizer_bottle, loss=tf.keras.losses.mse)

# VAE for clocks
vae_clock, encoder_clock, decoder_clock = build_vae(LATENT_DIM, img_size=IMG_SIZE)
optimizer_clock = tf.keras.optimizers.Adam(learning_rate=1e-4)
vae_clock.compile(optimizer=optimizer_clock, loss=tf.keras.losses.mse)

# A subclassed Model has no input shape until it is built or called, and
# summary() on an unbuilt model raises in tf.keras 2.x. Build both models
# with the known input shape before printing their summaries.
vae_bottle.build((None, IMG_SIZE, IMG_SIZE, 3))
vae_clock.build((None, IMG_SIZE, IMG_SIZE, 3))

vae_bottle.summary()
vae_clock.summary()

In [None]:
# Log in to wandb
wandb.login()

In [None]:
# Directory where the trained models are saved
models_path = "/content/Deep_learning/Proyecto 1 - DAE + VAE/Models"
os.makedirs(models_path, exist_ok=True)


def _train_and_save_vae(vae, decoder, x_train, x_val, latent_dim, mvd_label,
                        model_filename, wandb_init_kwargs, epochs=10, batch_size=4):
    """Train one VAE inside its own W&B run, save it and upload the file.

    Args:
        vae: Compiled VAE model to train.
        decoder: Decoder of ``vae``, used by the per-epoch MVD callback.
        x_train: Training images (used as both input and target).
        x_val: Validation images (used as both input and target).
        latent_dim: Latent size passed to the MVD callback.
        mvd_label: ``label_prefix`` of the MVD callback.
        model_filename: File name of the saved model inside ``models_path``.
        wandb_init_kwargs: Keyword arguments forwarded to ``wandb.init``.
        epochs: Number of training epochs.
        batch_size: Training batch size.

    Returns:
        Tuple ``(run, history, save_path)``.
    """
    run = wandb.init(**wandb_init_kwargs)
    try:
        history = vae.fit(
            x_train,
            x_train,
            epochs=epochs,
            batch_size=batch_size,
            validation_data=(x_val, x_val),
            callbacks=[
                WandbCallback(save_model=False),
                MVDEpochCallback(x_train, decoder, latent_dim, label_prefix=mvd_label),
            ],
        )
        save_path = os.path.join(models_path, model_filename)
        vae.save(save_path)
        wandb.save(save_path)
    except BaseException:
        # Close the run as failed instead of leaving it open (or marking it
        # successful) when training, saving or a manual interrupt aborts it.
        wandb.finish(exit_code=1)
        raise
    wandb.finish()
    return run, history, save_path


# Train the bottle VAE
run_bottle, history_bottle, model_save_path_bottle = _train_and_save_vae(
    vae_bottle,
    decoder_bottle,
    X_train_botellas,
    X_val_botellas,
    LATENT_DIM,
    mvd_label="Bottles",
    model_filename="vae_bottle_model_Arturo.keras",
    wandb_init_kwargs=dict(
        entity="arturo-torres-iteso", project="VAE + DAE", name="VAE_bottles_Arturo", reinit=True
    ),
)

# Free memory
tf.keras.backend.clear_session()
gc.collect()

# Train the clock VAE
# NOTE(review): unlike the bottle run, this run does not set `entity`, so it
# goes to the default W&B entity. Confirm whether that is intended.
run_clock, history_clock, model_save_path_clock = _train_and_save_vae(
    vae_clock,
    decoder_clock,
    X_train_relojes,
    X_val_relojes,
    LATENT_DIM,
    mvd_label="Clocks",
    model_filename="vae_clock_model_Arturo.keras",
    wandb_init_kwargs=dict(project="VAE + DAE", name="VAE_Clock_Arturo", reinit=True),
)

tf.keras.backend.clear_session()

In [None]:
# Example of a later evaluation with the MVD metric on the test split
def evaluate_mvd(decoder, real_data, latent_dim, label_prefix):
    """Compute, print and log the MVD between ``real_data`` and decoded samples.

    Decodes ``len(real_data)`` latent vectors drawn from a standard normal and
    compares them with ``real_data`` through ``calc_mvd``. The value is logged
    to the active W&B run under ``MVD_<label_prefix>_test``.

    Args:
        decoder: Model whose ``predict`` maps latent vectors to images.
        real_data: Real images to compare against.
        latent_dim: Size of each latent vector.
        label_prefix: Name used in the printed message and the logged key.

    Returns:
        The computed MVD value (previously discarded).
    """
    n_samples = len(real_data)
    z_samples = np.random.normal(size=(n_samples, latent_dim))
    gen_images = decoder.predict(z_samples)
    mvd_val = calc_mvd(real_data, gen_images)
    print(f"{label_prefix} VAE - MVD: {mvd_val:.2f}")
    wandb.log({f"MVD_{label_prefix}_test": mvd_val})
    return mvd_val


# Load the models (in a new environment you would use load_model with custom_objects)
# vae_bottle_loaded = tf.keras.models.load_model("vae_bottle_model_Arturo.keras", custom_objects={"calc_mvd": calc_mvd, "VAE": VAE})
# Or, within the same notebook, simply reuse "decoder_bottle".

run_eval = wandb.init(project="VAE + DAE", name="Evaluation_MVD", reinit=True)
try:
    # Evaluation on the test split
    mvd_test_results = {
        "Bottle": evaluate_mvd(decoder_bottle, X_test_botellas, LATENT_DIM, "Bottle"),
        "Clock": evaluate_mvd(decoder_clock, X_test_relojes, LATENT_DIM, "Clock"),
    }
except BaseException:
    # Close the run as failed rather than leaving it open when evaluation aborts.
    wandb.finish(exit_code=1)
    raise
wandb.finish()

In [None]:
# Show a few of the generated images
def display_generated_images(gen_images, title):
    """Plot up to the first five images of ``gen_images`` in a single row.

    Args:
        gen_images: Array of images indexed along the first axis.
        title: Text used as the figure's super-title.
    """
    fig = plt.figure(figsize=(10, 5))
    n_shown = min(5, gen_images.shape[0])
    for position, image in enumerate(gen_images[:n_shown], start=1):
        ax = fig.add_subplot(1, n_shown, position)
        ax.imshow(image)
        ax.axis('off')
    fig.suptitle(title)
    plt.show()

# Decode five random latent vectors with the bottle decoder and show them
z_samples_bottle = np.random.normal(size=(5, LATENT_DIM))
gen_images_bottle = decoder_bottle.predict(z_samples_bottle)
display_generated_images(gen_images_bottle, "Generated Images - Bottle VAE")

# Same for the clock decoder
z_samples_clock = np.random.normal(size=(5, LATENT_DIM))
gen_images_clock = decoder_clock.predict(z_samples_clock)
display_generated_images(gen_images_clock, "Generated Images - Clock VAE")

# Upload both sets of generated images to W&B in a dedicated run
run_vis = wandb.init(project="VAE + DAE", name="Generated_Images", reinit=True)
generated_image_logs = {}
generated_image_logs["Generated_Images_Bottle"] = [
    wandb.Image(bottle_img) for bottle_img in gen_images_bottle[:5]
]
generated_image_logs["Generated_Images_Clock"] = [
    wandb.Image(clock_img) for clock_img in gen_images_clock[:5]
]
wandb.log(generated_image_logs)
wandb.finish()