In [4]:
pip install -q "tensorflow-text==2.8.*"

Note: you may need to restart the kernel to use updated packages.


In [41]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.preprocessing.text import Tokenizer
import numpy as np
import warnings
warnings.filterwarnings('always')
warnings.filterwarnings('ignore')

In [42]:
#Open data
#Read the whole corpus inside a context manager so the file handle is closed
#as soon as the text has been loaded (a bare open().read() leaves it open)
with open('Data.txt') as corpus_file:
    data_file = corpus_file.read()

#Create tokenizer object
tokenizer = Tokenizer()

#Lowercase the corpus and split it into sentences on '.'
data = data_file.lower().split('.')

In [43]:
def Text_Vectorization(tokenizer, data, max_sentence_length):
    """Fit ``tokenizer`` on ``data`` and encode ``data`` as padded integer sequences.

    Args:
        tokenizer: object exposing ``fit_on_texts``, ``texts_to_sequences`` and
            ``word_index`` (the notebook passes a Keras ``Tokenizer``).
        data: the sentences to learn the vocabulary from and to encode.
        max_sentence_length: length every encoded sentence is padded to.

    Returns:
        A ``(total_words, input_sequences)`` tuple: the vocabulary size
        including the padding slot, and the padded integer sequences.
    """
    #Learn the vocabulary: every word is mapped to an index > 0,
    #index 0 stays free for padding
    tokenizer.fit_on_texts(data)

    #+1 accounts for the reserved padding index 0
    vocabulary_size = len(tokenizer.word_index) + 1

    #Replace each sentence by the list of its word indices
    encoded_sentences = tokenizer.texts_to_sequences(data)

    #Append zeros after each sentence until it is max_sentence_length long
    padded_sentences = tf.keras.preprocessing.sequence.pad_sequences(
        encoded_sentences,
        maxlen = max_sentence_length,
        padding ='post',
        value = 0,
    )

    return vocabulary_size, padded_sentences

In [44]:
def Create_Output_Sequences(input_sequences):
    """Build the target sequences by shifting the input rows up by one.

    Row ``i`` of the result is row ``i + 1`` of ``input_sequences``; the last
    row has no successor and is filled with zeros.

    Args:
        input_sequences: 2-D array of padded integer sequences, one per row.

    Returns:
        A new array of the same shape and dtype holding the shifted targets.
        The input array is not modified.
    """
    input_sequences = np.asarray(input_sequences)
    #Rolling by -1 along the row axis moves every row up one place (np.roll
    #returns a copy); the first row wraps around to the end
    output_sequences = np.roll(input_sequences, -1, axis = 0)
    #The wrapped-around last row is not a real target, blank it out.
    #Guarded so that an empty batch does not raise an IndexError
    if output_sequences.shape[0] > 0:
        output_sequences[-1] = 0
    #The targets must be returned, otherwise callers receive None
    return output_sequences

In [45]:
#Embed one input sentence at a time into a matrix of sentence length by embed_dim
#Adds position embeddings to word matrix
class Embed_Inputs(keras.layers.Layer):
    """Token embedding plus learned position embedding.

    Args:
        total_words: vocabulary size, used as the token embedding's input_dim.
        embed_dim: width of every embedding vector.
        max_sentence_length: padded sentence length; also the number of
            position slots that are embedded.
    """

    def __init__(self, total_words, embed_dim, max_sentence_length):
        #Keras layers must run the base initialiser before sub-layers are attached
        super().__init__()
        #Kept on the instance because the forward pass enumerates the positions
        self.max_sentence_length = max_sentence_length
        self.embedding_layer = keras.layers.Embedding(input_dim = total_words, output_dim = embed_dim, mask_zero = True, input_length = max_sentence_length)
        #keras.layers has no position_embedding_layer; positions are embedded
        #with a second Embedding indexed by position number
        self.positions_layer = keras.layers.Embedding(input_dim = max_sentence_length, output_dim = embed_dim)

    def call(self, data):
        """Return token embeddings of ``data`` with position embeddings added.

        Assumes data is (batch, max_sentence_length) so the position matrix
        broadcasts over the batch axis - TODO confirm against the caller.
        """
        positions = tf.range(self.max_sentence_length)
        self.embedded_words = self.embedding_layer(data) + self.positions_layer(positions)
        return self.embedded_words

    #Keras dispatches layer(x) to `call`; the capitalised name is kept as an
    #alias because other cells invoke `.Call(...)` directly
    Call = call

