# Transformer translation

This implementation of neural machine translation from Polish to English was built using the Transformer architecture designed by Google TensorFlow engineers (source below). I created the tokenizing and data processing functionality, making it possible to input textual data in Polish and receive a properly parsed output translation in English. It does not handle proper names and has a limited vocabulary, since the training data was not vast (40 000 lines of context-target sentence pairs). It will fail to translate longer sentences, so the approach to translating longer text is to split it into sentences, translate each one, and correct the punctuation.

* Dataset: http://www.manythings.org/anki/
* Model Architecture: https://www.tensorflow.org/text/tutorials/transformer

## Standard imports

In [None]:
!pip install "tensorflow-text"

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting tensorflow-text
  Downloading tensorflow_text-2.12.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (6.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.0/6.0 MB[0m [31m21.4 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: tensorflow-text
Successfully installed tensorflow-text-2.12.1


In [None]:
import warnings
warnings.filterwarnings("ignore")
import tensorflow as tf
import tensorflow_text as tf_text
import numpy as np
from pathlib import Path
import re

## Data processing functions and tokenizers

In [None]:
def load_data(path):
  """Load tab-separated sentence pairs from a UTF-8 text file.

  Every usable line must hold at least two tab-separated fields: the target
  sentence first and the context sentence second.  Any further fields (for
  example an attribution column) are ignored.  Lines with fewer than two
  fields, such as blank lines, are skipped instead of raising ``IndexError``.

  Args:
    path: Location of the text file (``str`` or ``Path``).

  Returns:
    A ``(target, context)`` tuple of NumPy arrays of equal length, with
    surrounding whitespace stripped from every sentence.
  """
  text = Path(path).read_text(encoding='utf-8')

  target = []
  context = []

  for line in text.splitlines():
    fields = line.split('\t')  # target / context / source (source is unused)
    if len(fields) < 2:
      # Blank or malformed line: there is no sentence pair to extract.
      continue
    target.append(fields[0].strip())
    context.append(fields[1].strip())

  return np.array(target), np.array(context)

In [None]:
def tf_lower_and_split_punct(text):
  """Standardize a string tensor before tokenization.

  Polish letters are replaced with plain Latin letters, the text is
  NFKD-normalized and lower-cased, every character other than space, a-z and
  the punctuation ``. ? ! , ¿`` is removed, each punctuation mark is
  surrounded by spaces, and the result is wrapped in ``[START]`` / ``[END]``.
  """
  # (pattern, replacement) pairs, applied in this order.
  # Note: perhaps it would be wiser to change óÓ to u.
  polish_to_latin = (
      ('[łŁ]', 'l'),
      ('[ąĄ]', 'a'),
      ('[ćĆ]', 'c'),
      ('[ęĘ]', 'e'),
      ('[ńŃ]', 'n'),
      ('[óÓ]', 'o'),
      ('[śŚ]', 's'),
      ('[źŹ]', 'z'),
      ('[żŻ]', 'z'),
  )
  for pattern, replacement in polish_to_latin:
    text = tf.strings.regex_replace(text, pattern, replacement)

  # Split accented characters, then lower-case.
  decomposed = tf_text.normalize_utf8(text, 'NFKD')
  lowered = tf.strings.lower(decomposed)

  # Keep only space, a to z, and the selected punctuation.
  filtered = tf.strings.regex_replace(lowered, '[^ a-z.?!,¿]', '')

  # Put spaces around punctuation, then trim the ends.
  spaced = tf.strings.regex_replace(filtered, '[.?!,¿]', r' \0 ')
  trimmed = tf.strings.strip(spaced)

  return tf.strings.join(['[START]', trimmed, '[END]'], separator=' ')

In [None]:
def process_text(context, target):
  """Turn raw (context, target) text batches into model inputs and labels.

  The context is tokenized and densified.  The target is tokenized once and
  then sliced twice: without its last token as the decoder input, and
  without its first token as the labels, so the labels are the decoder input
  shifted by one position.

  Returns:
    ``((context_tokens, decoder_input), labels)``.
  """
  context_tokens = context_text_processor(context).to_tensor()
  target_tokens = target_text_processor(target)

  decoder_input = target_tokens[:, :-1].to_tensor()
  labels = target_tokens[:, 1:].to_tensor()

  return (context_tokens, decoder_input), labels

In [None]:
# Load the sentence pairs; load_data returns (target, context) in that order.
target_raw, context_raw = load_data("/content/pol.txt")

In [None]:
# Shuffle buffer as large as the whole corpus; batches hold 64 sentence pairs.
BUFFER_SIZE = len(context_raw)
BATCH_SIZE = 64

# Random boolean mask: each pair goes to the training split with probability
# 0.8 and to the validation split otherwise.  No seed is set in this cell, so
# the split differs between runs.
is_train = np.random.uniform(size=(len(target_raw),)) < 0.8

# Raw (untokenized) datasets of (context, target) string batches.
train_raw = (
    tf.data.Dataset
    .from_tensor_slices((context_raw[is_train], target_raw[is_train]))
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE))
val_raw = (
    tf.data.Dataset
    .from_tensor_slices((context_raw[~is_train], target_raw[~is_train]))
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE))

In [None]:
# Upper bound on the vocabulary size of each text processor.
max_vocab_size = 5000

# Both processors use the same standardization function and return ragged
# outputs; they differ only in the data they are later adapted on.
context_text_processor = tf.keras.layers.TextVectorization(standardize=tf_lower_and_split_punct,
                                                           max_tokens=max_vocab_size,
                                                           ragged=True)

target_text_processor = tf.keras.layers.TextVectorization(standardize=tf_lower_and_split_punct,
                                                          max_tokens=max_vocab_size,
                                                          ragged=True)

In [None]:
# Build each vocabulary from the training split only: the context processor
# sees the context sentences, the target processor the target sentences.
context_text_processor.adapt(train_raw.map(lambda context, target: context))
target_text_processor.adapt(train_raw.map(lambda context, target: target))

In [None]:
# Tokenize every batch into ((context, decoder input), labels) via
# process_text; tf.data.AUTOTUNE is passed as the second argument of map().
train_ds = train_raw.map(process_text, tf.data.AUTOTUNE)
val_ds = val_raw.map(process_text, tf.data.AUTOTUNE)

In [None]:
def split_sentences(text: str) -> list:
    """Split *text* into sentences at whitespace that follows ``.``, ``?`` or ``!``.

    The terminating punctuation stays attached to its sentence and the single
    whitespace character after it is consumed.  No split happens after short
    abbreviations of the form ``Xx.`` (e.g. ``Mr.``) or dotted forms such as
    ``e.g.``.

    Args:
        text: The text to split.

    Returns:
        The list of sentences; a text without any boundary yields a
        one-element list.
    """
    # Negative lookbehinds skip abbreviations; the positive lookbehind
    # requires sentence-ending punctuation.  ``!`` is included so that
    # exclamations are split like statements and questions.
    pattern = r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=[.?!])\s'

    return re.split(pattern, text)

