<a href="https://colab.research.google.com/github/SaraBatistaPereira/GSI073/blob/main/GSI073_aula0_luong_attention.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Preparação dos dados

Esta tarefa é inverter sequências de caracteres. Exemplo: **aabcd** em **dcbaa**.


In [None]:
import torch
import torch.nn as nn
import random
import torch.nn.functional as F
import torch.utils.data # Adicionado para uso do TensorDataset/DataLoader

# --- Configurações Básicas e Vocabulário ---
chars = list("abcd ")
vocab = {ch: i for i, ch in enumerate(chars)} # Cada letra, ganha um número
inv_vocab = {i: ch for ch, i in vocab.items()}# Tabela de decodificação
vocab_size = len(vocab)
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# --- Funções de Utilitários de Dados ---
def encode(s): # Codifica letras em números
    return torch.tensor([vocab[c] for c in s], dtype=torch.long)

def decode(t): # Decodifica números em letras
    return ''.join(inv_vocab[int(x)] for x in t)

def random_seq(n=5): # Cria novas sequências
    return ''.join(random.choice(chars[:-1]) for _ in range(n))

# --- NOVO: Funções de Métrica e Predição ---

def calculate_accuracy_seq(expected_str, predicted_str):
    """
    Compara a precisão (porcentagem de acerto) entre a string esperada e a predita,
    considerando o comprimento real da sequência.
    """
    L = len(expected_str)
    predicted_slice = predicted_str[:L]
    correct_chars = sum(1 for e, p in zip(expected_str, predicted_slice) if e == p)
    return (correct_chars / L) * 100 if L > 0 else 0.0

def decode_step(decoder, token, h, encoder_outputs):
    """Executa um passo de decodificação no modo greedy. (Depende da classe Decoder)"""
    logits, h = decoder(token, h, encoder_outputs)
    next_token = logits[:, -1, :].argmax(-1, keepdim=True)
    return next_token, h

def predict(model, seq, max_len):
    """Roda o Seq2Seq completo em uma sequência de entrada. (Depende das classes do modelo)"""
    model.eval()
    with torch.no_grad():
        src = pad(encode(seq)).unsqueeze(0).to(device, dtype=torch.long)
        encoder_outputs, h = model.encoder(src)
        token = torch.tensor([[vocab[' ']]], dtype=torch.long, device=device)
        seq_invertida = []

        for _ in range(max_len):
            token, h = decode_step(model.decoder, token, h, encoder_outputs)
            seq_invertida.append(token.item())

        return decode(seq_invertida)

In [None]:
print(pairs[1])

# Definição do modelo Seq2Seq com GRU

In [None]:
class Encoder(nn.Module):
    def __init__(self, vocab_size, emb_size, hidden_size):
        super().__init__()
        # 1. Camada de Embedding: transforma cada token (número) em um vetor denso
        self.embed = nn.Embedding(vocab_size, emb_size)
        # 2. Camada GRU: processa a sequência de embeddings
        self.gru = nn.GRU(emb_size, hidden_size, batch_first=True)

    def forward(self, x):
        # Passa os tokens de entrada pelo embedding
        x = self.embed(x)

        # Roda a GRU
        outputs, h = self.gru(x)

        # outputs (B, S, H): Saídas de todos os passos (h_s) - Usadas pela Atenção
        # h (1, B, H): Estado oculto final - Usado como estado inicial do Decoder
        return outputs, h   # <--- ESSENCIAL

In [None]:
class LuongAttention(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, decoder_hidden, encoder_outputs):
        """
        decoder_hidden: (B, 1, H)
        encoder_outputs: (B, S, H)

        Retorna:
          context: (B, 1, H)
          attn_weights: (B, 1, S)
        """

        # score = h_t · h_s^T
        # (B, 1, H) x (B, H, S) -> (B, 1, S)
        attn_scores = torch.bmm(decoder_hidden, encoder_outputs.transpose(1, 2))

        attn_weights = F.softmax(attn_scores, dim=-1)  # normaliza nos steps da source

        # context = soma ponderada
        # (B, 1, S) x (B, S, H) -> (B, 1, H)
        context = torch.bmm(attn_weights, encoder_outputs)

        return context, attn_weights

In [None]:
class Decoder(nn.Module):
    def __init__(self, vocab_size, emb_size, hidden_size):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, emb_size)
        self.gru = nn.GRU(emb_size, hidden_size, batch_first=True)
        self.attn = LuongAttention()

        # Luong concat: concatena hidden + context
        self.fc = nn.Linear(hidden_size * 2, vocab_size)

    def forward(self, x, h, encoder_outputs):
        """
        x: tokens anteriores corretos  (B, T)
        h: estado inicial do decoder   (1, B, H)
        encoder_outputs: todos os h_s  (B, S, H)
        """
        x = self.embed(x)  # (B, T, E)

        outputs = []
        # [NOVO] 1. Inicializa uma lista para armazenar os pesos de atenção de cada passo
        attn_weights_list = []

        seq_len = x.size(1)
        hidden = h

        for t in range(seq_len):
            inp = x[:, t:t+1]  # (B, 1, E)

            out_t, hidden = self.gru(inp, hidden)   # out_t: (B,1,H)

            # Atenção
            context, attn_w = self.attn(out_t, encoder_outputs)

            # [NOVO] 2. Armazena o peso de atenção (score) calculado
            attn_weights_list.append(attn_w)

            # concatenação [out_t ; context]
            combined = torch.cat([out_t, context], dim=-1)

            logits = self.fc(combined)  # (B,1,V)
            outputs.append(logits)

        outputs = torch.cat(outputs, dim=1)  # (B, T, V)

        # [ALTERADO] 3. Retorna a lista de pesos de atenção como terceira saída
        return outputs, hidden, attn_weights_list


