In [None]:
pip install -q -U tensorflow-text==2.11.*

Model Transformer

In [None]:
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow as tf
import pandas as pd
import numpy as np


In [None]:
import collections
import os
import pathlib
import re
import string
import sys
import tempfile
import time

import numpy as np
import matplotlib.pyplot as plt

import tensorflow_datasets as tfds
import tensorflow_text as text
import tensorflow as tf

def add_start_end(ragged, START, END):
  """Wrap every row of `ragged` with a leading START id and a trailing END id.

  Args:
    ragged: 2-D ragged tensor of token ids (one row per sentence).
    START: scalar id placed in front of every row.
    END: scalar id appended to every row.

  Returns:
    The concatenation `[START] + row + [END]` for every row, along axis 1.
  """
  # One marker per row: the row count is the first entry of the bounding shape.
  num_rows = ragged.bounding_shape()[0]
  column_shape = [num_rows, 1]
  start_column = tf.fill(column_shape, START)
  end_column = tf.fill(column_shape, END)
  return tf.concat([start_column, ragged, end_column], axis=1)

def cleanup_text(reserved_tokens, token_txt):
  """Remove reserved marker tokens from detokenized words and join each row.

  Args:
    reserved_tokens: Python list of reserved token strings (e.g. the padding,
      unknown, start and end markers).
    token_txt: string tensor of words with the sentence tokens on the last
      axis (ragged input is masked with `tf.ragged.boolean_mask`).

  Returns:
    A string tensor with the last axis joined by single spaces, with every
    reserved token except the unknown-word marker removed.
  """
  # Drop the reserved tokens, except for the unknown-word marker. The
  # comparison is case-insensitive so the marker is kept whether the
  # vocabulary spells it "[UNK]" or "[unk]".
  bad_tokens = [re.escape(tok) for tok in reserved_tokens
                if tok.lower() != "[unk]"]
  bad_token_re = "|".join(bad_tokens)

  # A cell is "bad" only when the whole token equals one of the reserved ones.
  bad_cells = tf.strings.regex_full_match(token_txt, bad_token_re)
  result = tf.ragged.boolean_mask(token_txt, ~bad_cells)

  # Join them into strings.
  result = tf.strings.reduce_join(result, separator=' ', axis=-1)

  return result


class CustomTokenizer(tf.Module):
  """Exportable wrapper around `text.BertTokenizer` for a single vocabulary.

  The public methods are `tf.function`s. The constructor traces each of them
  with the input signatures listed below so the concrete functions exist
  before the module is exported.
  """

  def __init__(self, reserved_tokens, vocab_path):
    """Build the tokenizer and trace the functions that should be exported.

    Args:
      reserved_tokens: Python list of reserved token strings; `tokenize`
        looks up the positions of "[start]" and "[end]" in this list.
      vocab_path: path to a vocabulary text file with one token per line.
    """
    # NOTE(review): the tokenizer is built with its default unknown-token
    # setting; confirm it matches the spelling used in the vocabulary file.
    self.tokenizer = text.BertTokenizer(vocab_path, lower_case=True)
    self._reserved_tokens = reserved_tokens
    self._vocab_path = tf.saved_model.Asset(vocab_path)

    # Read the vocabulary explicitly as UTF-8 so the lookup table does not
    # depend on the platform's default text encoding.
    vocab = pathlib.Path(vocab_path).read_text(encoding="utf-8").splitlines()
    self.vocab = tf.Variable(vocab)

    ## Create the signatures for export:

    # Include a tokenize signature for a batch of strings.
    self.tokenize.get_concrete_function(
        tf.TensorSpec(shape=[None], dtype=tf.string))

    # Include `detokenize` and `lookup` signatures for:
    #   * `Tensors` with shape [batch, tokens]
    #   * `RaggedTensors` with shape [batch, tokens]
    self.detokenize.get_concrete_function(
        tf.TensorSpec(shape=[None, None], dtype=tf.int64))
    self.detokenize.get_concrete_function(
          tf.RaggedTensorSpec(shape=[None, None], dtype=tf.int64))

    self.lookup.get_concrete_function(
        tf.TensorSpec(shape=[None, None], dtype=tf.int64))
    self.lookup.get_concrete_function(
          tf.RaggedTensorSpec(shape=[None, None], dtype=tf.int64))

    # These `get_*` methods take no arguments
    self.get_vocab_size.get_concrete_function()
    self.get_vocab_path.get_concrete_function()
    self.get_reserved_tokens.get_concrete_function()

  @tf.function
  def tokenize(self, strings):
    """Tokenize a batch of strings and wrap each row in start/end ids."""
    enc = self.tokenizer.tokenize(strings)
    # Merge the `word` and `word-piece` axes.
    enc = enc.merge_dims(-2,-1)

    # The id of each marker is its position in the reserved-token list.
    START = tf.argmax(tf.constant(self._reserved_tokens) == "[start]")
    END = tf.argmax(tf.constant(self._reserved_tokens) == "[end]")

    enc = add_start_end(enc, START, END)
    return enc

  @tf.function
  def detokenize(self, tokenized):
    """Turn token ids back into text with the reserved markers removed."""
    words = self.tokenizer.detokenize(tokenized)
    return cleanup_text(self._reserved_tokens, words)

  @tf.function
  def lookup(self, token_ids):
    """Return the vocabulary string for every id in `token_ids`."""
    return tf.gather(self.vocab, token_ids)

  @tf.function
  def get_vocab_size(self):
    """Return the number of entries in the vocabulary as a scalar tensor."""
    return tf.shape(self.vocab)[0]

  @tf.function
  def get_vocab_path(self):
    """Return the vocabulary file asset."""
    return self._vocab_path

  @tf.function
  def get_reserved_tokens(self):
    """Return the reserved tokens as a string tensor."""
    return tf.constant(self._reserved_tokens)





