<a href="https://colab.research.google.com/github/diegomrodrigues/my_llama_2/blob/main/LLaMA%202%20SFT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## My Custom LLaMA Implementation

### LLaMA Config

In [1]:
from transformers import PretrainedConfig
from typing import Dict, Any, Optional, Union

class LlamaConfig(PretrainedConfig):
    """
    Configuração para o modelo LLaMA.

    Esta classe define todos os parâmetros necessários para construir e configurar
    um modelo LLaMA. Herda de PretrainedConfig da biblioteca Transformers.

    Atributos:
        vocab_size (int): Tamanho do vocabulário do modelo.
        hidden_size (int): Dimensão dos vetores de estado oculto e embeddings.
        intermediate_size (int): Dimensão da camada intermediária no MLP.
        num_hidden_layers (int): Número de camadas de transformer no modelo.
        num_attention_heads (int): Número de cabeças de atenção em cada camada.
        num_key_value_heads (int): Número de cabeças para key e value (para atenção agrupada).
        hidden_act (str): Função de ativação usada no MLP.
        max_position_embeddings (int): Número máximo de posições para embeddings.
        initializer_range (float): Desvio padrão da distribuição normal para inicialização de pesos.
        rms_norm_eps (float): Epsilon usado na normalização RMS.
        use_cache (bool): Se deve usar cache para geração incremental.
        pad_token_id (int): ID do token de padding.
        bos_token_id (int): ID do token de início de sequência.
        eos_token_id (int): ID do token de fim de sequência.
        pretraining_tp (int): Grau de paralelismo de tensor usado no pré-treinamento.
        tie_word_embeddings (bool): Se deve compartilhar pesos entre embeddings de entrada e saída.
        rope_theta (float): Valor theta para RoPE (Rotary Position Embedding).
        rope_scaling (Dict[str, Any]): Configuração de escala para RoPE.
        attention_bias (bool): Se deve usar bias nos cálculos de atenção.
        attention_dropout (float): Taxa de dropout aplicada na camada de atenção.

    Exemplo:
        >>> config = LlamaConfig(
        ...     vocab_size=32000,
        ...     hidden_size=4096,
        ...     intermediate_size=11008,
        ...     num_hidden_layers=32,
        ...     num_attention_heads=32,
        ... )
        >>> print(config)
    """

    model_type = "llama"
    keys_to_ignore_at_inference = ["past_key_values"]

    def __init__(
        self,
        vocab_size: int = 32000,
        hidden_size: int = 4096,
        intermediate_size: int = 11008,
        num_hidden_layers: int = 32,
        num_attention_heads: int = 32,
        num_key_value_heads: Optional[int] = None,
        hidden_act: str = "silu",
        rotary_emb_base: float = 10000.0,
        rotary_emb_fraction: float = 1.0,
        max_position_embeddings: int = 2048,
        initializer_range: float = 0.02,
        rms_norm_eps: float = 1e-6,
        use_cache: bool = True,
        pad_token_id: int = -1,
        bos_token_id: int = 1,
        eos_token_id: int = 2,
        pretraining_tp: int = 1,
        tie_word_embeddings: bool = False,
        rope_theta: float = 10000.0,
        rope_scaling: Optional[Dict[str, Union[float, str]]] = None,
        attention_bias: bool = False,
        mlp_bias: bool = False,
        attention_dropout: float = 0.0,
        **kwargs
    ):
        self.vocab_size = vocab_size
        self.max_position_embeddings = max_position_embeddings
        self.hidden_size = hidden_size
        self.intermediate_size = intermediate_size
        self.num_hidden_layers = num_hidden_layers
        self.num_attention_heads = num_attention_heads
        self.num_key_value_heads = num_key_value_heads if num_key_value_heads is not None else num_attention_heads
        self.hidden_act = hidden_act
        self.rotary_emb_base = rotary_emb_base
        self.rotary_emb_fraction = rotary_emb_fraction
        self.initializer_range = initializer_range
        self.rms_norm_eps = rms_norm_eps
        self.pretraining_tp = pretraining_tp
        self.use_cache = use_cache
        self.rope_theta = rope_theta
        self.rope_scaling = rope_scaling
        self.attention_bias = attention_bias
        self.mlp_bias = mlp_bias
        self.attention_dropout = attention_dropout

        super().__init__(
            pad_token_id=pad_token_id,
            bos_token_id=bos_token_id,
            eos_token_id=eos_token_id,
            tie_word_embeddings=tie_word_embeddings,
            **kwargs
        )

    @property
    def head_dim(self) -> int:
        """
        Retorna a dimensão de cada cabeça de atenção.

        Returns:
            int: Dimensão de cada cabeça de atenção.
        """
        return self.hidden_size // self.num_attention_heads

    def to_dict(self) -> Dict[str, Any]:
        """
        Converte a configuração para um dicionário.

        Returns:
            Dict[str, Any]: Dicionário contendo todos os parâmetros da configuração.
        """
        output = super().to_dict()
        output["head_dim"] = self.head_dim
        return output

    def __repr__(self):
        return (f"LlamaConfig(vocab_size={self.vocab_size}, "
                f"hidden_size={self.hidden_size}, "
                f"intermediate_size={self.intermediate_size}, "
                f"num_hidden_layers={self.num_hidden_layers}, "
                f"num_attention_heads={self.num_attention_heads}, "
                f"max_position_embeddings={self.max_position_embeddings})")

### LLaMA Rotary Embedding

In [2]:
import torch
import torch.nn as nn
import math
from typing import Optional, Tuple

