## Generating (Shakespearean) Text with a Transformer (Decoder)

In [None]:
# Suppress tensorflow warnings.
# NOTE(review): this runs in its own cell ahead of the `import tensorflow`
# cell; presumably the variable has to be set before that import — confirm.
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

In [None]:
import numpy as np
import tensorflow as tf

## Create a GPT-like Decoder Block

In [None]:
class GPTDecoderBlock(tf.keras.layers.Layer):
    """
    Decoder-only transformer block in the style of "Attention Is All You Need".

    The block chains two residual sub-layers, each followed by layer
    normalization:

    1. causally masked multi-head self-attention,
    2. a position-wise feed-forward network (``4 * embed_size`` ReLU units
       followed by a linear projection back to ``embed_size``).

    The paper's multi-head (cross) attention sub-layer is not included
    because there is no encoder.

    Args:
        num_heads: Number of attention heads.
        embed_size: Size of the last axis of the input. The residual
            additions require the input's last axis to have this size.
        **kwargs: Forwarded to ``tf.keras.layers.Layer`` (e.g. ``name``).
    """

    def __init__(self, num_heads, embed_size, **kwargs):
        super().__init__(**kwargs)

        self.num_heads = num_heads
        self.embed_size = embed_size

        self.masked_attn_layer = tf.keras.layers.MultiHeadAttention(
            num_heads=num_heads,
            # Per-head size d_k = d_model / h, see section 3.2.2 of the
            # "Attention Is All You Need" paper.
            key_dim=embed_size // num_heads,
        )
        self.norm1_layer = tf.keras.layers.LayerNormalization()

        # Feed-forward network, see section 3.3 of the paper: an inner ReLU
        # layer of size 4 * d_model, then a second dense layer without an
        # activation function.
        self.dense1_layer = tf.keras.layers.Dense(
            units=4 * embed_size,
            activation="relu")
        self.dense2_layer = tf.keras.layers.Dense(units=embed_size)
        self.norm2_layer = tf.keras.layers.LayerNormalization()

    def call(self, inputs):
        """
        Apply masked self-attention and the feed-forward network.

        Args:
            inputs: Tensor whose last axis has size ``embed_size``, e.g. of
                shape ``(batch, sequence, embed_size)``.

        Returns:
            The output of the second layer normalization.
        """
        # Masked multi-head (self-)attention sub-layer. Query and value are
        # both the input; the causal mask hides later positions.
        attn_output = self.masked_attn_layer(
            query=inputs,
            value=inputs,
            use_causal_mask=True)
        # Residual connections are plain tensor additions, so no new
        # `Add` layer object is instantiated on each forward pass.
        hidden = self.norm1_layer(inputs + attn_output)

        # Feed-forward sub-layer with its own residual connection.
        ffn_output = self.dense2_layer(self.dense1_layer(hidden))
        return self.norm2_layer(hidden + ffn_output)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({
            "num_heads": self.num_heads,
            "embed_size": self.embed_size,
        })
        return config


In [None]:
# Smoke test: run a dummy all-zero batch of shape (2, 10, 64) through a
# freshly created block and display the shape of the result.
block = GPTDecoderBlock(
    embed_size=64,
    num_heads=4,
)
X = tf.constant(0., shape=(2, 10, 64))
block_output = block(X)
block_output.shape

## Prepare the Data for Shakespeare Text Generation

In [None]:
# Number of characters in each input sequence of the datasets; also passed to
# the model as `max_seq_length`.
SEQ_LENGTH = 100

In [None]:
# Fetch the data, same as in book
shakespeare_url = "https://homl.info/shakespeare"  # shortcut URL
filepath = tf.keras.utils.get_file("shakespeare.txt", shakespeare_url)
# Decode explicitly as UTF-8 so that the resulting string does not depend on
# the platform's default text encoding.
with open(filepath, encoding="utf-8") as f:
    shakespeare_text = f.read() # shakespeare_text is now a string

In [None]:
# Character-level tokenizer. `standardize=None` switches off any text
# standardization, so punctuation and upper-/lowercase letters are kept.
text_vec_layer = tf.keras.layers.TextVectorization(
    standardize=None,
    split="character",
)

# `adapt` expects a dataset or a list, hence the one-element list around the
# single string.
text_vec_layer.adapt([shakespeare_text])

# Encode the whole text; `[0]` selects the only row of the batch.
encoded = text_vec_layer([shakespeare_text])[0]

In [None]:
# Show the learned vocabulary and its size.
vocabulary = text_vec_layer.get_vocabulary()
print(vocabulary)
print(len(vocabulary))

In [None]:
# Drop tokens 0 (pad) and 1 (unknown), which we will not use, by subtracting 2
# from all values (broadcasting).
# The ids are re-computed from the text rather than shifted in place, so
# re-running this cell cannot subtract 2 a second time.
encoded = text_vec_layer([shakespeare_text])[0] - 2
n_tokens = text_vec_layer.vocabulary_size() - 2  # number of distinct chars = 65
dataset_size = len(encoded)  # total number of chars = 1,115,394