In [None]:
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, tgt):
        # 1. Roda o Encoder
        encoder_outputs, h = self.encoder(src)

        # 2. Roda o Decoder, capturando agora 3 retornos.
        # logits: Saídas para o cálculo da loss
        # _: Estado oculto final (descartado no treino)
        # _: Lista de pesos de atenção (descartada no treino)
        logits, _, _ = self.decoder(tgt[:, :-1], h, encoder_outputs)

        return logits

# Código para usar o modelo treinado: inferência

In [None]:
def decode_step(decoder, token, h, encoder_outputs):
    """
    Executa um passo de decodificação:
    - token: tensor (B,1)
    - h: estado oculto do decoder (1,B,H)
    - encoder_outputs: (B,S,H)

    Retorna:
    - next_token (B, 1)
    - h (1, B, H)
    - attn_w (B, 1, S): Os pesos de atenção do passo atual
    """
    # [ALTERADO] Captura 3 valores do decoder
    logits, h, attn_weights_list = decoder(token, h, encoder_outputs)

    # O decoder em modo de inferência (batch size = 1) só tem 1 item na lista
    attn_w = attn_weights_list[0]

    next_token = logits[:, -1, :].argmax(-1, keepdim=True)

    # [ALTERADO] Retorna também o peso de atenção
    return next_token, h, attn_w

# Preparação para treino

In [None]:
# --- Bloco 4: Inicialização do Modelo e Configuração do Treino (Otimizado) ---

emb_size = 32
# [ALTERADO] Aumentado para 128 para maior capacidade e potencial de redução de erro.
hidden_size = 128

# Instanciação das classes (que agora retornam a atenção internamente)
encoder = Encoder(vocab_size, emb_size, hidden_size)
decoder = Decoder(vocab_size, emb_size, hidden_size)
model = Seq2Seq(encoder, decoder).to(device)

# Configuração da Loss Function
loss_fn = nn.CrossEntropyLoss(ignore_index=vocab[' ']) # ignora o pad: " "

# [ALTERADO] Diminuído para 5e-4 (0.0005) para uma convergência mais estável
opt = torch.optim.Adam(model.parameters(), lr=5e-4)

# Execução do treino

In [None]:
for epoch in range(10):
    model.train()
    total_loss = 0
    for xb, yb in train_dl:
        xb, yb = xb.to(device, dtype=torch.long), yb.to(device, dtype=torch.long)
        opt.zero_grad()
        logits = model(xb, yb)
        loss = loss_fn(logits.reshape(-1, vocab_size), yb[:, 1:].reshape(-1))
        loss.backward()
        opt.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}: loss={total_loss/len(train_dl):.4f}")

# Vamos testar

In [None]:
# --- Bloco de Teste de Inferência e Cálculo de Acertos (FINAL) ---
import numpy as np # Adicione esta importação se não estiver no início do seu script!

print("\n--- Testando o Modelo e Calculando Acertos ---")
print(f"{'ENTRADA':^10} | {'SAÍDA ESPERADA':^15} | {'SAÍDA PREDITA':^15} | {'ACERTO':^8}")
print("-" * 55)

total_accuracy = 0
# Reduzido para 5 testes para focar na visualização da matriz de atenção
num_tests = 5
seq_len_test = max_len

for i in range(num_tests):
    # 1. Entrada aleatória
    s = random_seq(n=seq_len_test)
    target = s[::-1]

    # 2. Saída Predita do Modelo
    # [ALTERADO] Agora a função predict retorna DOIS valores: a predição e a matriz de atenção
    pred_with_padding, attn_matrix = predict(model, s, max_len=len(s))

    pred = pred_with_padding[:len(s)]

    # 3. Cálculo da Acuidade
    accuracy = calculate_accuracy_seq(target, pred)
    total_accuracy += accuracy

    # 4. Impressão dos Resultados Básicos
    print(f"{s:^10} | {target:^15} | {pred:^15} | {accuracy:^7.1f}%")

    # 5. [NOVO] Impressão da Matriz de Score de Atenção
    print("  Matriz de Score de Atenção (Pesos):")

    output_chars = list(target)
    input_chars = list(s)

    # Converte a matriz de tensores para NumPy para impressão formatada
    attn_np = attn_matrix.cpu().numpy()

    # Imprime os rótulos de coluna (Entrada)
    header = "       | " + " | ".join(f"{ch:^4}" for ch in input_chars) + " |"
    print(header)
    print("-------+" + "------" * len(input_chars) + "+")

    # Imprime os pesos linha por linha (Saída)
    for j, (out_char, row) in enumerate(zip(output_chars, attn_np)):
        # Formata cada peso com 2 casas decimais
        weights_formatted = " | ".join(f"{w:.2f}" for w in row)
        # O char de saída é o rótulo da linha
        print(f" {out_char:^5} | {weights_formatted} |")

    print("-" * 55)

# --- Cálculo da Média Geral de Acertos ---
average_accuracy = total_accuracy / num_tests

print(f"Média Geral de Acertos (em {num_tests} testes): {average_accuracy:.2f}%")
print("-" * 55)

# Exercício
Compare o resultado do uso do encoder-decoder com atenção com o encoder-decoder sem atenção.