# Transformer PT→EN: a **fast and balanced** version (GPU-only)

This notebook is **inspired by the official TensorFlow tutorial** on translation with a Transformer, but it has been **tuned for a better balance between run time and quality**. The idea is that you **run everything on Colab with a GPU**, see results quickly, and still have simple "levers" to improve quality whenever you want.

## What changes relative to the tutorial
- **Dataset subset (TFDS)** to speed up initial training.
- **Smaller model**, but not tiny (fewer layers and smaller dimensions), keeping the essence of the Transformer (encoder-decoder, multi-head attention, and positional encoding).
- **Accessible hyperparameters** (epochs, batch size, number of tokens, model size) gathered in a single cell so you can adjust them easily.
- **GPU-only training**, with optional **mixed precision** for extra performance where supported.
- **Optional beam-search inference** (improves translations without retraining).

> Goal: provide a practical, repeatable hands-on experience with the Transformer on PT→EN, with a fast starting point and room to scale quality as needed.

In [14]:
# CELL 01: dependencies and GPU check
%pip -q install -U "protobuf~=3.20.3" tensorflow "tensorflow-text" tensorflow_datasets

import tensorflow as tf, sys
print("Python:", sys.version)
print("TF:", tf.__version__)

gpus = tf.config.list_physical_devices('GPU')
assert gpus, "Enable the GPU runtime (Runtime > Change runtime type > GPU)."
print("GPU detected:", gpus)

Python: 3.11.13 (main, Jun  4 2025, 08:57:29) [GCC 11.4.0]
TF: 2.19.1
GPU detected: [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


In [15]:
# CELL 02: imports and settings (higher quality)
import tensorflow as tf
import tensorflow_text
import tensorflow_datasets as tfds
import numpy as np, time, json, pathlib

MAX_TOKENS   = 128    # higher: less truncation
BATCH_SIZE   = 64
EPOCHS       = 8      # higher: more training

TRAIN_TAKE   = 30000  # higher: more data (adjust if you run out of VRAM)
VAL_TAKE     = 1500

# Medium-sized model
NUM_LAYERS   = 4
D_MODEL      = 128
DFF          = 512
NUM_HEADS    = 8
DROPOUT      = 0.1
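
As a quick sanity check on these settings, the number of optimizer steps follows directly from the subset size and the batch size. The sketch below only repeats the arithmetic with the values from the cell above. It assumes the training split really contains at least `TRAIN_TAKE` examples, which is consistent with the 469 steps per epoch shown in the training log further down.

```python
import math

# Values copied from the configuration cell above.
TRAIN_TAKE = 30000
BATCH_SIZE = 64
EPOCHS = 8

# One step per batch; the last batch of an epoch may be partial,
# so the count is rounded up.
train_steps = math.ceil(TRAIN_TAKE / BATCH_SIZE)
total_steps = train_steps * EPOCHS

print("steps per epoch:", train_steps)
print("total training steps:", total_steps)
```

Doubling `BATCH_SIZE` roughly halves the number of steps per epoch, which also changes how much of the run falls inside the learning-rate warmup.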

In [16]:
# CELL 03: load the dataset and take a subset
examples, info = tfds.load('ted_hrlr_translate/pt_to_en', with_info=True, as_supervised=True)
train_examples, val_examples = examples['train'], examples['validation']

train_examples = train_examples.take(TRAIN_TAKE)
val_examples   = val_examples.take(VAL_TAKE)

for pt, en in train_examples.take(2):
    print("PT:", pt.numpy().decode())
    print("EN:", en.numpy().decode(), "\n")

PT: e quando melhoramos a procura , tiramos a única vantagem da impressão , que é a serendipidade .
EN: and when you improve searchability , you actually take away the one advantage of print , which is serendipity . 

PT: mas e se estes fatores fossem ativos ?
EN: but what if it were active ? 



In [17]:
# CELL 04: pre-built tokenizers (SavedModel)
import zipfile, pathlib

model_name = 'ted_hrlr_translate_pt_en_converter'
zip_path = tf.keras.utils.get_file(
    fname=f'{model_name}.zip',
    origin=f'https://storage.googleapis.com/download.tensorflow.org/models/{model_name}.zip',
    cache_dir='/content', cache_subdir='', extract=False
)
with zipfile.ZipFile(zip_path, 'r') as zf:
    zf.extractall('/content')

cands = list(pathlib.Path('/content').rglob('saved_model.pb'))
assert cands, "saved_model.pb not found after extraction."
tokenizers = tf.saved_model.load(str(cands[0].parent))
print("OK tokenizers. Vocabs:", int(tokenizers.pt.get_vocab_size().numpy()), int(tokenizers.en.get_vocab_size().numpy()))

OK tokenizers. Vocabs: 7765 7010


In [18]:
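# CELL 05: tf.data pipeline
def prepare_batch(pt, en):
    # Tokenize, truncate to MAX_TOKENS, and pad each batch into a dense tensor.
    pt = tokenizers.pt.tokenize(pt)[:, :MAX_TOKENS].to_tensor()
    en = tokenizers.en.tokenize(en)[:, :(MAX_TOKENS+1)]
    en_in  = en[:, :-1].to_tensor()   # decoder input: drops the last token
    en_lbl = en[:,  1:].to_tensor()   # labels: drops the first ([START]) token
    return (pt, en_in), en_lbl

def make_batches(ds, training=True):
    # Cache the raw examples *before* shuffling: a cache placed after shuffle()
    # replays the first epoch's order and defeats reshuffle_each_iteration.
    ds = ds.cache()
    if training:
        ds = ds.shuffle(2048, reshuffle_each_iteration=True)
    return (ds.batch(BATCH_SIZE)
              .map(prepare_batch, num_parallel_calls=tf.data.AUTOTUNE)
              .prefetch(tf.data.AUTOTUNE))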

train_batches = make_batches(train_examples, True)
val_batches   = make_batches(val_examples,   False)

print(next(iter(train_batches))[0][0].shape)  # sanity: (batch, len)

(64, 128)
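
The `en_in` / `en_lbl` split in `prepare_batch` is the usual teacher-forcing shift: the decoder sees the target up to position *t* and is trained to predict the token at *t + 1*. Here is a minimal plain-Python sketch of the same slicing on a single sentence. The token ids and the `shift_targets` helper are hypothetical and exist only for illustration; the real ids come from the tokenizers.

```python
# Hypothetical token ids, for illustration only.
START, END = 2, 3

def shift_targets(token_ids, max_tokens):
    """Mimic the slicing in prepare_batch for one tokenized sentence."""
    clipped = token_ids[: max_tokens + 1]   # keep one extra token for the shift
    decoder_input = clipped[:-1]            # everything except the last token
    labels = clipped[1:]                    # everything except the first token
    return decoder_input, labels

sentence = [START, 11, 12, 13, END]

dec_in, labels = shift_targets(sentence, max_tokens=128)
print(dec_in)   # [2, 11, 12, 13]
print(labels)   # [11, 12, 13, 3]

# With aggressive truncation the [END] token is cut off entirely.
short_in, short_labels = shift_targets(sentence, max_tokens=3)
print(short_in)      # [2, 11, 12]
print(short_labels)  # [11, 12, 13]
```

Slicing to `max_tokens + 1` before the shift is what keeps both the decoder input and the labels at no more than `max_tokens` positions.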


In [19]:
# CELL 06: essential layers (minimal)
def positional_encoding(length, depth):
    depth = depth/2
    positions = np.arange(length)[:, np.newaxis]
    depths = np.arange(depth)[np.newaxis, :]/depth
    angle_rates = 1/(10000**depths)
    angle_rads = positions * angle_rates
    pe = np.concatenate([np.sin(angle_rads), np.cos(angle_rads)], axis=-1)
    return tf.cast(pe, tf.float32)

class PositionalEmbedding(tf.keras.layers.Layer):
    def __init__(self, vocab_size, d_model):
        super().__init__()
        self.d_model = d_model
        self.embedding = tf.keras.layers.Embedding(vocab_size, d_model, mask_zero=True)
        self.pos_encoding = positional_encoding(2048, d_model)
    def compute_mask(self, *args, **kwargs):
        return self.embedding.compute_mask(*args, **kwargs)
    def call(self, x):
        length = tf.shape(x)[1]
        x = self.embedding(x) * tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        return x + self.pos_encoding[tf.newaxis, :length, :]

class BaseAttention(tf.keras.layers.Layer):
    def __init__(self, num_heads, key_dim, dropout=0.1):
        super().__init__()
        self.mha = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=key_dim, dropout=dropout)
        self.add = tf.keras.layers.Add()
        self.norm = tf.keras.layers.LayerNormalization()

class GlobalSelfAttention(BaseAttention):
    def call(self, x):
        attn = self.mha(x, x, use_causal_mask=False)
        return self.norm(self.add([x, attn]))

class CausalSelfAttention(BaseAttention):
    def call(self, x):
        attn = self.mha(x, x, use_causal_mask=True)
        return self.norm(self.add([x, attn]))

class CrossAttention(BaseAttention):
    def call(self, x, context):
        attn = self.mha(x, context)
        return self.norm(self.add([x, attn]))

class FeedForward(tf.keras.layers.Layer):
    def __init__(self, d_model, dff, dropout=0.1):
        super().__init__()
        self.seq = tf.keras.Sequential([
            tf.keras.layers.Dense(dff, activation='relu'),
            tf.keras.layers.Dense(d_model),
            tf.keras.layers.Dropout(dropout),
        ])
        self.add = tf.keras.layers.Add()
        self.norm = tf.keras.layers.LayerNormalization()
    def call(self, x):
        return self.norm(self.add([x, self.seq(x)]))
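
To make the `positional_encoding` function above easier to inspect, here is a plain-Python sketch of the same formula (sines in the first half of the vector, cosines in the second) for a tiny `length` and `depth`. It is a reading aid under the assumption that `depth` is even, not a replacement for the NumPy version used by the model.

```python
import math

def positional_encoding(length, depth):
    """Same layout as the notebook version: [sin block | cos block]."""
    half = depth // 2                       # assumes an even depth
    table = []
    for pos in range(length):
        # Frequencies decay geometrically from 1 down towards 1/10000.
        angles = [pos / (10000 ** (i / half)) for i in range(half)]
        row = [math.sin(a) for a in angles] + [math.cos(a) for a in angles]
        table.append(row)
    return table

pe = positional_encoding(length=4, depth=4)
for pos, row in enumerate(pe):
    print(pos, [round(v, 4) for v in row])
```

Position 0 is always `[0, ..., 0, 1, ..., 1]`, and every row has the same squared norm (`depth / 2`), because each sine is paired with a cosine of the same angle.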

In [20]:
# CELL 07: minimal model
class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, drop):
        super().__init__()
        self.sa = GlobalSelfAttention(num_heads, d_model, drop)
        self.ff = FeedForward(d_model, dff, drop)
    def call(self, x):
        x = self.sa(x)
        x = self.ff(x)
        return x

class DecoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, drop):
        super().__init__()
        self.ca = CausalSelfAttention(num_heads, d_model, drop)
        self.xa = CrossAttention(num_heads, d_model, drop)
        self.ff = FeedForward(d_model, dff, drop)
    def call(self, x, context):
        x = self.ca(x)
        x = self.xa(x, context)
        x = self.ff(x)
        return x

class Encoder(tf.keras.layers.Layer):
    def __init__(self, n, d_model, num_heads, dff, vocab, drop):
        super().__init__()
        self.pe = PositionalEmbedding(vocab, d_model)
        self.drop = tf.keras.layers.Dropout(drop)
        self.layers = [EncoderLayer(d_model, num_heads, dff, drop) for _ in range(n)]
    def call(self, x):
        x = self.drop(self.pe(x))
        for l in self.layers: x = l(x)
        return x

class Decoder(tf.keras.layers.Layer):
    def __init__(self, n, d_model, num_heads, dff, vocab, drop):
        super().__init__()
        self.pe = PositionalEmbedding(vocab, d_model)
        self.drop = tf.keras.layers.Dropout(drop)
        self.layers = [DecoderLayer(d_model, num_heads, dff, drop) for _ in range(n)]
    def call(self, x, context):
        x = self.drop(self.pe(x))
        for l in self.layers: x = l(x, context)
        return x

class Transformer(tf.keras.Model):
    def __init__(self, *, n, d_model, num_heads, dff, in_vocab, tgt_vocab, drop):
        super().__init__()
        self.encoder = Encoder(n, d_model, num_heads, dff, in_vocab, drop)
        self.decoder = Decoder(n, d_model, num_heads, dff, tgt_vocab, drop)
        self.final = tf.keras.layers.Dense(tgt_vocab)
    def call(self, inputs):
        context, x = inputs
        c = self.encoder(context)
        x = self.decoder(x, c)
        logits = self.final(x)
        # Drop the Keras mask so it is not applied again to the loss and metrics.
        try:
            del logits._keras_mask
        except AttributeError:
            pass
        return logits
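
The only structural difference between `GlobalSelfAttention` and `CausalSelfAttention` is the `use_causal_mask` flag: in the decoder, position *i* may attend only to positions *j ≤ i*. The sketch below shows that idea in plain Python with made-up attention scores. The helper names are hypothetical, and Keras applies the mask inside `MultiHeadAttention` in its own way, so treat this as a conceptual illustration only.

```python
import math

def causal_mask(n):
    """mask[i][j] is True when query position i may look at key position j."""
    return [[j <= i for j in range(n)] for i in range(n)]

def masked_softmax(scores, mask):
    """Row-wise softmax that gives zero weight to masked-out positions."""
    weights = []
    for row, keep in zip(scores, mask):
        exps = [math.exp(s) if k else 0.0 for s, k in zip(row, keep)]
        total = sum(exps)
        weights.append([e / total for e in exps])
    return weights

# Uniform scores make the effect of the mask easy to see.
scores = [[0.0, 0.0, 0.0] for _ in range(3)]
weights = masked_softmax(scores, causal_mask(3))
for row in weights:
    print([round(w, 3) for w in row])
```

With uniform scores the first position attends only to itself, the second splits its weight over two positions, and the third over all three. This is why the decoder cannot peek at future target tokens during training.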

In [21]:
# CELL 08: fast training on the GPU
gpu_strategy = tf.distribute.OneDeviceStrategy("/GPU:0")
with gpu_strategy.scope():
    transformer = Transformer(
        n=NUM_LAYERS, d_model=D_MODEL, num_heads=NUM_HEADS, dff=DFF,
        in_vocab=int(tokenizers.pt.get_vocab_size().numpy()),
        tgt_vocab=int(tokenizers.en.get_vocab_size().numpy()),
        drop=DROPOUT
    )
    # Run one batch through the model so its weights get built
    (pt_b, en_in_b), en_lbl_b = next(iter(train_batches))
    _ = transformer((pt_b, en_in_b))

    class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
        def __init__(self, d_model, warmup_steps=2000):  # shorter warmup
            super().__init__()
            self.d_model = tf.cast(d_model, tf.float32)
            self.warmup_steps = warmup_steps
        def __call__(self, step):
            step = tf.cast(step, tf.float32)
            return tf.math.rsqrt(self.d_model) * tf.math.minimum(
                tf.math.rsqrt(step), step * (self.warmup_steps ** -1.5)
            )

    def masked_loss(y_true, y_pred):
        y_true = tf.cast(y_true, tf.int32)
        loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')(y_true, y_pred)
        mask = tf.cast(tf.not_equal(y_true, 0), tf.float32)
        return tf.reduce_sum(loss * mask) / tf.reduce_sum(mask)

    def masked_accuracy(y_true, y_pred):
        y_true = tf.cast(y_true, tf.int64)
        y_pred = tf.argmax(y_pred, axis=-1)
        mask = tf.cast(tf.not_equal(y_true, 0), tf.float32)
        match = tf.cast(tf.equal(y_true, y_pred), tf.float32)
        return tf.reduce_sum(match * mask) / tf.reduce_sum(mask)

    lr = CustomSchedule(D_MODEL)
    opt = tf.keras.optimizers.Adam(lr, beta_1=0.9, beta_2=0.98, epsilon=1e-9)
    transformer.compile(optimizer=opt, loss=masked_loss, metrics=[masked_accuracy])

t0 = time.time()
hist = transformer.fit(train_batches, epochs=EPOCHS, validation_data=val_batches, verbose=1)
t1 = time.time()
print(f"Total time: {t1 - t0:.1f}s")



Epoch 1/8
469/469 ━━━━━━━━━━━━━━━━━━━━ 130s 237ms/step - loss: 7.7166 - masked_accuracy: 0.0854 - val_loss: 5.0514 - val_masked_accuracy: 0.2476
Epoch 2/8
469/469 ━━━━━━━━━━━━━━━━━━━━ 108s 230ms/step - loss: 4.7768 - masked_accuracy: 0.2766 - val_loss: 4.1437 - val_masked_accuracy: 0.3517
Epoch 3/8
469/469 ━━━━━━━━━━━━━━━━━━━━ 108s 230ms/step - loss: 3.9646 - masked_accuracy: 0.3657 - val_loss: 3.6169 - val_masked_accuracy: 0.4076
Epoch 4/8
469/469 ━━━━━━━━━━━━━━━━━━━━ 108s 230ms/step - loss: 3.4340 - masked_accuracy: 0.4198 - val_loss: 3.3371 - val_masked_accuracy: 0.4400
Epoch 5/8
469/469 ━━━━━━━━━━━━━━━━━━━━ 142s 230ms/step - loss: 3.0383 - masked_accuracy: 0.4613 - val_loss: 3.0551 - val_masked_accuracy: 0.4822
Epoch 6/8
469/469 ━━━━━━━━━━━━━━━━━━━━ 108s 230ms/step - loss: 2.5841 -
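
The `CustomSchedule` used above ramps the learning rate up linearly for `warmup_steps` and then decays it with the inverse square root of the step. A plain-Python sketch of the same formula helps to see where this run sits on that curve. The function name `transformer_lr` is made up for this example, and the step counts assume 469 steps per epoch for 8 epochs, as in the log above.

```python
def transformer_lr(step, d_model=128, warmup_steps=2000):
    """lr = d_model^-0.5 * min(step^-0.5, step * warmup_steps^-1.5)."""
    if step == 0:
        return 0.0  # the linear warmup branch gives 0 at step 0
    return d_model ** -0.5 * min(step ** -0.5, step * warmup_steps ** -1.5)

peak = transformer_lr(2000)          # the two branches meet at warmup_steps
end_of_run = transformer_lr(3752)    # 469 steps/epoch * 8 epochs

for step in (1, 469, 1000, 2000, 3752):
    print(f"step {step:>4}: lr = {transformer_lr(step):.6f}")
```

With these settings the peak learning rate is close to 0.002 and is reached a little past the halfway point of training, so more than half of the run is spent in warmup. If you shorten the run further, lowering `warmup_steps` is one of the levers worth trying.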

In [22]:
# CELL 09: simple greedy translation
class Translator(tf.Module):
    def __init__(self, tokenizers, transformer, max_len=MAX_TOKENS):
        self.tk = tokenizers
        self.m = transformer
        self.max_len = max_len

    @tf.function(input_signature=[tf.TensorSpec(shape=[], dtype=tf.string)])
    def __call__(self, sentence):
        # The input signature guarantees a scalar string, so always add the batch axis.
        sentence = sentence[tf.newaxis]
        enc_in = self.tk.pt.tokenize(sentence).to_tensor()
        # Tokenizing an empty string yields just the [START] and [END] ids.
        se = self.tk.en.tokenize([''])[0]
        start, end = se[0][tf.newaxis], se[1][tf.newaxis]
        out = tf.TensorArray(tf.int64, size=0, dynamic_size=True).write(0, start)
        for _ in tf.range(self.max_len):
            dec = tf.transpose(out.stack())
            # Greedy step: keep only the logits of the last position and take the argmax.
            logits = self.m([enc_in, dec], training=False)[:, -1:, :]
            next_id = tf.argmax(logits, axis=-1)
            out = out.write(out.size(), next_id[0])
            if tf.reduce_all(tf.equal(next_id, end)): break
        tokens = tf.transpose(out.stack())
        text = self.tk.en.detokenize(tokens)[0]
        return text

translator = Translator(tokenizers, transformer)

def demo(s, ref=None):
    pred = translator(tf.constant(s)).numpy().decode()
    print("PT:", s)
    print("EN:", pred)
    if ref: print("REF:", ref)
    print()

demo("este é um problema que temos que resolver.", "this is a problem we have to solve .")
demo("os meus vizinhos ouviram sobre esta ideia.", "and my neighboring homes heard about this idea .")
demo("estou aprendendo um modelo transformer pequeno.")

PT: este é um problema que temos que resolver.
EN: this is a problem that we have to solve . we have to solve .
REF: this is a problem we have to solve .

PT: os meus vizinhos ouviram sobre esta ideia.
EN: my neighbors had heard this idea about this idea .
REF: and my neighboring homes heard about this idea .

PT: estou aprendendo um modelo transformer pequeno.
EN: i ' m studying a little model , i ' m transformed with small model .
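
The greedy decoder above commits to the single most likely token at every step. Beam search instead keeps the `beam_width` best partial translations and extends all of them. The sketch below shows the algorithm in plain Python on a toy distribution. `next_log_probs`, the `TOY` table, and the token strings are hypothetical stand-ins for a call to the trained model, and the sketch applies no length normalization, which practical implementations often add.

```python
import math

def beam_search(next_log_probs, start, end, beam_width=3, max_len=10):
    """Return (tokens, log_prob) of the best hypothesis found."""
    beams = [([start], 0.0)]
    finished = []
    for _ in range(max_len):
        # Extend every live hypothesis by every possible next token.
        candidates = []
        for tokens, score in beams:
            for token, logp in next_log_probs(tokens).items():
                candidates.append((tokens + [token], score + logp))
        # Keep the best beam_width extensions.
        candidates.sort(key=lambda c: c[1], reverse=True)
        beams = []
        for tokens, score in candidates[:beam_width]:
            if tokens[-1] == end:
                finished.append((tokens, score))
            else:
                beams.append((tokens, score))
        if not beams:
            break
    finished.extend(beams)  # hypotheses cut off by max_len
    return max(finished, key=lambda c: c[1])

# Toy next-token distribution: the best first token ("a") leads to a
# worse complete sequence than the second-best one ("b").
TOY = {
    ("<s>",): {"a": 0.6, "b": 0.4},
    ("<s>", "a"): {"x": 0.5, "y": 0.5},
    ("<s>", "b"): {"z": 0.9, "x": 0.1},
}

def toy_next_log_probs(tokens):
    probs = TOY.get(tuple(tokens), {"</s>": 1.0})
    return {t: math.log(p) for t, p in probs.items()}

greedy_tokens, greedy_score = beam_search(toy_next_log_probs, "<s>", "</s>", beam_width=1)
beam_tokens, beam_score = beam_search(toy_next_log_probs, "<s>", "</s>", beam_width=2)
print("greedy:", greedy_tokens, round(math.exp(greedy_score), 2))
print("beam=2:", beam_tokens, round(math.exp(beam_score), 2))
```

With `beam_width=1` the search reduces to greedy decoding and ends on the sequence with probability 0.3, while `beam_width=2` recovers the sequence with probability 0.36. Wiring this into the notebook would mean replacing `toy_next_log_probs` with a function that runs the Transformer on the current prefix and returns log-softmax scores, at the cost of roughly `beam_width` times more model calls per step.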