In [None]:
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import tensorflow as tf

import tensorflow_text as text
from tensorflow_text.tools.wordpiece_vocab import bert_vocab_from_dataset as bert_vocab

def createSimplifiedSentence(sentence):
    """Return `sentence` as UTF-8 encoded bytes.

    Values that are already `bytes` are returned unchanged, so converting the
    same data twice (e.g. when a cell is re-run) is harmless.

    Raises:
        UnicodeEncodeError: if a `str` cannot be encoded (errors='strict').
    """
    if isinstance(sentence, bytes):
        return sentence
    return sentence.encode(encoding = 'UTF-8', errors = 'strict')



def convertText(words):
    """Encode every sentence in `words` to UTF-8 bytes.

    The input is left untouched: a new container is returned instead of
    overwriting the caller's elements, which for a NumPy slice could otherwise
    write back into the array it was sliced from.

    Args:
        words: iterable of sentences (a list or a 1-D NumPy array).

    Returns:
        An object-dtype NumPy array when `words` is a NumPy array, otherwise a
        list, holding the encoded sentences in the original order.
    """
    encoded = [createSimplifiedSentence(sentence=sentence) for sentence in words]

    if isinstance(words, np.ndarray):
        # dtype=object keeps each element a Python `bytes` value.
        return np.array(encoded, dtype=object)

    return encoded


def createTokenizerVocab(defDS, simpDS, vocab_path='ENG.txt', vocab_size=30000,
                         reserved_tokens=None):
  """Learn one WordPiece vocabulary from both text columns and save it.

  Args:
    defDS: array of definition sentences.
    simpDS: array of simplified-definition sentences.
    vocab_path: file the vocabulary is written to (one token per line).
    vocab_size: target vocabulary size passed to the vocabulary learner.
    reserved_tokens: tokens that must be included in the vocabulary; defaults
      to `["[pad]", "[unk]", "[start]", "[end]"]`.

  Returns:
    None. The vocabulary is written to `vocab_path`.
  """
  if reserved_tokens is None:
    reserved_tokens = ["[pad]", "[unk]", "[start]", "[end]"]

  bert_tokenizer_params = dict(lower_case=True, preserve_unused_token=True)

  bert_vocab_args = dict(
      # The target vocabulary size
      vocab_size=vocab_size,
      # Reserved tokens that must be included in the vocabulary
      reserved_tokens=list(reserved_tokens),
      # Arguments for `text.BertTokenizer`
      bert_tokenizer_params=bert_tokenizer_params,
      # Arguments for `wordpiece_vocab.wordpiece_tokenizer_learner_lib.learn`
      learn_params={},
  )

  # A single vocabulary is learned from both columns.
  combined = np.concatenate((defDS, simpDS))
  combinedDS = tf.data.Dataset.from_tensor_slices(combined)

  en_vocab = bert_vocab.bert_vocab_from_dataset(
    combinedDS.batch(1000).prefetch(tf.data.AUTOTUNE),
    **bert_vocab_args
  )

  write_vocab_file(vocab_path, en_vocab)

def write_vocab_file(filepath, vocab):
  """Write `vocab` to `filepath`, one token per line, encoded as UTF-8.

  The encoding is set explicitly so the file content does not depend on the
  platform's default text encoding when tokens contain non-ASCII characters.

  Args:
    filepath: destination path; an existing file is overwritten.
    vocab: iterable of tokens.
  """
  with open(filepath, 'w', encoding='utf-8') as f:
    for token in vocab:
      print(token, file=f)

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Fixed seed so the shuffle, and therefore the 10,000-row subset and the
# vocabulary learned from it, is the same on every run.
SEED = 42

fullDs = pd.read_csv("/content/drive/MyDrive/English Def Simplifier/fullDataset.csv")
fullDs = fullDs.sample(frac=1, random_state=SEED)  # shuffle the dataset
fullDs = fullDs.astype(str)

