<a href="https://colab.research.google.com/github/alxmarqs/LLMtopics/blob/main/1_3_transformer_decoder_only_exercicio.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Aula 3 — Exercício: Transformer Decoder-only com Inferência

## Características do Decoder-only
- **Máscara causal**: cada token só enxerga tokens anteriores (nunca o futuro)
- **Auto-regressão**: gera um token por vez, realimentando a saída na entrada
- Sem cross-attention, sem encoder separado

## O que este exercício implementa
1. Carregamento do corpus (IMDB-Genres)
2. Treinamento de tokenizador WordLevel
3. Treinamento do modelo decoder-only (linguagem causal)
4. **Inferência com máxima probabilidade** (greedy) no loop de treino
5. **Inferência com amostragem por temperatura** (+ top-k / top-p) no loop de treino
6. Visualização interativa dos efeitos de temperatura, top-k e top-p **com logits reais do modelo**

## Instalação de dependências

In [None]:
!pip install datasets tokenizers -q

In [None]:
import torch
import torch.nn as nn
from torch.nn import functional as F
import random
import re
import numpy as np
import matplotlib.pyplot as plt

torch.manual_seed(42)
random.seed(42)

device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f'Dispositivo: {device}')

---
## Passo 1 — Carregar o conjunto de documentos

In [None]:
from datasets import load_dataset

ds = load_dataset("jquigl/imdb-genres")

def clean_ascii(text):
    text = text.encode("ascii", errors="ignore").decode()
    return re.sub(r"[^A-Za-z0-9 .,:;!?'\-]", "", text)

documentos = [clean_ascii(x["description"]) for x in ds["train"]]
documentos = [t.split(" - ")[0] for t in documentos]
documentos = [t for t in documentos if len(t) > 20]

print(f"Total de documentos: {len(documentos)}")
print("\nAmostras:")
for d in documentos[:5]:
    print(" •", d)

---
## Passo 2 — Treinar o tokenizador

In [None]:
from tokenizers import Tokenizer
from tokenizers.models import WordLevel
from tokenizers.trainers import WordLevelTrainer
from tokenizers.pre_tokenizers import Whitespace

tokenizer = Tokenizer(WordLevel(unk_token="[UNK]"))
tokenizer.pre_tokenizer = Whitespace()

trainer = WordLevelTrainer(
    vocab_size=2000,
    special_tokens=["[PAD]", "[UNK]", "[BOS]", "[EOS]"]
)

tokenizer.train_from_iterator(documentos, trainer)

vocab_size = tokenizer.get_vocab_size()
pad_token_id = tokenizer.token_to_id("[PAD]")
bos_token_id = tokenizer.token_to_id("[BOS]")
eos_token_id = tokenizer.token_to_id("[EOS]")

print(f"Vocabulário: {vocab_size} tokens")
print(f"Especiais — PAD:{pad_token_id} | BOS:{bos_token_id} | EOS:{eos_token_id}")

def encode(text):
    ids = tokenizer.encode("[BOS] " + text + " [EOS]").ids
    return torch.tensor(ids, dtype=torch.long)

def decode(ids):
    """Decodifica IDs filtrando tokens especiais."""
    special = {pad_token_id, bos_token_id, eos_token_id}
    clean = [i for i in (ids.tolist() if hasattr(ids, 'tolist') else ids) if i not in special]
    return tokenizer.decode(clean)

# Exemplo
ex = encode(documentos[0])
print(f"\nExemplo: '{documentos[0][:50]}...'")
print(f"Tokens ({len(ex)}): {ex.tolist()[:10]} ...")

---
## Passo 3 — Definição do modelo Transformer Decoder-only

A **máscara causal** é o que diferencia o decoder do encoder:
- Cada posição `t` só consegue "ver" as posições `0..t` (contexto passado)
- Isso é essencial para gerar texto de forma autoregressiva: o modelo nunca "espiona" o futuro durante o treino

