# Configuração

In [4]:

# Tipagem
import typing 
import string
from typing import Any, Tuple, NamedTuple
from string import digits

# Auxiliares
import pathlib
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker

# Core
import tensorflow as tf
import tensorflow_text as tf_text
from tensorflow.keras.layers.experimental import preprocessing

# Classe auxliar
class ShapeChecker():
  def __init__(self):
    # Keep a cache of every axis-name seen
    self.shapes = {}

  def __call__(self, tensor, names, broadcast=False):
    if not tf.executing_eagerly():
      return

    if isinstance(names, str):
      names = (names,)

    shape = tf.shape(tensor)
    rank = tf.rank(tensor)

    if rank != len(names):
      raise ValueError(f'Rank mismatch:\n'
                       f'    found {rank}: {shape.numpy()}\n'
                       f'    expected {len(names)}: {names}\n')

    for i, name in enumerate(names):
      if isinstance(name, int):
        old_dim = name
      else:
        old_dim = self.shapes.get(name, None)
      new_dim = shape[i]

      if (broadcast and new_dim == 1):
        continue

      if old_dim is None:
        # If the axis name is new, add its length to the cache.
        self.shapes[name] = new_dim
        continue

      if new_dim != old_dim:
        raise ValueError(f"Shape mismatch for dimension: '{name}'\n"
                         f"    found: {new_dim}\n"
                         f"    expected: {old_dim}\n")

# Dados

## Carregando os dados

In [5]:
#path_to_zip = tf.keras.utils.get_file('spa-eng.zip', origin='C://Users/ricar/Downloads/spa-eng.zip', extract=True)
path_of_file = pathlib.Path('./data/dataset.txt')

# função para carregar os dados
def load_data(path):
  text = path.read_text(encoding='utf-8')
  
  #pairs vai ter várias listas, com dois elementos em cada uma: a palavra em Inglês e Espanhol
  lines = text.splitlines()
  pairs = [line.split('\t') for line in lines]

  input = [input for target, input in pairs]
  target = [target for target, input in pairs]

  return target, input

target, input = load_data(path_of_file)
print(f'espanhol: {input[-1]}\n')
print(f'inglês: {target[-1]}')


espanhol: Si quieres sonar como un hablante nativo, debes estar dispuesto a practicar diciendo la misma frase una y otra vez de la misma manera en que un músico de banjo practica el mismo fraseo una y otra vez hasta que lo puedan tocar correctamente y en el tiempo esperado.

inglês: If you want to sound like a native speaker, you must be willing to practice saying the same sentence over and over in the same way that banjo players practice the same phrase over and over until they can play it correctly and at the desired tempo.


## Estruturando os dados

In [6]:
BUFFER_SIZE = len(input)
BATCH_SIZE = 64

#Vai separar todas as listas (input, targe) em partes menores, de acordo com o tamanho por parte (BUFFER_ZISE)
dataset = tf.data.Dataset.from_tensor_slices((input, target)).shuffle(BUFFER_SIZE)

#Combina elementos do dataset em lotes (listas) de acordo com o tamanho (BATCH_SIZE)
dataset = dataset.batch(BATCH_SIZE)

for example_input_batch, example_target_batch in dataset.take(1):
  print(example_input_batch[:5])
  print()
  print(example_target_batch[:5])
  break


tf.Tensor(
[b'Cog\xc3\xad un taxi desde la estaci\xc3\xb3n hasta el hotel.'
 b'Con un grado universitario, Tom conseguir\xc3\xa1 un mejor empleo.'
 b'Casi todas las hojas se han ca\xc3\xaddo.'
 b'Tom no est\xc3\xa1 ayudando para nada.'
 b'Tom se est\xc3\xa1 ti\xc3\xb1endo el pelo.'], shape=(5,), dtype=string)

tf.Tensor(
[b'I caught a cab from the station to the hotel.'
 b'Tom will get a better job with a college degree.'
 b'Almost all the leaves have fallen.' b"Tom isn't helping any."
 b'Tom is dyeing his hair.'], shape=(5,), dtype=string)