# Copy the slices so any later per-element conversion cannot write back into
# the DataFrame's underlying arrays.
x = fullDs['definition'].values[0:10000].copy()
y = fullDs['simplified_definition'].values[0:10000].copy()

# The two arrays below hold UTF-8 encoded sentences; they are later passed to
# tf.data.Dataset.from_tensor_slices((full_train_x, full_train_y)).
full_train_x = convertText(x)

full_train_y = convertText(y)

# Learn the shared vocabulary from both columns and write it to disk.
createTokenizerVocab(full_train_x, full_train_y)
# prepare the training data in a suitable format


In [None]:
tokenizers = tf.Module()
rTokens=["[pad]", "[unk]", "[start]", "[end]"]
tokenizers.en = CustomTokenizer(rTokens, 'ENG.txt')
model_name = 'EnglishTokenizer'
tf.saved_model.save(tokenizers, model_name)

In [None]:
tokenizer = tf.saved_model.load(model_name)
tokenizer.en.get_vocab_size()

<tf.Tensor: shape=(), dtype=int32, numpy=4065>

In [None]:
full_train_y[0]

b'to or at the middle point between two ends or conditions . \xc2\xa0'

In [None]:
enc = tokenizer.en.tokenize([full_train_y[0]])
enc

<tf.RaggedTensor [[2, 46, 43, 76, 45, 520, 276, 131, 106, 971, 43, 1644, 10, 3]]>

In [None]:
tokenizer.en.lookup(enc)

<tf.RaggedTensor [[b'[start]', b'to', b'or', b'at', b'the', b'middle', b'point',
  b'between', b'two', b'ends', b'or', b'conditions', b'.', b'[end]']]>

In [None]:
dec = tokenizer.en.detokenize(enc)
dec

<tf.Tensor: shape=(1,), dtype=string, numpy=
array([b'to or at the middle point between two ends or conditions .'],
      dtype=object)>

In [None]:
e = tokenizer.en.tokenize(["some random sentence"])
e

<tf.RaggedTensor [[2, 105, 32, 1130, 727, 1066, 3]]>

In [None]:
tokenizer.en.lookup(e)

<tf.RaggedTensor [[b'[start]', b'some', b'r', b'##and', b'##om', b'sentence', b'[end]']]>

In [None]:
tokenizer.en.detokenize(e)

<tf.Tensor: shape=(1,), dtype=string, numpy=array([b'some random sentence'], dtype=object)>

In [None]:
ds = tf.data.Dataset.from_tensor_slices((full_train_x,full_train_y))
ds

<TensorSliceDataset element_spec=(TensorSpec(shape=(), dtype=tf.string, name=None), TensorSpec(shape=(), dtype=tf.string, name=None))>

Testing dataset for each layer

In [None]:
MAX_TOKENS=300
def prepare_batch(DEF, SIMPDEF):
    """Tokenize a batch of (definition, simplified definition) string pairs.

    Returns:
        `((source_ids, decoder_inputs), decoder_labels)`, all 0-padded dense
        tensors. `decoder_inputs` and `decoder_labels` are the same target
        sequence shifted by one token against each other.
    """
    # Source side: ragged ids, cut to MAX_TOKENS, then padded with zeros.
    source_ragged = tokenizers.en.tokenize(DEF)
    source_ids = source_ragged[:, :MAX_TOKENS].to_tensor()

    # Target side keeps one extra token because the shift below removes one.
    target_ragged = tokenizers.en.tokenize(SIMPDEF)[:, :(MAX_TOKENS+1)]
    decoder_inputs = target_ragged[:, :-1].to_tensor()  # without the last token
    decoder_labels = target_ragged[:, 1:].to_tensor()   # without the first token

    return (source_ids, decoder_inputs), decoder_labels

In [None]:
BUFFER_SIZE = 20000
BATCH_SIZE = 64

def make_batches(ds):
  """Shuffle, batch, tokenize and prefetch a dataset of sentence pairs.

  Args:
    ds: dataset of (definition, simplified definition) string pairs.

  Returns:
    A dataset of `prepare_batch` outputs built from batches of BATCH_SIZE.
  """
  shuffled = ds.shuffle(BUFFER_SIZE)
  batched = shuffled.batch(BATCH_SIZE)
  # Tokenization runs on whole batches, so it is mapped after `.batch`.
  tokenized = batched.map(prepare_batch, tf.data.AUTOTUNE)
  return tokenized.prefetch(buffer_size=tf.data.AUTOTUNE)
  
batches = make_batches(ds) # test
batches # if it prints it works

<PrefetchDataset element_spec=((TensorSpec(shape=(None, None), dtype=tf.int64, name=None), TensorSpec(shape=(None, None), dtype=tf.int64, name=None)), TensorSpec(shape=(None, None), dtype=tf.int64, name=None))>