In [None]:
# Create sequence-to-sequence dataset, same as in book
def to_dataset(sequence, length, shuffle=False, seed=None, batch_size=32):
    """
    Turn a 1D sequence into batches of (input, target) windows.

    Every window covers ``length + 1`` consecutive elements and consecutive
    windows are shifted by one element. The input is the window without its
    last element, the target is the window without its first element.

    Args:
        sequence: 1D sequence of values to slice into windows.
        length: Number of elements in each input (and target) sequence.
        shuffle: Whether to shuffle the windows (buffer of 100,000 windows).
        seed: Seed passed to the shuffle step.
        batch_size: Number of windows per batch.

    Returns:
        A ``tf.data.Dataset`` yielding ``(inputs, targets)`` batches.
    """
    window_size = length + 1

    def split_inputs_targets(batch):
        # Targets are the inputs shifted one step to the left.
        return batch[:, :-1], batch[:, 1:]

    windows = (
        tf.data.Dataset.from_tensor_slices(sequence)
        .window(window_size, shift=1, drop_remainder=True)
        # Each window is itself a dataset; batching it by its full size
        # flattens it into a single tensor.
        .flat_map(lambda window_ds: window_ds.batch(window_size))
    )
    if shuffle:
        windows = windows.shuffle(100_000, seed=seed)

    return windows.batch(batch_size).map(split_inputs_targets).prefetch(1)

In [None]:
# Create training, validation and test data set. Same as in book.
tf.random.set_seed(42)

# Split points (in characters) between the three sets.
TRAIN_END = 1_000_000
VALID_END = 1_060_000

train_set = to_dataset(
    encoded[:TRAIN_END],
    length=SEQ_LENGTH,
    shuffle=True,
    seed=42,
)
valid_set = to_dataset(encoded[TRAIN_END:VALID_END], length=SEQ_LENGTH)
test_set = to_dataset(encoded[VALID_END:], length=SEQ_LENGTH)

## Create the model.

In [None]:
class GPTModel(tf.keras.Model):
    """
    Decoder-only transformer that predicts the next token at every position.

    Token ids are embedded, a learned positional embedding is added, the
    result goes through a stack of ``GPTDecoderBlock`` layers, and a final
    softmax layer turns each position into a distribution over ``n_tokens``.

    Args:
        n_tokens: Vocabulary size (number of distinct token ids).
        embed_size: Size of the token and positional embedding vectors.
        num_blocks: Number of stacked decoder blocks.
        num_heads: Number of attention heads in each decoder block.
        max_seq_length: Number of positions the positional embedding covers.
        **kwargs: Forwarded to ``tf.keras.Model`` (e.g. ``name``).
    """

    def __init__(self, n_tokens, embed_size, num_blocks, num_heads, max_seq_length, **kwargs):

        super().__init__(**kwargs)

        self.num_heads = num_heads
        self.max_seq_length = max_seq_length

        # Layers
        self.embed_layer = tf.keras.layers.Embedding(
            input_dim=n_tokens,
            output_dim=embed_size,
            name='embedding')
        self.pos_embed_layer = tf.keras.layers.Embedding(
            input_dim=max_seq_length,
            output_dim=embed_size,
            name='positional_embedding')
        self.decoder_blocks = [GPTDecoderBlock(
            num_heads=num_heads,
            embed_size=embed_size,
            name='GPTBlock' + str(i)) for i in range(num_blocks)]
        self.dense_layer = tf.keras.layers.Dense(
            units=n_tokens,
            activation='softmax',
            name='output')

    def call(self, inputs):
        """
        Compute next-token probabilities for every position.

        Args:
            inputs: Integer token ids of shape ``(batch, sequence)``. The
                sequence length should not exceed ``max_seq_length``, since
                the positional embedding only has that many entries.

        Returns:
            Softmax probabilities of shape ``(batch, sequence, n_tokens)``.
        """
        embeddings = self.embed_layer(inputs)

        # Look up positional embeddings for the positions actually present in
        # the input instead of always using `max_seq_length` positions. For
        # full-length inputs this is identical; shorter inputs now work too
        # instead of failing to broadcast.
        seq_length = tf.shape(inputs)[1]
        pos_embeddings = self.pos_embed_layer(tf.range(seq_length))

        # (batch, seq, embed) + (seq, embed): rely on broadcasting
        embeddings = embeddings + pos_embeddings

        for decoder_block in self.decoder_blocks:
            embeddings = decoder_block(embeddings)

        return self.dense_layer(embeddings)

## Instantiate a Model and Train it

In [None]:
# Hyperparameters of the model instantiated below.
EMBED_SIZE = 32  # size of the token and positional embedding vectors
NUM_HEADS = 4  # attention heads per decoder block
NUM_BLOCKS = 2  # number of stacked decoder blocks