## Pré processamento do texto

In [7]:
#Aqui neste método há várias regras para processar os textos nas línguas distintas
def tf_lower_and_split_punctuation(text):
  text = tf_text.normalize_utf8(text, 'NFKD')
  text = tf.strings.lower(text)
  text = tf.strings.regex_replace(text, '[^ a-z.?!,¿]', '')
  text = tf.strings.regex_replace(text, '[.?!,¿]', r' \0 ')
  text = tf.strings.strip(text)
 
  text = tf.strings.join(['[START]', text, '[END]'], separator=' ')
  return text

tf_lower_and_split_punctuation('¿Todavía está en casa?').numpy().decode()


'[START] ¿ todavia esta en casa ? [END]'

## Vetorização dos textos

In [8]:
max_vocabulary_size = 5000

#O adapt é usado para fazer um treinamento dos vetores em cima dos dados 

#camada de vetorização do input 
input_text_processor = preprocessing.TextVectorization(standardize=tf_lower_and_split_punctuation, max_tokens=max_vocabulary_size)
input_text_processor.adapt(input)

#camada de vetorização do output 
output_text_processor = preprocessing.TextVectorization(standardize=tf_lower_and_split_punctuation, max_tokens=max_vocabulary_size)
output_text_processor.adapt(target)


## Usando as camadas vetorizadas


In [9]:
#As camadas têm capacidade agora de converter uma lista (ou lote) de strings em textos
#E com o vocabulário, pode-se converter os tokens para textos

print(f'Vocabulário da camada input: {input_text_processor.get_vocabulary()[:10]}')
print(f'Vocabulário da camada output: {output_text_processor.get_vocabulary()[:10]}\n')

example_tokens = input_text_processor(example_input_batch)

print(f'Exemplo de tokens: {example_tokens[:3, :10]}\n')
print(f'Tokens da primeira frase: {example_tokens[0]}\n')

input_vocab = np.array(input_text_processor.get_vocabulary())
tokens = input_vocab[example_tokens[0].numpy()]
first_phrase = ' '.join(tokens)

print(f'Primeira frase "destokenizada": {tokens}\n')
print(f'Primeira frase "destokenizada" e formatada: {first_phrase}')

Vocabulário da camada input: ['', '[UNK]', '[START]', '[END]', '.', 'que', 'de', 'el', 'a', 'no']
Vocabulário da camada output: ['', '[UNK]', '[START]', '[END]', '.', 'the', 'i', 'to', 'you', 'tom']

Exemplo de tokens: [[   2 3481   16  832  212   11  354  143    7  476]
 [   2   27   16 4197 3361   19   10    1   16  113]
 [   2  188  229   34 1651   17  300 3183    4    3]]

Tokens da primeira frase: [   2 3481   16  832  212   11  354  143    7  476    4    3    0    0
    0    0    0    0    0    0    0    0]

Primeira frase "destokenizada": ['[START]' 'cogi' 'un' 'taxi' 'desde' 'la' 'estacion' 'hasta' 'el' 'hotel'
 '.' '[END]' '' '' '' '' '' '' '' '' '' '']

Primeira frase "destokenizada" e formatada: [START] cogi un taxi desde la estacion hasta el hotel . [END]          


# Modelos

## Codificador

In [10]:
#Variáveis "globais"

embedding_dim = 256 #Dimensão da camada de embedding
units = 1024 #

In [11]:
# Codificador

# Considerações: o uso da classe ShapeChecker() serve para verificar os formatos dos tensores
#                o codificador precisa retornar uma saída codificada e também o seu estado para que seja passado ao decodificador  

