# **RECONSTUCTION GAN**

En este cuaderno se va a realizar la reconstruccion imagenes que han sido previamente dañadas para obtener a partir de estas y gracias a una Red Generativa las imagenes originales. Para ello se va a usar el modelo Pix2Pix basado en el siguiente paper https://arxiv.org/abs/1611.07004

Esto se ha realizado para el trabajo de fin de master y obtencion del titulo de ingeniero en telecomunicaciones por la universidad de cantabria

In [None]:
# Se importan las librerias
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
from IPython import display

In [None]:
%tensorflow_version 2.x

In [None]:
tf.__version__

'2.3.0'

# **CARGA DE DATOS**

In [None]:
# Images path
PATH =                  # main path
PATH_RESULTS =          # results path
INPATH =                # input images path
OUPATH =                # hr images path
CKPATH =                # google colab checkpoints path

# Se listan las fotos de entrada, las urls
imgurls = !ls -1 "{INPATH}"
# Numero de imagenes que se van a usar
n = 1077                            # En funcion de las imagenes que me pase Adolfo <-------------------
# Usaré para entrenamiento el 80%
train_n = round(n*0.8)
# Se randomiza el listado de imagenes
randurls = np.copy(imgurls)
# Se barajean las url de las imagenes
np.random.seed()
np.random.shuffle(randurls)
# Se separan en bloque de train y de test
tr_urls = randurls[:train_n]
ts_urls = randurls[train_n:n]


Funciones que modifican las imagenes de entrada para aumentar el dataset y no condicionarlo

In [None]:
# Dimensiones a las que se quiere reajustar la imagen, depende del tamaño de las imagenes
width = 1024
heigth = 1024
# Se crea una funcion que reescale las imagenes
def resize(inimg, heigth, width):
  inimg = tf.image.resize(inimg, [heigth, width])   # se transforma la imagen de entrada (la que se ve mal)
  return inimg

In [None]:
# Se crea una funcion que normalice las imagenes para que esten entre -1 y 1
def normalize(inimg, tgimg):

  inimg = (inimg/127.5) - 1  
  tgimg = (tgimg/127.5) - 1

  return inimg, tgimg

In [None]:
@tf.function()
# Aumentacion de los datos: Random Crop + Flip, aleatoriamente se giraran algunas imagenes y de esta forma se aumentan los datos de entrada
def random_jitter(inimg, tgimg):
  # Se aumenta ligeramente (un 10%)
  inimg = resize(inimg, 1144, 1144)
  tgimg = resize(tgimg, 1144, 1144)
  # Se ponen ambas imagenes una encima de otra
  stacked_image = tf.stack([inimg, tgimg], axis = 0)
  # Se mueven un poco
  cropped_image = tf.image.random_crop(stacked_image, size = [2, heigth, width, 3])
  # Se separan
  inimg, tgimg = cropped_image[0], cropped_image[1]
  # El 50% de las veces se hara un flip
  if tf.random.uniform(()) > 0.5:
    inimg = tf.image.flip_left_right(inimg)
    tgimg = tf.image.flip_left_right(tgimg)
  
  return inimg,tgimg

In [None]:
# Funcion que carga las imagenes
def load_images(filename, augment = True):
  
  inimg = tf.cast(tf.image.decode_jpeg(tf.io.read_file(INPATH + '/' + filename)), tf.float32)[...,:3]
  tgimg = tf.cast(tf.image.decode_jpeg(tf.io.read_file(OUPATH + '/' + filename)), tf.float32)[...,:3]

  inimg = resize(inimg, 1024, 1024)
  tgimg = resize(tgimg, 1024, 1024)

  if augment:
    inimg, tgimg = random_jitter(inimg, tgimg)

  inimg, tgimg = normalize(inimg, tgimg)

  return inimg, tgimg

In [None]:
def load_train_image(filename):
  return load_images(filename,True)
def load_test_image(filename):
  return load_images(filename,False)

In [None]:
train_dataset = tf.data.Dataset.from_tensor_slices(tr_urls)
train_dataset = train_dataset.map(load_train_image, num_parallel_calls = tf.data.experimental.AUTOTUNE)
train_dataset = train_dataset.batch(1)

test_dataset = tf.data.Dataset.from_tensor_slices(ts_urls)
test_dataset = test_dataset.map(load_test_image, num_parallel_calls = tf.data.experimental.AUTOTUNE)
test_dataset = test_dataset.batch(1)


# **Modelo Pix2Pix**

Este modelo sigue el modelo GAN en el que dos redes neuronales compiten entre ellas siendo una la que genera imagenes falsas y otra la que decide si son reales o falsas.