class LlamaRotaryEmbedding(nn.Module):
    def __init__(
        self,
        dim: int,  # Dimensão do embedding
        max_position_embeddings: int = 2048,  # Comprimento máximo da sequência
        base: float = 10000.0,  # Base para o cálculo das frequências
        device: Optional[torch.device] = None,
        rope_scaling: Optional[Dict[str, Union[float, str]]] = None
    ):
        super().__init__()
        self.dim = dim
        self.max_position_embeddings = max_position_embeddings
        self.base = base

        # Calcula as frequências inversas para RoPE
        # Dimensão: [dim/2]
        inv_freq = 1.0 / (base ** (torch.arange(0, dim, 2).float().to(device) / dim))
        self.register_buffer("inv_freq", inv_freq)

        if rope_scaling is not None:
            scaling_type = rope_scaling["type"]
            scaling_factor = rope_scaling["factor"]
            if scaling_type == "linear":
                self.seq_len_scaling = scaling_factor
            else:
                raise ValueError(f"Tipo de scaling desconhecido: {scaling_type}")
        else:
            self.seq_len_scaling = 1.0

        # Cache para sequência máxima
        self.max_seq_len_cached = max_position_embeddings

        # Para garantir compatibilidade com HF
        self._build_cache()

    def _build_cache(self):
        seq_len = self.max_seq_len_cached
        # Dimensão: [max_seq_len_cached]
        t = torch.arange(self.max_seq_len_cached, device=self.inv_freq.device, dtype=self.inv_freq.dtype)
        if self.seq_len_scaling != 1.0:
            t = t / self.seq_len_scaling

        # Dimensão: [max_seq_len_cached, dim/2]
        freqs = torch.einsum("i,j->ij", t, self.inv_freq)

        # Dimensão: [max_seq_len_cached, dim]
        emb = torch.cat((freqs, freqs), dim=-1)

        # Dimensão: [1, 1, max_seq_len_cached, dim]
        self.register_buffer("cos_cached", emb.cos()[None, None, :, :], persistent=False)
        self.register_buffer("sin_cached", emb.sin()[None, None, :, :], persistent=False)

    def forward(self, x: torch.Tensor, position_ids: Optional[torch.LongTensor] = None) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Calcula os embeddings rotacionais para as posições dadas.

        Args:
            x: Tensor de entrada (batch_size, seq_len, num_heads, head_dim)
            position_ids: IDs das posições (batch_size, seq_len)

        Returns:
            Tupla de tensores cosseno e seno para os embeddings rotacionais
        """
        if position_ids is None:
            # Se position_ids não for fornecido, assume-se sequência contínua
            # Dimensão: [seq_len]
            position_ids = torch.arange(x.shape[1], device=x.device)

        # Verifica se é necessário recalcular o cache para sequências mais longas
        if position_ids.max() >= self.max_seq_len_cached:
            self._update_cache(position_ids.max())

        # Seleciona os valores de cosseno e seno correspondentes às posições
        # Dimensão: [1, 1, seq_len, dim]
        cos = self.cos_cached[:, :, position_ids, :]
        sin = self.sin_cached[:, :, position_ids, :]

        return (cos.to(x.device), sin.to(x.device))


    def _update_cache(self, max_position: int):
        """
        Atualiza o cache de cossenos e senos para uma sequência mais longa.

        Args:
            max_position: Nova posição máxima a ser suportada
        """
        self.max_seq_len_cached = max_position
        # Dimensão: [max_seq_len_cached]
        t = torch.arange(self.max_seq_len_cached, device=self.inv_freq.device, dtype=self.inv_freq.dtype)
        # Dimensão: [max_seq_len_cached, dim/2]
        freqs = torch.einsum("i,j->ij", t, self.inv_freq)
        if self.rope_type == "linear":
            freqs = freqs * self.scaling_factor
        # Dimensão: [max_seq_len_cached, dim]
        emb = torch.cat((freqs, freqs), dim=-1)
        # Dimensão: [1, 1, max_seq_len_cached, dim]
        self.register_buffer("cos_cached", emb.cos()[None, None, :, :], persistent=False)
        self.register_buffer("sin_cached", emb.sin()[None, None, :, :], persistent=False)


def rotate_half(x: torch.Tensor) -> torch.Tensor:
    """
    Rotaciona metade das dimensões do tensor.
    Usado como parte do processo de aplicação do RoPE.

    Args:
        x: Tensor de entrada

    Returns:
        Tensor com metade das dimensões rotacionadas
    """
    x1, x2 = x.chunk(2, dim=-1)
    return torch.cat((-x2, x1), dim=-1)

def apply_rotary_pos_emb(q: torch.Tensor, k: torch.Tensor, cos: torch.Tensor, sin: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
    """
    Aplica os embeddings posicionais rotacionais aos tensores de query e key.

    Args:
        q: Tensor de query
        k: Tensor de key
        cos: Tensor de cossenos dos embeddings rotacionais
        sin: Tensor de senos dos embeddings rotacionais

    Returns:
        Tupla de tensores q e k com RoPE aplicado
    """
    q_embed = (q * cos) + (rotate_half(q) * sin)
    k_embed = (k * cos) + (rotate_half(k) * sin)
    return q_embed, k_embed

### LLaMA MLP

In [3]:
import torch
import torch.nn as nn
from typing import Optional

class LlamaMLP(nn.Module):
    """
    Implementa a camada de Perceptron Multicamadas (MLP) do modelo LLaMA.

    Esta classe realiza as transformações não-lineares nos estados ocultos do modelo,
    utilizando projeções lineares e uma função de ativação. Suporta implementações
    com e sem tensor parallelism.

    Atributos:
        config: Configuração do modelo LLaMA.
        hidden_size (int): Tamanho do espaço oculto de entrada e saída.
        intermediate_size (int): Tamanho do espaço intermediário onde ocorre a transformação principal.
        gate_proj (nn.Linear): Projeção linear para o mecanismo de gate.
        up_proj (nn.Linear): Projeção linear de expansão.
        down_proj (nn.Linear): Projeção linear de contração.
        act_fn (callable): Função de ativação não-linear.

    Args:
        config: Um objeto de configuração contendo os parâmetros do modelo.

    Exemplo:
        >>> config = LlamaConfig(hidden_size=768, intermediate_size=3072)
        >>> mlp = LlamaMLP(config)
        >>> input_tensor = torch.randn(1, 10, 768)  # [batch_size, seq_length, hidden_size]
        >>> output = mlp(input_tensor)
        >>> print(output.shape)
        torch.Size([1, 10, 768])
    """

    def __init__(self, config):
        super().__init__()
        self.config = config
        self.hidden_size = config.hidden_size
        self.intermediate_size = config.intermediate_size

        # Projeção de gate: hidden_size -> intermediate_size
        self.gate_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=config.mlp_bias)

        # Projeção up: hidden_size -> intermediate_size
        self.up_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=config.mlp_bias)

        # Projeção down: intermediate_size -> hidden_size
        self.down_proj = nn.Linear(self.intermediate_size, self.hidden_size, bias=config.mlp_bias)

        # Função de ativação (geralmente SiLU/Swish)
        self.act_fn = nn.SiLU()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Realiza a passagem forward da camada MLP.

        Args:
            x (torch.Tensor): Tensor de entrada com shape [batch_size, seq_length, hidden_size].

        Returns:
            torch.Tensor: Tensor de saída com shape [batch_size, seq_length, hidden_size].

        Raises:
            ValueError: Se as dimensões do tensor de entrada não forem compatíveis.
        """
        # Verificação das dimensões de entrada
        if x.dim() != 3 or x.size(-1) != self.hidden_size:
            raise ValueError(f"Entrada esperada de shape [batch_size, seq_length, {self.hidden_size}], "
                             f"mas recebeu {x.shape}")

        if self.config.pretraining_tp > 1:
            # Implementação para tensor parallelism (TP)
            slice = self.intermediate_size // self.config.pretraining_tp

            # Divide os pesos das projeções em fatias
            gate_proj_slices = self.gate_proj.weight.split(slice, dim=0)
            up_proj_slices = self.up_proj.weight.split(slice, dim=0)
            down_proj_slices = self.down_proj.weight.split(slice, dim=1)

            # Aplica as projeções em paralelo
            # Cada operação: [batch_size, seq_length, slice]
            gate_proj = torch.cat(
                [nn.functional.linear(x, gate_proj_slices[i]) for i in range(self.config.pretraining_tp)],
                dim=-1
            )
            up_proj = torch.cat(
                [nn.functional.linear(x, up_proj_slices[i]) for i in range(self.config.pretraining_tp)],
                dim=-1
            )

            # Aplica a função de ativação e multiplicação elemento a elemento
            # Dimensão: [batch_size, seq_length, intermediate_size]
            intermediate_states = (self.act_fn(gate_proj) * up_proj).split(slice, dim=2)

            # Aplica a projeção down em paralelo
            # Cada operação: [batch_size, seq_length, hidden_size // pretraining_tp]
            down_proj = [
                nn.functional.linear(intermediate_states[i], down_proj_slices[i])
                for i in range(self.config.pretraining_tp)
            ]

            # Soma os resultados das projeções down
            # Dimensão final: [batch_size, seq_length, hidden_size]
            down_proj = sum(down_proj)

        else:
            # Implementação padrão sem tensor parallelism

            # Aplica as projeções gate e up
            # Dimensões: [batch_size, seq_length, intermediate_size]
            gate_proj = self.gate_proj(x)
            up_proj = self.up_proj(x)

            # Aplica a função de ativação no gate e multiplica pelo resultado de up
            # Dimensão: [batch_size, seq_length, intermediate_size]
            intermediate_states = self.act_fn(gate_proj) * up_proj

            # Aplica a projeção down
            # Dimensão final: [batch_size, seq_length, hidden_size]
            down_proj = self.down_proj(intermediate_states)

        return down_proj

    def __repr__(self):
        return (f"LlamaMLP(hidden_size={self.hidden_size}, "
                f"intermediate_size={self.intermediate_size}, "
                f"act_fn={self.act_fn.__class__.__name__})")

### LLaMA RMS Norm

In [4]:
import torch
import torch.nn as nn

