<a href="https://colab.research.google.com/github/ferdinandrafols/IA_LLMs/blob/main/aula_3_transformer_decoder_only_and_inference.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Aula 3 - Transformer *decoder-only*

Nesta aula você irá modificar o Transformer *decoder-only* fornecido a seguir.
Observe que em um *decoder-only* não existe:

*   cross-attention
*   encoder separado

e é utilizado com *auto-regressão*.

## Objetivo


## Exercício

Neste exercício você deve:

1.   carregar o seu conjunto de documentos
2.   treinar e usar (ou carregar) um tokenizador
3.   fazer treino de um modelo decoder-only
4.   incluir no loop de treino, inferência usando máxima probabilidade
5.   incluir no loop de treino, inferência usando amostragem com temperatura


In [None]:
import torch
import torch.nn as nn
from torch.nn import functional as F

# Passo 1: carregar conjunto de documentos

In [None]:
from datasets import load_dataset
import re

###### INSIRA AQUI O CODIGO PARA CARREGAR OS SEUS DOCUMENTOS na lista DOCUMENTOS
#ds = load_dataset("jquigl/imdb-genres") # Exemplo

def clean_ascii(text):
    text = text.encode("ascii", errors="ignore").decode()
    return re.sub(r"[^A-Za-z0-9 .,:;!?'\-]", "", text)

# Extract documentos
documentos = [clean_ascii(x["description"]) for x in ds["train"]]
documentos = [t.split(" - ")[0] for t in documentos]   # optional remove year
documentos = [t for t in documentos if len(t) > 0]

#documentos = [ "meu doc favorito 1", "meu doc menos favorito 2"]
print("Total documentos:", len(documentos))
print("Sample:", documentos[:10])

# Passo 2: Carregar ou treinar um tokenizador

In [None]:
from tokenizers import Tokenizer
from tokenizers.models import WordLevel
from tokenizers.trainers import WordLevelTrainer
from tokenizers.pre_tokenizers import Whitespace
import random
import re

##### Insira aqui o código para treinar o seu TOKENIZER
# Defina o seu tokenizador
tokenizer = Tokenizer(WordLevel(unk_token="[UNK]"))
tokenizer.pre_tokenizer = Whitespace()
trainer = WordLevelTrainer(
    vocab_size=1000,
    special_tokens=["[PAD]", "[UNK]", "[BOS]", "[EOS]"]
)
# Treino do tokenizador
tokenizer.train_from_iterator(documentos, trainer)

# Funções auxiliares para transitar entre tokens textuais e ids de tokens
def encode(text):
    ids = tokenizer.encode("[BOS] " + text + " [EOS]").ids
    return torch.tensor(ids, dtype=torch.long)

def decode(ids):
    return tokenizer.decode(ids.tolist())

vocab_size = tokenizer.get_vocab_size()

## Definição do modelo Transformer Decoder-only

In [None]:
class DecoderOnlyTransformer(nn.Module):
    "Implementação de um transformer que tem somente a parte do decoder"
    def __init__(self, vocab_size, d_model=128, n_heads=4, num_layers=3, max_len=64):
        super().__init__()
        self.max_len = max_len

        self.token_emb = nn.Embedding(vocab_size, d_model)
        self.pos_emb = nn.Embedding(max_len, d_model)

        layer = nn.TransformerDecoderLayer(
            d_model=d_model,
            nhead=n_heads,
            dim_feedforward=256,
            batch_first=True
        )
        self.decoder = nn.TransformerDecoder(layer, num_layers=num_layers)
        self.lm_head = nn.Linear(d_model, vocab_size)

    def forward(self, x):
        B, T = x.shape
        pos = torch.arange(T, device=x.device).unsqueeze(0)

        h = self.token_emb(x) + self.pos_emb(pos)

        # Máscara causal
        mask = torch.triu(torch.ones(T, T, device=x.device), diagonal=1).bool()

        out = self.decoder(h, h, tgt_mask=mask)
        logits = self.lm_head(out)
        return logits

# Códigos de inferência

## Código de inferência simples: token com maior probabilidade


In [None]:
def max_prob_sampling(logits):
    next_token = logits.argmax(dim=-1)
    return next_token.unsqueeze(0)

