In [1]:
import gc
import torch

def cleanup():
    gc.collect()
    torch.cuda.empty_cache()
    # Ativa a sugestão do próprio erro para evitar fragmentação
    import os
    os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

cleanup()

In [2]:
import torch
from torch import nn
from transformers import AutoTokenizer
from datasets import load_dataset
from torch.utils.data import DataLoader
from torch.amp import autocast, GradScaler
from collections import Counter
from datasets import load_dataset
import random

In [3]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
scaler = GradScaler(enabled=(device.type == "cuda"))

In [4]:
class SimpleTokenizer:
    def __init__(self, min_freq=2):
        self.pad_token = "<pad>"
        self.bos_token = "<bos>"
        self.eos_token = "<eos>"
        self.unk_token = "<unk>"

        self.special_tokens = [
            self.pad_token,
            self.bos_token,
            self.eos_token,
            self.unk_token
        ]

        self.word2id = {tok: i for i, tok in enumerate(self.special_tokens)}
        self.id2word = {i: tok for tok, i in self.word2id.items()}
        self.min_freq = min_freq

    def build_vocab(self, texts):
        counter = Counter()
        for text in texts:
            counter.update(text.split())

        for word, freq in counter.items():
            if freq >= self.min_freq and word not in self.word2id:
                idx = len(self.word2id)
                self.word2id[word] = idx
                self.id2word[idx] = word

    @property
    def vocab_size(self):
        return len(self.word2id)

    @property
    def pad_token_id(self):
        return self.word2id[self.pad_token]

    @property
    def bos_token_id(self):
        return self.word2id[self.bos_token]

    @property
    def eos_token_id(self):
        return self.word2id[self.eos_token]

    @property
    def unk_token_id(self):
        return self.word2id[self.unk_token]

    def encode(self, text):
        tokens = text.split()
        ids = [self.word2id.get(t, self.unk_token_id) for t in tokens]
        return [self.bos_token_id] + ids + [self.eos_token_id]

    def decode(self, ids, skip_special_tokens=True):
        words = []
        for i in ids:
            w = self.id2word.get(i, self.unk_token)
            if skip_special_tokens and w in self.special_tokens:
                continue
            words.append(w)
        return " ".join(words)


In [5]:
class TinyTransformer(nn.Module):
    def __init__(self, vocab_size, emb_dim=256):
        super().__init__()

        self.emb = nn.Embedding(vocab_size, emb_dim)
        self.pos_emb = nn.Parameter(torch.zeros(1, 200, emb_dim))

        self.transformer = nn.Transformer(
            d_model=emb_dim,
            nhead=4,
            num_encoder_layers=4,
            num_decoder_layers=4,
            dim_feedforward=512,
            dropout=0.0,
            batch_first=True,
             norm_first=True
        )

        self.fc_out = nn.Linear(emb_dim, vocab_size)

    def forward(self, src, tgt):
        tgt_mask = torch.triu(
            torch.ones(tgt.size(1), tgt.size(1), device=src.device),
            diagonal=1
        ).bool()

        src_pad = (src == tokenizer.pad_token_id)
        tgt_pad = (tgt == tokenizer.pad_token_id)

        src = self.emb(src) + self.pos_emb[:, :src.size(1)]
        tgt = self.emb(tgt) + self.pos_emb[:, :tgt.size(1)]

        out = self.transformer(
            src, tgt,
            tgt_mask=tgt_mask,
            src_key_padding_mask=src_pad,
            tgt_key_padding_mask=tgt_pad,
            memory_key_padding_mask=src_pad
        )

        return self.fc_out(out)


In [6]:
raw = load_dataset("VanessaSchenkel/translation-en-pt", split="train")
print(raw[0])

README.md:   0%|          | 0.00/743 [00:00<?, ?B/s]

translate-en-pt.json:   0%|          | 0.00/59.8M [00:00<?, ?B/s]

Generating train split: 0 examples [00:00, ? examples/s]

{'data': {'id': '0', 'translation': {'english': "Let's try something.", 'portuguese': 'Vamos tentar alguma coisa!'}}}


In [7]:
raw = load_dataset("VanessaSchenkel/translation-en-pt", split="train")

dados = []
for item in raw:
    en = item["data"]["translation"]["english"].lower()
    pt = item["data"]["translation"]["portuguese"].lower()
    if len(en.split()) <= 20:
        dados.append((en, pt))

dados = random.sample(dados, 100000)
print(dados[0])

('are you planning to return to boston soon?', 'você está planejando voltar para boston em breve?')


In [8]:
texts = [en for en, pt in dados] + [pt for en, pt in dados]
tokenizer = SimpleTokenizer(min_freq=2)
tokenizer.build_vocab(texts)

print("Vocab size:", tokenizer.vocab_size)

Vocab size: 33974


In [9]:
train_loader = DataLoader(
    dados,
    batch_size=64,
    shuffle=True
)

In [10]:
modelo = TinyTransformer(tokenizer.vocab_size).to(device)

optimizer = torch.optim.AdamW(modelo.parameters(), lr=5e-4)
loss_fn = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)



In [11]:
def pad_batch(seqs, pad_id):
    max_len = max(len(s) for s in seqs)
    return torch.tensor(
        [s + [pad_id] * (max_len - len(s)) for s in seqs],
        device=device
    )

In [12]:
def greedy_decode(model, src_text, tokenizer, max_len=30):
    model.eval()

    src_ids = torch.tensor(
        [tokenizer.encode(src_text)],
        device=device
    )

    tgt = torch.tensor(
        [[tokenizer.bos_token_id]],
        device=device
    )

    generated = []

    with torch.no_grad():
        for _ in range(max_len):
            logits = model(src_ids, tgt)
            next_token = logits[:, -1].argmax(-1).item()

            if next_token == tokenizer.eos_token_id:
                break

            generated.append(next_token)
            tgt = torch.cat(
                [tgt, torch.tensor([[next_token]], device=device)],
                dim=1
            )

    return tokenizer.decode(generated)


In [13]:
print("Iniciando treinamento...")

for epoch in range(21):
    total_loss = 0.0

    for src_texts, tgt_texts in train_loader:
        optimizer.zero_grad()

        src_ids = [tokenizer.encode(s) for s in src_texts]
        tgt_ids = [tokenizer.encode(t) for t in tgt_texts]

        src = pad_batch(src_ids, tokenizer.pad_token_id)
        tgt = pad_batch(tgt_ids, tokenizer.pad_token_id)

        tgt_input = tgt[:, :-1]
        tgt_output = tgt[:, 1:]

        with autocast(device_type="cuda", enabled=(device.type == "cuda")):
            logits = modelo(src, tgt_input)
            loss = loss_fn(
                logits.reshape(-1, logits.size(-1)),
                tgt_output.reshape(-1)
            )

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        total_loss += loss.item()

    if epoch % 5 == 0:
        print(f"Epoch {epoch} | loss: {total_loss / len(train_loader):.4f}")

print("Treinamento concluído!")


Iniciando treinamento...
Epoch 0 | loss: 3.6982
Epoch 5 | loss: 0.7018
Epoch 10 | loss: 0.3726
Epoch 15 | loss: 0.2650
Epoch 20 | loss: 0.2121
Treinamento concluído!


In [14]:
print(greedy_decode(modelo, "i like apples", tokenizer))
print(greedy_decode(modelo, "the cat is black", tokenizer))
print(greedy_decode(modelo, "Eating fruit is good for you.", tokenizer))
print(greedy_decode(modelo, "Do you know them?", tokenizer))

eu gosto de maçãs
o gato está preto.
a é boa para você.
você as ajudará?
