Célula 1 – Instalação e Importação das Bibliotecas

*   Instala os pacotes `tensorflow_datasets` e `tensorflow-text`.
*   Importa as bibliotecas usadas no restante do notebook.



In [None]:
!pip install tensorflow_datasets
!pip install -U tensorflow-text

Imports

In [25]:
import tensorflow as tf
import tensorflow_datasets as tfds
import tensorflow_text as tf_text
import numpy as np
import matplotlib.pyplot as plt


Carregamento do Dataset e Visualização de Exemplos

In [26]:
# Load the 'ted_hrlr_translate/pt_to_en' corpus; with as_supervised=True each
# element is a (Portuguese, English) pair. `metadata` holds the dataset info.
examples, metadata = tfds.load(
    'ted_hrlr_translate/pt_to_en',
    with_info=True,
    as_supervised=True,
)
train_examples = examples['train']
val_examples = examples['validation']

# Print the first three sentence pairs of the training split as a sanity check.
for pt, en in train_examples.take(3):
    print(f'Português: {pt.numpy().decode("utf-8")}')
    print(f'Inglês: {en.numpy().decode("utf-8")}\n')


Português: e quando melhoramos a procura , tiramos a única vantagem da impressão , que é a serendipidade .
Inglês: and when you improve searchability , you actually take away the one advantage of print , which is serendipity .

Português: mas e se estes fatores fossem ativos ?
Inglês: but what if it were active ?

Português: mas eles não tinham a curiosidade de me testar .
Inglês: but they did n't test for curiosity .



 Criação dos Tokenizers

In [28]:
# Both tokenizers are subword encoders learned from the training split only,
# each with a target vocabulary size of 2**13.
_build_subword_encoder = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus

# Portuguese tokenizer (first element of each pair).
tokenizer_pt = _build_subword_encoder(
    (source.numpy() for source, _ in train_examples),
    target_vocab_size=2**13,
)

# English tokenizer (second element of each pair).
tokenizer_en = _build_subword_encoder(
    (target.numpy() for _, target in train_examples),
    target_vocab_size=2**13,
)


Preparação dos Dados

In [29]:
# Encode a sentence pair and wrap each side with start/end marker ids.
def encode(lang1, lang2):
    """Tokenize a (Portuguese, English) pair and add start/end ids.

    For each tokenizer the id ``vocab_size`` is used as the start marker and
    ``vocab_size + 1`` as the end marker.

    Args:
        lang1: Eager tensor holding the Portuguese sentence (``.numpy()`` is
            called on it).
        lang2: Eager tensor holding the English sentence.

    Returns:
        Tuple ``(pt_ids, en_ids)`` of Python lists of token ids.
    """
    pt_start = tokenizer_pt.vocab_size
    en_start = tokenizer_en.vocab_size
    pt_ids = [pt_start, *tokenizer_pt.encode(lang1.numpy()), pt_start + 1]
    en_ids = [en_start, *tokenizer_en.encode(lang2.numpy()), en_start + 1]
    return pt_ids, en_ids

# tf.data-compatible wrapper that runs `encode` through tf.py_function.
def tf_encode(pt, en):
    """Apply ``encode`` inside a ``tf.data`` pipeline.

    Args:
        pt: Tensor with the Portuguese sentence.
        en: Tensor with the English sentence.

    Returns:
        Tuple of two ``tf.int64`` tensors (Portuguese ids, English ids), each
        with static shape ``[None]``.
    """
    encoded = tf.py_function(encode, [pt, en], [tf.int64, tf.int64])
    # py_function outputs carry no static shape; declare them as 1-D so that
    # later shape-dependent steps (e.g. padded batching) can work.
    for ids in encoded:
        ids.set_shape([None])
    return encoded[0], encoded[1]

BUFFER_SIZE = 20000
BATCH_SIZE = 64
# Maximum number of token ids (start/end markers included) allowed per sentence.
MAX_LENGTH = 40


def filter_max_length(x, y, max_length=MAX_LENGTH):
    """Return a boolean tensor: True when both sequences have at most ``max_length`` ids.

    Args:
        x: 1-D tensor of source token ids.
        y: 1-D tensor of target token ids.
        max_length: Inclusive upper bound on the size of each sequence.
    """
    return tf.logical_and(tf.size(x) <= max_length, tf.size(y) <= max_length)