In [46]:
class Transformer_Block(keras.layers.Layer):
    """Self-attention followed by a feed-forward network, each with dropout,
    a residual connection and layer normalisation.

    Args:
        num_heads: number of attention heads.
        embed_dim: used both as the per-head key size and as the output width
            of the feed-forward network.
        ff_dim: hidden width of the feed-forward network.
        dropout: dropout rate applied after attention and after the feed-forward network.
    """

    def __init__(self, num_heads, embed_dim, ff_dim, dropout):
        #Keras layers must run the base initialiser before sub-layers are attached
        super().__init__()
        self.attention = keras.layers.MultiHeadAttention(num_heads = num_heads, key_dim = embed_dim)
        self.layer_norm1 = keras.layers.LayerNormalization(epsilon = 1e-6)
        self.feed_forward = keras.Sequential([keras.layers.Dense(ff_dim, activation = "relu"), keras.layers.Dense(embed_dim)])
        #epsilon has to be passed by keyword: the first positional parameter
        #of LayerNormalization is `axis`
        self.layer_norm2 = keras.layers.LayerNormalization(epsilon = 1e-6)
        #One Dropout per sub-block, matching the names used in the forward pass
        self.dropout1 = keras.layers.Dropout(dropout)
        self.dropout2 = keras.layers.Dropout(dropout)

    def call(self, embedded_words, training = None):
        """Run the block on ``embedded_words``.

        Args:
            embedded_words: input tensor; it is used as both query and value.
            training: forwarded to the dropout layers.

        Returns:
            The normalised output of the second residual connection.
        """
        attention_output = self.attention(embedded_words, embedded_words)
        attention_output = self.dropout1(attention_output, training = training)
        #Residual connections from input to add/norm
        res1 = self.layer_norm1(embedded_words + attention_output)
        ffn_output = self.feed_forward(res1)
        ffn_output = self.dropout2(ffn_output, training = training)
        #Residual connections from add/norm to add/norm
        final_output = self.layer_norm2(ffn_output + res1)
        return final_output

    #Keras dispatches layer(x) to `call`; the capitalised name is kept as an
    #alias because other cells invoke `.Call(...)` directly
    Call = call

