# Polish to English neural machine translation
The seq2seq model architecture was written by Google developers and is licensed under the Apache 2.0 License. I adapted the code to handle Polish-to-English translation of longer text samples by looping the original seq2seq model over individual sentences. It takes Polish sentences containing Polish characters as input and returns properly formatted English sentences. The training data was sourced from the Anki database containing sentence pairs from the Tatoeba Project. I have also implemented a mechanism to correct the model: the method `Translation.teach_model()` lets users assess the model's translation, label it as correct or incorrect, and provide the correct translation when needed. The newly created context -> target pair is then added to the model's training data.


* http://www.manythings.org/anki/
* https://www.tensorflow.org/text/tutorials/nmt_with_attention

## Import necessary packages

In [1]:
!pip install "tensorflow-text"
!pip install einops

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting tensorflow-text
  Downloading tensorflow_text-2.12.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (6.0 MB)
Installing collected packages: tensorflow-text
Successfully installed tensorflow-text-2.12.1
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting einops
  Downloading einops-0.6.1-py3-none-any.whl (42 kB)
Installing collected packages: einops
Successfully installed einops-0.6.1


In [2]:
from typing import Any
from typing import Tuple
from typing import List
from pathlib import Path
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import tensorflow_text as tf_text
import tensorflow as tf
import numpy as np
import typing
import einops
import re
import pickle
import os
import datetime

## Check Colab GPU status

In [3]:
# Check if GPU is available
if tf.config.list_physical_devices('GPU'):
    # Get the number of available GPUs
    num_gpus = len(tf.config.list_physical_devices('GPU'))
    print(f"Number of available GPUs: {num_gpus}")

    # Get the name of the current GPU device
    current_gpu_name = tf.config.list_physical_devices('GPU')[0].name
    print(f"Current GPU device: {current_gpu_name}")
else:
    print("No GPU available.")

Number of available GPUs: 1
Current GPU device: /physical_device:GPU:0


## ShapeChecker class

`ShapeChecker` records the length of each named axis the first time it sees it and raises a `ValueError` if a later tensor disagrees, which catches shape mismatches during tensor operations. The check only runs in eager mode.

In [4]:
class ShapeChecker():
  def __init__(self):
    # Keep a cache of every axis-name seen
    self.shapes = {}

  def __call__(self, tensor, names, broadcast=False):
    if not tf.executing_eagerly():
      return

    parsed = einops.parse_shape(tensor, names)

    for name, new_dim in parsed.items():
      old_dim = self.shapes.get(name, None)
      
      if (broadcast and new_dim == 1):
        continue

      if old_dim is None:
        # If the axis name is new, add its length to the cache.
        self.shapes[name] = new_dim
        continue

      if new_dim != old_dim:
        raise ValueError(f"Shape mismatch for dimension: '{name}'\n"
                         f"    found: {new_dim}\n"
                         f"    expected: {old_dim}\n")
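
The caching idea can be illustrated without TensorFlow or einops. The sketch below is a simplified stand-in, not the class above: `TupleShapeChecker` is a hypothetical name, it works on plain shape tuples, and it leaves out the `broadcast` option.

```python
class TupleShapeChecker:
    """Simplified stand-in for ShapeChecker that checks plain shape tuples."""

    def __init__(self):
        # Maps each axis name to the length recorded on first sight.
        self.shapes = {}

    def __call__(self, shape, names):
        axes = names.split()
        if len(axes) != len(shape):
            raise ValueError(f"Expected {len(axes)} axes, found {len(shape)}")
        for name, new_dim in zip(axes, shape):
            # setdefault stores new_dim for a new name, else returns the old value.
            old_dim = self.shapes.setdefault(name, new_dim)
            if new_dim != old_dim:
                raise ValueError(f"Shape mismatch for dimension: '{name}'\n"
                                 f"    found: {new_dim}\n"
                                 f"    expected: {old_dim}\n")


checker = TupleShapeChecker()
checker((64, 12), "batch s")             # records batch=64, s=12
checker((64, 12, 256), "batch s units")  # consistent, records units=256

try:
    checker((32, 12), "batch s")         # batch disagrees with the cached 64
    mismatch_caught = False
except ValueError:
    mismatch_caught = True
```

Because a new checker is created inside each `call`, axis names only need to be consistent within one forward pass.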

## Data processing functions

In [5]:
def load_data(path):
  path = Path(path)  # Create a Path object from the string path
  text = path.read_text(encoding='utf-8')

  lines = text.splitlines()
  triplets = [line.split('\t') for line in lines] # target / context / source (source is not used)

  context = []
  target = []

  for triplet in triplets:
    target.append(triplet[0].strip())  # Extract the target and remove leading/trailing whitespaces
    context.append(triplet[1].strip())  # Extract the context and remove leading/trailing whitespaces

  context = np.array(context)
  target = np.array(target)

  return target, context
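
To make the expected file layout concrete, here is a minimal stdlib-only sketch of the same parsing step. The two sample lines are made up for illustration (the third field stands in for the attribution column), and `parse_pairs` is a hypothetical helper. Unlike `load_data`, it skips lines with fewer than two fields instead of raising an `IndexError`.

```python
def parse_pairs(text):
    """Split tab-separated lines into parallel (target, context) lists."""
    target, context = [], []
    for line in text.splitlines():
        fields = line.split("\t")
        if len(fields) < 2:
            continue  # skip blank or malformed lines
        target.append(fields[0].strip())   # first column: English
        context.append(fields[1].strip())  # second column: Polish
    return target, context


# Made-up sample in the "target<TAB>context<TAB>source" layout.
sample = "Hi.\tCześć.\t(attribution)\nI see.\tRozumiem.\t(attribution)\n"
target, context = parse_pairs(sample)
```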

In [6]:
def tf_lower_and_split_punct(text):
  # Replace Polish letters with Latin letters.
  text = tf.strings.regex_replace(text, '[łŁ]', 'l')
  text = tf.strings.regex_replace(text, '[ąĄ]', 'a')
  text = tf.strings.regex_replace(text, '[ćĆ]', 'c')
  text = tf.strings.regex_replace(text, '[ęĘ]', 'e')
  text = tf.strings.regex_replace(text, '[ńŃ]', 'n')
  text = tf.strings.regex_replace(text, '[óÓ]', 'o') # perhaps it would be wiser to change óÓ to u
  text = tf.strings.regex_replace(text, '[śŚ]', 's')
  text = tf.strings.regex_replace(text, '[źŹ]', 'z')
  text = tf.strings.regex_replace(text, '[żŻ]', 'z')
  
  # Split accented characters.
  text = tf_text.normalize_utf8(text, 'NFKD')
  text = tf.strings.lower(text)
  # Keep space, a to z, and select punctuation.
  text = tf.strings.regex_replace(text, '[^ a-z.?!,¿]', '')
  # Add spaces around punctuation.
  text = tf.strings.regex_replace(text, '[.?!,¿]', r' \0 ')
  # Strip whitespace.
  text = tf.strings.strip(text)

  text = tf.strings.join(['[START]', text, '[END]'], separator=' ')
  return text
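
The effect of this standardization is easier to see on a concrete sentence. The following is a plain-Python approximation that uses `re` and `unicodedata` instead of the TensorFlow string ops; `lower_and_split_punct` and `POLISH_MAP` are names introduced here for illustration only. Note that `ł` has no Unicode decomposition, so NFKD alone would not turn it into `l`, which is one reason an explicit replacement step is needed.

```python
import re
import unicodedata

# Same letter mapping as the regex_replace calls above.
POLISH_MAP = str.maketrans("łŁąĄćĆęĘńŃóÓśŚźŹżŻ", "llaacceennoosszzzz")


def lower_and_split_punct(text):
    text = text.translate(POLISH_MAP)
    # Decompose any remaining accented characters, then lowercase.
    text = unicodedata.normalize("NFKD", text).lower()
    # Keep space, a to z, and select punctuation.
    text = re.sub(r"[^ a-z.?!,¿]", "", text)
    # Add spaces around punctuation (\g<0> is the whole match in Python's re).
    text = re.sub(r"[.?!,¿]", r" \g<0> ", text)
    text = text.strip()
    return " ".join(["[START]", text, "[END]"])


standardized = lower_and_split_punct("Czy jesteś dobrą osobą?")
# standardized == "[START] czy jestes dobra osoba ? [END]"
```

Since everything outside `[ a-z.?!,¿]` is dropped, apostrophes disappear as well, which is why translations come out as "Im" rather than "I'm".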

In [7]:
def process_text(context, target):
  context = context_text_processor(context).to_tensor()
  target = target_text_processor(target)
  targ_in = target[:,:-1].to_tensor()
  targ_out = target[:,1:].to_tensor()
  return (context, targ_in), targ_out
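
The two slices implement the usual one-step shift for teacher forcing: the decoder input is the target without its last token, and the label is the target without its first token. A small sketch with word strings instead of token IDs (the sentence is made up) shows the alignment:

```python
tokens = ["[START]", "i", "like", "flowers", ".", "[END]"]

targ_in = tokens[:-1]   # what the decoder sees at each step
targ_out = tokens[1:]   # what the decoder should predict at each step

# Each input token is paired with the token that follows it.
pairs = list(zip(targ_in, targ_out))
# pairs[0] == ("[START]", "i"), pairs[-1] == (".", "[END]")
```

In the real pipeline the tokens are integer IDs in a ragged tensor, and `.to_tensor()` pads shorter sequences with zeros.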

## Neural machine translation model architecture

### Encoder

In [8]:
class Encoder(tf.keras.layers.Layer):
  def __init__(self, text_processor, units):
    super(Encoder, self).__init__()
    self.text_processor = text_processor
    self.vocab_size = text_processor.vocabulary_size()
    self.units = units
    
    # The embedding layer converts tokens to vectors
    self.embedding = tf.keras.layers.Embedding(self.vocab_size, units,
                                               mask_zero=True)

    # The RNN layer processes those vectors sequentially.
    self.rnn = tf.keras.layers.Bidirectional(
        merge_mode='sum',
        layer=tf.keras.layers.GRU(units,
                            # Return the full output sequence
                            return_sequences=True,
                            recurrent_initializer='glorot_uniform'))

  def call(self, x):
    shape_checker = ShapeChecker()
    shape_checker(x, 'batch s')

    # The embedding layer looks up the embedding vector for each token.
    x = self.embedding(x)
    shape_checker(x, 'batch s units')

    # The GRU processes the sequence of embeddings.
    x = self.rnn(x)
    shape_checker(x, 'batch s units')

    # Returns the new sequence of embeddings.
    return x

  def convert_input(self, texts):
    texts = tf.convert_to_tensor(texts)
    if len(texts.shape) == 0:
      texts = tf.convert_to_tensor(texts)[tf.newaxis]
    context = self.text_processor(texts).to_tensor()
    context = self(context)
    return context

### Cross attention

In [9]:
class CrossAttention(tf.keras.layers.Layer):
  def __init__(self, units, **kwargs):
    super().__init__()
    self.mha = tf.keras.layers.MultiHeadAttention(key_dim=units, num_heads=1, **kwargs)
    self.layernorm = tf.keras.layers.LayerNormalization()
    self.add = tf.keras.layers.Add()

  def call(self, x, context):
    shape_checker = ShapeChecker()
 
    shape_checker(x, 'batch t units')
    shape_checker(context, 'batch s units')

    attn_output, attn_scores = self.mha(
        query=x,
        value=context,
        return_attention_scores=True)
    
    shape_checker(x, 'batch t units')
    shape_checker(attn_scores, 'batch heads t s')
    
    # Cache the attention scores for plotting later.
    attn_scores = tf.reduce_mean(attn_scores, axis=1)
    shape_checker(attn_scores, 'batch t s')
    self.last_attention_weights = attn_scores

    x = self.add([x, attn_output])
    x = self.layernorm(x)

    return x
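
For intuition about the `batch t s` attention scores, here is a stripped-down, pure-Python sketch of scaled dot-product attention for a single example. It is a simplification under stated assumptions: it omits the learned query, key, and value projections of `MultiHeadAttention`, as well as the residual add and layer normalization, and the vectors are made up.

```python
import math


def softmax(scores):
    m = max(scores)  # subtract the max for numerical stability
    exps = [math.exp(s - m) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]


def attend(query, context):
    """query: t vectors, context: s vectors, all of the same dimensionality."""
    dim = len(context[0])
    outputs, weights = [], []
    for q in query:
        # One score per source position: scaled dot product of q with each key.
        scores = [sum(qi * ki for qi, ki in zip(q, k)) / math.sqrt(dim)
                  for k in context]
        w = softmax(scores)
        # Weighted average of the context vectors.
        out = [sum(wj * k[i] for wj, k in zip(w, context)) for i in range(dim)]
        weights.append(w)
        outputs.append(out)
    return outputs, weights


context = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]  # s = 3 source positions
query = [[2.0, 0.0], [0.0, 2.0]]                # t = 2 target positions
outputs, weights = attend(query, context)       # weights has shape t x s
```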

### Decoder

In [10]:
class Decoder(tf.keras.layers.Layer):
  @classmethod
  def add_method(cls, fun):
    setattr(cls, fun.__name__, fun)
    return fun

  def __init__(self, text_processor, units):
    super(Decoder, self).__init__()
    self.text_processor = text_processor
    self.vocab_size = text_processor.vocabulary_size()
    self.word_to_id = tf.keras.layers.StringLookup(
        vocabulary=text_processor.get_vocabulary(),
        mask_token='', oov_token='[UNK]')
    self.id_to_word = tf.keras.layers.StringLookup(
        vocabulary=text_processor.get_vocabulary(),
        mask_token='', oov_token='[UNK]',
        invert=True)
    self.start_token = self.word_to_id('[START]')
    self.end_token = self.word_to_id('[END]')

    self.units = units


    # The embedding layer converts token IDs to vectors
    self.embedding = tf.keras.layers.Embedding(self.vocab_size,
                                               units, mask_zero=True)

    # The RNN keeps track of what's been generated so far.
    self.rnn = tf.keras.layers.GRU(units,
                                   return_sequences=True,
                                   return_state=True,
                                   recurrent_initializer='glorot_uniform')

    # The RNN output will be the query for the attention layer.
    self.attention = CrossAttention(units)

    # This fully connected layer produces the logits for each
    # output token.
    self.output_layer = tf.keras.layers.Dense(self.vocab_size)

In [11]:
@Decoder.add_method
def call(self,
         context, x,
         state=None,
         return_state=False):  
  shape_checker = ShapeChecker()
  shape_checker(x, 'batch t')
  shape_checker(context, 'batch s units')

  # Lookup the embeddings
  x = self.embedding(x)
  shape_checker(x, 'batch t units')

  # Process the target sequence.
  x, state = self.rnn(x, initial_state=state)
  shape_checker(x, 'batch t units')

  # Use the RNN output as the query for the attention over the context.
  x = self.attention(x, context)
  self.last_attention_weights = self.attention.last_attention_weights
  shape_checker(x, 'batch t units')
  shape_checker(self.last_attention_weights, 'batch t s')

  # Generate logit predictions for the next token.
  logits = self.output_layer(x)
  shape_checker(logits, 'batch t target_vocab_size')

  if return_state:
    return logits, state
  else:
    return logits

In [12]:
@Decoder.add_method
def get_initial_state(self, context):
  batch_size = tf.shape(context)[0]
  start_tokens = tf.fill([batch_size, 1], self.start_token)
  done = tf.zeros([batch_size, 1], dtype=tf.bool)
  embedded = self.embedding(start_tokens)
  return start_tokens, done, self.rnn.get_initial_state(embedded)[0]

In [13]:
@Decoder.add_method
def tokens_to_text(self, tokens):
  words = self.id_to_word(tokens)
  result = tf.strings.reduce_join(words, axis=-1, separator=' ')
  result = tf.strings.regex_replace(result, r'^ *\[START\] *', '')
  result = tf.strings.regex_replace(result, r' *\[END\] *$', '')
  return result

In [14]:
@Decoder.add_method
def get_next_token(self, context, next_token, done, state, temperature = 0.0):
  logits, state = self(
    context, next_token,
    state = state,
    return_state=True) 
  
  if temperature == 0.0:
    next_token = tf.argmax(logits, axis=-1)
  else:
    logits = logits[:, -1, :]/temperature
    next_token = tf.random.categorical(logits, num_samples=1)

  # If a sequence produces an `end_token`, set it `done`
  done = done | (next_token == self.end_token)
  # Once a sequence is done it only produces 0-padding.
  next_token = tf.where(done, tf.constant(0, dtype=tf.int64), next_token)
  
  return next_token, done, state
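
The role of `temperature` can be sketched in plain Python for a single logits vector. This is an illustration rather than the method above: `pick_token` is a hypothetical helper, the logits are made up, and `random.choices` stands in for `tf.random.categorical`.

```python
import math
import random


def pick_token(logits, temperature=0.0, rng=random):
    if temperature == 0.0:
        # Greedy decoding: always take the highest-scoring token.
        return max(range(len(logits)), key=lambda i: logits[i])
    # Dividing by the temperature sharpens (<1) or flattens (>1) the distribution.
    scaled = [l / temperature for l in logits]
    m = max(scaled)
    weights = [math.exp(s - m) for s in scaled]  # unnormalized softmax
    return rng.choices(range(len(logits)), weights=weights, k=1)[0]


logits = [0.1, 2.0, -1.0]
greedy = pick_token(logits)                                  # index 1
rng = random.Random(0)
cold = [pick_token(logits, 0.01, rng) for _ in range(20)]    # effectively greedy
warm = [pick_token(logits, 5.0, rng) for _ in range(20)]     # more varied
```

A very small temperature behaves almost like greedy decoding, while a large one moves the samples toward a uniform choice over the vocabulary.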

### Translator model

In [15]:
class Translator(tf.keras.Model):
  @classmethod
  def add_method(cls, fun):
    setattr(cls, fun.__name__, fun)
    return fun

  def __init__(self, units,
               context_text_processor,
               target_text_processor):
    super().__init__()
    # Build the encoder and decoder
    encoder = Encoder(context_text_processor, units)
    decoder = Decoder(target_text_processor, units)

    self.encoder = encoder
    self.decoder = decoder

  def call(self, inputs):
    context, x = inputs
    context = self.encoder(context)
    logits = self.decoder(context, x)

    #TODO(b/250038731): remove this
    try:
      # Delete the keras mask, so keras doesn't scale the loss+accuracy. 
      del logits._keras_mask
    except AttributeError:
      pass

    return logits

In [16]:
@Translator.add_method
def translate(self,
              texts,
              *,
              max_length=500,
              temperature=tf.constant(0.0)):
  shape_checker = ShapeChecker()
  context = self.encoder.convert_input(texts)
  batch_size = tf.shape(context)[0]
  shape_checker(context, 'batch s units')

  next_token, done, state = self.decoder.get_initial_state(context)

  # initialize the accumulator
  tokens = tf.TensorArray(tf.int64, size=1, dynamic_size=True)

  for t in tf.range(max_length):
    # Generate the next token
    next_token, done, state = self.decoder.get_next_token(
        context, next_token, done, state, temperature)
    shape_checker(next_token, 'batch t1')

    # Collect the generated tokens
    tokens = tokens.write(t, next_token)

    # if all the sequences are done, break
    if tf.reduce_all(done):
      break

  # Convert the list of generated token ids to a list of strings.
  tokens = tokens.stack()
  shape_checker(tokens, 't batch t1')
  tokens = einops.rearrange(tokens, 't batch 1 -> batch t')
  shape_checker(tokens, 'batch t')

  text = self.decoder.tokens_to_text(tokens)
  shape_checker(text, 'batch')

  return text
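
The control flow of the decoding loop (feed the previous token back in, mark the sequence done at `[END]`, emit padding from then on, stop early once done) can be shown with a toy stand-in for the decoder. Everything here is hypothetical: the token IDs, the `TRANSITIONS` lookup table that plays the role of the model, and the `greedy_decode` helper. It handles one sequence rather than a batch.

```python
START, END, PAD = 2, 3, 0  # hypothetical token ids


def greedy_decode(step_fn, max_length=10):
    token, done, tokens = START, False, []
    for _ in range(max_length):
        token = step_fn(token)          # "model" predicts the next token
        done = done or token == END
        if done:
            token = PAD                 # finished sequences only emit padding
        tokens.append(token)
        if done:
            break
    return tokens


# Toy "model": a fixed next-token table.
TRANSITIONS = {START: 5, 5: 6, 6: END}
result = greedy_decode(lambda tok: TRANSITIONS[tok])
# result == [5, 6, 0]: the END token itself is replaced by padding
```

Because the final token is padding, the joined text ends with an empty word, which is consistent with the trailing space visible in the translated output.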

### Model training metrics

In [17]:
def masked_loss(y_true, y_pred):
  loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True,
                                                          reduction="none")
  loss = loss_fn(y_true, y_pred)

  mask = tf.cast(y_true != 0, loss.dtype)
  loss = tf.multiply(loss, mask)

  return tf.reduce_sum(loss) / tf.reduce_sum(mask)

In [18]:
def masked_acc(y_true, y_pred):
  y_pred = tf.argmax(y_pred, axis=-1)
  y_pred = tf.cast(y_pred, y_true.dtype)

  match_ = tf.cast(y_true == y_pred, tf.float32)
  mask = tf.cast(y_true != 0, tf.float32)

  # Count matches only at non-padding positions.
  return tf.reduce_sum(match_ * mask) / tf.reduce_sum(mask)
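
A plain-Python sketch of the masking idea, with made-up numbers: positions whose label is 0 (padding) are excluded from both the numerator and the denominator. `masked_mean` is a hypothetical helper used here for the loss and the accuracy alike.

```python
def masked_mean(values, labels, pad_id=0):
    """Average `values` over the positions where the label is not padding."""
    mask = [1.0 if label != pad_id else 0.0 for label in labels]
    return sum(v * m for v, m in zip(values, mask)) / sum(mask)


labels      = [7, 4, 9, 0, 0]   # last two positions are padding
predictions = [7, 4, 2, 0, 0]
per_token_loss = [0.2, 0.4, 1.8, 5.0, 5.0]  # made-up loss values

matches = [1.0 if p == y else 0.0 for p, y in zip(predictions, labels)]

loss = masked_mean(per_token_loss, labels)  # (0.2 + 0.4 + 1.8) / 3
acc = masked_mean(matches, labels)          # 2 correct out of 3 real tokens
```

Without the mask on `matches`, the two padded positions would count as correct predictions and the ratio would exceed 1.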

### Translation class
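This class wraps a trained `Translator` and adds sentence splitting, feedback collection, saving, and retraining. It could probably have been a subclass of `Translator` instead.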

In [19]:
class Translation():
  def __init__(self, model: Translator=None, model_file_path: str=None, model_folder_path: str=None,
               context: List[str]=None, target: List[str]=None):
    print("Initializing translation model...")
    self.model = model
    self.model_file_path = model_file_path  # Used by load_model()
    self.model_folder_path = model_folder_path  # Used by save_model()
    self.context = context 
    self.target = target

  def translate_sentence(self, text: str) -> str:
    # Check if input is of type string
    if not isinstance(text, str):
      raise ValueError("Input must be a string.")
    # Check if multiple sentences are present
    if re.search(r'\.\s|!\s|\?\s', text):
      raise ValueError("Multiple sentences detected. "
                       "Please provide a single sentence.")

    result = self.model.translate([text])
    result = result[0].numpy().decode()
    result = result.capitalize()
    result = re.sub(r'\s+([.!?])', r'\1', result)

    return result

  def split_sentences(self, text: str) -> list:
    # Define the regex pattern to match sentence boundaries
    pattern = r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s'

    # Split the text into sentences using the regex pattern
    sentences = re.split(pattern, text)

    return sentences

  def translate_text(self, text: str) -> str:
    sentences = self.split_sentences(text)

    result = ""
    
    for sentence in sentences:
      result +=  self.translate_sentence(sentence)
    
    return result

  def teach_model(self):
    while True:
      input_text = input("Enter input text: ")
      translated_text = self.translate_text(input_text)
      print(f"Translated text: {translated_text}")

      correct = input("Is the translation correct? (yes/no): ")
      if correct.lower() == "yes":
        self.context = np.append(self.context, input_text)
        self.target = np.append(self.target, translated_text)
        print("Context and target added.")
      elif correct.lower() == "no":
        correct_translation = input("Enter the correct translation: ")
        self.context = np.append(self.context, input_text)
        self.target = np.append(self.target, correct_translation)
        print("Correct translation added.")
      else:
        print("Invalid input. Please enter 'yes' or 'no'.")

      continue_teaching = input("Do you want to teach the model more? (yes/no): ")
      if continue_teaching.lower() != "yes":
        break

  def load_model(self):
    file_path = os.path.join(self.model_file_path, "model.pkl")
    if os.path.exists(file_path):
      with open(file_path, 'rb') as file:
        self.model = pickle.load(file)
      print("Model loaded successfully.")
      return self.model
    else:
      raise FileNotFoundError(f"Model file '{file_path}' not found.")

  def save_model(self):
    current_datetime = datetime.datetime.now()

    filename = f"model_{current_datetime.year}_{current_datetime.month}_" \
               f"{current_datetime.day}_{current_datetime.hour}_" \
               f"{current_datetime.minute}_{current_datetime.second}.pkl"
    file_path = os.path.join(self.model_folder_path, filename)

    with open(file_path, 'wb') as file:
      pickle.dump(self.model, file)
    print(f"Model saved successfully at {file_path}.")

  def train_model(self):
    is_train = np.random.uniform(size=(len(self.target),)) < 0.8

    train_raw = (tf.data.Dataset
                .from_tensor_slices((self.context[is_train], self.target[is_train]))
                .shuffle(BUFFER_SIZE)
                .batch(BATCH_SIZE))
    
    val_raw = (tf.data.Dataset
              .from_tensor_slices((self.context[~is_train], self.target[~is_train]))
              .shuffle(BUFFER_SIZE)
              .batch(BATCH_SIZE))

    train_ds = train_raw.map(process_text, tf.data.AUTOTUNE)
    val_ds = val_raw.map(process_text, tf.data.AUTOTUNE)

    self.model = Translator(UNITS,
                      context_text_processor,
                      target_text_processor)
    
    self.model.compile(optimizer="adam",
                  loss=masked_loss,
                  metrics=[masked_acc, masked_loss])

    self.model.fit(train_ds.repeat(),
          epochs=100,
          steps_per_epoch=100,
          validation_data=val_ds,
          validation_steps=20,
          callbacks=[tf.keras.callbacks.EarlyStopping(patience=3)])
    
    return self.model

  def save_data(self, file_path: str):
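    # Write tab-separated "target<TAB>context" lines, the column order load_data() reads.
    data = "\n".join([f"{target}\t{context}" for context, target in zip(self.context, self.target)])

    with open(file_path, 'w', encoding='utf-8') as file:
      file.write(data)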

    print(f"Data saved successfully to {file_path}.")
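
The sentence-boundary pattern from `Translation.split_sentences` can be tried on its own with the standard `re` module. The sample text reuses sentences that appear elsewhere in this notebook.

```python
import re

# Same pattern as in Translation.split_sentences.
SENTENCE_BOUNDARY = r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s'

text = "Idę do kina. Czy jesteś dobrą osobą? Lubię podróżować koleją."
sentences = re.split(SENTENCE_BOUNDARY, text)
# ['Idę do kina.', 'Czy jesteś dobrą osobą?', 'Lubię podróżować koleją.']

# The lookbehind only accepts "." or "?", so "!" does not end a sentence here.
unsplit = re.split(SENTENCE_BOUNDARY, "Stop! Idę do kina.")
# ['Stop! Idę do kina.']
```

Since `translate_sentence` rejects any input containing `!` followed by whitespace, text with an exclamation mark in the middle would be passed through unsplit and raise a `ValueError`. Adding `!` to the final lookbehind is one possible way to handle it.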

## Main build

### Data processing

In [20]:
target_raw, context_raw = load_data("/content/pol.txt")

In [21]:
BUFFER_SIZE = len(context_raw)
BATCH_SIZE = 64

is_train = np.random.uniform(size=(len(target_raw),)) < 0.8

train_raw = (
    tf.data.Dataset
    .from_tensor_slices((context_raw[is_train], target_raw[is_train]))
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE))
val_raw = (
    tf.data.Dataset
    .from_tensor_slices((context_raw[~is_train], target_raw[~is_train]))
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE))
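
The split above draws one uniform random number per sentence pair and sends the pair to the training set when the number is below 0.8. A stdlib-only sketch of the same idea, on placeholder data with a fixed seed, shows that the two sets are disjoint and that the 80/20 ratio is only approximate:

```python
import random

rng = random.Random(0)  # fixed seed so the sketch is repeatable
pairs = [(f"context {i}", f"target {i}") for i in range(1000)]  # placeholder data

# One independent draw per pair: True means "training", False means "validation".
is_train = [rng.random() < 0.8 for _ in pairs]

train = [p for p, keep in zip(pairs, is_train) if keep]
val = [p for p, keep in zip(pairs, is_train) if not keep]
```

Because the notebook does not seed NumPy, the membership of each set changes from run to run.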

In [22]:
max_vocab_size = 5000

context_text_processor = tf.keras.layers.TextVectorization(standardize=tf_lower_and_split_punct,
                                                           max_tokens=max_vocab_size,
                                                           ragged=True)

target_text_processor = tf.keras.layers.TextVectorization(standardize=tf_lower_and_split_punct,
                                                          max_tokens=max_vocab_size,
                                                          ragged=True)

In [23]:
context_text_processor.adapt(train_raw.map(lambda context, target: context))
target_text_processor.adapt(train_raw.map(lambda context, target: target))
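
Conceptually, `adapt` counts tokens in the standardized training text and keeps the most frequent ones, with index 0 reserved for padding (`''`) and index 1 for out-of-vocabulary tokens (`'[UNK]'`). The sketch below imitates that with `collections.Counter`. `build_vocab` is a hypothetical helper, the sentences are made up, and the order of equally frequent tokens may differ from what `TextVectorization` produces.

```python
from collections import Counter


def build_vocab(sentences, max_tokens):
    counts = Counter(tok for s in sentences for tok in s.split())
    # Two slots are reserved for the padding and unknown tokens.
    most_common = [tok for tok, _ in counts.most_common(max_tokens - 2)]
    return ["", "[UNK]"] + most_common


sentences = ["[START] i like tea . [END]", "[START] i like coffee . [END]"]
vocab = build_vocab(sentences, max_tokens=7)
lookup = {tok: i for i, tok in enumerate(vocab)}

# Tokens outside the vocabulary map to index 1 ('[UNK]').
ids = [lookup.get(tok, 1) for tok in "[START] i like milk . [END]".split()]
```

With `max_tokens=7` the rarer words "tea" and "coffee" do not make the cut, just as rare words fall outside the 5000-token vocabulary used here.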

In [24]:
train_ds = train_raw.map(process_text, tf.data.AUTOTUNE)
val_ds = val_raw.map(process_text, tf.data.AUTOTUNE)

### Model setup

In [25]:
UNITS = 256

In [26]:
model = Translator(UNITS,
                   context_text_processor,
                   target_text_processor)

In [27]:
model.compile(optimizer="adam",
              loss=masked_loss,
              metrics=[masked_acc, masked_loss])

### Training

In [28]:
model.fit(train_ds.repeat(),
          epochs=100,
          steps_per_epoch=100,
          validation_data=val_ds,
          validation_steps=20,
          callbacks=[tf.keras.callbacks.EarlyStopping(patience=3)])

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100


<keras.callbacks.History at 0x7f99212884c0>
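
Training stopped well before the 100-epoch limit because of the `EarlyStopping(patience=3)` callback, which by default monitors the validation loss. The patience logic can be sketched in a few lines of plain Python. The loss values below are made up, and `stopping_epoch` is a hypothetical helper, not the Keras implementation.

```python
def stopping_epoch(val_losses, patience=3):
    """Return the 1-based epoch at which training would stop."""
    best = float("inf")
    wait = 0
    for epoch, loss in enumerate(val_losses, start=1):
        if loss < best:
            best = loss   # improvement: remember it and reset the counter
            wait = 0
        else:
            wait += 1     # no improvement this epoch
            if wait >= patience:
                return epoch
    return len(val_losses)  # ran through all epochs without stopping early


# Made-up validation losses: the best value is reached at epoch 3.
stopped_at = stopping_epoch([1.0, 0.8, 0.7, 0.71, 0.72, 0.73, 0.5])
# stopped_at == 6, three epochs after the last improvement
```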

## Testing

In [29]:
translation = Translation(model, context=context_raw, target=target_raw)

Initializing translation model...


In [37]:
translated_text = translation.translate_text("Kocham spacerować boso po trawie. Lubię patrzeć na piękne kwiaty. \
                            Słucham muzyki klasycznej. Cieszę się, że mam przyjaciół. \
                            Uwielbiam jeść frytki.")
print(translated_text)

I love walking barefoot on the grass. I like to look at the flowers. Im listening to music. Im glad i have friends. I love to eat strawberries. 


Note the mistranslation in the last sentence: "frytki" means "fries", not "strawberries".
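
The formatting of the output (capital first letter, no space before the final punctuation, but also a lowercase "i" inside the sentence) follows from the post-processing in `translate_sentence`. The sketch below applies the same two steps to a string shaped like raw model output; `postprocess` is a hypothetical helper name.

```python
import re


def postprocess(raw):
    # capitalize() uppercases the first character and lowercases the rest.
    result = raw.capitalize()
    # Remove the whitespace that tokenization left before punctuation.
    return re.sub(r'\s+([.!?])', r'\1', result)


formatted = postprocess("im glad i have friends . ")
# formatted == "Im glad i have friends. "
```

The trailing space is kept, which is what separates consecutive sentences when `translate_text` concatenates them.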

In [31]:
translation.teach_model()

Enter input text: Idę do kina.
Translated text: Im going to the movies. 
Is the translation correct? (yes/no): yes
Context and target added.
Do you want to teach the model more? (yes/no): yes
Enter input text: Lubię podróżować koleją.
Translated text: I like to travel. 
Is the translation correct? (yes/no): no
Enter the correct translation: I like to travel by rail.
Correct translation added.
Do you want to teach the model more? (yes/no): yes
Enter input text: Czy jesteś dobrą osobą?
Translated text: Are you a good person? 
Is the translation correct? (yes/no): yes
Context and target added.
Do you want to teach the model more? (yes/no): yes
Enter input text: Czy wyjdziesz za mnie?
Translated text: Are you angry with me? 
Is the translation correct? (yes/no): no
Enter the correct translation: Will you mary me?
Correct translation added.
Do you want to teach the model more? (yes/no): yes
Enter input text: Jestem bardzo niezadowolony dzisiaj.
Translated text: Im very dissatisfied today. 