## Código de inferência avançada: amostragem com temperatura

In [None]:
import torch

# Inferência (com temperature e top_p)
def sampling(logits, top_p=0.9, top_k=None, temperature=1.0):
    # Ajusta pela temperatura
    logits = logits / temperature

    # Se top_k for especificado, filtra por top_k primeiro
    if top_k is not None:
        # Ensure top_k is not larger than the vocabulary size
        k_to_use = min(top_k, logits.size(-1))
        # Get the top_k values and indices
        v, _ = torch.topk(logits, k_to_use)
        # Set logits of all values smaller than the k-th value to -inf
        logits[logits < v[:, [-1]]] = float('-inf')

    # Ordena os logits
    sorted_logits, sorted_indices = torch.sort(logits, descending=True)
    cumulative_probs = torch.softmax(sorted_logits, dim=-1).cumsum(dim=-1)

    # Mascara tokens acima do top_p
    mask = cumulative_probs > top_p
    # Garante que ao menos um token permaneça
    mask[..., 1:] = mask[..., :-1].clone()
    mask[..., 0] = False

    filtered_logits = sorted_logits.masked_fill(mask, float('-inf'))
    probs = torch.softmax(filtered_logits, dim=-1)

    # Amostra o token
    sampled_idx = torch.multinomial(probs, num_samples=1)

    # Converte para índice na tabela original
    next_token = sorted_indices[sampled_idx]

    return next_token


def generate(prompt, next_token_function, max_new_tokens=20, top_k=None, top_p=0.9, temperature=1.0):
    model.eval()
    x = encode(prompt).unsqueeze(0).to(device)

    for _ in range(max_new_tokens):
        logits = model(x)[:, -1, :]  # pega apenas o último passo

        # Passa os parâmetros de amostragem se a função for top_p_sampling
        if next_token_function == top_p_sampling:
            next_token = next_token_function(logits.squeeze(0), top_k=top_k, top_p=top_p, temperature=temperature)
        else:
            next_token = next_token_function(logits.squeeze(0))

        x = torch.cat([x, next_token.unsqueeze(0)], dim=1)

        if next_token.item() == tokenizer.token_to_id("[EOS]"):
            break

    return decode(x[0])


# 4. Fazer treino de modelo: códigos de treino

In [None]:
# Função auxiliar para gerar batches de exemplos para treino
def sample_batch(batch_size=16, max_len=20):
    batch = random.sample(documentos, batch_size)
    tokenized = [encode(t) for t in batch]

    max_t = min(max(len(x) for x in tokenized), max_len)
    padded = []

    for x in tokenized:
        x = x[:max_t]
        pad_len = max_t - len(x)
        if pad_len > 0:
            x = torch.cat([x, torch.zeros(pad_len, dtype=torch.long)])
        padded.append(x)

    return torch.stack(padded)

################################
device = "cuda" if torch.cuda.is_available() else "cpu"
model = DecoderOnlyTransformer(vocab_size).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-5)
#################################

steps = 10000

for step in range(1, steps + 1):
    model.train()
    batch = sample_batch().to(device)

    logits = model(batch[:, :-1])
    loss = F.cross_entropy(
        logits.reshape(-1, vocab_size),
        batch[:, 1:].reshape(-1),
        ignore_index=tokenizer.token_to_id("[PAD]")
    )

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if step % 50 == 0:
        ppl = torch.exp(loss).item()
        print(f"[step {step}] loss={loss.item():.4f}, ppl={ppl:.2f}")

    if step % 100 == 0:
        print("Generated text:")
        #print(generate("MAXPROB: This movie is a", next_token_function= ... ))
        print("--------------------------------------")

print("Training completed.")

# Exercício: controle de temperatura, Top-K e Top-P

Modifique o código a seguir para fazer a visualização de produção de tokens do modelo que você treinou.

In [None]:
# @title
import numpy as np
import matplotlib.pyplot as plt
import ipywidgets as widgets
from IPython.display import display, Markdown