class LlamaRMSNorm(nn.Module):
    """
    LlamaRMSNorm é uma variante de normalização de camada utilizada no modelo LLaMA.

    Esta normalização usa a raiz quadrada da média dos quadrados (RMS) para normalizar
    os inputs, em vez da média e variância usadas na normalização de camada padrão.

    Atributos:
        weight (nn.Parameter): Parâmetro aprendível para escala.
        variance_epsilon (float): Pequeno valor adicionado ao denominador para estabilidade numérica.

    Args:
        hidden_size (int): Dimensão do espaço oculto a ser normalizado.
        eps (float, opcional): Epsilon para estabilidade numérica. Padrão é 1e-6.

    Forma do Input:
        - Input: (batch_size, seq_length, hidden_size)
        - Output: (batch_size, seq_length, hidden_size)

    Exemplo:
        >>> rms_norm = LlamaRMSNorm(hidden_size=768, eps=1e-6)
        >>> input_tensor = torch.randn(32, 50, 768)  # (batch_size, seq_length, hidden_size)
        >>> normalized_tensor = rms_norm(input_tensor)
        >>> print(normalized_tensor.shape)
        torch.Size([32, 50, 768])
    """

    def __init__(self, hidden_size: int, eps: float = 1e-6):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(hidden_size))
        self.variance_epsilon = eps

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        """
        Aplica a normalização RMS ao tensor de entrada.

        Args:
            hidden_states (torch.Tensor): Tensor de entrada a ser normalizado.
                Shape: (batch_size, seq_length, hidden_size)

        Returns:
            torch.Tensor: Tensor normalizado.
                Shape: (batch_size, seq_length, hidden_size)
        """
        input_dtype = hidden_states.dtype
        hidden_states = hidden_states.to(torch.float32)

        # Calcula a variância (média dos quadrados)
        variance = hidden_states.pow(2).mean(-1, keepdim=True)

        # Normaliza usando RMS
        hidden_states = hidden_states * torch.rsqrt(variance + self.variance_epsilon)

        # Aplica o peso aprendível
        return (self.weight * hidden_states).to(input_dtype)

    def extra_repr(self) -> str:
        """
        Retorna uma representação de string dos principais parâmetros.

        Returns:
            str: String representando os parâmetros do módulo.
        """
        return f"hidden_size={self.weight.numel()}, eps={self.variance_epsilon}"

### LLaMA Attention

#### Vanilla Attention

In [5]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
from typing import Optional, Tuple

class LlamaAttention(nn.Module):
    """
    Implementa o mecanismo de atenção multi-cabeça do modelo LLaMA.

    Esta classe realiza a operação de auto-atenção, permitindo que o modelo foque
    em diferentes partes da sequência de entrada. Suporta diferentes implementações
    de atenção e otimizações como grouped-query attention.

    Atributos:
        config: Configuração do modelo LLaMA.
        layer_idx (int): Índice da camada atual.
        hidden_size (int): Dimensão do espaço oculto.
        num_heads (int): Número de cabeças de atenção.
        head_dim (int): Dimensão de cada cabeça de atenção.
        num_key_value_heads (int): Número de cabeças para key e value (pode ser menor que num_heads).
        max_position_embeddings (int): Número máximo de posições para embeddings.
        rotary_emb (LlamaRotaryEmbedding): Instância para aplicar embeddings rotacionais.

    Args:
        config: Um objeto de configuração contendo os parâmetros do modelo.
        layer_idx (Optional[int]): Índice da camada. Necessário para algumas otimizações.

    Exemplo:
        >>> config = LlamaConfig(hidden_size=512, num_attention_heads=8)
        >>> attention = LlamaAttention(config, layer_idx=0)
        >>> hidden_states = torch.randn(1, 10, 512)  # [batch_size, seq_length, hidden_size]
        >>> attention_mask = torch.ones(1, 1, 10, 10)  # [batch_size, 1, seq_length, seq_length]
        >>> output, _ = attention(hidden_states, attention_mask=attention_mask)
        >>> print(output.shape)
        torch.Size([1, 10, 512])
    """

    def __init__(self, config):
        super().__init__()
        self.config = config

        self.hidden_size = config.hidden_size
        self.num_heads = config.num_attention_heads
        self.head_dim = self.hidden_size // self.num_heads
        self.num_key_value_heads = config.num_key_value_heads
        self.num_key_value_groups = self.num_heads // self.num_key_value_heads
        self.max_position_embeddings = config.max_position_embeddings

        # Verifica se as dimensões são compatíveis
        if self.head_dim * self.num_heads != self.hidden_size:
            raise ValueError(f"hidden_size deve ser divisível por num_heads. "
                             f"Got {self.hidden_size} e {self.num_heads}.")

        # Projections para query, key, value e output
        self.q_proj = nn.Linear(self.hidden_size, self.num_heads * self.head_dim, bias=config.attention_bias)
        self.k_proj = nn.Linear(self.hidden_size, self.num_key_value_heads * self.head_dim, bias=config.attention_bias)
        self.v_proj = nn.Linear(self.hidden_size, self.num_key_value_heads * self.head_dim, bias=config.attention_bias)
        self.o_proj = nn.Linear(self.num_heads * self.head_dim, self.hidden_size, bias=config.attention_bias)

        self.rotary_emb = LlamaRotaryEmbedding(
            self.head_dim,
            max_position_embeddings=self.max_position_embeddings,
            base=config.rotary_emb_base,
            rope_scaling=config.rope_scaling
        )

    def _shape(self, tensor: torch.Tensor, seq_len: int, bsz: int):
        """Reshape and transpose tensor for attention computation."""
        return tensor.view(bsz, seq_len, self.num_heads, self.head_dim).transpose(1, 2).contiguous()

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        past_key_value: Optional[Tuple[torch.Tensor]] = None,
        output_attentions: bool = False,
        use_cache: bool = False,
    ) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Tuple[torch.Tensor]]]:
        """
        Realiza a passagem forward do mecanismo de atenção.

        Args:
            hidden_states (torch.Tensor): Estados ocultos de entrada. Shape [batch_size, seq_length, hidden_size]
            attention_mask (Optional[torch.Tensor]): Máscara de atenção. Shape [batch_size, 1, tgt_seq_length, src_seq_length]
            position_ids (Optional[torch.LongTensor]): IDs das posições. Shape [batch_size, seq_length]
            past_key_value (Optional[Tuple[torch.Tensor]]): Cache de estados passados para geração autoregressiva.
            output_attentions (bool): Se True, retorna os pesos de atenção.
            use_cache (bool): Se True, retorna o cache para uso futuro.

        Returns:
            Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Tuple[torch.Tensor]]]:
                - Estados ocultos atualizados
                - Pesos de atenção (opcional)
                - Novo cache de estados (opcional)
        """
        bsz, q_len, _ = hidden_states.size()

        # Calcula query, key, value
        query_states = self.q_proj(hidden_states).view(bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2)
        key_states = self.k_proj(hidden_states).view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2)
        value_states = self.v_proj(hidden_states).view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2)

        # [batch_size, num_heads, seq_length, head_dim]

        # Aplica RoPE (Rotary Position Embedding)
        query_states, key_states = self.rotary_emb(query_states, key_states, position_ids)

        # Lida com o cache de estados passados para geração autoregressiva
        if past_key_value is not None:
            key_states = torch.cat([past_key_value[0], key_states], dim=2)
            value_states = torch.cat([past_key_value[1], value_states], dim=2)

        past_key_value = (key_states, value_states) if use_cache else None

        # Repete key e value para cada grupo de query em grouped-query attention
        key_states = repeat_kv(key_states, self.num_key_value_groups)
        value_states = repeat_kv(value_states, self.num_key_value_groups)

        # [batch_size, num_heads, seq_length, head_dim]

        # Calcula os scores de atenção
        attn_weights = torch.matmul(query_states, key_states.transpose(2, 3)) / math.sqrt(self.head_dim)

        # [batch_size, num_heads, seq_length, seq_length]

        if attention_mask is not None:
            attn_weights = attn_weights + attention_mask

        # Normaliza os pesos de atenção
        attn_weights = F.softmax(attn_weights, dim=-1, dtype=torch.float32).to(query_states.dtype)

        # Calcula o output da atenção
        attn_output = torch.matmul(attn_weights, value_states)

        # [batch_size, num_heads, seq_length, head_dim]

        attn_output = attn_output.transpose(1, 2).contiguous()
        attn_output = attn_output.reshape(bsz, q_len, self.hidden_size)

        # [batch_size, seq_length, hidden_size]

        # Projeção final
        attn_output = self.o_proj(attn_output)

        if not output_attentions:
            attn_weights = None

        return attn_output, attn_weights, past_key_value

def repeat_kv(hidden_states: torch.Tensor, n_rep: int) -> torch.Tensor:
    """
    Repete os estados de key e value para grouped-query attention.

    Args:
        hidden_states (torch.Tensor): Estados de entrada [batch, num_key_value_heads, seqlen, head_dim]
        n_rep (int): Número de repetições

    Returns:
        torch.Tensor: Estados repetidos [batch, num_attention_heads, seqlen, head_dim]
    """
    batch, num_key_value_heads, slen, head_dim = hidden_states.shape
    if n_rep == 1:
        return hidden_states
    hidden_states = hidden_states[:, :, None, :, :].expand(batch, num_key_value_heads, n_rep, slen, head_dim)
    return hidden_states.reshape(batch, num_key_value_heads * n_rep, slen, head_dim)