def sentence_correction(text: str) -> str:
    """Tidy a space-joined token sequence into a readable sentence.

    The first character is upper-cased and the rest lower-cased (the behavior
    of ``str.capitalize``), and whitespace in front of ``.``, ``!``, ``?`` or
    ``,`` is removed.  The comma is included because the tokenizer in this
    file pads commas with spaces as well.

    Args:
        text: The sentence to clean up, e.g. ``"hi , how are you ?"``.

    Returns:
        The corrected sentence, e.g. ``"Hi, how are you?"``.
    """
    text = text.capitalize()
    # Re-attach punctuation that the tokenizer separated from the preceding word.
    text = re.sub(r'\s+([.!?,])', r'\1', text)

    return text

## Positional embedding

In [None]:
def positional_encoding(length: int, depth: int) -> tf.Tensor:
    """Build a sinusoidal position table.

    Half of the columns hold sines and the other half cosines of
    ``position / 10000 ** (i / (depth / 2))``; the sine block comes first and
    the cosine block is concatenated after it along the last axis.

    Args:
        length: Number of positions (rows).
        depth: Requested encoding width; each block uses ``depth / 2`` columns.

    Returns:
        A float32 tensor with one row per position.
    """
    half_depth = depth / 2

    position_column = np.arange(length).reshape(-1, 1)
    scaled_index_row = (np.arange(half_depth) / half_depth).reshape(1, -1)

    inverse_frequencies = 1 / (10000 ** scaled_index_row)
    angles = position_column @ inverse_frequencies

    table = np.concatenate((np.sin(angles), np.cos(angles)), axis=-1)

    return tf.cast(table, dtype=tf.float32)

