<a href="https://colab.research.google.com/github/TimothyJan/RNN_Text_Generation/blob/main/Project_4_RNN_Text_Generation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Spring 2022 - CPSC 585-section-13883
Project 4 - Recurrent Neural Networks Text Generation
Sean Javiya
Timothy Jan
Timothy Kheang

In Project 4, we use an RNN-based language model to generate text for a creative application.

Goals for this project are:
<ul>
  <li>Reading about how RNNs can be used to generate text, and examining several different applications after training on different text corpora.</li>
  <li>Using a multi-layer RNN to train and sample from a character-level language model.</li>
  <li>Adapting and reusing published model code.</li>
  <li>Having some fun.</li>
</ul>

In [1]:
import tensorflow as tf
import numpy as np
import os
import time
import urllib.request, json 

Download the Elder Scroll's or Shakespearean dataset and read the data.

In [2]:
'''imported the elder scrolls json file'''
# with urllib.request.urlopen("https://raw.githubusercontent.com/hmi-utwente/video-game-text-corpora/master/The%20Elder%20Scrolls/data/imperial_library_20200626.json") as url:
#     data_dict = json.loads(url.read().decode())
# text=""
# for key in data_dict:
#   text += data_dict[key]['description'] +' ' + data_dict[key]['title'] + ' ' + data_dict[key]['text']
# text[:100]

path_to_file = tf.keras.utils.get_file('shakespeare.txt', 'https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt')
# Read, then decode for py2 compat.
text = open(path_to_file, 'rb').read().decode(encoding='utf-8')
# length of text is the number of characters in it
print(f'Length of text: {len(text)} characters')
print(text[:100])

Downloading data from https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt
Length of text: 1115394 characters
First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You


Read the Data

In [3]:
# unique characters in the file
vocab = sorted(set(text))
print(f'{len(vocab)} unique characters')

65 unique characters


<h1>Process the text</h1>
<h2>Vectorize the text.</h2>
Before training, you need to convert the strings to a numerical representation.
The tf.keras.layers.StringLookup layer can convert each character into a numeric ID. It just needs the text to be split into tokens first.

In [4]:
example_texts = ['abcdefg', 'xyz']
chars = tf.strings.unicode_split(example_texts, input_encoding='UTF-8')
chars

<tf.RaggedTensor [[b'a', b'b', b'c', b'd', b'e', b'f', b'g'], [b'x', b'y', b'z']]>

Create the tf.keras.layers.StringLookup layer. It onverts from tokens to character IDs:

In [5]:
ids_from_chars = tf.keras.layers.StringLookup(
    vocabulary=list(vocab), mask_token=None)
ids = ids_from_chars(chars)
ids

<tf.RaggedTensor [[40, 41, 42, 43, 44, 45, 46], [63, 64, 65]]>

This layer recovers the characters from the vectors of IDs, and returns them as a tf.RaggedTensor of characters:

In [6]:
chars_from_ids = tf.keras.layers.StringLookup(
    vocabulary=ids_from_chars.get_vocabulary(), invert=True, mask_token=None)
chars = chars_from_ids(ids)
chars

<tf.RaggedTensor [[b'a', b'b', b'c', b'd', b'e', b'f', b'g'], [b'x', b'y', b'z']]>

Use tf.strings.reduce_join to join the characters back into strings.

In [7]:
tf.strings.reduce_join(chars, axis=-1).numpy()

array([b'abcdefg', b'xyz'], dtype=object)

In [8]:
def text_from_ids(ids):
  return tf.strings.reduce_join(chars_from_ids(ids), axis=-1)


<h2>The Prediction Task</h2>
Given a character, or a sequence of characters, what is the most probable next character? This is the task you're training the model to perform. The input to the model will be a sequence of characters, and you train the model to predict the output—the following character at each time step.

Since RNNs maintain an internal state that depends on the previously seen elements, given all the characters computed until this moment, what is the next character?

<h2>Create training examples and targets</h2>
Next divide the text into example sequences. Each input sequence will contain seq_length characters from the text.

For each input sequence, the corresponding targets contain the same length of text, except shifted one character to the right.

So break the text into chunks of seq_length+1. For example, say seq_length is 4 and our text is "Hello". The input sequence would be "Hell", and the target sequence "ello".

To do this first use the tf.data.Dataset.from_tensor_slices function to convert the text vector into a stream of character indices.

