In [171]:
import io
import re
import string
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

from tensorflow import keras
from keras import layers
from keras import models
from keras.layers import TextVectorization


In [172]:
BATCH_SIZE = 128
SEQUENCE_LENGTH = 4
# Set the number of negative samples per positive context.
NUM_HEADS = 2
SEED = 42
FEED_FORWARD_DIM = 64

buffer_size = 5000
embedding_dim = 64

In [173]:
# Loading data
file_path = '../Homework10/bible.txt'

with open(file_path, "r") as f:
    text = f.read().splitlines()

for line in text[:50]:
    print(line) 

The First Book of Moses:  Called Genesis


1:1 In the beginning God created the heaven and the earth.

1:2 And the earth was without form, and void; and darkness was upon
the face of the deep. And the Spirit of God moved upon the face of the
waters.

1:3 And God said, Let there be light: and there was light.

1:4 And God saw the light, that it was good: and God divided the light
from the darkness.

1:5 And God called the light Day, and the darkness he called Night.
And the evening and the morning were the first day.

1:6 And God said, Let there be a firmament in the midst of the waters,
and let it divide the waters from the waters.

1:7 And God made the firmament, and divided the waters which were
under the firmament from the waters which were above the firmament:
and it was so.

1:8 And God called the firmament Heaven. And the evening and the
morning were the second day.

1:9 And God said, Let the waters under the heaven be gathered together
unto one place, and let the dry land appear

In [174]:
# Preparing Data for Model
vocabulary = dict((x, i) for i, x in enumerate(np.unique(list(text))))
vocab_size = len(vocabulary)
print(vocab_size, '\n')

72192 



## 2.1 The dataset, preprocessing & tokenizatio

In [175]:
# Create a custom standardization function to lowercase the text and
# remove special characters and punctuation.
def custom_standardization(input_data):
    lowercase = tf.strings.lower(input_data)
    # lowercase = lowercase.split()
    return tf.strings.regex_replace(lowercase,
                                    '[%s]' % re.escape(string.punctuation), '')

In [176]:
def prepare_input_labels(data):
    """
    Shift word sequences by 1 position so that the target for position (i) is
    word at position (i+1). The model will use all words up till position (i)
    to predict the next word.
    """

    data = tf.expand_dims(data, axis=-1)
    tokenized_sentences = vectorize_layer(data)
    x = tokenized_sentences[:, :-1]
    y = tokenized_sentences[:, 1:]

    return x, y

In [177]:
# Preparing Data for Model
dataset = tf.data.TextLineDataset(file_path).filter(lambda x: tf.cast(tf.strings.length(x), bool))

In [178]:
# Configure the dataset for performance
def config_dataset(text_ds, buffer_size, batch_size):
    text_ds = text_ds.shuffle(buffer_size=32)
    text_ds = text_ds.batch(batch_size, drop_remainder=True)
    text_ds = text_ds.map(prepare_input_labels)
    # Apply Dataset.cache and Dataset.prefetch to improve performance:
    text_ds = text_ds.cache()
    text_ds = text_ds.prefetch(tf.data.AUTOTUNE)

    return text_ds

In [179]:
# Use the `TextVectorization` layer to normalize, split, and map strings to
# integers. Set the `output_sequence_length` length to pad all samples to the
# same length.
vectorize_layer = TextVectorization(
    standardize=custom_standardization,
    max_tokens=vocab_size - 1,
    output_mode='int',
    output_sequence_length=SEQUENCE_LENGTH + 1)

In [180]:
# Create the vocabulary for text dataset
vectorize_layer.adapt(dataset.batch(BATCH_SIZE))
# Returns a list of all vocabulary tokens sorted (descending) by their frequency
inverse_vocab = vectorize_layer.get_vocabulary()
print(inverse_vocab[:30], '\n')

['', '[UNK]', 'the', 'and', 'of', 'to', 'that', 'in', 'he', 'shall', 'unto', 'for', 'i', 'his', 'a', 'lord', 'they', 'be', 'is', 'him', 'not', 'them', 'it', 'with', 'all', 'thou', 'thy', 'was', 'god', 'which'] 



