### Redes Generativas Aplicadas a la Geografía Colombiana
#### Danny Garzon degarzonm@unal.edu.co, Universidad Nacional de Colombia
#### Noviembre 2023

#### Introducción
El presente notebook tiene como objetivo la implementación de una red generativa de adversarios (GAN) para la generación de mapas de la geografía colombiana. Para ello se utilizarán diversas librerias, estableceremos unas variables globales para el control del modelo generativo y otros aspectos para aprovechar el uso de la GPU.Luego haremos uso de técnicas de procsamiento de imagenes para establecer nuestro conjunto de datos, estableceremos el modelo y la arquitectura del sistema de redes neuronales, entrenaremos el modelo y finalmente evaluaremos los resultados obtenidos.

### Librerias

Realiza la importación de bibliotecas esenciales para el trabajo con Deep Learning, como TensorFlow, NumPy, y Matplotlib. También incluye comandos para verificar la disponibilidad de la GPU, listar dispositivos disponibles y comprobar las versiones de las bibliotecas importantes, asegurando un entorno adecuado para el procesamiento y análisis de datos.

In [None]:
import os
import re
import sys
import time
import numpy as np
import pydot
from PIL import Image
from pathlib import Path
import tensorflow as tf
from tensorflow.keras import Model, Sequential
from tensorflow.keras.layers import Input, Conv2D, Conv2DTranspose, LeakyReLU, ReLU, BatchNormalization, Dropout, Concatenate
from tensorflow.python.client import device_lib
import matplotlib.pyplot as plt

# comprobamos que estamos usando la GPU
print("GPU", "disponible" if tf.config.list_physical_devices("GPU") else "No disponible")
# comprobamos que dispositivos tenemos disponibles
print(tf.config.list_physical_devices())

# comprobamos que version de cada libreria estamos usando
print("version Python:", sys.version)
print("version Tensorflow: ", tf.__version__)
print("version Numpy:", np.__version__)
print("version Matplotlib: ", plt.matplotlib.__version__)

### Definición de Parámetros y Rutas para el Entrenamiento del Modelo
Esta celda establece variables globales clave para el entrenamiento del modelo. Incluye parámetros como el tamaño del buffer, tamaño del lote, dimensiones de las imágenes, semilla aleatoria, canales de salida, factor de regularización, número de épocas, configuración de autoguardado, y rutas para el dataset y checkpoints. Estos parámetros son fundamentales para configurar y controlar el proceso de entrenamiento y manejo de datos.

In [None]:
# variables globales
TAM_BUFFER = 8000                 # tamaño del dataset a utilizar
TAM_BATCH = 1                     # tamaño del lote
IMG_ANCHO = 256                   # ancho de la imagen
IMG_ALTO = 256                    # alto de la imagen
SEMILLA = 2023                    # semilla para la generacion de numeros aleatorios
CANALES_SALIDA = 3                # canales de salida de la imagen rgb
LAMBDA = 100                      # factor de regularizacion 
EPOCAS = 200                       # epocas del entrenamiento
AUTOGUARDADO_EPOCAS = 40           # epocas para autoguardar los checkpoints del entrenamiento
RANGO_NORMALIZACION = (-1, 1)     # rango de normalizacion de las imagenes de [0,255] A [-1,1]

RUTA_DATASET = 'dataset' # ruta del dataset relativa al notebook
RUTA_PANEL = RUTA_DATASET + '/panel_' # ruta del panel relativa al notebook
RUTA_IMG_PRUEBA = RUTA_DATASET + '/test_img' # ruta de la imagen de prueba relativa al notebook
RUTA_CHECKPOINTS = RUTA_DATASET + '/checkpoints' # ruta de los checkpoints relativa al notebook
RUTA_CHECKPOINTS_EXTERNO = 'CHECKPOINTS' # ruta de los checkpoints relativa al notebook
PREFIJO_CHECKPOINT = RUTA_CHECKPOINTS+ "/ckpt"  # prefijo de los checkpoints

### Preparación de Datos para Entrenamiento y Prueba
Esta celda se enfoca en la preparación de los datos para el entrenamiento y prueba. Inicialmente, lista los nombres de las imágenes en un directorio específico y calcula su cantidad total. Luego, separa las imágenes en conjuntos de entrenamiento y prueba, utilizando un porcentaje predefinido del total. Finalmente, imprime en pantalla el número de imágenes en cada conjunto, asegurando una distribución adecuada para el entrenamiento y la evaluación del modelo.

In [None]:
# listamos los nombres de las imagenes disponibles en el directorio para el entrenamiento
LISTA_TOTAL_NOMBRES = os.listdir(RUTA_PANEL+'0') # lista de nombres de imagenes panel 0 representa la informacion geografica

# comprobamos el numero de imagenes encontradas
N_IMGS_EN_TOTAL = len(LISTA_TOTAL_NOMBRES)
print("Número de imagenes encontradas:", N_IMGS_EN_TOTAL)

