<a href="https://colab.research.google.com/github/MathBorgess/data_science_studies/blob/main/deep_learning/recurrent/seq2seq.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
#attention rnn

#https://www.tensorflow.org/text/tutorials/nmt_with_attention?hl=pt-br#shape_checker

import pathlib
import tensorflow as tf

path_to_zip = tf.keras.utils.get_file( 'spa-eng.zip', origin='http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip', extract=True)
path_to_file = pathlib.Path(path_to_zip).parent/'spa-eng/spa.txt'

def load_data(path):
  text = path.read_text(encoding='utf-8')

  lines = text.splitlines()
  pairs = [line.split('\t') for line in lines]

  inp = [inp for targ, inp in pairs]
  targ = [targ for targ, inp in pairs]

  return targ, inp

en_dataset, spa_dataset = load_data(path_to_file)
[spa_dataset[-1], en_dataset[-1]]

Downloading data from http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip


['Si quieres sonar como un hablante nativo, debes estar dispuesto a practicar diciendo la misma frase una y otra vez de la misma manera en que un músico de banjo practica el mismo fraseo una y otra vez hasta que lo puedan tocar correctamente y en el tiempo esperado.',
 'If you want to sound like a native speaker, you must be willing to practice saying the same sentence over and over in the same way that banjo players practice the same phrase over and over until they can play it correctly and at the desired tempo.']

In [2]:
dataset = tf.data.Dataset.from_tensor_slices((en_dataset, spa_dataset)).shuffle(len(en_dataset)).batch(64)

In [3]:
# tf.strings.unicode_transcode(tf.constant('Hello?'), input_encoding='UTF-8', output_encoding='UTF-8')

def tf_lower_and_split_punct(text):
  text = tf.strings.lower(text)
  # Keep space, a to z, and select punctuation.
  text = tf.strings.regex_replace(text, '[^ a-z.?!,¿]', '')
  # Add spaces around punctuation.
  text = tf.strings.regex_replace(text, '[.?!,¿]', r' \0 ')
  # Strip whitespace.
  text = tf.strings.strip(text)

  text = tf.strings.join(['[START]', text, '[END]'], separator=' ')
  return text

tf_lower_and_split_punct(tf.constant('Hellá?'))

<tf.Tensor: shape=(), dtype=string, numpy=b'[START] hell ? [END]'>

In [4]:
max_vocab_size = 5000

input_text_processor = tf.keras.layers.TextVectorization(
    standardize=tf_lower_and_split_punct,
    max_tokens=max_vocab_size)

input_text_processor.adapt(en_dataset)

output_text_processor = tf.keras.layers.TextVectorization(
    standardize=tf_lower_and_split_punct,
    max_tokens=max_vocab_size)

output_text_processor.adapt(spa_dataset)

[output_text_processor.get_vocabulary()[:8],input_text_processor.get_vocabulary()[:8]]

[['', '[UNK]', '[START]', '[END]', '.', 'de', 'que', 'a'],
 ['', '[UNK]', '[START]', '[END]', '.', 'the', 'i', 'to']]

In [5]:
import numpy as np

tokenize = input_text_processor(tf.constant('I love you'))
tokenize_decoder = np.array(input_text_processor.get_vocabulary())
' '.join(tokenize_decoder[tokenize.numpy()])

'[START] i love you [END]'

In [6]:
#Some constants

embedding_dim = 256
units = 1024

In [7]:
#the models

class Encoder(tf.keras.layers.Layer):
  def __init__(self, input_vocab_size, embedding_dim, enc_units):
    super(Encoder, self).__init__()
    self.enc_units = enc_units
    self.input_vocab_size = input_vocab_size

    # The embedding layer converts tokens to vectors
    self.embedding = tf.keras.layers.Embedding(self.input_vocab_size, embedding_dim)

    # The GRU RNN layer processes those vectors sequentially.
    self.gru = tf.keras.layers.GRU(self.enc_units,
                                   # Return the sequence and state
                                   return_sequences=True,
                                   return_state=True,
                                   recurrent_initializer='glorot_uniform')

  def call(self, tokens, state=None):

    # 2. The embedding layer looks up the embedding for each token.
    vectors = self.embedding(tokens)

    # 3. The GRU processes the embedding sequence.
    #    output shape: (batch, s, enc_units)
    #    state shape: (batch, enc_units)
    output, state = self.gru(vectors, initial_state=state)

    # 4. Returns the new sequence and its state.
    return output, state