In [None]:
# Clear the Keras session, then create a fresh model from the
# hyperparameters defined above.
tf.keras.backend.clear_session()
model = GPTModel(
    n_tokens=n_tokens,
    max_seq_length=SEQ_LENGTH,
    embed_size=EMBED_SIZE,
    num_heads=NUM_HEADS,
    num_blocks=NUM_BLOCKS,
)

In [None]:
# Sanity check on one training batch: print the input shape and the shape of
# the model's output for it.
for X, Y in train_set.take(1):
    output_shape = model(X).shape
    print(X.shape)
    print(output_shape)

In [None]:
# Print an overview of the model's layers and parameter counts.
model.summary()

In [None]:
# The targets are integer token ids and the model outputs softmax
# probabilities, hence the sparse categorical cross-entropy loss.
model.compile(
    loss="sparse_categorical_crossentropy",
    optimizer="nadam",
    metrics=["accuracy"],
)

In [None]:
# 10 minutes per epoch on a (fast) GPU
# Stop once the validation loss has not improved for 2 consecutive epochs and
# roll the model back to the weights of the best epoch.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=2,
    restore_best_weights=True
)
# Keep the returned History object so the per-epoch losses and metrics of this
# expensive run stay available for later inspection.
history = model.fit(
    train_set,
    validation_data=valid_set,
    epochs=20,
    callbacks=[early_stopping],
)

## Generating Text

In [None]:
# Wrap the trained model so that it accepts raw strings (same as in book):
# vectorize the text, shift the ids down by 2 (no <PAD> or <UNK> tokens),
# then run the transformer.
shakespeare_model = tf.keras.Sequential()
shakespeare_model.add(text_vec_layer)
shakespeare_model.add(tf.keras.layers.Lambda(lambda token_ids: token_ids - 2))
shakespeare_model.add(model)

Check the shape of the model's output when making predictions for one sentence.

Note: you can only give the model sentences that have length exactly equal to the length that was used when training the model.
This is because we always used this length when computing the positional embeddings.

In [None]:
# Pad the prompt with spaces on the right up to SEQ_LENGTH characters before
# handing it to the model.
prompt = "HAMLET"
y_proba = shakespeare_model.predict([prompt.ljust(SEQ_LENGTH)])
y_proba.shape

Adapt the `next_char` function from the book so that it works for the transformer model we trained.

In [None]:
def next_char(text, temperature=1):
    """
    Sample the character that follows ``text``.

    Only the last ``SEQ_LENGTH`` characters of ``text`` are used as context.
    Shorter contexts are padded with spaces on the right up to ``SEQ_LENGTH``;
    the prediction is read at the position of the last real character, which
    the causal mask keeps from seeing the padding after it.

    Args:
        text: Non-empty prompt string.
        temperature: Divisor applied to the log-probabilities before
            sampling; values below 1 favor the most likely characters.

    Returns:
        The sampled character as a string.

    Raises:
        ValueError: If ``text`` is empty (there is no position to predict from).
    """
    if not text:
        raise ValueError("text must contain at least one character")

    # Never feed more than SEQ_LENGTH characters to the model.
    context = text[-SEQ_LENGTH:]
    last_pos = len(context) - 1

    # shakespeare_model is the model we trained earlier.
    # verbose=0 avoids one progress bar per generated character.
    y_proba = shakespeare_model.predict(
        [context.ljust(SEQ_LENGTH)], verbose=0)[0, last_pos:last_pos + 1]
    rescaled_logits = tf.math.log(y_proba) / temperature
    char_id = tf.random.categorical(rescaled_logits, num_samples=1)[0, 0]
    # text_vec_layer is the layer we adapted earlier; +2 undoes the shift
    # that removed the <PAD> and <UNK> ids.
    return text_vec_layer.get_vocabulary()[char_id + 2]

In [None]:
# Sample a single next character for a short prompt (default temperature of 1).
next_char("to be or not to be ")

Adapt the `extend_text` function from the book.

We should be able to generate texts of arbitrary lengths.  Make sure to try your method when predicting at least one hundred characters.

In [None]:
def extend_text(text, n_chars=50, temperature=1):
    """
    Append ``n_chars`` sampled characters to ``text`` and return the result.

    Args:
        text: Prompt to continue.
        n_chars: Number of characters to generate.
        temperature: Sampling temperature handed to ``next_char``.

    Returns:
        The prompt followed by the generated characters.
    """
    generated = text
    for _step in range(n_chars):
        # Only the most recent SEQ_LENGTH characters are passed on as context.
        context = generated[-SEQ_LENGTH:]
        generated = generated + next_char(context, temperature)
    return generated

In [None]:
# Generate 100 further characters; the low temperature (0.2) makes sampling
# favor the most likely characters.
extend_text("HAMLET:", n_chars=100, temperature=0.2)