In [None]:
for (DEF, SIMP), SIMP_labels in batches.take(1):
  break

print(DEF.shape)
print(SIMP.shape)
print(SIMP_labels.shape)

(64, 85)
(64, 104)
(64, 104)


Defining Components of Transformer

Input/Output embedding + Encoding

In [None]:
def positional_encoding(length, depth):
  """Build a sinusoidal positional-encoding table.

  The first half of the columns holds sines and the second half cosines of
  `position / 10000**(i / (depth / 2))`.

  Args:
    length: number of positions (rows) in the table.
    depth: number of columns in the table. For an odd `depth` the sine and
      cosine halves are built one column wider than needed and the result is
      cut back to `depth` columns, so the output width always equals `depth`.

  Returns:
    A float32 tensor of shape `(length, depth)`.
  """
  width = int(depth)
  half_depth = depth / 2
  # Number of frequencies per half; rounds up so an odd depth is covered.
  num_freqs = int(np.ceil(half_depth))

  positions = np.arange(length)[:, np.newaxis]               # (seq, 1)
  depths = np.arange(num_freqs)[np.newaxis, :] / half_depth  # (1, num_freqs)

  angle_rates = 1 / (10000**depths)         # (1, num_freqs)
  angle_rads = positions * angle_rates      # (seq, num_freqs)

  pos_encoding = np.concatenate(
      [np.sin(angle_rads), np.cos(angle_rads)],
      axis=-1)

  # No-op for an even depth; drops the surplus column for an odd one.
  pos_encoding = pos_encoding[:, :width]

  return tf.cast(pos_encoding, dtype=tf.float32)

In [None]:
class PositionalEmbedding(tf.keras.layers.Layer):
  """Token embedding scaled by sqrt(d_model) plus a positional encoding.

  Args:
    vocab_size: number of rows in the embedding table.
    d_model: embedding width.
    max_length: number of positions in the precomputed positional-encoding
      table. Inputs longer than this cannot be encoded. Defaults to 2048.
  """

  def __init__(self, vocab_size, d_model, max_length=2048):
    super().__init__()
    self.d_model = d_model
    # mask_zero=True makes id 0 produce a False entry in the Keras mask.
    self.embedding = tf.keras.layers.Embedding(vocab_size, d_model, mask_zero=True)
    self.pos_encoding = positional_encoding(length=max_length, depth=d_model)

  def compute_mask(self, *args, **kwargs):
    """Delegate mask computation to the embedding layer."""
    return self.embedding.compute_mask(*args, **kwargs)

  def call(self, x):
    """Embed token ids `x` of shape (batch, seq) and add position information."""
    length = tf.shape(x)[1]
    x = self.embedding(x)
    # This factor sets the relative scale of the embedding and positional_encoding.
    x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
    # Use only the first `length` rows of the table, broadcast over the batch.
    x = x + self.pos_encoding[tf.newaxis, :length, :]
    return x

In [None]:
embed_DEF = PositionalEmbedding(vocab_size=tokenizers.en.get_vocab_size(), d_model=512)
embed_SIMP = PositionalEmbedding(vocab_size=tokenizers.en.get_vocab_size(), d_model=512)

DEF_emb = embed_DEF(DEF)
SIMP_emb = embed_SIMP(SIMP)
SIMP_emb._keras_mask

<tf.Tensor: shape=(64, 104), dtype=bool, numpy=
array([[ True,  True,  True, ..., False, False, False],
       [ True,  True,  True, ..., False, False, False],
       [ True,  True,  True, ..., False, False, False],
       ...,
       [ True,  True,  True, ..., False, False, False],
       [ True,  True,  True, ..., False, False, False],
       [ True,  True,  True, ..., False, False, False]])>

BaseAttention Layer (Multi-Head Attention + add & Norm)

In [None]:
class BaseAttention(tf.keras.layers.Layer):
  """Shared building blocks for the attention layers in this notebook.

  Holds a multi-head attention layer, a layer normalization and an add layer.
  It defines no `call`; subclasses supply their own.
  """
  def __init__(self, **kwargs):
    # All keyword arguments are forwarded to `MultiHeadAttention`.
    super().__init__()
    self.mha = tf.keras.layers.MultiHeadAttention(**kwargs)
    self.layernorm = tf.keras.layers.LayerNormalization()
    self.add = tf.keras.layers.Add()

Cross Attention Layer (Base Attention Layer where the encoder meets the decoder)

In [None]:
class CrossAttention(BaseAttention):
  """Attention where `x` supplies the queries and `context` the keys/values."""

  def call(self, x, context):
    """Attend from `x` to `context`, then apply the residual add and layer norm."""
    attended, scores = self.mha(
        query=x,
        key=context,
        value=context,
        return_attention_scores=True)

    # Keep the scores of the most recent call so they can be plotted later.
    self.last_attn_scores = scores

    return self.layernorm(self.add([x, attended]))

