# Generación de Texto de "Origen de las Especies" usando una  Recurrent Neuronal Network del tipo RNN básico, LSTM o GRU
Basado en https://www.tensorflow.org/tutorials/text/text_generation

1) Cargar las librerías:

In [None]:
#@title Librerías a usar
from __future__ import absolute_import, division, print_function, unicode_literals

import tensorflow as tf

import numpy as np
import os
import time

from google.colab import files 
import io

print("Librerías cargadas")

2) Cargar el texto base a procesar:

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

# directorio local en Google Drive
path = 'gdrive/My Drive/IA/demo ANIMALES/datos'  #@param {type:"string"}

In [None]:
nombre_archivo = "/La_Evolucion_De_Las_Especies.txt"  #@param {type:"string"}

# levanta el archivo de texto del Drive para procesar
text = open("".join([path, nombre_archivo]), 'rb').read().decode(encoding='utf-8')

print("> Archivo cargado:")
print (' -- Tamaño total del texto: {} caracteres'.format(len(text)))

# muestra los primeros 250 caracteres del texto
print("\n -- Ejemplo: \n", text[:250])

3) Preparar el texto base a procesar:

In [None]:
#@title Limpiar el texto
# eliminar saltos de línea y signos de puntuación especiales
text = text.replace('\n', ' ')
text = text.replace('\t', ' ')
text = text.replace('-', ' ')
text = text.replace(':', ' ')
text = text.replace(',', ' ')
text = text.replace(';', ' ')
text = text.replace('.', ' ')
text = text.replace('\'', ' ')
text = text.replace('"', ' ')
text = text.replace('`', ' ')
text = text.replace('(', ' ')
text = text.replace(')', ' ')
text = text.replace('!', ' ')
text = text.replace('?', ' ')
text = text.replace('<', ' ')
text = text.replace('>', ' ')
text = text.replace('=', ' ')
text = text.replace('/', ' ')
text = text.replace('@', ' ')
text = text.replace('_', ' ')
text = text.replace('  ', ' ')

# pasa todo a minúsculas
text = text.lower()

# eliminar acentos (reemplaza por letra sin acento)
text = text.replace('á', 'a')
text = text.replace('é', 'e')
text = text.replace('í', 'i')
text = text.replace('ó', 'o')
text = text.replace('ú', 'u')

# muestra los primeros 250 caracteres del texto
print("\n -- Ejemplo luego de preparar: \n", text[:250])

In [None]:
#@title Preparar texto para RNN

# The unique characters in the file
vocab = sorted(set(text))
print ('{} caracteres distintos detectados'.format(len(vocab)))

# Creating a mapping from unique characters to indices
char2idx = {u:i for i, u in enumerate(vocab)}
idx2char = np.array(vocab)

text_as_int = np.array([char2idx[c] for c in text])

print('{')
for char,_ in zip(char2idx, range(20)):
    print('  {:4s}: {:3d},'.format(repr(char), char2idx[char]))
print('  ...\n}')

# Muestra ejemplo de cómo se mapean los caracteres a valores numéricos
print ('{} <-------- > {}'.format(repr(text[:13]), text_as_int[:13]))

In [None]:
#@title Armar secuencias de texto y formatear