In [181]:
dataset = config_dataset(dataset, buffer_size, BATCH_SIZE)

## 2.2 The model components
    With the dataset ready, it’s time to write the code for the language model itself. We will go for the GPT version of transformer-based models which uses the decoder block from the original transformer model to build a next-token predictor. Causal masking in the multi-head-attention layer allows us to use the entire input sequence as the targets, since embeddings at time point t are only allowed to attent to previous tokens, not subsequent tokens.

### 2.2.1 The Embedding
    The first thing you want to do is write a subclassed layer class that embeds the individual token indices in the input (each index should be mapped to a vector that is looked up from a table). For this you can use tf.keras.layers.Embedding, in which the input dimension should be the vocabulary size that you chose and the output dimension is the dimensionality of the embeddings (try something between 64 and 256). What this subclassed layer should do is embed not only the token indices but also their position in the input. Transformer based models do not inherently operate on sequences but rather on sets of tokens. This means the model will also need to learn a positional embedding with a second embedding layer that has an input dimension of the sequence length and the same output embedding dimension. Without positional encoding, the order of the input sequence would not affect the model in any way.
    • In the init method define the two embeddings for the token indices and their position using tf.keras.layers.Embedding. Assign their input and output dim as described above.
    • In the call method first construct a tensor with tf.range from zero to m (the input sequence length, which can be obtained from the input’s shape). This will act as the indices to look up the positional code for each sub-word. Then, feed the token index embedding layer with the input sequence and the positional embedding layer with the newly constructed range tensor. FInally add the two embeddings (like it was done in a ResNet) and return the result.

In [182]:
# Create two separate embedding layers: one for tokens and one for token index (positions).
class TokenAndPositionEmbedding(layers.Layer):
    def __init__(self, maxlen, vocab_size, embed_dim):
        super(TokenAndPositionEmbedding, self).__init__()
        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        maxlen = tf.shape(x)[-1]
        positions = tf.range(0, maxlen)
        positions = self.pos_emb(positions)
        x = self.token_emb(x)

        return x + positions

In [183]:
def casual_attention_mask(batch_size, n_dest, n_src, dtype):
    """
    Mask the upper half of the dot product matrix in self attention.
    This prevents flow of information from future tokens to current token.
    1's in the lower triangle, counting from the lower right corner.
    """
    i = tf.range(n_dest)[:, None]
    j = tf.range(n_src)
    m = i >= j - n_src + n_dest

    mask = tf.cast(m, dtype)
    mask = tf.reshape(mask, [1, n_dest, n_src])
    mult = tf.concat(
        [tf.expand_dims(batch_size, axis=-1), tf.constant([1, 1], dtype=tf.int32)], axis=0
    )

    return tf.tile(mask, mult)

### 2.2.2 The TransformerBlock layer
    The next part is the TransformerBlock. You again should create a subclassed layer for this. In it, create a MultiHeadAttention layer with 2-4 attention heads and a key dimension of the dimensionality used for the embeddings in the previous step. If you’d like to understand what is happening inside the MultiHeadAttention, refer to the Courseware to see how it can be implemented.
    • In the init method you have to instantiate the MHA layer, with the num heads and key dim arguments assigned as described above. Additionally, you also need to instantiate two Dense layers. The first has a ReLU activation and between 32 and 256 units and the second has no activation and again as many units as the dimensionality of the embeddings. You will also want to instantiate two dropout layers with a dropout rate of 0.1. Remember that dropout layers require a training argument in the call method. THis needs to be passed down all the way through higher order layers, down to the call method of this layer. Finally, add two layer-normalization layers, with an epsilon of 1e-6.
    • In the call method give the input to the multi-head attention layer as both value and query arguments (internally it will then also be used as the keys), meaning the embedded inputs are used as both the query, value and key arguments. Importantly the use causal mask argument has to be set to true during the call, so the model does not attend to future tokens. Then you use dropout on the output of the MHA and add the result of that back to the layer input, like in residual connections in a ResNet. To the sum, you apply layer normalization. Sometimes, adding the layer norm is done before the sum for stability reasons. The result - let’s call it ln_out - will be used for another residual connection: Apply the two dense layers to it, followed by another dropout layer, and then add ln_out back to the result of those dense and dropout layers (the residual connection). Remember the training argument. Finally, apply the secondlayer normalization and return the final output.
    Wow. That was a lot! But the good news is, you’re almost done building a GPT-like text generator!

