In [1]:
import numpy as np
import tensorflow as tf

In [16]:
def split_input_target(text_chunk, split_index=1):
    """Splits text into two chunks representing the input to be fed 
    into the NN, and it's target label.
    
    Example
    -------
    >>> split_input_target("Python")
    "Pytho", "ython"
    """
    input_text = text_chunk[:-split_index]
    target_text = text_chunk[split_index:]
    
    return input_text, target_text

## Purpose
Create text using a character-based recurrent neural network. We will use the novel Great Expectations by Charles Dickens. We will train the network on this text so that, if we give it a character sequence such as thousan, it will produce the next character in the sequence, d. This process can be continued, and longer sequences of text created by calling the model repeatedly on the evolving sequence.

In [2]:
text_url = 'https://www.gutenberg.org/files/1400/1400-0.txt' # Great Expectations by Charles Dickens
file_path = tf.keras.utils.get_file('1400-0.txt', text_url) # Downloads to cache if it isn't already there

Downloading data from https://www.gutenberg.org/files/1400/1400-0.txt


In [3]:
with open(file_path) as fp:
    text = fp.read()

print(f'Lenght of text: {len(text)} characters')

Lenght of text: 1013445 characters


The first 824 characters are not part of the book. They are notes and licencing information from Project Gutenberg and shouldn't be part of training so lets remove them

In [4]:
text = text[824:]

In [5]:
print(text[:300])

Chapter I

My father's family name being Pirrip, and my Christian name Philip, my
infant tongue could make of both names nothing longer or more explicit
than Pip. So, I called myself Pip, and came to be called Pip.

I give Pirrip as my father's family name, on the authority of his
tombstone and my s


Next, lets create a mapping from char to int so the characters can represented as integers

In [33]:
unique_chars = sorted(set(text)) # Gets distinct values
char_to_int = {char:i for i, char in enumerate(unique_chars)}
int_to_char = {v:k for k, v in char_to_int.items()}
index_to_char = np.array(unique_chars)

In [10]:
# Sample output
for (k, v), _ in zip(char_to_int.items(), range(10)):
    print(f"{repr(k):4s}: {v}")

'\n': 0
' ' : 1
'!' : 2
'$' : 3
'%' : 4
'&' : 5
"'" : 6
'(' : 7
')' : 8
'*' : 9


In [12]:
book_vector = np.array([char_to_int[char] for char in text])

# Sample mapping
print(f"{text[10:27]} ----> {book_vector[10:27]}")


My father's fami ----> [ 0 40 78  1 59 54 73 61 58 71  6 72  1 59 54 66 62]


In [13]:
# The maximum length sentence we want for a single input in characters
sequence_length = 100
examples_per_epoch = len(text) // sequence_length

In [14]:
char_dataset = tf.data.Dataset.from_tensor_slices(book_vector)

# Sanity check
for char in char_dataset.take(8):
    print(int_to_char[char.numpy()])

C
h
a
p
t
e
r
 


In [80]:
# Because we're adding 1 to the sequence in this function, the batch size is 101
sequences = char_dataset.batch(sequence_length + 1, drop_remainder=True)
dataset = sequences.map(split_input_target)

In [45]:
for input_example, target_example in dataset.take(1):
    print ('Input data: ', repr(''.join(index_to_char[input_example.numpy()]))) #101 characters
    print ('Target data:', repr(''.join(index_to_char[target_example.numpy()])))

Input data:  "Chapter I\n\nMy father's family name being Pirrip, and my Christian name Philip, my\ninfant tongue coul"
Target data: "hapter I\n\nMy father's family name being Pirrip, and my Christian name Philip, my\ninfant tongue could"


In [56]:
for i, (input_index, target_index) in enumerate(zip(input_example[:5], target_example[:5])):
    print(f"Step {i:4d}", end="")
    print(f" input: {input_index} ({repr(int_to_char[input_index.numpy()])})", end="")
    print(f" expected output: {target_index} ({repr(int_to_char[target_index.numpy()])})")

Step    0 input: 30 ('C') expected output: 61 ('h')
Step    1 input: 61 ('h') expected output: 54 ('a')
Step    2 input: 54 ('a') expected output: 69 ('p')
Step    3 input: 69 ('p') expected output: 73 ('t')
Step    4 input: 73 ('t') expected output: 58 ('e')


In [81]:
## Training set up

# How many characters in a batch
batch_size = 64

# The number of training steps taken in each epoch
steps_per_epoch = examples_per_epoch // batch

# TF data maintains a buffer in memory to shuffle data since it's designed
# to work with the possibility of endless data
buffer = 1000

dataset = dataset.shuffle(buffer).batch(batch, drop_remainder=True)

dataset

<BatchDataset shapes: ((64, 100), (64, 100)), types: (tf.int64, tf.int64)>

In [64]:
# The vocabulary length in characters
vocabulary_length = len(unique_chars)