# https://arxiv.org/pdf/1409.0473.pdf

class BahdanauAttention(tf.keras.layers.Layer):
    def __init__(self, units):
        super().__init__()

        self.W1 = tf.keras.layers.Dense(units, use_bias=False)
        self.W2 = tf.keras.layers.Dense(units, use_bias=False)

        self.attention = tf.keras.layers.AdditiveAttention()

    def call(self, query, value, mask):
        w1_query = self.W1(query)
        w2_key = self.W2(value)

        query_mask = tf.ones(tf.shape(query)[:-1], dtype=bool)

        context_vector, attention_weights = self.attention(
        inputs = [w1_query, value, w2_key],
        mask=[query_mask, mask],
        return_attention_scores = True
        )

        return context_vector, attention_weights

import typing
#classes to use in Decoder:
class DecoderInput(typing.NamedTuple):
  new_tokens: typing.Any
  enc_output: typing.Any
  mask: typing.Any

class DecoderOutput(typing.NamedTuple):
  logits: typing.Any
  attention_weights: typing.Any

# O decodificador recebe a saída completa do codificador.
# Ele usa um RNN para rastrear o que gerou até agora.
# Ele usa sua saída RNN como a consulta para chamar a atenção sobre a saída do codificador, produzindo o vetor de contexto.
# Ele combina a saída RNN e o vetor de contexto para gerar o "vetor de atenção".
# Ele gera previsões logit para o próximo token com base no "vetor de atenção".

class Decoder(tf.keras.Model):
    def __init__(self, output_vocabulary_size, embedding_dim, decode_units):
        super(Decoder, self).__init__()
        self.decode_units = decode_units
        self.output_vocabulary_size = output_vocabulary_size

        # For Step 1. The embedding layer convets token IDs to vectors
        self.embedding = tf.keras.layers.Embedding(self.output_vocabulary_size, embedding_dim)

        # For Step 2. The RNN keeps track of what's been generated so far.
        self.gru = tf.keras.layers.GRU(self.decode_units,
                                    return_sequences=True,
                                    return_state=True,
                                    recurrent_initializer='glorot_uniform')

        # For step 3. The RNN output will be the query for the attention layer.
        self.attention = BahdanauAttention(self.decode_units)

        # For step 4. Eqn. (3): converting `ct` to `at`
        self.Wc = tf.keras.layers.Dense(self.decode_units, activation=tf.math.tanh,
                                        use_bias=False)

        # For step 5. This fully connected layer produces the logits for each
        # output token.
        self.fc = tf.keras.layers.Dense(self.output_vocabulary_size)


    def call(self, inputs: DecoderInput, state=None)  -> typing.Tuple[DecoderOutput, tf.Tensor]:
        vectors = self.embedding(inputs.new_tokens)

        rnn_output, state = self.gru(vectors, initial_state=state)

        # Step 3. Use the RNN output as the query for the attention over the encoder output.
        context_vector, attention_weights = self.attention(query=rnn_output, value=inputs.enc_output, mask=inputs.mask)

        context_and_rnn_output = tf.concat([context_vector, rnn_output], axis=-1)
        attention_weights = self.Wc(context_and_rnn_output)
        logits = self.fc(attention_weights)

        return DecoderOutput(logits=logits, attention_weights= attention_weights), state

In [8]:
# example_tokens = input_text_processor(tf.constant('I love you'))
# Convert the input text to tokens.
example_tokens = input_text_processor([tf.constant('I love you')])

# Encode the input sequence.
encoder = Encoder(input_text_processor.vocabulary_size(),
                  embedding_dim, units)
example_enc_output, example_enc_state = encoder(example_tokens)

decoder = Decoder(output_text_processor.vocabulary_size(),
                  embedding_dim, units)

example_output_tokens = output_text_processor([tf.constant('Te amo')])

start_index = output_text_processor.get_vocabulary().index('[START]')
first_token = tf.constant([[start_index]] * example_output_tokens.shape[0])