#### Sdpa Attention

In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Optional, Tuple
import warnings

class LlamaSdpaAttention(nn.Module):
    """
    Implementação otimizada do mecanismo de atenção do LLaMA usando
    scaled_dot_product_attention (SDPA) do PyTorch.

    Esta classe implementa a atenção multi-cabeça com suporte a
    Rotary Position Embedding (RoPE) e atenção agrupada.

    Atributos:
        config: Configuração do modelo LLaMA.
        layer_idx (int): Índice da camada atual.
        hidden_size (int): Dimensão do espaço oculto.
        num_heads (int): Número de cabeças de atenção.
        head_dim (int): Dimensão de cada cabeça de atenção.
        num_key_value_heads (int): Número de cabeças para key e value (pode ser menor que num_heads).
        max_position_embeddings (int): Número máximo de posições para embeddings.
        rotary_emb (LlamaRotaryEmbedding): Instância para aplicar embeddings rotacionais.

    Args:
        config: Configuração do modelo LLaMA.
        layer_idx (Optional[int]): Índice da camada. Necessário para algumas otimizações.
    """

    def __init__(self, config, layer_idx: Optional[int] = None):
        super().__init__()
        self.config = config
        self.layer_idx = layer_idx

        self.hidden_size = config.hidden_size
        self.num_heads = config.num_attention_heads
        self.head_dim = self.hidden_size // self.num_heads
        self.num_key_value_heads = config.num_key_value_heads
        self.num_key_value_groups = self.num_heads // self.num_key_value_heads
        self.max_position_embeddings = config.max_position_embeddings

        if (self.head_dim * self.num_heads) != self.hidden_size:
            raise ValueError(
                f"hidden_size deve ser divisível por num_heads. "
                f"Got {self.hidden_size} e {self.num_heads}."
            )

        # Projeções lineares para Q, K, V e O
        self.q_proj = nn.Linear(self.hidden_size, self.num_heads * self.head_dim, bias=config.attention_bias)
        self.k_proj = nn.Linear(self.hidden_size, self.num_key_value_heads * self.head_dim, bias=config.attention_bias)
        self.v_proj = nn.Linear(self.hidden_size, self.num_key_value_heads * self.head_dim, bias=config.attention_bias)
        self.o_proj = nn.Linear(self.num_heads * self.head_dim, self.hidden_size, bias=config.attention_bias)

        self.rotary_emb = LlamaRotaryEmbedding(
            self.head_dim,
            max_position_embeddings=self.max_position_embeddings,
            base=config.rope_theta,
        )

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        past_key_value: Optional[Tuple[torch.Tensor]] = None,
        output_attentions: bool = False,
        use_cache: bool = False,
    ) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Tuple[torch.Tensor]]]:
        """
        Realiza a passagem forward do mecanismo de atenção.

        Args:
            hidden_states (torch.Tensor): Estados ocultos de entrada.
                Shape: [batch_size, seq_length, hidden_size]
            attention_mask (Optional[torch.Tensor]): Máscara de atenção.
                Shape: [batch_size, 1, tgt_seq_length, src_seq_length]
            position_ids (Optional[torch.LongTensor]): IDs das posições.
                Shape: [batch_size, seq_length]
            past_key_value (Optional[Tuple[torch.Tensor]]): Cache de estados passados.
            output_attentions (bool): Se True, retorna os pesos de atenção.
            use_cache (bool): Se True, retorna o cache para uso futuro.

        Returns:
            Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Tuple[torch.Tensor]]]:
                - Estados ocultos atualizados
                - Pesos de atenção (opcional)
                - Novo cache de estados (opcional)
        """
        # Obtém as dimensões do tensor de entrada
        # hidden_states shape: [batch_size, seq_length, hidden_size]
        bsz, q_len, _ = hidden_states.size()

        if self.config.pretraining_tp > 1:
            # Implementação para tensor parallelism
            key_value_slicing = (self.num_key_value_heads * self.head_dim) // self.config.pretraining_tp
            query_slices = self.q_proj.weight.split((self.num_heads * self.head_dim) // self.config.pretraining_tp, dim=0)
            key_slices = self.k_proj.weight.split(key_value_slicing, dim=0)
            value_slices = self.v_proj.weight.split(key_value_slicing, dim=0)

            query_states = [F.linear(hidden_states, query_slices[i]) for i in range(self.config.pretraining_tp)]
            query_states = torch.cat(query_states, dim=-1)

            key_states = [F.linear(hidden_states, key_slices[i]) for i in range(self.config.pretraining_tp)]
            key_states = torch.cat(key_states, dim=-1)

            value_states = [F.linear(hidden_states, value_slices[i]) for i in range(self.config.pretraining_tp)]
            value_states = torch.cat(value_states, dim=-1)

        else:
            # Projeções Q, K, V padrão
            # Resultado: [batch_size, seq_length, num_heads * head_dim]
            query_states = self.q_proj(hidden_states)
            key_states = self.k_proj(hidden_states)
            value_states = self.v_proj(hidden_states)

        # Reshape e transpõe Q, K, V
        # Resultado: [batch_size, num_heads, seq_length, head_dim]
        query_states = query_states.view(bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2)
        key_states = key_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2)
        value_states = value_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2)

        # Calcula os embeddings rotacionais
        # cos e sin: [1, seq_length, head_dim]
        cos, sin = self.rotary_emb(value_states, seq_len=q_len)

        # Aplica RoPE (Rotary Position Embedding) a Q e K
        # query_states, key_states: [batch_size, num_heads, seq_length, head_dim]
        query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin, position_ids)

        # Lida com o cache de estados passados para geração autoregressiva
        if past_key_value is not None:
            # Concatena estados passados com os atuais
            # key_states, value_states: [batch_size, num_heads, seq_length + past_length, head_dim]
            key_states = torch.cat([past_key_value[0], key_states], dim=2)
            value_states = torch.cat([past_key_value[1], value_states], dim=2)

        # Prepara o cache para a próxima iteração se necessário
        past_key_value = (key_states, value_states) if use_cache else None

        # Repete K e V para atenção agrupada (grouped-query attention)
        # key_states, value_states: [batch_size, num_heads, seq_length, head_dim]
        key_states = repeat_kv(key_states, self.num_key_value_groups)
        value_states = repeat_kv(value_states, self.num_key_value_groups)

        # Aplica a atenção usando scaled_dot_product_attention
        # attn_output: [batch_size, num_heads, seq_length, head_dim]
        attn_output = F.scaled_dot_product_attention(
            query_states, key_states, value_states,
            attn_mask=attention_mask,
            dropout_p=self.config.attention_dropout if self.training else 0.0,
            is_causal=False
        )

        # Reorganiza o tensor de saída
        # attn_output: [batch_size, seq_length, num_heads * head_dim]
        attn_output = attn_output.transpose(1, 2).contiguous()
        attn_output = attn_output.reshape(bsz, q_len, self.hidden_size)

        # Aplica a projeção de saída
        # attn_output: [batch_size, seq_length, hidden_size]
        attn_output = self.o_proj(attn_output)

        if output_attentions:
            warnings.warn("output_attentions=True não é suportado para SDPA no momento.")
            attn_weights = None
        else:
            attn_weights = None

        return attn_output, attn_weights, past_key_value

#### Flash Attention 2

In [7]:
!pip install flash_attn --quiet

In [8]:
import torch
import torch.nn as nn
from typing import Optional, Tuple
from flash_attn import flash_attn_func, flash_attn_varlen_func
import warnings