# O codificador, simplificando, é uma camada da inteligência de tradução, por isso herda de Layer
class Encoder(tf.keras.layers.Layer):
  def __init__(self, input_vocab_size, embedding_dim, enc_units):
    super(Encoder, self).__init__()
    self.enc_units = enc_units
    self.input_vocab_size = input_vocab_size

    #Camada de embedding para converter tokens em vetores 
    self.embedding = tf.keras.layers.Embedding(self.input_vocab_size, embedding_dim)

    #Essa é a camada que processa os vetores sequencialmente (camada RNN) 
    self.gru = tf.keras.layers.GRU(
      self.enc_units,
      return_sequences=True,
      return_state=True,
      recurrent_initializer='glorot_uniform'
    )
  
  def call(self, tokens, state=None):
    shape_checker = ShapeChecker()
    shape_checker(tokens, ('batch', 's'))

    #Camada de embedding que procura os tokens
    vectors = self.embedding(tokens)
    shape_checker(vectors, ('batch', 's', 'embed_dim'))

    #Camada RNN que processa a sequencia de vetores
    output, state = self.gru(vectors, initial_state=state)
    shape_checker(output, ('batch', 's', 'enc_units'))
    shape_checker(state, ('batch', 'enc_units'))

    return output, state

## Testando o codificador

In [12]:
#Convertendo a entrada (texto) para tokens
example_tokens = input_text_processor(example_input_batch)

#Tenho 64 frases dentro de "example_input_batch"
print(f'formato do lote de entrada (lote de frases): {example_input_batch.shape}')
print(f'algumas frases do lote: {example_input_batch[:2]}\n')

#Tenho 64 lista de tokens, cada lista contendo 15 tokens
print(f'formato do lote de tokens de entrada (lote de tokens): {example_tokens.shape}')
print(f'alguns tokens que estão sendo passados: {example_tokens[:1]}\n')

#Codificando a sequência de tokens de entrada
encoder = Encoder(input_text_processor.vocabulary_size(), embedding_dim, units)
example_enc_output, example_enc_state = encoder(example_tokens)

#Eu tenho uma saída, codificada, com o seguinte shape (x, 15, 1024)
#Onde x é quantidade de frases que passei, 15 é quantidade de tokens e 1024 é a dimensão de cada embedding 
print(f'Saída codificada, shape (batch, s, units): {example_enc_output.shape}')
print(f'Estado de saída, shape (batch, units): {example_enc_state.shape}\n')

#Cada frase vai ter uma lista de tokens que foram vetorizados à embeddings (transformado em uma lista de valores de embeddings)
#Então agora temos 15 listas, referente ao valor dos tokens, e cada lista tem uma lista com 1024 valores de embedding 
print(f'Primeira frase: {example_enc_output[0].shape}')
print(f'Valor de embedding do primeiro token: {example_enc_output[0][0].shape}')

#----
print(f'\nInput batch, shape (batch): {example_input_batch.shape}')
print(f'Input batch tokens, shape (batch, s): {example_tokens.shape}')
print(f'Input batch tokens, shape (batch, s): {example_enc_output.shape}')
print(f'Encoder state, shape (batch, units): {example_enc_state.shape}')


formato do lote de entrada (lote de frases): (64,)
algumas frases do lote: [b'Cog\xc3\xad un taxi desde la estaci\xc3\xb3n hasta el hotel.'
 b'Con un grado universitario, Tom conseguir\xc3\xa1 un mejor empleo.']

formato do lote de tokens de entrada (lote de tokens): (64, 22)
alguns tokens que estão sendo passados: [[   2 3481   16  832  212   11  354  143    7  476    4    3    0    0
     0    0    0    0    0    0    0    0]]

Saída codificada, shape (batch, s, units): (64, 22, 1024)
Estado de saída, shape (batch, units): (64, 1024)

Primeira frase: (22, 1024)
Valor de embedding do primeiro token: (1024,)

Input batch, shape (batch): (64,)
Input batch tokens, shape (batch, s): (64, 22)
Input batch tokens, shape (batch, s): (64, 22, 1024)
Encoder state, shape (batch, units): (64, 1024)


