MODELO GENERADOR DE SECUENCIAS BASADO EN GAN.
Su intención es mostrar ejemplos de modelos generativos para predecir secuencias de números según los resultados históricos. Funcionaría como un modelo GAN generador de imágenes, pero basado en secuencias de números. Me he ayudado de ChatGPT para hacerlo y de un libro de IA de consulta.
Realizado en TensorFlow 2 y Keras.

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers
from keras.optimizers import Adam

# Load the historical draw results from the CSV export.
df = pd.read_csv('./loteria/Lotoideas.com - Histórico de Resultados - Primitiva - 2013 a 2024.csv')

# Keep only the six ball columns and scale them by 49
# (assumes balls are numbered 1..49 — TODO confirm against the CSV).
numeros_ganadores = df.loc[:, ['B1', 'B2', 'B3', 'B4', 'B5', 'B6']].to_numpy() / 49

# Model dimensions: one feature per ball column, plus the latent vector size.
NUM_FEATURES = numeros_ganadores.shape[1]
LATENT_DIM = 10

# Construir el Generador
def build_generator(latent_dim, num_features):
    """Build the fully connected generator.

    Args:
        latent_dim: Size of the latent noise vector fed to the first layer.
        num_features: Number of values produced per generated sample.

    Returns:
        A ``tf.keras.Sequential`` with ReLU hidden layers of 128, 256 and 512
        units (He-normal initialised) and a sigmoid output layer, so every
        output lies in (0, 1).
    """
    model = tf.keras.Sequential()
    model.add(layers.Dense(128, activation='relu', input_dim=latent_dim, kernel_initializer='he_normal'))
    for hidden_units in (256, 512):
        model.add(layers.Dense(hidden_units, activation='relu', kernel_initializer='he_normal'))
    # Sigmoid keeps the outputs on the same 0..1 scale as the normalised data.
    model.add(layers.Dense(num_features, activation='sigmoid'))
    return model

# Construir el Discriminador
def build_discriminator(num_features):
    """Build the fully connected discriminator.

    Args:
        num_features: Number of values in each input sample.

    Returns:
        A ``tf.keras.Sequential`` with ReLU hidden layers of 512, 256 and 128
        units (He-normal initialised), a 0.3 dropout, and a single sigmoid
        output unit (real vs. fake probability).
    """
    model = tf.keras.Sequential()
    model.add(layers.Dense(512, activation='relu', input_dim=num_features, kernel_initializer='he_normal'))
    for hidden_units in (256, 128):
        model.add(layers.Dense(hidden_units, activation='relu', kernel_initializer='he_normal'))
    model.add(layers.Dropout(0.3))
    # Single sigmoid unit: binary real/fake decision.
    model.add(layers.Dense(1, activation='sigmoid'))
    return model

# Instanciar los modelos
generator = build_generator(LATENT_DIM, NUM_FEATURES)
discriminator = build_discriminator(NUM_FEATURES)

# One Adam optimizer per network.
optimizer_discriminator = Adam(learning_rate=0.0001, beta_1=0.5)
optimizer_generator = Adam(learning_rate=0.0001, beta_1=0.5)

# The discriminator is compiled on its own so it can be trained directly.
discriminator.compile(optimizer=optimizer_discriminator, loss='binary_crossentropy', metrics=['accuracy'])

# Freeze the discriminator inside the stacked model so that training `gan`
# only updates the generator.
# NOTE(review): this relies on the trainable flag being captured at compile()
# time — verify that the installed Keras version still behaves this way.
discriminator.trainable = False

# Stacked model: latent noise -> generator -> discriminator.
gan_input = layers.Input(shape=(LATENT_DIM,))
generated_numbers = generator(gan_input)
gan_output = discriminator(generated_numbers)
gan = tf.keras.Model(gan_input, gan_output)
gan.compile(optimizer=optimizer_generator, loss='binary_crossentropy')

# Training hyper-parameters. Each "epoch" below is a single random mini-batch.
batch_size = 32
num_epochs = 150

# Fixed target labels for real (1) and generated (0) samples.
real_labels = tf.ones((batch_size, 1))
fake_labels = tf.zeros((batch_size, 1))

for epoch in range(num_epochs):
    # Draw a random mini-batch of real samples (with replacement).
    idx = np.random.randint(0, numeros_ganadores.shape[0], batch_size)
    real_samples = numeros_ganadores[idx]

    # Generate a mini-batch of fake samples; verbose=0 avoids printing a
    # progress bar on every iteration.
    noise = np.random.normal(0, 1, (batch_size, LATENT_DIM))
    fake_samples = generator.predict(noise, verbose=0)

    # Train the discriminator on real and fake batches separately.
    d_loss_real = discriminator.train_on_batch(real_samples, real_labels)
    d_loss_fake = discriminator.train_on_batch(fake_samples, fake_labels)
    d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

    # Train the generator through the stacked model, labelling fakes as real.
    noise = np.random.normal(0, 1, (batch_size, LATENT_DIM))
    g_loss = gan.train_on_batch(noise, real_labels)

    if (epoch + 1) % 10 == 0 or epoch == 0:
        # `gan` has no extra metrics, so train_on_batch may return a bare
        # scalar; np.ravel makes the indexing work for scalars and lists alike.
        d_loss_value = float(np.ravel(d_loss)[0])
        g_loss_value = float(np.ravel(g_loss)[0])
        print(f"Epoch {epoch + 1}/{num_epochs} - Loss D: {d_loss_value:.4f}, Loss G: {g_loss_value:.4f}")

