# Shakespeare Text Generation

Let's create Shakespearean text by training a character RNN

In [32]:
import tensorflow as tf
import numpy as np
import os
import time

In [2]:
path_to_file = tf.keras.utils.get_file('shakespeare.txt', 'https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt')

Downloading data from https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt


In [3]:
# Read the raw bytes and decode them as UTF-8. The context manager closes
# the file handle as soon as the read completes instead of leaking it.
with open(path_to_file, 'rb') as shakespeare_file:
    text = shakespeare_file.read().decode(encoding='utf-8')
print(f'Length of text: {len(text):,} characters')

Length of text: 1,115,394 characters


In [4]:
# View first 250 characters
print(text[:250])

First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You are all resolved rather to die than to famish?

All:
Resolved. resolved.

First Citizen:
First, you know Caius Marcius is chief enemy to the people.



In [5]:
# The unique characters in the file
vocab = sorted(set(text))
print(f'{len(vocab)} unique characters')

65 unique characters


## Preprocess Text

First, we need to convert the strings to numbers. We can use `tf.keras.layers.StringLookup` but first need to split the text into tokens. 

In [6]:
example_texts = ['hello', 'world']
# Split each string into its individual UTF-8 characters (one token per char)
chars = tf.strings.unicode_split(example_texts, input_encoding='UTF-8')
print(chars)

# Lookup layer mapping each character to an integer ID
ids_from_chars = tf.keras.layers.StringLookup(
    vocabulary=list(vocab),
    mask_token=None
)

example_ids = ids_from_chars(chars)
# Here we see that 'l' is mapped to 51
print(example_ids)

# We need to set invert=True so that we can get human
# readable text back. We use get_vocabulary() to ensure
# [UNK] tokens are set the same way
chars_from_ids = tf.keras.layers.StringLookup(
    vocabulary=ids_from_chars.get_vocabulary(),
    invert=True,
    mask_token=None
)

# Round trip: IDs -> characters -> joined strings. Join the *reproduced*
# characters (not the original `chars`) so that this check actually
# exercises chars_from_ids.
reproduced_chars = chars_from_ids(example_ids)
print(reproduced_chars)
reproduced_text = tf.strings.reduce_join(reproduced_chars, axis=-1).numpy()
print(reproduced_text)

def text_from_ids(ids):
    """Convert a tensor of character IDs into joined string(s).

    Each ID is looked up with `chars_from_ids` and the resulting characters
    are joined along the last axis.
    """
    return tf.strings.reduce_join(chars_from_ids(ids), axis=-1)

<tf.RaggedTensor [[b'h', b'e', b'l', b'l', b'o'], [b'w', b'o', b'r', b'l', b'd']]>
<tf.RaggedTensor [[47, 44, 51, 51, 54], [62, 54, 57, 51, 43]]>
<tf.RaggedTensor [[b'h', b'e', b'l', b'l', b'o'], [b'w', b'o', b'r', b'l', b'd']]>
[b'hello' b'world']


We use a character RNN (rather than a word RNN) since there are far fewer characters to predict than possible words. Each input is a sequence of characters and the prediction is the next character.

## Create Train Examples and Targets

We make train sequences with `seq_length` elements. The target sequences are the same length but shifted one char to the right. So if `hello` is our text, the input is `hell` and the target sequence `ello`. 

Let's convert the text into a vector of character indices.

In [7]:
all_ids = ids_from_chars(tf.strings.unicode_split(text, 'UTF-8'))
all_ids

<tf.Tensor: shape=(1115394,), dtype=int64, numpy=array([19, 48, 57, ..., 46,  9,  1])>

In [8]:
ids_dataset = tf.data.Dataset.from_tensor_slices(all_ids)
# Print first elements in ids_dataset
for ids in ids_dataset.take(10):
    print(chars_from_ids(ids).numpy().decode('utf-8'))

F
i
r
s
t
 
C
i
t
i


In [9]:
# Number of characters in each model input sequence
seq_length = 100
# The text is cut into chunks of seq_length + 1 characters (input plus the
# one-step-shifted target), so this is the number of whole chunks available.
# It is not referenced again in the cells that follow.
examples_per_epoch = len(text) // (seq_length+1)

In [10]:
# Use .batch() to convert individual characters to sequences of
# desired size
sequences = ids_dataset.batch(seq_length+1, drop_remainder=True)
# Print the first sequence (that includes input + target)
for seq in sequences.take(1):
    print(chars_from_ids(seq))

tf.Tensor(
[b'F' b'i' b'r' b's' b't' b' ' b'C' b'i' b't' b'i' b'z' b'e' b'n' b':'
 b'\n' b'B' b'e' b'f' b'o' b'r' b'e' b' ' b'w' b'e' b' ' b'p' b'r' b'o'
 b'c' b'e' b'e' b'd' b' ' b'a' b'n' b'y' b' ' b'f' b'u' b'r' b't' b'h'
 b'e' b'r' b',' b' ' b'h' b'e' b'a' b'r' b' ' b'm' b'e' b' ' b's' b'p'
 b'e' b'a' b'k' b'.' b'\n' b'\n' b'A' b'l' b'l' b':' b'\n' b'S' b'p' b'e'
 b'a' b'k' b',' b' ' b's' b'p' b'e' b'a' b'k' b'.' b'\n' b'\n' b'F' b'i'
 b'r' b's' b't' b' ' b'C' b'i' b't' b'i' b'z' b'e' b'n' b':' b'\n' b'Y'
 b'o' b'u' b' '], shape=(101,), dtype=string)


In [11]:
# Easier to see if join tokens back into strings
for seq in sequences.take(5):
    print(text_from_ids(seq).numpy())

b'First Citizen:\nBefore we proceed any further, hear me speak.\n\nAll:\nSpeak, speak.\n\nFirst Citizen:\nYou '
b'are all resolved rather to die than to famish?\n\nAll:\nResolved. resolved.\n\nFirst Citizen:\nFirst, you k'
b"now Caius Marcius is chief enemy to the people.\n\nAll:\nWe know't, we know't.\n\nFirst Citizen:\nLet us ki"
b"ll him, and we'll have corn at our own price.\nIs't a verdict?\n\nAll:\nNo more talking on't; let it be d"
b'one: away, away!\n\nSecond Citizen:\nOne word, good citizens.\n\nFirst Citizen:\nWe are accounted poor citi'


**Open question:** each character ends up in exactly one chunk, because the chunks do not overlap. It seems like we could massively increase our training data by using overlapping windows over the text instead.

**Open question:** why is the target a sequence of the exact same length as the input, rather than a single character?

In [12]:
def split_input_target(sequence):
    """Split a sequence into an (input, target) pair offset by one position.

    The input is everything except the last element and the target is
    everything except the first, so target[i] is the element that follows
    input[i] in the original sequence.
    """
    return sequence[:-1], sequence[1:]

In [13]:
split_input_target(list('Hello World!'))

(['H', 'e', 'l', 'l', 'o', ' ', 'W', 'o', 'r', 'l', 'd'],
 ['e', 'l', 'l', 'o', ' ', 'W', 'o', 'r', 'l', 'd', '!'])

In [14]:
dataset = sequences.map(split_input_target)

In [15]:
for input_example, target_example in dataset.take(3):
    print('Input :', text_from_ids(input_example).numpy())
    print('Target:', text_from_ids(target_example).numpy())
    print()

Input : b'First Citizen:\nBefore we proceed any further, hear me speak.\n\nAll:\nSpeak, speak.\n\nFirst Citizen:\nYou'
Target: b'irst Citizen:\nBefore we proceed any further, hear me speak.\n\nAll:\nSpeak, speak.\n\nFirst Citizen:\nYou '

Input : b'are all resolved rather to die than to famish?\n\nAll:\nResolved. resolved.\n\nFirst Citizen:\nFirst, you '
Target: b're all resolved rather to die than to famish?\n\nAll:\nResolved. resolved.\n\nFirst Citizen:\nFirst, you k'

Input : b"now Caius Marcius is chief enemy to the people.\n\nAll:\nWe know't, we know't.\n\nFirst Citizen:\nLet us k"
Target: b"ow Caius Marcius is chief enemy to the people.\n\nAll:\nWe know't, we know't.\n\nFirst Citizen:\nLet us ki"



At first it is not obvious why this counts as a character RNN. The point is that it predicts characters and not words. We group many characters together in both the input and the target, and by predicting the next character at every position the model learns how English text is put together.

Note also that we feed the data in as a sequence into a recurrent layer (a GRU in this notebook) that is designed to work explicitly with sequences. Since the input and target sequences are the same length, each input char is mapped to the char that follows it. We don't have to manually build a dataset of 100 input chars and 1 output char: with 100 inputs and 100 targets, we get 100 steps of learning every time we feed a sequence in.

## Create Training Batches

Now we need to shuffle and pack the data into batches. 

In [16]:
BATCH_SIZE = 64

# TF Data is designed to work with infinite seqs, so doesn't shuffle
# entire seq in memory. Instead, it has a buffer in which it shuffles
# elements
BUFFER_SIZE = 10000

# Build the pipeline from `sequences` rather than from `dataset` itself, so
# that re-running this cell does not batch an already-batched dataset.
dataset = (
    sequences
    .map(split_input_target)
    .shuffle(BUFFER_SIZE)
    # drop_remainder=True keeps every batch at exactly BATCH_SIZE examples
    .batch(BATCH_SIZE, drop_remainder=True)
    # tf.data.AUTOTUNE replaces the deprecated tf.data.experimental.AUTOTUNE
    .prefetch(tf.data.AUTOTUNE)
)

dataset

<PrefetchDataset shapes: ((64, 100), (64, 100)), types: (tf.int64, tf.int64)>

Note that even though it's easier to work with just characters, we still apply an embedding so that the model can learn relationships between the chars.

## Let's Build the Model

In [17]:
from tensorflow.keras.layers import Embedding, GRU, Dense

In [18]:
# NOTE: len(vocab) counts only the 65 distinct characters in the text. The
# model instantiated below is sized with len(ids_from_chars.get_vocabulary())
# instead, which gives the 66 output classes seen in the prediction shapes.
vocab_size = len(vocab)
# Width of each character embedding vector
embedding_dim = 256
# Number of units in the GRU layer
rnn_units = 1024

class MyModel(tf.keras.Model):
    """Character-level language model: Embedding -> GRU -> Dense logits."""

    def __init__(self, vocab_size, embedding_dim, rnn_units):
        """Create the three layers.

        Args:
            vocab_size: number of distinct token IDs; used both as the
                embedding input size and as the number of output logits.
            embedding_dim: size of each embedding vector.
            rnn_units: number of units in the GRU layer.
        """
        # super() already binds `self`; passing it again would hand the
        # instance to keras.Model.__init__ as an unintended extra positional
        # argument.
        super().__init__()
        self.embedding = Embedding(vocab_size, embedding_dim)
        # return_sequences gives one output per input position;
        # return_state additionally returns the final GRU state so that
        # generation can carry it from one call to the next.
        self.gru = GRU(rnn_units, return_sequences=True, return_state=True)
        self.dense = Dense(vocab_size)

    def call(self, inputs, states=None, return_state=False, training=False):
        """Run the model on a batch of token IDs.

        Args:
            inputs: batch of token-ID sequences.
            states: optional GRU state to start from; when None the GRU's
                own initial state is used.
            return_state: when True, also return the final GRU state.
            training: forwarded to every layer.

        Returns:
            The per-position logits, or a `(logits, states)` tuple when
            `return_state` is True.
        """
        x = self.embedding(inputs, training=training)
        if states is None:
            states = self.gru.get_initial_state(x)
        x, states = self.gru(x, initial_state=states, training=training)
        x = self.dense(x, training=training)

        if return_state:
            return x, states
        return x

In [19]:
model = MyModel(
    # Make sure vocab size matches the StringLookup layer
    vocab_size=len(ids_from_chars.get_vocabulary()),
    embedding_dim=embedding_dim,
    rnn_units=rnn_units
)

## Test Model

In [20]:
for input_example_batch, target_example_batch in dataset.take(1):
    example_batch_predictions = model(input_example_batch)
    print(example_batch_predictions.shape, '# (batch_size, sequence_length, vocab_size)')

(64, 100, 66) # (batch_size, sequence_length, vocab_size)


In [21]:
# We lose access to model.summary() output shape when we subclass model
model.summary()

Model: "my_model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding (Embedding)       multiple                  16896     
                                                                 
 gru (GRU)                   multiple                  3938304   
                                                                 
 dense (Dense)               multiple                  67650     
                                                                 
Total params: 4,022,850
Trainable params: 4,022,850
Non-trainable params: 0
_________________________________________________________________


**IMPORTANT:** to get the actual predictions from the model you must *sample from the output distribution* to get the char indices. If you take the argmax instead, the model can easily get stuck in a loop, repeating the same text.

In [22]:
sampled_indices = tf.random.categorical(example_batch_predictions[0], num_samples=1)
sampled_indices = tf.squeeze(sampled_indices, axis=-1).numpy()
sampled_indices

array([28, 20, 64, 27, 53, 12, 25, 16,  4,  7,  8, 20, 31, 30, 62, 46,  1,
       63, 44, 44, 23, 23, 65, 10, 46, 24, 56, 40,  3, 39,  6, 13, 34, 32,
       28, 34, 10,  2, 41, 35, 58,  7, 15, 23, 61, 46, 32, 33,  8, 20, 32,
       60, 56, 31, 53,  6,  4, 28, 54, 58, 61, 53, 63, 40, 59, 21, 41, 14,
       42, 32, 34, 38, 27, 65, 15, 38, 29, 31, 34, 34, 23, 41, 50, 11, 54,
       32, 14,  1, 25, 50,  2,  2, 64,  0,  4, 57, 55, 44, 54, 51])

This is a bit weird since we will get different values each time we run it. However, we pass in the logits (`example_batch_predictions`), which are unnormalized log-probabilities for each character rather than probabilities. The `tf.random.categorical` function samples from the distribution those logits define, so characters with a higher probability are chosen more often. I guess randomness is good since there is not always one character that must follow the next.

Note also that this model is currently untrained and so is not going to produce anything coherent. 

In [23]:
print('Input:\n', text_from_ids(input_example_batch[0]).numpy())
print()
print('Next char predictions:\n', text_from_ids(sampled_indices).numpy())

Input:
 b"you do me wrong.\n\nROMEO:\nTut, I have lost myself; I am not here;\nThis is not Romeo, he's some other "

Next char predictions:
 b"OGyNn;LC$,-GRQwg\nxeeJJz3gKqa!Z'?USOU3 bVs,BJvgST-GSuqRn'$OosvnxatHbAcSUYNzBYPRUUJbk:oSA\nLk  y[UNK]$rpeol"


## Time to Train

Now we just have a classification problem where each class is a character.

In [24]:
loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True)

example_batch_mean_loss = loss(target_example_batch, example_batch_predictions)
print('Pred shape: ', example_batch_predictions.shape, ' # (batch_size, sequence_length, vocab_size)')
print('Mean loss:  ', example_batch_mean_loss)

Pred shape:  (64, 100, 66)  # (batch_size, sequence_length, vocab_size)
Mean loss:   tf.Tensor(4.1897755, shape=(), dtype=float32)


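We expect an untrained model to be unsure of itself, so all the output logits should have a similar magnitude. We can check this by comparing the exp of the mean loss to the number of output classes: roughly uniform predictions give a loss close to $\ln(\text{number of classes})$. Note that the model has 66 output classes (the 65 characters plus `[UNK]`), one more than the `vocab_size` of 65 printed below.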

In [25]:
tf.exp(example_batch_mean_loss).numpy(), vocab_size

(66.007965, 65)

In [26]:
model.compile(optimizer='adam', loss=loss)

checkpoint_dir = './text_gen_train_checkpoints'
# Name of checkpoint files
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt_{epoch}')

checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix,
    save_weights_only=True
)

In [27]:
EPOCHS = 20
history = model.fit(dataset, epochs=EPOCHS, callbacks=[checkpoint_callback])

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


## Generate Text

To generate text, run the model in a loop, and keep track of the internal state as you execute. 

Each time you run the model, pass in some text + internal state. The model returns next char prediction and the new state. Pass the prediction + state back in and you will continue generating text. 

Let's make a single step prediction. 

In [28]:
class OneStep(tf.keras.Model):
    """Wraps a trained model to generate text one character at a time."""

    def __init__(
        self,
        model,
        chars_from_ids,
        ids_from_chars,
        temperature=1.0
        ):
        """Store the model and lookup layers and build the '[UNK]' mask.

        Args:
            model: model called as
                `model(inputs=..., states=..., return_state=True)`,
                returning `(logits, states)`.
            chars_from_ids: layer mapping token IDs back to characters.
            ids_from_chars: layer mapping characters to token IDs.
            temperature: divisor applied to the logits before sampling.
                Must be strictly positive.

        Raises:
            ValueError: if `temperature` is not strictly positive.
        """
        super().__init__()
        # The logits are divided by the temperature, so zero would divide by
        # zero and a negative value would flip the ordering of the logits.
        if temperature <= 0:
            raise ValueError(f'temperature must be > 0, got {temperature}')
        self.temperature = temperature
        self.model = model
        self.chars_from_ids = chars_from_ids
        self.ids_from_chars = ids_from_chars

        # Create mask to avoid '[UNK]' being predicted
        skip_ids = self.ids_from_chars(['[UNK]'])[:, None]
        sparse_mask = tf.SparseTensor(
            # Put -inf at each bad index
            values=[-float('inf')] * len(skip_ids),
            indices=skip_ids,
            # Match shape to vocab
            dense_shape=[len(ids_from_chars.get_vocabulary())]
        )
        # Dense vector that is added to the logits before sampling
        self.prediction_mask = tf.sparse.to_dense(sparse_mask)

    @tf.function
    def generate_one_step(self, inputs, states=None):
        """Sample the next character for every string in `inputs`.

        Args:
            inputs: batch of strings to continue.
            states: model state returned by the previous call, or None on
                the first call.

        Returns:
            `(predicted_chars, states)`: one sampled character per input
            string, and the model state to pass to the next call.
        """
        # Convert strings to token IDs
        input_chars = tf.strings.unicode_split(inputs, 'UTF-8')
        # NOTE(review): to_tensor() turns the ragged batch into a dense one,
        # so prompts of unequal length are presumably padded. Only the last
        # position is used below, which for shorter prompts would then be a
        # padding position - confirm the effect on the first sampled chars.
        input_ids = self.ids_from_chars(input_chars).to_tensor()

        # Run model
        # predicted_logits.shape is [batch, char, next_char_logits]
        predicted_logits, states = self.model(inputs=input_ids,
                                              states=states,
                                              return_state=True)

        # Only use last prediction
        predicted_logits = predicted_logits[:, -1, :]
        # Control randomness of predictions
        predicted_logits = predicted_logits / self.temperature
        # Apply prediction mask and prevent '[UNK]' from being generated
        # Having a logit of -inf means this will never be predicted
        predicted_logits = predicted_logits + self.prediction_mask

        # Sample output logits to generate predicted token IDs
        predicted_ids = tf.random.categorical(predicted_logits, num_samples=1)
        predicted_ids = tf.squeeze(predicted_ids, axis=-1)

        # Convert from token ids to chars
        predicted_chars = self.chars_from_ids(predicted_ids)

        # Return chars and model state
        return predicted_chars, states


In [29]:
one_step_model = OneStep(model, chars_from_ids, ids_from_chars)

Now just run this model in a loop to generate text!! Very cool!

The model already knows about capitalization, can make paragraphs, and can imitate a Shakespeare-esque writing style. But (mainly due to the small number of training epochs) it cannot yet form coherent sentences.

In [30]:
start = time.time()
states = None
next_char = tf.constant(['LADY MACBETH:'])
result = [next_char]

for n in range(1000):
    # Feed the previous output and state back in to get the next character
    next_char, states = one_step_model.generate_one_step(next_char, states=states)
    result.append(next_char)
    if n >= 7:
        continue
    # For the first 7 steps, show the text generated so far. `result_7`
    # keeps the last of these partial strings for the next cell.
    result_7 = tf.strings.join(result)
    print(f'Result after {n}-th iteration')
    print(result_7[0].numpy().decode('utf-8'))
    print()

result = tf.strings.join(result)
end = time.time()
print(result[0].numpy().decode('utf-8'), '\n\n' + '_'*80)
print(f'\nRun time: {end - start:.2f}s')

Result after 0-th iteration
LADY MACBETH:


Result after 1-th iteration
LADY MACBETH:
B

Result after 2-th iteration
LADY MACBETH:
Be

Result after 3-th iteration
LADY MACBETH:
Be 

Result after 4-th iteration
LADY MACBETH:
Be h

Result after 5-th iteration
LADY MACBETH:
Be he

Result after 6-th iteration
LADY MACBETH:
Be he 

LADY MACBETH:
Be he the tyrann, ay, that all the banns thou canst grow you our ten this
there? the Earl of Wrench, that now is of
make othereign and to pity; it hath born to me;
Near yet some greeting to his disight, to flatter'd then
for sheep, ridges of golden cross, and in prison with the tape.

Pedant:
O maid, my lord; 'tis your sword like one set down.

POMPEY:
Three, poor hearing and conquest of it! And to the cord
Would lay in thee, poison, elecoments,
My feating work, I, and myself are wears,
Are for whose cheeks in death.

PAULINA:
Not so,
And therefore I'll become you measure mercy.

QUEEN ELIZABETH:
Would nother, grow together, Kate: but green!

All:
L

In [31]:
result_7

<tf.Tensor: shape=(1,), dtype=string, numpy=array([b'LADY MACBETH:\nBe he '], dtype=object)>

**IMPROVE PERFORMANCE**
The simplest way to improve the model is to train for longer. After that, we can add more layers and also play with `temperature` to control how random the predictions are.

**IMPROVE TEXT GENERATION SPEED**
The simplest way is to batch it. In other words, give it more than one starting string. In this example it takes roughly the same amount of time to produce 5 outputs as it does to produce 1.

In [33]:
start = time.time()
states = None
# Pass in multiple inputs
# NOTE(review): these prompts have different lengths. generate_one_step
# converts the ragged IDs with .to_tensor(), which presumably pads the
# shorter prompts - confirm how that affects their first generated chars.
next_char = tf.constant(['LADY MACBETH:', 
                         'KING LEAR:', 
                         'Will you marry me?', 
                         'HAMLET:', 
                         'To be or not to be'])
result = [next_char]

# Each step yields one new character per prompt; the state is carried over
for n in range(1000):
    next_char, states = one_step_model.generate_one_step(next_char, states=states)
    result.append(next_char)

# Element-wise join: one full string per prompt
result = tf.strings.join(result)
end = time.time()
# Just print out result for ease
print(result, '\n\n' + '_'*80)
print(f'\nRun time: {end - start:.2f}s')

tf.Tensor(
[b"LADY MACBETH:-vie; this news before the prisoner\nstorm to make my power true service burns to help to tarn far\nissued.\n\nGRUMIO:\nAy, my notime Edward for myself: some touch poor grants\nWhich cannot choose but help thereof; friar.\n\nBAPTISTA:\nI am content.\nHis train-learness part these he now,\nBy thus twenty thousand crows'd: besides a cess\nThat thought to see their frowns made balladempt\nTo reconcile thee against God was burn'd\nYour trust my Richard mark'd by thy death.\n\nCATESBY:\nI'll slay the people against their stomach;\nAnd with a piech of small begin poise doth eye\nSince his true king's falch, here since we doth great\nHis throal of impartius.\n\nCOMINIUS:\nTush!\nI will learn to fight;\nAnd all the master is the wind when take\nThe wind shall make an one attend on Padua\ncan stir a little burnt in arms: shall we go forth\nAs monarding thus and warlike serviced,\nSo safely in large mazed; for Polixenes\nIm presentations of return, from wind,\nSleeping

## Export Generator



In [36]:
!ls

one_step_text_gen_model  sample_data  text_gen_train_checkpoints


In [37]:
tf.saved_model.save(one_step_model, 'one_step_text_gen_model')
one_step_reloaded = tf.saved_model.load('one_step_text_gen_model')





INFO:tensorflow:Assets written to: one_step_text_gen_model/assets


INFO:tensorflow:Assets written to: one_step_text_gen_model/assets


In [38]:
states = None
# Single prompt this time (a batch of one), run through the reloaded model
next_char = tf.constant(['NEO:'])
result = [next_char]

# Generate 100 characters, carrying the state from step to step
for n in range(100):
    next_char, states = one_step_reloaded.generate_one_step(next_char, states=states)
    result.append(next_char)

result = tf.strings.join(result)
# Just print out result for ease
print(result[0].numpy().decode('utf-8'), '\n\n' + '_'*80)

NEO:
Brother, give the cause, then anon day's maid?
And sometials laughing me; it is as business
gone ac 

________________________________________________________________________________


## Customized Training

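The above technique uses *teacher forcing*, which prevents a bad prediction from being fed back into the model, so the model never learns to recover from its own mistakes. Writing the training step ourselves gives us the control needed to change that.

Note that the custom `train_step` below still uses teacher forcing; it is a starting point for techniques such as *curriculum learning*.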

[Customize what happens in Model.fit](https://www.tensorflow.org/guide/keras/customizing_what_happens_in_fit) official TF tutorial.

In [40]:
class CustomTraining(MyModel):
    """MyModel variant whose `train_step` is written out by hand."""

    @tf.function
    def train_step(self, inputs):
        """Run one optimisation step on a single (inputs, labels) batch.

        Returns a dict holding the batch loss under the key 'loss'.
        """
        features, labels = inputs

        # Record the forward pass so gradients can be taken afterwards
        with tf.GradientTape() as tape:
            logits = self(features, training=True)
            batch_loss = self.loss(labels, logits)

        variables = self.trainable_variables
        gradients = tape.gradient(batch_loss, variables)
        self.optimizer.apply_gradients(zip(gradients, variables))
        self.compiled_metrics.update_state(labels, logits)

        return {'loss': batch_loss}

In [41]:
# NOTE: this rebinds `model` to a fresh, untrained CustomTraining instance.
# `one_step_model` was constructed from the earlier model object and keeps
# its own reference to it, so it is not affected by this cell.
model = CustomTraining(
    vocab_size=len(ids_from_chars.get_vocabulary()),
    embedding_dim=embedding_dim,
    rnn_units=rnn_units
)
# Same optimizer and loss configuration as the first training run
model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True))

model.fit(dataset, epochs=2)

Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x7fd795593f10>

Finally, let's look at a full blown custom training loop (just like you would have to write in PyTorch from scratch).

In [42]:
EPOCHS = 5
# Running mean of the per-batch loss, reset at the start of every epoch
mean = tf.metrics.Mean()

for epoch in range(EPOCHS):
    # 1-based epoch number, used for both the log lines and the checkpoint
    # names so that the two always agree
    epoch_num = epoch + 1
    start = time.time()

    # reset_state() is the current name of the deprecated reset_states()
    mean.reset_state()
    for batch_n, (inp, target) in enumerate(dataset):
        logs = model.train_step([inp, target])
        mean.update_state(logs['loss'])

        if batch_n % 50 == 0:
            template = f"Epoch {epoch_num} Batch {batch_n} Loss {logs['loss']:.4f}"
            print(template)

    # Save model checkpoint every 5 epochs
    if epoch_num % 5 == 0:
        model.save_weights(checkpoint_prefix.format(epoch=epoch_num))

    print()
    print(f'Epoch {epoch_num} Loss: {mean.result().numpy():.4f}')
    print(f'Time taken for 1 epoch {time.time() - start:.2f} sec')
    print("_"*80)

# Save weights at end of training, unless the final epoch was already saved
# by the every-5-epochs rule above. Using EPOCHS rather than the loop
# variable also avoids a NameError when EPOCHS is 0.
if EPOCHS > 0 and EPOCHS % 5 != 0:
    model.save_weights(checkpoint_prefix.format(epoch=EPOCHS))

Epoch 1 Batch 0 Loss 1.8444
Epoch 1 Batch 50 Loss 1.7242
Epoch 1 Batch 100 Loss 1.6455
Epoch 1 Batch 150 Loss 1.6016

Epoch 1 Loss: 1.6965
Time taken for 1 epoch 26.23 sec
________________________________________________________________________________
Epoch 2 Batch 0 Loss 1.5920
Epoch 2 Batch 50 Loss 1.5355
Epoch 2 Batch 100 Loss 1.4879
Epoch 2 Batch 150 Loss 1.4967

Epoch 2 Loss: 1.5406
Time taken for 1 epoch 22.91 sec
________________________________________________________________________________
Epoch 3 Batch 0 Loss 1.4967
Epoch 3 Batch 50 Loss 1.4656
Epoch 3 Batch 100 Loss 1.4318
Epoch 3 Batch 150 Loss 1.3821

Epoch 3 Loss: 1.4440
Time taken for 1 epoch 23.84 sec
________________________________________________________________________________
Epoch 4 Batch 0 Loss 1.3807
Epoch 4 Batch 50 Loss 1.3958
Epoch 4 Batch 100 Loss 1.3817
Epoch 4 Batch 150 Loss 1.3342

Epoch 4 Loss: 1.3780
Time taken for 1 epoch 23.29 sec
_____________________________________________________________________