# -----------------------------
# Mock language-model logits
# -----------------------------
TOKENS = ["blue", "purple", "violet", "vio", "not", "Blue", "green", "gray", "grey", "black"]
BASE_LOGITS = np.array([6.0, 1.5, 0.5, 0.2, 0.2, 0.0, -5.0, -5.0, -5.0, -5.0])

def softmax(x):
    e = np.exp(x - np.max(x))
    return e / e.sum()

def apply_temperature(logits, temperature):
    return logits / max(temperature, 1e-5)

def apply_top_k(probs, k):
    if k >= len(probs):
        return probs
    idx = np.argsort(probs)[::-1]
    mask = np.zeros_like(probs)
    mask[idx[:k]] = 1
    probs = probs * mask
    return probs / probs.sum()

def apply_top_p(probs, p):
    idx = np.argsort(probs)[::-1]
    cumulative = np.cumsum(probs[idx])
    mask = cumulative <= p
    mask[np.argmax(mask)] = True
    new_probs = np.zeros_like(probs)
    new_probs[idx[mask]] = probs[idx[mask]]
    return new_probs / new_probs.sum()

# -----------------------------
# Widgets
# -----------------------------
prompt_dropdown = widgets.Dropdown(
    options=["Roses are red, violets are..."],
    value="Roses are red, violets are...",
    description="Prompt",
    layout=widgets.Layout(width="95%")
)

temperature_slider = widgets.FloatSlider(
    value=1.0, min=0.1, max=5.0, step=0.1,
    description="Temperatura",
    readout_format=".1f",
    layout=widgets.Layout(width="90%")
)

topk_slider = widgets.IntSlider(
    value=6, min=1, max=10, step=1,
    description="Top-K",
    layout=widgets.Layout(width="90%")
)

topp_slider = widgets.FloatSlider(
    value=1.0, min=0.1, max=1.0, step=0.05,
    description="Top-P",
    readout_format=".2f",
    layout=widgets.Layout(width="90%")
)

# -----------------------------
# Plot
# -----------------------------
output_plot = widgets.Output()

def update_plot(*args):
    with output_plot:
        output_plot.clear_output()

        logits = apply_temperature(BASE_LOGITS, temperature_slider.value)
        probs = softmax(logits)
        probs = apply_top_k(probs, topk_slider.value)
        probs = apply_top_p(probs, topp_slider.value)

        fig, ax = plt.subplots(figsize=(10, 4))
        bars = ax.bar(TOKENS, probs * 100)
        ax.set_ylim(0, 100)
        ax.set_ylabel("Probabilidade (%)")
        ax.set_title("Probabilidade do próximo token")

        for bar, p in zip(bars, probs):
            ax.text(
                bar.get_x() + bar.get_width() / 2,
                bar.get_height(),
                f"{p*100:.2f}%",
                ha="center",
                va="bottom",
                fontsize=9
            )

        plt.xticks(rotation=0)
        plt.show()

for w in [temperature_slider, topk_slider, topp_slider]:
    w.observe(update_plot, names="value")

# -----------------------------
# Collapsible explanation
# -----------------------------
accordion = widgets.Accordion(
    children=[widgets.HTML(
        """
        <ul>
          <li><b>Temperatura</b>: Controla aleatoriedade com mudança na escala dos logits.</li>
          <li><b>Top-K</b>: Restringe a amostra aos K tokens mais prováveis.</li>
          <li><b>Top-P</b>: Usa somente os menor conjunto de tokens que resultam em probabilidade acumulada até P.</li>
        </ul>
        """
    )]
)
accordion.set_title(0, "Entenda a visualização")

# -----------------------------
# Layout
# -----------------------------
controls = widgets.VBox([
    prompt_dropdown,
    widgets.HBox([temperature_slider]),
    widgets.HBox([topk_slider]),
    widgets.HBox([topp_slider])
])

display(Markdown("## Visualização de controle de Temperatura, Top-K e Top-P."))
display(widgets.VBox([
    widgets.HTML("<b>Parameters</b>"),
    controls,
    output_plot,
    accordion
]))

update_plot()


## Visualização de controle de Temperatura, Top-K e Top-P.

VBox(children=(HTML(value='<b>Parameters</b>'), VBox(children=(Dropdown(description='Prompt', layout=Layout(wi…