# Evaluar el modelo generador
def generar_numeros_loteria(modelo_generador, num_samples):
    """Sample ``num_samples`` draws from the generator, rescaled by 49.

    Args:
        modelo_generador: Object exposing ``predict`` that maps latent noise
            of width ``LATENT_DIM`` to normalised samples.
        num_samples: Number of samples to generate.

    Returns:
        The predictions multiplied by 49 (floating point, not rounded).
    """
    latent_shape = (num_samples, LATENT_DIM)
    noise = np.random.normal(loc=0, scale=1, size=latent_shape)
    # Undo the division by 49 applied to the training data.
    return modelo_generador.predict(noise) * 49

# Generar algunos números de lotería
numeros_generados = generar_numeros_loteria(generator, 5)
print("Números de lotería generados:")
# Round to the nearest integer instead of truncating (a plain int cast turns
# e.g. 12.9 into 12 and anything below 1 into 0), then clamp to the 1..49
# range that the normalisation by 49 assumes.
print(np.clip(np.rint(numeros_generados), 1, 49).astype(np.int32))


MODELO DCGAN para GENERACIÓN de secuencias de números basado en un histórico.

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers, models, metrics, losses, optimizers
from sklearn.preprocessing import MinMaxScaler

# Load the lottery history from the CSV file.
csv_path = './loteria/Lotoideas.com - Histórico de Resultados - Primitiva - 2013 a 2024.csv'
df = pd.read_csv(csv_path)

# Columns 1..6 (by position) are taken as the winning numbers; each column is
# min-max scaled to [0, 1] using the range observed in the data, then the
# result is converted to a float32 tensor.
scaler = MinMaxScaler(feature_range=(0, 1))
numeros_ganadores = tf.convert_to_tensor(
    scaler.fit_transform(df.iloc[:, 1:7].values),
    dtype=tf.float32,
)

# Dimensions and label-noise amplitude used by the training step.
LATENT_DIM = 4
NUM_FEATURES = numeros_ganadores.shape[1]
NOISE_PARAM = 0.1

# Shuffled, batched tf.data pipeline over the normalised draws.
batch_size = 32
dataset = tf.data.Dataset.from_tensor_slices(numeros_ganadores)
dataset = dataset.shuffle(buffer_size=1024)
dataset = dataset.batch(batch_size)

# Construir el Generador
def build_generator(latent_dim, num_features):
    """Build the fully connected generator.

    Args:
        latent_dim: Size of the latent noise vector fed to the first layer.
        num_features: Number of values produced per generated sample.

    Returns:
        A ``tf.keras.Sequential`` with ReLU hidden layers of 128, 256 and 512
        units and a sigmoid output layer, so every output lies in (0, 1).
    """
    model = tf.keras.Sequential()
    model.add(layers.Dense(128, activation='relu', input_dim=latent_dim))
    for hidden_units in (256, 512):
        model.add(layers.Dense(hidden_units, activation='relu'))
    # Sigmoid keeps the outputs on the same 0..1 scale as the scaled data.
    model.add(layers.Dense(num_features, activation='sigmoid'))
    return model

# Construir el Discriminador
def build_discriminator(num_features):
    """Build the fully connected discriminator.

    Args:
        num_features: Number of values in each input sample.

    Returns:
        A ``tf.keras.Sequential`` with ReLU hidden layers of 512, 256 and 128
        units and a single sigmoid output unit (real vs. fake probability).
    """
    model = tf.keras.Sequential()
    model.add(layers.Dense(512, activation='relu', input_dim=num_features))
    for hidden_units in (256, 128):
        model.add(layers.Dense(hidden_units, activation='relu'))
    # Single sigmoid unit: binary real/fake decision.
    model.add(layers.Dense(1, activation='sigmoid'))
    return model

# Instanciar los modelos
# Build both networks with the dimensions defined above.
generator = build_generator(latent_dim=LATENT_DIM, num_features=NUM_FEATURES)
discriminator = build_discriminator(num_features=NUM_FEATURES)

# One Adam optimizer per network, with identical settings.
optimizer_discriminator = optimizers.Adam(
    learning_rate=0.0002,
    beta_1=0.5,
)
optimizer_generator = optimizers.Adam(
    learning_rate=0.0002,
    beta_1=0.5,
)