In [None]:
class DecoderOnlyTransformer(nn.Module):
    """Transformer Decoder-only para geração de texto (linguagem causal)."""

    def __init__(self, vocab_size, d_model=128, n_heads=4, num_layers=3, max_len=64):
        super().__init__()
        self.max_len = max_len

        self.token_emb = nn.Embedding(vocab_size, d_model)
        self.pos_emb = nn.Embedding(max_len, d_model)

        # TransformerDecoder usado sem memória de encoder (self-attention causal)
        layer = nn.TransformerDecoderLayer(
            d_model=d_model,
            nhead=n_heads,
            dim_feedforward=256,
            dropout=0.1,
            batch_first=True
        )
        self.decoder = nn.TransformerDecoder(layer, num_layers=num_layers)
        self.lm_head = nn.Linear(d_model, vocab_size)

    def _causal_mask(self, T, device):
        """Máscara triangular superior: impede ver tokens futuros."""
        return torch.triu(torch.ones(T, T, device=device), diagonal=1).bool()

    def forward(self, x):
        B, T = x.shape
        pos = torch.arange(T, device=x.device).unsqueeze(0)
        h = self.token_emb(x) + self.pos_emb(pos)  # (B, T, d_model)

        mask = self._causal_mask(T, x.device)
        out = self.decoder(h, h, tgt_mask=mask)  # h como memória (sem encoder)
        logits = self.lm_head(out)  # (B, T, vocab_size)
        return logits

---
## Passo 4 — Estratégias de inferência

### 4a. Máxima probabilidade (greedy)
Sempre escolhe o token com maior probabilidade. É determinístico, rápido, mas pode ficar repetitivo.

### 4b. Amostragem com temperatura + Top-K + Top-P (nucleus sampling)
- **Temperatura** controla a "criatividade": valores baixos → mais conservador; valores altos → mais aleatório
- **Top-K** filtra os K tokens mais prováveis antes de amostrar
- **Top-P (nucleus)** filtra os tokens que juntos somam probabilidade ≥ P

In [None]:
# -------------------------------------------------------
# 4a. Greedy: token com maior probabilidade
# -------------------------------------------------------
def max_prob_sampling(logits):
    """
    Seleção greedy: retorna o índice do token com maior logit.
    logits: tensor (vocab_size,)
    retorna: tensor (1,)
    """
    return logits.argmax(dim=-1, keepdim=True)  # (1,)


# -------------------------------------------------------
# 4b. Amostragem com temperatura, top-k e top-p
# -------------------------------------------------------
def sampling(logits, temperature=1.0, top_k=None, top_p=0.9):
    """
    Amostragem estocástica com:
    - temperature: divide os logits (> 1 = mais aleatório, < 1 = mais concentrado)
    - top_k: mantém apenas os K tokens mais prováveis
    - top_p (nucleus): mantém o menor conjunto com prob acumulada >= p
    logits: tensor (vocab_size,)
    retorna: tensor (1,)
    """
    logits = logits.clone().float()

    # --- Temperatura ---
    logits = logits / max(temperature, 1e-5)

    # --- Top-K ---
    if top_k is not None:
        k = min(top_k, logits.size(-1))
        top_vals, _ = torch.topk(logits, k)
        min_val = top_vals[-1]  # menor valor entre os top-k
        logits = logits.masked_fill(logits < min_val, float('-inf'))

    # --- Top-P (nucleus) ---
    sorted_logits, sorted_idx = torch.sort(logits, descending=True)
    cum_probs = torch.softmax(sorted_logits, dim=-1).cumsum(dim=-1)

    # Remove tokens cuja prob acumulada ultrapassa p (exceto o primeiro)
    remove_mask = cum_probs > top_p
    remove_mask[..., 1:] = remove_mask[..., :-1].clone()
    remove_mask[..., 0] = False

    filtered = sorted_logits.masked_fill(remove_mask, float('-inf'))
    probs = torch.softmax(filtered, dim=-1)

    # Amostra e mapeia de volta ao índice original
    sampled_rank = torch.multinomial(probs, num_samples=1)  # posição no sorted
    next_token = sorted_idx[sampled_rank]  # índice real no vocab

    return next_token  # (1,)