# numero de imagenes para el test
# elegimos el 20% de las imagenes para el test, dejando 80% para el entrenamiento
TAM_BUFFER_ENTRENAMIENTO = int(TAM_BUFFER * 0.8)
TAM_BUFFER_PRUEBA = int(TAM_BUFFER * 0.2)
# fijamos la semilla para la generacion de numeros aleatorios
np.random.seed(SEMILLA)
# barajamos la lista de nombres de imagenes
np.random.shuffle(LISTA_TOTAL_NOMBRES)

# particiones de imagenes para el entrenamiento y test
LISTA_NMS_ENTRENAMIENTO = LISTA_TOTAL_NOMBRES[0:TAM_BUFFER_ENTRENAMIENTO]
LISTA_NMS_PRUEBA = LISTA_TOTAL_NOMBRES[TAM_BUFFER_ENTRENAMIENTO:TAM_BUFFER]

# escribimos en pantalla las dimensiones de los conjuntos de entrenamiento y test
print("Numero de imagenes de entrenamiento ", len(LISTA_NMS_ENTRENAMIENTO))
print("Numero de imagenes de prueba ", len(LISTA_NMS_PRUEBA))


###  Funciones Auxiliares para el Preprocesamiento de Imágenes
Esta celda define funciones auxiliares para el preprocesamiento de imágenes. La función nombre_img_a_tensores carga y procesa imágenes, convirtiéndolas en tensores adecuados para el entrenamiento de modelos de Deep Learning. Incluye pasos como la decodificación de imágenes PNG, redimensionamiento, y normalización. Las funciones cambiar_tam y norm_tensor son utilizadas para cambiar el tamaño y normalizar los tensores respectivamente, facilitando su uso posterior en el entrenamiento del modelo.

In [None]:
# funciones auxiliares para el preprocesado de las imagenes
def nombre_img_a_tensores(nombre_img , paneles = 2, img_ancho = 256, img_alto = 256):
    imgs_cargadas = []
    # cargamos las imagenes
    for x in range(paneles):
        # cargamos las imagenes 
        archivo_img = tf.io.read_file(RUTA_PANEL + f"{x}/" + nombre_img)
        # decodificamos las imagenes en formato png y las convertimos a tensores float32, teniendo en cuenta los 3 canales de color
        tensor_panel_x = tf.cast(tf.image.decode_png(archivo_img),
                           tf.float32)[..., :3]
        # redimensionamos los tensores a las dimensiones de entrada de la red
        tensor_panel_x = cambiar_tam(tensor_panel_x, img_ancho, img_alto)
        # normalizamos los tensores al rango [-1,1]  
        tensor_panel_x = norm_tensor(tensor_panel_x)
        imgs_cargadas.append(tensor_panel_x)
    return imgs_cargadas

def cambiar_tam(input_image, ancho, alto):
    input_image = tf.image.resize(input_image, [ancho, alto], method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)
    return input_image

def norm_tensor(tensor_entrada, range = (-1,1)):
    tensor_entrada = range[0] + ((range[1]-range[0])*tensor_entrada / 255)
    return tensor_entrada

### Ejemplos de Imágenes del Conjunto de Datos

In [None]:

### ejemplo de uso de las funciones auxiliares, obtenemos una pareja de imagenes de ejemplo aleatoria
nom_img_ejemplo = LISTA_TOTAL_NOMBRES[np.random.randint(0, len(LISTA_TOTAL_NOMBRES))]
tensor_panel_0, tensor_panel_1 = nombre_img_a_tensores(nom_img_ejemplo, paneles=2)

# Configuramos los subplots
fig, axes = plt.subplots(1, 2)  # 1 fila, 2 columnas

# Mostramos la primera imagen en el primer subplot
axes[0].imshow((tensor_panel_0 + 1) / 2)
axes[0].set_title('Panel 0')
axes[0].axis('off')

# Mostramos la segunda imagen en el segundo subplot
axes[1].imshow((tensor_panel_1 + 1) / 2)
axes[1].set_title('Panel 1')
axes[1].axis('off')

plt.show()

### Creación y Configuración de Datasets para Entrenamiento y Pruebas
creación y configuración de datasets para el entrenamiento y las pruebas. Utiliza las listas de nombres de imágenes de entrenamiento y prueba para crear datasets en TensorFlow, aplicando la función de preprocesamiento nombre_img_a_tensores a cada imagen. Los datasets se agrupan en lotes del tamaño definido por TAM_BATCH

In [None]:
# 1. Crea un dataset con los nombres de archivos de entrenamiento.
dataset_entrenamiento = tf.data.Dataset.from_tensor_slices(LISTA_NMS_ENTRENAMIENTO)
# Aplica una función para cargar y procesar cada pareja de imágenes.
dataset_entrenamiento = dataset_entrenamiento.map(nombre_img_a_tensores, num_parallel_calls=tf.data.experimental.AUTOTUNE)
# Agrupa las imágenes en lotes del tamaño definido por TAM_BATCH.
dataset_entrenamiento = dataset_entrenamiento.batch(TAM_BATCH)