## Atenção  

In [13]:
# a cabeça da camada de atenção a ser usada aqui é a Bahdanau's additive attention. 
# ref: https://arxiv.org/pdf/1409.0473.pdf

class BahadanauAttention(tf.keras.layers.Layer):
    def __init__(self, units):
        super().__init__()

        self.W1 = tf.keras.layers.Dense(units, use_bias=False)
        self.W2 = tf.keras.layers.Dense(units, use_bias=False)

        self.attention = tf.keras.layers.AdditiveAttention()

    #query vai ser algo gerado pelo decodificador
    #value é a saída do codificador
    #maks vai servir para excluir o padding
    def call(self, query, value, mask):
        shape_checker = ShapeChecker()
        shape_checker(query, ('batch', 't', 'query_units')) 
        shape_checker(value, ('batch', 's', 'value_units')) 
        shape_checker(mask, ('batch', 's')) 

        w1_query = self.W1(query)
        shape_checker(w1_query, ('batch', 't', 'attn_units'))

        w2_key = self.W2(value)
        shape_checker(w2_key, ('batch', 's', 'attn_units'))

        query_mask = tf.ones(tf.shape(query)[:-1], dtype=bool)
        value_mask = mask

        context_vector, attention_weights = self.attention(
            inputs = [w1_query, value, w2_key],
            mask=[query_mask, value_mask],
            return_attention_scores = True,
        )
        shape_checker(context_vector, ('batch', 't', 'value_units'))
        shape_checker(attention_weights, ('batch', 't', 's'))

        return context_vector, attention_weights

## Testando a camada de atenção

In [14]:
attention_layer = BahadanauAttention(units)
print((example_tokens != 0).shape)

#Este é um exemplo de consulta que o decodificador fará
example_attention_query = tf.random.normal(shape=[len(example_tokens), 2, 10])

#
context_vector, attention_weights = attention_layer(query=example_attention_query, value=example_enc_output, mask=(example_tokens != 0))

print(f'Shape do resultado do vetor de atenção: {context_vector.shape}')
print(f'Shape dos pesos retornados da camada de atenção: {attention_weights.shape}')

(64, 22)
Shape do resultado do vetor de atenção: (64, 2, 1024)
Shape dos pesos retornados da camada de atenção: (64, 2, 22)


## Decodificador

In [18]:
#Como são diversos tensores que o decoder recebe e retorna, foi criada classes auxiliares 
class DecoderInput(typing.NamedTuple):
    new_tokens: Any
    enc_output: Any
    mask: Any

class DecoderOutput(typing.NamedTuple):
    logits: Any
    attention_weights: Any    