In [47]:
class MaskedTransformerBlock(keras.layers.Layer):
    """Decoder-style block: causally masked self-attention, attention over a
    second input, then a feed-forward network. Each stage has its own dropout,
    residual connection and layer normalisation.

    Args:
        embed_dim: per-head key size and output width of the feed-forward network.
        num_heads: number of attention heads in both attention layers.
        ff_dim: hidden width of the feed-forward network.
        rate: dropout rate shared by the three dropout layers.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super(MaskedTransformerBlock, self).__init__()
        self.att1 = keras.layers.MultiHeadAttention(num_heads=num_heads,
        key_dim=embed_dim)
        self.att2 = keras.layers.MultiHeadAttention(num_heads=num_heads,
        key_dim=embed_dim)
        self.ffn = keras.Sequential(
        [keras.layers.Dense(ff_dim, activation="gelu"),
        keras.layers.Dense(embed_dim),]
        )
        self.layernorm1 = keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = keras.layers.Dropout(rate)
        self.dropout2 = keras.layers.Dropout(rate)
        self.dropout3 = keras.layers.Dropout(rate)

    def Causal_Attention_Mask(self, batch_size, n_dest, n_src, dtype):
        """Build a [batch_size, n_dest, n_src] attention mask.

        Entry (i, j) is set when ``i >= j - n_src + n_dest``; with
        ``n_dest == n_src`` that is the lower triangle, so a position can only
        attend to itself and to earlier positions.
        """
        i = tf.range(n_dest)[:, None]
        j = tf.range(n_src)
        m = i >= j - n_src + n_dest
        mask = tf.cast(m, dtype)
        mask = tf.reshape(mask, [1, n_dest, n_src])
        #Repeat the single mask once per batch element
        mult = tf.concat(
        [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)], 0
        )
        return tf.tile(mask, mult)

    def call(self, inputs, training=None):
        """Run the block.

        Args:
            inputs: a pair; ``inputs[0]`` is the sequence that gets masked
                self-attention, ``inputs[1]`` is the tensor attended to by the
                second attention layer.
            training: forwarded to the dropout layers.

        Returns:
            The normalised output of the feed-forward stage.
        """
        input_shape = tf.shape(inputs[0])
        batch_size = input_shape[0]
        seq_len = input_shape[1]
        #The mask builder is defined as Causal_Attention_Mask; the lowercase
        #spelling does not exist on this class
        mask = self.Causal_Attention_Mask(batch_size,
        seq_len, seq_len,
        tf.bool)
        attn_output1 = self.att1(inputs[0], inputs[0],
        attention_mask = mask)
        attn_output1 = self.dropout1(attn_output1, training=training)
        out1 = self.layernorm1(inputs[0] + attn_output1)
        attn_output2 = self.att2(out1, inputs[1])
        attn_output2 = self.dropout2(attn_output2, training=training)
        #Each residual stage gets its own normalisation layer: layernorm1 was
        #previously applied twice while layernorm3 was never used
        out2 = self.layernorm2(out1 + attn_output2)
        ffn_output = self.ffn(out2)
        ffn_output = self.dropout3(ffn_output, training=training)
        return self.layernorm3(out2 + ffn_output)

    #Keras dispatches layer(x) to `call`; the capitalised name is kept as an
    #alias because other cells invoke `.Call(...)` directly
    Call = call

In [48]:
class MaskedTokenAndPositionEmbedding(keras.layers.Layer):
    """Token embedding plus position embedding where padded slots (token id 0)
    are mapped to position index 0.

    Args:
        maxlen: longest sequence length; the position table has maxlen + 1
            rows because real positions are numbered from 1.
        vocab_size: input_dim of the token embedding table.
        embed_dim: width of every embedding vector.
    """

    def __init__(self, maxlen, vocab_size, embed_dim):
        super(MaskedTokenAndPositionEmbedding, self).__init__()
        self.token_emb = keras.layers.Embedding(input_dim=vocab_size,
        output_dim=embed_dim,
        mask_zero=True)
        self.pos_emb = keras.layers.Embedding(input_dim=maxlen+1,
        output_dim=embed_dim,
        mask_zero=True)

    def call(self, x):
        """Return token embeddings of ``x`` plus their position embeddings."""
        maxlen = tf.shape(x)[-1]
        #Positions are numbered 1..maxlen so that 0 stays free for padding
        positions = tf.range(start=1, limit=maxlen+1, delta=1)
        #sign(x) is 0 wherever the token id is 0, which zeroes the position index there
        positions = positions * tf.cast(tf.sign(x),tf.int32)
        positions = self.pos_emb(positions)
        x = self.token_emb(x)
        return x + positions

    #Keras only dispatches layer(x) to a method named `call`; without it the
    #embedding logic is skipped. The capitalised name is kept as an alias
    #because other cells invoke `.Call(...)` directly
    Call = call

In [49]:
#Pool context with input of encoder and output of decoder 
#Perform self attention 
#Pass through feed forward then through decoder
class Multi_Head_Attention_Pooling(keras.layers.Layer):
    """Attend from the input to a context, self-attend over the result and
    pass it through a feed-forward network.

    Args:
        num_heads: number of attention heads.
        embed_dim: per-head key size and output width of the feed-forward network.
        ff_dim: hidden width of the feed-forward network.
        dropout: dropout rate applied after attention and after the feed-forward network.
    """

    def __init__(self, num_heads, embed_dim, ff_dim, dropout):
        #Keras layers must run the base initialiser before sub-layers are attached
        super().__init__()
        self.attention = keras.layers.MultiHeadAttention(num_heads = num_heads, key_dim = embed_dim)
        self.feed_forward = keras.Sequential([keras.layers.Dense(ff_dim, activation = "relu"), keras.layers.Dense(embed_dim)])
        #One Dropout per stage, matching the names used in the forward pass
        self.dropout1 = keras.layers.Dropout(dropout)
        self.dropout2 = keras.layers.Dropout(dropout)

    def call(self, input_data, context, training = None):
        """Pool ``input_data`` with ``context``.

        Args:
            input_data: query tensor for the pooling attention.
            context: tensor attended to by ``input_data``.
            training: forwarded to the dropout layers.

        Returns:
            The feed-forward output after dropout.
        """
        attention_output = self.attention(input_data, context)
        #Perform self attention after attention pooling
        #NOTE(review): the same attention layer (same weights) serves both the
        #pooling and the self-attention step, as in the original - confirm intended
        self_attention_output = self.attention(attention_output, attention_output)
        self_attention_output = self.dropout1(self_attention_output, training = training)
        #The feed-forward network consumes the self-attention result, which
        #used to be computed and then discarded
        ffn_output = self.feed_forward(self_attention_output)
        ffn_output = self.dropout2(ffn_output, training = training)
        return ffn_output

    #Keras dispatches layer(x) to `call`; the capitalised name is kept as an
    #alias because other cells invoke `.Call(...)` directly
    Call = call

In [50]:
#First set of context vectors is randomized
class Context(keras.layers.Layer):
    """Produces a new context by attention-pooling an input with the previous context.

    Args:
        embed_dim: embedding width handed to the pooling layer.
        context_dim: currently unused (reserved for the context vector below).
        num_heads, ff_dim, dropout: hyperparameters of the pooling layer. When
            omitted they are looked up as notebook-level variables of the same
            name, which is how they were obtained before they became parameters.
    """

    def __init__(self, embed_dim, context_dim, num_heads = None, ff_dim = None, dropout = None):
        #Keras layers must run the base initialiser before sub-layers are attached
        super().__init__()
        #TODO: the trainable random context vector is not created yet
        num_heads = self._resolve_hyperparameter("num_heads", num_heads)
        ff_dim = self._resolve_hyperparameter("ff_dim", ff_dim)
        dropout = self._resolve_hyperparameter("dropout", dropout)
        self.attention_pooling = Multi_Head_Attention_Pooling(num_heads, embed_dim, ff_dim, dropout)

    @staticmethod
    def _resolve_hyperparameter(name, value):
        """Return ``value``, or the notebook-level variable ``name`` when value is None."""
        if value is not None:
            return value
        if name in globals():
            return globals()[name]
        raise NameError(
            "Context needs '" + name + "': pass it as a keyword argument "
            "or define a notebook-level variable with that name"
        )

    def Create_New_Context(self, input_data, context):
        """Pool ``input_data`` with the previous ``context`` and return the result."""
        #Pool decoder output with previous context vector
        pooling_output = self.attention_pooling.Call(input_data, context)
        return pooling_output

In [51]:
class Encoder(keras.layers.Layer):
    """Embeds the input tokens and runs them through one transformer block.

    Args:
        data: unused by the constructor; kept so existing calls keep working.
        training: default training flag used when the forward pass is not given one.
        total_words, embed_dim, max_sentence_length: forwarded to Embed_Inputs.
        num_heads, embed_dim, ff_dim, dropout: forwarded to Transformer_Block.
    """

    def __init__(self, data, training, total_words, embed_dim, max_sentence_length, num_heads, ff_dim, dropout):
        #Keras layers must run the base initialiser before sub-layers are attached
        super().__init__()
        #Remembered because the forward pass only receives the data
        self.default_training = training
        self.embeddings = Embed_Inputs(total_words, embed_dim, max_sentence_length)
        self.transformer_block = Transformer_Block(num_heads, embed_dim, ff_dim, dropout)

    def call(self, data, training = None):
        """Encode ``data``.

        Args:
            data: token ids handed to the embedding layer.
            training: overrides the flag given to the constructor when not None.

        Returns:
            The transformer block's output.
        """
        if training is None:
            training = self.default_training
        #The sub-layers expose their forward pass as `Call`; `Forward` is not defined on them
        embeddings_output = self.embeddings.Call(data)
        transformer_block_output = self.transformer_block.Call(embeddings_output, training)
        return transformer_block_output

    #Keras dispatches layer(x) to `call`; the capitalised name is kept as an
    #alias because other cells invoke `.Call(...)` directly
    Call = call

In [52]:
class Decoder(keras.layers.Layer):
    """Decoder stack: masked token/position embeddings, an encoder whose output
    is pooled with a context, a masked transformer block and a final
    transformer block.

    Args:
        num_heads, embed_dim, ff_dim, dropout: hyperparameters shared by the sub-layers.
        data, training: forwarded to the Encoder constructor.
        total_words: vocabulary size for the embedding tables.
        max_sentence_length: padded sentence length.
    """

    def __init__(self, num_heads, embed_dim, ff_dim, dropout, data, training, total_words, max_sentence_length):
        #Keras layers must run the base initialiser before sub-layers are attached
        super().__init__()
        #Masked positional embeddings
        self.masked_position = MaskedTokenAndPositionEmbedding(max_sentence_length, total_words, embed_dim)
        #Masked multihead attention
        #NOTE(review): rate is fixed at 0.1 here while the other sub-layers use `dropout` - confirm intended
        self.masked_transformer_block = MaskedTransformerBlock(embed_dim, num_heads, ff_dim, rate=0.1)
        #Encoder output
        self.encoder = Encoder(data, training, total_words, embed_dim, max_sentence_length, num_heads, ff_dim, dropout)
        #Attention pooling
        self.attention_pooling = Multi_Head_Attention_Pooling(num_heads, embed_dim, ff_dim, dropout)
        #Transformer block with encoder output and decoder input
        self.transformer_block = Transformer_Block(num_heads, embed_dim, ff_dim, dropout)

    def call(self, data, batch_size, n_dest, n_src, dtype, training, context):
        """Run the decoder.

        Args:
            data: token ids; fed to both the masked embedding and the encoder.
            batch_size, n_dest, n_src, dtype: accepted for backward
                compatibility only. They were used to build a causal mask that
                was never consumed; the masked block builds its own mask.
            training: forwarded to the transformer blocks.
            context: context tensor pooled with the encoder output.

        Returns:
            The output of the final transformer block.
        """
        #Masked position embeddings
        masked_position_output = self.masked_position.Call(data)
        #Encoder Output (the attribute is `encoder`, lowercase)
        encoder_output = self.encoder.Call(data)
        #Perform attention pooling
        pooling_output = self.attention_pooling.Call(encoder_output, context)
        #Masked multi headed attention: the block indexes inputs[0] and
        #inputs[1], so it needs the pair (decoder input, encoder side), not the raw ids
        masked_transformer_block_output = self.masked_transformer_block.Call([masked_position_output, pooling_output], training)
        #Transformer block with encoder output and decoder input
        #NOTE(review): this used to receive pooling_output, which left the
        #masked block's result unused - confirm this wiring is the intended one
        transformer_block_output = self.transformer_block.Call(masked_transformer_block_output, training)
        return transformer_block_output

    #Keras dispatches layer(x) to `call`; the capitalised name is kept as an
    #alias because other cells invoke `.Call(...)` directly
    Call = call

In [53]:
class Output(keras.layers.Layer):
    """Top-level layer: owns the decoder and the context updater.

    Args:
        num_heads, embed_dim, ff_dim, dropout, data, training, total_words,
        max_sentence_length: forwarded to Decoder.
        context_dim: forwarded to Context together with embed_dim.
    """

    def __init__(self, num_heads, embed_dim, ff_dim, dropout, data, training, total_words, max_sentence_length, context_dim):
        #Keras layers must run the base initialiser before sub-layers are attached
        super().__init__()
        self.decoder = Decoder(num_heads, embed_dim, ff_dim, dropout, data, training, total_words, max_sentence_length)
        #TODO: the linear projection and the softmax are not implemented yet
        self.context = Context(embed_dim, context_dim)

    def call(self, data, batch_size, n_dest, n_src, dtype, training, context):
        """Run the decoder and return its output.

        All arguments are passed through to the decoder unchanged.
        """
        decoder_output = self.decoder.Call(data, batch_size, n_dest, n_src, dtype, training, context)
        #Returned so callers receive the decoder output instead of None
        return decoder_output

    #Keras dispatches layer(x) to `call`; the capitalised name is kept as an
    #alias because other cells invoke `.Call(...)` directly
    Call = call

    def Context(self, input_data, context):
        """Return a new context pooled from ``input_data`` and the previous ``context``."""
        new_context = self.context.Create_New_Context(input_data, context)
        return new_context