# Carga de las imágenes de test.
# 2. Crea un dataset con los nombres de archivos de prueba.
dataset_prueba = tf.data.Dataset.from_tensor_slices(LISTA_NMS_PRUEBA)
dataset_prueba = dataset_prueba.map(nombre_img_a_tensores, num_parallel_calls=tf.data.experimental.AUTOTUNE)
dataset_prueba = dataset_prueba.batch(TAM_BATCH)

# Impresión de detalles de los datasets.
print("Dimensiones del conjunto de entrenamiento: ", dataset_entrenamiento)
print("Dimensiones del conjunto de prueba: ", dataset_prueba)
print("Tipo de datos del conjunto de entrenamiento: ", dataset_entrenamiento.element_spec)
print("Tipo de datos del conjunto de prueba: ", dataset_prueba.element_spec)


### Definición de Bloques Convolucionales
define dos funciones para crear bloques convolucionales que se utilizan en el modelo. La función downsample crea un bloque convolucional para reducir las dimensiones de la entrada, incluyendo una capa convolucional, normalización (opcional), y activación LeakyReLU. La función upsample realiza la operación inversa, aumentando las dimensiones a través de una capa convolucional transpuesta, normalización, dropout (opcional), y activación ReLU.

In [None]:
# definimos un bloque convolucional de reduccion de dimensiones
def downsample(filters, size=4, apply_batchnorm=True):
    # usamos la semilla para que los resultados sean reproducibles
    initializer = tf.random_normal_initializer(0., 0.02)

    result = Sequential ()
    # iniciamos con una distribucion normal
    
    # capa convolucional
    result.add(Conv2D(filters,
                    size,
                    strides=2,
                    padding='same',
                    kernel_initializer=initializer,
                    use_bias= not apply_batchnorm ))
    # capa de normalizacion
    if apply_batchnorm:
        result.add(BatchNormalization())
    # capa de activacion LeakyReLU (x) = max(0,x) + alpha * min(0,x)
    result.add(LeakyReLU())

    return result

# definimos un bloque convolucional de aumento de dimensiones
def upsample(filters, size=4, apply_dropout=False):
    # iniciamos con una distribucion normal
    initializer = tf.random_normal_initializer(0., 0.02)


    result = Sequential ()
    # capa convolucional
    result.add(Conv2DTranspose(filters,
                    size,
                    strides=2,
                    padding='same',
                    kernel_initializer=initializer,
                    use_bias= False ))
    # capa de normalizacion
    result.add(BatchNormalization())
    # capa de dropout
    if apply_dropout:
        result.add(Dropout(0.5))
    # capa de activacion
    result.add(ReLU())

    return result

In [None]:
# definimos el generador

def Generador():
    # La entrada tiene un shape de [None,None,3], lo que indica una imagen a color (RGB) con dimensiones variables.
    # 'None' en las dos primeras dimensiones permite flexibilidad en el tamaño de la imagen.
    # 'bs' denota "batch size", el número de imágenes procesadas en un lote.
    inputs = tf.keras.layers.Input(shape=[None, None, 3])

    # pila_reduccion contiene capas convolucionales que reducen la dimensión espacial de la imagen.
    down_stack = [
        downsample(64, apply_batchnorm=False), # Entrada: (bs, 256, 256, 3), Salida: (bs, 128, 128, 64)
        downsample(128),                       # Entrada: (bs, 128, 128, 64), Salida: (bs, 64, 64, 128)
        downsample(256),                       # Entrada: (bs, 64, 64, 128), Salida: (bs, 32, 32, 256)
        downsample(512),                       # Entrada: (bs, 32, 32, 256), Salida: (bs, 16, 16, 512)
        downsample(512),                       # Entrada: (bs, 16, 16, 512), Salida: (bs, 8, 8, 512)
        downsample(512),                       # Entrada: (bs, 8, 8, 512), Salida: (bs, 4, 4, 512)
        downsample(512),                       # Entrada: (bs, 4, 4, 512), Salida: (bs, 2, 2, 512)
        downsample(512),                       # Entrada: (bs, 2, 2, 512), Salida: (bs, 1, 1, 512)
    ]

      # pila_aumento contiene capas convolucionales que aumentan la dimensión espacial de la imagen.
    up_stack = [
        upsample(512, apply_dropout=True),# Entrada: (bs, 1, 1, 512),   # Salida: (bs, 2, 2, 1024)
        upsample(512, apply_dropout=True),# Entrada: (bs, 2, 2, 1024),  # Salida: (bs, 4, 4, 1024)
        upsample(512, apply_dropout=True),# Entrada: (bs, 4, 4, 1024),  # Salida: (bs, 8, 8, 1024)
        upsample(512),                    # Entrada: (bs, 8, 8, 1024),  # Salida: (bs, 16, 16, 1024)
        upsample(256),                    # Entrada: (bs, 16, 16, 1024),# Salida: (bs, 32, 32, 512)
        upsample(128),                    # Entrada: (bs, 32, 32, 512), # Salida: (bs, 64, 64, 256)
        upsample(64),                     # Entrada: (bs, 64, 64, 256), # Salida: (bs, 128, 128, 128)

    ]

    # inicializamos la capa de salida con una distribucion normal 
    initializer = tf.random_normal_initializer(0., 0.02)

    # capa de salida con una funcion de activacion tanh
    # Entrada: (bs, 128, 128, 128), Salida: (bs, 256, 256, CANALES_SALIDA)
    last = Conv2DTranspose(CANALES_SALIDA,
                           kernel_size=4,
                           strides=2,
                           padding='same',
                           kernel_initializer=initializer,
                           activation='tanh') 
    

    # conectamos las capas en cascada
    x = inputs
    skips = []

    concat = Concatenate()

    for reduccion in down_stack:
        x= reduccion(x) 
        skips.append(x)
    
    skips = reversed(skips[:-1])

    for aumento, salto in zip(up_stack, skips):
        x = aumento(x)
        x = concat([x, salto])
    
    last = last(x)
    return Model(inputs=inputs, outputs=last)