# Training pipeline: tokenize, drop over-long pairs, cache the tokenized
# result, then shuffle, pad each batch to its longest sequence and prefetch.
train_dataset = train_examples.map(tf_encode)
train_dataset = train_dataset.filter(filter_max_length)
train_dataset = train_dataset.cache().shuffle(BUFFER_SIZE).padded_batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

# Validation pipeline: same tokenization and filtering, no shuffling.
# The tokenized examples are cached so the py_function-based tokenizer is not
# re-run on every pass over the validation data.
val_dataset = val_examples.map(tf_encode)
val_dataset = val_dataset.filter(filter_max_length)
val_dataset = val_dataset.cache().padded_batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)


Definição do Modelo Transformer

In [30]:
# Sinusoidal positional encoding table.
def positional_encoding(position, d_model):
    """Build a sinusoidal positional-encoding table.

    Args:
        position: Number of positions (rows) to generate.
        d_model: Size of the encoding vector for each position.

    Returns:
        ``tf.float32`` tensor of shape ``(1, position, d_model)``; even columns
        hold sines and odd columns hold cosines of the position-dependent angles.
    """
    positions = np.arange(position)[:, np.newaxis]
    dims = np.arange(d_model)[np.newaxis, :]
    # Columns 2i and 2i+1 share the same frequency exponent 2i / d_model.
    exponents = (2 * (dims // 2)) / np.float32(d_model)
    angles = positions / np.power(10000, exponents)
    angles[:, 0::2] = np.sin(angles[:, 0::2])
    angles[:, 1::2] = np.cos(angles[:, 1::2])
    # Add a leading axis of size 1 so the table broadcasts over a batch.
    return tf.cast(angles[np.newaxis, ...], dtype=tf.float32)

# Multi-head attention layer.
class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head attention built on ``scaled_dot_product_attention``.

    Queries, keys and values are each projected by a Dense layer of width
    ``d_model``, split into ``num_heads`` heads of size ``d_model // num_heads``,
    attended per head, merged back and passed through a final Dense layer.
    """

    def __init__(self, d_model, num_heads):
        """Create the projection layers.

        Args:
            d_model: Width of the projections; must be divisible by ``num_heads``.
            num_heads: Number of attention heads.
        """
        super().__init__()
        assert d_model % num_heads == 0
        self.num_heads = num_heads
        self.depth = d_model // num_heads
        self.wq = tf.keras.layers.Dense(d_model)
        self.wk = tf.keras.layers.Dense(d_model)
        self.wv = tf.keras.layers.Dense(d_model)
        self.dense = tf.keras.layers.Dense(d_model)

    def split_heads(self, x):
        """Reshape ``x`` to (batch, -1, num_heads, depth) and move heads to axis 1."""
        batch_size = tf.shape(x)[0]
        per_head = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(per_head, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask):
        """Attend ``q`` over ``k``/``v``.

        Note the argument order: value, key, query.

        Args:
            v: Value tensor.
            k: Key tensor.
            q: Query tensor.
            mask: Mask forwarded to ``scaled_dot_product_attention`` (may be None).

        Returns:
            Output of the final Dense layer applied to the merged heads.
        """
        q_heads = self.split_heads(self.wq(q))
        k_heads = self.split_heads(self.wk(k))
        v_heads = self.split_heads(self.wv(v))
        context, _ = scaled_dot_product_attention(q_heads, k_heads, v_heads, mask)
        # Undo the head transpose, then fold the head and depth axes back together.
        context = tf.transpose(context, perm=[0, 2, 1, 3])
        batch_size = tf.shape(context)[0]
        merged = tf.reshape(context, (batch_size, -1, self.num_heads * self.depth))
        return self.dense(merged)

# Scaled dot-product attention.
def scaled_dot_product_attention(q, k, v, mask):
    """Compute softmax(q·kᵀ / sqrt(d_k) + mask * -1e9) · v.

    Args:
        q: Query tensor.
        k: Key tensor; the size of its last axis is used as the scale d_k.
        v: Value tensor.
        mask: Optional tensor added to the logits after scaling by -1e9, so
            positions where it equals 1 get near-zero attention weight.
            Skipped when None.

    Returns:
        Tuple ``(output, attention_weights)``.
    """
    key_depth = tf.cast(tf.shape(k)[-1], tf.float32)
    logits = tf.matmul(q, k, transpose_b=True) / tf.math.sqrt(key_depth)
    if mask is not None:
        logits = logits + (mask * -1e9)
    weights = tf.nn.softmax(logits, axis=-1)
    return tf.matmul(weights, v), weights

# Padding mask.
def create_padding_mask(seq):
    """Mark padding positions (id 0) of a 2-D id tensor.

    Args:
        seq: Tensor of token ids indexed as (batch, position).

    Returns:
        ``tf.float32`` tensor with two singleton axes inserted after the batch
        axis; 1.0 where ``seq`` equals 0, 0.0 elsewhere.
    """
    is_padding = tf.math.equal(seq, 0)
    padding_flags = tf.cast(is_padding, tf.float32)
    return padding_flags[:, tf.newaxis, tf.newaxis, :]

# Look-ahead mask that hides future positions.
def create_look_ahead_mask(size):
    """Return a (size, size) matrix with 1s strictly above the diagonal.

    Entry (i, j) is 1 when j > i (a future position) and 0 otherwise.
    """
    # band_part(..., -1, 0) keeps the lower triangle including the diagonal.
    visible = tf.linalg.band_part(tf.ones((size, size)), -1, 0)
    return 1 - visible

# Point-wise feed-forward network.
def point_wise_feed_forward_network(d_model, dff):
    """Build a two-layer Dense stack: ``dff`` units with ReLU, then ``d_model`` units.

    Args:
        d_model: Width of the output layer.
        dff: Width of the hidden (ReLU) layer.

    Returns:
        A ``tf.keras.Sequential`` model containing the two Dense layers.
    """
    hidden = tf.keras.layers.Dense(dff, activation='relu')
    projection = tf.keras.layers.Dense(d_model)
    return tf.keras.Sequential([hidden, projection])

# Single encoder layer.
class EncoderLayer(tf.keras.layers.Layer):
    """Self-attention followed by a feed-forward network.

    Each of the two sub-blocks applies dropout, adds the residual input and
    then layer-normalizes the sum.
    """

    def __init__(self, d_model, num_heads, dff, rate=0.1):
        """Create the sub-layers.

        Args:
            d_model: Model width.
            num_heads: Number of attention heads.
            dff: Hidden width of the feed-forward network.
            rate: Dropout rate.
        """
        super().__init__()
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)

    def call(self, x, *, training=False, mask=None):
        """Run the layer on ``x``.

        Args:
            x: Input tensor; used as value, key and query of the attention.
            training: Whether dropout is active.
            mask: Mask passed to the self-attention.

        Returns:
            Tensor produced by the second layer normalization.
        """
        attended = self.dropout1(self.mha(x, x, x, mask=mask), training=training)
        attention_block = self.layernorm1(x + attended)
        transformed = self.dropout2(self.ffn(attention_block), training=training)
        return self.layernorm2(attention_block + transformed)

# Full encoder stack.
class Encoder(tf.keras.layers.Layer):
    """Embedding + positional encoding followed by ``num_layers`` encoder layers."""

    def __init__(self, num_layers, d_model, num_heads, dff,
                 input_vocab_size, rate=0.1):
        """Create the embedding, positional table and encoder layers.

        Args:
            num_layers: Number of ``EncoderLayer`` instances.
            d_model: Model width (embedding size).
            num_heads: Attention heads per layer.
            dff: Hidden width of each feed-forward network.
            input_vocab_size: Number of rows in the embedding table.
            rate: Dropout rate.
        """
        super().__init__()
        self.d_model = d_model
        self.num_layers = num_layers
        self.embedding = tf.keras.layers.Embedding(input_vocab_size, d_model)
        # Precomputed table covering 1000 positions; sliced per call.
        self.pos_encoding = positional_encoding(1000, d_model)
        self.enc_layers = [
            EncoderLayer(d_model, num_heads, dff, rate)
            for _ in range(num_layers)
        ]
        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, *, training=False, mask=None):
        """Encode a batch of token ids.

        Args:
            x: Tensor of token ids; axis 1 is the sequence axis.
            training: Whether dropout is active.
            mask: Mask forwarded to every encoder layer.

        Returns:
            Output of the last encoder layer.
        """
        seq_len = tf.shape(x)[1]
        embedded = tf.cast(self.embedding(x), tf.float32)
        # Scale embeddings by sqrt(d_model) before adding the positional signal.
        embedded = embedded * tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        embedded = embedded + self.pos_encoding[:, :seq_len, :]
        hidden = self.dropout(embedded, training=training)
        for layer in self.enc_layers:
            hidden = layer(hidden, training=training, mask=mask)
        return hidden

# Single decoder layer.
class DecoderLayer(tf.keras.layers.Layer):
    """Self-attention, attention over the encoder output, then feed-forward.

    Each of the three sub-blocks applies dropout, adds the residual input and
    layer-normalizes the sum.
    """

    def __init__(self, d_model, num_heads, dff, rate=0.1):
        """Create the sub-layers.

        Args:
            d_model: Model width.
            num_heads: Number of attention heads.
            dff: Hidden width of the feed-forward network.
            rate: Dropout rate.
        """
        super().__init__()
        self.mha1 = MultiHeadAttention(d_model, num_heads)
        self.mha2 = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)
        self.dropout3 = tf.keras.layers.Dropout(rate)

    def call(self, x, enc_output, *, training=False,
             look_ahead_mask=None, padding_mask=None):
        """Run the layer.

        Args:
            x: Decoder-side input tensor.
            enc_output: Encoder output, used as value and key of the second
                attention.
            training: Whether dropout is active.
            look_ahead_mask: Mask for the self-attention.
            padding_mask: Mask for the attention over ``enc_output``.

        Returns:
            Tensor produced by the third layer normalization.
        """
        self_attended = self.dropout1(
            self.mha1(x, x, x, mask=look_ahead_mask), training=training)
        self_block = self.layernorm1(self_attended + x)

        # MultiHeadAttention takes (value, key, query): queries come from the decoder.
        cross_attended = self.dropout2(
            self.mha2(enc_output, enc_output, self_block, mask=padding_mask),
            training=training)
        cross_block = self.layernorm2(cross_attended + self_block)

        transformed = self.dropout3(self.ffn(cross_block), training=training)
        return self.layernorm3(transformed + cross_block)

# Full decoder stack.
class Decoder(tf.keras.layers.Layer):
    """Embedding + positional encoding followed by ``num_layers`` decoder layers."""

    def __init__(self, num_layers, d_model, num_heads, dff,
                 target_vocab_size, rate=0.1):
        """Create the embedding, positional table and decoder layers.

        Args:
            num_layers: Number of ``DecoderLayer`` instances.
            d_model: Model width (embedding size).
            num_heads: Attention heads per layer.
            dff: Hidden width of each feed-forward network.
            target_vocab_size: Number of rows in the embedding table.
            rate: Dropout rate.
        """
        super().__init__()
        self.d_model = d_model
        self.num_layers = num_layers
        self.embedding = tf.keras.layers.Embedding(target_vocab_size, d_model)
        # Precomputed table covering 1000 positions; sliced per call.
        self.pos_encoding = positional_encoding(1000, d_model)
        self.dec_layers = [
            DecoderLayer(d_model, num_heads, dff, rate)
            for _ in range(num_layers)
        ]
        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, enc_output, *, training=False,
             look_ahead_mask=None, padding_mask=None):
        """Decode a batch of target token ids against the encoder output.

        Args:
            x: Tensor of target token ids; axis 1 is the sequence axis.
            enc_output: Encoder output passed to every decoder layer.
            training: Whether dropout is active.
            look_ahead_mask: Mask for each layer's self-attention.
            padding_mask: Mask for each layer's attention over ``enc_output``.

        Returns:
            Output of the last decoder layer.
        """
        seq_len = tf.shape(x)[1]
        embedded = tf.cast(self.embedding(x), tf.float32)
        # Scale embeddings by sqrt(d_model) before adding the positional signal.
        embedded = embedded * tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        embedded = embedded + self.pos_encoding[:, :seq_len, :]
        hidden = self.dropout(embedded, training=training)
        for layer in self.dec_layers:
            hidden = layer(
                hidden,
                enc_output,
                training=training,
                look_ahead_mask=look_ahead_mask,
                padding_mask=padding_mask,
            )
        return hidden

# Complete Transformer model.
class Transformer(tf.keras.Model):
    """Encoder-decoder model ending in a Dense projection to the target vocabulary."""

    def __init__(self, num_layers, d_model, num_heads, dff,
                 input_vocab_size, target_vocab_size, rate=0.1):
        """Create the encoder, decoder and output projection.

        Args:
            num_layers: Number of layers in both encoder and decoder.
            d_model: Model width.
            num_heads: Attention heads per layer.
            dff: Hidden width of the feed-forward networks.
            input_vocab_size: Embedding rows for the encoder.
            target_vocab_size: Embedding rows for the decoder and width of the
                final Dense layer.
            rate: Dropout rate.
        """
        super().__init__()
        self.encoder = Encoder(
            num_layers, d_model, num_heads, dff, input_vocab_size, rate)
        self.decoder = Decoder(
            num_layers, d_model, num_heads, dff, target_vocab_size, rate)
        self.final_layer = tf.keras.layers.Dense(target_vocab_size)

    def call(self, inp, tar, *, training=False):
        """Compute output logits for a batch.

        Args:
            inp: Source token ids.
            tar: Target-side input token ids; axis 1 is the sequence axis.
            training: Whether dropout is active.

        Returns:
            Output of the final Dense layer applied to the decoder output.
        """
        # Both padding masks are derived from the source ids; the decoder only
        # receives a look-ahead mask for its self-attention.
        enc_padding_mask = create_padding_mask(inp)
        look_ahead_mask = create_look_ahead_mask(tf.shape(tar)[1])
        dec_padding_mask = create_padding_mask(inp)

        encoded = self.encoder(inp, training=training, mask=enc_padding_mask)
        decoded = self.decoder(
            tar,
            encoded,
            training=training,
            look_ahead_mask=look_ahead_mask,
            padding_mask=dec_padding_mask,
        )
        return self.final_layer(decoded)


Adicionando a Métrica de Accuracy

In [None]:
# Token-level accuracy that ignores padding (id 0).
def compute_accuracy(tar_real, predictions):
    """Fraction of non-padding target tokens predicted correctly.

    Args:
        tar_real: Tensor of target token ids; id 0 is treated as padding.
        predictions: Tensor of scores whose last axis is argmax-ed to get the
            predicted id.

    Returns:
        Scalar tensor: matches on non-padding positions divided by the number
        of non-padding positions.
    """
    predicted_ids = tf.argmax(predictions, axis=-1)
    keep = tf.cast(tf.math.not_equal(tar_real, 0), tf.float32)
    hits = tf.cast(tf.math.equal(predicted_ids, tar_real), tf.float32)
    return tf.reduce_sum(hits * keep) / tf.reduce_sum(keep)


Configuração do Otimizador e Função de Perda

In [31]:
# Learning-rate schedule: start at 1e-3, decay rate 0.96 with decay_steps=10000.
learning_rate = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=0.001,
    decay_steps=10000,
    decay_rate=0.96,
)
optimizer = tf.keras.optimizers.Adam(learning_rate)

# reduction='none' keeps the per-token losses so padding can be masked out
# before averaging.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True,
    reduction='none',
)

def loss_function(real, pred):
    """Mean cross-entropy over non-padding target tokens.

    Args:
        real: Tensor of target token ids; id 0 is treated as padding.
        pred: Logits passed to ``loss_object``.

    Returns:
        Scalar tensor: sum of the per-token losses at non-padding positions
        divided by the number of non-padding positions.
    """
    per_token = loss_object(real, pred)
    keep = tf.cast(tf.math.not_equal(real, 0), dtype=per_token.dtype)
    return tf.reduce_sum(per_token * keep) / tf.reduce_sum(keep)


Treinamento do Modelo

In [None]:
import time

# Train the model for one epoch, tracking loss and accuracy.
def train_epoch(model, dataset, optimizer, loss_function):
    """Train ``model`` for one pass over ``dataset``.

    For every batch the target is split into decoder input (all but the last
    token) and expected output (all but the first token), a gradient step is
    applied, and the batch loss/accuracy are accumulated. Progress is printed
    every 50 batches and a summary line at the end.

    Args:
        model: Callable model invoked as ``model(inp, tar_inp, training=True)``
            and exposing ``trainable_variables``.
        dataset: Iterable yielding ``(inp, tar)`` batches.
        optimizer: Optimizer exposing ``apply_gradients``.
        loss_function: Callable ``(tar_real, predictions) -> scalar loss``.

    Returns:
        Tuple ``(epoch_loss, epoch_accuracy)`` of Python floats: the unweighted
        means of the per-batch loss and accuracy.

    Raises:
        ValueError: If ``dataset`` yields no batches, since no mean can be
            computed.
    """
    start = time.time()
    total_loss = 0.0
    total_accuracy = 0.0
    batches = 0

    for batch, (inp, tar) in enumerate(dataset):
        # Teacher forcing: feed tokens [0, n-1) and predict tokens [1, n).
        tar_inp = tar[:, :-1]
        tar_real = tar[:, 1:]
        with tf.GradientTape() as tape:
            predictions = model(inp, tar_inp, training=True)
            loss = loss_function(tar_real, predictions)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

        # Convert each metric to a Python float once per batch.
        batch_loss = float(loss.numpy())
        batch_accuracy = float(compute_accuracy(tar_real, predictions).numpy())

        total_loss += batch_loss
        total_accuracy += batch_accuracy
        batches += 1

        if batch % 50 == 0:
            print(f'Batch {batch}: Loss {batch_loss:.4f}, Accuracy {batch_accuracy:.4f}')

    if batches == 0:
        # Fail with a clear message instead of a bare ZeroDivisionError below.
        raise ValueError('train_epoch received an empty dataset: no batches to train on.')

    epoch_loss = total_loss / batches
    epoch_accuracy = total_accuracy / batches
    print(f'Epoch Loss: {epoch_loss:.4f}, Epoch Accuracy: {epoch_accuracy:.4f}, Time: {time.time() - start:.2f}s')
    return epoch_loss, epoch_accuracy


Epoch 1/10
Batch 0 Loss 9.0045
Batch 50 Loss 6.2222
Batch 100 Loss 5.7607
Batch 150 Loss 5.3140
Batch 200 Loss 5.0649
Batch 250 Loss 4.9034
Batch 300 Loss 4.6653
Batch 350 Loss 4.8617
Batch 400 Loss 4.4801
Batch 450 Loss 4.4654
Batch 500 Loss 4.2953
Batch 550 Loss 4.3886
Batch 600 Loss 4.2095
Batch 650 Loss 4.1269
Batch 700 Loss 4.0038
Epoch 2/10
Batch 0 Loss 4.0263
Batch 50 Loss 3.6415
Batch 100 Loss 3.8744
Batch 150 Loss 3.8057
Batch 200 Loss 3.7986
Batch 250 Loss 3.4736
Batch 300 Loss 3.5591
Batch 350 Loss 3.7871
Batch 400 Loss 3.3668
Batch 450 Loss 3.5351
Batch 500 Loss 3.3349
Batch 550 Loss 3.4294
Batch 600 Loss 3.6005
Batch 650 Loss 3.4399
Batch 700 Loss 3.3751
Epoch 3/10
Batch 0 Loss 3.1173
Batch 50 Loss 3.1209
Batch 100 Loss 2.8753
Batch 150 Loss 2.9303
Batch 200 Loss 3.1350
Batch 250 Loss 2.9459
Batch 300 Loss 2.8699
Batch 350 Loss 3.3069
Batch 400 Loss 2.8119
Batch 450 Loss 2.7582
Batch 500 Loss 2.8467
Batch 550 Loss 2.8481
Batch 600 Loss 2.8170
Batch 650 Loss 2.8465
Batch 70

Função para Traduzir uma Frase

In [None]:
# Greedy translation of a single sentence with the trained model.
def simple_translate(sentence, max_length=40):
    """Translate a Portuguese sentence to English by greedy decoding.

    Relies on the module-level ``transformer``, ``tokenizer_pt`` and
    ``tokenizer_en``. The input is tokenized and wrapped in the Portuguese
    start/end ids; the output starts from the English start id and is extended
    one token at a time with the highest-scoring prediction until the English
    end id is produced or ``max_length`` tokens have been generated.

    Args:
        sentence: String with the Portuguese sentence.
        max_length: Maximum number of tokens to generate. Defaults to 40.

    Returns:
        String with the predicted English translation.
    """
    # Special ids live just past the subword vocabulary: start, then end.
    pt_start_id = tokenizer_pt.vocab_size
    pt_end_id = tokenizer_pt.vocab_size + 1
    en_start_id = tokenizer_en.vocab_size
    en_end_id = tokenizer_en.vocab_size + 1

    tokenized_input = [pt_start_id] + tokenizer_pt.encode(sentence) + [pt_end_id]
    input_tensor = tf.expand_dims(tokenized_input, 0)
    # The decoder is primed with the English start id only.
    output = tf.expand_dims([en_start_id], 0)

    for _ in range(max_length):
        predictions = transformer(input_tensor, output, training=False)
        # Only the scores at the last position are needed for the next token.
        last_step = predictions[:, -1, :]
        predicted_id = tf.argmax(last_step, axis=-1, output_type=tf.int32).numpy()[0]
        if predicted_id == en_end_id:
            break
        output = tf.concat([output, tf.expand_dims([predicted_id], 0)], axis=-1)

    # Drop the special ids (>= vocab_size) before decoding back to text.
    token_ids = [int(i) for i in output.numpy()[0] if i < tokenizer_en.vocab_size]
    return tokenizer_en.decode(token_ids)