# Modelo DCGAN
class DCGAN(models.Model):
    """GAN wrapper that updates generator and discriminator in one train step.

    The class only orchestrates training; the two networks are supplied by the
    caller.

    Args:
        discriminator: Model mapping samples to a real/fake probability.
        generator: Model mapping latent vectors to samples.
        latent_dim: Width of the latent noise vector.
    """

    def __init__(self, discriminator, generator, latent_dim):
        super().__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim

    def compile(self, d_optimizer, g_optimizer):
        """Store the two optimizers and create the loss and metric objects."""
        super().compile()
        self.loss_fn = losses.BinaryCrossentropy()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.d_loss_metric = metrics.Mean(name="d_loss")
        self.d_real_acc_metric = metrics.BinaryAccuracy(name="d_real_acc")
        self.d_fake_acc_metric = metrics.BinaryAccuracy(name="d_fake_acc")
        self.d_acc_metric = metrics.BinaryAccuracy(name="d_acc")
        self.g_loss_metric = metrics.Mean(name="g_loss")
        self.g_acc_metric = metrics.BinaryAccuracy(name="g_acc")

    @property
    def metrics(self):
        """Metrics reported by ``train_step``, in reporting order."""
        return [
            self.d_loss_metric,
            self.d_real_acc_metric,
            self.d_fake_acc_metric,
            self.d_acc_metric,
            self.g_loss_metric,
            self.g_acc_metric,
        ]

    def train_step(self, real_numbers):
        """Run one simultaneous generator/discriminator update on a batch.

        Args:
            real_numbers: Batch of real samples.

        Returns:
            Dict mapping each metric name to its current value.
        """
        real_numbers = tf.cast(real_numbers, tf.float32)

        # Sample one latent vector per real sample.
        batch_size = tf.shape(real_numbers)[0]
        random_latent_vectors = tf.random.normal(
            shape=(batch_size, self.latent_dim), dtype=tf.float32
        )

        # Two tapes record the same forward pass so that each network gets
        # gradients of its own loss.
        with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
            generated_numbers = self.generator(
                random_latent_vectors, training=True
            )
            real_predictions = self.discriminator(real_numbers, training=True)
            fake_predictions = self.discriminator(
                generated_numbers, training=True
            )

            # Discriminator targets are perturbed with uniform noise.
            # NOTE(review): this pushes targets slightly outside [0, 1]
            # (up to 1 + NOISE_PARAM and down to -NOISE_PARAM); confirm this
            # is the intended form of label smoothing.
            real_labels = tf.ones_like(real_predictions)
            real_noisy_labels = real_labels + NOISE_PARAM * tf.random.uniform(
                tf.shape(real_predictions)
            )
            fake_labels = tf.zeros_like(fake_predictions)
            fake_noisy_labels = fake_labels - NOISE_PARAM * tf.random.uniform(
                tf.shape(fake_predictions)
            )

            d_real_loss = self.loss_fn(real_noisy_labels, real_predictions)
            d_fake_loss = self.loss_fn(fake_noisy_labels, fake_predictions)
            d_loss = (d_real_loss + d_fake_loss) / 2.0

            # The generator wants its samples to be classified as real.
            g_loss = self.loss_fn(real_labels, fake_predictions)

        gradients_of_discriminator = disc_tape.gradient(
            d_loss, self.discriminator.trainable_variables
        )
        gradients_of_generator = gen_tape.gradient(
            g_loss, self.generator.trainable_variables
        )

        # Apply the gradients to this instance's own networks (previously the
        # module-level `discriminator` / `generator` globals were used here).
        self.d_optimizer.apply_gradients(
            zip(gradients_of_discriminator, self.discriminator.trainable_variables)
        )
        self.g_optimizer.apply_gradients(
            zip(gradients_of_generator, self.generator.trainable_variables)
        )

        # Update metrics (accuracies use the clean, un-noised labels).
        self.d_loss_metric.update_state(d_loss)
        self.d_real_acc_metric.update_state(real_labels, real_predictions)
        self.d_fake_acc_metric.update_state(fake_labels, fake_predictions)
        self.d_acc_metric.update_state(
            [real_labels, fake_labels], [real_predictions, fake_predictions]
        )
        self.g_loss_metric.update_state(g_loss)
        self.g_acc_metric.update_state(real_labels, fake_predictions)

        return {m.name: m.result() for m in self.metrics}

# Instanciar y compilar el modelo DCGAN
# Instantiate and compile the GAN wrapper.
dcgan = DCGAN(discriminator=discriminator, generator=generator, latent_dim=LATENT_DIM)

dcgan.compile(d_optimizer=optimizer_discriminator, g_optimizer=optimizer_generator)

# Train. `dataset` is already batched, so `batch_size` is not passed to fit():
# Keras documents that it must not be specified for tf.data.Dataset inputs.
EPOCHS = 400
dcgan.fit(dataset, epochs=EPOCHS)
# Generar números de lotería después de entrenar
def generar_numeros_loteria(modelo_generador, num_samples):
    """Sample ``num_samples`` draws from the generator as integer numbers.

    Args:
        modelo_generador: Object exposing ``predict`` that maps latent noise
            of width ``LATENT_DIM`` to samples on the scaler's 0..1 scale.
        num_samples: Number of samples to generate.

    Returns:
        Integer array of generated numbers, mapped back to the original scale.
    """
    noise = np.random.normal(0, 1, (num_samples, LATENT_DIM))
    generated_samples = modelo_generador.predict(noise)
    # The training data in this cell was scaled with the module-level
    # MinMaxScaler (`scaler`), so the inverse of that same transform is used
    # here rather than a fixed multiplication by 49.
    return np.round(scaler.inverse_transform(generated_samples)).astype(int)

# Generar algunos números de lotería
# Draw five samples from the trained generator and show them under a header.
numeros_generados = generar_numeros_loteria(modelo_generador=generator, num_samples=5)
print("Números de lotería generados:", numeros_generados, sep="\n")


MODELO WGAN-GP para GENERACIÓN de secuencias de números basado en un histórico previo.

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers, models, metrics, optimizers

# Load the lottery history from the CSV file.
csv_path = './loteria/Lotoideas.com - Histórico de Resultados - Primitiva - 2013 a 2024.csv'
df = pd.read_csv(csv_path)

# Columns 1..6 (by position) are taken as the winning numbers and divided by
# 49 (assumes balls are numbered 1..49 — TODO confirm), then converted to a
# float32 tensor.
numeros_ganadores = tf.convert_to_tensor(
    df.iloc[:, 1:7].values / 49.0,
    dtype=tf.float32,
)

# Dimensions: latent vector width and number of values per draw.
LATENT_DIM = 2
NUM_FEATURES = numeros_ganadores.shape[1]

# Shuffled, batched tf.data pipeline over the normalised draws.
batch_size = 32
dataset = tf.data.Dataset.from_tensor_slices(numeros_ganadores)
dataset = dataset.shuffle(buffer_size=1024)
dataset = dataset.batch(batch_size)

# Construir el Generador
def build_generator(latent_dim, num_features):
    """Build the fully connected generator.

    Args:
        latent_dim: Size of the latent noise vector fed to the first layer.
        num_features: Number of values produced per generated sample.

    Returns:
        A ``tf.keras.Sequential`` with ReLU hidden layers of 128, 256 and 512
        units and a sigmoid output layer, so every output lies in (0, 1).
    """
    model = tf.keras.Sequential()
    model.add(layers.Dense(128, activation='relu', input_dim=latent_dim))
    for hidden_units in (256, 512):
        model.add(layers.Dense(hidden_units, activation='relu', kernel_initializer="glorot_uniform"))
    # Sigmoid keeps the outputs on the same 0..1 scale as the normalised data.
    model.add(layers.Dense(num_features, activation='sigmoid'))
    return model

