<a href="https://colab.research.google.com/github/diegomrodrigues/my_gpt/blob/main/GT2%20Starter%20Implementation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import torch
import math
from torch import nn

def gelu(x):
    return 0.5 * x * (1 + torch.tanh(math.sqrt(2 / math.pi) * (x + 0.044715 * torch.pow(x, 3))))

In [4]:
class LayerNorm(nn.Module):
    def __init__(self, hidden_size, eps=1e-12):
        super(LayerNorm, self).__init__()

        # Parâmetros aprendíveis para escala (gamma) e deslocamento (beta)
        # Inicializados com 1s e 0s respectivamente
        self.weight = nn.Parameter(torch.ones(hidden_size))
        self.bias = nn.Parameter(torch.zeros(hidden_size))

        # Epsilon para estabilidade numérica
        self.epsilon = eps

    def forward(self, x):  # x: [batch_size, seq_length, hidden_size]
        # Calcula a média ao longo da última dimensão (feature dimension)
        # keepdim=True mantém a dimensionalidade para broadcast correto
        mu = x.mean(-1, keepdim=True)  # [batch_size, seq_length, 1]

        # Calcula a variância
        # Usa a fórmula E[(X - μ)^2] para variância
        sigma = (x - mu).pow(2).mean(-1, keepdim=True)  # [batch_size, seq_length, 1]

        # Normalização: (x - μ) / sqrt(σ^2 + ε)
        # Epsilon (ε) evita divisão por zero
        x = (x - mu) / torch.sqrt(sigma + self.epsilon)  # [batch_size, seq_length, hidden_size]

        # Aplica transformação afim com parâmetros aprendíveis
        # y = γ * x + β, onde γ = self.weight e β = self.bias
        return self.weight * x + self.bias  # [batch_size, seq_length, hidden_size]


In [5]:
class Conv1D(nn.Module):
    def __init__(self, nf, nx):
        # nf: número de filtros (saída)
        # nx: tamanho da entrada
        super(Conv1D, self).__init__()

        self.nf = nf

        # Inicializa os pesos com uma distribuição normal
        # [nx, nf]
        w = torch.empty(nx, nf)
        nn.init.normal_(w, std=0.02)

        # Cria parâmetros treináveis para pesos e vieses
        self.weight = nn.Parameter(w)  # [nx, nf]
        self.bias = nn.Parameter(torch.zeros(nf))  # [nf]

    def forward(self, x): # x: [batch_size, input_len, nx]

        # Prepara o shape de saída
        size_out = x.size()[:-1] + (self.nf,) # [batch_size, input_len, nf]

        # Reshape x para 2D
        x_2d = x.view(-1, x.size(-1)) # [batch_size * input_len, nx]

        # Aplica a transformação linear
        # torch.addmm realiza: out = beta * self.bias + alpha * (x_2d @ self.weight)
        x_transformed = torch.addmm(self.bias, x_2d, self.weight) # [batch_size * input_len, nf]

        # Reshape de volta para 3D
        x_output = x_transformed.view(*size_out) # [batch_size, input_len, nf]

        return x_output

