# Laboratorio 10 A - Generacion de texto con RNN.

Este laboratorio demuestra cómo generar texto usando una character-based RNN. Trabajará con un conjunto de datos de los escritos de Game Of Thrones. Dada una secuencia de caracteres a partir de estos datos, se entrena un modelo para predecir el siguiente carácter de la secuencia. Se pueden generar secuencias de texto más largas llamando al modelo repetidamente.
El siguiente es el resultado de muestra cuando el modelo de este tutorial se entrenó durante 30 épocas y comenzó con el mensaje "The Targaryens":

<pre>
The Targaryens and the other lefts had shared a bench. He may even let them get a grimmer if you storm under the stables. 

We'll want to her when she'd leave the toscat, Grand Maester Pycelle told him. It meant the lord Freth put us his shoulder, and the ilinit is a primate foolide as beft by a babe as well. Yet Jon Arryn's meat was fled, Arya could taste it. Then he finished faintly in her need, but massive resumed he was needed to below. And so Dany came up, offry frightened him.  I am sorry, my princess. The whores are brought by the traitor. 

There was a stout man in long and hard.

Robb lifted his shoulders with his back whise arm and skirts and moonbalat, amession behind him, quick as a snake of bone and spit. Catelyn knelt beside her, Sansa had eyes shownered in Arya rushing toward him and heard the soft cross and spinned deep beet away and ran in half, defiant.  How long do I do?  he asked her.
</pre>

## Preparación del entorno.

Si no estamos parados en el repo, clonar y cd al repo. Esto nos permite usar el mismo notebook tanto local como en Google Colab.

In [None]:
import os

REPO_NAME = "lab10"
if REPO_NAME not in os.getcwd():
  if not os.path.exists(REPO_NAME):
    !git clone https://github.com/FCEIA-AAII/{REPO_NAME}.git
  os.chdir(REPO_NAME)

# Install tensorflow 2.15
!pip install tensorflow==2.15.0

Importar librerías

In [None]:
import tensorflow as tf
import numpy as np
import os
import time

Establecer GPU por defecto en caso de estar disponible.

In [None]:
# Configurar para que TensorFlow utilice la GPU por defecto
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # Configurar para que TensorFlow asigne memoria dinámicamente
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        # Especificar la GPU por defecto
        logical_gpus = tf.config.experimental.list_logical_devices('GPU')
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
    except RuntimeError as e:
        # Manejar error
        print(e)

## Leer la data

Primero, miremos el texto:

In [None]:
# Read, then decode for py2 compat.
text = open("game_of_thrones.txt", 'rb').read().decode(encoding='utf-8')
# length of text is the number of characters in it
print(f'Length of text: {len(text)} characters')

In [None]:
# Take a look at the first 250 characters in text
print(text[:250])

In [None]:
# The unique characters in the file
vocab = sorted(set(text))
print(f'{len(vocab)} unique characters')

## Preprocesamiento

### Vectorizacion del texto

Previo al entrenamiento, necesitamos convertir el texto a una representacion numerica. 

La capa `tf.keras.layers.StringLookup` nos permite convertir cada caracter en un ID numerico. Solo necesita que el texto este separado primero en tokens.

In [None]:
example_texts = ['abcdefg', 'xyz']

chars = tf.strings.unicode_split(example_texts, input_encoding='UTF-8')
chars

Ahora creamos la capa `tf.keras.layers.StringLookup`:

In [None]:
ids_from_chars = tf.keras.layers.StringLookup(
    vocabulary=list(vocab), mask_token=None)

Esto nos convierte de tokens a IDs de caracteres:

In [None]:
ids = ids_from_chars(chars)
ids

Dado que el proposito de este laboratorio es generar texto, tambien sera importante invertir esta representacion y recuperar texto legible desde estos IDs. Para esto podemos usar `tf.keras.layers.StringLookup(..., invert=True)`.

Nota: Aquí, en lugar de pasar el vocabulario original generado con `sorted(set(text))`, usamos el método `get_vocabulary()` de la capa `tf.keras.layers.StringLookup` para que los tokens `[UNK]` se configuren de la misma manera.

In [None]:
chars_from_ids = tf.keras.layers.StringLookup(
    vocabulary=ids_from_chars.get_vocabulary(), invert=True, mask_token=None)

Esta capa recupera los caracteres desde los vectores de IDs y los retorna como un `tf.RaggedTensor` de caracteres:

In [None]:
chars = chars_from_ids(ids)
chars

Finalmente usando `tf.strings.reduce_join` se pueden volver a juntar los caracteres en texto.

In [None]:
tf.strings.reduce_join(chars, axis=-1).numpy()