In [None]:
# arquitectura del discriminador

def Discriminator():
    img_entrada = Input(shape=[None, None, 3], name='input_image')
    img_generada = Input(shape=[None, None, 3], name='target_image')

    concat = Concatenate()([img_entrada, img_generada])

    initializer = tf.random_normal_initializer(0., 0.02)

    # definimos las capas convolucionales conectadas en cascada para la reduccion
    reduccion_1 = downsample(64, apply_batchnorm=False)(concat) 
    reduccion_2 = downsample(128)(reduccion_1)
    reduccion_3 = downsample(256)(reduccion_2)
    reduccion_4 = downsample(512)(reduccion_3)

    capa_final = tf.keras.layers.Conv2D(filters=1,
                                    kernel_size=4,
                                    strides=1,
                                    kernel_initializer=initializer,
                                    padding='same')(reduccion_4)
    return tf.keras.Model(inputs=[img_entrada, img_generada], outputs=capa_final)


### !!! AVISO !!!
debido a problemas con el guardado del modelo usando downsample y upsample, aqui está configurado un nuevo generador con las mismas características que el anterior, pero sin usar las funciones downsample y upsample. 

### Definición de los Modelos Generador y Discriminador
define dos modelos clave para una Red Generativa Antagónica (GAN): el Generador y el Discriminador. El modelo Generador utiliza capas convolucionales para transformar una entrada en una imagen generada, empleando una serie de capas de reducción y aumento para procesar los datos. El modelo Discriminador evalúa pares de imágenes (una real y una generada) y decide si la imagen generada es realista. Ambos modelos utilizan capas convolucionales, normalización por lotes, y activaciones LeakyReLU y ReLU, esenciales en la arquitectura de una GAN

In [None]:

def Generador():
    entradas = Input(shape=[None, None, 3])

    # Inicializador común para las capas convolucionales
    inicializador = tf.random_normal_initializer(0., 0.02)

    # Pila de reducción
    pila_reduccion = [
        # Capa convolucional, sin normalización batch
        # Entrada: (bs, 256, 256, 3) 
        Sequential([
            Conv2D(64, 4, strides=2, padding='same', kernel_initializer=inicializador, use_bias=True),
            LeakyReLU()
        ]),#Salida: (bs, 128, 128, 64)
        # Capas convolucionales con normalización batch
        # Entrada: (bs, 128, 128, 64)
        Sequential([
            Conv2D(128, 4, strides=2, padding='same', kernel_initializer=inicializador),
            BatchNormalization(),
            LeakyReLU()
        ]), # Salida: (bs, 64, 64, 128)
        # Entrada: (bs, 64, 64, 128),   
        Sequential([
            Conv2D(256, 4, strides=2, padding='same', kernel_initializer=inicializador),
            BatchNormalization(),
            LeakyReLU()
        ]),#Salida: (bs, 32, 32, 256) 
        # Entrada: (bs, 32, 32, 256),
        Sequential([
            Conv2D(512, 4, strides=2, padding='same', kernel_initializer=inicializador),
            BatchNormalization(),
            LeakyReLU()
        ]),#Salida: (bs, 16, 16, 512)
        # Entrada: (bs, 16, 16, 512),
        Sequential([
            Conv2D(512, 4, strides=2, padding='same', kernel_initializer=inicializador),
            BatchNormalization(),
            LeakyReLU()
        ]),#Salida: (bs, 8, 8, 512)
        # Entrada: (bs, 8, 8, 512),  
        Sequential([
            Conv2D(512, 4, strides=2, padding='same', kernel_initializer=inicializador),
            BatchNormalization(),
            LeakyReLU()
        ]),#Salida: (bs, 4, 4, 512)
        # Entrada: (bs, 4, 4, 512),  
        Sequential([
            Conv2D(512, 4, strides=2, padding='same', kernel_initializer=inicializador),
            BatchNormalization(),
            LeakyReLU()
        ]),#Salida: (bs, 2, 2, 512)
        # Entrada: (bs, 2, 2, 512),  
        Sequential([
            Conv2D(512, 4, strides=2, padding='same', kernel_initializer=inicializador),
            BatchNormalization(),
            LeakyReLU()
        ]),#Salida: (bs, 1, 1, 512)
    ]

    # Pila de aumento
    pila_aumento = [
        # Entrada: (bs, 1, 1, 512),   
        Sequential([
            Conv2DTranspose(512, 4, strides=2, padding='same', kernel_initializer=inicializador),
            BatchNormalization(),
            Dropout(0.5),
            ReLU()
        ]),# Salida: (bs, 2, 2, 1024)
        # Entrada: (bs, 2, 2, 1024),  
        Sequential([
            Conv2DTranspose(512, 4, strides=2, padding='same', kernel_initializer=inicializador),
            BatchNormalization(),
            Dropout(0.5),
            ReLU()
        ]),# Salida: (bs, 4, 4, 1024)
        # Entrada: (bs, 4, 4, 1024),  
        Sequential([
            Conv2DTranspose(512, 4, strides=2, padding='same', kernel_initializer=inicializador),
            BatchNormalization(),
            Dropout(0.5),
            ReLU()
        ]),# Salida: (bs, 8, 8, 1024)
        # Entrada: (bs, 8, 8, 1024),  
        Sequential([
            Conv2DTranspose(512, 4, strides=2, padding='same', kernel_initializer=inicializador),
            BatchNormalization(),
            ReLU()
        ]),# Salida: (bs, 16, 16, 1024)
        # Entrada: (bs, 16, 16, 1024),
        Sequential([
            Conv2DTranspose(256, 4, strides=2, padding='same', kernel_initializer=inicializador),
            BatchNormalization(),
            ReLU()
        ]),# Salida: (bs, 32, 32, 512)
        # Entrada: (bs, 32, 32, 512), 
        Sequential([
            Conv2DTranspose(128, 4, strides=2, padding='same', kernel_initializer=inicializador),
            BatchNormalization(),
            ReLU()
        ]),# Salida: (bs, 64, 64, 256)
        # Entrada: (bs, 64, 64, 256), 
        Sequential([
            Conv2DTranspose(64, 4, strides=2, padding='same', kernel_initializer=inicializador),
            BatchNormalization(),
            ReLU()
        ]),# Salida: (bs, 128, 128, 128)
    ]

    # Capa de salida final con activación tanh
    capa_final = Conv2DTranspose(CANALES_SALIDA, 4, strides=2, padding='same', kernel_initializer=inicializador, activation='tanh')

    # Conectar las capas en cascada
    x = entradas
    omisiones = []
    concat = Concatenate()

    for reduccion in pila_reduccion:
        x = reduccion(x)
        omisiones.append(x)

    omisiones = reversed(omisiones[:-1])

    for aumento, omision in zip(pila_aumento, omisiones):
        x = aumento(x)
        x = concat([x, omision])

    x = capa_final(x)

    return Model(inputs=entradas, outputs=x)



def Discriminador():
    img_entrada = Input(shape=[None, None, 3], name='img_entrada')
    img_generada = Input(shape=[None, None, 3], name='img_generada')

    concat = Concatenate()([img_entrada, img_generada])

    inicializador = tf.random_normal_initializer(0., 0.02)

    # Definimos las capas convolucionales conectadas en cascada para la reducción
    # Primera capa de reducción, sin normalización batch
    reduccion_1 = Sequential([
        Conv2D(64, 4, strides=2, padding='same', kernel_initializer=inicializador, use_bias=True),
        LeakyReLU()
    ])(concat)

    # Capas sucesivas de reducción con normalización batch
    reduccion_2 = Sequential([
        Conv2D(128, 4, strides=2, padding='same', kernel_initializer=inicializador),
        BatchNormalization(),
        LeakyReLU()
    ])(reduccion_1)

    reduccion_3 = Sequential([
        Conv2D(256, 4, strides=2, padding='same', kernel_initializer=inicializador),
        BatchNormalization(),
        LeakyReLU()
    ])(reduccion_2)

    reduccion_4 = Sequential([
        Conv2D(512, 4, strides=2, padding='same', kernel_initializer=inicializador),
        BatchNormalization(),
        LeakyReLU()
    ])(reduccion_3)

    # Capa final
    capa_final = Conv2D(filters=1,
                        kernel_size=4,
                        strides=1,
                        kernel_initializer=inicializador,
                        padding='same')(reduccion_4)

    return Model(inputs=[img_entrada, img_generada], outputs=capa_final)


### Funciones de Coste para el Entrenamiento de Generador y Discriminador
define las funciones de coste para entrenar los modelos de Generador y Discriminador en una Red Generativa Antagónica (GAN). Utiliza la entropía cruzada binaria para calcular la diferencia entre las distribuciones de probabilidad. La función perdida_generador evalúa la autenticidad de las imágenes generadas y la similitud L1 con las imágenes objetivo. La función perdida_discriminador evalúa la capacidad del Discriminador para distinguir entre imágenes reales y generadas. Estas funciones son cruciales para guiar el proceso de aprendizaje y optimización en las GANs