class LlamaFlashAttention2(nn.Module):
    """
    Implementação do mecanismo de atenção do LLaMA usando Flash Attention 2.
    Esta versão é otimizada para eficiência em memória e velocidade.
    """

    def __init__(self, config, layer_idx: Optional[int] = None):
        super().__init__()
        self.config = config
        self.layer_idx = layer_idx

        self.hidden_size = config.hidden_size
        self.num_heads = config.num_attention_heads
        self.head_dim = self.hidden_size // self.num_heads
        self.num_key_value_heads = config.num_key_value_heads
        self.num_key_value_groups = self.num_heads // self.num_key_value_heads
        self.max_position_embeddings = config.max_position_embeddings

        if (self.head_dim * self.num_heads) != self.hidden_size:
            raise ValueError(f"hidden_size deve ser divisível por num_heads.")

        # Inicializa as projeções lineares para Q, K, V e O
        self.q_proj = nn.Linear(self.hidden_size, self.num_heads * self.head_dim, bias=config.attention_bias)
        self.k_proj = nn.Linear(self.hidden_size, self.num_key_value_heads * self.head_dim, bias=config.attention_bias)
        self.v_proj = nn.Linear(self.hidden_size, self.num_key_value_heads * self.head_dim, bias=config.attention_bias)
        self.o_proj = nn.Linear(self.num_heads * self.head_dim, self.hidden_size, bias=config.attention_bias)

        self.rotary_emb = LlamaRotaryEmbedding(
            self.head_dim,
            max_position_embeddings=self.max_position_embeddings,
            base=config.rope_theta,
        )

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        past_key_value: Optional[Tuple[torch.Tensor]] = None,
        output_attentions: bool = False,
        use_cache: bool = False,
    ) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Tuple[torch.Tensor]]]:
        # hidden_states shape: [batch_size, seq_length, hidden_size]
        bsz, q_len, _ = hidden_states.size()

        # Aplica as projeções lineares para Q, K, V
        # Shapes: [batch_size, seq_length, num_heads * head_dim]
        query_states = self.q_proj(hidden_states)
        key_states = self.k_proj(hidden_states)
        value_states = self.v_proj(hidden_states)

        # Reshape e transpõe para [batch_size, num_heads, seq_length, head_dim]
        query_states = query_states.view(bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2)
        key_states = key_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2)
        value_states = value_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2)

        # Calcula os embeddings rotacionais
        cos, sin = self.rotary_emb(value_states, seq_len=q_len)
        # Aplica RoPE (Rotary Position Embedding)
        query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin, position_ids)

        # Lida com o cache de estados passados para geração autoregressiva
        if past_key_value is not None:
            key_states = torch.cat([past_key_value[0], key_states], dim=2)
            value_states = torch.cat([past_key_value[1], value_states], dim=2)
        past_key_value = (key_states, value_states) if use_cache else None

        # Repete K e V para atenção agrupada (grouped-query attention)
        key_states = repeat_kv(key_states, self.num_key_value_groups)
        value_states = repeat_kv(value_states, self.num_key_value_groups)

        # Prepara os tensores para Flash Attention
        q, k, v = query_states, key_states, value_states

        # Converte q, k, v para o formato esperado por Flash Attention
        # [batch_size, seq_length, num_heads, head_dim]
        q = q.transpose(1, 2)
        k = k.transpose(1, 2)
        v = v.transpose(1, 2)

        # Aplica Flash Attention
        if attention_mask is None:
            # Usa a versão padrão do Flash Attention quando não há máscara
            attn_output = flash_attn_func(q, k, v, dropout_p=self.config.attention_dropout if self.training else 0.0, causal=True)
        else:
            # Usa a versão com comprimento variável quando há máscara
            attn_output, _ = flash_attn_varlen_func(
                q, k, v,
                cu_seqlens_q=attention_mask,
                cu_seqlens_k=attention_mask,
                max_seqlen_q=q_len,
                max_seqlen_k=q_len,
                dropout_p=self.config.attention_dropout if self.training else 0.0,
                causal=True
            )

        # Reshape e aplica a projeção de saída
        # [batch_size, seq_length, hidden_size]
        attn_output = attn_output.reshape(bsz, q_len, self.hidden_size)
        attn_output = self.o_proj(attn_output)

        if output_attentions:
            warnings.warn("output_attentions=True não é suportado para Flash Attention.")
            attn_weights = None
        else:
            attn_weights = None

        return attn_output, attn_weights, past_key_value

### LLaMA Decoder Layer

In [9]:
import torch
import torch.nn as nn
from typing import Optional, Tuple

class LlamaDecoderLayer(nn.Module):
    """
    Implementa uma camada do decodificador do modelo LLaMA.

    Esta classe combina os mecanismos de atenção e feed-forward network (MLP),
    formando um bloco completo do transformer decodificador. Inclui normalizações
    de camada e conexões residuais.

    Atributos:
        hidden_size (int): Dimensão do espaço oculto.
        self_attn (LlamaAttention): Mecanismo de auto-atenção.
        mlp (LlamaMLP): Rede feed-forward.
        input_layernorm (LlamaRMSNorm): Normalização de camada para entrada.
        post_attention_layernorm (LlamaRMSNorm): Normalização após a atenção.

    Args:
        config (LlamaConfig): Configuração do modelo LLaMA.
        layer_idx (int): Índice da camada atual.

    Exemplo:
        >>> config = LlamaConfig(hidden_size=512, intermediate_size=2048, num_attention_heads=8)
        >>> layer = LlamaDecoderLayer(config, layer_idx=0)
        >>> hidden_states = torch.randn(1, 10, 512)  # [batch_size, seq_length, hidden_size]
        >>> attention_mask = torch.ones(1, 1, 10, 10)  # [batch_size, 1, seq_length, seq_length]
        >>> outputs = layer(hidden_states, attention_mask=attention_mask)
        >>> print(outputs[0].shape)
        torch.Size([1, 10, 512])
    """

    def __init__(self, config: LlamaConfig):
        super().__init__()
        self.hidden_size = config.hidden_size
        self.self_attn = LlamaAttention(config=config)
        self.mlp = LlamaMLP(config)
        self.input_layernorm = LlamaRMSNorm(config.hidden_size, eps=config.rms_norm_eps)
        self.post_attention_layernorm = LlamaRMSNorm(config.hidden_size, eps=config.rms_norm_eps)

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        past_key_value: Optional[Tuple[torch.Tensor]] = None,
        output_attentions: Optional[bool] = False,
        use_cache: Optional[bool] = False,
    ) -> Tuple[torch.FloatTensor, Optional[Tuple[torch.FloatTensor, torch.FloatTensor]]]:
        """
        Realiza a passagem forward de uma camada do decodificador.

        Args:
            hidden_states (torch.Tensor): Estados ocultos de entrada.
                Shape [batch_size, seq_length, hidden_size]
            attention_mask (Optional[torch.Tensor]): Máscara de atenção.
                Shape [batch_size, 1, tgt_seq_length, src_seq_length]
            position_ids (Optional[torch.LongTensor]): IDs das posições.
                Shape [batch_size, seq_length]
            past_key_value (Optional[Tuple[torch.Tensor]]): Cache de estados passados para
                geração autoregressiva.
            output_attentions (Optional[bool]): Se True, retorna os pesos de atenção.
            use_cache (Optional[bool]): Se True, retorna o cache para uso futuro.

        Returns:
            Tuple[torch.FloatTensor, Optional[Tuple[torch.FloatTensor, torch.FloatTensor]]]:
                - Estados ocultos atualizados
                - Tupla contendo os novos cache de estados (se use_cache=True)
        """
        # Shape de hidden_states: [batch_size, seq_length, hidden_size]
        residual = hidden_states

        # Normalização de camada na entrada
        hidden_states = self.input_layernorm(hidden_states)
        # Shape após normalização: [batch_size, seq_length, hidden_size]

        # Self Attention
        hidden_states, self_attn_weights, present_key_value = self.self_attn(
            hidden_states=hidden_states,
            attention_mask=attention_mask,
            position_ids=position_ids,
            past_key_value=past_key_value,
            output_attentions=output_attentions,
            use_cache=use_cache,
        )
        # Shape após atenção: [batch_size, seq_length, hidden_size]

        # Conexão residual após a atenção
        hidden_states = residual + hidden_states
        # Shape após conexão residual: [batch_size, seq_length, hidden_size]

        # Normalização de camada após a atenção
        residual = hidden_states
        hidden_states = self.post_attention_layernorm(hidden_states)
        # Shape após normalização: [batch_size, seq_length, hidden_size]

        # MLP (Feed-Forward Network)
        hidden_states = self.mlp(hidden_states)
        # Shape após MLP: [batch_size, seq_length, hidden_size]

        # Conexão residual após o MLP
        hidden_states = residual + hidden_states
        # Shape final: [batch_size, seq_length, hidden_size]

        outputs = (hidden_states,)

        if output_attentions:
            outputs += (self_attn_weights,)

        if use_cache:
            outputs += (present_key_value,)

        return outputs

    def __repr__(self):
        return f"LlamaDecoderLayer(hidden_size={self.hidden_size})"