In [9]:
all_ids = ids_from_chars(tf.strings.unicode_split(text, 'UTF-8'))
all_ids

<tf.Tensor: shape=(1115394,), dtype=int64, numpy=array([19, 48, 57, ..., 46,  9,  1])>

In [10]:
ids_dataset = tf.data.Dataset.from_tensor_slices(all_ids)
for ids in ids_dataset.take(10):
    print(chars_from_ids(ids).numpy().decode('utf-8'))

F
i
r
s
t
 
C
i
t
i


The batch method lets you easily convert these individual characters to sequences of the desired size.

In [11]:
seq_length = 100
examples_per_epoch = len(text)//(seq_length+1)

sequences = ids_dataset.batch(seq_length+1, drop_remainder=True)

for seq in sequences.take(1):
  print(chars_from_ids(seq))

tf.Tensor(
[b'F' b'i' b'r' b's' b't' b' ' b'C' b'i' b't' b'i' b'z' b'e' b'n' b':'
 b'\n' b'B' b'e' b'f' b'o' b'r' b'e' b' ' b'w' b'e' b' ' b'p' b'r' b'o'
 b'c' b'e' b'e' b'd' b' ' b'a' b'n' b'y' b' ' b'f' b'u' b'r' b't' b'h'
 b'e' b'r' b',' b' ' b'h' b'e' b'a' b'r' b' ' b'm' b'e' b' ' b's' b'p'
 b'e' b'a' b'k' b'.' b'\n' b'\n' b'A' b'l' b'l' b':' b'\n' b'S' b'p' b'e'
 b'a' b'k' b',' b' ' b's' b'p' b'e' b'a' b'k' b'.' b'\n' b'\n' b'F' b'i'
 b'r' b's' b't' b' ' b'C' b'i' b't' b'i' b'z' b'e' b'n' b':' b'\n' b'Y'
 b'o' b'u' b' '], shape=(101,), dtype=string)


In [12]:
for seq in sequences.take(5):
  print(text_from_ids(seq).numpy())

b'First Citizen:\nBefore we proceed any further, hear me speak.\n\nAll:\nSpeak, speak.\n\nFirst Citizen:\nYou '
b'are all resolved rather to die than to famish?\n\nAll:\nResolved. resolved.\n\nFirst Citizen:\nFirst, you k'
b"now Caius Marcius is chief enemy to the people.\n\nAll:\nWe know't, we know't.\n\nFirst Citizen:\nLet us ki"
b"ll him, and we'll have corn at our own price.\nIs't a verdict?\n\nAll:\nNo more talking on't; let it be d"
b'one: away, away!\n\nSecond Citizen:\nOne word, good citizens.\n\nFirst Citizen:\nWe are accounted poor citi'


For training you'll need a dataset of (input, label) pairs. Where input and label are sequences. At each time step the input is the current character and the label is the next character.

Here's a function that takes a sequence as input, duplicates, and shifts it to align the input and label for each timestep:

In [13]:
def split_input_target(sequence):
    input_text = sequence[:-1]
    target_text = sequence[1:]
    return input_text, target_text

split_input_target(list("Tensorflow"))
dataset = sequences.map(split_input_target)

for input_example, target_example in dataset.take(1):
    print("Input :", text_from_ids(input_example).numpy())
    print("Target:", text_from_ids(target_example).numpy())

Input : b'First Citizen:\nBefore we proceed any further, hear me speak.\n\nAll:\nSpeak, speak.\n\nFirst Citizen:\nYou'
Target: b'irst Citizen:\nBefore we proceed any further, hear me speak.\n\nAll:\nSpeak, speak.\n\nFirst Citizen:\nYou '


<h2>Create training batches</h2>
You used tf.data to split the text into manageable sequences. But before feeding this data into the model, you need to shuffle the data and pack it into batches.

In [14]:
# Batch size
BATCH_SIZE = 64

# Buffer size to shuffle the dataset
# (TF data is designed to work with possibly infinite sequences,
# so it doesn't attempt to shuffle the entire sequence in memory. Instead,
# it maintains a buffer in which it shuffles elements).
BUFFER_SIZE = 10000

dataset = (
    dataset
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE, drop_remainder=True)
    .prefetch(tf.data.experimental.AUTOTUNE))

dataset

<PrefetchDataset element_spec=(TensorSpec(shape=(64, 100), dtype=tf.int64, name=None), TensorSpec(shape=(64, 100), dtype=tf.int64, name=None))>