# Construir el Discriminador
def build_discriminator(num_features):
    """Build the fully connected critic.

    Args:
        num_features: Number of values in each input sample.

    Returns:
        A ``tf.keras.Sequential`` with ReLU hidden layers of 512, 256 and 128
        units and one linear output unit (an unbounded score, as used by the
        Wasserstein loss).
    """
    model = tf.keras.Sequential()
    model.add(layers.Dense(512, activation='relu', input_dim=num_features))
    for hidden_units in (256, 128):
        model.add(layers.Dense(hidden_units, activation='relu', kernel_initializer="glorot_uniform"))
    # No activation: the critic outputs a raw score, not a probability.
    model.add(layers.Dense(1))
    return model

# Instanciar los modelos
# Build both networks with the dimensions defined above.
generator = build_generator(latent_dim=LATENT_DIM, num_features=NUM_FEATURES)
discriminator = build_discriminator(num_features=NUM_FEATURES)

# One Adam optimizer per network, with identical settings.
optimizer_discriminator = optimizers.Adam(
    learning_rate=0.00001,
    beta_1=0.5,
)
optimizer_generator = optimizers.Adam(
    learning_rate=0.00001,
    beta_1=0.5,
)

# Pérdida Wasserstein
def wasserstein_loss(y_true, y_pred):
    """Return the mean of the element-wise product of ``y_true`` and ``y_pred``."""
    weighted_scores = y_true * y_pred
    return tf.reduce_mean(weighted_scores)

# Penalización del Gradiente
def gradient_penalty(real_numbers, fake_numbers, batch_size):
    """Compute the WGAN-GP gradient penalty using the module-level critic.

    Args:
        real_numbers: Batch of real samples.
        fake_numbers: Batch of generated samples, same shape as the real batch.
        batch_size: Number of samples in the batch.

    Returns:
        Scalar tensor: mean squared deviation from 1 of the per-sample L2 norm
        of the critic's gradient at points interpolated between real and fake.
    """
    # Interpolation coefficients must lie in [0, 1] so every point sits on the
    # segment between a real and a fake sample. The previous normal draw could
    # land outside that segment.
    alpha = tf.random.uniform([batch_size, 1], 0.0, 1.0)
    diff = fake_numbers - real_numbers
    interpolated = real_numbers + alpha * diff

    with tf.GradientTape() as tape:
        # `interpolated` is not a variable, so it must be watched explicitly.
        tape.watch(interpolated)
        pred = discriminator(interpolated)

    grads = tape.gradient(pred, [interpolated])[0]
    norm = tf.sqrt(tf.reduce_sum(tf.square(grads), axis=[1]))
    penalty = tf.reduce_mean((norm - 1.0) ** 2)
    return penalty

# Modelo WGAN-GP
class WGAN_GP(models.Model):
    """Wasserstein GAN with gradient penalty.

    Args:
        discriminator: Critic model mapping samples to an unbounded score.
        generator: Model mapping latent vectors to samples.
        latent_dim: Width of the latent noise vector.
        critic_steps: Number of critic updates per generator update (>= 1).
        gradient_penalty_weight: Multiplier applied to the gradient penalty.

    Raises:
        ValueError: If ``critic_steps`` is smaller than 1.
    """

    def __init__(self, discriminator, generator, latent_dim, critic_steps, gradient_penalty_weight):
        super().__init__()
        # train_step reports the losses of the last critic iteration, so at
        # least one iteration is required for them to be defined.
        if critic_steps < 1:
            raise ValueError(f"critic_steps must be >= 1, got {critic_steps}")
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim
        self.gradient_penalty_weight = gradient_penalty_weight
        self.critic_steps = critic_steps

    def compile(self, d_optimizer, g_optimizer):
        """Store the two optimizers and create the loss trackers."""
        super().compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.c_wass_loss_metric = metrics.Mean(name="c_wass_loss")
        self.c_gp_metric = metrics.Mean(name="c_gp")
        self.c_loss_metric = metrics.Mean(name="c_loss")
        self.g_loss_metric = metrics.Mean(name="g_loss")

    @property
    def metrics(self):
        """Metrics reported by ``train_step``, in reporting order."""
        return [
            self.c_loss_metric,
            self.c_wass_loss_metric,
            self.c_gp_metric,
            self.g_loss_metric,
        ]

    def gradient_penalty(self, batch_size, real_samples, fake_samples):
        """Compute the gradient penalty for the critic.

        Args:
            batch_size: Number of samples in the batch.
            real_samples: Batch of real samples.
            fake_samples: Batch of generated samples.

        Returns:
            Scalar tensor: mean squared deviation from 1 of the per-sample L2
            norm of the critic's gradient at interpolated points.
        """
        # Random interpolation coefficients in [0, 1], one per sample.
        alpha = tf.random.uniform([batch_size, 1], minval=0.0, maxval=1.0)
        interpolated_samples = alpha * real_samples + (1 - alpha) * fake_samples

        with tf.GradientTape() as gp_tape:
            # The interpolated batch is not a variable; watch it explicitly.
            gp_tape.watch(interpolated_samples)
            interpolated_predictions = self.discriminator(interpolated_samples, training=True)

        gradients = gp_tape.gradient(interpolated_predictions, [interpolated_samples])[0]
        gradients_l2_norm = tf.sqrt(tf.reduce_sum(tf.square(gradients), axis=[1]))
        penalty = tf.reduce_mean(tf.square(gradients_l2_norm - 1.0))

        return penalty

    def train_step(self, real_numbers):
        """Run ``critic_steps`` critic updates followed by one generator update.

        Args:
            real_numbers: Batch of real samples.

        Returns:
            Dict mapping each metric name to its current value. The critic
            metrics are updated with the values of the last critic iteration.
        """
        real_numbers = tf.cast(real_numbers, tf.float32)
        batch_size = tf.shape(real_numbers)[0]

        for _ in range(self.critic_steps):
            random_latent_vectors = tf.random.normal(
                shape=(batch_size, self.latent_dim), dtype=tf.float32
            )
            with tf.GradientTape() as tape:
                fake_numbers = self.generator(
                    random_latent_vectors, training=True
                )
                fake_predictions = self.discriminator(fake_numbers, training=True)
                real_predictions = self.discriminator(real_numbers, training=True)

                # Wasserstein critic loss: mean score of fakes minus reals.
                c_wass_loss = tf.reduce_mean(fake_predictions) - tf.reduce_mean(
                    real_predictions
                )
                c_gp = self.gradient_penalty(
                    batch_size, real_numbers, fake_numbers
                )
                c_loss = c_wass_loss + c_gp * self.gradient_penalty_weight

            c_gradient = tape.gradient(c_loss, self.discriminator.trainable_variables)
            self.d_optimizer.apply_gradients(
                zip(c_gradient, self.discriminator.trainable_variables)
            )

        # Generator update: maximise the critic score of generated samples.
        random_latent_vectors = tf.random.normal(
            shape=(batch_size, self.latent_dim)
        )
        with tf.GradientTape() as tape:
            fake_numbers = self.generator(random_latent_vectors, training=True)
            fake_predictions = self.discriminator(fake_numbers, training=True)
            g_loss = -tf.reduce_mean(fake_predictions)

        gen_gradient = tape.gradient(g_loss, self.generator.trainable_variables)
        self.g_optimizer.apply_gradients(
            zip(gen_gradient, self.generator.trainable_variables)
        )

        self.c_loss_metric.update_state(c_loss)
        self.c_wass_loss_metric.update_state(c_wass_loss)
        self.c_gp_metric.update_state(c_gp)
        self.g_loss_metric.update_state(g_loss)

        return {m.name: m.result() for m in self.metrics}