# The embedding dimension 
embedding_dimension = 256

# The number of recurrent neural network units
num_rnn_units = 1024

In [62]:
if tf.test.is_gpu_available():
    recurrent_nn = tf.compat.v1.keras.layers.CuDNNGRU
    print("Using GPU")
else:
    import functools
    recurrent_nn = functools.partial(tf.keras.layers.GRU, recurrent_activation='sigmoid')
    print("GPU not found, falling back to CPU")

GPU not found, falling back to CPU


## The Model
![DNN Layout](images/dnn-layout.png "DNN Layout")

In [67]:
def build_model(vocab_size, embedding_dim, num_rnn_units, batch_size, recurrent_nn):
    model = tf.keras.Sequential([tf.keras.layers.Embedding(vocab_size, embedding_dim, batch_input_shape=[batch_size, None]),
                                 recurrent_nn(num_rnn_units, return_sequences=True, recurrent_initializer='glorot_uniform', stateful=True),
                                 tf.keras.layers.Dense(vocab_size)
                                ])
    
    return model

model = build_model(vocab_size=len(unique_chars), embedding_dim=embedding_dimension, 
                    num_rnn_units=num_rnn_units, batch_size=batch_size, recurrent_nn=recurrent_nn)

In [82]:
for batch_input_example, batch_target_example in dataset.take(1):
    batch_predictions_example = model(batch_input_example)
    print(batch_predictions_example.shape, "# (batch, sequence_length, vocabulary_length)")

(64, 100, 84) # (batch, sequence_length, vocabulary_length)


In [83]:
model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
embedding (Embedding)        (64, None, 256)           21504     
_________________________________________________________________
unified_gru (UnifiedGRU)     (64, None, 1024)          3938304   
_________________________________________________________________
dense (Dense)                (64, None, 84)            86100     
Total params: 4,045,908
Trainable params: 4,045,908
Non-trainable params: 0
_________________________________________________________________


84 unique chars * 256 embedding dimms = 21,504

1024 GRU units * 84 unique chars + 84 bias units = 86,100

In [84]:
def loss(labels, logits):
    return tf.keras.losses.sparse_categorical_crossentropy(labels, logits, from_logits=True)

In [90]:
example_batch_loss = loss(batch_target_example, batch_predictions_example)
print("Prediction shape: ", batch_predictions_example.shape, " # (batch_size, sequence_length, vocab_size)")
print("scalar_loss: ", batch_loss_example.numpy().mean())

Prediction shape:  (64, 100, 84)  # (batch_size, sequence_length, vocab_size)
scalar_loss:  10.125925


In [91]:
model.compile(optimizer='adam', loss=loss)

In [93]:
import os

# Directory where the checkpoints will be saved
checkpoint_dir = './training_checkpoints'
# Name of the checkpoint files
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

checkpoint_callback=tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix,
    save_weights_only=True)

In [94]:
EPOCHS=10

history = model.fit(dataset, epochs=EPOCHS, callbacks=[checkpoint_callback])

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


## Prediction
Because of the way the RNN state is passed from timestep to timestep, the model only accepts a fixed batch size once built.

To run the model with a different batch_size, we need to rebuild the model and restore the weights from the checkpoint.

In [104]:
foo = build_model(vocab_size=len(unique_chars), embedding_dim=embedding_dimension, 
                    num_rnn_units=num_rnn_units, batch_size=batch_size, recurrent_nn=recurrent_nn)
foo.load_weights(tf.train.latest_checkpoint(checkpoint_dir))

foo.build(tf.TensorShape([1, None]))

In [101]:
def generate_text(model, start_string):
    # Evaluation step (generating text using the learned model)

    # Number of characters to generate
    num_generate = 1000

    # Converting our start string to numbers (vectorizing)
    input_eval = [char_to_int[s] for s in start_string]
    input_eval = tf.expand_dims(input_eval, 0)

    # Empty string to store our results
    text_generated = []

    # Low temperatures results in more predictable text.
    # Higher temperatures results in more surprising text.
    # Experiment to find the best setting.
    temperature = 1.0

    # Here batch size == 1
    model.reset_states()
    for i in range(num_generate):
        predictions = model(input_eval)
        # remove the batch dimension
        predictions = tf.squeeze(predictions, 0)

        # using a categorical distribution to predict the word returned by the model
        predictions = predictions / temperature
        predicted_id = tf.random.categorical(predictions, num_samples=1)[-1,0].numpy()

        # We pass the predicted word as the next input to the model
        # along with the previous hidden state
        input_eval = tf.expand_dims([predicted_id], 0)

        text_generated.append(int_to_char[predicted_id])

    return (start_string + ''.join(text_generated))

In [105]:
generate_text(foo, 'my father')

ValueError: Tensor's shape (9, 64, 1024) is not compatible with supplied shape [9, 1, 1024]