<a href="https://colab.research.google.com/github/aakhterov/ML_algorithms_from_scratch/blob/master/machine_translation_using_bahdanau_attention.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 0. Description

We're going to build a neural network model that translates from English to Russian (the dataset lists the English phrase first, and the code below uses it as the source). This notebook implements a simple encoder-decoder network with LSTM layers and Bahdanau attention.

We will use the following terms:
- source language - the language from which the model translates
- target language - the language to which the model translates
- token = word


Dataset: https://www.kaggle.com/datasets/hijest/englishrussian-dictionary-for-machine-translate/

References:
- https://github.com/Hvass-Labs/TensorFlow-Tutorials/blob/master/21_Machine_Translation.ipynb
- https://www.youtube.com/watch?v=vI2Y3I-JI2Q
- https://medium.com/analytics-vidhya/encoder-decoder-seq2seq-models-clearly-explained-c34186fbf49b



In [76]:
from typing import List
import numpy as np
import pandas as pd
import tensorflow as tf
import seaborn as sns
import matplotlib.pyplot as plt
from tqdm import tqdm
from string import punctuation
from tensorflow.keras.layers import Embedding, LSTM, Dense, Input
from tensorflow.keras.models import Model
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tensorflow.keras.utils import pad_sequences
from sklearn.model_selection import train_test_split

from google.colab import drive
drive.mount('/content/drive')
base_path = '/content/drive/MyDrive/Colab Notebooks/'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# 1. Vectorization

In [3]:
UNKNOWN_TOKEN = '[UNK]' # Out of vocabulary token
START_TOKEN = '[START]' # The token that denotes the beginning of the target language phrase
END_TOKEN = '[END]' # The token that denotes the end of the target language phrase

In [26]:
class Vectorization:
  '''
    Vectorization text class.
    Main goals:
     - make a vocabulary
     - convert the list of strings to the list of integer tokens
     - convert the list of integer tokens to the list of strings
  '''

  def __init__(self,
               max_tokens,
               max_length=None,
               unknown_token=UNKNOWN_TOKEN,
               start_token=START_TOKEN,
               end_token=END_TOKEN
               ):
    '''
      :param max_tokens: length of the vocabulary
      :param max_length: max length of the phrases
      :param unknown_token: out of vocabulary token
      :param start_token: token that denotes the beginning of the phrase
      :param end_token: token that denotes the end of the phrase
    '''
    self.max_tokens = max_tokens
    self.max_length = max_length
    self.unknown_token = unknown_token
    self.start_token = start_token
    self.end_token=end_token
    # add to the vocabulary:
    #  (1) padding token (we're going to pad using 0, so padding token index is 0)
    #  (2) out of vocabulary token
    #  (3) start token
    #  (4) end token
    self.vocabulary = ['', self.unknown_token, self.start_token, self.end_token]

  def __preprocessing(self, input: str) -> str:
    '''
      Preprocess the string (convert to lowercase and replace punctuation with spaces).
      ex.: I'm going! -> i m going
      :param input - input string
      :return preprocessed string
    '''
    output = ''.join(map(lambda ch: ch if ch not in punctuation else ' ', input.lower())).strip()
    return output

  def token_to_text(self, tokens: List) -> str:
    '''
      Convert the list of the integer tokens to the string
      :param tokens: list of the integer tokens
      :return string contains words that correspond to the integer tokens
    '''
    words = [self.vocabulary[token] for token in tokens]
    return " ".join(words)

  def fit(self, X: List):
    '''
      Make the vocabulary and calculate the max length of the phrase
      :param X: corpus - list of the strings
      :return the instance of the current class
    '''
    lens = []
    for x in X:
      # Make preprocessing and get the list of the words.
      # Ex. I'm going! -> ['i', 'm', 'going']
      words = self.__preprocessing(x).split()

      # Collect phrases lengths
      lens.append(len(words))

      # Make the vocabulary
      for word in words:
        token = word.strip()
        # Add the word to the vocabulary if the vocabulary isn't full yet
        if token not in self.vocabulary and self.max_tokens is not None and len(self.vocabulary)<self.max_tokens:
          self.vocabulary.append(token)

    # Calculate the max length of the phrases if it isn't set in the __init__
    # max_length = average length + two standard deviations
    lens = np.array(lens)
    if self.max_length is None:
      self.max_length = int(np.mean(lens) + 2 * np.std(lens))
    return self

  def predict(self,
              X: List[str],
              is_padding=True,
              is_add_start_token=False,
              is_add_end_token=False
              ) -> List[List]:
    '''
      :param X - corpus - list of the strings
      :param is_padding - whether to pad the list of tokens to the max. length with 0s
      :param is_add_start_token - whether to add the start_token to the list of tokens
      :param is_add_end_token - whether to add the end_token to the list of tokens
      :return list of the lists of tokens
    '''
    output = []
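    # NOTE: this updates self.max_length in place, so every call that adds a
    # start and/or end token makes all later calls use the longer length.
    self.max_length += int(is_add_start_token) + int(is_add_end_token)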

    for x in X:
      # If needed, add the index of the start_token to the beginning of the list of tokens
      vector = [self.vocabulary.index(self.start_token)] if is_add_start_token else []

      # Make preprocessing and get the list of the words.
      words = self.__preprocessing(x).split()

      # If the current word is in the vocabulary add its index to the list else add the index of the unknown_token
      for word in words:
        token = word.strip()
        vector.append(self.vocabulary.index(token) if token in self.vocabulary else self.vocabulary.index(self.unknown_token))

      # Truncate the vector to max_length - 1 (this leaves one slot, which the end token fills when it is added)
      vector = vector[:self.max_length-1]

      # If needed add the index of the end_token
      if is_add_end_token:
        vector.append(self.vocabulary.index(self.end_token))

      output.append(vector)

    # If needed pad the vector to the max. length with 0s
    return pad_sequences(output,
                         maxlen=self.max_length,
                         padding='post',
                         truncating='post') if is_padding else output
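
The class above keeps the vocabulary as a list and looks words up with `list.index`, which scans the list on every call. A minimal sketch of the same fit/encode idea with a dictionary for constant-time lookups is shown below. The helpers `preprocess`, `build_vocab`, and `encode` are hypothetical names, not part of this notebook, and the special tokens mirror the constants defined earlier.

```python
from string import punctuation

SPECIAL_TOKENS = ['', '[UNK]', '[START]', '[END]']  # padding, OOV, start, end


def preprocess(text):
    # Lowercase and replace ASCII punctuation with spaces, as in the class above.
    return ''.join(ch if ch not in punctuation else ' ' for ch in text.lower()).strip()


def build_vocab(corpus, max_tokens):
    # Map each word to an integer index; indices 0..3 are reserved for special tokens.
    word_to_index = {tok: i for i, tok in enumerate(SPECIAL_TOKENS)}
    for phrase in corpus:
        for word in preprocess(phrase).split():
            if word not in word_to_index and len(word_to_index) < max_tokens:
                word_to_index[word] = len(word_to_index)
    return word_to_index


def encode(phrase, word_to_index):
    # Unknown words fall back to the index of the out-of-vocabulary token.
    unk = word_to_index['[UNK]']
    return [word_to_index.get(word, unk) for word in preprocess(phrase).split()]


vocab = build_vocab(["Go.", "I'm going!"], max_tokens=10)
print(encode("Go going home", vocab))  # [4, 7, 1]
```

Here "home" is not in the vocabulary, so it maps to index 1, the `[UNK]` token.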

In [146]:
# Read the first M samples
M = 1_000

input_phrases, output_phrases = [], []
with open('/content/drive/MyDrive/Colab Notebooks/Data/rus.txt') as f:
  for line in f.readlines()[:M]:
    x, y = line.split('CC-BY')[0].strip().split('\t')
    input_phrases.append(x)
    output_phrases.append(y)
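
The loop above assumes that each line holds the source phrase, the target phrase, and a license attribution that starts with `CC-BY`, separated by tabs. A small sketch of that parsing step on a made-up line (the attribution text is illustrative and not copied from the dataset; `parse_line` is a hypothetical helper):

```python
def parse_line(line):
    # Drop everything from the license marker onward, then split the two phrases.
    source, target = line.split('CC-BY')[0].strip().split('\t')
    return source, target


sample = "Go.\tИди.\tCC-BY 2.0 (illustrative attribution text)\n"
print(parse_line(sample))
```

A line with no tab, or with more than two phrases before the license marker, would raise a `ValueError` on unpacking, so a real loader may want to skip or log such lines.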

In [147]:
input_vocab = 10_000 # size of the source language vocabulary
output_vocab = 10_000 # size of the target language vocabulary

In [148]:
# Make vectorization of the source language phrases
encoder_vec = Vectorization(max_tokens=input_vocab)
encoder_vec.fit(input_phrases)
X_encoder = encoder_vec.predict(input_phrases)

# Make vectorization of the target language phrases
decoder_vec = Vectorization(max_tokens=output_vocab)
decoder_vec.fit(output_phrases)
# To train the sequence model we need two versions of each target phrase: the decoder input,
# which begins with the start_token, and the decoder output (the target), which omits the start_token
X_decoder = decoder_vec.predict(output_phrases, is_add_start_token=True, is_add_end_token=True)
Y_decoder = decoder_vec.predict(output_phrases, is_add_end_token=True)
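
The decoder input and the decoder target are the same phrase shifted by one position: at step `t` the decoder receives token `t` of the input and is expected to predict token `t` of the target. A minimal sketch with plain lists, using the token indices from this notebook (2 for `[START]`, 3 for `[END]`) and a hypothetical helper `make_decoder_pair`:

```python
START, END = 2, 3


def make_decoder_pair(token_ids):
    # Input starts with START; target is the same sequence without it.
    decoder_input = [START] + token_ids + [END]
    decoder_target = token_ids + [END]
    return decoder_input, decoder_target


x, y = make_decoder_pair([5, 7])
for step, (inp, tgt) in enumerate(zip(x, y)):
    print(f"step {step}: input={inp} target={tgt}")
```

Padding is left out of the sketch; in the notebook both sequences are additionally padded with zeros.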

In [149]:
idx = 1
print(f"Index: {idx}")
print("======= Encoder =======")
print(f"Input phrase: {input_phrases[idx]}")
print(f"Vector: {X_encoder[idx]}")
print(f"Max. length: {encoder_vec.max_length}")
print("======= Decoder =======")
print(f"Input phrase: {output_phrases[idx]}")
print(f"Vector: {X_decoder[idx]}")
print(f"Output phrase: {output_phrases[idx]}")
print(f"Vector: {Y_decoder[idx]}")
print(f"Max. length: {decoder_vec.max_length}")
print("==============")
print(f"Start phrase token index: {decoder_vec.vocabulary.index(START_TOKEN)}")
print(f"End phrase token index: {decoder_vec.vocabulary.index(END_TOKEN)}")

Index: 1
Input phrase: Go.
Vector: [4 0 0]
Max. length: 3
Input phrase: Иди.
Vector: [2 5 3 0 0]
Output phrase: Иди.
Vector: [5 3 0 0 0 0]
Max. length: 6
Start phrase token index: 2
End phrase token index: 3
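
When `max_length` is not given, `fit` sets it to the mean phrase length plus two standard deviations, so a few unusually long phrases get truncated instead of forcing every sequence to be padded to their length. A small sketch of that calculation on made-up lengths, using `statistics.pstdev` (the population standard deviation, which matches the default of `np.std`); `pick_max_length` is a hypothetical helper:

```python
import statistics


def pick_max_length(lengths):
    # Mean plus two (population) standard deviations, rounded down.
    return int(statistics.mean(lengths) + 2 * statistics.pstdev(lengths))


lengths = [1, 1, 2, 2, 3, 9]  # made-up phrase lengths, in tokens
max_length = pick_max_length(lengths)
print(max_length)
print([n for n in lengths if n > max_length])  # phrases that would be truncated
```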


# 2. Construct encoder-decoder NNs

In [65]:
embedding_dim = 64
lstm_hidden_units = 64

In [66]:
class Encoder(tf.keras.Model):

  def __init__(self, input_vocab, embedding_dim, lstm_hidden_units):
    super(Encoder, self).__init__()
    self.lstm_hidden_units = lstm_hidden_units
    self.embedding = Embedding(input_dim=input_vocab,
                               output_dim=embedding_dim,
                               mask_zero=True,
                               name='encoder_embedding')
    self.lstm = LSTM(units=lstm_hidden_units,
                    #  return_sequences=True,
                     return_state=True,
                     name='encoder_lstm')

  def __call__(self, x):
    out = self.embedding(x)
    _, h, c = self.lstm(out)
    return h, c

In [67]:
class BahdanauAttention(tf.keras.layers.Layer):

  def __init__(self, units: int, name=None):
    super(BahdanauAttention, self).__init__(name=name)
    self.units = units
    self.fc_encoder_states = Dense(units=units, activation='linear')
    self.fc_decoder_states = Dense(units=units, activation='linear')
    self.fc_combined = Dense(units=1, activation='linear')

  def __call__(self, encoder_states, decoder_hidden_state):
    fc_encoder_out = self.fc_encoder_states(encoder_states)
    # print(f"fc_encoder_out.shape={fc_encoder_out.shape}")
    fc_decoder_out = self.fc_decoder_states(decoder_hidden_state)
    # print(f"fc_decoder_out.shape={fc_decoder_out.shape}")
    fc_decoder_out = tf.expand_dims(fc_decoder_out, axis=1)
    # print(f"Expanded fc_decoder_out.shape={fc_decoder_out.shape}")
    alignment_scores = self.fc_combined(tf.math.tanh(fc_encoder_out + fc_decoder_out))
    # print(f"alignment_scores.shape={alignment_scores.shape}")
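    # alignment_scores has shape (batch, time, 1): normalize over the time axis,
    # since a softmax over the last axis (size 1) would make every weight equal to 1
    softmax_alignment_scores = tf.nn.softmax(alignment_scores, axis=1)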
    context_vector = tf.reduce_sum(softmax_alignment_scores * encoder_states, axis=1)
    # print(f"context_vector.shape={context_vector.shape}")
    return context_vector, softmax_alignment_scores
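
The layer above is modeled on additive (Bahdanau-style) attention: each encoder state is scored against the current decoder state, the scores are normalized with a softmax across time steps, and the context vector is the weighted sum of the encoder states. The sketch below spells out that arithmetic for a single example in plain Python. The two projection layers are omitted (treated as identity) and `v` is a made-up scoring vector, so this illustrates the computation only, not the trained layer.

```python
import math


def additive_attention(encoder_states, decoder_state, v):
    # score_t = v . tanh(h_t + s): one scalar per encoder time step
    scores = [
        sum(v_i * math.tanh(h_i + s_i) for v_i, h_i, s_i in zip(v, h_t, decoder_state))
        for h_t in encoder_states
    ]
    # Softmax over time steps (subtract the max for numerical stability)
    top = max(scores)
    exps = [math.exp(s - top) for s in scores]
    total = sum(exps)
    weights = [e / total for e in exps]
    # Context vector: weighted sum of the encoder states
    dim = len(encoder_states[0])
    context = [sum(w * h_t[d] for w, h_t in zip(weights, encoder_states)) for d in range(dim)]
    return context, weights


encoder_states = [[0.1, 0.2], [0.5, -0.3], [0.9, 0.4]]  # 3 time steps, 2 units
context, weights = additive_attention(encoder_states, decoder_state=[0.3, 0.1], v=[1.0, 1.0])
print(weights, sum(weights))
print(context)
```

The weights sum to 1 across the three time steps, and the third encoder state, which has the highest score, receives the largest weight.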

In [68]:
class Decoder(tf.keras.Model):

  def __init__(self, output_vocab, embedding_dim, lstm_hidden_units):
    super(Decoder, self).__init__()
    self.embedding = Embedding(input_dim=output_vocab,
                               output_dim=embedding_dim,
                               mask_zero=True,
                               name='decoder_embedding')

    self.lstm = LSTM(units=lstm_hidden_units,
                     return_sequences=True,
                     return_state=True,
                     name='decoder_lstm')

    self.attention = BahdanauAttention(units=lstm_hidden_units,
                                       name='decoder_attention')

    # Dense layer with softmax activation function
    self.output_dense = Dense(units=output_vocab,
                              activation='softmax',
                              name='decoder_output')

  def __call__(self, x, decoder_states, encoder_states):
    hidden_state, cell_state = decoder_states
    context_vector, _ = self.attention(encoder_states, hidden_state)
    # print(context_vector.shape)
    out = self.embedding(x)
    # print(out.shape)
    # Concatenate the context vector with the embedded token along the feature axis
    lstm_input = tf.concat([tf.expand_dims(context_vector, axis=1), out], axis=-1)
    out, h, c = self.lstm(lstm_input, initial_state=decoder_states)
    # print(out.shape)
    # print(h.shape)
    out = self.output_dense(out)
    return out, h, c

In [186]:
class Seq2SeqBahdanauAttention(tf.keras.Model):

  def __init__(self,
               input_vocab,
               output_vocab,
               encoder_embd_dim,
               decoder_embd_dim,
               encoder_lstm_units,
               decoder_lstm_units,
               max_output_length,
               start_token_index,
               end_token_index):

    super(Seq2SeqBahdanauAttention, self).__init__()
    self.encoder = Encoder(input_vocab=input_vocab,
                           embedding_dim=encoder_embd_dim,
                           lstm_hidden_units=encoder_lstm_units)
    self.decoder = Decoder(output_vocab=output_vocab,
                           embedding_dim=decoder_embd_dim,
                           lstm_hidden_units=decoder_lstm_units)
    self.max_output_length = max_output_length
    self.start_token_index = start_token_index
    self.end_token_index = end_token_index

  def __forward(self, X_encoder, X_decoder=None, Y_decoder=None):
    output = []
    batch_size = X_encoder.shape[0]
    # sequence_length = X_encoder.shape[1]
    encoder_states = []
    loss = 0
    for t in range(X_encoder.shape[1]):
      h, c = self.encoder(X_encoder[:, :t+1])
      encoder_states.append(h)
    encoder_states = tf.stack(encoder_states, axis=1)
    hidden_state = encoder_states[:, -1, :]
    cell_state = c

    if X_decoder is not None and Y_decoder is not None:
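      for t in range(X_decoder.shape[1]):
        # Teacher forcing: feed the ground-truth token for step t
        # (a fixed X_decoder[:, :1] would feed the start token at every step)
        decoder_input = X_decoder[:, t:t+1]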
        out, hidden_state, cell_state = self.decoder(x=decoder_input,
                                                     decoder_states=(hidden_state, cell_state),
                                                     encoder_states=encoder_states)
        output.append(out)
        loss += tf.keras.losses.sparse_categorical_crossentropy(Y_decoder[:, t], out)
    else:
        current_step = 0
        token = self.start_token_index
        while current_step<self.max_output_length and token!=self.end_token_index:
          decoder_input = np.array([[token]])
          out, hidden_state, cell_state = self.decoder(x=decoder_input,
                                                      decoder_states=(hidden_state, cell_state),
                                                      encoder_states=encoder_states)
          if Y_decoder is not None:
            loss += tf.keras.losses.sparse_categorical_crossentropy(Y_decoder[:, current_step], out)

          token = np.argmax(out)
          output.append(token)
          current_step += 1

    batch_loss = tf.reduce_sum(loss) / Y_decoder.shape[0] if Y_decoder is not None else None
    return output, loss, batch_loss

  def __train_step(self, X_encoder, X_decoder, Y_decoder):
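    # Create the optimizer once and reuse it, so that Adam keeps its moment estimates between steps
    if getattr(self, '_adam', None) is None:
      self._adam = tf.keras.optimizers.Adam()
    with tf.GradientTape() as tape:
      _, loss, batch_loss = self.__forward(X_encoder, X_decoder, Y_decoder)
    # Only the forward pass needs to be recorded; gradients are computed outside the tape context
    variables = self.encoder.trainable_variables + self.decoder.trainable_variables
    gradients = tape.gradient(loss, variables)
    self._adam.apply_gradients(zip(gradients, variables))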
    return batch_loss

  def fit(self, X_encoder, X_decoder, Y_decoder, epoch=20, batch_size=64, train_size=0.8):
    M = X_encoder.shape[0]
    train_sample_size = int(train_size*M)
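    # Shuffle once and keep that order: by default the dataset is reshuffled on every
    # iteration, which would move samples between the train and test splits across epochs
    full_dataset = tf.data.Dataset.from_tensor_slices((X_encoder, X_decoder, Y_decoder)).shuffle(M, reshuffle_each_iteration=False)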
    train_ds = full_dataset.take(train_sample_size).batch(batch_size)
    test_ds = full_dataset.skip(train_sample_size)

    total_train_batches = train_ds.cardinality()
    test_ds_size = test_ds.cardinality()

    history = {"train_loss": [], "test_loss": []}

    for ep in range(1, epoch+1):
      print(f"Epoch {ep}/{epoch}")
      total_loss = 0
      for batch, (X_batch_encoder,
                  X_batch_decoder,
                  Y_batch_decoder) in tqdm(enumerate(train_ds.take(total_train_batches)),
                                           desc=f"Train dataset ({total_train_batches} batches. Total {batch_size*total_train_batches} samples)"):
        batch_loss = self.__train_step(X_batch_encoder, X_batch_decoder, Y_batch_decoder)
        total_loss += batch_loss

      history["train_loss"].append(total_loss)
      print(f"Loss on train: {total_loss}")

      total_loss = 0
      for X_test_encoder, X_test_decoder, Y_test_decoder in tqdm(test_ds,
                                                                 desc=f"Test dataset ({test_ds_size} samples)"):
        _, _, batch_loss = self.__forward(tf.expand_dims(X_test_encoder, 0),
                                          None,
                                          tf.expand_dims(Y_test_decoder, 0))
        total_loss += batch_loss

      history["test_loss"].append(total_loss)
      print(f"Loss on test: {total_loss}")
    return history

  def predict(self, X_encoder):
    out, _, _ = self.__forward(X_encoder)
    return out
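
At inference time there is no target phrase, so `predict` feeds the decoder its own previous prediction until it emits the end token or reaches the maximum length. A minimal sketch of that greedy loop is shown below, with a hypothetical `step_fn` standing in for one decoder step (here a toy lookup table rather than a neural network, and without the hidden state the real decoder carries along).

```python
def greedy_decode(step_fn, start_token, end_token, max_length):
    tokens = []
    token = start_token
    while len(tokens) < max_length and token != end_token:
        scores = step_fn(token)
        # argmax: pick the index with the highest score
        token = max(range(len(scores)), key=scores.__getitem__)
        tokens.append(token)
    return tokens


# Toy "decoder": after token i, the most likely next token is NEXT[i]
NEXT = {2: 5, 5: 4, 4: 3}


def toy_step(token):
    scores = [0.0] * 6
    scores[NEXT[token]] = 1.0
    return scores


print(greedy_decode(toy_step, start_token=2, end_token=3, max_length=10))  # [5, 4, 3]
```

Greedy decoding keeps only the single best token at each step; beam search is a common alternative when that proves too short-sighted.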

In [187]:
model = Seq2SeqBahdanauAttention(input_vocab=input_vocab,
                                 output_vocab=output_vocab,
                                 encoder_embd_dim=embedding_dim,
                                 decoder_embd_dim=embedding_dim,
                                 encoder_lstm_units=lstm_hidden_units,
                                 decoder_lstm_units=lstm_hidden_units,
                                 max_output_length=decoder_vec.max_length,
                                 start_token_index=decoder_vec.vocabulary.index(START_TOKEN),
                                 end_token_index=decoder_vec.vocabulary.index(END_TOKEN))

In [185]:
history = model.fit(X_encoder, X_decoder, Y_decoder, epoch=20, batch_size=64, train_size=0.8)

Epoch 1/20


Train dataset (13 batches. Total 832 samples): 13it [00:06,  1.99it/s]


Loss on train: 596.3899536132812


Test dataset (200 samples):  50%|████▉     | 99/200 [00:14<00:13,  7.57it/s]Exception ignored in: <generator object tqdm.__iter__ at 0x7e9010f0cb30>
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/tqdm/std.py", line 1183, in __iter__
    yield obj
KeyboardInterrupt: 
Test dataset (200 samples):  50%|████▉     | 99/200 [00:20<00:20,  4.84it/s]


KeyboardInterrupt: ignored