# Instanciar y compilar el modelo WGAN-GP
# Instantiate and compile the WGAN-GP wrapper.
wgan_gp = WGAN_GP(discriminator=discriminator, generator=generator, latent_dim=LATENT_DIM, gradient_penalty_weight=10.0, critic_steps=1)

wgan_gp.compile(d_optimizer=optimizer_discriminator, g_optimizer=optimizer_generator)

# Train. `dataset` is already batched, so `batch_size` is not passed to fit():
# Keras documents that it must not be specified for tf.data.Dataset inputs.
EPOCHS = 200
wgan_gp.fit(dataset, epochs=EPOCHS)

# Generar números de lotería después de entrenar
def generar_numeros_loteria(modelo_generador, num_samples):
    """Sample ``num_samples`` draws from the generator as integer numbers.

    Args:
        modelo_generador: Object exposing ``predict`` that maps latent noise
            of width ``LATENT_DIM`` to normalised samples.
        num_samples: Number of samples to generate.

    Returns:
        Integer array: predictions multiplied by 49 and rounded.
    """
    latent_shape = (num_samples, LATENT_DIM)
    noise = np.random.normal(loc=0, scale=1, size=latent_shape)
    # Undo the division by 49 applied to the training data, then round.
    rescaled = modelo_generador.predict(noise) * 49
    return np.round(rescaled).astype(int)

# Generar algunos números de lotería
# Draw five samples from the trained generator and show them under a header.
numeros_generados = generar_numeros_loteria(modelo_generador=generator, num_samples=5)
print("Números de lotería generados:", numeros_generados, sep="\n")


MODELO VAE para generación de números basado en un histórico.

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers, models, metrics, losses
from tensorflow.keras.layers import Input, Dense, Lambda
from tensorflow.keras.models import Model
from tensorflow.keras import backend as K
from sklearn.preprocessing import MinMaxScaler

# 1. Dataset loading and preprocessing

# Read the CSV file with the historical results.
file_path = './loteria/Lotoideas.com - Histórico de Resultados - Primitiva - 2013 a 2024.csv'
df = pd.read_csv(file_path)

# Select the six ball columns B1..B6.
columns_of_interest = [f'B{ball}' for ball in range(1, 7)]
sequences = df[columns_of_interest].values

# Min-max scale each column to [0, 1] (the scaler's default range).
scaler = MinMaxScaler()
sequences_normalized = scaler.fit_transform(sequences)

# Shuffled, batched tf.data pipeline over the normalised sequences.
batch_size = 32
dataset = tf.data.Dataset.from_tensor_slices(sequences_normalized)
dataset = dataset.shuffle(buffer_size=1024)
dataset = dataset.batch(batch_size)

# 2. Definición del Encoder y Decoder

class Sampling(layers.Layer):
    """Reparameterisation layer: draw ``z`` from ``N(z_mean, exp(z_log_var))``."""

    def call(self, inputs):
        """Return ``z_mean + exp(0.5 * z_log_var) * epsilon``.

        Args:
            inputs: Pair ``(z_mean, z_log_var)`` of equally shaped tensors.

        Returns:
            Tensor with the same shape as ``z_mean``.
        """
        z_mean, z_log_var = inputs
        # Plain TensorFlow ops are used instead of the legacy Keras backend
        # helpers, matching the rest of the file. The noise takes its shape
        # and dtype directly from z_mean.
        epsilon = tf.random.normal(shape=tf.shape(z_mean), dtype=z_mean.dtype)
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