[first_token, start_index]

dec_result, dec_state = decoder(
    inputs = DecoderInput(new_tokens=first_token,
                          enc_output=example_enc_output,
                          mask=(example_tokens != 0)),
    state = example_enc_state
)
dec_result.logits

<tf.Tensor: shape=(1, 1, 5000), dtype=float32, numpy=
array([[[ 0.00108569,  0.00162426,  0.00185247, ..., -0.00279206,
          0.00406274,  0.00171106]]], dtype=float32)>

In [9]:
#Defining the training loop
class MaskedLoss(tf.keras.losses.Loss):
  def __init__(self):
    self.name = 'masked_loss'
    self.loss = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True, reduction='none')

  def __call__(self, y_true, y_pred):
    # Calculate the loss for each item in the batch.
    loss = self.loss(y_true, y_pred)

    # Mask off the losses on padding.
    mask = tf.cast(y_true != 0, tf.float32)
    loss *= mask

    # Return the total.
    return tf.reduce_sum(loss)

class TrainTranslator(tf.keras.Model):
  def __init__(self, embedding_dim, units,
               input_text_processor,
               output_text_processor,
               use_tf_function=True):
    super().__init__()
    # Build the encoder and decoder
    encoder = Encoder(input_text_processor.vocabulary_size(),
                      embedding_dim, units)
    decoder = Decoder(output_text_processor.vocabulary_size(),
                      embedding_dim, units)

    self.encoder = encoder
    self.decoder = decoder
    self.input_text_processor = input_text_processor
    self.output_text_processor = output_text_processor
    self.use_tf_function = use_tf_function

  def train_step(self, inputs):
    return self._train_step(inputs)

  def _preprocess(self, input_text, target_text):
    # Convert the text to token IDs
    input_tokens = self.input_text_processor(input_text)
    target_tokens = self.output_text_processor(target_text)

    # Convert IDs to masks.
    input_mask = input_tokens != 0

    target_mask = target_tokens != 0

    return input_tokens, input_mask, target_tokens, target_mask
  @tf.function(input_signature=[[tf.TensorSpec(dtype=tf.string, shape=[None]),
                               tf.TensorSpec(dtype=tf.string, shape=[None])]])
  def _train_step(self, inputs):
    input_text, target_text = inputs

    (input_tokens, input_mask,
    target_tokens, target_mask) = self._preprocess(input_text, target_text)

    max_target_length = tf.shape(target_tokens)[1]

    with tf.GradientTape() as tape:
      # Encode the input
      enc_output, enc_state = self.encoder(input_tokens)

      # Initialize the decoder's state to the encoder's final state.
      # This only works if the encoder and decoder have the same number of
      # units.
      dec_state = enc_state
      loss = tf.constant(0.0)

      for t in tf.range(max_target_length-1):
        # Pass in two tokens from the target sequence:
        # 1. The current input to the decoder.
        # 2. The target for the decoder's next prediction.
        new_tokens = target_tokens[:, t:t+2]
        step_loss, dec_state = self._loop_step(new_tokens, input_mask,
                                              enc_output, dec_state)
        loss = loss + step_loss

      # Average the loss over all non padding tokens.
      average_loss = loss / tf.reduce_sum(tf.cast(target_mask, tf.float32))

    # Apply an optimization step
    variables = self.trainable_variables
    gradients = tape.gradient(average_loss, variables)
    self.optimizer.apply_gradients(zip(gradients, variables))

    # Return a dict mapping metric names to current value
    return {'batch_loss': average_loss}

  def _loop_step(self, new_tokens, input_mask, enc_output, dec_state):
    input_token, target_token = new_tokens[:, 0:1], new_tokens[:, 1:2]

    # Run the decoder one step.
    decoder_input = DecoderInput(new_tokens=input_token,
                                enc_output=enc_output,
                                mask=input_mask)

    dec_result, dec_state = self.decoder(decoder_input, state=dec_state)

    # `self.loss` returns the total for non-padded tokens
    y = target_token
    y_pred = dec_result.logits
    step_loss = self.loss(y, y_pred)

    return step_loss, dec_state