# -------------------------------------------------------
# Função de geração autoregressiva (comum às duas estratégias)
# -------------------------------------------------------
def generate(prompt, strategy='greedy', max_new_tokens=20,
             temperature=1.0, top_k=None, top_p=0.9):
    """
    Gera texto a partir de um prompt usando o modelo decoder-only.

    strategy:
        'greedy' → max_prob_sampling (determinístico)
        'sampling' → amostragem com temperatura/top-k/top-p
    """
    model.eval()
    with torch.no_grad():
        x = encode(prompt).unsqueeze(0).to(device)  # (1, T)

        for _ in range(max_new_tokens):
            # Trunca se exceder max_len do modelo
            x_cond = x[:, -model.max_len:]

            logits_all = model(x_cond)  # (1, T, vocab_size)
            logits = logits_all[0, -1]  # último passo: (vocab_size,)

            if strategy == 'greedy':
                next_token = max_prob_sampling(logits)  # (1,)
            else:  # 'sampling'
                next_token = sampling(logits,
                                    temperature=temperature,
                                    top_k=top_k,
                                    top_p=top_p)  # (1,)

            x = torch.cat([x, next_token.unsqueeze(0)], dim=1)  # (1, T+1)

            if next_token.item() == eos_token_id:
                break

        return decode(x[0])

print("Funções de inferência definidas.")

---
## Passo 5 — Treinamento do modelo com inferência periódica

A cada 500 steps, o loop **interrompe** o treino e gera texto com as duas estratégias:
- **Greedy** — saída determinística e conservadora
- **Sampling** — saída estocástica e mais variada

In [None]:
MAX_LEN = 32
BATCH_SIZE = 32

def sample_batch(batch_size=BATCH_SIZE, max_len=MAX_LEN):
    """Amostra um batch, tokeniza e faz padding com [PAD]."""
    batch = random.sample(documentos, batch_size)
    tokenized = [encode(t) for t in batch]
    max_t = min(max(len(x) for x in tokenized), max_len)
    padded = []
    for x in tokenized:
        x = x[:max_t]
        pad_len = max_t - len(x)
        if pad_len > 0:
            x = torch.cat([x, torch.full((pad_len,), pad_token_id, dtype=torch.long)])
        padded.append(x)
    return torch.stack(padded)  # (B, T)


# ---- instancia modelo e otimizador ----
model = DecoderOnlyTransformer(vocab_size).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)

total_params = sum(p.numel() for p in model.parameters())
print(f"Parâmetros totais: {total_params:,}")
print(f"Dispositivo: {device}")

In [None]:
STEPS = 5000
LOG_EVERY = 200  # imprime loss
GEN_EVERY = 500  # gera texto com as duas estratégias
PROMPT = "This movie is a"

loss_history = []

print(f"Treinando por {STEPS} steps...")
print(f"Prompt de teste: '{PROMPT}'\n")
print("=" * 65)

for step in range(1, STEPS + 1):
    model.train()
    batch = sample_batch().to(device)

    # Objetivo causal: prever batch[:, 1:] dado batch[:, :-1]
    logits = model(batch[:, :-1])
    loss = F.cross_entropy(
        logits.reshape(-1, vocab_size),
        batch[:, 1:].reshape(-1),
        ignore_index=pad_token_id
    )

    optimizer.zero_grad()
    loss.backward()
    torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
    optimizer.step()

    loss_history.append(loss.item())

    # --- Log periódico ---
    if step % LOG_EVERY == 0:
        avg_loss = np.mean(loss_history[-LOG_EVERY:])
        ppl = np.exp(avg_loss)
        print(f"[step {step:>5}] loss = {avg_loss:.4f} | perplexidade = {ppl:.2f}")

    # --- Geração com as duas estratégias ---
    if step % GEN_EVERY == 0:
        print(f"\n{'─'*65}")
        print(f"  Geração no step {step}")
        print(f"  Prompt: '{PROMPT}'")
        print(f"{'─'*65}")

        # Estratégia 1: greedy (máxima probabilidade)
        texto_greedy = generate(
            PROMPT,
            strategy='greedy',
            max_new_tokens=15
        )
        print(f"  [Greedy] {texto_greedy}")

        # Estratégia 2: amostragem com temperatura
        texto_sample = generate(
            PROMPT,
            strategy='sampling',
            max_new_tokens=15,
            temperature=0.8,
            top_k=50,
            top_p=0.9
        )
        print(f"  [Sampling T=0.8, K=50, P=0.9] {texto_sample}")
        print()

print("=" * 65)
print("Treinamento concluído!")

In [None]:
# Curva de loss com média móvel
window = 100
smooth = np.convolve(loss_history, np.ones(window)/window, mode='valid')