In [None]:
# funciones de coste

# BinaryCrossentropy es una funcion que calcula la perdida de entropia cruzada entre dos distribuciones de probabilidad
# from_logits=True indica que la funcion de perdida espera que la salida del discriminador sea una distribucion de probabilidad
objeto_funcion_perdida = tf.keras.losses.BinaryCrossentropy(from_logits=True)

# definimos la funcion de coste para el generador
def perdida_generador(disc_generated_output, gen_output, target):
    # calculamos la perdida para las imagenes generadas que es la salida del discriminador para las imagenes generadas
    # tf.ones_like devuelve un tensor con la misma forma que el tensor de entrada pero con todos los valores a 1
    # disc_generated_output es la salida del discriminador para las imagenes generadas
    perdida_img_generada = objeto_funcion_perdida(tf.ones_like(disc_generated_output), disc_generated_output)

    # calculamos la perdida L1 entre la imagen generada y la imagen objetivo
    # tf.reduce_mean calcula la media de los valores de un tensor
    # gen_output es la imagen generada
    # target es la imagen objetivo
    perdida_distancia_l1 = tf.reduce_mean(tf.abs(target - gen_output))

    # calculamos la perdida total como la suma de la perdida gan y la perdida L1
    # Lambda es un parametro que controla la importancia de la perdida L1 en comparacion con la perdida gan
    perdida_total_generador = perdida_img_generada + (LAMBDA * perdida_distancia_l1)

    return perdida_total_generador

def perdida_discriminador(disc_real_output, disc_generated_output):
    # calculamos la perdida para las imagenes reales que es la salida del discriminador para las imagenes reales
    # tf.ones_like devuelve un tensor con la misma forma que el tensor de entrada pero con todos los valores a 1
    # disc_real_output es la salida del discriminador para las imagenes reales
    #  
    perdida_img_real = objeto_funcion_perdida(tf.ones_like(disc_real_output), disc_real_output)

    # calculamos la perdida para las imagenes generadas que es la salida del discriminador para las imagenes generadas
    # tf.zeros_like devuelve un tensor con la misma forma que el tensor de entrada pero con todos los valores a 0
    # disc_generated_output es la salida del discriminador para las imagenes generadas
    perdida_img_generada = objeto_funcion_perdida(tf.zeros_like(disc_generated_output), disc_generated_output)

    # calculamos la perdida total como la suma de las perdidas para las imagenes reales y generadas
    perdida_total_del_discriminador = perdida_img_real + perdida_img_generada

    return perdida_total_del_discriminador

### Inicialización de Modelos y Optimizadores
se instancian los modelos de Generador y Discriminador definidos previamente, y se configuran los optimizadores para ambos. Se utiliza el optimizador Adam con una tasa de aprendizaje de 
2×10^−4 y un valor de beta_1 de 0.5. Estos optimizadores se aplicarán en el proceso de entrenamiento de las redes neuronales, facilitando la actualización de los pesos y mejorando el rendimiento del modelo

In [None]:
generador = Generador()
discriminador = Discriminador()
optimizador_generador = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
optimizador_discriminador = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)


In [None]:
# Seleccionar una imagen aleatoria
imagen_seleccionada = LISTA_TOTAL_NOMBRES[np.random.randint(0, len(LISTA_TOTAL_NOMBRES))]
pareja_tensores = nombre_img_a_tensores(imagen_seleccionada, paneles=2)
tensor_entrada = tf.expand_dims(pareja_tensores[0], 0)

# Generar una imagen usando el Generador
tensor_generado = generador(tensor_entrada, training=False)

# elimina la normalizacion de las imagenes para que el discriminador pueda procesarlas y 
# los convierte en int
imagen_generada = ((tensor_generado[0, ...,1])+1)*255
# Evaluar la imagen generada usando el Discriminador
tensor_discriminado = discriminador([tensor_entrada, tensor_generado], training=False)
imagen_tensor_discriminado = (tensor_discriminado[0, ...,-1]+1)*255
# Visualización
plt.figure(figsize=(12, 6))

# Visualizar la imagen generada
plt.subplot(1, 2, 1)
plt.title("Imagen Generada")
plt.imshow(imagen_generada)
plt.axis('off')

# Visualizar la salida del Discriminador
plt.subplot(1, 2, 2)
plt.title("Evaluación del Discriminador")
# Ajustar la escala de colores según la salida del discriminador
plt.imshow(imagen_tensor_discriminado)
plt.colorbar()
plt.axis('off')

plt.show()

In [None]:

# Checkpoints - puntos de control del modelo

checkpoint = tf.train.Checkpoint(optimizador_generador=optimizador_generador,
                                optimizador_discriminador=optimizador_discriminador,
                                generador=generador,
                                discriminador=discriminador)

latest_checkpoint = tf.train.latest_checkpoint(RUTA_CHECKPOINTS)
if latest_checkpoint:
    checkpoint.restore(latest_checkpoint).assert_consumed()
    print(f"Restaurado desde {latest_checkpoint}")