In [None]:
class PositionalEmbedding(tf.keras.layers.Layer):
  """Token embedding scaled by sqrt(d_model) plus a fixed positional encoding."""

  def __init__(self, vocab_size, d_model):
    super().__init__()
    self.d_model = d_model
    # mask_zero=True makes the embedding produce a mask for token id 0.
    self.embedding = tf.keras.layers.Embedding(vocab_size, d_model,
                                               mask_zero=True)
    # Table precomputed for 2048 positions; call() slices what it needs.
    self.pos_encoding = positional_encoding(length=2048, depth=d_model)

  def call(self, x):
    seq_len = tf.shape(x)[1]
    scale = tf.math.sqrt(tf.cast(self.d_model, tf.float32))
    embedded = self.embedding(x) * scale
    # The leading new axis lets the table broadcast over the batch.
    return embedded + self.pos_encoding[tf.newaxis, :seq_len, :]

  def compute_mask(self, *args, **kwargs):
    # Delegate to the embedding so its mask is the mask of this layer.
    return self.embedding.compute_mask(*args, **kwargs)

## Attention layers

In [None]:
class BaseAttention(tf.keras.layers.Layer):
  """Shared building blocks for the attention layers in this file.

  Creates a multi-head attention layer, a layer normalization and an Add
  layer.  The subclasses (CrossAttention, GlobalSelfAttention,
  CausalSelfAttention) combine them in their own ``call`` methods.
  """

  def __init__(self, **kwargs):
    # All keyword arguments are forwarded to MultiHeadAttention; none of them
    # reach the base Layer constructor.
    super().__init__()
    
    self.mha = tf.keras.layers.MultiHeadAttention(**kwargs)
    self.layernorm = tf.keras.layers.LayerNormalization()
    self.add = tf.keras.layers.Add()

  def call(self, x, context):
    # Placeholder: returns the input unchanged; subclasses override call().
    return x

In [None]:
class CrossAttention(BaseAttention):
  """Attention with queries from ``x`` and keys/values from ``context``."""

  def call(self, x, context):
    attended, scores = self.mha(query=x,
                                key=context,
                                value=context,
                                return_attention_scores=True)

    # Stored on the layer so the scores can be read after a forward pass.
    self.last_attn_scores = scores

    # Residual connection followed by layer normalization.
    return self.layernorm(self.add([x, attended]))

In [None]:
class GlobalSelfAttention(BaseAttention):
  """Self-attention in which ``x`` serves as query, key and value."""

  def call(self, x):
    attended = self.mha(query=x, value=x, key=x)

    # Residual connection followed by layer normalization.
    return self.layernorm(self.add([x, attended]))

In [None]:
class CausalSelfAttention(BaseAttention):
  """Self-attention over ``x`` with ``use_causal_mask=True``."""

  def call(self, x):
    attended = self.mha(query=x, value=x, key=x, use_causal_mask=True)

    # Residual connection followed by layer normalization.
    return self.layernorm(self.add([x, attended]))

## FFN with ReLU activation function