def build_encoder(input_dim, latent_dim, hidden_dims):
    """Build the VAE encoder.

    Args:
        input_dim: Number of values in each input sequence.
        latent_dim: Width of the latent space.
        hidden_dims: Iterable with the size of each hidden ReLU layer, in order.

    Returns:
        A Keras ``Model`` named ``"encoder"`` mapping inputs to
        ``[z_mean, z_log_var, z]``, where ``z`` is sampled by ``Sampling``.
    """
    encoder_inputs = Input(shape=(input_dim,))

    # Stack the hidden layers on top of the input.
    hidden = encoder_inputs
    for units in hidden_dims:
        hidden_layer = Dense(units, activation='relu', kernel_initializer='he_normal')
        hidden = hidden_layer(hidden)

    # Two linear heads parameterise the latent Gaussian.
    z_mean = Dense(latent_dim, kernel_initializer='he_normal')(hidden)
    z_log_var = Dense(latent_dim, kernel_initializer='he_normal')(hidden)
    z = Sampling()([z_mean, z_log_var])

    encoder_outputs = [z_mean, z_log_var, z]
    return Model(encoder_inputs, encoder_outputs, name="encoder")

def build_decoder(latent_dim, output_dim, hidden_dims):
    """Build the VAE decoder.

    Args:
        latent_dim: Width of the latent space.
        output_dim: Number of values in each reconstructed sequence.
        hidden_dims: Hidden layer sizes of the encoder; they are applied here
            in reverse order.

    Returns:
        A Keras ``Model`` named ``"decoder"`` with a sigmoid output layer.
    """
    latent_inputs = Input(shape=(latent_dim,))

    # Mirror the encoder: same hidden sizes, reversed.
    hidden = latent_inputs
    for units in reversed(hidden_dims):
        hidden_layer = Dense(units, activation='relu', kernel_initializer='he_normal')
        hidden = hidden_layer(hidden)

    # Sigmoid keeps reconstructions in (0, 1).
    output_layer = Dense(output_dim, activation='sigmoid', kernel_initializer='he_normal')
    return Model(latent_inputs, output_layer(hidden), name="decoder")

# 3. Definición de la clase LotteryVAE utilizando la clase VAE proporcionada

class LotteryVAE(models.Model):
    """Variational autoencoder over normalised lottery sequences.

    Args:
        encoder: Model returning ``(z_mean, z_log_var, z)`` for an input batch.
        decoder: Model mapping latent vectors to reconstructions.
        **kwargs: Forwarded to ``keras.Model``.
    """

    def __init__(self, encoder, decoder, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.total_loss_tracker = metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker = metrics.Mean(name="kl_loss")

    @property
    def metrics(self):
        """Loss trackers updated by ``train_step``."""
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    def call(self, inputs):
        """Encode ``inputs``, sample ``z`` and decode it.

        Returns:
            Tuple ``(z_mean, z_log_var, reconstruction)``.
        """
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstruction = self.decoder(z)
        return z_mean, z_log_var, reconstruction

    def _compute_losses(self, data, training):
        """Return ``(total, reconstruction, kl)`` losses for one batch."""
        # Cast once so the loss never mixes float64 inputs with float32
        # reconstructions.
        data = tf.cast(data, tf.float32)
        z_mean, z_log_var, reconstruction = self(data, training=training)
        reconstruction_loss = tf.reduce_mean(losses.mse(data, reconstruction))
        # KL divergence to a standard normal, summed over latent dimensions
        # and averaged over the batch.
        kl_loss = tf.reduce_mean(
            -0.5 * tf.reduce_sum(1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var), axis=1)
        )
        return reconstruction_loss + kl_loss, reconstruction_loss, kl_loss

    def train_step(self, data):
        """Run one optimisation step and return the running mean losses."""
        # Accept (x,) / (x, y) batches the same way test_step does.
        if isinstance(data, tuple):
            data = data[0]

        with tf.GradientTape() as tape:
            total_loss, reconstruction_loss, kl_loss = self._compute_losses(data, training=True)

        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)

        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

    def test_step(self, data):
        """Compute the losses for one evaluation batch (no weight update)."""
        if isinstance(data, tuple):
            data = data[0]

        total_loss, reconstruction_loss, kl_loss = self._compute_losses(data, training=False)

        return {
            "loss": total_loss,
            "reconstruction_loss": reconstruction_loss,
            "kl_loss": kl_loss,
        }

# 4. Construcción del VAE, Entrenamiento y Generación

input_dim = sequences_normalized.shape[1]
latent_dim = 10
hidden_dims = [64, 32]

# Build the encoder and the mirrored decoder.
encoder = build_encoder(input_dim, latent_dim, hidden_dims)
decoder = build_decoder(latent_dim, input_dim, hidden_dims)

# Assemble and compile the VAE; the losses are computed inside train_step,
# so only the optimizer is configured here.
vae = LotteryVAE(encoder, decoder)
vae.compile(optimizer='adam')

# Train. `dataset` is already batched, so `batch_size` is not passed to fit():
# Keras documents that it must not be specified for tf.data.Dataset inputs.
vae.fit(dataset, epochs=100)

# 5. Generación de Nuevas Secuencias de Números de Lotería

def generate_lottery_numbers(vae, scaler, num_samples=1):
    """Decode random latent vectors into integer lottery sequences.

    Args:
        vae: Object whose ``decoder`` exposes ``predict``.
        scaler: Fitted scaler whose ``inverse_transform`` maps decoder outputs
            back to the original number scale.
        num_samples: Number of sequences to generate.

    Returns:
        Integer array with one generated sequence per row.
    """
    # Standard-normal latent vectors; width comes from the module-level
    # `latent_dim`.
    latent_sample = np.random.normal(size=(num_samples, latent_dim))
    decoded = vae.decoder.predict(latent_sample)
    rescaled = scaler.inverse_transform(decoded)
    return np.rint(rescaled).astype(int)