El generador tiene una arquitectura enconder-decoder en el que primero se comprimira la informacion de las images y luego se ira descomprimiendo. 

**ENCODER**

El enconder tiene bloque que seran Covolucion-BatchNormalization-LeakyReLU estos bloques el paper los denomina "C" y les acompañará un numero en funcion del numero de filtros. La sucesion de bloques que forma el encoder es la siguiente:

C64-C128-C256-C512-C512-C512-C512-C512

Ademas de esto, el paper indica una serie de "modificiciones" para las capas:


*   A la primera capa (C64) no se le aplicará BatchNorm
*   Todas las convoluciones son de filtros espaciales de tamaño 4x4 con stride = 2
*   Los pesos a la hora de inicializarse se inicializan como ruido gaussiano de media 0 y desviacion 0.02



In [None]:
from tensorflow.keras import *
from tensorflow.keras.layers import *

In [None]:
tf.keras.__version__

'2.4.0'

In [None]:

# Funcion que define los bloques Conv-BatchNorm-ReLU
def downsample(filters, apply_batchnorm = True):
  
  result = Sequential()

  initializer = tf.random_normal_initializer(0, 0.02)
  # Capa convolucional
  result.add(Conv2D(filters,
                    kernel_size = 4,
                    strides = 2,
                    padding = "same",
                    kernel_initializer = initializer,
                    use_bias = not apply_batchnorm))
  if apply_batchnorm:
    # Capa de BatchNormalization
    result.add(BatchNormalization())
  # Capa de Activacion Leaky ReLU
  result.add(LeakyReLU())

  return result

**DECODER**

El decoder tiene bloque que seran Covolucion-BatchNormalization-Dropout-ReLU con un dropout del 50%. Estos bloques el paper los denomina "CD" y les acompañará un numero en funcion del numero de filtros. La sucesion de bloques que forma el encoder es la siguiente:

CD512-CD512-CD512-C512-C256-C128-C64

Ademas de esto, el paper indica una serie de "modificiciones" para las capas:


*   A la primera capa (C64) no se le aplicará BatchNorm
*   Todas las convoluciones son de filtros espaciales de tamaño 4x4 con stride = 2
*   Los pesos a la hora de inicializarse se inicializan como ruido gaussiano de media 0 y desviacion 0.02

In [None]:
# Funcion que define los bloques Conv-BatchNorm-ReLU
def upsample(filters, apply_dropout = False):
  
  result = Sequential()

  initializer = tf.random_normal_initializer(0, 0.02)
  # Capa convolucional
  result.add(Conv2DTranspose( filters,
                              kernel_size = 4,
                              strides = 2,
                              padding = "same",
                              kernel_initializer = initializer,
                              use_bias = False))
  # Capa de BatchNormalization
  result.add(BatchNormalization())
  if apply_dropout:
    # Capa de Dropout
    result.add(Dropout(0.5))
  # Capa de Activacion Leaky ReLU
  result.add(ReLU())

  return result

**GENERADOR**

El generador sigue el modelo U-NET en el trabajn conjuntamente el econder y el decoder


