In [2]:
!pip install tensorflow_text

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting tensorflow_text
  Downloading tensorflow_text-2.11.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (5.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.8/5.8 MB[0m [31m27.0 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: tensorflow_text
Successfully installed tensorflow_text-2.11.0


In [3]:
!pip install sentencepiece

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting sentencepiece
  Downloading sentencepiece-0.1.97-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m17.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: sentencepiece
Successfully installed sentencepiece-0.1.97


In [4]:

import tensorflow as tf
import tensorflow_text as tf_txt
from pathlib import Path
import re
import sentencepiece as sp
import numpy as np
import datetime

path = '/content/drive/MyDrive/bible.txt'

In [5]:
sp.SentencePieceTrainer.train(
    input=path, model_prefix='tokenizer_model', model_type="unigram", vocab_size=2000)

# deserialize the trained model file to load it in the correct format
trained_tokenizer_model = tf.io.gfile.GFile('tokenizer_model.model', "rb").read()

# load the model as a tokenizer that can be used inside a tensorflow model
tokenizer = tf_txt.SentencepieceTokenizer(
    model=trained_tokenizer_model, out_type=tf.int32, nbest_size=-1, alpha=1, reverse=False,
    add_bos=False, add_eos=False, return_nbest=False, name=None
)

# create the tokens by applying the tokenizer to the text
tokens = tokenizer.tokenize('bible.txt')

# define an input sequence length
input_seq_length = 32

# create the windows
windows = tf_txt.sliding_window(data = tokens, width = input_seq_length+1, axis = -1,name = None)

# turn the windows into a TF dataset
dataset = tf.data.Dataset.from_tensor_slices(windows)

# shuffle, batch, and prefetch
dataset = dataset.shuffle(1000)
dataset = dataset.batch(32)
dataset = dataset.prefetch(tf.data.AUTOTUNE)

print(dataset)

<PrefetchDataset element_spec=TensorSpec(shape=(None, 33), dtype=tf.int32, name=None)>


In [6]:
vocabulary_size=10 
embedding_di=64 

class Embedding(tf.keras.layers.Layer):
    
    def __init__(self):
        super(Embedding, self).__init__()

        self.embedding_indices = tf.keras.layers.Embedding(input_dim=vocabulary_size, output_dim=embedding_di)
        self.embedding_position = tf.keras.layers.Embedding(input_dim=vocabulary_size, output_dim=embedding_di)
        self.add = tf.keras.layers.Add()

    def call(self, input):
        a = self.embedding_position(tf.range(len(input)))
        b = self.embedding_indices(input)
        c = a + b

        return c

class TransformerBlock(tf.keras.layers.Layer):

    def __init__(self):
        super(TransformerBlock, self).__init__()

        self.mha_layer = tf.keras.layers.MultiHeadAttention(num_heads=2, key_dim=embedding_di) # use 2-4 attention heads
        self.dense1 = tf.keras.layers.Dense(units=32, activation='relu')
        self.dense2 = tf.keras.layers.Dense(units=embedding_di, activation=None)
        self.dropout1 = tf.keras.layers.Dropout(rate=0.1)
        self.dropout2 = tf.keras.layers.Dropout(rate=0.1)
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.add = tf.keras.layers.Add()


    def call(self, input, training=False):
        
        x = self.mha_layer(query=input, value=input, key=input, use_causal_mask=True)
        x = self.dropout(x, training=training)
        x = self.add([input, x])
        in_out  = self.layernorm(x)

        x = self.dense1(in_out)
        x = self.dense2(x)
        x = self.dropout(x, training=training)
        x = self.add([in_out, x])
        x = self.layernorm2(x)

        return x    

class Transformer(tf.keras.Model):
    def __init__(self):
        super(Transformer, self).__init__()

        self.opt = tf.keras.optimizers.Adam(learning_rate=0.001)
        self.lossf = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
        self.metrics_list = [tf.keras.metrics.Mean(name="loss"),
                            tf.keras.metrics.CategoricalAccuracy(name="acc"),
                            tf.keras.metrics.TopKCategoricalAccuracy(3,name="top-3-acc")]

        self.embedding_layers = Embedding()
        self.transformer_block = TransformerBlock()
        self.dense = tf.keras.layers.Dense(units=vocabulary_size, activation=None)
        self.tokenizer = tokenizer
        self.input_seq_length = input_seq_length

    def call(self, input, training=False):
        x = self.embedding_layers(input, training=training)
        x = self.transformer_block(x, training=training) 
        x = self.dense(x)
        
        return x

    def reset_metrics(self):
        for metric in self.metrics:
            metric.reset_states()

    @tf.function
    def train_step(self, data):

        inputs = data[:self.input_seq_length]
        targets = data[1:]

        with tf.GradientTape() as tape:
            predictions = self(inputs, training=True)
            loss = self.loss_function(targets, predictions) + tf.reduce_sum(self.losses)

        gradients = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))

        self.metrics[0].update_state(loss)

        for metric in self.metrics[1:]:
            metric.update_state(targets, predictions)

        return {m.name: m.result() for m in self.metrics}

    def generate_text(self, prompt, output_length, top_k):

        input = prompt

        for _ in range(output_length):

            tokens = self.tokenizer.tokenize(input)

            tf.expand_dims(tokens, axis=0)

            logits = self.call(tokens)

            top_k_logits, top_k_logits_locals = tf.math.top_k(logits, k=top_k, sorted=True)

            sorted_logits = tf.sort(top_k_logits, direction='DESCENDING')
            top_logit = tf.gather(sorted_logits, [0])

            top_logit_index = np.where(top_k_logits=top_logit)
            top_logit_local = top_k_logits_locals[top_logit_index]

            next_token = tokens[top_logit_local]

            tokens = tf.concat([tokens,next_token],0)

            input = self.tokenizer.decode(tokens)

        output = input

        # return text
        return output

In [10]:
def training(model, dataset, epochs, start, output_length, top_k, train_summary_writer): #, val_summary_writer):
    for epoch in range(epochs):
        print(f"Epoch: {epoch}")
        print(f"Generated text: {model.generate_text(start, output_length, top_k)}")

        for data in tqdm.tqdm(dataset, position=0, leave=True):
            metrics = model.train_step(data)
            
            
            with train_summary_writer.as_default():
                for metric in model.metrics:
                    tf.summary.scalar(f"{metric.name}", metric.result(), step=epoch)

       
        print([f"{key}: {value.numpy()}" for (key, value) in metrics.items()])

        
        model.reset_metrics()    
        
       
        for data in val_ds:
            metrics = model.test_step(data)
        
       
            with val_summary_writer.as_default():
               for metric in model.metrics:
                    tf.summary.scalar(f"{metric.name}", metric.result(), step=epoch)
                    
        print([f"val_{key}: {value.numpy()}" for (key, value) in metrics.items()])

        # reset all metrics
        model.reset_metrics()
        print("\n")

        # save model
        model.save_weights(dir)

In [12]:
epochs = 2 # between 100 and 600
output_length = 10
top_k = 5
start = "The Bible said"


model = Transformer()

config_name= "Bible"
current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
train_log_path = f"logs/{config_name}/{current_time}/train"
val_log_path = f"logs/{config_name}/{current_time}/val"

# log writer
train_summary_writer = tf.summary.create_file_writer(train_log_path)
val_summary_writer = tf.summary.create_file_writer(val_log_path)

training(model=model, dataset=dataset, epochs=epochs, output_length=output_length, start = start, top_k=top_k, train_summary_writer=train_summary_writer)

Epoch: 0


InvalidArgumentError: ignored