# Generate five sequences with the trained VAE and print them.
generated_sequences = generate_lottery_numbers(vae, scaler, 5)
print("Secuencias generadas:\n", generated_sequences, sep=" ")


MODELO EBM para GENERACIÓN de secuencias basado en un histórico.

In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models, metrics, optimizers
from tensorflow.keras.layers import Dense, Input
from sklearn.preprocessing import MinMaxScaler

# Global parameters
SEQUENCE_LENGTH = 6   # numbers per lottery sequence
LATENT_DIM = 32       # latent space dimension
BATCH_SIZE = 32       # samples per batch
BUFFER_SIZE = 1000    # maximum number of samples kept in the buffer
ALPHA = 0.1           # weight of the regularisation term
STEPS, STEP_SIZE, NOISE = 10, 0.1, 0.01  # sampler steps, step size, noise std

# 1. Load and normalise the lottery data
file_path = './loteria/Lotoideas.com - Histórico de Resultados - Primitiva - 2013 a 2024.csv'
df = pd.read_csv(file_path)
columns_of_interest = [f'B{ball}' for ball in range(1, 7)]
sequences = df[columns_of_interest].values

# Pad with a copy of the last draw so the number of sequences is even.
if len(sequences) % 2:
    sequences = np.concatenate((sequences, sequences[-1:]), axis=0)

# Min-max scale each column to [-1, 1].
scaler = MinMaxScaler((-1, 1))
sequences_normalized = scaler.fit_transform(sequences)

# 2. Energy network: three ReLU hidden layers and one linear output unit.
ebm_input = Input(shape=(SEQUENCE_LENGTH,))
x = ebm_input
for hidden_units in (64, 128, 64):
    x = Dense(hidden_units, activation='relu')(x)
ebm_output = Dense(1)(x)
model = models.Model(ebm_input, ebm_output)
model.summary()

# 3. Definir la función de generación de secuencias usando Langevin Dynamics
@tf.function
def generate_samples(model, inp_seqs, steps, step_size, noise, return_seq_per_step=False):
    """Refine ``inp_seqs`` with noisy gradient ascent on the model's output.

    Args:
        model: Callable mapping a batch of sequences to one score per sequence.
        inp_seqs: Starting batch of sequences (tensor).
        steps: Number of update steps (Python int).
        step_size: Multiplier applied to the gradient at each step.
        noise: Standard deviation of the Gaussian noise added before each step.
        return_seq_per_step: If true, return the batch after every step.

    Returns:
        The final batch, or a tensor stacking the batch after each step along
        a new leading axis when ``return_seq_per_step`` is true.
    """
    seqs_per_step = []
    for _ in range(steps):
        # Use the dynamic shape so the noise can be drawn even when a
        # dimension is unknown at trace time (static `.shape` would fail).
        inp_seqs += tf.random.normal(
            tf.shape(inp_seqs), mean=0, stddev=noise, dtype=inp_seqs.dtype
        )
        inp_seqs = tf.clip_by_value(inp_seqs, -1.0, 1.0)
        with tf.GradientTape() as tape:
            # The input is a plain tensor, so it must be watched explicitly.
            tape.watch(inp_seqs)
            out_score = model(inp_seqs)
        grads = tape.gradient(out_score, inp_seqs)
        # Move in the direction that increases the model's output.
        inp_seqs += step_size * grads
        inp_seqs = tf.clip_by_value(inp_seqs, -1.0, 1.0)
        if return_seq_per_step:
            seqs_per_step.append(inp_seqs)
    if return_seq_per_step:
        return tf.stack(seqs_per_step, axis=0)
    return inp_seqs

# 4. Buffer de ejemplos
class Buffer:
    """Replay buffer of generated samples used to seed the sampler.

    The buffer is a plain Python list updated with NumPy randomness, so
    ``sample_new_exmps`` must run eagerly: inside a traced graph the random
    choices and the list update would execute only once, at trace time.

    Args:
        model: Energy network passed on to ``generate_samples``.
    """

    def __init__(self, model):
        self.model = model
        # Seed with random sequences covering the same [-1, 1] range as the
        # scaled training data.
        self.examples = [
            tf.random.uniform(shape=(1, SEQUENCE_LENGTH), minval=-1.0, maxval=1.0)
            for _ in range(BATCH_SIZE)
        ]

    def sample_new_exmps(self, steps, step_size, noise):
        """Return a batch of refined samples and store it in the buffer.

        Args:
            steps: Number of sampler steps.
            step_size: Sampler step size.
            noise: Standard deviation of the sampler noise.

        Returns:
            Tensor of ``BATCH_SIZE`` refined sequences.
        """
        # On average 5% of the batch starts from fresh random sequences; the
        # rest is drawn (with replacement) from the buffer.
        n_new = np.random.binomial(BATCH_SIZE, 0.05)
        rand_seqs = tf.random.uniform(
            (n_new, SEQUENCE_LENGTH), minval=-1.0, maxval=1.0
        )
        picked = np.random.choice(len(self.examples), BATCH_SIZE - n_new)
        old_seqs = [self.examples[i] for i in picked]
        # Concatenating in one call also covers the case of no old samples.
        inp_seqs = tf.concat([rand_seqs, *old_seqs], axis=0)
        inp_seqs = generate_samples(
            self.model, inp_seqs, steps=steps, step_size=step_size, noise=noise
        )
        # Newest samples go first; the buffer is capped at BUFFER_SIZE entries.
        self.examples = tf.split(inp_seqs, BATCH_SIZE, axis=0) + self.examples[:BUFFER_SIZE - BATCH_SIZE]
        return inp_seqs