In [6]:
class SingleHeadAttention(nn.Module):
    def __init__(self, nx, n_ctx, config, scale=False):
        super(SingleHeadAttention, self).__init__()

        # nx: dimensão do modelo (tamanho dos embeddings)
        self.nx = nx
        # n_ctx: comprimento máximo do contexto (número máximo de tokens na sequência)
        self.n_ctx = n_ctx

        # Performar scaled dot product?
        self.scale = scale

        # Criamos uma máscara de atenção triangular inferior (causal)
        # Isso garante que cada token só preste atenção aos tokens anteriores
        # Shape: [1, 1, n_ctx, n_ctx]
        self.register_buffer("bias", torch.tril(torch.ones(n_ctx, n_ctx)).view(1, 1, n_ctx, n_ctx))

        # Camada linear para projetar a entrada em Query, Key, Value
        # Multiplica por 3 porque criamos Q, K, V de uma vez
        self.c_attn = Conv1D(nx * 3, nx)
        # Camada linear para projetar a saída da atenção de volta ao espaço do modelo
        self.c_proj = Conv1D(nx, nx)

    def _attention(self, q, k, v):
        # Calculamos os scores de atenção: Query * Key^T
        # Isso mede quanto cada token (Query) deve prestar atenção a cada outro token (Key)
        w = torch.matmul(q, k)  # [batch_size, 1, n_ctx, n_ctx]

        # Aplicamos escala opcional para estabilizar o treinamento
        # Dividimos pelo sqrt da dimensão para evitar que os gradientes fiquem muito grandes
        if self.scale:
            w = w / math.sqrt(v.size(-1))

        # Preparamos a máscara causal
        # Isso garante que não olhamos para tokens futuros
        nd, ns = w.size(-2), w.size(-1)
        mask = self.bias[:,:,ns-nd:ns,:ns]

        # Aplicamos a máscara causal
        # Colocamos -infinito onde a máscara é 0, efetivamente zerando esses scores no softmax
        w = w * mask - 1e10 * (1 - mask)

        # Aplicamos softmax para obter os pesos de atenção
        # Isso normaliza os scores para que somem 1 para cada query
        w = nn.Softmax(dim=-1)(w)

        # Calculamos a saída da atenção: pesos de atenção * Values
        # Isso agrega as informações dos tokens relevantes
        output = torch.matmul(w, v)  # [batch_size, 1, n_ctx, nx]

        return output

    def forward(self, x, layer_past=None):
        # x: entrada [batch_size, n_ctx, nx]

        # Projetamos a entrada para Query, Key, Value de uma vez
        qkv = self.c_attn(x)  # [batch_size, n_ctx, nx*3]

        # Separamos Q, K, V
        query, key, value = qkv.split(self.nx, dim=2)  # cada um: [batch_size, n_ctx, nx]

        # Reshape para adicionar dimensão de cabeça (neste caso, apenas 1)
        # Isso prepara os tensores para a operação de atenção
        query = query.unsqueeze(1)  # [batch_size, 1, n_ctx, nx]
        key = key.unsqueeze(1).transpose(-1, -2)  # [batch_size, 1, nx, n_ctx]
        value = value.unsqueeze(1)  # [batch_size, 1, n_ctx, nx]

        # Lidamos com o cache do estado passado, se fornecido
        # Isso é útil para geração incremental de texto
        if layer_past:
            past_key, past_value = layer_past
            key = torch.cat((past_key, key), dim=-1)
            value = torch.cat((past_value, value), dim=-2)

        # Armazenamos o estado atual para uso futuro
        present = torch.stack((key, value))

        # Calculamos a atenção
        attn_output = self._attention(query, key, value)  # [batch_size, 1, n_ctx, nx]

        # Removemos a dimensão da cabeça (que era 1)
        attn_output = attn_output.squeeze(1)  # [batch_size, n_ctx, nx]
        # Aplicamos a projeção final para voltar ao espaço do modelo
        attn_output = self.c_proj(attn_output)  # [batch_size, n_ctx, nx]

        return attn_output, present