else:
    print("Chekpoint no encontrado, entrenamiento inicial")

In [None]:
# Generamos imagenes con el generador entrenado en el checkpoint
def generar_imagen(modelo_generador, img_prueba, img_objetivo=None, guardar_archivo=False, mostrar_imgs=False):
    # Generamos la imagen
    img_generada = modelo_generador(img_prueba, training=True)

    # Mostramos la imagen generada
    if mostrar_imgs:
        plt.figure(figsize=(15,15))

        titulos = ['Entrada', 'Generada']
        lista_imgs_a_mostrar = [img_prueba[0], img_generada[0]]

        # Si img_objetivo no es None, incluirlo en la visualización
        if img_objetivo is not None:
            titulos.insert(1, 'Realidad')
            lista_imgs_a_mostrar.insert(1, img_objetivo[0])

        # Mostrar las imágenes
        for i, img in enumerate(lista_imgs_a_mostrar):
            plt.subplot(1, len(lista_imgs_a_mostrar), i+1)
            plt.title(titulos[i])
            # Ajustar los valores de los píxeles entre 0 y 1 si es necesario
            plt.imshow(img * 0.5 + 0.5)
            plt.axis('off')
        plt.show()

    # Guardar la imagen generada
    if guardar_archivo:
        tf.keras.preprocessing.image.save_img(guardar_archivo, ((img_generada[0] + 1) * 127.5).numpy().astype("uint8"))



### Rutina de entrenamiento

In [None]:
# paso de entrenamiento
@tf.function
def paso_entrenamiento(tensor_entrada, tensor_objetivo):
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        # generamos la imagen
        tensor_generado = generador(tensor_entrada, training=True)
        discriminacion_tensor_generado = discriminador([tensor_generado, tensor_entrada], training=True)
        discriminacion_tensor_objetivo = discriminador([tensor_objetivo, tensor_entrada], training=True)
        valor_perdida_discriminador = perdida_discriminador(discriminacion_tensor_objetivo, discriminacion_tensor_generado)
        valor_perdida_generador = perdida_generador(discriminacion_tensor_generado, tensor_generado, tensor_objetivo)

        # calculamos los gradientes
        gradientes_generador = gen_tape.gradient(valor_perdida_generador, generador.trainable_variables)
        gradientes_discriminador = disc_tape.gradient(valor_perdida_discriminador, discriminador.trainable_variables)
        optimizador_generador.apply_gradients(zip(gradientes_generador, generador.trainable_variables))
        optimizador_discriminador.apply_gradients(zip(gradientes_discriminador, discriminador.trainable_variables))

# rutina de entrenamiento
# definimos la funcion de entrenamiento
from IPython.display import clear_output

def entrenamiento(dataset, epocas):
    
    for epoca in range(epocas):
        tiempo_inicio = time.time()
        img_i = 0
        for tensor_entrada, tensor_objetivo in dataset:
            print("Epoca:", epoca, "Imagen:", img_i, "de", len(LISTA_NMS_ENTRENAMIENTO))
            img_i += 1
            paso_entrenamiento(tensor_entrada, tensor_objetivo)
            #clear_output(wait=True)
        
        img_i = 0

        # Verifica que el directorio de salida exista
        ruta_salida = f"{RUTA_DATASET}/resultados_entrenamiento"
        if not os.path.exists(ruta_salida):
            os.makedirs(ruta_salida)

        # Genera imágenes para el entrenamiento
        for entrada_vias , objetivo in dataset_prueba.take(25):
            img_folder = f"{ruta_salida}/imagen_{img_i}"
            if not os.path.exists(img_folder):
                os.makedirs(img_folder)
                
            save_filename = f"{img_folder}/img{img_i}_epoca_{epoca:03d}.png"
            generar_imagen(generador, entrada_vias, objetivo, guardar_archivo=save_filename)
            img_i += 1

        # Guardar cada n epocas
        if (epoca + 1) % AUTOGUARDADO_EPOCAS == 0:
            checkpoint.save(file_prefix=PREFIJO_CHECKPOINT)
            generador.save(f'{RUTA_CHECKPOINTS}/generador_epoca_{epoca+1}.h5')

        print(f'Tiempo por completar época {epoca + 1} es {time.time() - tiempo_inicio} seg\n')




### Entrenamiento del modelo ajustando el numero de epocas

se guarda el modelo entrenado cada N epocas en la ruta checkpoints

In [None]:
entrenamiento(dataset_entrenamiento, EPOCAS)

### Carga de un modelo entrenado

Carga de un modelo entrenado, la funcion a continuacion carga el ultimo modelo y queda listo para ser utilizado para generar imagenes dada una ruta para el archivo .png  de dimensiones 256x256 