In [None]:
sample_ca = CrossAttention(num_heads=2, key_dim=512)

print(DEF_emb.shape)
print(SIMP_emb.shape)
print(sample_ca(SIMP_emb, DEF_emb).shape)

(64, 85, 512)
(64, 104, 512)
(64, 104, 512)


Global Self Attention (input attention)

In [None]:
class GlobalSelfAttention(BaseAttention):
  """Self-attention with no causal mask: query, key and value are all `x`."""

  def call(self, x):
    """Self-attend over `x`, then apply the residual add and layer norm."""
    attended = self.mha(query=x, value=x, key=x)
    residual = self.add([x, attended])
    return self.layernorm(residual)

In [None]:
sample_gsa = GlobalSelfAttention(num_heads=2, key_dim=512)

print(DEF_emb.shape)
print(sample_gsa(DEF_emb).shape)

(64, 85, 512)
(64, 85, 512)


Causal Attention Layer (Output attention)

In [None]:
class CausalSelfAttention(BaseAttention):
  """Self-attention over `x` with `use_causal_mask=True`."""

  def call(self, x):
    """Causally self-attend over `x`, then apply the residual add and layer norm."""
    attended = self.mha(query=x, value=x, key=x, use_causal_mask = True)
    residual = self.add([x, attended])
    return self.layernorm(residual)

In [None]:
sample_csa = CausalSelfAttention(num_heads=2, key_dim=512)

print(SIMP_emb.shape)
print(sample_csa(SIMP_emb).shape)

(64, 104, 512)
(64, 104, 512)


In [None]:
out1 = sample_csa(embed_SIMP(SIMP[:, :3])) 
out2 = sample_csa(embed_SIMP(SIMP))[:, :3]

tf.reduce_max(abs(out1 - out2)).numpy()

4.7683716e-07

Feed Forward Layer (2 Dense and an add & Norm)

In [None]:
class FeedForward(tf.keras.layers.Layer):
  """Two dense layers with dropout, followed by a residual add and layer norm.

  Args:
    d_model: output width of the second dense layer.
    dff: width of the first (ReLU) dense layer.
    dropout_rate: rate of the dropout applied after the second dense layer.
  """

  def __init__(self, d_model, dff, dropout_rate=0.1):
    super().__init__()
    stack = [
      tf.keras.layers.Dense(dff, activation='relu'),
      tf.keras.layers.Dense(d_model),
      tf.keras.layers.Dropout(dropout_rate),
    ]
    self.seq = tf.keras.Sequential(stack)
    self.add = tf.keras.layers.Add()
    self.layer_norm = tf.keras.layers.LayerNormalization()

  def call(self, x):
    """Return `layer_norm(x + seq(x))`."""
    transformed = self.seq(x)
    return self.layer_norm(self.add([x, transformed]))

In [None]:
sample_ffn = FeedForward(512, 2048)

print(SIMP_emb.shape)
print(sample_ffn(SIMP_emb).shape)

(64, 104, 512)
(64, 104, 512)


Encoder (Not including the input embeddings)

In [None]:
class EncoderLayer(tf.keras.layers.Layer):
  """One encoder block: global self-attention followed by a feed-forward net.

  Args:
    d_model: model width; also used as the attention `key_dim`.
    num_heads: number of attention heads.
    dff: hidden width of the feed-forward network.
    dropout_rate: dropout rate used by both the attention layer and the
      feed-forward network.
  """
  def __init__(self,*, d_model, num_heads, dff, dropout_rate=0.1):
    super().__init__()

    self.self_attention = GlobalSelfAttention(
        num_heads=num_heads,
        key_dim=d_model,
        dropout=dropout_rate)

    # Forward the configured rate so it is not silently replaced by the
    # FeedForward default.
    self.ffn = FeedForward(d_model, dff, dropout_rate)

  def call(self, x):
    """Apply self-attention, then the feed-forward network, to `x`."""
    x = self.self_attention(x)
    x = self.ffn(x)
    return x

In [None]:
sample_encoder_layer = EncoderLayer(d_model=512, num_heads=8, dff=2048)

print(SIMP_emb.shape)
print(sample_encoder_layer(DEF_emb).shape)

(64, 104, 512)
(64, 85, 512)


Full Encoder Including Embeddings

