<a href="https://colab.research.google.com/github/arelkeselbri/gsi073/blob/main/GSI073_aula0_seq2seq.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Preparação dos dados

Esta tarefa é inverter sequências de caracteres. Exemplo: **aabcd** em **dcbaa**.


In [None]:
import torch
import torch.nn as nn
import random

chars = list("abcde ")
vocab = {ch: i for i, ch in enumerate(chars)} # Cada letra, ganha um número
inv_vocab = {i: ch for ch, i in vocab.items()}# Tabela de decodificação
vocab_size = len(vocab)

def encode(s): # Codifica letras em números
    return torch.tensor([vocab[c] for c in s], dtype=torch.long)

def decode(t): # Decodifica números em letras
    return ''.join(inv_vocab[int(x)] for x in t)

def random_seq(n=5): # Cria novas sequências
    return ''.join(random.choice(chars[:-1]) for _ in range(n))

# Gerar dados
pairs = [(encode(s), encode(s[::-1])) for s in [random_seq() for _ in range(50000)]]

max_len = max(len(x) for x, _ in pairs) # pega maior sequência

def pad(x):  # Preenche conjunto de dados em pad no último índice
    return torch.cat([x, torch.tensor([vocab[' ']] * (max_len - len(x)))], dim=0)

inputs = torch.stack([pad(x) for x, _ in pairs])
targets = torch.stack([pad(y) for _, y in pairs])

train_ds = torch.utils.data.TensorDataset(inputs, targets)
train_dl = torch.utils.data.DataLoader(train_ds, batch_size=128, shuffle=True)

device = 'cuda' if torch.cuda.is_available() else 'cpu'

## Veja um par

In [None]:
print(pairs[1])

# Definição do modelo Seq2Seq com GRU

In [None]:
class Encoder(nn.Module):
    def __init__(self, vocab_size, emb_size, hidden_size):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, emb_size)
        self.gru = nn.GRU(emb_size, hidden_size, batch_first=True)

    def forward(self, x):
        x = self.embed(x)
        _, h = self.gru(x)
        return h  # [1, B, H]

class Decoder(nn.Module):
    def __init__(self, vocab_size, emb_size, hidden_size):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, emb_size)
        self.gru = nn.GRU(emb_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, h):
        """
        x: tensor que indica a parte prévia correta
        h: tensor que indica o estado do encoder da parte prévia
        """
        x = self.embed(x)
        out, h = self.gru(x, h)
        logits = self.fc(out)
        return logits, h # retorna o estado latente para atualizar o estado

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, tgt):
        h = self.encoder(src)
        # usa contexto correto anterior e estado atual para prever o tgt[:, -1]
        logits, _ = self.decoder(tgt[:, :-1], h)
        return logits

# Código para usar o modelo treinado: inferência

In [None]:
def decode_step(decoder, token, h):
    logits, h = decoder(token, h) # obtém logits e atualiza estado da sequência
    next_token = logits[:, -1, :].argmax(-1, keepdim=True)
    return next_token, h

def predict(model, seq, max_len=10):
    model.eval()
    with torch.no_grad():
        src = pad(encode(seq)).unsqueeze(0).to(device, dtype=torch.long)
        h = model.encoder(src) # Obtém estado do modelo após processar entrada inicial

        # 'token' representa a geração passo a passo da sequência invertida
        token = torch.tensor([[vocab[' ']]], dtype=torch.long, device=device)
        seq_invertida = []
        for _ in range(max_len):
            token, h = decode_step(model.decoder, token, h)
            seq_invertida.append(token.item())
        return decode(seq_invertida)

# Preparação para treino

In [None]:
emb_size = 32
hidden_size = 64
encoder = Encoder(vocab_size, emb_size, hidden_size)
decoder = Decoder(vocab_size, emb_size, hidden_size)
model = Seq2Seq(encoder, decoder).to(device)

loss_fn = nn.CrossEntropyLoss(ignore_index=vocab[' ']) # ignora o pad: " "
opt = torch.optim.Adam(model.parameters(), lr=1e-3)

# Execução do treino

In [None]:
for epoch in range(20):
    model.train()
    total_loss = 0
    for xb, yb in train_dl:
        xb, yb = xb.to(device, dtype=torch.long), yb.to(device, dtype=torch.long)
        opt.zero_grad()
        logits = model(xb, yb)
        loss = loss_fn(logits.reshape(-1, vocab_size), yb[:, 1:].reshape(-1))
        loss.backward()
        opt.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}: loss={total_loss/len(train_dl):.4f}")

# Vamos testar

In [None]:
for _ in range(10):
    s = random_seq()
    pred = predict(model, s, max_len=len(s))
    print(f"{s} -> {pred}")