#O decodificador vai receber a saída inteira do codificador para gerar as previsões.
class Decoder(tf.keras.layers.Layer): 
    def __init__(self, output_vocab_size, embedding_dim, dec_units):
        super(Decoder, self).__init__()

        self.dec_units = dec_units
        self.output_vocab_size = output_vocab_size
        self.embedding_dim = embedding_dim

        #1º passo: criar camada de Embedding para vetorizar os IDs de tokens
        self.embedding = tf.keras.layers.Embedding(self.output_vocab_size, embedding_dim)

        #2º passo: a camada GRU para gerar as previsões 
        self.gru = tf.keras.layers.GRU(self.dec_units, return_sequences=True, return_state=True, recurrent_initializer='glorot_uniform')

        #3º passo: gerar a camada de atenção para melhorar a previsão. A saída da camada GRU vai servir como query para esta camada
        self.attention = BahadanauAttention(self.dec_units)

        #4º passo: 
        self.Wc = tf.keras.layers.Dense(dec_units, activation=tf.math.tanh, use_bias=False)

        #5º passo: uma camada que irá produzir previsões logísticas para cada token de saída
        self.fc = tf.keras.layers.Dense(self.output_vocab_size)

    def call(self, inputs: DecoderInput, state=None) -> Tuple[DecoderOutput, tf.Tensor]:
        shape_checker = ShapeChecker()
        shape_checker(inputs.new_tokens, ('batch', 't'))
        shape_checker(inputs.enc_output, ('batch', 's', 'enc_units'))
        shape_checker(inputs.mask, ('batch', 's'))
        
        if state is not None:
          shape_checker(state, ('batch', 'dec_units'))
        
        #1º passo. Vetorizando os tokens
        vectors = self.embedding(inputs.new_tokens)
        shape_checker(vectors, ('batch', 't', 'embedding_dim'))
        
        #2º passo. Processa o vetor de embeddings com a camada GRU
        rnn_output, state = self.gru(vectors, initial_state=state)
        
        shape_checker(rnn_output, ('batch', 't', 'dec_units'))
        shape_checker(state, ('batch', 'dec_units'))
        
        #3º passo. Usa a saída da camda GRU como query para a camada de atenção
        context_vector, attention_weights = self.attention( query=rnn_output, value=inputs.enc_output, mask=inputs.mask)
        
        shape_checker(context_vector, ('batch', 't', 'dec_units'))
        shape_checker(attention_weights, ('batch', 't', 's'))
        
        #4º passo. Eqn. (3): Join the context_vector and rnn_output [ct; ht] shape: (batch t, value_units + query_units)
        context_and_rnn_output = tf.concat([context_vector, rnn_output], axis=-1)
        
        #5º passo. Eqn. (3): `at = tanh(Wc@[ct; ht])`
        attention_vector = self.Wc(context_and_rnn_output)
        shape_checker(attention_vector, ('batch', 't', 'dec_units'))
        
        #6º passo. Gera as previsões logísticas:
        logits = self.fc(attention_vector)
        shape_checker(logits, ('batch', 't', 'output_vocab_size'))
        
        return DecoderOutput(logits, attention_weights), state


## Testando o decodificador

In [40]:
# Relembrando os dados

print(f'frase: {example_target_batch[1]}')
print(f'frase vetorizada: {output_text_processor(example_target_batch[1])}')

#O decoder precisa ter o vocabulário para converter os tokens
#A camada de embedding precisa ter o mesmo tamanho da camada do codificador
print(f'embedding: {embedding_dim}')
print(f'unidades: {units}') #Relemebrar

decoder = Decoder(output_text_processor.vocabulary_size(), embedding_dim, units)

example_output_tokens = output_text_processor(example_target_batch)
start_index = output_text_processor.get_vocabulary().index('[START]')
first_token = tf.constant([[start_index]] * example_output_tokens.shape[0]) #revisar isso

# Testando o decoder
dec_result, dec_state = decoder(
    inputs = DecoderInput(new_tokens=first_token, enc_output=example_enc_output, mask=(example_tokens != 0 )),
    state = example_enc_state
)

print(f'\nshape do cálculo logístico: (batch_size, t, output_vocab_size) {dec_result.logits.shape}')
print(f'shape do estado do decoder: (batch_size, dec_units) {dec_state.shape}')

sampled_token = tf.random.categorical(dec_result.logits[:, 0, :], num_samples=1)

#decodificando os tokens usando o vocabulário
vocab = np.array(output_text_processor.get_vocabulary())
first_word = vocab[sampled_token.numpy()]
first_word[:5]

print(f'\nVocabulário: {vocab}')
print(f'Palavras: {first_word[:5]}')


frase: b'Tom will get a better job with a college degree.'
frase vetorizada: [   2    9   49   66   10  192  183   36   10  960 3769    4    3]
embedding: 256
unidades: 1024

shape do cálculo logístico: (batch_size, t, output_vocab_size) (64, 1, 5000)
shape do estado do decoder: (batch_size, dec_units) (64, 1024)

Vocabulário: ['' '[UNK]' '[START]' ... 'productive' 'printer' 'principles']
Palavras: [['lot']
 ['blue']
 ['award']
 ['sells']
 ['hi']]