# 5. EBM model class
class LotteryEBM(models.Model):
    """Energy-based model trained with a contrastive-divergence style loss.

    Wraps the module-level ``model`` network and a ``Buffer`` of generated
    samples.
    """

    def __init__(self):
        super().__init__()
        self.model = model
        self.buffer = Buffer(self.model)
        self.alpha = ALPHA
        self.loss_metric = metrics.Mean(name="loss")
        self.reg_loss_metric = metrics.Mean(name="reg")
        self.cdiv_loss_metric = metrics.Mean(name="cdiv")
        self.real_out_metric = metrics.Mean(name="real")
        self.fake_out_metric = metrics.Mean(name="fake")

    def compile(self, *args, **kwargs):
        """Compile the model, defaulting to eager execution.

        The replay buffer keeps Python-side state, which only behaves
        correctly when ``train_step`` runs eagerly. Callers can still pass
        ``run_eagerly`` explicitly to override the default.
        """
        kwargs.setdefault("run_eagerly", True)
        super().compile(*args, **kwargs)

    @property
    def metrics(self):
        """Metrics reported by ``train_step`` (``test_step`` uses the last three)."""
        return [
            self.loss_metric,
            self.reg_loss_metric,
            self.cdiv_loss_metric,
            self.real_out_metric,
            self.fake_out_metric,
        ]

    def train_step(self, real_seqs):
        """Run one training step on a batch of real sequences.

        Args:
            real_seqs: Batch of real sequences scaled to [-1, 1].

        Returns:
            Dict mapping each metric name to its current value.
        """
        real_seqs = tf.convert_to_tensor(real_seqs)
        # Perturb the real data slightly, then keep it inside [-1, 1].
        real_seqs += tf.random.normal(shape=tf.shape(real_seqs), mean=0, stddev=NOISE)
        real_seqs = tf.clip_by_value(real_seqs, -1.0, 1.0)
        fake_seqs = self.buffer.sample_new_exmps(steps=STEPS, step_size=STEP_SIZE, noise=NOISE)

        n_real = tf.shape(real_seqs)[0]
        inp_seqs = tf.concat([real_seqs, fake_seqs], axis=0)

        with tf.GradientTape() as training_tape:
            scores = self.model(inp_seqs)
            # Split by the actual number of real rows. Halving the batch would
            # mix real and fake scores whenever the last real batch is smaller
            # than the fake batch.
            real_out = scores[:n_real]
            fake_out = scores[n_real:]
            cdiv_loss = tf.reduce_mean(fake_out, axis=0) - tf.reduce_mean(real_out, axis=0)
            reg_loss = self.alpha * (
                tf.reduce_mean(real_out**2, axis=0) + tf.reduce_mean(fake_out**2, axis=0)
            )
            loss = cdiv_loss + reg_loss

        grads = training_tape.gradient(loss, self.model.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))

        self.loss_metric.update_state(loss)
        self.reg_loss_metric.update_state(reg_loss)
        self.cdiv_loss_metric.update_state(cdiv_loss)
        self.real_out_metric.update_state(tf.reduce_mean(real_out, axis=0))
        self.fake_out_metric.update_state(tf.reduce_mean(fake_out, axis=0))
        return {m.name: m.result() for m in self.metrics}

    def test_step(self, real_seqs):
        """Score real sequences against uniformly random ones.

        Args:
            real_seqs: Batch of real sequences scaled to [-1, 1].

        Returns:
            Dict with the ``cdiv``, ``real`` and ``fake`` metric values.
        """
        batch_size = tf.shape(real_seqs)[0]
        fake_seqs = tf.random.uniform(
            tf.stack([batch_size, SEQUENCE_LENGTH]), minval=-1.0, maxval=1.0
        )
        # Real and fake halves have the same size, so an even split is exact.
        inp_seqs = tf.concat([real_seqs, fake_seqs], axis=0)

        real_out, fake_out = tf.split(self.model(inp_seqs), num_or_size_splits=2, axis=0)
        cdiv = tf.reduce_mean(fake_out, axis=0) - tf.reduce_mean(real_out, axis=0)
        self.cdiv_loss_metric.update_state(cdiv)
        self.real_out_metric.update_state(tf.reduce_mean(real_out, axis=0))
        self.fake_out_metric.update_state(tf.reduce_mean(fake_out, axis=0))
        return {m.name: m.result() for m in self.metrics[2:]}

# 6. Compilación y entrenamiento del EBM
ebm = LotteryEBM()
ebm.compile(optimizer=optimizers.Adam(learning_rate=0.001))

# Wrap the normalised sequences in a batched float32 tf.data pipeline.
x_train = tf.data.Dataset.from_tensor_slices(
    tf.convert_to_tensor(sequences_normalized, dtype=tf.float32)
).batch(BATCH_SIZE)

# Training
ebm.fit(x_train, epochs=1000)

# 7. Generación de nuevas secuencias de lotería determinísticas
def generate_lottery_numbers(ebm, num_samples=5):
    """Generate integer lottery sequences by sampling from the trained EBM.

    Args:
        ebm: Object whose ``model`` attribute is the trained energy network.
        num_samples: Number of sequences to generate.

    Returns:
        Integer array with one generated sequence per row, mapped back to the
        original scale with the module-level ``scaler``.
    """
    # Start from random points over the full [-1, 1] range used for the
    # scaled training data (the default uniform range would cover only half).
    latent_sample = tf.random.uniform(
        (num_samples, SEQUENCE_LENGTH), minval=-1.0, maxval=1.0
    )
    generated = generate_samples(ebm.model, latent_sample, steps=STEPS, step_size=STEP_SIZE, noise=NOISE)
    # Hand scikit-learn a NumPy array rather than a TensorFlow tensor.
    generated = np.asarray(generated)
    return np.round(scaler.inverse_transform(generated)).astype(int)

# Generate five sequences with the trained EBM and print them.
generated_sequences = generate_lottery_numbers(ebm, 5)
print("Secuencias generadas:\n", generated_sequences, sep=" ")