In [None]:
class Encoder(tf.keras.layers.Layer):
  """Positional embedding, dropout and a stack of `EncoderLayer`s.

  Args:
    num_layers: number of encoder layers in the stack.
    d_model: model width.
    num_heads: number of attention heads per layer.
    dff: hidden width of each feed-forward network.
    vocab_size: size of the input vocabulary.
    dropout_rate: dropout rate applied after the embedding and in each layer.
  """

  def __init__(self, *, num_layers, d_model, num_heads,
               dff, vocab_size, dropout_rate=0.1):
    super().__init__()

    self.d_model = d_model
    self.num_layers = num_layers

    self.pos_embedding = PositionalEmbedding(
        vocab_size=vocab_size, d_model=d_model)

    self.enc_layers = []
    for _ in range(num_layers):
      self.enc_layers.append(
          EncoderLayer(d_model=d_model,
                       num_heads=num_heads,
                       dff=dff,
                       dropout_rate=dropout_rate))
    self.dropout = tf.keras.layers.Dropout(dropout_rate)

  def call(self, x):
    """Encode token ids `x` of shape (batch, seq_len)."""
    embedded = self.pos_embedding(x)  # (batch_size, seq_len, d_model)
    hidden = self.dropout(embedded)

    for layer in self.enc_layers:
      hidden = layer(hidden)

    return hidden  # (batch_size, seq_len, d_model)

In [None]:
# Instantiate the encoder.
sample_encoder = Encoder(num_layers=4,
                         d_model=512,
                         num_heads=8,
                         dff=2048,
                         vocab_size=8500)

sample_encoder_output = sample_encoder(DEF, training=False)

# Print the shape.
print(DEF.shape)
print(sample_encoder_output.shape)  # Shape `(batch_size, input_seq_len, d_model)`.

(64, 85)
(64, 85, 512)


Decoder Without Embeddings

In [None]:
class DecoderLayer(tf.keras.layers.Layer):
  """One decoder block: causal self-attention, cross-attention, feed-forward.

  Args:
    d_model: model width; also used as the attention `key_dim`.
    num_heads: number of attention heads.
    dff: hidden width of the feed-forward network.
    dropout_rate: dropout rate used by both attention layers and the
      feed-forward network.
  """
  def __init__(self,
               *,
               d_model,
               num_heads,
               dff,
               dropout_rate=0.1):
    super(DecoderLayer, self).__init__()

    self.causal_self_attention = CausalSelfAttention(
        num_heads=num_heads,
        key_dim=d_model,
        dropout=dropout_rate)

    self.cross_attention = CrossAttention(
        num_heads=num_heads,
        key_dim=d_model,
        dropout=dropout_rate)

    # Forward the configured rate so it is not silently replaced by the
    # FeedForward default.
    self.ffn = FeedForward(d_model, dff, dropout_rate)

  def call(self, x, context):
    """Run the block on target features `x` using encoder output `context`."""
    x = self.causal_self_attention(x=x)
    x = self.cross_attention(x=x, context=context)

    # Cache the last attention scores for plotting later
    self.last_attn_scores = self.cross_attention.last_attn_scores

    x = self.ffn(x)  # Shape `(batch_size, seq_len, d_model)`.
    return x

In [None]:
sample_decoder_layer = DecoderLayer(d_model=512, num_heads=8, dff=2048)

sample_decoder_layer_output = sample_decoder_layer(
    x=SIMP_emb, context=DEF_emb)

print(SIMP_emb.shape)
print(DEF_emb.shape)
print(sample_decoder_layer_output.shape)  # `(batch_size, seq_len, d_model)`

(64, 104, 512)
(64, 85, 512)
(64, 104, 512)


Decoder With Embeddings

In [None]:
class Decoder(tf.keras.layers.Layer):
  """Positional embedding, dropout and a stack of `DecoderLayer`s.

  After each call, `last_attn_scores` holds the cross-attention scores of the
  final decoder layer.
  """

  def __init__(self, *, num_layers, d_model, num_heads, dff, vocab_size,
               dropout_rate=0.1):
    super().__init__()

    self.d_model = d_model
    self.num_layers = num_layers

    self.pos_embedding = PositionalEmbedding(vocab_size=vocab_size,
                                             d_model=d_model)
    self.dropout = tf.keras.layers.Dropout(dropout_rate)

    self.dec_layers = []
    for _ in range(num_layers):
      self.dec_layers.append(
          DecoderLayer(d_model=d_model, num_heads=num_heads,
                       dff=dff, dropout_rate=dropout_rate))

    self.last_attn_scores = None

  def call(self, x, context):
    """Decode target token ids `x` (batch, target_seq_len) against `context`."""
    hidden = self.pos_embedding(x)  # (batch_size, target_seq_len, d_model)
    hidden = self.dropout(hidden)

    for layer in self.dec_layers:
      hidden = layer(hidden, context)

    # Expose the final layer's cross-attention scores.
    self.last_attn_scores = self.dec_layers[-1].last_attn_scores

    return hidden  # (batch_size, target_seq_len, d_model)

In [None]:
# Instantiate the decoder.
sample_decoder = Decoder(num_layers=4,
                         d_model=512,
                         num_heads=8,
                         dff=2048,
                         vocab_size=8000)

output = sample_decoder(
    x=SIMP,
    context=DEF_emb)