# determinar el largo máximo de la secuencia
if ((len(text)//101)<1000):
  seq_length = 50
else:
  seq_length = 100
examples_per_epoch = len(text)//(seq_length+1)
print("Largo de secuencias: ", seq_length)
print("\n")
print("Ejemplos por época: ", examples_per_epoch)

# Dividir en datos de entrenamiento y prueba, para ello divide el texto en secuencias donde 
#- la secuencia de la posición 0 a [seq_length] se considera de entrada, y 
#- la secuencia de la posición  [seq_length+1] al final es la de salida

# genera un vector de caracteres
char_dataset = tf.data.Dataset.from_tensor_slices(text_as_int)

# procesa para generar las secuencias el largo deseado
sequences = char_dataset.batch(seq_length+1, drop_remainder=True)

# muestra ejemplo
for item in sequences.take(5):
  print(repr(''.join(idx2char[item.numpy()])))


# genera las secuencias de entrada y salida
def split_input_target(chunk):
    input_text = chunk[:-1]
    target_text = chunk[1:]
    return input_text, target_text

datasetSeq = sequences.map(split_input_target)

print("DatasetSeq: ", datasetSeq, "\n")

# muestra ejemplo
for input_example, target_example in  datasetSeq.take(2):
  print ('Texto de Entrada: ', repr(''.join(idx2char[input_example.numpy()])))
  print ('Texto  de Salida:', repr(''.join(idx2char[target_example.numpy()])))
  print("\n")

In [None]:
#@title Ejemplos

# muestra entrada y salida por cada caracter
for i, (input_idx, target_idx) in enumerate(zip(input_example[:5], target_example[:5])):
    print("Step {:4d}".format(i))
    print("  Entrada: {} ({:s})".format(input_idx, repr(idx2char[input_idx])))
    print("  Salida Esperada: {} ({:s})".format(target_idx, repr(idx2char[target_idx])))

In [None]:
#@title Armar repositorio de datos de entrenamiento
# genera 'batch' de secuencias que se van a procesar en el entrenamiento

# Batch size
BATCH_SIZE = 64

# Buffer size to shuffle the dataset
# (TF data is designed to work with possibly infinite sequences,
# so it doesn't attempt to shuffle the entire sequence in memory. Instead,
# it maintains a buffer in which it shuffles elements).
BUFFER_SIZE = 100000

dataset = datasetSeq.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)

print("Dataset: ", dataset, "\n")

4) Especificar y preparar el modelo de la RNN a usar:

In [None]:
#@title Establecer modelo

# Seleccione el modelo a usar
seleccModel = 'LSTM'  #@param ["LSTM", "GRU", "RNN"]

# cantidad de neuronas RNN
rnn_units = 1024 #@param {type:"integer"}

# define el modelo a utilizar
if seleccModel == 'LSTM': 
    # define el modelo LSTM
    def build_model(vocab_size, embedding_dim, rnn_units, batch_size):
      model = tf.keras.Sequential([
        tf.keras.layers.Embedding(vocab_size, embedding_dim,
                                  batch_input_shape=[batch_size, None]),
        tf.keras.layers.LSTM(rnn_units,
                            return_sequences=True,
                            stateful=True,
                            recurrent_initializer='glorot_uniform'),
        tf.keras.layers.Dense(vocab_size)
      ])
      return model

    print("Modelo LSTM definido.")

elif seleccModel == 'GRU': 
    # define el modelo GRU
    def build_model(vocab_size, embedding_dim, rnn_units, batch_size):
      model = tf.keras.Sequential([
        tf.keras.layers.Embedding(vocab_size, embedding_dim,
                                  batch_input_shape=[batch_size, None]),
        tf.keras.layers.GRU(rnn_units,
                            return_sequences=True,
                            stateful=True,
                            recurrent_initializer='glorot_uniform'),
        tf.keras.layers.Dense(vocab_size)
      ])
      return model

    print("Modelo GRU definido.")

else: #if seleccModel == 'RNN': 
    # define el modelo RNN básico
    def build_model(vocab_size, embedding_dim, rnn_units, batch_size):
      model = tf.keras.Sequential([
        tf.keras.layers.Embedding(vocab_size, embedding_dim,
                                  batch_input_shape=[batch_size, None]),
        tf.keras.layers.SimpleRNN(rnn_units,
                            return_sequences=True,
                            stateful=True,
                            recurrent_initializer='glorot_uniform'),
        tf.keras.layers.Dense(vocab_size)
      ])
      return model

    print("Modelo RNN definido.")

# Length of the vocabulary in chars
vocab_size = len(vocab)

# The embedding dimension
embedding_dim = 256

# genera el modelo
model = build_model(
  vocab_size = len(vocab),
  embedding_dim=embedding_dim,
  rnn_units=rnn_units,
  batch_size=BATCH_SIZE)