### LLaMA Pre-Trained Model

In [10]:
from transformers import PreTrainedModel
from transformers.modeling_utils import PretrainedConfig
from typing import Union, Optional
import torch
import torch.nn as nn

class LlamaPreTrainedModel(PreTrainedModel):
    """
    Classe base abstrata para modelos pré-treinados LLaMA.

    Esta classe herda de `PreTrainedModel` e implementa funcionalidades específicas
    para modelos LLaMA, incluindo inicialização de pesos e configurações de otimização.

    Atributos:
        config_class (Type[PretrainedConfig]): Classe de configuração para modelos LLaMA.
        base_model_prefix (str): Prefixo usado para nomear o modelo base.
        supports_gradient_checkpointing (bool): Indica suporte a checkpointing de gradiente.
        _no_split_modules (List[str]): Lista de módulos que não devem ser divididos durante
                                       o processamento paralelo.

    Exemplo:
        >>> from transformers import LlamaConfig
        >>> class MyLlamaModel(LlamaPreTrainedModel):
        ...     def __init__(self, config):
        ...         super().__init__(config)
        ...         # Implementação do modelo
        ...
        >>> config = LlamaConfig()
        >>> model = MyLlamaModel(config)
    """

    config_class = LlamaConfig
    base_model_prefix = "model"
    supports_gradient_checkpointing = True
    _no_split_modules = ["LlamaDecoderLayer"]
    _skip_keys_device_placement = ["past_key_values"]
    _supports_flash_attn_2 = True
    _supports_sdpa = True
    _supports_cache_class = True
    _supports_quantized_cache = True
    _supports_static_cache = True

    def __init__(self, config: LlamaConfig, *inputs, **kwargs):
        super().__init__(config, *inputs, **kwargs)

    def _init_weights(self, module: nn.Module):
        """
        Inicializa os pesos do módulo.

        Esta função é chamada para cada submódulo durante a inicialização do modelo.
        Implementa a estratégia de inicialização de pesos específica para modelos LLaMA.

        Args:
            module (nn.Module): O módulo cujos pesos serão inicializados.
        """
        std = self.config.initializer_range
        if isinstance(module, nn.Linear):
            module.weight.data.normal_(mean=0.0, std=std)
            if module.bias is not None:
                module.bias.data.zero_()
        elif isinstance(module, nn.Embedding):
            module.weight.data.normal_(mean=0.0, std=std)
            if module.padding_idx is not None:
                module.weight.data[module.padding_idx].zero_()

    def _set_gradient_checkpointing(self, module: nn.Module, value: bool = False):
        """
        Configura o checkpointing de gradiente para o módulo.

        O checkpointing de gradiente pode ser usado para economizar memória durante o treinamento,
        recalculando os gradientes durante a passagem backward em vez de armazená-los.

        Args:
            module (nn.Module): O módulo para configurar o checkpointing.
            value (bool): Se True, ativa o checkpointing de gradiente.
        """
        if isinstance(module, (LlamaDecoderLayer, LlamaModel)):
            module.gradient_checkpointing = value

    def gradient_checkpointing_enable(self):
        """
        Ativa o checkpointing de gradiente para todo o modelo.
        """
        self.apply(lambda module: self._set_gradient_checkpointing(module, value=True))

    def gradient_checkpointing_disable(self):
        """
        Desativa o checkpointing de gradiente para todo o modelo.
        """
        self.apply(lambda module: self._set_gradient_checkpointing(module, value=False))

    def enable_input_require_grads(self):
        """
        Configura o modelo para permitir gradientes nos inputs.

        Isso é necessário para técnicas como adversarial training.
        """
        def make_inputs_require_grads(module, input, output):
            output.requires_grad_(True)

        self._require_grads_hook = self.get_input_embeddings().register_forward_hook(make_inputs_require_grads)

    def disable_input_require_grads(self):
        """
        Remove a configuração que permite gradientes nos inputs.
        """
        self._require_grads_hook.remove()

    def get_position_embeddings(self) -> Optional[Union[nn.Embedding, torch.Tensor]]:
        """
        Retorna as embeddings de posição do modelo, se existirem.

        Returns:
            Optional[Union[nn.Embedding, torch.Tensor]]: As embeddings de posição ou None.
        """
        if hasattr(self, "rotary_emb"):
            return self.rotary_emb
        return None

    def resize_position_embeddings(self, new_num_position_embeddings: int):
        """
        Redimensiona as embeddings de posição do modelo.

        Args:
            new_num_position_embeddings (int): O novo número de posições.

        Raises:
            NotImplementedError: Esta funcionalidade não está implementada para modelos LLaMA.
        """
        raise NotImplementedError(
            f"{self.__class__.__name__} não suporta o redimensionamento das embeddings de posição."
        )

    def get_output_embeddings(self) -> Optional[nn.Module]:
        """
        Retorna as embeddings de saída do modelo, se existirem.

        Returns:
            Optional[nn.Module]: As embeddings de saída ou None.
        """
        return None  # LLaMA não usa embeddings de saída por padrão

    def set_output_embeddings(self, new_embeddings: Optional[nn.Module]):
        """
        Define novas embeddings de saída para o modelo.

        Args:
            new_embeddings (Optional[nn.Module]): As novas embeddings de saída.

        Raises:
            NotImplementedError: Esta funcionalidade não está implementada para modelos LLaMA.
        """
        raise NotImplementedError(
            f"{self.__class__.__name__} não suporta a mudança das embeddings de saída."
        )

### LLaMA Model

In [11]:
import torch
import torch.nn as nn
from typing import Optional, Tuple, Union, List
from transformers.modeling_outputs import BaseModelOutputWithPast

