<a href="https://colab.research.google.com/github/Zeta36/pix2pix_algebra_basica/blob/master/pix2pix_algebra_basica.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

##### Copyright 2019 Samuel Graván (https://github.com/Zeta36).

Licensed under the Apache License, Version 2.0 (the "License");

##### Project scope

This project was developed to take part in the challenge created by Dot CSV (https://www.youtube.com/watch?v=BNgAaCK920E): "¡Gana una NVidia RTX 2080 SUPER con el Reto DotCSV!" ("Win an NVidia RTX 2080 SUPER with the DotCSV Challenge!")

Specifically, Pix2Pix is used to build a model that can "see" an image containing a simple algebraic expression, for example 5 - 2, and generate another image showing the answer. In this example, that is an image of the number "3".

##### Other information of interest

This model does not use a static dataset. Instead, each training iteration generates a random algebraic expression, which is then rendered as an image. To obtain a test dataset of expressions not seen during training, generation is restricted: training expressions never evaluate to certain held-out results, and the test set draws only from expressions that do.
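
The split described above can be made concrete by enumerating every expression the generator can produce. The following is a minimal sketch, not the notebook's own code: it assumes the same operand range (0 to 20), the same two operators and the held-out results 8, 9, 11 and 26 that `generar_expresion` uses in the dataset section; `HELD_OUT` and `split_expressions` are hypothetical names introduced only for illustration.

```python
from itertools import product
import operator

# Operators and held-out results, assumed to mirror generar_expresion
OPS = {'+': operator.add, '-': operator.sub}
HELD_OUT = {8, 9, 11, 26}

def split_expressions(min_num=0, max_num=20):
    """Enumerate every 'left op right' expression and split it by result."""
    train, test = [], []
    for left, right in product(range(min_num, max_num + 1), repeat=2):
        for symbol, fn in OPS.items():
            expression = f"{left} {symbol} {right}"
            if fn(left, right) in HELD_OUT:
                test.append(expression)
            else:
                train.append(expression)
    return train, test

train, test = split_expressions()
print(len(train), "training expressions,", len(test), "test expressions")
```

Because the split is made on the result rather than on the operands, every test expression has an answer that never appears as a target image during training.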

The model works and appears to generalize well to expressions not seen during training.

It would also be easy to extend this work by adding new typefaces, fonts, sizes, orientations, noise and so on, so that the model could actually compute these simple algebraic expressions after just a "glance" at an input image containing a mathematical expression of this kind.



## Import the libraries we will use and mount our Drive

In [0]:
from __future__ import absolute_import, division, print_function, unicode_literals

try:
  # Install the TensorFlow 2.0 preview build (the `!` shell escape only works in IPython/Colab).
  !pip install tf-nightly-gpu-2.0-preview
except Exception:
  pass
import tensorflow as tf

import os
import time
import matplotlib.pyplot as plt
import numpy as np
import random
from IPython.display import clear_output
from PIL import Image, ImageDraw, ImageFont

from google.colab import drive
drive.mount('/content/drive/')

In [0]:
# Check that the checkpoints folder and the fonts folder exist

!ls "/content/drive/My Drive/training_checkpoints"
!ls "/content/drive/My Drive/fonts"

## Dataset generation

In this project the dataset is generated dynamically, so there is no file to access or download.

The generated image is already `256 x 256` (`IMG_WIDTH` x `IMG_HEIGHT`), so no further processing is needed: no resize, no crop and no random flip. Only normalization is applied.


In [0]:
BUFFER_SIZE = 400
BATCH_SIZE = 1
IMG_WIDTH = 256
IMG_HEIGHT = 256

In [0]:
# Simple algebraic expression generator
def generar_expresion(testing):
  OPS = ['+', '-']
  MIN_NUM, MAX_NUM = 0, 20
  # Results reserved for the test set; training never uses expressions with these values
  TEST_RESULTS = [8, 9, 11, 26]

  while True:
    left = random.randint(MIN_NUM, MAX_NUM)
    right = random.randint(MIN_NUM, MAX_NUM)
    operator = random.choice(OPS)
    expresion = str(left) + ' ' + operator + ' ' + str(right)

    # Keep the expression only if its result belongs to the requested split
    is_test_result = eval(expresion, {'__builtins__': None}) in TEST_RESULTS
    if is_test_result == bool(testing):
      return expresion
    

In [0]:
def generate_and_load_image(testing=False):
  expresion = generar_expresion(testing)    
  font = ImageFont.truetype('/content/drive/My Drive/fonts/arial.ttf', 65) 
  input_image = Image.new(mode = "RGB", size = (IMG_WIDTH, IMG_HEIGHT), 
                          color = (255, 255, 255))
  d = ImageDraw.Draw(input_image)
  d.text((10, 100), ' ' + expresion + ' ', fill=(255, 0, 0), font=font)
  
  real_image = Image.new(mode = "RGB", size = (IMG_WIDTH, IMG_HEIGHT), 
          color = (255, 255, 255))
  d = ImageDraw.Draw(real_image)
  d.text((10, 100), ' ' + str(eval(expresion, {'__builtins__': None})) + ' ', fill=(255, 0, 0), font=font)
  
  input_image = np.array(input_image)[:, :, 0:3]
  real_image = np.array(real_image)[:, :, 0:3]

  input_image = tf.cast(input_image, tf.float32)
  real_image = tf.cast(real_image, tf.float32)

  return input_image, real_image

In [0]:
inp, re = generate_and_load_image()
# Test the image generation
plt.figure()
plt.imshow(inp/255.0)
plt.figure()
plt.imshow(re/255.0)

In [0]:
# Normalize the image to the range [-1, 1]

def normalize(input_image, real_image):
  input_image = (input_image / 127.5) - 1
  real_image = (real_image / 127.5) - 1

  return input_image, real_image
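
The normalization maps 8-bit pixel values from [0, 255] to [-1, 1], which matches the range of the generator's `tanh` output. Multiplying by 0.5 and adding 0.5, as `generate_images` does before plotting, maps the values back to [0, 1]. The following NumPy sketch shows the round trip on two pixels; `normalize_pixels` and `to_display` are hypothetical helper names, not part of the notebook.

```python
import numpy as np

def normalize_pixels(pixels):
    """Map pixel values from [0, 255] to [-1, 1]."""
    return pixels / 127.5 - 1.0

def to_display(normalized):
    """Map values from [-1, 1] back to [0, 1] for plotting."""
    return normalized * 0.5 + 0.5

# One white pixel and one pure-red pixel, the two colors drawn by the notebook
pixels = np.array([[255.0, 255.0, 255.0], [255.0, 0.0, 0.0]])
normalized = normalize_pixels(pixels)
print(normalized)
print(to_display(normalized))
```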

In [0]:
def load_image_train():
  input_image, real_image = generate_and_load_image()
  input_image, real_image = normalize(input_image, real_image)

  return input_image, real_image

inp2, re2 = load_image_train()
# Test the image generation (rescale from [-1, 1] back to [0, 1] for display)
plt.figure()
plt.imshow(inp2 * 0.5 + 0.5)
plt.figure()
plt.imshow(re2 * 0.5 + 0.5)

In [0]:
def load_image_test():
  input_image, real_image = generate_and_load_image(testing=True)
  input_image, real_image = normalize(input_image, real_image)

  return input_image, real_image

inp3, re3 = load_image_test()
# Test the image generation (rescale from [-1, 1] back to [0, 1] for display)
plt.figure()
plt.imshow(inp3 * 0.5 + 0.5)
plt.figure()
plt.imshow(re3 * 0.5 + 0.5)

## Build the Generator (modified U-Net)



In [0]:
OUTPUT_CHANNELS = 3

In [0]:
def downsample(filters, size, apply_batchnorm=True):
  initializer = tf.random_normal_initializer(0., 0.02)

  result = tf.keras.Sequential()
  result.add(
      tf.keras.layers.Conv2D(filters, size, strides=2, padding='same',
                             kernel_initializer=initializer, use_bias=False))

  if apply_batchnorm:
    result.add(tf.keras.layers.BatchNormalization())

  result.add(tf.keras.layers.LeakyReLU())

  return result

In [0]:
down_model = downsample(3, 4)
down_result = down_model(tf.expand_dims(inp, 0))
print (down_result.shape)

In [0]:
def upsample(filters, size, apply_dropout=False):
  initializer = tf.random_normal_initializer(0., 0.02)

  result = tf.keras.Sequential()
  result.add(
    tf.keras.layers.Conv2DTranspose(filters, size, strides=2,
                                    padding='same',
                                    kernel_initializer=initializer,
                                    use_bias=False))

  result.add(tf.keras.layers.BatchNormalization())

  if apply_dropout:
      result.add(tf.keras.layers.Dropout(0.5))

  result.add(tf.keras.layers.ReLU())

  return result

In [0]:
up_model = upsample(3, 4)
up_result = up_model(down_result)
print (up_result.shape)

In [0]:
def Generator():
  down_stack = [
    downsample(64, 4, apply_batchnorm=False), # (bs, 128, 128, 64)
    downsample(128, 4), # (bs, 64, 64, 128)
    downsample(256, 4), # (bs, 32, 32, 256)
    downsample(512, 4), # (bs, 16, 16, 512)
    downsample(512, 4), # (bs, 8, 8, 512)
    downsample(512, 4), # (bs, 4, 4, 512)
    downsample(512, 4), # (bs, 2, 2, 512)
    downsample(512, 4), # (bs, 1, 1, 512)
  ]

  up_stack = [
    upsample(512, 4, apply_dropout=True), # (bs, 2, 2, 1024)
    upsample(512, 4, apply_dropout=True), # (bs, 4, 4, 1024)
    upsample(512, 4, apply_dropout=True), # (bs, 8, 8, 1024)
    upsample(512, 4), # (bs, 16, 16, 1024)
    upsample(256, 4), # (bs, 32, 32, 512)
    upsample(128, 4), # (bs, 64, 64, 256)
    upsample(64, 4), # (bs, 128, 128, 128)
  ]

  initializer = tf.random_normal_initializer(0., 0.02)
  last = tf.keras.layers.Conv2DTranspose(OUTPUT_CHANNELS, 4,
                                         strides=2,
                                         padding='same',
                                         kernel_initializer=initializer,
                                         activation='tanh') # (bs, 256, 256, 3)

  concat = tf.keras.layers.Concatenate()

  inputs = tf.keras.layers.Input(shape=[None,None,3])
  x = inputs

  # Downsampling through the model
  skips = []
  for down in down_stack:
    x = down(x)
    skips.append(x)

  skips = reversed(skips[:-1])

  # Upsampling through the model and establishing the skip connections
  for up, skip in zip(up_stack, skips):
    x = up(x)
    x = concat([x, skip])

  x = last(x)

  return tf.keras.Model(inputs=inputs, outputs=x)
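
The shape comments in `Generator` follow from two rules: a stride-2 convolution with `padding='same'` halves the spatial size (rounding up), and a stride-2 transposed convolution with `padding='same'` doubles it, while each skip connection adds the channels of the matching encoder output. The sketch below re-derives those comments in plain Python for a 256 x 256 input. `trace_unet_shapes` and the two filter lists are hypothetical names transcribed from the code above, not part of the notebook.

```python
import math

DOWN_FILTERS = [64, 128, 256, 512, 512, 512, 512, 512]
UP_FILTERS = [512, 512, 512, 512, 256, 128, 64]

def trace_unet_shapes(size=256):
    """Return (height, width, channels) after every encoder and decoder block."""
    down_shapes = []
    for filters in DOWN_FILTERS:
        size = math.ceil(size / 2)      # stride-2 conv, padding='same'
        down_shapes.append((size, size, filters))

    # Skip connections use every encoder output except the bottleneck, deepest first
    skips = reversed(down_shapes[:-1])
    up_shapes = []
    for filters, skip in zip(UP_FILTERS, skips):
        size *= 2                       # stride-2 transposed conv, padding='same'
        up_shapes.append((size, size, filters + skip[2]))  # channel-wise concat
    return down_shapes, up_shapes

down_shapes, up_shapes = trace_unet_shapes()
for shape in down_shapes + up_shapes:
    print(shape)
```

The final transposed convolution (`last`) then doubles the 128 x 128 feature map once more and reduces it to `OUTPUT_CHANNELS`, giving the 256 x 256 x 3 output image.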

In [0]:
generator = Generator()

gen_output = generator(inp[tf.newaxis,...], training=False)
plt.imshow(gen_output[0,...])

## Build the Discriminator (PatchGAN)


In [0]:
def Discriminator():
  initializer = tf.random_normal_initializer(0., 0.02)

  inp = tf.keras.layers.Input(shape=[None, None, 3], name='input_image')
  tar = tf.keras.layers.Input(shape=[None, None, 3], name='target_image')

  x = tf.keras.layers.concatenate([inp, tar]) # (bs, 256, 256, channels*2)

  down1 = downsample(64, 4, False)(x) # (bs, 128, 128, 64)
  down2 = downsample(128, 4)(down1) # (bs, 64, 64, 128)
  down3 = downsample(256, 4)(down2) # (bs, 32, 32, 256)

  zero_pad1 = tf.keras.layers.ZeroPadding2D()(down3) # (bs, 34, 34, 256)
  conv = tf.keras.layers.Conv2D(512, 4, strides=1,
                                kernel_initializer=initializer,
                                use_bias=False)(zero_pad1) # (bs, 31, 31, 512)

  batchnorm1 = tf.keras.layers.BatchNormalization()(conv)

  leaky_relu = tf.keras.layers.LeakyReLU()(batchnorm1)

  zero_pad2 = tf.keras.layers.ZeroPadding2D()(leaky_relu) # (bs, 33, 33, 512)

  last = tf.keras.layers.Conv2D(1, 4, strides=1,
                                kernel_initializer=initializer)(zero_pad2) # (bs, 30, 30, 1)

  return tf.keras.Model(inputs=[inp, tar], outputs=last)
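
The 30 x 30 output in the last shape comment, and the size of the input patch that each output value looks at, can both be derived from the kernel sizes, strides and padding in `Discriminator`. The following sketch does that arithmetic in plain Python. The layer table is transcribed from the code above, and `LAYERS`, `output_size` and `receptive_field` are illustrative names.

```python
# (kernel, stride, padding) per convolution.
# 'same' divides the size by the stride (rounding up); an int is explicit zero padding.
LAYERS = [
    (4, 2, 'same'),  # down1
    (4, 2, 'same'),  # down2
    (4, 2, 'same'),  # down3
    (4, 1, 1),       # ZeroPadding2D + Conv2D(512)
    (4, 1, 1),       # ZeroPadding2D + Conv2D(1)
]

def output_size(size):
    """Spatial size of the discriminator output for a square input."""
    for kernel, stride, padding in LAYERS:
        if padding == 'same':
            size = -(-size // stride)                        # ceil(size / stride)
        else:
            size = (size + 2 * padding - kernel) // stride + 1
    return size

def receptive_field():
    """Width of the input patch that influences one output value."""
    field, jump = 1, 1
    for kernel, stride, _ in LAYERS:
        field += (kernel - 1) * jump   # each layer widens the patch by (k - 1) input steps
        jump *= stride                 # distance in input pixels between adjacent units
    return field

print(output_size(256), receptive_field())
```

For a 256 x 256 input this gives a 30 x 30 grid in which each value judges a 70 x 70 patch of the input, the patch size usually quoted for PatchGAN.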

In [0]:
discriminator = Discriminator()
disc_out = discriminator([inp[tf.newaxis,...], gen_output], training=False)
plt.imshow(disc_out[0,...,-1], vmin=-20, vmax=20, cmap='RdBu_r')
plt.colorbar()

## Define the loss functions and the optimizers

In [0]:
LAMBDA = 100

In [0]:
loss_object = tf.keras.losses.BinaryCrossentropy(from_logits=True)

In [0]:
def discriminator_loss(disc_real_output, disc_generated_output):
  real_loss = loss_object(tf.ones_like(disc_real_output), disc_real_output)

  generated_loss = loss_object(tf.zeros_like(disc_generated_output), disc_generated_output)

  total_disc_loss = real_loss + generated_loss

  return total_disc_loss

In [0]:
def generator_loss(disc_generated_output, gen_output, target):
  gan_loss = loss_object(tf.ones_like(disc_generated_output), disc_generated_output)

  # Mean absolute error
  l1_loss = tf.reduce_mean(tf.abs(target - gen_output))

  total_gen_loss = gan_loss + (LAMBDA * l1_loss)

  return total_gen_loss
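
To get a feel for how the two terms of the generator loss compare, the sketch below evaluates the same formula on toy values with NumPy. It is only a stand-in under stated assumptions: `bce_from_logits` is a hand-written sigmoid cross-entropy meant to approximate what `loss_object` computes, and `generator_loss_np` and the toy arrays are hypothetical.

```python
import numpy as np

LAMBDA = 100

def bce_from_logits(labels, logits):
    """Mean sigmoid cross-entropy, written in a numerically stable form."""
    return np.mean(np.maximum(logits, 0) - logits * labels
                   + np.log1p(np.exp(-np.abs(logits))))

def generator_loss_np(disc_generated_logits, gen_output, target):
    """Adversarial term plus LAMBDA times the mean absolute error."""
    gan_loss = bce_from_logits(np.ones_like(disc_generated_logits),
                               disc_generated_logits)
    l1_loss = np.mean(np.abs(target - gen_output))
    return gan_loss + LAMBDA * l1_loss

# Toy values: an undecided discriminator (logit 0) and a uniform pixel error of 0.1
logits = np.zeros((1, 30, 30, 1))
target = np.ones((1, 256, 256, 3))
gen_output = target - 0.1
print(generator_loss_np(logits, gen_output, target))
```

With these toy values the L1 term contributes about 10 and the adversarial term about 0.69 (ln 2), which shows how strongly `LAMBDA = 100` weights pixel-level accuracy.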

In [0]:
generator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
discriminator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)

## Checkpoints (Object-based saving)

In [0]:
checkpoint_dir = '/content/drive/My Drive/training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(generator_optimizer=generator_optimizer,
                                 discriminator_optimizer=discriminator_optimizer,
                                 generator=generator,
                                 discriminator=discriminator)

## Restore the latest checkpoint

In [0]:
!ls '/content/drive/My Drive/training_checkpoints'

In [0]:
# Restore the most recently saved checkpoint
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

## Train the model


In [0]:
EPOCHS = 150

In [0]:
def generate_images(model, test_input, tar):
  prediction = model(test_input, training=True)
  plt.figure(figsize=(15,15))

  display_list = [test_input[0], tar[0], prediction[0]]
  title = ['Input Image', 'Real Result Image', 'Predicted Result Image']

  for i in range(3):
    plt.subplot(1, 3, i+1)
    plt.title(title[i])
    plt.imshow(display_list[i] * 0.5 + 0.5)
    plt.axis('off')
  plt.show()
  time.sleep(10) 

In [0]:
@tf.function
def train_step(input_image, target):
  with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
    gen_output = generator(input_image, training=True)

    disc_real_output = discriminator([input_image, target], training=True)
    disc_generated_output = discriminator([input_image, gen_output], training=True)

    gen_loss = generator_loss(disc_generated_output, gen_output, target)
    disc_loss = discriminator_loss(disc_real_output, disc_generated_output)

  generator_gradients = gen_tape.gradient(gen_loss,
                                          generator.trainable_variables)
  discriminator_gradients = disc_tape.gradient(disc_loss,
                                               discriminator.trainable_variables)

  generator_optimizer.apply_gradients(zip(generator_gradients,
                                          generator.trainable_variables))
  discriminator_optimizer.apply_gradients(zip(discriminator_gradients,
                                              discriminator.trainable_variables))

In [0]:
def fit(epochs):
  for epoch in range(epochs):
    # Train for one epoch
    for imgi in range(BUFFER_SIZE):
        input_image, target = load_image_train()
        print('epoch ' + str(epoch) + ' - train: ' + str(imgi)+'/'+str(BUFFER_SIZE))        
        train_step(tf.expand_dims(input_image, 0),tf.expand_dims(target, 0))
        clear_output(wait=True)
    
    # Show the progress so far using the test dataset
    example_input, example_target = load_image_test()
    generate_images(generator, tf.expand_dims(example_input, 0), tf.expand_dims(example_target, 0))

    # Save a checkpoint of the model every 20 epochs
    if (epoch + 1) % 20 == 0:
      checkpoint.save(file_prefix = checkpoint_prefix)
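
The condition `(epoch + 1) % 20 == 0` determines which epochs are written to disk. A quick check in plain Python, assuming the notebook's `EPOCHS = 150` (the names `SAVE_EVERY` and `saved_epochs` are illustrative):

```python
EPOCHS = 150
SAVE_EVERY = 20

# 1-based epoch numbers at which fit() would call checkpoint.save
saved_epochs = [epoch + 1 for epoch in range(EPOCHS)
                if (epoch + 1) % SAVE_EVERY == 0]
print(saved_epochs)
print("epochs trained after the last save:", EPOCHS - saved_epochs[-1])
```

Since 150 is not a multiple of 20, the last save happens after epoch 140. One option for keeping the final 10 epochs is to call `checkpoint.save(file_prefix=checkpoint_prefix)` once more after `fit` returns.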

In [0]:
fit(EPOCHS)

## Run some tests with the previously trained model

---



In [0]:
# Test the model with data not seen during training
example_input, example_target = load_image_test()
generate_images(generator, tf.expand_dims(example_input, 0), tf.expand_dims(example_target, 0))