<a href="https://colab.research.google.com/github/fojoshi/MachineTranslation/blob/main/NMT_with_Attention.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install tensorflow_text

Collecting tensorflow_text
[?25l  Downloading https://files.pythonhosted.org/packages/c0/ed/bbb51e9eccca0c2bfdf9df66e54cdff563b6f32daed9255da9b9a541368f/tensorflow_text-2.5.0-cp37-cp37m-manylinux1_x86_64.whl (4.3MB)
[K     |████████████████████████████████| 4.3MB 11.6MB/s 
Installing collected packages: tensorflow-text
Successfully installed tensorflow-text-2.5.0


In [3]:
import numpy as np

import typing
from typing import Any, Tuple

import tensorflow as tf
from tensorflow.keras.layers.experimental import preprocessing

import tensorflow_text as tf_text

import matplotlib.pyplot as plt
import matplotlib.ticker as ticker

In [4]:
# Fetch the Spanish-English sentence-pair archive and locate the text file
# inside the extracted folder.
import pathlib

path_to_zip = tf.keras.utils.get_file(
    fname='spa-eng.zip',
    origin='http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip',
    extract=True,
)

path_to_file = pathlib.Path(path_to_zip).parent / 'spa-eng' / 'spa.txt'

Downloading data from http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip


In [5]:
def load_data(path):
  """Read a tab-separated parallel corpus and split it into two sentence lists.

  Every line of the file is expected to hold exactly two tab-separated fields.

  Args:
    path: Location of the UTF-8 text file, as a `pathlib.Path` or a string.

  Returns:
    A `(targ, inp)` tuple of lists: `targ` holds the first field of every
    line and `inp` the second, both in file order.

  Raises:
    ValueError: If a line does not split into exactly two fields.
  """
  # Read from the `path` argument (not a notebook-level global) so the
  # function loads whichever file the caller asks for.
  text = pathlib.Path(path).read_text(encoding='utf-8')

  pairs = [line.split('\t') for line in text.splitlines()]

  targ = [first for first, _ in pairs]
  inp = [second for _, second in pairs]

  return targ, inp

In [6]:
# load_data returns the first-column sentences first and the second-column
# sentences second.
targ, inp = load_data(path_to_file)
# Show the last sentence pair as a quick sanity check.
print(inp[-1], targ[-1], sep='\n')

Si quieres sonar como un hablante nativo, debes estar dispuesto a practicar diciendo la misma frase una y otra vez de la misma manera en que un músico de banjo practica el mismo fraseo una y otra vez hasta que lo puedan tocar correctamente y en el tiempo esperado.
If you want to sound like a native speaker, you must be willing to practice saying the same sentence over and over in the same way that banjo players practice the same phrase over and over until they can play it correctly and at the desired tempo.


In [516]:
BATCH_SIZE = 64
# Shuffle buffer as large as the corpus, so the whole dataset is shuffled.
BUFFER_SIZE = len(inp)

# Each element is an (inp, targ) pair of string batches.
dataset = (
    tf.data.Dataset.from_tensor_slices((inp, targ))
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
)

In [8]:
def tf_lower_and_split_punct(text):
  """Standardise raw sentences before tokenisation.

  The text is NFKD-normalised, lower-cased, reduced to spaces, the letters
  a-z and the punctuation marks `. ? ! , ¿`, and finally wrapped in
  `[START]` / `[END]` markers.

  Args:
    text: String tensor holding one or more sentences.

  Returns:
    A string tensor with the standardised sentences.
  """
  # NFKD decomposition separates accented characters into base letter plus
  # combining mark; the marks are dropped by the filter below.
  lowered = tf.strings.lower(tf_text.normalize_utf8(text, 'NFKD'))
  # Keep only spaces, a to z, and the selected punctuation.
  filtered = tf.strings.regex_replace(lowered, '[^ a-z.?!,¿]', '')
  # Put a space on each side of every punctuation mark.
  spaced = tf.strings.regex_replace(filtered, '[.?!,¿]', r' \0 ')
  # Remove leading and trailing whitespace.
  trimmed = tf.strings.strip(spaced)

  return tf.strings.join(['[START]', trimmed, '[END]'], separator=' ')

In [9]:
# Pull a single batch out of the dataset; after the loop `spanish` and
# `english` keep the two string tensors of that batch.
for spanish, english in dataset.take(1):
  pass
# Show the first sentence of the batch before and after standardisation.
print(spanish[0].numpy().decode())
print(tf_lower_and_split_punct(spanish[0]).numpy().decode())

No quiero hacerlo otra vez.
[START] no quiero hacerlo otra vez . [END]


In [10]:
# Upper bound on the vocabulary size of each text processor.
max_vocab_size = 5000

input_text_processor = preprocessing.TextVectorization(
    max_tokens=max_vocab_size,
    standardize=tf_lower_and_split_punct,
)

# Build the vocabulary from the `inp` sentences.
input_text_processor.adapt(inp)

# Peek at the first 10 vocabulary entries.
input_text_processor.get_vocabulary()[:10]

['', '[UNK]', '[START]', '[END]', '.', 'que', 'de', 'el', 'a', 'no']

In [11]:
output_text_processor = preprocessing.TextVectorization(
    max_tokens=max_vocab_size,
    standardize=tf_lower_and_split_punct,
)

# Build the vocabulary from the `targ` sentences, then peek at the first
# 10 entries.
output_text_processor.adapt(targ)
output_text_processor.get_vocabulary()[:10]

['', '[UNK]', '[START]', '[END]', '.', 'the', 'i', 'to', 'you', 'tom']

In [12]:
# Tokenise one sentence from the sample batch to inspect the integer ids.
output_text_processor(english[1])

<tf.Tensor: shape=(6,), dtype=int64, numpy=array([  2, 209,  10, 965,   4,   3])>

In [13]:
# Raw bytes of the sentence that was tokenised in the previous cell.
english[1].numpy()

b"There's a difference."

In [14]:
# Keep the vocabulary list of the `targ`-side text processor at hand.
english_vocab = output_text_processor.get_vocabulary()

In [15]:
from tensorflow.keras.layers import Embedding, GRU, Layer


In [390]:
class Encoder(Layer):
  """Embeds token ids and runs the embeddings through a single GRU."""

  def __init__(self, input_vocab_size, embedding_dim, enc_units):
    """Create the embedding table and the recurrent layer.

    Args:
      input_vocab_size: Number of rows in the embedding table.
      embedding_dim: Size of each embedding vector.
      enc_units: Number of GRU units.
    """
    super(Encoder, self).__init__()
    self.enc_units = enc_units
    self.input_lang_embedding = Embedding(input_vocab_size, embedding_dim)
    self.gru = GRU(self.enc_units,
                   # Return the sequence and state
                   return_sequences=True,
                   return_state=True,
                   recurrent_initializer='glorot_uniform')

  def call(self, word_indices, state=None):
    """Encode a batch of token-id sequences.

    Args:
      word_indices: Integer tensor of token ids, one row per sentence.
      state: Optional initial GRU state. `None` (the default) lets the GRU
        use its own default initial state.

    Returns:
      `(whole_sequence_output, final_state)`: the GRU output at every time
      step and the GRU state after the last step.
    """
    word_embeddings = self.input_lang_embedding(word_indices)
    # Forward the caller-supplied state; previously the argument was accepted
    # but silently ignored.
    whole_sequence_output, final_state = self.gru(word_embeddings,
                                                  initial_state=state)
    return whole_sequence_output, final_state



In [391]:
# Small stand-alone encoder used to try the layer out on one batch.
enc = Encoder(input_vocab_size=max_vocab_size, embedding_dim=300, enc_units=32)

In [552]:
# Tokenise the sample batch and run it through the stand-alone encoder.
input_text = spanish
input_word_indices = input_text_processor(spanish)

whole_encoder_states, final_hidden_state = enc(input_word_indices)
# First five tokenised sentences, then the shapes of the per-step outputs and
# of the final state.
print(input_word_indices[:5])
print(whole_encoder_states.shape, final_hidden_state.shape)
# whole_sequence_output, final_state = enc(output_text_processor(english[1:3]))

tf.Tensor(
[[   2    9   48  198  176   72    4    3    0    0    0    0    0    0
     0]
 [   2   59   23 1108    4    3    0    0    0    0    0    0    0    0
     0]
 [   2   43    9  904   77    4    3    0    0    0    0    0    0    0
     0]
 [   2    9   74  927  503    6   87 1809    4    3    0    0    0    0
     0]
 [   2   24  125   17    1   46  588   46 1244    4    3    0    0    0
     0]], shape=(5, 15), dtype=int64)
(64, 15, 32) (64, 32)


In [393]:
from tensorflow.python.ops import math_ops

In [394]:
class BahdanauAttention(tf.keras.layers.Layer):
  """Additive attention over a sequence of keys.

  Queries and keys are each projected to `units` features without bias, added
  pairwise, passed through `tanh` and summed over the feature axis to give
  one score per (query step, key step) pair. No learned scoring vector is
  applied to the `tanh` output.
  """

  def __init__(self, units):
    """Create the two bias-free projections.

    Args:
      units: Output size of both projections.
    """
    super().__init__()
    self.W1 = tf.keras.layers.Dense(units, use_bias=False)
    self.W2 = tf.keras.layers.Dense(units, use_bias=False)

  def call(self, query, keys, mask):
    """Attend from every query step to every key step.

    Args:
      query: Tensor of shape (batch, t, query_dim).
      keys: Tensor of shape (batch, s, key_dim).
      mask: Boolean tensor of shape (batch, s); `False` marks key positions
        that must receive (almost) no attention.

    Returns:
      `(context, attention_scores)` where `context` has shape
      (batch, t, key_dim) and `attention_scores` has shape (batch, t, s, 1).
    """
    # (batch, t, 1, units) + (batch, 1, s, units) -> (batch, t, s, units)
    query_weights = tf.expand_dims(self.W1(query), 2)
    keys_weights = tf.expand_dims(self.W2(keys), 1)
    score = tf.reduce_sum(tf.nn.tanh(query_weights + keys_weights), -1)

    # Push masked-out positions towards -inf so softmax gives them ~0 weight.
    # Public tf ops replace the private `tensorflow.python.ops.math_ops`.
    padding_mask = tf.expand_dims(tf.math.logical_not(mask), 1)
    score -= 1e9 * tf.cast(padding_mask, dtype=score.dtype)

    attention_weights = tf.nn.softmax(score, axis=2)  # (batch, t, s)
    # Weighted sum of the keys as a batched matmul:
    # (batch, t, s) @ (batch, s, key_dim) -> (batch, t, key_dim).
    # This avoids materialising a (batch, t, s, key_dim) intermediate.
    context = tf.matmul(attention_weights, keys)
    return context, tf.expand_dims(attention_weights, -1)


In [395]:
# Stand-alone attention layer with 10 projection units, for shape checks.
att = BahdanauAttention(units=10)

In [396]:
# Try the attention layer on the encoder outputs; padding positions (id 0)
# are masked out.
# NOTE(review): `o` is not defined in any cell visible in this notebook, so
# this cell fails on a fresh kernel — confirm where the query tensor should
# come from.
context, score = att(query=o, keys=whole_encoder_states, mask = (input_word_indices != 0))

In [397]:
# Shape of the query tensor passed to the attention layer.
o.shape

TensorShape([64, 14, 32])

In [398]:
# Shape of the keys passed to the attention layer.
whole_encoder_states.shape

TensorShape([64, 15, 32])

In [399]:
# Shape of the second value returned by the attention layer.
score.shape

TensorShape([64, 14, 15, 1])

In [400]:
# Shape of the context returned by the attention layer.
context.shape

TensorShape([64, 14, 32])

In [401]:
class Decoder(tf.keras.layers.Layer):
  """GRU decoder that attends over encoder outputs and emits vocabulary logits."""

  def __init__(self, output_vocab_size, embedding_dim, dec_units):
    """Create the sub-layers.

    Args:
      output_vocab_size: Rows of the embedding table and width of the final
        dense layer.
      embedding_dim: Size of each embedding vector.
      dec_units: Number of GRU units, also passed to the attention layer.
    """
    super().__init__()
    self.english_embedding = Embedding(output_vocab_size, embedding_dim)
    self.gru = GRU(dec_units, return_sequences=True, return_state=True)
    self.dense = tf.keras.layers.Dense(output_vocab_size)
    self.attention = BahdanauAttention(dec_units)

  def call(self, input_word_indices, encoder_keys, mask, state=None):
    """Run the decoder over a batch of token-id sequences.

    Args:
      input_word_indices: Integer tensor of decoder input token ids.
      encoder_keys: Encoder outputs that the attention layer attends over.
      mask: Mask forwarded unchanged to the attention layer.
      state: Optional initial GRU state.

    Returns:
      `(vocab_output, state)`: the dense-layer output for every decoder step
      and the GRU state after the last step.
    """
    token_embeddings = self.english_embedding(input_word_indices)
    rnn_output, state = self.gru(token_embeddings, initial_state=state)

    # Only the context is used; the attention weights are discarded here.
    context, _ = self.attention(query=rnn_output,
                                keys=encoder_keys,
                                mask=mask)

    # Join GRU output and context along the feature axis before projecting
    # to the vocabulary.
    features = tf.concat([rnn_output, context], axis=-1)
    return self.dense(features), state

In [402]:
# Small stand-alone decoder whose unit count matches the encoder above.
decoder = Decoder(output_vocab_size=max_vocab_size, embedding_dim=300, dec_units=32)

In [403]:
# Find the id of '[START]' through the public vocabulary list rather than
# the private `_index_lookup_layer` attribute. The position of '[START]' in
# `get_vocabulary()` is the id the text processor emits for it.
start_index = output_text_processor.get_vocabulary().index('[START]')
# One '[START]' id per sentence of the sample batch, shape (batch, 1).
# int64 matches the dtype of the ids produced by the text processor.
first_token = tf.constant([[start_index]] * english.shape[0], dtype=tf.int64)

In [404]:
# First decoding step: feed the '[START]' tokens, attend over the encoder
# outputs (padding id 0 masked out) and start from the encoder's final state.
vocab_output, state = decoder(input_word_indices=first_token,
                     encoder_keys = whole_encoder_states,
                     mask = (input_word_indices != 0),
                     state=final_hidden_state )

In [315]:
# Draw one token id per sentence from the logits of decoder step 0.
sampled_token = tf.random.categorical(vocab_output[:, 0, :], num_samples=1)

In [316]:
# Map the sampled ids back to words by indexing into the vocabulary array,
# then show the words sampled for the first five sentences.
vocab = np.array(output_text_processor.get_vocabulary())
first_word = vocab[sampled_token.numpy()]
first_word[:5]

array([['quickly'],
       ['possibly'],
       ['pipe'],
       ['health'],
       ['oven']], dtype='<U16')

In [318]:
# Second decoding step: feed the sampled token ids straight back in.
# Re-tokenising the sampled words with `output_text_processor` would wrap
# each of them in '[START]' / '[END]', so the decoder would receive three
# tokens per sentence and step 0 of its output would belong to '[START]',
# not to the sampled word.
vocab_output, state = decoder(input_word_indices=sampled_token,
                     encoder_keys = whole_encoder_states,
                     mask = (input_word_indices != 0),
                     state = state )

(64, 32)
(64, 15, 32) (64, 3, 15, 1)


In [319]:
# Sample again from the logits of step 0 of the latest decoder call and map
# the ids back to words. `first_word` is reused as the variable name even
# though this follows a second decoder call.
sampled_token = tf.random.categorical(vocab_output[:, 0, :], num_samples=1)
vocab = np.array(output_text_processor.get_vocabulary())
first_word = vocab[sampled_token.numpy()]
first_word[:5]

array([['buying'],
       ['section'],
       ['records'],
       ['normal'],
       ['kinds']], dtype='<U16')

In [474]:
# Keep one batch of the dataset in `data` for inspection.
for data in dataset.take(1):
  pass

In [481]:
# Show the captured batch (a pair of string tensors).
# NOTE(review): the saved output holds 4 sentences per tensor while
# BATCH_SIZE is 64 — presumably produced with a different `dataset`; confirm.
data

(<tf.Tensor: shape=(4,), dtype=string, numpy=
 array([b'Fue ir\xc3\xb3nico.', b'No tienes mucho tiempo.',
        b'Quer\xc3\xada preguntarte una cosa.',
        b'No fue lo suficientemente r\xc3\xa1pido.'], dtype=object)>,
 <tf.Tensor: shape=(4,), dtype=string, numpy=
 array([b'It was ironic.', b"You haven't got much time.",
        b'I wanted to ask you something.', b'He was not quick enough.'],
       dtype=object)>)

In [511]:
class ModelTrain(tf.keras.Model):
  """Keras model bundling the text processors, encoder and decoder for training."""

  def __init__(self, input_text_processor, output_text_processor, embedding_dim, units):
    """Build the encoder and decoder from the processors' vocabulary sizes.

    Args:
      input_text_processor: Text processor for the source sentences.
      output_text_processor: Text processor for the target sentences.
      embedding_dim: Embedding size used by both encoder and decoder.
      units: Number of GRU units used by both encoder and decoder.
    """
    super(ModelTrain, self).__init__()
    self.encoder = Encoder(input_text_processor.vocabulary_size(), embedding_dim, units)
    self.decoder = Decoder(output_text_processor.vocabulary_size(), embedding_dim, units)
    self.input_text_processor = input_text_processor
    self.output_text_processor = output_text_processor

  @tf.function
  def train_step(self, data):
    """Run one teacher-forced optimisation step on a batch.

    Args:
      data: `(input_sentence, output_sentence)` pair of string batches.

    Returns:
      `{'loss': loss}` where `loss` is the cross-entropy averaged over the
      non-padding target tokens of the batch.
    """
    input_sentence, output_sentence = data
    input_word_indices = self.input_text_processor(input_sentence)
    output_word_indices = self.output_text_processor(output_sentence)

    # Teacher forcing: the decoder sees tokens [0, T-1) and must predict
    # tokens [1, T). Slicing the input up front avoids computing a final
    # step whose output would be thrown away.
    decoder_inputs = output_word_indices[:, :-1]
    targets = output_word_indices[:, 1:]
    # 1.0 for real tokens, 0.0 for padding (id 0).
    target_mask = tf.cast(targets != 0, tf.float32)

    with tf.GradientTape() as tape:
      whole_encoder_states, final_hidden_state = self.encoder(input_word_indices)
      vocab_output, _ = self.decoder(input_word_indices=decoder_inputs,
                                     encoder_keys=whole_encoder_states,
                                     mask=(input_word_indices != 0),
                                     state=final_hidden_state)

      # `self.loss` is expected to return one value per token
      # (reduction='none'), so padding can be zeroed out before reducing.
      token_losses = self.loss(y_true=targets, y_pred=vocab_output) * target_mask
      # Average over real tokens so the loss does not scale with batch size
      # or sentence length; divide_no_nan guards an all-padding batch.
      loss = tf.math.divide_no_nan(tf.reduce_sum(token_losses),
                                   tf.reduce_sum(target_mask))

    variables = self.trainable_variables
    gradients = tape.gradient(loss, variables)
    self.optimizer.apply_gradients(zip(gradients, variables))
    return {'loss': loss}


In [514]:
model = ModelTrain(
    input_text_processor=input_text_processor,
    output_text_processor=output_text_processor,
    embedding_dim=300,
    units=128,
)

# reduction='none' keeps one loss value per token, so train_step can apply
# its padding mask before reducing.
model.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(
        reduction='none', from_logits=True),
    optimizer=tf.optimizers.Adam(),
)

In [529]:
# Train on the batched sentence pairs; the saved output shows the run was
# interrupted manually during the second epoch.
model.fit(dataset, epochs=10)

Epoch 1/10
Epoch 2/10

KeyboardInterrupt: ignored

In [519]:
class Infer(tf.Module):
  """Greedy batch translator built from an encoder/decoder pair."""

  def __init__(self, encoder, decoder,
               input_text_processor,
               output_text_processor):
    """Store the model parts and build the id<->word lookup helpers.

    Args:
      encoder: Encoder layer; called with the source token ids.
      decoder: Decoder layer; called one step at a time.
      input_text_processor: Text processor for the source sentences.
      output_text_processor: Text processor whose vocabulary defines the
        output tokens.
    """
    # Let tf.Module set up its own state (name, name scope) first.
    super().__init__()
    self.encoder = encoder
    self.decoder = decoder
    self.input_text_processor = input_text_processor
    self.output_text_processor = output_text_processor

    output_vocab = output_text_processor.get_vocabulary()

    # `mask_token=''` is passed explicitly: the vocabulary list already
    # starts with '' and '[UNK]', and the layer's default mask token is not
    # the same in every TensorFlow release.
    self.idx_to_word = (
        tf.keras.layers.experimental.preprocessing.StringLookup(
            vocabulary=output_vocab,
            mask_token='',
            invert=True))

    # The output should never generate padding, unknown, or start.
    index_from_string = tf.keras.layers.experimental.preprocessing.StringLookup(
        vocabulary=output_vocab,
        mask_token='')

    token_mask_ids = index_from_string(['',
                                        '[UNK]',
                                        '[START]']).numpy()

    # Built-in `bool` instead of the `np.bool` alias, which current NumPy
    # releases no longer provide.
    token_mask = np.zeros([index_from_string.vocabulary_size()], dtype=bool)
    token_mask[np.array(token_mask_ids)] = True
    # Shape (1, 1, vocab) so it broadcasts against (batch, steps, vocab) logits.
    self.token_mask = token_mask[tf.newaxis, tf.newaxis, :]

    self.start_token = index_from_string('[START]')
    self.end_token = index_from_string('[END]')

  def tokens_to_text(self, result_tokens):
    """Turn a (batch, steps) tensor of token ids into one string per row.

    Args:
      result_tokens: Integer tensor of output token ids.

    Returns:
      String tensor with the words of each row joined by single spaces and
      surrounding whitespace stripped.
    """
    result_text_tokens = self.idx_to_word(result_tokens)

    result_text = tf.strings.reduce_join(result_text_tokens,
                                         axis=1, separator=' ')

    result_text = tf.strings.strip(result_text)
    return result_text

  def sample(self, logits):
    """Pick the highest-scoring allowed token for every position.

    Args:
      logits: Decoder output of shape (batch, steps, vocab).

    Returns:
      Integer tensor of shape (batch, steps) with the arg-max token ids,
      computed after the masked tokens have been set to -inf.
    """
    logits = tf.where(self.token_mask, -np.inf, logits)
    return tf.argmax(logits, axis=-1)

  def translate(self, input_sentence, max_length=50):
    """Translate a batch of sentences by greedy decoding.

    Args:
      input_sentence: Batch of source sentences (strings).
      max_length: Maximum number of decoding steps.

    Returns:
      `{'text': result_text}` where `result_text` is a string tensor with
      one translation per input sentence.
    """
    batch_size = tf.shape(input_sentence)[0]
    input_word_indices = self.input_text_processor(input_sentence)
    input_mask = (input_word_indices != 0)
    whole_encoder_output, final_hidden_state = self.encoder(input_word_indices)
    dec_state = final_hidden_state
    # Every sentence starts decoding from the '[START]' token.
    vocab_input = tf.fill([batch_size, 1], self.start_token)
    result_tokens = []
    done = tf.zeros([batch_size, 1], dtype=tf.bool)
    for current_length in range(max_length):
      vocab_output, dec_state = self.decoder(input_word_indices=vocab_input,
                                             encoder_keys=whole_encoder_output,
                                             mask=input_mask, state=dec_state)
      vocab_input = self.sample(vocab_output)

      # A sentence is done once it has produced '[END]'. From that step on
      # (including the '[END]' itself) its tokens are replaced by id 0.
      done = done | (vocab_input == self.end_token)
      vocab_input = tf.where(done, tf.constant(0, dtype=tf.int64), vocab_input)
      result_tokens.append(vocab_input)

      # Stop early when every sentence in the batch has finished.
      if tf.reduce_all(done):
        break

    result_tokens = tf.concat(result_tokens, axis=-1)
    result_text = self.tokens_to_text(result_tokens)
    return {'text': result_text}

In [530]:
# Wrap the model's encoder and decoder for inference.
infer = Infer(
    encoder=model.encoder,
    decoder=model.decoder,
    input_text_processor=input_text_processor,
    output_text_processor=output_text_processor,
)

In [550]:
# Translate a handful of sentences and show the resulting byte strings.
infer.translate([
    "Te quiero.",
    "mi pasaporte esta aqui",
    "¿Dónde está el hotel?",
    "Quiero irme a casa hoy.",
    "La tierra gira alrededor del sol.",
])['text'].numpy()

array([b'i love you .', b'my passport is here .', b'wheres the hotel ?',
       b'i want to go home today .', b'the earth begins on the sun .'],
      dtype=object)