In [None]:
class FeedForward(tf.keras.layers.Layer):
  """Feed-forward sublayer: Dense(dff, relu) -> Dense(d_model) -> Dropout.

  The result is added back to the input and layer-normalized.
  """

  def __init__(self, d_model, dff, dropout_rate=0.1):
    super().__init__()

    expand = tf.keras.layers.Dense(dff, activation="relu")
    project = tf.keras.layers.Dense(d_model)
    drop = tf.keras.layers.Dropout(dropout_rate)
    self.seq = tf.keras.Sequential([expand, project, drop])

    self.add = tf.keras.layers.Add()
    self.layer_norm = tf.keras.layers.LayerNormalization()

  def call(self, x):
    # Residual connection followed by layer normalization.
    return self.layer_norm(self.add([x, self.seq(x)]))

## Encoder build

### Single layer

In [None]:
class EncoderLayer(tf.keras.layers.Layer):
  """One encoder block: global self-attention followed by a feed-forward net.

  Args:
    d_model: Model width; also used as ``key_dim`` of the attention heads.
    num_heads: Number of attention heads.
    dff: Hidden width of the feed-forward network.
    dropout_rate: Dropout probability used by both the attention and the
      feed-forward sublayer.
  """

  def __init__(self, *, d_model, num_heads, dff, dropout_rate=0.1):
    super().__init__()

    self.self_attention = GlobalSelfAttention(
        num_heads=num_heads,
        key_dim=d_model,
        dropout=dropout_rate
    )

    # Forward dropout_rate explicitly; otherwise the feed-forward sublayer
    # would silently stay at its own default regardless of this argument.
    self.ffn = FeedForward(
        d_model,
        dff,
        dropout_rate=dropout_rate
    )

  def call(self, x):
    x = self.self_attention(x)
    x = self.ffn(x)

    return x

### Encoder

In [None]:
class Encoder(tf.keras.layers.Layer):
  """Positional embedding, dropout, then a stack of ``num_layers`` EncoderLayers."""

  def __init__(self, *, num_layers, d_model, num_heads,
               dff, vocab_size, dropout_rate=0.1):
    super().__init__()

    self.d_model = d_model
    self.num_layers = num_layers

    self.pos_embedding = PositionalEmbedding(vocab_size=vocab_size,
                                             d_model=d_model)

    # Every layer in the stack is built with the same settings.
    layer_kwargs = dict(d_model=d_model, num_heads=num_heads,
                        dff=dff, dropout_rate=dropout_rate)
    self.enc_layers = [EncoderLayer(**layer_kwargs)
                       for _ in range(num_layers)]

    self.dropout = tf.keras.layers.Dropout(dropout_rate)

  def call(self, x):
    # Embed the token ids, apply dropout, then run the stack in order.
    x = self.dropout(self.pos_embedding(x))

    for layer in self.enc_layers:
      x = layer(x)

    return x

## Decoder build

### Single layer

In [None]:
class DecoderLayer(tf.keras.layers.Layer):
  """One decoder block: causal self-attention, cross-attention, feed-forward.

  Args:
    d_model: Model width; also used as ``key_dim`` of the attention heads.
    num_heads: Number of attention heads.
    dff: Hidden width of the feed-forward network.
    dropout_rate: Dropout probability used by both attention sublayers and
      the feed-forward sublayer.
  """

  def __init__(self, *, d_model, num_heads,
               dff, dropout_rate=0.1):
    super().__init__()

    self.causal_self_attention = CausalSelfAttention(
        num_heads=num_heads,
        key_dim=d_model,
        dropout=dropout_rate
    )

    self.cross_attention = CrossAttention(
        num_heads=num_heads,
        key_dim=d_model,
        dropout=dropout_rate
    )

    # Forward dropout_rate explicitly; otherwise the feed-forward sublayer
    # would silently stay at its own default regardless of this argument.
    self.ffn = FeedForward(
        d_model,
        dff,
        dropout_rate=dropout_rate
    )

  def call(self, x, context):
    x = self.causal_self_attention(x)
    x = self.cross_attention(x, context)

    # Expose the cross-attention scores of the latest forward pass.
    self.last_attn_scores = self.cross_attention.last_attn_scores

    x = self.ffn(x)

    return x

### Decoder