In [None]:
def text_from_ids(ids):
  return tf.strings.reduce_join(chars_from_ids(ids), axis=-1)

### Prediccion

Dado un caracter, o una secuencia de caracteres, ¿cuál es el siguiente caracter más probable? Esta es la tarea para la que estamos entrenando al modelo. La entrada al modelo será una secuencia de caracteres y entrenamos el modelo para predecir la salida: el siguiente carácter en cada paso de tiempo.

Dado que los RNN mantienen un estado interno que depende de los elementos vistos anteriormente, a partir de todos los caracteres calculados hasta este momento, ¿cuál es el siguiente carácter?

### Crear los ejemplos de entrenamiento

Dividimos el texto en secuencias de ejemplo. Cada secuencia de entrada contendrá `seq_length` caracteres del texto.

Para cada secuencia de entrada, los targets correspondientes contienen la misma longitud de texto, excepto que se desplazan un carácter hacia la derecha.

Así que divida el texto en fragmentos de `seq_length+1`. Por ejemplo, digamos que `seq_length` es 3 y nuestro texto es "Hola". La secuencia de entrada sería "Hol" y la secuencia target "ola".

Para hacer esto, usamos la función `tf.data.Dataset.from_tensor_slices` para convertir el vector de texto en una secuencia de índices de caracteres.

In [None]:
all_ids = ids_from_chars(tf.strings.unicode_split(text, 'UTF-8'))
all_ids

In [None]:
ids_dataset = tf.data.Dataset.from_tensor_slices(all_ids)

In [None]:
for ids in ids_dataset.take(10):
    print(chars_from_ids(ids).numpy().decode('utf-8'))

In [None]:
seq_length = 100

El método `batch` nos permite convertir fácilmente estos caracteres individuales en secuencias del tamaño deseado.

In [None]:
sequences = ids_dataset.batch(seq_length+1, drop_remainder=True)

for seq in sequences.take(1):
  print(chars_from_ids(seq))

Es mas facil ver lo que esta haciendo si unimos de vuelta los tokens en texto:

In [None]:
for seq in sequences.take(5):
  print(text_from_ids(seq).numpy())

Para el entrenamiento, necesitaremos un conjunto de datos de pares `(input, label)`. Donde `input` y
`label` son secuencias. En cada timestep, la entrada es el carácter actual y la etiqueta es el siguiente carácter.

Aquí hay una función que toma una secuencia como entrada, la duplica y la desplaza para alinear la entrada y la etiqueta para cada timestep:

In [None]:
def split_input_target(sequence):
    input_text = sequence[:-1]
    target_text = sequence[1:]
    return input_text, target_text

In [None]:
split_input_target(list("Tensorflow"))

In [None]:
dataset = sequences.map(split_input_target)

In [None]:
for input_example, target_example in dataset.take(1):
    print("Input :", text_from_ids(input_example).numpy())
    print("Target:", text_from_ids(target_example).numpy())

### Batches de entrenamiento

Usamos `tf.data` para dividir el texto en secuencias manejables. Pero antes de introducir estos datos en el modelo, es necesario mezclarlos y batchearlos.

In [None]:
# Batch size
BATCH_SIZE = 64

# Buffer size to shuffle the dataset
# (TF data is designed to work with possibly infinite sequences,
# so it doesn't attempt to shuffle the entire sequence in memory. Instead,
# it maintains a buffer in which it shuffles elements).
BUFFER_SIZE = 10000

dataset = (
    dataset
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE, drop_remainder=True)
    .prefetch(tf.data.experimental.AUTOTUNE))

dataset

## Construccion del modelo