In [None]:
def cargar_modelo_mas_reciente(ruta_checkpoints):
    # Expresión regular para extraer el número de época del nombre del archivo
    regex_epoca = r"generador_epoca_(\d+)\.h5"

    # Lista para almacenar los nombres de los archivos y los números de las épocas
    modelos = []

    # Recorrer todos los archivos en la carpeta
    for archivo in os.listdir(ruta_checkpoints):
        coincidencia = re.search(regex_epoca, archivo)
        if coincidencia:
            numero_epoca = int(coincidencia.group(1))
            ruta_completa = os.path.join(ruta_checkpoints, archivo)
            modelos.append((ruta_completa, numero_epoca))

    # Verificar si se encontraron modelos
    if not modelos:
        print("No se encontraron modelos .h5 en la ruta proporcionada.")
        return None

    # Ordenar los modelos por número de época (el más reciente primero)
    modelos.sort(key=lambda x: x[1], reverse=True)

    # Cargar y devolver el modelo más reciente
    modelo_mas_reciente = modelos[0][0]
    print(f"Cargando el modelo más reciente: {modelo_mas_reciente}")
    return tf.keras.models.load_model(modelo_mas_reciente)


# Cargar el modelo generador más reciente
modelo_generador = cargar_modelo_mas_reciente(RUTA_CHECKPOINTS)

# Elegir una imagen de prueba
lista_imagenes_de_prueba = os.listdir(RUTA_IMG_PRUEBA)
imagen_prueba_entrada = lista_imagenes_de_prueba[3]
print("Elegimos la imagen:", imagen_prueba_entrada)

# Preparar el tensor de entrada
tensor_entrada = nombre_img_a_tensores(imagen_prueba_entrada)
tensor_entrada = tf.expand_dims(tensor_entrada[0], 0)

# Utilizar generar_imagen para visualizar la salida
generar_imagen(modelo_generador, tensor_entrada, mostrar_imgs=True)


### Evaluacion del modelo
#### Generacion de imagenes dados checkpoints

In [None]:
## video mp4
import cv2
import os

def imagenes_a_video(ruta_carpeta, ruta_salida, fps=30):
    dir_trabajo = os.getcwd()
    ruta_absoluta_carpeta = os.path.join(dir_trabajo, ruta_carpeta)
    
    if not os.path.exists(ruta_absoluta_carpeta):
        print(f"La carpeta {ruta_absoluta_carpeta} no existe.")
        return

    archivos = [f for f in os.listdir(ruta_absoluta_carpeta) if os.path.isfile(os.path.join(ruta_absoluta_carpeta, f))]
    print(f"Número de archivos en {ruta_absoluta_carpeta}: {len(archivos)}")

    archivos.sort()
    
    # Leer la primera imagen para obtener las dimensiones
    img = cv2.imread(os.path.join(ruta_absoluta_carpeta, archivos[0]))
    altura, ancho, _ = img.shape
    
    # Establecer la configuración para el video de salida
    ruta_absoluta_salida = os.path.join(dir_trabajo, ruta_salida)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(ruta_absoluta_salida, fourcc, fps, (ancho, altura))

    for archivo in archivos:
        ruta_img = os.path.join(ruta_absoluta_carpeta, archivo)
        img = cv2.imread(ruta_img)
        out.write(img)
        
    out.release()
    print(f"Video guardado en {ruta_absoluta_salida}")

# Ejemplo de uso en varias carpetas, un video por carpeta con las imágenes
for x in range(25):
    ruta_carpeta = f"dataset_geo_col_256/training_outputs/imagen_{x}"
    ruta_salida = f"dataset_geo_col_256/training_outputs/animacion_batch_500_epocas_200_img_{x}.mp4"
    imagenes_a_video(ruta_carpeta, ruta_salida, fps=6)


### Conceptos vinculados para entender antes de aplicar la Pérdida de Entropía Cruzada Sigmoide
Probabilidad y Estadísticas: Comprensión de la teoría básica de probabilidad y conceptos estadísticos.
Logaritmos: La función de pérdida utiliza logaritmos, por lo que una comprensión básica es esencial.
Cálculo: Específicamente, comprensión de derivadas y gradientes para la optimización.
Algoritmos de Optimización: Familiaridad con algoritmos como el Descenso de Gradiente que se utilizan para minimizar la función de pérdida.
Redes Neuronales: Comprensión básica de qué son las redes neuronales y cómo funcionan, incluida la propagación hacia adelante y hacia atrás.
Funciones de Activación: Comprensión de qué son las funciones de activación, con un enfoque en la función sigmoide.
Funciones de Pérdida: Comprensión general de qué son las funciones de pérdida y por qué se utilizan.
Clasificación Binaria: Comprensión de problemas de clasificación, específicamente clasificación binaria.
Operaciones de Tensor: Como estás trabajando con imágenes, entender cómo manipular tensores (matrices multidimensionales) es crucial.
Redes Neuronales Convolucionales (CNN): A menudo se utilizan en tareas relacionadas con imágenes, por lo que entender su arquitectura y función es beneficioso.
Redes Generativas: Dado que mencionaste redes generativas, entender las Redes Antagónicas Generativas (GAN) u otros modelos generativos sería útil.
Retropropagación: Comprensión de cómo se calculan los gradientes y se actualizan los pesos durante el entrenamiento.