In [None]:
class Decoder(tf.keras.layers.Layer):
  """Positional embedding, dropout, then a stack of ``num_layers`` DecoderLayers."""

  def __init__(self, *, num_layers, d_model, num_heads,
               dff, vocab_size, dropout_rate=0.1):
    super().__init__()

    self.d_model = d_model
    self.num_layers = num_layers

    self.pos_embedding = PositionalEmbedding(vocab_size=vocab_size,
                                             d_model=d_model)

    self.dropout = tf.keras.layers.Dropout(dropout_rate)

    # Every layer in the stack is built with the same settings.
    layer_kwargs = dict(d_model=d_model, num_heads=num_heads,
                        dff=dff, dropout_rate=dropout_rate)
    self.dec_layers = [DecoderLayer(**layer_kwargs)
                       for _ in range(num_layers)]

    # Filled in by call() with the last layer's cross-attention scores.
    self.last_attn_scores = None

  def call(self, x, context):
    # Embed the token ids, apply dropout, then run the stack in order.
    x = self.dropout(self.pos_embedding(x))

    for layer in self.dec_layers:
      x = layer(x, context)

    self.last_attn_scores = self.dec_layers[-1].last_attn_scores

    return x

## Transformer 

In [None]:
class Transformer(tf.keras.Model):
  """Encoder-decoder model followed by a Dense projection onto the target vocabulary.

  ``call`` expects ``inputs`` to be a ``(context, x)`` pair and returns the
  output of the final Dense layer.
  """

  def __init__(self, *, num_layers, d_model, num_heads, dff,
               input_vocab_size, target_vocab_size, dropout_rate=0.1):
    super().__init__()

    self.encoder = Encoder(
        num_layers=num_layers,
        d_model=d_model,
        num_heads=num_heads,
        dff=dff,
        vocab_size=input_vocab_size,
        dropout_rate=dropout_rate,
    )

    self.decoder = Decoder(
        num_layers=num_layers,
        d_model=d_model,
        num_heads=num_heads,
        dff=dff,
        vocab_size=target_vocab_size,
        dropout_rate=dropout_rate,
    )

    self.final_layer = tf.keras.layers.Dense(target_vocab_size)

  def call(self, inputs):
    source_tokens, target_tokens = inputs

    encoded = self.encoder(source_tokens)
    decoded = self.decoder(target_tokens, encoded)
    logits = self.final_layer(decoded)

    # Remove the mask attached to the logits, if there is one; a missing
    # attribute is not an error.
    try:
      del logits._keras_mask
    except AttributeError:
      pass

    return logits

## Model build

### hyper parameters

In [None]:
# Model hyperparameters, passed to the Transformer constructor below.
num_layers = 4  # layers in the encoder stack and in the decoder stack
d_model = 128  # embedding / model width
dff = 512  # hidden width of each feed-forward sublayer
num_heads = 8  # attention heads per attention layer
dropout_rate = 0.1  # dropout probability

### transformer initialization

In [None]:
# Vocabulary sizes are read from the text processors, so the processors
# should already be adapted when this cell runs.
transformer = Transformer(
    num_layers=num_layers,
    d_model=d_model,
    num_heads=num_heads,
    dff=dff,
    input_vocab_size=len(context_text_processor.get_vocabulary()),
    target_vocab_size=len(target_text_processor.get_vocabulary()),
    dropout_rate=dropout_rate
)

### custom learning rate schedule

In [None]:
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
  """Learning-rate schedule with a warmup phase.

  The rate at a given step is::

      d_model ** -0.5 * min(step ** -0.5, step * warmup_steps ** -1.5)

  Args:
    d_model: Model width used to scale the rate.
    warmup_steps: Value of ``warmup_steps`` in the formula above.
  """

  def __init__(self, d_model, warmup_steps=4000):
    super().__init__()

    # Keep the value exactly as passed in for get_config(); self.d_model is
    # a tensor and therefore not suitable for a serializable config.
    self._d_model_config = d_model
    self.d_model = tf.cast(d_model, tf.float32)
    self.warmup_steps = warmup_steps

  def get_config(self):
    """Return the constructor arguments so the schedule can be rebuilt."""
    return {
        'd_model': self._d_model_config,
        'warmup_steps': self.warmup_steps,
    }

  def __call__(self, step):
    step = tf.cast(step, tf.float32)
    arg1 = tf.math.rsqrt(step)
    arg2 = step * (self.warmup_steps ** -1.5)

    return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