<h1>Build The Model</h1>
This section defines the model as a keras.Model subclass (For details see Making new Layers and Models via subclassing).

This model has three layers:
<ul>
  <li>tf.keras.layers.Embedding: The input layer. A trainable lookup table that will map each character-ID to a vector with embedding_dim dimensions;</li>
  <li>tf.keras.layers.GRU: A type of RNN with size units=rnn_units (You can also use an LSTM layer here.)</li>
  <li>tf.keras.layers.Dense: The output layer, with vocab_size outputs. It outputs one logit for each character in the vocabulary. These are the log-likelihood of each character according to the model.</li>
</ul>

In [15]:
# Length of the vocabulary in chars
vocab_size = len(vocab)

# The embedding dimension
embedding_dim = 256

# Number of RNN units
rnn_units = 1024

class MyModel(tf.keras.Model):
  def __init__(self, vocab_size, embedding_dim, rnn_units):
    super().__init__(self)
    self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
    self.gru = tf.keras.layers.GRU(rnn_units,
                                   return_sequences=True,
                                   return_state=True)
    self.dense = tf.keras.layers.Dense(vocab_size)

  def call(self, inputs, states=None, return_state=False, training=False):
    x = inputs
    x = self.embedding(x, training=training)
    if states is None:
      states = self.gru.get_initial_state(x)
    x, states = self.gru(x, initial_state=states, training=training)
    x = self.dense(x, training=training)

    if return_state:
      return x, states
    else:
      return x

model = MyModel(
    # Be sure the vocabulary size matches the `StringLookup` layers.
    vocab_size=len(ids_from_chars.get_vocabulary()),
    embedding_dim=embedding_dim,
    rnn_units=rnn_units)

<h1>Try the model</h1>
Now run the model to see that it behaves as expected.

First check the shape of the output:

In [16]:
for input_example_batch, target_example_batch in dataset.take(1):
    example_batch_predictions = model(input_example_batch)
    print(example_batch_predictions.shape, "# (batch_size, sequence_length, vocab_size)")

(64, 100, 66) # (batch_size, sequence_length, vocab_size)


In [17]:
model.summary()

Model: "my_model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding (Embedding)       multiple                  16896     
                                                                 
 gru (GRU)                   multiple                  3938304   
                                                                 
 dense (Dense)               multiple                  67650     
                                                                 
Total params: 4,022,850
Trainable params: 4,022,850
Non-trainable params: 0
_________________________________________________________________


To get actual predictions from the model you need to sample from the output distribution, to get actual character indices. This distribution is defined by the logits over the character vocabulary.
using the first example in the batch, this gives us, at each timestep, a prediction of the next character index:

In [18]:
sampled_indices = tf.random.categorical(example_batch_predictions[0], num_samples=1)
sampled_indices = tf.squeeze(sampled_indices, axis=-1).numpy()
sampled_indices

array([ 0, 52,  6, 37, 53, 15, 44, 34,  8, 28, 33, 26, 53,  1,  1, 45, 37,
        3, 35, 13, 44, 52,  6, 44, 50, 55, 37, 28,  5, 52, 16, 49, 55, 47,
       16, 59, 55, 62, 62,  3, 39, 34, 59, 55, 49, 38, 51, 33,  0, 27, 31,
        5, 65, 22, 31, 25, 53, 52, 25, 22, 37,  2, 37, 52, 12, 45, 25, 59,
       26, 58, 57, 43,  2,  8, 29, 52, 41,  6, 35, 31, 37,  9, 54, 65, 21,
       37, 28, 10,  2, 21, 58, 18,  7, 32, 24,  5, 37, 12, 43, 35])

Decoded text predicted by untrained model:


In [19]:
print("Input:\n", text_from_ids(input_example_batch[0]).numpy())
print()
print("Next Char Predictions:\n", text_from_ids(sampled_indices).numpy())

Input:
 b'ingman:\nCome, we are fellows and friends: he was ever too\nhard for him; I have heard him say so hims'

Next Char Predictions:
 b"[UNK]m'XnBeU-OTMn\n\nfX!V?em'ekpXO&mCjphCtpww!ZUtpjYlT[UNK]NR&zIRLnmLIX Xm;fLtMsrd -Pmb'VRX.ozHXO3 HsE,SK&X;dV"