In [184]:
# Create a Transformer block as a layer
class TransformerBlock(layers.Layer):
    def __init__(self, embed_dim, num_heads, ff_dim, dropout=0.1):
        super(TransformerBlock, self).__init__()
        # Transformer block multi-head Self Attention
        self.multiHeadSelfAtt = layers.MultiHeadAttention(num_heads, embed_dim)
        self.ffn = keras.Sequential([
            layers.Dense(ff_dim, activation="relu"), layers.Dense(embed_dim),
        ])
        self.layer_norm_1 = layers.LayerNormalization(epsilon=1e-6)
        self.layer_norm_2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout_1 = layers.Dropout(dropout)
        self.dropout_2 = layers.Dropout(dropout)
        
    def call(self, inputs):
        input_shape = tf.shape(inputs)
        batch_size = input_shape[0]
        seq_len = input_shape[1]
        casual_mask = casual_attention_mask(batch_size, seq_len, seq_len, tf.bool)
        attention_output = self.multiHeadSelfAtt(inputs, inputs, attention_mask=casual_mask)
        attention_output = self.dropout_1(attention_output)
        out_1 = self.layer_norm_1(inputs + attention_output)
        ffn_output = self.ffn(out_1)
        ffn_output = self.dropout_2(ffn_output)
        
        return self.layer_norm_2(out_1 + ffn_output)

### 2.2.3 The subclassed model

In [185]:
def createTransformerModel(vocab_size, sequence_length, embed_dim, num_heads, feed_forward_dim):
    model = models.Sequential()
    model.add(layers.Input(shape=(sequence_length,), dtype=tf.int32))
    # Add a class with two separate embedding layers
    model.add(TokenAndPositionEmbedding(sequence_length, num_heads, feed_forward_dim))
    model.add(TransformerBlock(embed_dim, num_heads, feed_forward_dim))
    model.add(layers.Dense(units=vocab_size))

    loss_function = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

    model.compile(
        optimizer=optimizer,
        loss=[loss_function, None]
    )
    # No loss and optimization based on word embeddings from transformer block
    return model

In [186]:
class TextGenerator(keras.callbacks.Callback):
    """A callback to generate text from a trained model.
    1. Feed some starting prompt to the model
    2. Predict probabilities for the next token
    3. Sample the next token and add it to the next input

    Arguments:
        max_tokens: Integer, the number of tokens to be generated after prompt.
        start_tokens: List of integers, the token indices for the starting prompt.
        index_to_word: List of strings, obtained from the TextVectorization layer.
        top_k: Integer, sample from the `top_k` token predictions.
        print_every: Integer, print after this many epochs.
    """
    def __init__(self, max_tokens, start_tokens, index_to_word, sequence_length, k=10, print_every=1):
        self.max_tokens = max_tokens
        self.start_tokens = start_tokens
        self.index_to_word = index_to_word
        self.k = k
        self.print_every = print_every
        self.sequence_length = sequence_length

    def sample_from(self, logits):
        logits, indices = tf.math.top_k(logits, self.k, sorted=True)
        indices = np.asarray(indices).astype('int32')
        predictions = keras.activations.softmax(tf.expand_dims(logits, 0))[0]
        predictions = np.asarray(predictions).astype('float32') 

        return np.random.choice(indices, p=predictions)
    
    def detokenize(self, number):
        return self.index_to_word[number]
    
    def on_epoch_end(self, epoch, logs=None):
        start_tokens = [_ for _ in self.start_tokens]
        if (epoch + 1) % self.print_every != 0:
            return
        
        num_tokens_generated = 0
        tokens_generated = []
        while num_tokens_generated <= self.max_tokens:
            pad_len = self.sequence_length - len(start_tokens)
            sample_index = len(start_tokens) - 1

            if pad_len < 0:
                x = start_tokens[:self.sequence_length]
                sample_index = self.sequence_length - 1
            elif pad_len > 0:
                x = start_tokens + [0] * pad_len
            else:
                x = start_tokens

            x = np.array([x])
            y = self.model.predict(x)
            sample_token = self.sample_from(y[0][sample_index])
            tokens_generated.append(sample_token)
            start_tokens.append(sample_token)
            num_tokens_generated = len(tokens_generated)
        
        text = " ".join([self.detokenize(_) for _ in self.start_tokens + tokens_generated]) 
        print(f'Generated text: {text}\n')

