# Transformer

## Table of Contents



## Installs

In [1]:
!pip install -q -U tensorflow-text

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.8/5.8 MB[0m [31m33.5 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:
!pip install sentencepiece

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting sentencepiece
  Downloading sentencepiece-0.1.97-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m8.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: sentencepiece
Successfully installed sentencepiece-0.1.97


## Imports

In [3]:
import tensorflow_text as tf_txt
import tensorflow as tf
import sentencepiece as sp

## Text Processing

In [4]:
# Colab-only: mount Google Drive at /content/drive so the corpus file
# referenced by the following cells can be read from it.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
# Read the whole corpus into memory as a single string. The context manager
# closes the file handle as soon as the read is done.
with open('/content/drive/My Drive/Uni/7/IANNwTF/bible.txt', 'r') as corpus_file:
  bible = corpus_file.read()

In [6]:
# Corpus size in characters (bible is the text-mode contents of the file).
len(bible)

4332496

In [None]:
def normalizetext(text):
  """Return `text` lower-cased with newlines and basic punctuation blanked out.

  Every newline and every one of the characters . ; , : ? ! - is replaced by
  exactly one space, so the result has the same length as the input and runs
  of punctuation become runs of spaces (they are not collapsed).

  Args:
    text: the string to normalize.

  Returns:
    The normalized, lower-cased string.
  """
  # One translation table handles the newline and all punctuation in a single pass.
  blank_out = dict.fromkeys(map(ord, '\n.;,:?!-'), ' ')
  return text.translate(blank_out).lower()

## SentencePiece Tokenizer

In [7]:
# Train a unigram SentencePiece model with a 5000-entry vocabulary on the raw
# corpus file. The trainer writes its output files using the given prefix;
# the next cell loads 'tokenizer_model.model'.
sp.SentencePieceTrainer.train(
    input='/content/drive/My Drive/Uni/7/IANNwTF/bible.txt',
    model_prefix='tokenizer_model',
    model_type="unigram",
    vocab_size=5000,
)

In [8]:
# Read the serialized SentencePiece model produced by the training cell.
# The context manager closes the file handle once the bytes are in memory.
with tf.io.gfile.GFile('tokenizer_model.model', "rb") as model_file:
    trained_tokenizer_model = model_file.read()

# Wrap the serialized model in a tensorflow-text tokenizer so it can be used
# on tensors and inside a TensorFlow model. Token ids are emitted as int32.
# NOTE(review): nbest_size=-1 with alpha=1 looks like it enables sampled
# (non-deterministic) segmentation - confirm against the tensorflow-text docs
# that this is intended for building the training corpus.
tokenizer = tf_txt.SentencepieceTokenizer(
    model=trained_tokenizer_model, out_type=tf.int32, nbest_size=-1, alpha=1, reverse=False,
    add_bos=False, add_eos=False, return_nbest=False, name=None
)

In [9]:
# Tokenize the entire corpus string in one call; the ids are int32 because the
# tokenizer was built with out_type=tf.int32.
tokens = tokenizer.tokenize(bible)

## Sliding window

In [10]:
# Number of tokens the model sees per training example.
SEQ_LENGTH = 128
# Slide a window of SEQ_LENGTH + 1 tokens over the token stream. The extra
# token lets each window be split into an input (first SEQ_LENGTH tokens) and
# a target shifted by one position (last SEQ_LENGTH tokens) in the cells below.
windows = tf_txt.sliding_window(data=tokens, width=SEQ_LENGTH+1, axis=0)

In [11]:
# Expected layout: (number of windows, SEQ_LENGTH + 1).
windows.shape

TensorShape([1026906, 129])

In [12]:
# Model inputs: the first SEQ_LENGTH tokens of every window.
# NOTE(review): the name `input` shadows Python's builtin input(); it is kept
# here because other cells may rely on it.
input = windows[:, :SEQ_LENGTH]
input_ds = tf.data.Dataset.from_tensor_slices(input)

In [13]:
# Targets: every window shifted left by one token, so position i of the target
# is the token that follows position i of the input.
target = windows[:, 1:]
target_ds = tf.data.Dataset.from_tensor_slices(target)

In [14]:
# Pair each input sequence with its shifted target: elements are (input, target).
dataset = tf.data.Dataset.zip((input_ds, target_ds))

In [15]:
# Shuffle with a 1000-element buffer, group into batches of 32 and prefetch.
# NOTE(review): this cell reassigns `dataset`, so running it a second time
# without re-running the zip cell would batch the already-batched data.
# NOTE(review): the windows are consecutive and heavily overlapping, so a
# 1000-element buffer only shuffles locally - consider a larger buffer.
dataset = dataset.shuffle(1000).batch(32).prefetch(tf.data.AUTOTUNE)

In [16]:
# Sanity check: print the length of the first target sequence of one batch.
for _, target_batch in dataset.take(1):
  first_target = target_batch[0]
  print(first_target.shape[0])

128


## The Components

### Positional Embedding Layer

In [23]:
class PositionalEmbedding(tf.keras.layers.Layer):
  """Token embedding plus a learned embedding of each token's position.

  Args:
    embed_dim: size of every embedding vector.
    vocab_size: number of distinct token ids (rows of the token table).
    max_length: number of rows in the position table, i.e. the longest
      sequence the layer can embed. Defaults to `vocab_size`, which is the
      size the position table had before this parameter existed.
  """

  def __init__(self, embed_dim, vocab_size, max_length=None):
    # The base class must be initialised exactly once.
    super().__init__()
    self.embed_dim = embed_dim
    self.max_length = vocab_size if max_length is None else max_length
    self.embedding = tf.keras.layers.Embedding(vocab_size, embed_dim, mask_zero=True)
    # Position 0 is a real position, not padding, so no zero-masking here.
    self.pos_encoding = tf.keras.layers.Embedding(self.max_length, embed_dim)

  def call(self, input, training=False):
    """Embed token ids and add the position embeddings.

    Args:
      input: integer token ids; the last axis is the sequence axis
        (the original code documents the shape as (batch_size, sequence_length)).
      training: accepted for Keras compatibility; not used by this layer.

    Returns:
      The sum of token and position embeddings, with `embed_dim` as last axis.
    """
    # tf.shape gives the length at run time, so this also works when the
    # sequence dimension is not known statically.
    seq_len = tf.shape(input)[-1]
    positions = tf.range(seq_len)

    embedding = self.embedding(input)
    # Shape (sequence_length, embed_dim); the addition below broadcasts it
    # over the batch axis, so every sequence gets the same position vectors.
    pos_encoding = self.pos_encoding(positions)

    return embedding + pos_encoding

### Transformer Block

In [31]:
class TransformerBlock(tf.keras.layers.Layer):
    """Causal self-attention followed by a two-layer feed-forward network.

    Both sub-layers are wrapped as: dropout -> residual add -> layer norm.

    Args:
        num_heads: number of attention heads.
        embed_dim: width of the block's input and output; also passed to the
            attention layer as `key_dim`.
        dff: number of units in the hidden feed-forward layer.
        dropout_rate: dropout rate applied after attention and after the
            feed-forward network.
    """

    def __init__(self, num_heads, embed_dim, dff, dropout_rate):
        super(TransformerBlock, self).__init__()
        self.mha = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.dense1 = tf.keras.layers.Dense(dff, activation='relu')
        self.dense2 = tf.keras.layers.Dense(embed_dim)
        self.dropout1 = tf.keras.layers.Dropout(dropout_rate)
        self.dropout2 = tf.keras.layers.Dropout(dropout_rate)
        self.layernorm1 = tf.keras.layers.LayerNormalization()
        self.layernorm2 = tf.keras.layers.LayerNormalization()
        self.add1 = tf.keras.layers.Add()
        self.add2 = tf.keras.layers.Add()

    def call(self, x, training=None):
        """Apply the block to `x`.

        Args:
            x: input activations; used as query, key and value.
            training: whether dropout should be active. `None` (the default)
                leaves the decision to Keras, as before this argument existed.

        Returns:
            Activations after the second layer normalization.
        """
        # Self-attention with a causal mask: a position may only attend to
        # itself and to earlier positions.
        attn_output = self.mha(
            query=x,
            value=x,
            key=x,
            use_causal_mask=True,
            training=training)
        attn_output = self.dropout1(attn_output, training=training)
        x = self.add1([x, attn_output])
        ln_out = self.layernorm1(x)

        # Position-wise feed-forward network with its own residual connection.
        x = self.dense1(ln_out)
        x = self.dense2(x)
        x = self.dropout2(x, training=training)
        x = self.add2([ln_out, x])
        out = self.layernorm2(x)

        return out

## The Transformer

In [30]:
class Transformer(tf.keras.Model):
    """Next-token language model: embedding -> one transformer block -> logits.

    The model owns its optimizer, loss and metrics and exposes `train_step` /
    `test_step` for use in a hand-written training loop.

    Args:
        sentence_piece_tokenizer: tokenizer used by `generate_text` to convert
            between text and token ids.
        embed_dim: width of the embeddings and of the transformer block.
        vocab_size: number of token ids; also the size of the output layer.
        num_heads: number of attention heads in the transformer block.
        dff: hidden units of the block's feed-forward network.
        dropout_rate: dropout rate used inside the transformer block.
    """

    def __init__(self, sentence_piece_tokenizer, embed_dim, vocab_size, num_heads, dff, dropout_rate):
        super(Transformer, self).__init__()

        self.optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
        self.loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
        # The model outputs logits, so the cross-entropy metric needs
        # from_logits=True to agree with the loss, and accuracy must be the
        # sparse-categorical variant (integer labels vs. per-class scores).
        self.metrics_list = [
            tf.keras.metrics.SparseCategoricalCrossentropy(name="loss", from_logits=True),
            tf.keras.metrics.SparseCategoricalAccuracy(name="acc"),
        ]

        self.tokenizer = sentence_piece_tokenizer
        self.pos_embed = PositionalEmbedding(embed_dim=embed_dim, vocab_size=vocab_size)
        self.transformer_block = TransformerBlock(num_heads=num_heads, embed_dim=embed_dim, dff=dff, dropout_rate=dropout_rate)
        self.dense = tf.keras.layers.Dense(units=vocab_size)

    def call(self, img, training=False):
        """Forward pass.

        Args:
            img: integer token ids (the parameter name is kept for
                compatibility; it does not hold image data).
            training: whether the model is called in training mode.

        Returns:
            Unnormalized scores (logits) over the vocabulary, one vector per
            input position.
        """
        x = self.pos_embed(img)
        # Feed the embedded sequence, not the raw token ids, into the block.
        x = self.transformer_block(x)
        x = self.dense(x)
        return x

    @property
    def metrics(self):
        """Return the list of all metric objects tracked by the model."""
        return self.metrics_list

    def reset_metrics(self):
        """Reset the accumulated state of every metric."""
        for metric in self.metrics:
            metric.reset_state()

    def train_step(self, data):
        """Run one optimization step on a batch.

        Args:
            data: tuple `(inputs, labels)` of token ids.

        Returns:
            Dict mapping each metric name to its current result.
        """
        img, label = data
        with tf.GradientTape() as tape:
            output = self(img, training=True)
            loss = self.loss(label, output)

        gradients = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))

        for metric in self.metrics:
            metric.update_state(label, output)

        return {m.name: m.result() for m in self.metrics}

    def test_step(self, data):
        """Evaluate one batch without updating any weights.

        Args:
            data: tuple `(inputs, labels)` of token ids.

        Returns:
            Dict mapping "val_" + metric name to the metric's current result.
        """
        img, label = data
        output = self(img, training=False)
        for metric in self.metrics:
            metric.update_state(label, output)

        return {"val_" + m.name: m.result() for m in self.metrics}

    def generate_text(self, prompt, output_length, top_k):
        """Continue `prompt` by sampling `output_length` tokens with top-k sampling.

        At every step the model scores the whole sequence generated so far,
        keeps the `top_k` highest-scoring candidates for the next token and
        samples one of them in proportion to its softmax probability.

        Args:
            prompt: text to continue (assumed to be a single string).
            output_length: number of tokens to append.
            top_k: number of candidate tokens to sample from at each step.

        Returns:
            The detokenized prompt plus continuation, as a scalar string tensor.

        Note:
            The context is never cropped, so prompt length plus
            `output_length` must stay within the number of positions the
            positional embedding can represent.
        """
        token_ids = self.tokenizer.tokenize(prompt)
        # Add a batch axis: (sequence_length,) -> (1, sequence_length).
        token_ids = tf.expand_dims(token_ids, axis=0)

        for _ in range(output_length):
            logits = self(token_ids, training=False)
            # Only the scores at the last position predict the next token.
            next_logits = logits[:, -1, :]
            top_logits, top_ids = tf.math.top_k(next_logits, k=top_k)
            # Sample an index into the k candidates, then map it back to a token id.
            choice = tf.random.categorical(top_logits, num_samples=1)
            next_id = tf.gather(top_ids, choice, batch_dims=1)
            token_ids = tf.concat([token_ids, tf.cast(next_id, token_ids.dtype)], axis=-1)

        return self.tokenizer.detokenize(token_ids)[0]