<h1>Train the model</h1>
Given the previous RNN state, and the input this time step, predict the class of the next character.
<h2>Attach an optimizer, and a loss function</h2>
The standard tf.keras.losses.sparse_categorical_crossentropy loss function works in this case because it is applied across the last dimension of the predictions.

Because our model returns logits, you need to set the from_logits flag.

In [20]:
loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True)
example_batch_mean_loss = loss(target_example_batch, example_batch_predictions)
print("Prediction shape: ", example_batch_predictions.shape, " # (batch_size, sequence_length, vocab_size)")
print("Mean loss:        ", example_batch_mean_loss)

Prediction shape:  (64, 100, 66)  # (batch_size, sequence_length, vocab_size)
Mean loss:         tf.Tensor(4.1889195, shape=(), dtype=float32)


A newly initialized model shouldn't be too sure of itself, the output logits should all have similar magnitudes. To confirm this you can check that the exponential of the mean loss is approximately equal to the vocabulary size. A much higher loss means the model is sure of its wrong answers, and is badly initialized:

In [21]:
tf.exp(example_batch_mean_loss).numpy()

65.95149

Configure the training procedure using the tf.keras.Model.compile method. Use tf.keras.optimizers.Adam with default arguments and the loss function.

In [22]:
model.compile(optimizer='adam', loss=loss)

<h2>Configure Checkpoints</h2>
Use a tf.keras.callbacks.ModelCheckpoint to ensure that checkpoints are saved during training:

In [23]:
# Directory where the checkpoints will be saved
checkpoint_dir = './training_checkpoints'
# Name of the checkpoint files
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix,
    save_weights_only=True)

<h2>Execute the training</h2>
To keep training time reasonable, use 10 epochs to train the model. In Colab, set the runtime to GPU for faster training.

In [30]:
EPOCHS = 5
history = model.fit(dataset, epochs=EPOCHS, callbacks=[checkpoint_callback])

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<h1>Generate text</h1>
The simplest way to generate text with this model is to run it in a loop, and keep track of the model's internal state as you execute it.<br>
Each time you call the model you pass in some text and an internal state. The model returns a prediction for the next character and its new state. Pass the prediction and state back in to continue generating text.

The following makes a single step prediction:

In [32]:
class OneStep(tf.keras.Model):
  def __init__(self, model, chars_from_ids, ids_from_chars, temperature=1.0):
    super().__init__()
    self.temperature = temperature
    self.model = model
    self.chars_from_ids = chars_from_ids
    self.ids_from_chars = ids_from_chars

    # Create a mask to prevent "[UNK]" from being generated.
    skip_ids = self.ids_from_chars(['[UNK]'])[:, None]
    sparse_mask = tf.SparseTensor(
        # Put a -inf at each bad index.
        values=[-float('inf')]*len(skip_ids),
        indices=skip_ids,
        # Match the shape to the vocabulary
        dense_shape=[len(ids_from_chars.get_vocabulary())])
    self.prediction_mask = tf.sparse.to_dense(sparse_mask)

  @tf.function
  def generate_one_step(self, inputs, states=None):
    # Convert strings to token IDs.
    input_chars = tf.strings.unicode_split(inputs, 'UTF-8')
    input_ids = self.ids_from_chars(input_chars).to_tensor()

    # Run the model.
    # predicted_logits.shape is [batch, char, next_char_logits]
    predicted_logits, states = self.model(inputs=input_ids, states=states,
                                          return_state=True)
    # Only use the last prediction.
    predicted_logits = predicted_logits[:, -1, :]
    predicted_logits = predicted_logits/self.temperature
    # Apply the prediction mask: prevent "[UNK]" from being generated.
    predicted_logits = predicted_logits + self.prediction_mask

    # Sample the output logits to generate token IDs.
    predicted_ids = tf.random.categorical(predicted_logits, num_samples=1)
    predicted_ids = tf.squeeze(predicted_ids, axis=-1)

    # Convert from token ids to characters
    predicted_chars = self.chars_from_ids(predicted_ids)

    # Return the characters and model state.
    return predicted_chars, states

In [33]:
one_step_model = OneStep(model, chars_from_ids, ids_from_chars)

Run it in a loop to generate some text. Looking at the generated text, you'll see the model knows when to capitalize, make paragraphs and imitates a Shakespeare-like writing vocabulary. With the small number of training epochs, it has not yet learned to form coherent sentences.