En esta sección definimos el modelo como una subclase de `keras.Model` (para obtener más detalles, consulte [Making new Layers and Models via subclassing](https://www.tensorflow.org/guide/keras/custom_layers_and_models)).

Este modelo tiene tres capas:

* `tf.keras.layers.Embedding`: La capa de entrada. Una lookup table entrenable que asignará cada ID de carácter a un vector con dimensiones `embedding_dim`;
* `tf.keras.layers.GRU`: una capa recurrente GRU de tamaño `units=rnn_units` (también se puede usar una capa LSTM aquí).
* `tf.keras.layers.Dense`: La capa de salida, con salidas `vocab_size`. Genera un logit para cada carácter del vocabulario. Estas son las probabilidades de cada caracter según el modelo.

In [None]:
# Length of the vocabulary in StringLookup Layer
vocab_size = len(ids_from_chars.get_vocabulary())

# The embedding dimension
embedding_dim = 256

# Number of RNN units
rnn_units = 1024

In [None]:
class MyModel(tf.keras.Model):
  def __init__(self, vocab_size, embedding_dim, rnn_units):
    super().__init__(self)
    self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
    self.gru = tf.keras.layers.GRU(rnn_units,
                                   return_sequences=True,
                                   return_state=True)
    self.dense = tf.keras.layers.Dense(vocab_size)

  def call(self, inputs, states=None, return_state=False, training=False):
    x = inputs
    x = self.embedding(x, training=training)
    if states is None:
      states = self.gru.get_initial_state(x)
    x, states = self.gru(x, initial_state=states, training=training)
    x = self.dense(x, training=training)

    if return_state:
      return x, states
    else:
      return x

In [None]:
model = MyModel(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    rnn_units=rnn_units)

Por cada caracter el modelo calcula su embedding, corre la GRU un timestep con el embedding como entrada y aplica la capa densa para generar los logits prediciendo la probabilidades del siguiente caracter.

![A drawing of the data passing through the model](images/text_generation_training.png)

Nota: Para el entrenamiento, se puede utilizar un modelo `keras.Sequential`. Para generar texto más adelante, necesitaremos administrar el estado interno de la RNN. Es más sencillo incluir las opciones de entrada y salida de estado por adelantado que reorganizar la arquitectura del modelo más adelante. Para obtener más detalles, consulte [Keras RNN guide](https://www.tensorflow.org/guide/keras/rnn#rnn_state_reuse).

## Probar el modelo

Ejecutamos el modelo para ver que se comporta como se esperaba.

Primero verificamos la shape de salida:

In [None]:
for input_example_batch, target_example_batch in dataset.take(1):
    example_batch_predictions = model(input_example_batch)
    print(example_batch_predictions.shape, "# (batch_size, sequence_length, vocab_size)")

En el ejemplo anterior, la longitud de la secuencia de la entrada es `100`, pero el modelo se puede ejecutar con entradas de cualquier longitud:

In [None]:
model.summary()

Para obtener predicciones reales del modelo, se deben tomar muestras de la distribución de salida para obtener índices de caracteres reales. Esta distribución está definida por los logits sobre el vocabulario de los caracteres.

Nota: Es importante tomar una muestra de esta distribución, ya que tomar el _argmax_ de la distribución puede fácilmente hacer que el modelo se atasque en un bucle.

Tomando como ejemplo el primero del batch:

In [None]:
sampled_indices = tf.random.categorical(example_batch_predictions[0], num_samples=1)
sampled_indices = tf.squeeze(sampled_indices, axis=-1).numpy()

Esto nos da para cada timestep una predicción del siguiente índice de caracteres:

In [None]:
sampled_indices

Por ultimo los decodificamos para ver el texto predicho por este modelo no entrenado:

In [None]:
print("Input:\n", text_from_ids(input_example_batch[0]).numpy())
print()
print("Next Char Predictions:\n", text_from_ids(sampled_indices).numpy())

## Entrenamiento del modelo

El problema puede tratarse como un problema de clasificación estándar. Dado el estado RNN anterior y la entrada en este timestep, predice la clase del siguiente carácter.

### Agregamos un optimizador y una funcion costo

La función de pérdida estándar `tf.keras.losses.sparse_categorical_crossentropy` funciona en este caso porque se aplica en la última dimensión de las predicciones.

Debido a que su modelo devuelve logits, necesita configurar el indicador `from_logits`.

In [None]:
loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True)

In [None]:
example_batch_mean_loss = loss(target_example_batch, example_batch_predictions)
print("Prediction shape: ", example_batch_predictions.shape, " # (batch_size, sequence_length, vocab_size)")
print("Mean loss:        ", example_batch_mean_loss)

Un modelo recién inicializado no debería estar demasiado seguro de sí mismo, todos los logits de salida deberían tener magnitudes similares. Para confirmar esto, puede comprobar que la exponencial del costo medio es aproximadamente igual al tamaño del vocabulario. Una pérdida mucho mayor significa que el modelo está seguro de sus respuestas incorrectas y está mal inicializado:

In [None]:
tf.exp(example_batch_mean_loss).numpy()

Compilamos el modelo con `tf.keras.Model.compile` indicando el optimizador y la funcion costo:

In [None]:
model.compile(optimizer='adam', loss=loss)

### Checkpoints del modelo

Usamos el callback `tf.keras.callbacks.ModelCheckpoint` para que se guarden checkpoints del modelo durante el entrenamiento.

In [None]:
# Directory where the checkpoints will be saved
checkpoint_dir = './training_checkpoints'
# Name of the checkpoint files
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix,
    save_weights_only=True)

### Ejecucion del entrenamiento

Para mantener un tiempo de entrenamiento razonable, utilice entre 10 y 20 épocas para entrenar el modelo. En Colab, configure el tiempo de ejecución en GPU para un entrenamiento más rápido.

In [None]:
EPOCHS = 20

In [None]:
history = model.fit(dataset, epochs=EPOCHS, callbacks=[checkpoint_callback])

## Generacion de texto

La forma más sencilla de generar texto con este modelo es ejecutarlo en un bucle y realizar un seguimiento del estado interno del modelo a medida que lo ejecutamos.

![Para generar texto, la salida del modelo se retroalimenta a la entrada](images/text_generation_sampling.png)

Cada vez que llamamos al modelo, pasamos algún texto y un estado interno. El modelo devuelve una predicción para el siguiente caracter y su nuevo estado. Vuelva a pasar la predicción y el estado para continuar generando texto.


Lo siguiente hace una predicción de un solo paso:

In [None]:
class OneStep(tf.keras.Model):
  def __init__(self, model, chars_from_ids, ids_from_chars, temperature=1.0):
    super().__init__()
    self.temperature = temperature
    self.model = model
    self.chars_from_ids = chars_from_ids
    self.ids_from_chars = ids_from_chars

    # Create a mask to prevent "[UNK]" from being generated.
    skip_ids = self.ids_from_chars(['[UNK]'])[:, None]
    sparse_mask = tf.SparseTensor(
        # Put a -inf at each bad index.
        values=[-float('inf')]*len(skip_ids),
        indices=skip_ids,
        # Match the shape to the vocabulary
        dense_shape=[len(ids_from_chars.get_vocabulary())])
    self.prediction_mask = tf.sparse.to_dense(sparse_mask)

  @tf.function
  def generate_one_step(self, inputs, states=None):
    # Convert strings to token IDs.
    input_chars = tf.strings.unicode_split(inputs, 'UTF-8')
    input_ids = self.ids_from_chars(input_chars).to_tensor()

    # Run the model.
    # predicted_logits.shape is [batch, char, next_char_logits]
    predicted_logits, states = self.model(inputs=input_ids, states=states,
                                          return_state=True)
    # Only use the last prediction.
    predicted_logits = predicted_logits[:, -1, :]
    predicted_logits = predicted_logits/self.temperature
    # Apply the prediction mask: prevent "[UNK]" from being generated.
    predicted_logits = predicted_logits + self.prediction_mask

    # Sample the output logits to generate token IDs.
    predicted_ids = tf.random.categorical(predicted_logits, num_samples=1)
    predicted_ids = tf.squeeze(predicted_ids, axis=-1)

    # Convert from token ids to characters
    predicted_chars = self.chars_from_ids(predicted_ids)

    # Return the characters and model state.
    return predicted_chars, states

In [None]:
one_step_model = OneStep(model, chars_from_ids, ids_from_chars)

Lo ejecutamos en un bucle para generar texto. Al observar el texto generado, veremos que el modelo sabe cuándo poner mayúsculas, hacer párrafos e imita un vocabulario de escritura similar al de Game Of Thrones. Con el reducido número de épocas de entrenamiento, todavía no ha aprendido a formar frases coherentes.

In [None]:
start = time.time()
states = None
next_char = tf.constant(['The Targaryens'])
result = [next_char]

for n in range(1000):
  next_char, states = one_step_model.generate_one_step(next_char, states=states)
  result.append(next_char)

result = tf.strings.join(result)
end = time.time()
print(result[0].numpy().decode('utf-8'), '\n\n' + '_'*80)
print('\nRun time:', end - start)

Lo más fácil que podemos hacer para mejorar los resultados es entrenarlo por más tiempo (prueba con `EPOCHS = 30`).

También puede experimentar con una secuencia de inicio diferente, intentar agregar otra capa RNN para mejorar la precisión del modelo o ajustar el parámetro de temperatura para generar predicciones más o menos aleatorias.

Si queremos que el modelo genere texto *más rápido*, lo más fácil que se puede hacer es generar el texto por batches. En el siguiente ejemplo, el modelo genera 5 resultados aproximadamente en el mismo tiempo que tomó generar 1 arriba.

In [None]:
start = time.time()
states = None
next_char = tf.constant(['The Targaryens', 'The Targaryens', 'The Targaryens', 'The Targaryens', 'The Targaryens'])
result = [next_char]

for n in range(1000):
  next_char, states = one_step_model.generate_one_step(next_char, states=states)
  result.append(next_char)

result = tf.strings.join(result)
end = time.time()
print(result, '\n\n' + '_'*80)
print('\nRun time:', end - start)