class LlamaModel(LlamaPreTrainedModel):
    """
    Modelo base LLaMA.

    Esta classe implementa a estrutura principal do modelo LLaMA, incluindo
    as camadas de embedding, as camadas do decodificador e a normalização final.

    Atributos:
        config (LlamaConfig): Configuração do modelo.
        padding_idx (int): Índice do token de padding.
        vocab_size (int): Tamanho do vocabulário.
        embed_tokens (nn.Embedding): Camada de embedding para tokens.
        layers (nn.ModuleList): Lista de camadas do decodificador.
        norm (LlamaRMSNorm): Camada de normalização final.
        gradient_checkpointing (bool): Se o checkpointing de gradiente está ativado.

    Args:
        config (LlamaConfig): Configuração do modelo LLaMA.

    Exemplo:
        >>> from transformers import LlamaConfig
        >>> config = LlamaConfig()
        >>> model = LlamaModel(config)
        >>> input_ids = torch.randint(0, config.vocab_size, (1, 10))
        >>> outputs = model(input_ids)
        >>> last_hidden_states = outputs.last_hidden_state
    """

    def __init__(self, config: LlamaConfig):
        super().__init__(config)
        self.padding_idx = config.pad_token_id
        self.vocab_size = config.vocab_size

        self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size, self.padding_idx)
        self.layers = nn.ModuleList([LlamaDecoderLayer(config) for _i1 in range(config.num_hidden_layers)])
        self.norm = LlamaRMSNorm(config.hidden_size, eps=config.rms_norm_eps)

        self.gradient_checkpointing = False
        # Initialize weights and apply final processing
        self.post_init()

    def get_input_embeddings(self):
        return self.embed_tokens

    def set_input_embeddings(self, value):
        self.embed_tokens = value

    def forward(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        past_key_values: Optional[List[torch.FloatTensor]] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        use_cache: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple, BaseModelOutputWithPast]:
        """
        Realiza a passagem forward do modelo LLaMA.

        Args:
            input_ids (Optional[torch.LongTensor]): Tensor de IDs de tokens de entrada.
                Shape: (batch_size, sequence_length)
            attention_mask (Optional[torch.Tensor]): Máscara de atenção para os tokens de entrada.
                Shape: (batch_size, sequence_length)
            position_ids (Optional[torch.LongTensor]): IDs de posição para os tokens de entrada.
                Shape: (batch_size, sequence_length)
            past_key_values (Optional[List[torch.FloatTensor]]): Lista de tensores contendo estados passados
                para uso em geração incremental.
            inputs_embeds (Optional[torch.FloatTensor]): Embeddings pré-computados para substituir input_ids.
                Shape: (batch_size, sequence_length, hidden_size)
            use_cache (Optional[bool]): Se deve retornar um cache para geração incremental.
            output_attentions (Optional[bool]): Se deve retornar todas as atenções.
            output_hidden_states (Optional[bool]): Se deve retornar todos os estados ocultos.
            return_dict (Optional[bool]): Se deve retornar um dicionário ao invés de uma tupla.

        Returns:
            Union[Tuple, BaseModelOutputWithPast]: Saída do modelo, incluindo últimos estados ocultos,
                past_key_values (se use_cache=True), e opcionalmente todos os estados ocultos e atenções.
        """
        output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
        output_hidden_states = (
            output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
        )
        use_cache = use_cache if use_cache is not None else self.config.use_cache
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        # Recuperar embeddings de entrada
        if input_ids is not None and inputs_embeds is not None:
            raise ValueError("Você não pode especificar tanto input_ids quanto inputs_embeds ao mesmo tempo")
        elif input_ids is not None:
            # input_ids shape: (batch_size, sequence_length)
            batch_size, seq_length = input_ids.shape
            inputs_embeds = self.embed_tokens(input_ids)
            # inputs_embeds shape: (batch_size, sequence_length, hidden_size)
        elif inputs_embeds is not None:
            batch_size, seq_length, _ = inputs_embeds.shape
        else:
            raise ValueError("Você deve especificar ou input_ids ou inputs_embeds")

        # Gerar position_ids se não fornecidos
        if position_ids is None:
            # position_ids shape: (batch_size, sequence_length)
            position_ids = torch.arange(seq_length, dtype=torch.long, device=inputs_embeds.device)
            position_ids = position_ids.unsqueeze(0).expand(batch_size, -1)

        # Gerar máscara de atenção se não fornecida
        if attention_mask is None:
            # attention_mask shape: (batch_size, sequence_length)
            attention_mask = torch.ones((batch_size, seq_length), device=inputs_embeds.device)

        # Converter máscara de atenção para o formato correto (expandido para todas as cabeças)
        # extended_attention_mask shape: (batch_size, 1, 1, sequence_length)
        extended_attention_mask = self.get_extended_attention_mask(attention_mask, (batch_size, seq_length))

        # hidden_states shape: (batch_size, sequence_length, hidden_size)
        hidden_states = inputs_embeds
        all_hidden_states = () if output_hidden_states else None
        all_self_attentions = () if output_attentions else None
        next_decoder_cache = () if use_cache else None

        for idx, decoder_layer in enumerate(self.layers):
            if output_hidden_states:
                all_hidden_states += (hidden_states,)

            past_key_value = past_key_values[idx] if past_key_values is not None else None

            if self.gradient_checkpointing and self.training:
                layer_outputs = self._gradient_checkpointing_func(
                    decoder_layer.__call__,
                    hidden_states,
                    extended_attention_mask,
                    position_ids,
                    past_key_value,
                    output_attentions,
                    use_cache,
                )
            else:
                layer_outputs = decoder_layer(
                    hidden_states,
                    attention_mask=extended_attention_mask,
                    position_ids=position_ids,
                    past_key_value=past_key_value,
                    output_attentions=output_attentions,
                    use_cache=use_cache,
                )

            # layer_outputs[0] shape: (batch_size, sequence_length, hidden_size)
            hidden_states = layer_outputs[0]

            if use_cache:
                # next_decoder_cache shape (para cada camada):
                # (2, batch_size, num_heads, sequence_length, head_dim)
                next_decoder_cache += (layer_outputs[2 if output_attentions else 1],)

            if output_attentions:
                # all_self_attentions shape (para cada camada):
                # (batch_size, num_heads, sequence_length, sequence_length)
                all_self_attentions += (layer_outputs[1],)

        # Normalização final
        # hidden_states shape: (batch_size, sequence_length, hidden_size)
        hidden_states = self.norm(hidden_states)

        # Adicionar últimos estados ocultos
        if output_hidden_states:
            all_hidden_states += (hidden_states,)

        next_cache = next_decoder_cache if use_cache else None
        if not return_dict:
            return tuple(v for v in [hidden_states, next_cache, all_hidden_states, all_self_attentions] if v is not None)

        return BaseModelOutputWithPast(
            last_hidden_state=hidden_states,  # (batch_size, sequence_length, hidden_size)
            past_key_values=next_cache,       # Lista de tensores, cada um com shape:
                                              # (2, batch_size, num_heads, sequence_length, head_dim)
            hidden_states=all_hidden_states,  # Tupla de tensores, cada um com shape:
                                              # (batch_size, sequence_length, hidden_size)
            attentions=all_self_attentions,   # Tupla de tensores, cada um com shape:
                                              # (batch_size, num_heads, sequence_length, sequence_length)
        )

### LLaMA Implmentations

#### Causal Language Modeling

In [12]:
import torch
import torch.nn as nn
from torch.nn import CrossEntropyLoss
from typing import Optional, Tuple, Union, List
from transformers.modeling_outputs import CausalLMOutputWithPast

class LlamaForCausalLM(LlamaPreTrainedModel):
    """
    Modelo LLaMA para Modelagem de Linguagem Causal.

    Esta classe implementa o modelo LLaMA específico para tarefas de geração de texto,
    adicionando uma camada de saída linear (lm_head) ao modelo base LLaMA.

    Atributos:
        model (LlamaModel): O modelo base LLaMA.
        lm_head (nn.Linear): Camada linear para projetar estados ocultos no espaço do vocabulário.
        vocab_size (int): Tamanho do vocabulário do modelo.

    Args:
        config (LlamaConfig): Configuração do modelo LLaMA.

    Exemplo:
        >>> from transformers import LlamaConfig, LlamaTokenizer
        >>> config = LlamaConfig()
        >>> model = LlamaForCausalLM(config)
        >>> tokenizer = LlamaTokenizer.from_pretrained("meta-llama/Llama-2-7b-hf")
        >>> inputs = tokenizer("Olá, como vai?", return_tensors="pt")
        >>> outputs = model(**inputs)
        >>> logits = outputs.logits
    """

    _tied_weights_keys = ["lm_head.weight"]

    def __init__(self, config):
        super().__init__(config)
        self.model = LlamaModel(config)
        self.vocab_size = config.vocab_size
        self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)

        # Inicializa pesos e aplica processamento final
        self.post_init()

    def get_input_embeddings(self):
        return self.model.embed_tokens

    def set_input_embeddings(self, value):
        self.model.embed_tokens = value

    def get_output_embeddings(self):
        return self.lm_head

    def set_output_embeddings(self, new_embeddings):
        self.lm_head = new_embeddings

    def set_decoder(self, decoder):
        self.model = decoder

    def get_decoder(self):
        return self.model

    def forward(
        self,
        input_ids: torch.LongTensor = None,
        attention_mask: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        past_key_values: Optional[List[torch.FloatTensor]] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        labels: Optional[torch.LongTensor] = None,
        use_cache: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple, CausalLMOutputWithPast]:
        """
        Realiza a passagem forward do modelo.

        Args:
            input_ids (torch.LongTensor): IDs dos tokens de entrada.
            attention_mask (torch.Tensor, opcional): Máscara de atenção.
            position_ids (torch.LongTensor, opcional): IDs das posições.
            past_key_values (List[torch.FloatTensor], opcional): Valores passados para uso em geração incremental.
            inputs_embeds (torch.FloatTensor, opcional): Embeddings de entrada pré-computados.
            labels (torch.LongTensor, opcional): Rótulos para cálculo de perda.
            use_cache (bool, opcional): Se deve usar cache para geração incremental.
            output_attentions (bool, opcional): Se deve retornar todas as atenções.
            output_hidden_states (bool, opcional): Se deve retornar todos os estados ocultos.
            return_dict (bool, opcional): Se deve retornar um dicionário ou tupla.

        Returns:
            Union[Tuple, CausalLMOutputWithPast]: Saída do modelo, incluindo logits, loss (se labels fornecidos),
                                                  past_key_values (se use_cache=True), e opcionalmente
                                                  hidden_states e attentions.
        """
        output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
        output_hidden_states = (
            output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
        )
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        # decoder outputs consists of (dec_features, layer_state, dec_hidden, dec_attn)
        outputs = self.model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            position_ids=position_ids,
            past_key_values=past_key_values,
            inputs_embeds=inputs_embeds,
            use_cache=use_cache,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        hidden_states = outputs[0]
        logits = self.lm_head(hidden_states)

        loss = None
        if labels is not None:
            # Shift so that tokens < n predict n
            shift_logits = logits[..., :-1, :].contiguous()
            shift_labels = labels[..., 1:].contiguous()
            # Flatten the tokens
            loss_fct = CrossEntropyLoss()
            shift_logits = shift_logits.view(-1, self.config.vocab_size)
            shift_labels = shift_labels.view(-1)
            # Enable model parallelism
            shift_labels = shift_labels.to(shift_logits.device)
            loss = loss_fct(shift_logits, shift_labels)

        if not return_dict:
            output = (logits,) + outputs[1:]
            return ((loss,) + output) if loss is not None else output

        return CausalLMOutputWithPast(
            loss=loss,
            logits=logits,
            past_key_values=outputs.past_key_values,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )

    def prepare_inputs_for_generation(
        self, input_ids, past_key_values=None, attention_mask=None, inputs_embeds=None, **kwargs
    ):
        if past_key_values:
            input_ids = input_ids[:, -1:]

        # if `inputs_embeds` are passed, we only want to use them in the 1st generation step
        if inputs_embeds is not None and past_key_values is None:
            model_inputs = {"inputs_embeds": inputs_embeds}
        else:
            model_inputs = {"input_ids": input_ids}

        model_inputs.update(
            {
                "past_key_values": past_key_values,
                "use_cache": kwargs.get("use_cache"),
                "attention_mask": attention_mask,
                "position_ids": kwargs.get("position_ids"),
            }
        )
        return model_inputs

    @staticmethod
    def _reorder_cache(past_key_values, beam_idx):
        reordered_past = ()
        for layer_past in past_key_values:
            reordered_past += (
                tuple(past_state.index_select(0, beam_idx.to(past_state.device)) for past_state in layer_past),
            )
        return reordered_past