In [34]:
start = time.time()
states = None
next_char = tf.constant(['ROMEO:'])
result = [next_char]

for n in range(1000):
  next_char, states = one_step_model.generate_one_step(next_char, states=states)
  result.append(next_char)

result = tf.strings.join(result)
end = time.time()
print(result[0].numpy().decode('utf-8'), '\n\n' + '_'*80)
print('\nRun time:', end - start)

ROMEO:
As lett thou art the master!

COMINIUS:
I has with him;
Which fells, will I straight thou since,
No fair of heavent spariting.
He's, there are mine, or we she that the king's rest; and my
loss, you will rance they will be perpeded
and such a
force better brew ittes and consul, inspeal'd.

QUEEN MARGARET:
Nay in more than here long of this wife were nor man.

QUEEN ELIZABETH:
Even unto Richmond: I came to my king?

KING EDWIA that we end reigns of done.

QUEEN MARGARET:
Grum me, is heaven us.

CATESBY:
You are on; 'hen.

MERCUTIO:
Now, loving with a pitch here!

MAren Marchuman:
Some they covert the king of soltier swarns,
They diddn pepsounk-dights you command?

MISTRESS OF YORK:
How hast the ten this confuse man, best more son straight sir.

ROMEO:
Yet har not tood my lord; the corrow cornect me pass'd.
Mark cause 'ncounte Prist; now sir, they speak,
better draw our freely to to be pault
Is night to Myrancif and to helf till answer?

QUEEN EFIZALE:
More toad!

COMILINIUS:
Hard!

The easiest thing you can do to improve the results is to train it for longer (try EPOCHS = 30).

You can also experiment with a different start string, try adding another RNN layer to improve the model's accuracy, or adjust the temperature parameter to generate more or less random predictions.

If you want the model to generate text faster the easiest thing you can do is batch the text generation. In the example below the model generates 5 outputs in about the same time it took to generate 1 above.

In [35]:
start = time.time()
states = None
next_char = tf.constant(['ROMEO:', 'ROMEO:', 'ROMEO:', 'ROMEO:', 'ROMEO:'])
result = [next_char]

for n in range(1000):
  next_char, states = one_step_model.generate_one_step(next_char, states=states)
  result.append(next_char)

result = tf.strings.join(result)
end = time.time()
print(result, '\n\n' + '_'*80)
print('\nRun time:', end - start)

tf.Tensor(
[b"ROMEO:\nWhat suns and use,\nAnd cannot with holy mine everys of their felt.\n\nHENRY BOLINGBBROKE:\nConserve peace, bearing, taloug's name, of sound,\nHe hopes made the people yet of content leaves\nthey exinciss not bount. Have I bid the wext\nTo make the mistress than he mark warths their tombless;\nBut raily dines. Aufidius,\nAnd what it imps'd me in cheeks not so.\n\nBIONTES:\nO, it not speak?\nWhy, so hath but approach whip the city;\nAnd the ends of Montagues\nOf all yourself men that home. I stand theee in this night?\nWell as suinted, Roman! no, no?\n\nMARCIUS:\nHe'll not great to the court?\n\nThird ConstBRAKE:\nTherefore! his loddly sir, and the noble--dod\nOf revive the king contemn'd fath.\n\nCAPULET:\n\nFirst Spritten:\nAfter him, thbook help this.\n\nFLORIZEL:\nHere not cas off stays charge to stual; and fraped by mask my majaginate-fraims?\nWhorsess, mercy that victory for the eatter-fatch.\nProve too learn. If you so?\n\nGONZALO:\nAy thoushal'ce, my life t

<h2>Export the generator</h2>
This single-step model can easily be saved and restored, allowing you to use it anywhere a tf.saved_model is accepted.


In [36]:
tf.saved_model.save(one_step_model, 'one_step')
one_step_reloaded = tf.saved_model.load('one_step')





INFO:tensorflow:Assets written to: one_step/assets


INFO:tensorflow:Assets written to: one_step/assets


In [37]:
states = None
next_char = tf.constant(['ROMEO:'])
result = [next_char]

for n in range(100):
  next_char, states = one_step_reloaded.generate_one_step(next_char, states=states)
  result.append(next_char)

print(tf.strings.join(result)[0].numpy().decode("utf-8"))

ROMEO:
My love, then he didest me of the guard.
And were in grate to my orney
That Absolver men in roye bu