In [10]:
translator = TrainTranslator(embedding_dim=embedding_dim,
                            input_text_processor=input_text_processor,
                            output_text_processor=output_text_processor,
                            units=units,
                            use_tf_function=True)
translator.compile(loss=MaskedLoss(), optimizer='adam')

In [12]:
translator.fit(dataset, epochs=4)

Epoch 1/4
Epoch 2/4
Epoch 3/4
Epoch 4/4


<keras.callbacks.History at 0x7b3e16a45ba0>

In [23]:
class Translator(tf.Module):
  def __init__(self, encoder, decoder, input_text_processor,
               output_text_processor):
    self.encoder = encoder
    self.decoder = decoder
    self.input_text_processor = input_text_processor
    self.output_text_processor = output_text_processor

    self.output_token_string_from_index = (
        tf.keras.layers.StringLookup(
            vocabulary=output_text_processor.get_vocabulary(),
            mask_token='',
            invert=True))

    # The output should never generate padding, unknown, or start.
    index_from_string = tf.keras.layers.StringLookup(
        vocabulary=output_text_processor.get_vocabulary(), mask_token='')
    token_mask_ids = index_from_string(['', '[UNK]', '[START]']).numpy()

    token_mask = np.zeros([index_from_string.vocabulary_size()], dtype=bool)
    token_mask[np.array(token_mask_ids)] = True
    self.token_mask = token_mask

    self.start_token = index_from_string(tf.constant('[START]'))
    self.end_token = index_from_string(tf.constant('[END]'))

  def sample(self, logits, temperature):
    # Set the logits for all masked tokens to -inf, so they are never chosen.
    logits = tf.where(self.token_mask, -np.inf, logits)

    if temperature == 0.0:
      new_tokens = tf.argmax(logits, axis=-1)
    else:
      logits = tf.squeeze(logits, axis=1)
      new_tokens = tf.random.categorical(logits/temperature,
                                          num_samples=1)

    return new_tokens

  def tokens_to_text(self, result_tokens):
    result_text_tokens = self.output_token_string_from_index(result_tokens)

    result_text = tf.strings.reduce_join(result_text_tokens,
                                        axis=1, separator=' ')
    result_text = tf.strings.strip(result_text)
    return result_text

  def translate(self,
                       input_text, *,
                       max_length=50,
                       return_attention=True,
                       temperature=1.0):
    batch_size = tf.shape(input_text)[0]
    input_tokens = self.input_text_processor(input_text)
    enc_output, enc_state = self.encoder(input_tokens)

    dec_state = enc_state
    new_tokens = tf.fill([batch_size, 1], self.start_token)

    result_tokens = []
    attention = []
    done = tf.zeros([batch_size, 1], dtype=tf.bool)

    for _ in range(max_length):
      dec_input = DecoderInput(new_tokens=new_tokens,
                              enc_output=enc_output,
                              mask=(input_tokens!=0))

      dec_result, dec_state = self.decoder(dec_input, state=dec_state)

      attention.append(dec_result.attention_weights)

      new_tokens = self.sample(dec_result.logits, temperature)

      # If a sequence produces an `end_token`, set it `done`
      done = done | (new_tokens == self.end_token)
      # Once a sequence is done it only produces 0-padding.
      new_tokens = tf.where(done, tf.constant(0, dtype=tf.int64), new_tokens)

      # Collect the generated tokens
      result_tokens.append(new_tokens)

      if tf.executing_eagerly() and tf.reduce_all(done):
        break

    # Convert the list of generates token ids to a list of strings.
    result_tokens = tf.concat(result_tokens, axis=-1)
    result_text = self.tokens_to_text(result_tokens)

    if return_attention:
      attention_stack = tf.concat(attention, axis=1)
      return {'text': result_text, 'attention': attention_stack}
    else:
      return {'text': result_text}

In [38]:
model = Translator(encoder=translator.encoder,
                   decoder=translator.decoder,
                   input_text_processor=translator.input_text_processor,
                   output_text_processor=translator.output_text_processor)

model.translate(input_text=tf.constant(['I really want to know what is happen with my job']))['text'][0].numpy().decode()

'de verdad que quiero saber qu es por mi trabajo con el suyo de trabajo .'