## Supervised Fine-Tunning Trainer

In [13]:
from transformers import Trainer, TrainingArguments
from transformers import LlamaTokenizer
from typing import Dict, List, Optional, Union
import torch
from torch.utils.data import Dataset
from transformers.trainer_utils import EvalPrediction

class LlamaSFTTrainer(Trainer):
    """
    LlamaSFTTrainer é uma classe personalizada para treinar modelos LlamaForCausalLM.
    Herda da classe Trainer do Hugging Face e implementa funcionalidades específicas
    para o treinamento de modelos LLaMA.
    """
    def __init__(
        self,
        model: LlamaForCausalLM,
        args: TrainingArguments,
        train_dataset: Optional[Dataset] = None,
        eval_dataset: Optional[Union[Dataset, Dict[str, Dataset]]] = None,
        tokenizer: Optional[LlamaTokenizer] = None,
        **kwargs
    ):
        super().__init__(model, args, train_dataset, eval_dataset, tokenizer, **kwargs)

    def compute_loss(self, model, inputs, return_outputs=False):
        """
        Calcula a perda para o modelo LLaMA.
        """
        if "labels" in inputs:
            labels = inputs.pop("labels")
        else:
            labels = None

        # Passa os inputs pelo modelo
        # inputs: Dict[str, torch.Tensor] onde cada tensor tem shape [batch_size, seq_length]
        # outputs: Objeto contendo loss e logits, ambos com shape [batch_size, seq_length, vocab_size]
        outputs = model(**inputs)

        if labels is not None and self.label_smoother:
            loss = self.label_smoother(outputs, labels)
        else:
            loss = outputs.loss

        return (loss, outputs) if return_outputs else loss

    def prediction_step(
        self,
        model: LlamaForCausalLM,
        inputs: Dict[str, Union[torch.Tensor, Any]],
        prediction_loss_only: bool,
        ignore_keys: Optional[List[str]] = None,
    ) -> Tuple[Optional[torch.Tensor], Optional[torch.Tensor], Optional[torch.Tensor]]:
        """
        Realiza um passo de predição/avaliação no modelo.
        """
        # Se não estiver gerando predições ou se estiver calculando apenas a perda, usa o método da classe pai
        if not self.args.predict_with_generate or prediction_loss_only:
            return super().prediction_step(
                model,
                inputs,
                prediction_loss_only=prediction_loss_only,
                ignore_keys=ignore_keys
            )

        # Prepara os inputs para o modelo
        has_labels = "labels" in inputs
        inputs = self._prepare_inputs(inputs)

        # Configura os parâmetros para geração de texto
        gen_kwargs = self._gen_kwargs.copy()
        if gen_kwargs.get("max_length") is None and gen_kwargs.get("max_new_tokens") is None:
            gen_kwargs["max_length"] = self.model.config.max_length

        gen_kwargs["num_beams"] = gen_kwargs.get("num_beams", self.model.config.num_beams)

        # Gera tokens usando o modelo
        # input_ids: [batch_size, seq_length]
        # attention_mask: [batch_size, seq_length]
        # generated_tokens: [batch_size, max_length]
        generated_tokens = self.model.generate(
            inputs["input_ids"],
            attention_mask=inputs["attention_mask"],
            **gen_kwargs,
        )

        # Faz o padding dos tokens gerados se necessário
        if generated_tokens.shape[-1] < gen_kwargs["max_length"]:
            generated_tokens = self._pad_tensors_to_max_len(generated_tokens, gen_kwargs["max_length"])
            # generated_tokens após padding: [batch_size, max_length]

        # Calcula a perda se houver labels
        with torch.no_grad():
            if has_labels:
                with self.autocast_smart_context_manager():
                    # outputs: Objeto contendo loss e logits, ambos com shape [batch_size, seq_length, vocab_size]
                    outputs = model(**inputs)
                if self.label_smoother is not None:
                    # loss: escalar tensor []
                    loss = self.label_smoother(outputs, inputs["labels"]).mean().detach()
                else:
                    loss = (outputs["loss"] if isinstance(outputs, dict) else outputs[0]).mean().detach()
            else:
                loss = None

        if self.args.prediction_loss_only:
            return (loss, None, None)


        # Prepara as labels para retorno
        labels = inputs["labels"]
        if labels.shape[-1] < gen_kwargs["max_length"]:
            labels = self._pad_tensors_to_max_len(labels, gen_kwargs["max_length"])
            # labels após padding: [batch_size, max_length]

        # Retorna a perda, tokens gerados e labels
        return (loss, generated_tokens, labels)

    def _pad_tensors_to_max_len(self, tensor, max_length):
        """
        Faz o padding de um tensor para um comprimento máximo especificado.
        """
        # Verifica se o tokenizer está disponível
        if self.tokenizer is None:
            raise ValueError("Tokenizer é necessário para fazer o padding dos tensores até o comprimento máximo.")

        # Se o tensor já tem o comprimento máximo ou maior, retorna sem modificação
        if tensor.shape[-1] >= max_length:
            return tensor

        # Faz o padding do tensor
        # tensor: [batch_size, seq_length]
        padded_tensor = self.tokenizer.pad(
            {"input_ids": tensor},
            padding="max_length",
            max_length=max_length,
            return_tensors="pt",
        )["input_ids"]
        # padded_tensor: [batch_size, max_length]

        return padded_tensor


## Code

In [14]:
#from huggingface_hub import notebook_login
#notebook_login()

In [15]:
# Carregar modelo e tokenizador
model = LlamaForCausalLM.from_pretrained("meta-llama/Llama-2-7b-hf")
tokenizer = LlamaTokenizer.from_pretrained("meta-llama/Llama-2-7b-hf")

# Preparar entrada
input_text = "Olá, como você está?"
inputs = tokenizer(input_text, return_tensors="pt")

# Gerar texto
outputs = model.generate(**inputs, max_length=50)

# Decodificar saída
generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
print(generated_text)



Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

KeyError: 'Cache only has 0 layers, attempted to access layer with index 0'