In [None]:
# Adam optimizer whose learning rate follows the warmup schedule above.
learning_rate = CustomSchedule(d_model)

optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.9, beta_2=0.98,
                                     epsilon=1e-9)

### loss function and accuracy metric definition

In [None]:
def masked_loss(label, pred):
  """Sparse categorical cross-entropy averaged over non-zero labels.

  Positions whose label is 0 contribute nothing to the sum, and the sum is
  divided by the number of non-zero labels.

  Args:
    label: Integer class ids; 0 marks positions to ignore.
    pred: Logits (the loss is built with ``from_logits=True``).
  """
  keep = label != 0

  cross_entropy = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction='none')
  per_position = cross_entropy(label, pred)

  keep = tf.cast(keep, dtype=per_position.dtype)
  per_position = per_position * keep

  return tf.reduce_sum(per_position) / tf.reduce_sum(keep)


def masked_accuracy(label, pred):
  """Share of non-zero label positions where the argmax of ``pred`` matches.

  Args:
    label: Integer class ids; 0 marks positions to ignore.
    pred: Scores whose axis 2 ranges over the classes.
  """
  predicted_ids = tf.argmax(pred, axis=2)
  label = tf.cast(label, predicted_ids.dtype)

  is_real = label != 0
  is_correct = (label == predicted_ids) & is_real

  correct_count = tf.reduce_sum(tf.cast(is_correct, dtype=tf.float32))
  real_count = tf.reduce_sum(tf.cast(is_real, dtype=tf.float32))

  return correct_count / real_count

In [None]:
# Train with the masked loss and report the masked accuracy.
transformer.compile(
    loss=masked_loss,
    optimizer=optimizer,
    metrics=[masked_accuracy])

### training

In [None]:
# Train for 20 epochs, evaluating on the validation split.
transformer.fit(train_ds,
                epochs=20,
                validation_data=val_ds)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


<keras.callbacks.History at 0x7fca7d4964d0>

## Translator 

In [None]:
# Default upper bound on the number of tokens generated per sentence.
MAX_TOKENS = 20

class Translator(tf.Module):
    """Greedy token-by-token decoder around a trained transformer.

    Args:
        context_text_processor: Vectorizer for the input (context) language.
        target_text_processor: Vectorizer for the output (target) language.
        transformer: Model called as ``transformer([context, output])``.
    """

    def __init__(self, context_text_processor, target_text_processor, transformer):
        self.context_tokenizer = context_text_processor
        self.target_tokenizer = target_text_processor
        self.transformer = transformer

    def __call__(self, sentence, max_length=MAX_TOKENS):
        """Translate one sentence and return the predicted tokens.

        Args:
            sentence: The sentence to translate, as a string tensor; any
                other value is first passed to ``tf.convert_to_tensor``.
                A scalar is expanded to a batch of one.  Only the first
                batch element is decoded.
            max_length: Maximum number of tokens to generate.

        Returns:
            The predicted tokens as a list of strings, without the start
            token.  The end token is never part of the result, because
            generation stops before it is written.
        """
        if not isinstance(sentence, tf.Tensor):
            sentence = tf.convert_to_tensor(sentence)
        if len(sentence.shape) == 0:
            sentence = sentence[tf.newaxis]

        encoder_input = self.context_tokenizer(sentence).to_tensor()

        # Tokenizing an empty string leaves only the start and end markers,
        # which gives their token ids.
        start_end = self.target_tokenizer([''])[0]
        start = start_end[0][tf.newaxis]
        end = start_end[1][tf.newaxis]

        output_array = tf.TensorArray(dtype=tf.int64, size=0, dynamic_size=True)
        output_array = output_array.write(0, start)

        for i in tf.range(max_length):
            output = tf.transpose(output_array.stack())
            predictions = self.transformer([encoder_input, output], training=False)

            # Select the last token from the `seq_len` dimension.
            predictions = predictions[:, -1:, :]  # Shape `(batch_size, 1, vocab_size)`.

            predicted_id = tf.argmax(predictions, axis=-1)

            # Terminate if the predicted ID is the end token; the end token
            # itself is not written to the output.
            if tf.reduce_all(tf.equal(predicted_id, end)):
                break

            # Write the predicted ID to the output array.
            output_array = output_array.write(i + 1, predicted_id[0])

        # Convert the output array to a tensor and remove the batch dimension.
        output = output_array.stack()
        output = tf.squeeze(output, axis=1)

        # Convert token IDs to text; fetch the vocabulary once, not per token.
        vocabulary = self.target_tokenizer.get_vocabulary()
        predicted_sentence = [vocabulary[token_id] for token_id in output.numpy()]

        # Drop only the start token.  The end token was never written, so
        # also dropping the last element would discard a real token.
        return predicted_sentence[1:]