![texto alternativo](https://qph.fs.quoracdn.net/main-qimg-78a617ec1de942814c3d23dab7de0b24)

In [None]:
def Generator():
  # Se especifica la capa de entrada especificandole las dimensiones, no se especifican el alto y ancho pero si los canales de color
  inputs = tf.keras.layers.Input(shape = [None, None, 3])
  # Se especifica la lista de bloques del encoder
  down_stack = [
               downsample(64, apply_batchnorm = False),   # (bs, 128, 128, 64)        # Esto muestra mas o menos la dimensiones (batch_size, height, width, mapas de caracteristicas)
               downsample(128),                           # (bs, 64, 64, 128)
               downsample(256),                           # (bs, 32, 32, 256)
               downsample(512),                           # (bs, 16, 16, 512)
               downsample(512),                           # (bs, 8, 8, 512)
               downsample(512),                           # (bs, 4, 4, 512)
               downsample(512),                           # (bs, 2, 2, 512)
               downsample(512),                           # (bs, 1, 1, 512)
  ]
  # Se especifica la lista de bloques del decoder
  up_stack = [
              upsample(512, apply_dropout = True),        # (bs, 2, 2, 1024)
              upsample(512, apply_dropout = True),        # (bs, 4, 4, 1024)
              upsample(512, apply_dropout = True),        # (bs, 8, 8, 1024)
              upsample(512),                              # (bs, 16, 16, 1024)
              upsample(256),                              # (bs, 32, 32, 512)
              upsample(128),                              # (bs, 64, 64, 256)
              upsample(64),                               # (bs, 128, 128, 128)
              upsample(32),                               # (bs, 256, 256, 64)
              upsample(16)                                # (bs, 512, 512, 32)      
  ]
  # Se crea la capa final que formará la imagen
  initializer = tf.random_normal_initializer(0, 0.02)
  last = Conv2DTranspose(filters = 3,
                         kernel_size = 4,
                         strides = 2,
                         padding = "same",
                         kernel_initializer = initializer,
                         activation = "tanh")
  
  # Se conectan las capas del encoder
  x = inputs
  skips = []

  concat = Concatenate()

  for down in down_stack:
    x = down(x)
    skips.append(x)

  skips = reversed(skips[:-1])
  
  # Se conectan las capas del decoder
  for up, sk in zip(up_stack, skips):

    x = up(x)
    x = concat([x, sk])
  # Se pasa por la ultima capa que es la que forma la imagen
  last = last(x)

  return Model(inputs = inputs, outputs = last)

In [None]:
def resize(inimg, heigth, width):

  inimg = tf.image.resize(inimg, [heigth, width])   # se transforma la imagen de entrada (la que se ve mal)

  return inimg

In [None]:
generator = Generator()
tf.keras.utils.plot_model(generator, to_file= "model.png", show_shapes=True, show_layer_names= True, dpi=64)

**DISCRIMINADOR**

El discriminador que presenta el paper no es de los normales, es decir no devulve un escalar indicando si la imagen generada es real o fake comparandola con una imagen real, sino que devuelve una especia de mapa en el que se indica que partes de la imagen son reales y cuales no.

El discriminador sigue la siguiente arquitectura
C64-C128-C256-C512

In [None]:
def Discriminator():
  # El discriminador recibe dos entradas la imagen original y la generada
  ini = Input(shape=[None, None, 3], name = "input_img")
  gen = Input(shape=[None, None, 3], name = "gener_img")
  # Las va a concatener y poner una encima de la otra
  con = concatenate([ini, gen])

  initializer = tf.random_normal_initializer(0, 0.02)

  down1 = downsample(64, apply_batchnorm = False)(con)
  down2 = downsample(128)(down1)
  down3 = downsample(256)(down2)
  down4 = downsample(512)(down3)

  last = tf.keras.layers.Conv2D(filters = 1,
                                kernel_size = 4,
                                strides = 1,
                                kernel_initializer = initializer,
                                padding = "same")(down4)
  
  return tf.keras.Model(inputs = [ini, gen], outputs = last)

In [None]:
discriminator = Discriminator()
tf.keras.utils.plot_model(generator, show_shapes=True, dpi=64)

**FUNCIONES DE COSTE**

El `real_loss`evalua la diferencia entre el resultado del discriminador al observar una imagen real comparandolo con el resultado idoneo es decir una imagen 100% real que seria una matriz a 1s

El `generated_loss` lo que esta haciendo es comprobar el resultado del discriminador al observar la imagen que ha sido generada por el Generador y compararla con una imagen 100% falsa que seria una matriz de 0s

Estas dos componentes evaluan si el trabajo del Discriminador es el adecuado

In [None]:
loss_object = tf.keras.losses.BinaryCrossentropy(from_logits = True)

def discriminator_loss(disc_real_output, disc_generated_output):
    # Diferencia entre los true por ser real y el deectado por el discriminador
    real_loss = loss_object(tf.ones_like(disc_real_output), disc_real_output)
    # Diferencia entre los false por ser generado y el detectado por el discriminador
    generated_loss = loss_object(tf.zeros_like(disc_generated_output), disc_generated_output)

    total_loss = real_loss + generated_loss
    
    return total_loss

El generador tiene dos objetivos, uno de ellos es generar una imagen realista, y su otro objetivo es conseguir qeu el error del discriminador se maximice. Por eso se le pasa la imagen que ha generado `gen_output`, la imagen objetivo `target` y el mapa que ha generado el discriminador al observar su imagen `disc_generated_output`

Se computan dos errores diferentes, el adversario `gan_loss` y la diferencia absoluta por pixeles `l1_loss`

In [None]:
LAMBDA = 100

def generator_loss(disc_generated_output, gen_output, target):

  gan_loss = loss_object(tf.ones_like(disc_generated_output), disc_generated_output)

  # mean absolute error
  l1_loss = tf.reduce_mean(tf.abs(target - gen_output))

  total_gen_loss = gan_loss + (LAMBDA + l1_loss)

  return total_gen_loss

**OPTIMIZADORES**

In [None]:
# Optimizadores con los hiperparamtros que especifica el paper
generator_optimizer     = tf.keras.optimizers.Adam(2e-4, beta_1 = 0.5)
discriminator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1 = 0.5)

