In [3]:
# data.py
import numpy as np
import tensorflow as tf
from tensorflow.keras.datasets import mnist


def load_and_preprocess(mix: str = None, validation_split: float = 0.0833):
    """
    Carga MNIST, normaliza y genera train/val/test.
    Si mix=='MAX' o 'AVERAGE', genera imágenes superpuestas.
    """
    # Carga
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train = x_train.astype('float32') / 255.0
    x_test = x_test.astype('float32') / 255.0

    # Flatten
    original_dim = x_train.shape[1] * x_train.shape[2]
    x_train = x_train.reshape(-1, original_dim)
    x_test = x_test.reshape(-1, original_dim)
    
    # One-hot
    y_train = tf.keras.utils.to_categorical(y_train)
    y_test = tf.keras.utils.to_categorical(y_test)

    # Split
    n_val = int(x_train.shape[0] * validation_split)
    x_val, y_val = x_train[-n_val:], y_train[-n_val:]
    x_train, y_train = x_train[:-n_val], y_train[:-n_val]

    # Opcional: mix
    if mix == 'MAX' or mix == 'AVERAGE':
        perm = np.random.permutation(x_train.shape[0])
        x2, y2 = x_train[perm], y_train[perm]
        if mix == 'MAX':
            x_train = np.maximum(x_train, x2)
            x_test = np.maximum(x_test, x_test[perm])
        else:
            x_train = ((x_train + x2) / 2).astype(np.float32)
            x_test = ((x_test + x_test[perm]) / 2).astype(np.float32)

    return (x_train, y_train), (x_val, y_val), (x_test, y_test)


# models.py
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, Model, Input

class Sampling(layers.Layer):
    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        eps = tf.keras.backend.random_normal(shape=(batch, dim))
        return z_mean + tf.keras.backend.exp(0.5 * z_log_var) * eps


def build_cvae(original_dim: int,
               cond_dim: int,
               latent_dim: int = 128,
               intermediate_encoder: int = 256,
               intermediate_decoder: int = 256,
               beta: float = 1.0,
               lr: float = 1e-3):
    """Construye y compila el CVAE."""
    # Encoder
    x_in = Input(shape=(original_dim,), name='x_in')
    c_in = Input(shape=(cond_dim,), name='c_in')
    concat_enc = layers.Concatenate()([x_in, c_in])
    h = layers.Dense(intermediate_encoder, activation='relu')(concat_enc)
    z_mean = layers.Dense(latent_dim, name='z_mean')(h)
    z_log_var = layers.Dense(latent_dim, name='z_log_var')(h)
    z = Sampling()([z_mean, z_log_var])
    encoder = Model([x_in, c_in], [z_mean, z_log_var, z], name='encoder')

    # Decoder
    z_in = Input(shape=(latent_dim,), name='z_in')
    c_dec = Input(shape=(cond_dim,), name='c_dec')
    concat_dec = layers.Concatenate()([z_in, c_dec])
    h_dec = layers.Dense(intermediate_decoder, activation='relu')(concat_dec)
    x_out = layers.Dense(original_dim, activation='sigmoid')(h_dec)
    decoder = Model([z_in, c_dec], x_out, name='decoder')

    # CVAE
    x, c = Input(shape=(original_dim,)), Input(shape=(cond_dim,))
    z_mean_, z_log_var_, z_ = encoder([x, c])
    x_rec = decoder([z_, c])
    cvae = Model([x, c], x_rec, name='cvae')

    # Loss
    recon_loss = tf.reduce_mean(
        tf.keras.losses.MeanSquaredError(reduction=tf.keras.losses.Reduction.NONE)(x, x_rec)
        * original_dim, axis=-1)
    kl_loss = tf.reduce_mean(
        -0.5 * tf.reduce_sum(1 + z_log_var_ - tf.square(z_mean_) - tf.exp(z_log_var_), axis=-1)
    )
    cvae.add_loss(tf.reduce_mean(recon_loss + beta * kl_loss))

    cvae.compile(optimizer=tf.keras.optimizers.Adam(lr))
    return cvae, encoder, decoder

# train.py
#from data import load_and_preprocess
#from models import build_cvae

def train_cvae(
    mix: str = None,
    latent_dim: int = 128,
    enc_dim: int = 256,
    dec_dim: int = 256,
    beta: float = 1.0,
    lr: float = 1e-3,
    epochs: int = 50,
    batch_size: int = 128,
    validation_split: float = 0.0833
):
    """
    Función para entrenar el CVAE en Colab u otro entorno interactivo.
    Devuelve el modelo entrenado, encoder, decoder y el historial de entrenamiento.
    """
    # Carga datos
    (x_train, y_train), (x_val, y_val), _ = load_and_preprocess(mix=mix, validation_split=validation_split)
    original_dim = x_train.shape[1]
    cond_dim = y_train.shape[1]

    # Construcción del modelo
    cvae, encoder, decoder = build_cvae(
        original_dim=original_dim,
        cond_dim=cond_dim,
        latent_dim=latent_dim,
        intermediate_encoder=enc_dim,
        intermediate_decoder=dec_dim,
        beta=beta,
        lr=lr
    )

    # Entrenamiento
    history = cvae.fit(
        x=[x_train, y_train],
        epochs=epochs,
        batch_size=batch_size,
        validation_data=([x_val, y_val], None)
    )

    return cvae, encoder, decoder, history

# Ejemplo de uso en Colab:
# from train import train_cvae
# cvae_model, enc, dec, hist = train_cvae(mix=None, latent_dim=128, enc_dim=256, dec_dim=256, beta=1.0, lr=1e-3, epochs=10)


In [4]:
cvae_model, enc, dec, hist = train_cvae(mix=None, latent_dim=128, enc_dim=256, dec_dim=256, beta=1.0, lr=1e-3, epochs=10)





ValueError: A KerasTensor cannot be used as input to a TensorFlow function. A KerasTensor is a symbolic placeholder for a shape and dtype, used when constructing Keras Functional models or Keras Functions. You can only use it as input to a Keras layer or a Keras operation (from the namespaces `keras.layers` and `keras.ops`). You are likely doing something like:

```
x = Input(...)
...
tf_fn(x)  # Invalid.
```

What you should do instead is wrap `tf_fn` in a layer:

```
class MyLayer(Layer):
    def call(self, x):
        return tf_fn(x)

x = MyLayer()(x)
```