fig, ax = plt.subplots(figsize=(10, 3))
ax.plot(loss_history, alpha=0.2, color='royalblue')
ax.plot(range(window-1, len(loss_history)), smooth, color='royalblue', lw=2,
        label=f'Média móvel ({window} steps)')
ax.set_xlabel('Step')
ax.set_ylabel('Cross-Entropy Loss')
ax.set_title('Curva de Treinamento — Decoder-only (Linguagem Causal)')
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

---
## Passo 6 — Comparação de estratégias de geração

Agora comparamos as diferentes configurações sobre o **mesmo prompt**, com o modelo já treinado.

In [None]:
prompts = [
    "This movie is a",
    "A story about love and",
    "In the future, the world",
]

configs = [
    {"label": "Greedy", "strategy": "greedy"},
    {"label": "Temp=0.5 K=10 P=0.9", "strategy": "sampling", "temperature": 0.5, "top_k": 10, "top_p": 0.9},
    {"label": "Temp=1.0 K=50 P=0.9", "strategy": "sampling", "temperature": 1.0, "top_k": 50, "top_p": 0.9},
    {"label": "Temp=1.5 K=None P=0.95", "strategy": "sampling", "temperature": 1.5, "top_k": None, "top_p": 0.95},
]

print("Comparação de estratégias de geração\n")
for prompt in prompts:
    print(f"  Prompt: '{prompt}'")
    print("  " + "-" * 55)
    for cfg in configs:
        kw = {k: v for k, v in cfg.items() if k not in ('label', 'strategy')}
        texto = generate(prompt, strategy=cfg['strategy'], max_new_tokens=15, **kw)
        print(f"  [{cfg['label']:<25}] {texto}")
    print()

---
## Passo 7 — Visualização interativa: Temperatura, Top-K, Top-P

Diferente do exemplo original (que usava logits fictícios), aqui usamos **logits reais do modelo treinado** para um prompt real. Isso torna a visualização diretamente conectada ao comportamento aprendido.

In [None]:
import ipywidgets as widgets
from IPython.display import display

# ------------------------------------------------------------------
# Funções NumPy para a visualização (replicam a lógica PyTorch)
# ------------------------------------------------------------------
def np_softmax(x):
    e = np.exp(x - np.max(x))
    return e / e.sum()

def apply_temperature_np(logits, temperature):
    return logits / max(temperature, 1e-5)

def apply_top_k_np(logits, k):
    if k is None or k >= len(logits):
        return logits
    threshold = np.sort(logits)[::-1][k - 1]
    result = logits.copy()
    result[result < threshold] = -np.inf
    return result

def apply_top_p_np(logits, p):
    sorted_idx = np.argsort(logits)[::-1]
    sorted_logs = logits[sorted_idx]
    cum_probs = np.cumsum(np_softmax(sorted_logs))
    remove = cum_probs > p
    remove[1:] = remove[:-1].copy()
    remove[0] = False
    result = logits.copy()
    result[sorted_idx[remove]] = -np.inf
    return result

# ------------------------------------------------------------------
# Extrai logits reais para um prompt
# ------------------------------------------------------------------
def get_real_logits(prompt, top_n=15):
    """Retorna os top_n tokens e logits reais do modelo para o próximo token."""
    model.eval()
    with torch.no_grad():
        x = encode(prompt).unsqueeze(0).to(device)
        logits = model(x)[0, -1].cpu().numpy()  # (vocab_size,)

    top_idx = np.argsort(logits)[::-1][:top_n]
    top_logits = logits[top_idx]
    top_tokens = [tokenizer.id_to_token(int(i)) for i in top_idx]
    return top_tokens, top_logits, top_idx, logits


PROMPT_VIZ = "This movie is a"
TOKENS_VIZ, BASE_LOGITS_VIZ, TOP_IDX_VIZ, FULL_LOGITS_VIZ = get_real_logits(PROMPT_VIZ)

print(f"Prompt: '{PROMPT_VIZ}'")
print(f"Top 5 tokens: {TOKENS_VIZ[:5]}")
print(f"Top 5 logits: {BASE_LOGITS_VIZ[:5].round(2)}")

In [None]:
# ------------------------------------------------------------------
# Widgets interativos
# ------------------------------------------------------------------
prompt_widget = widgets.Dropdown(
    options=["This movie is a", "A story about love and", "In the future, the world"],
    value="This movie is a",
    description="Prompt:",
    layout=widgets.Layout(width="95%")
)