model.summary()

# prepara variables auxiliares para el entrenamiento  de la RNN
for input_example_batch, target_example_batch in dataset.take(1):
  example_batch_predictions = model(input_example_batch)
  print(example_batch_predictions.shape, "# (batch_size, sequence_length, vocab_size)")

sampled_indices = tf.random.categorical(example_batch_predictions[0], num_samples=1)
sampled_indices = tf.squeeze(sampled_indices,axis=-1).numpy()

# compila el modelo para el entrenamiento  de la RNN
def loss(labels, logits):
  return tf.keras.losses.sparse_categorical_crossentropy(labels, logits, from_logits=True)

example_batch_loss  = loss(target_example_batch, example_batch_predictions)
print("Forma vector predicción: ", example_batch_predictions.shape, " # (batch_size, sequence_length, vocab_size)")
print("scalar_loss:      ", example_batch_loss.numpy().mean())

model.compile(optimizer='adam', loss=loss)

print("\nModelo generado", model)

5) Entrenar la RNN:

In [None]:
#@title Entrenar

# define donde se a almacenar la información de checkpoints

# Directory where the checkpoints will be saved
checkpoint_dir = './checkpoints/RNN_training_checkpoints'
# Name of the checkpoint files
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

checkpoint_callback=tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix,
    save_weights_only=True)

print("Checkpoints grabados en ", checkpoint_dir)
print("\n")

# ejecutar el entrenamiento (poner antes en GPU)
EPOCHS = 100
history = model.fit(dataset, epochs=EPOCHS, callbacks=[checkpoint_callback])

6) Probar la RNN entrenada:

In [None]:
#@title Cargar modelo entrenado 

# recupera la información del último checkpoint
tf.train.latest_checkpoint(checkpoint_dir)

modelPred = build_model(vocab_size, embedding_dim, rnn_units, batch_size=1)
modelPred.load_weights(tf.train.latest_checkpoint(checkpoint_dir))
modelPred.build(tf.TensorShape([1, None]))

modelPred.summary()

# define función auxiliar para devolver predicción de texto
def generate_text(model, temperature, start_string, num_generate=500):

  # Converting our start string to numbers (vectorizing)
  input_eval = [char2idx[s] for s in start_string]
  input_eval = tf.expand_dims(input_eval, 0)

  # Empty string to store our results
  text_generated = []

  # Here batch size == 1
  model.reset_states()
  for i in range(num_generate):
      predictions = model(input_eval)
      # remove the batch dimension
      predictions = tf.squeeze(predictions, 0)

      # using a categorical distribution to predict the word returned by the model
      predictions = predictions / temperature
      predicted_id = tf.random.categorical(predictions, num_samples=1)[-1,0].numpy()

      # We pass the predicted word as the next input to the model
      # along with the previous hidden state
      input_eval = tf.expand_dims([predicted_id], 0)

      text_generated.append(idx2char[predicted_id])

  return (start_string + ''.join(text_generated))


# ejecuta el modelo usando como entrada un texto para probar
print("\n\n--------------------------------------------------------------------------------------------\n")
print(generate_text(modelPred, temperature=1.0, start_string=u"darwin"))
print("\n--------------------------------------------------------------------------------------------\n\n")


In [None]:
#@title Probar generación de texto:

# Grado de "temperatura" u originalidad que va a generar el algoritmo:
# - cuanto más alto el valor, se genera texto "más sorprendente".
# - cuanto más bajo, se genera texto "más esperado".
originalidad =  0.9 #@param {type:"number"}

# Texto inicial para generar
texto_inicial = 'mamifero' #@param {type:"string" }

# Largo del texto a generar
largo_texto = 500 #@param {type:"integer" }

# ejecuta el modeo usando como entrada texto_inicial
print("\n\n--------------------------------------------------------------------------------------------\n")
print(generate_text(modelPred, temperature=originalidad, start_string=texto_inicial, num_generate=largo_texto))
print("\n--------------------------------------------------------------------------------------------\n\n")