## Training

In [None]:
import tqdm
def training_loop(model, train_ds, val_ds, epochs, train_summary_writer, val_summary_writer, save_path):
  """Train `model` for `epochs` epochs, log metrics, then save the weights.

  For every epoch the function runs `model.train_step` on each training batch,
  writes the model's metrics to `train_summary_writer`, resets them, runs
  `model.test_step` on each validation batch, writes the metrics to
  `val_summary_writer` and resets them again. The dict returned by the last
  step of each phase is printed.

  Args:
    model: object providing `train_step`, `test_step`, `metrics`,
      `reset_metrics` and `save_weights`.
    train_ds: iterable of training batches.
    val_ds: iterable of validation batches.
    epochs: number of passes over `train_ds`.
    train_summary_writer: summary writer for the training metrics.
    val_summary_writer: summary writer for the validation metrics.
    save_path: where the weights are written after the last epoch.
  """
  for e in range(epochs):
    # train_step returns a single dict of metric results; keep the latest one.
    # Starting from an empty dict keeps the print below valid for an empty dataset.
    metrics = {}
    for data in tqdm.tqdm(train_ds):
      metrics = model.train_step(data)

    # Log the training metrics accumulated over the epoch.
    with train_summary_writer.as_default():
      for metric in model.metrics:
        tf.summary.scalar(f"{metric.name}", metric.result(), step=e)

    print([f"{key}: {value.numpy()}" for (key, value) in metrics.items()])

    # Clear the accumulated state so validation starts from zero.
    model.reset_metrics()

    metrics = {}
    for data in val_ds:
      metrics = model.test_step(data)

    # Log the validation metrics accumulated over the epoch.
    with val_summary_writer.as_default():
      for metric in model.metrics:
        tf.summary.scalar(f"{metric.name}", metric.result(), step=e)

    print([f"{key}: {value.numpy()}" for (key, value) in metrics.items()])

    # Clear the accumulated state before the next epoch.
    model.reset_metrics()

  model.save_weights(save_path)

## Testing

In [32]:
# Hyperparameters for a small model.
num_heads = 4
sentence_piece_tokenizer = tokenizer
embed_dim = 128
# Must match the vocab_size the SentencePiece model was trained with.
vocab_size = 5000
dff = 128
dropout_rate = 0.1
# Build the model from the classes defined above.
# NOTE(review): the recorded output of this cell is a ValueError without a
# message, so the cause cannot be read from the notebook - re-run to see it.
model = Transformer(num_heads=num_heads, 
                    sentence_piece_tokenizer=sentence_piece_tokenizer,
                    embed_dim=embed_dim,
                    vocab_size=vocab_size,
                    dff=dff,
                    dropout_rate=dropout_rate)

ValueError: ignored