In [7]:
class MultiHeadAttention(nn.Module):
    def __init__(self, nx, n_ctx, n_head, scale=False):
        super(MultiHeadAttention, self).__init__()

        # Cria uma máscara de atenção triangular inferior (causal)
        # Isso garante que cada token só preste atenção aos tokens anteriores
        # Shape: [1, 1, n_ctx, n_ctx]
        self.register_buffer("bias", torch.tril(torch.ones(n_ctx, n_ctx)).view(1, 1, n_ctx, n_ctx))

        self.n_head = n_head  # Número de cabeças de atenção
        self.split_size = nx  # Tamanho da dimensão do modelo
        self.scale = scale    # Flag para aplicar escala nos scores de atenção

        # Camada linear para projetar a entrada em Query, Key, Value para todas as cabeças
        self.c_attn = Conv1D(nx * 3, nx)
        # Camada linear para projetar a saída da atenção de volta ao espaço do modelo
        self.c_proj = Conv1D(nx, nx)

    def _attention(self, q, k, v):
        # Calcula os scores de atenção: Query * Key^T
        # Shape: [batch_size, n_head, seq_len, seq_len]
        w = torch.matmul(q, k)

        # Aplica escala opcional para estabilizar o treinamento
        if self.scale:
            w = w / math.sqrt(v.size(-1))

        # Prepara e aplica a máscara causal
        nd, ns = w.size(-2), w.size(-1)
        mask = self.bias[:, :, ns-nd:ns, :ns]
        w = w * mask - 1e10 * (1 - mask)  # Aplica -inf onde a máscara é 0

        # Aplica softmax para obter os pesos de atenção
        w = nn.Softmax(dim=-1)(w)

        # Calcula a saída da atenção: pesos de atenção * Values
        output = torch.matmul(w, v)
        return output

    def _merge_heads(self, x):
        # Reorganiza o tensor de [batch, head, seq, features] para [batch, seq, head*features]
        x = x.permute(0, 2, 1, 3).contiguous()
        new_x_shape = x.size()[:-2] + (x.size(-2) * x.size(-1),)
        return x.view(*new_x_shape)

    def _split_heads(self, x, k=False):
        # Divide o último dimensão em [n_head, features/n_head]
        new_x_shape = x.size()[:-1] + (self.n_head, x.size(-1) // self.n_head)
        x = x.view(*new_x_shape)

        # Reorganiza o tensor para [batch, head, seq, features/n_head]
        if k:
            # Para as keys, colocamos a dim seq por último para otimizar a multiplicação de matrizes
            return x.permute(0, 2, 3, 1)
        else:
            return x.permute(0, 2, 1, 3)

    def forward(self, x, layer_past=None):
        # x: entrada [batch_size, seq_len, nx]

        # Projeta a entrada para Q, K, V de uma vez
        qkv = self.c_attn(x)  # [batch_size, seq_len, nx*3]

        # Separa Q, K, V
        query, key, value = qkv.split(self.split_size, dim=2)
        # query, key, value: cada um [batch_size, seq_len, nx]

        # Divide as cabeças e reorganiza
        query = self._split_heads(query)  # [batch_size, n_head, seq_len, nx/n_head]
        key = self._split_heads(key, k=True)  # [batch_size, n_head, nx/n_head, seq_len]
        value = self._split_heads(value)  # [batch_size, n_head, seq_len, nx/n_head]

        # Lida com o cache do estado passado, se fornecido
        if layer_past:
            past_key, past_value = layer_past[0].transpose(-2, -1), layer_past[1]
            key = torch.cat((past_key, key), dim=-1)  # [batch_size, n_head, nx/n_head, seq_len_extended]
            value = torch.cat((past_value, value), dim=-2)  # [batch_size, n_head, seq_len_extended, nx/n_head]

        # Armazena o estado atual para uso futuro
        present = torch.stack((key.transpose(-2, -1), value))
        # present: [2, batch_size, n_head, seq_len, nx/n_head]

        # Calcula a atenção para todas as cabeças
        attn_output = self._attention(query, key, value)
        # [batch_size, n_head, seq_len, nx/n_head]

        # Combina as cabeças novamente
        attn_output = self._merge_heads(attn_output)  # [batch_size, seq_len, nx]

        # Projeta de volta para o espaço do modelo
        attn_output = self.c_proj(attn_output)  # [batch_size, seq_len, nx]

        return attn_output, present

In [9]:
class MLP(nn.Module):
    def __init__(self, n_state, n_embed):
        super(MLP, self).__init__()

        # n_state: geralmente 4 * n_embed, seguindo a arquitetura original do Transformer
        # n_embed: dimensão do modelo (embedding dimension)

        # Primeira camada linear: expande a dimensão
        self.c_fc = Conv1D(n_state, n_embed)

        # Segunda camada linear: projeta de volta para a dimensão original
        self.c_proj = Conv1D(n_embed, n_state)

        # Função de ativação GELU (Gaussian Error Linear Unit)
        self.activation = gelu

    def forward(self, x):
        # x: entrada [batch_size, seq_len, n_embed]

        # Aplica a primeira transformação linear e a função de ativação
        h = self.c_fc(x)  # [batch_size, seq_len, n_state]
        h = self.activation(h)  # [batch_size, seq_len, n_state]

        # Aplica a segunda transformação linear
        h2 = self.c_proj(h)  # [batch_size, seq_len, n_embed]

        return h2  # [batch_size, seq_len, n_embed]

In [10]:
class TransformerBlock(nn.Module):
    def __init__(self, n_ctx, config, scale=False):
        super(TransformerBlock, self).__init__()

        nx = config.n_embed  # Dimensão do modelo

        # Primeira camada de normalização, aplicada antes da atenção
        self.ln_1 = LayerNorm(nx, eps=config.layer_norm_epsilon)

        # Camada de atenção (neste caso, atenção de cabeça única)
        self.attention = SingleHeadAttention(nx, n_ctx, config, scale)

        # Segunda camada de normalização, aplicada antes da MLP
        self.ln_2 = LayerNorm(nx, eps=config.layer_norm_epsilon)

        # MLP (Feed-Forward Network)
        self.mlp = MLP(4 * nx, nx)

    def forward(self, x, layer_past=None):
        # x: entrada [batch_size, seq_len, nx]

        # Primeira normalização de camada
        normalized_x = self.ln_1(x)  # [batch_size, seq_len, nx]

        # Camada de atenção
        attention_output, present = self.attention(normalized_x, layer_past=layer_past)
        # attention_output: [batch_size, seq_len, nx]
        # present: estado cacheado para geração incremental

        # Conexão residual após a atenção
        x = x + attention_output  # [batch_size, seq_len, nx]

        # Segunda normalização de camada
        normalized_x = self.ln_2(x)  # [batch_size, seq_len, nx]

        # MLP
        mlp_output = self.mlp(normalized_x)  # [batch_size, seq_len, nx]

        # Conexão residual após a MLP
        x = x + mlp_output  # [batch_size, seq_len, nx]

        return x, present

In [11]:
import copy
import torch
import torch.nn as nn

class Transformer(nn.Module):
    def __init__(self, config):
        super().__init__()

        self.n_layer = config.n_layer  # Número de camadas do Transformer
        self.n_embed = config.n_embed  # Dimensão do embedding
        self.n_vocab = config.n_vocab  # Tamanho do vocabulário
        self.n_pos   = config.n_pos    # Número máximo de posições

        # Embedding de tokens
        self.wte = nn.Embedding(self.n_vocab, self.n_embed)
        # Embedding de posições
        self.wpe = nn.Embedding(self.n_pos, self.n_embed)

        # Cria um bloco do Transformer
        block = TransformerBlock(config.n_ctx, config, scale=True)

        # Cria uma lista de blocos do Transformer
        self.h = nn.ModuleList([copy.deepcopy(block) for _ in range(self.n_layer)])

        # Camada final de normalização
        self.ln_f = LayerNorm(self.n_embed, eps=config.layer_norm_epsilon)

    def forward(self, input_ids, position_ids=None, token_type_ids=None, past=None):
        # input_ids: [batch_size, seq_len]

        if past is None:
            past_length = 0
            past = [None] * len(self.h)
        else:
            past_length = past[0][0].size(-2)

        if position_ids is None:
            # Gera ids de posição se não fornecidos
            position_ids = torch.arange(past_length, input_ids.size(-1) + past_length, dtype=torch.long, device=input_ids.device)
            position_ids = position_ids.unsqueeze(0).expand_as(input_ids)  # [batch_size, seq_len]

        input_shape = input_ids.size()
        input_ids = input_ids.view(-1, input_ids.size(-1))  # [batch_size * seq_len]
        position_ids = position_ids.view(-1, position_ids.size(-1))  # [batch_size * seq_len]

        # Aplica embeddings de tokens e posições
        input_embeds = self.wte(input_ids)  # [batch_size * seq_len, n_embed]
        position_embeds = self.wpe(position_ids)  # [batch_size * seq_len, n_embed]

        if token_type_ids is not None:
            token_type_ids = token_type_ids.view(-1, token_type_ids.size(-1))
            token_type_embeds = self.wte(token_type_ids)  # [batch_size * seq_len, n_embed]
        else:
            token_type_embeds = 0

        # Soma todos os embeddings
        hidden_states = input_embeds + position_embeds + token_type_embeds  # [batch_size * seq_len, n_embed]

        presents = []
        for block, layer_past in zip(self.h, past):
            hidden_states, present = block(hidden_states, layer_past)
            # hidden_states: [batch_size * seq_len, n_embed]
            presents.append(present)

        # Aplica a normalização final
        hidden_states = self.ln_f(hidden_states)  # [batch_size * seq_len, n_embed]

        # Reshape para a forma original
        output_shape = input_shape + (hidden_states.size(-1),)
        hidden_states = hidden_states.view(*output_shape)  # [batch_size, seq_len, n_embed]

        return hidden_states, presents