# Print the shapes.
print(SIMP.shape)
print(DEF_emb.shape)
print(output.shape)

(64, 104)
(64, 85, 512)
(64, 104, 512)


Full Transformer with all layers combined

In [None]:
class Transformer(tf.keras.Model):
  """Encoder-decoder model that maps (context ids, target ids) to logits."""

  def __init__(self, *, num_layers, d_model, num_heads, dff,
               input_vocab_size, target_vocab_size, dropout_rate=0.1):
    super().__init__()
    # Hyperparameters shared by the encoder and the decoder.
    shared = dict(num_layers=num_layers, d_model=d_model,
                  num_heads=num_heads, dff=dff,
                  dropout_rate=dropout_rate)

    self.encoder = Encoder(vocab_size=input_vocab_size, **shared)
    self.decoder = Decoder(vocab_size=target_vocab_size, **shared)

    self.final_layer = tf.keras.layers.Dense(target_vocab_size)

  def call(self, inputs):
    """Return logits of shape (batch_size, target_len, target_vocab_size).

    Args:
      inputs: a `(context, x)` pair of token-id tensors. Both arrive in the
        first argument because Keras `.fit` passes all inputs that way.
    """
    source_ids, target_ids = inputs

    encoded = self.encoder(source_ids)            # (batch_size, context_len, d_model)
    decoded = self.decoder(target_ids, encoded)   # (batch_size, target_len, d_model)
    logits = self.final_layer(decoded)

    try:
      # Drop the keras mask, so it doesn't scale the losses/metrics.
      # b/250038731
      del logits._keras_mask
    except AttributeError:
      # No mask was attached; nothing to remove.
      pass

    return logits

Setting up the model

In [None]:
num_layers = 4
d_model = 128
dff = 512
num_heads = 8
dropout_rate = 0.1

In [None]:
inputVocabSize = tokenizers.en.get_vocab_size().numpy()
outputVocabSize = inputVocabSize

transformer = Transformer(
    num_layers=num_layers,
    d_model=d_model,
    num_heads=num_heads,
    dff=dff,
    input_vocab_size=inputVocabSize, # this has been changed
    target_vocab_size=outputVocabSize, # this has been changed
    dropout_rate=dropout_rate)

In [None]:
#testing the model
output = transformer((DEF, SIMP))

print(SIMP.shape)
print(DEF.shape)
print(output.shape)

(64, 104)
(64, 85)
(64, 104, 4065)


Creating the optimizer and finding the best learning rate

In [None]:
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
  """Learning-rate schedule with a linear warm-up and inverse-sqrt decay.

  The rate at a given step is
  `rsqrt(d_model) * min(rsqrt(step), step * warmup_steps**-1.5)`.

  Args:
    d_model: model width used to scale the rate.
    warmup_steps: number of warm-up steps.
  """

  def __init__(self, d_model, warmup_steps=4000):
    super().__init__()

    # Keep the plain value for `get_config`; `self.d_model` is a tensor.
    self._d_model_config = d_model
    self.d_model = tf.cast(d_model, tf.float32)

    self.warmup_steps = warmup_steps

  def __call__(self, step):
    """Return the learning rate for optimizer step `step`."""
    step = tf.cast(step, dtype=tf.float32)
    arg1 = tf.math.rsqrt(step)
    arg2 = step * (self.warmup_steps ** -1.5)

    return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

  def get_config(self):
    """Return the constructor arguments so the schedule can be serialized."""
    return {
        "d_model": self._d_model_config,
        "warmup_steps": self.warmup_steps,
    }

learning_rate = CustomSchedule(d_model)

optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.9, beta_2=0.98,
                                     epsilon=1e-9)

Creating the masked loss and accuracy (padding positions, where the label is 0, are excluded from both)

In [None]:
def masked_loss(label, pred):
  """Mean sparse categorical cross-entropy over positions whose label is not 0.

  Args:
    label: integer token ids; id 0 marks positions to ignore.
    pred: logits with the vocabulary on the last axis.

  Returns:
    Sum of the per-position losses at non-zero labels divided by the number of
    such positions.
  """
  cross_entropy = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')
  per_token = cross_entropy(label, pred)

  # 1.0 where the label is a real token, 0.0 where it is 0.
  keep = tf.cast(label != 0, dtype=per_token.dtype)

  return tf.reduce_sum(per_token * keep)/tf.reduce_sum(keep)


def masked_accuracy(label, pred):
  """Fraction of positions with a non-zero label that are predicted correctly.

  Args:
    label: integer token ids; id 0 marks positions to ignore.
    pred: logits of shape (batch, seq, vocab).
  """
  predicted_ids = tf.argmax(pred, axis=2)
  label = tf.cast(label, predicted_ids.dtype)

  not_padding = label != 0
  # A hit counts only where the label is a real token.
  hits = (label == predicted_ids) & not_padding

  hit_count = tf.reduce_sum(tf.cast(hits, dtype=tf.float32))
  token_count = tf.reduce_sum(tf.cast(not_padding, dtype=tf.float32))
  return hit_count/token_count