**CHECKPOINT DEL ENTRENAMIENTO**

Con esto si Google Colab se cierra o tiene un error, como se van guardando los optimizadores y el proceso del entrenamiento pues no hay que empezar de cero

In [None]:
import os
checkpoint_prefix = os.path.join(CKPATH, "chpt")
checkpoint = tf.train.Checkpoint(generator_optimizer = generator_optimizer,
                                 discriminator_optimizer = discriminator_optimizer,
                                 generator = generator,
                                 discriminator = discriminator)

#checkpoint.restore(tf.train.latest_checkpoint(CKPATH)).assert_consumed()



**EVALUACION DEL MODELO DURANTE EL ENTRENAMIENTO**

In [None]:
def generate_images(model, test_input, tar, epoch, save_filename = False, display_images = True):
  prediction = model(test_input, training=True)

  if save_filename and epoch % 10 == 0:
    tf.keras.preprocessing.image.save_img(PATH_RESULTS + '/Predicted/' + save_filename + '_pre.jpg', prediction[0,...])
  if save_filename and epoch == 0:
    tf.keras.preprocessing.image.save_img(PATH_RESULTS + '/Target/' + save_filename + '_tar.jpg',tar[0,...])
    tf.keras.preprocessing.image.save_img(PATH_RESULTS + '/Input/' + save_filename + '_input.jpg',test_input[0,...])

  plt.figure(figsize=(15,15))

  display_list = [test_input[0], tar[0], prediction[0]]
  title = ['Input Image', 'Ground Truth', 'Predicted Image']

  if display_images:
    for i in range(3):
      plt.subplot(1, 3, i+1)
      plt.title(title[i])
      # getting the pixel values between [0, 1] to plot it.
      plt.imshow(display_list[i] * 0.5 + 0.5)
      plt.axis('off')

  plt.show()

# **ENTRENAMIENTO**

In [None]:
import datetime
log_dir="logs/"

summary_writer = tf.summary.create_file_writer(
  log_dir + "fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))

In [None]:
@tf.function()
def train_step(input_image, target, epoch):
  
  
  with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
    # Generador
    output_image = generator(input_image, training = True)
    # Discriminador
    output_gen_disc = discriminator([output_image, input_image], training = True)
    output_trg_disc = discriminator([target, input_image], training = True)
    # Funciones de pérdida
    disc_loss = discriminator_loss(output_trg_disc, output_gen_disc)
    gen_loss   = generator_loss(output_gen_disc, output_image, target)
    # Gradientes
    generator_grads    = gen_tape.gradient(gen_loss, generator.trainable_variables)
    discrminator_grads = disc_tape.gradient(disc_loss, discriminator.trainable_variables)
    # Optimizador
    generator_optimizer.apply_gradients(zip(generator_grads, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(discrminator_grads, discriminator.trainable_variables))
    with summary_writer.as_default():
      tf.summary.scalar('gen_total_loss', gen_loss, step=epoch)
      tf.summary.scalar('disc_loss', disc_loss, step=epoch)

In [None]:
from IPython.display import clear_output

def train(dataset, epochs):

  for epoch in range(epochs):

    imgi = 0
    for input_image, target in dataset:
      print('epoch ' + str(epoch) + '- train: ' + str(imgi) + '/' + str(len(tr_urls)))
      imgi += 1
      train_step(input_image, target, epoch)
      clear_output(wait = True)
    
    imgi = 0
    for inp, tar in test_dataset.take(5):
      generate_images(generator, inp, tar, epoch, str(imgi) + '_' + str(epoch), display_images = True)
      imgi += 1
    # Guardar (checkpoint) el modelo cada 20 epocas
    if(epoch + 1) % 20 == 0:
      checkpoint.save(file_prefix = checkpoint_prefix)
    if(epoch + 1) % 20 == 0:
      generator.save('') # save model path

In [None]:
# Uncomment if you want restore a checkpoint
# checkpoint.restore(tf.train.latest_checkpoint(CKPATH))

<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7f4905101cf8>

In [None]:
# restoring the latest checkpoint in checkpoint_dir


train(train_dataset, 61)
generator.save('/content/drive/My Drive/DeepLearning/recons+sr/Model/recons_sr_GAN_finish.h5')
print("Finish training")