In [188]:
word_to_index = {}
# Tokenize starting prompt
for index, word in enumerate(inverse_vocab):
    word_to_index[word] = index

In [189]:
start_prompt = 'The First Book'
start_tokens = [word_to_index.get(_, 1) for _ in start_prompt.split()]
num_tokens_generated = 50
text_generator_callback = TextGenerator(num_tokens_generated, start_tokens, 
                                        inverse_vocab, SEQUENCE_LENGTH)    

In [190]:
gpt_model = createTransformerModel(vocab_size=vocab_size, 
                                    sequence_length=SEQUENCE_LENGTH, 
                                    embed_dim=embedding_dim,
                                    num_heads=NUM_HEADS,
                                    feed_forward_dim=FEED_FORWARD_DIM)

In [191]:
history = gpt_model.fit(dataset,
                        epochs=10,
                        verbose=1,
                        callbacks=[text_generator_callback])

Epoch 1/10
Generated text: [UNK] [UNK] [UNK] of  that not the is not that unto  is the  of of of the that not is  unto that the the  of and the and and that that of and and the and that of not and that that unto that the of a the the

Epoch 2/10
Generated text: [UNK] [UNK] [UNK] the of of of and and unto that of  the of is a  and that unto and of the of the the the unto the the to that to the  to the the unto the that of  of  of of a a the and the to

Epoch 3/10


Generated text: [UNK] [UNK] [UNK] the and to of that  of  of the and of the the of and is that the unto and that of he the and of a unto and the  a and and and the of a of of to the the he the and that the of of

Epoch 4/10
Generated text: [UNK] [UNK] [UNK] of the and a  of is is and to  and and of that and to of  and of unto is and the is that and  the and the a the and and the the the in that of of and to and to that unto and that

Epoch 5/10
Generated text: [UNK] [UNK] [UNK] the of  the a and  of the the of  the in of the  the of and unto the and the to of and and  the is that of and the in that the of and the a is and the and and the of that of

Epoch 6/10


Generated text: [UNK] [UNK] [UNK] the unto the of a unto  be that to and and and and is and and the and of and and of unto that of of to of of and of and of is the unto of a that is a of of the the and is of of that

Epoch 7/10
Generated text: [UNK] [UNK] [UNK]                                                   

Epoch 8/10
Generated text: [UNK] [UNK] [UNK] that and and the and unto and of that of of the be be to to a that   of in unto  the be and and to  unto and that the the the to a and of be the the in the the the be unto that 

Epoch 9/10


Generated text: [UNK] [UNK] [UNK] the that that and of a of of the  be the and of of to the of the that  and be  and and and of the that the that be unto be and be of of and of and  and unto the and the in the the

Epoch 10/10
Generated text: [UNK] [UNK] [UNK] is and to the of in the that and that the and and that and and the in  to of  the the the of and and and a  of and and and the and that and and and the the unto the of to and the of and



In [192]:
gpt_model.summary()

Model: "sequential_14"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 token_and_position_embeddin  (None, 4, 64)            384       
 g_7 (TokenAndPositionEmbedd                                     
 ing)                                                            
                                                                 
 transformer_block_7 (Transf  (None, 4, 64)            41792     
 ormerBlock)                                                     
                                                                 
 dense_23 (Dense)            (None, 4, 72192)          4692480   
                                                                 
Total params: 4,734,656
Trainable params: 4,734,656
Non-trainable params: 0
_________________________________________________________________