## Testing

In [None]:
# Wrap the trained model and both text processors for inference.
translator = Translator(
    context_text_processor, target_text_processor, transformer
)

In [None]:
def print_translation(sentence, tokens):
    """Print an input sentence and its predicted translation on aligned lines.

    Args:
        sentence: The original input sentence.
        tokens: Iterable of predicted token strings; joined with single spaces.
    """
    # Both labels are padded to 15 characters and followed by ": ", so the
    # label text itself carries no colon.
    print(f'{"Input":15s}: {sentence}')
    print(f'{"Prediction":15s}: {" ".join(tokens)}')

In [None]:
# Sample Polish sentences used to spot-check the translator.
sentences = [
    "Cześć, jak się masz?",
    "Gdzie jest najbliższy sklep spożywczy?",
    "Jaki jest twój ulubiony kolor?",
    "Ile masz lat?",
    "Co chciałbyś zjeść na obiad?",
    "Gdzie mieszkasz?",
    "Czy możesz mi pomóc?",
    "Dziękuję Ci za pomoc!",
    "Jak się nazywasz?",
    "Czy lubisz sport?",
    "Czy możesz mi powiedzieć, która godzina?",
    "Którego języka obcego chciałbyś nauczyć się?",
    "Co robisz w wolnym czasie?",
    "Jakie są twoje plany na weekend?",
    "Gdzie można znaleźć dobre miejsce, aby wypić kawę?",
    "Jakie są twoje zainteresowania?",
    "Czy umiesz gotować?",
    "Jakie jest twoje ulubione danie?",
    "Co myślisz o polityce?",
    "Czy masz rodzeństwo?"
]

In [None]:
# Translate each sample sentence and print it next to its prediction.
for sentence in sentences:
  translated_text = translator(tf.constant(sentence))
  print_translation(sentence, translated_text)

Input:         : Cześć, jak się masz?
Prediction     : hi , how are you
Input:         : Gdzie jest najbliższy sklep spożywczy?
Prediction     : wheres the nearest store
Input:         : Jaki jest twój ulubiony kolor?
Prediction     : whats your favorite color
Input:         : Ile masz lat?
Prediction     : how old do you have
Input:         : Co chciałbyś zjeść na obiad?
Prediction     : what would you like to eat for dinner
Input:         : Gdzie mieszkasz?
Prediction     : where do you live
Input:         : Czy możesz mi pomóc?
Prediction     : can you help me
Input:         : Dziękuję Ci za pomoc!
Prediction     : thanks for helping you
Input:         : Jak się nazywasz?
Prediction     : how are you going to show up
Input:         : Czy lubisz sport?
Prediction     : do you like sports
Input:         : Czy możesz mi powiedzieć, która godzina?
Prediction     : can you tell me what time it is
Input:         : Którego języka obcego chciałbyś nauczyć się?
Prediction     : what language