Compile the model

In [None]:
transformer.compile(
    loss=masked_loss,
    optimizer=optimizer,
    metrics=[masked_accuracy])

transformer.summary()

Model: "transformer"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 encoder_1 (Encoder)         multiple                  3159168   
                                                                 
 decoder_1 (Decoder)         multiple                  5270144   
                                                                 
 dense_38 (Dense)            multiple                  524385    
                                                                 
Total params: 8,953,697
Trainable params: 8,953,697
Non-trainable params: 0
_________________________________________________________________


Fit the model

In [None]:
# NOTE(review): `validation_data` is the same `batches` dataset that is used
# for training, so the reported validation metrics are not a held-out
# estimate. Consider splitting the data before building the batches.
transformer.fit(batches,
                epochs=5,
                validation_data=batches)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x7f4ee38f0f70>

Predicting on New Text

In [None]:
class Translator(tf.Module):
  """Greedy decoder that turns one definition into its simplified form.

  Args:
    tokenizers: module exposing an `en` tokenizer with `tokenize`,
      `detokenize` and `lookup`.
    transformer: model called as `transformer([encoder_input, output])`.
  """

  def __init__(self, tokenizers, transformer):
    self.tokenizers = tokenizers
    self.transformer = transformer

  def __call__(self, sentence, max_length=MAX_TOKENS):
    """Generate a simplified definition for `sentence`.

    Args:
      sentence: string `tf.Tensor`; a scalar is promoted to a batch of one.
      max_length: maximum number of decoding steps.

    Returns:
      `(text, tokens, attention_weights)`: the detokenized output string, the
      vocabulary strings of the generated ids, and the decoder's last
      cross-attention scores.
    """
    # `tokenize` adds the `[start]` and `[end]` ids to the input sentence.
    assert isinstance(sentence, tf.Tensor)
    if len(sentence.shape) == 0:
      sentence = sentence[tf.newaxis]

    sentence = self.tokenizers.en.tokenize(sentence).to_tensor()

    encoder_input = sentence

    # Tokenizing an empty string yields just the `[start]` and `[end]` ids;
    # the output sequence is initialized with the `[start]` id.
    start_end = self.tokenizers.en.tokenize([''])[0]
    start = start_end[0][tf.newaxis]
    end = start_end[1][tf.newaxis]

    # `tf.TensorArray` is required here (instead of a Python list), so that the
    # dynamic-loop can be traced by `tf.function`.
    output_array = tf.TensorArray(dtype=tf.int64, size=0, dynamic_size=True)
    output_array = output_array.write(0, start)

    for i in tf.range(max_length):
      output = tf.transpose(output_array.stack())
      predictions = self.transformer([encoder_input, output], training=False)

      # Select the last token from the `seq_len` dimension.
      predictions = predictions[:, -1:, :]  # Shape `(batch_size, 1, vocab_size)`.

      predicted_id = tf.argmax(predictions, axis=-1)

      # Concatenate the `predicted_id` to the output which is given to the
      # decoder as its input.
      output_array = output_array.write(i+1, predicted_id[0])

      if predicted_id == end:
        break

    output = tf.transpose(output_array.stack())
    # The output shape is `(1, tokens)`.
    # Use the tokenizers handed to this instance, not the notebook-level global.
    text = self.tokenizers.en.detokenize(output)[0]  # Shape: `()`.

    tokens = self.tokenizers.en.lookup(output)[0]

    # `tf.function` prevents us from using the attention_weights that were
    # calculated on the last iteration of the loop.
    # So, recalculate them outside the loop.
    self.transformer([encoder_input, output[:,:-1]], training=False)
    attention_weights = self.transformer.decoder.last_attn_scores

    return text, tokens, attention_weights

In [None]:
translator = Translator(tokenizers, transformer)

Exporting

In [None]:
class ExportTranslator(tf.Module):
  """Wrapper exposing a translator as a `tf.function` on one scalar string."""

  def __init__(self, translator):
    self.translator = translator

  @tf.function(input_signature=[tf.TensorSpec(shape=[], dtype=tf.string)])
  def __call__(self, sentence):
    """Return only the generated text for `sentence`."""
    # The wrapped translator also returns tokens and attention weights;
    # only the text is returned from this function.
    simplified_text, _tokens, _attention = self.translator(
        sentence, max_length=MAX_TOKENS)
    return simplified_text

In [None]:
translator = ExportTranslator(translator)

Random Predictions (Hopefully Works)

In [None]:
translator('made, done, happening, or chosen without method or conscious decision.').numpy()

b'a person who is used to be used to be used to be not make a person or other .'