temp_slider = widgets.FloatSlider(
    value=1.0, min=0.1, max=3.0, step=0.1,
    description="Temperatura:",
    readout_format=".1f",
    layout=widgets.Layout(width="90%")
)

topk_slider = widgets.IntSlider(
    value=15, min=1, max=15, step=1,
    description="Top-K:",
    layout=widgets.Layout(width="90%")
)

topp_slider = widgets.FloatSlider(
    value=1.0, min=0.1, max=1.0, step=0.05,
    description="Top-P:",
    readout_format=".2f",
    layout=widgets.Layout(width="90%")
)

output_plot = widgets.Output()


def update_plot(*args):
    """Atualiza o gráfico com logits reais do modelo."""
    tokens_viz, base_logits, _, _ = get_real_logits(prompt_widget.value)

    # Aplica transformações
    logits = apply_temperature_np(base_logits.copy(), temp_slider.value)
    logits = apply_top_k_np(logits, topk_slider.value)
    logits = apply_top_p_np(logits, topp_slider.value)
    probs = np_softmax(logits)

    # Cores: azul = disponível, cinza = filtrado (-inf)
    colors = ['#2196F3' if np.isfinite(l) else '#BDBDBD' for l in logits]

    with output_plot:
        output_plot.clear_output(wait=True)
        fig, ax = plt.subplots(figsize=(11, 4))
        bars = ax.bar(tokens_viz, probs * 100, color=colors, edgecolor='white')

        for bar, p, l in zip(bars, probs, logits):
            if np.isfinite(l):
                ax.text(bar.get_x() + bar.get_width()/2,
                        bar.get_height() + 0.5,
                        f"{p*100:.1f}%",
                        ha='center', va='bottom', fontsize=8)
            else:
                ax.text(bar.get_x() + bar.get_width()/2,
                        1, "X", ha='center', va='bottom', fontsize=9, color='gray')

        ax.set_ylim(0, max(probs)*100 * 1.25 + 2)
        ax.set_ylabel("Probabilidade (%)")
        ax.set_title(
            f"Próximo token após: '{prompt_widget.value}'\n"
            f"[Logits reais do modelo — T={temp_slider.value:.1f} K={topk_slider.value} P={topp_slider.value:.2f}]",
            fontsize=10
        )
        ax.tick_params(axis='x', rotation=30)
        ax.grid(axis='y', alpha=0.3)

        # Legenda
        from matplotlib.patches import Patch
        legend_elem = [
            Patch(facecolor='#2196F3', label='Token disponível'),
            Patch(facecolor='#BDBDBD', label='Filtrado (Top-K/P)'),
        ]
        ax.legend(handles=legend_elem, fontsize=8, loc='upper right')

        plt.tight_layout()
        plt.show()


for w in [temp_slider, topk_slider, topp_slider, prompt_widget]:
    w.observe(update_plot, names='value')

# Exibe o painel
panel = widgets.VBox([
    widgets.HTML("<h3>Visualização Interativa: Temperatura, Top-K e Top-P</h3>"),
    prompt_widget,
    temp_slider,
    topk_slider,
    topp_slider,
    output_plot
])

display(panel)
update_plot()  # Gera a primeira visualização

---
## Conclusão

Neste notebook implementamos:

1. Um **Transformer Decoder-only** com máscara causal
2. **Inferência greedy** (determinística, máxima probabilidade)
3. **Inferência estocástica** com temperatura, top-k e top-p
4. Treinamento com geração periódica no loop
5. Comparação de múltiplas estratégias de geração
6. Visualização interativa com **logits reais do modelo**

### Principais Conceitos

- **Máscara causal**: Permite que cada posição veja apenas o passado (essencial para modelos autoregressivos)
- **Temperatura**: Controla a "criatividade" da geração (baixo = conservador, alto = aleatório)
- **Top-K**: Limita candidatos aos K tokens mais prováveis
- **Top-P (nucleus)**: Limita candidatos ao menor conjunto cuja probabilidade acumulada ≥ P

### Próximos Passos

- Experimente diferentes configurações de temperatura, top-k e top-p
- Teste com prompts personalizados
- Aumente o número de steps de treinamento para melhorar a qualidade
- Explore diferentes arquiteturas (mais camadas, mais heads de atenção)