# 124M的简单GPT模型

## Step1 导包

In [12]:
import torch
from torch import nn 
from torch.nn import functional as F


## Step2 初始参数

In [13]:
# Hyperparameters for the GPT model assembled in the cells below.
GPT_CONFIG_124M = dict(
    vocab_size=50257,     # rows of the token embedding / width of the output logits
    context_length=1024,  # rows of the position embedding and side of the causal mask
    emb_dim=768,          # width of the embeddings and of every transformer block
    n_heads=12,           # attention heads per block (emb_dim must be divisible by it)
    n_layers=12,          # number of stacked transformer blocks
    dropout=0.1,          # dropout probability used by every dropout layer
    qkv_bias=False,       # whether the query/key/value projections carry a bias
)

## Step3 分块构建模型

### 1.构建embedding层

In [14]:
class EmbeddingLayer(nn.Module):
    """Token embedding summed with a learned absolute position embedding.

    Args:
        vocab_size: number of rows in the token embedding table.
        max_length: number of rows in the position embedding table.
        embedding_dim: width of both embedding tables.
    """

    def __init__(self, vocab_size, max_length, embedding_dim):
        super().__init__()
        self.max_length = max_length
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.position_embedding = nn.Embedding(max_length, embedding_dim)

    def forward(self, input):
        """Embed the token ids in ``input`` and add one position vector per step.

        The position vectors are looked up for indices ``0 .. seq_len - 1`` and
        broadcast over any leading axes of the token embeddings.
        """
        token_vectors = self.embedding(input)
        # After the lookup the embedding width is the last axis, so the
        # sequence length sits on the second-to-last axis.
        position_ids = torch.arange(
            token_vectors.shape[-2], device=token_vectors.device
        )
        return token_vectors + self.position_embedding(position_ids)

### 2.构建注意力机制

In [15]:
class CausalAttention(nn.Module):
    """Single-head scaled dot-product self-attention with a causal mask.

    Args:
        embedding_dim: input width; also the width of the query/key/value
            projections and therefore of the output.
        context_length: side length of the pre-built causal mask.
        dropout: dropout probability applied to the attention weights.
        qkv_bias: whether the three projections carry a bias term.
    """

    def __init__(self, embedding_dim, context_length, dropout, qkv_bias):
        super().__init__()
        self.embedding_dim = embedding_dim
        self.W_q = nn.Linear(embedding_dim, embedding_dim, bias=qkv_bias)
        self.W_k = nn.Linear(embedding_dim, embedding_dim, bias=qkv_bias)
        self.W_v = nn.Linear(embedding_dim, embedding_dim, bias=qkv_bias)
        self.dropout = nn.Dropout(dropout)
        # True strictly above the diagonal, i.e. at every "future" position.
        future_positions = torch.ones(
            context_length, context_length, dtype=torch.bool
        ).triu(diagonal=1)
        self.register_buffer('mask', future_positions)

    def forward(self, x):
        """Return context vectors for ``x`` of shape (batch, num_tokens, embedding_dim)."""
        _, num_tokens, _ = x.shape

        queries = self.W_q(x)
        keys = self.W_k(x)
        values = self.W_v(x)

        # Pairwise dot products between every query and every key.
        scores = queries @ keys.transpose(1, 2)
        # Crop the mask to the current sequence length and hide future tokens;
        # -inf becomes a weight of exactly zero after the softmax.
        visible_only = scores.masked_fill(
            self.mask[:num_tokens, :num_tokens], float("-inf")
        )
        # Scale by sqrt(d) before normalising each row into attention weights.
        weights = torch.softmax(visible_only / keys.shape[-1] ** 0.5, dim=-1)
        # Dropout on the attention weights acts as regularisation during training.
        weights = self.dropout(weights)
        # Weighted sum of the values gives one context vector per token.
        return weights @ values


### 3.构建多头注意力

In [16]:
class MultiHeadAttention(nn.Module):
    """Causal multi-head self-attention followed by an output projection.

    Args:
        d_in: width of the input vectors.
        d_out: total width of the query/key/value projections and of the
            output; split evenly across the heads.
        context_length: side length of the pre-built causal mask.
        dropout: dropout probability applied to the attention weights.
        num_heads: number of attention heads; must divide ``d_out``.
        qkv_bias: whether the query/key/value projections carry a bias term.
    """

    def __init__(self, d_in, d_out, context_length, dropout, num_heads, qkv_bias=False):
        super().__init__()
        # Each head receives an equal slice of the projected width.
        assert d_out % num_heads == 0, "d_out must be divisible by num_heads"

        self.d_out = d_out
        self.num_heads = num_heads
        self.head_dim = d_out // num_heads

        self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)
        # Mixes the concatenated head outputs back together.
        self.out_proj = nn.Linear(d_out, d_out)
        self.dropout = nn.Dropout(dropout)

        # True strictly above the diagonal, i.e. at every "future" position.
        future_positions = torch.ones(
            context_length, context_length, dtype=torch.bool
        ).triu(diagonal=1)
        self.register_buffer("mask", future_positions)

    def forward(self, x):
        """Map ``x`` of shape (batch, num_tokens, d_in) to (batch, num_tokens, d_out)."""
        batch, num_tokens, _ = x.shape

        def split_heads(projected):
            # (batch, tokens, d_out) -> (batch, tokens, heads, head_dim)
            #                        -> (batch, heads, tokens, head_dim)
            per_head = projected.view(batch, num_tokens, self.num_heads, self.head_dim)
            return per_head.transpose(1, 2)

        queries = split_heads(self.W_query(x))
        keys = split_heads(self.W_key(x))
        values = split_heads(self.W_value(x))

        # Per-head dot products: (batch, heads, tokens, tokens).
        scores = queries @ keys.transpose(2, 3)
        # Crop the stored mask to the current length; it broadcasts over the
        # batch and head axes.
        causal = self.mask[:num_tokens, :num_tokens]
        scores = scores.masked_fill(causal, float("-inf"))

        # Scale by sqrt(head_dim), normalise each row, then regularise.
        weights = torch.softmax(scores / self.head_dim ** 0.5, dim=-1)
        weights = self.dropout(weights)

        # (batch, heads, tokens, head_dim) -> (batch, tokens, heads, head_dim)
        per_head_context = (weights @ values).transpose(1, 2)
        # Concatenate the heads again: heads * head_dim == d_out.
        merged = per_head_context.reshape(batch, num_tokens, self.d_out)
        return self.out_proj(merged)

### 4.构建基于多头注意力的Transformer层

In [17]:
class GPTTransformerLayer(nn.Module):
    """Pre-LayerNorm transformer block: causal attention, then a GELU MLP.

    Each sub-layer is wrapped as ``x + dropout(sublayer(layer_norm(x)))``.

    Args:
        d_in: input width passed to the attention layer.
        d_out: output width of the attention layer and width of the MLP.
            Both layer norms and the residual additions operate at this
            width, so the block is only usable when ``d_in == d_out``.
        context_length: maximum sequence length covered by the causal mask.
        dropout: dropout probability for the attention weights and for both
            residual branches.
        num_heads: number of attention heads.
        qkv_bias: whether the query/key/value projections carry a bias term.
    """

    def __init__(self, d_in, d_out, context_length, dropout, num_heads, qkv_bias=False):
        super().__init__()
        self.attention_layer = MultiHeadAttention(
            d_in,
            d_out,
            context_length,
            dropout,
            num_heads,
            qkv_bias
        )
        # Position-wise feed-forward network with the usual 4x expansion.
        self.fc = nn.Sequential(
            nn.Linear(d_out, 4 * d_out),
            nn.GELU(),
            nn.Linear(4 * d_out, d_out)
        )
        self.ln1 = nn.LayerNorm(d_out)
        self.ln2 = nn.LayerNorm(d_out)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Apply the attention and MLP sub-layers, each with a residual connection."""
        # The residual additions are deliberately out-of-place. In eval mode
        # nn.Dropout hands back its input tensor unchanged, so an in-place
        # `+=` would overwrite the sub-layer's own output (for example an
        # activation captured by a forward hook) with the residual sum.
        attended = x + self.dropout(self.attention_layer(self.ln1(x)))
        return attended + self.dropout(self.fc(self.ln2(attended)))


## Step4 完成GPT模型

In [18]:
class GPTModel(nn.Module):
    """Decoder-only GPT: embeddings, a stack of transformer blocks, a vocabulary head.

    Args:
        cfg: mapping that provides the keys ``vocab_size``, ``context_length``,
            ``emb_dim``, ``n_layers``, ``dropout``, ``n_heads`` and ``qkv_bias``.
    """

    def __init__(self, cfg):
        super().__init__()
        self.embedding_layer = EmbeddingLayer(
            cfg["vocab_size"], cfg["context_length"], cfg["emb_dim"]
        )

        # Every block keeps the embedding width, so they can be chained directly.
        blocks = []
        for _ in range(cfg["n_layers"]):
            blocks.append(
                GPTTransformerLayer(
                    d_in=cfg["emb_dim"],
                    d_out=cfg["emb_dim"],
                    context_length=cfg["context_length"],
                    dropout=cfg["dropout"],
                    num_heads=cfg["n_heads"],
                    qkv_bias=cfg["qkv_bias"],
                )
            )
        self.transformers_blks = nn.Sequential(*blocks)

        self.embed_drop = nn.Dropout(cfg["dropout"])
        self.ln = nn.LayerNorm(cfg["emb_dim"])
        # Projects each hidden vector onto one logit per vocabulary entry.
        self.output_layer = nn.Linear(cfg["emb_dim"], cfg["vocab_size"], bias=False)

    def forward(self, x):
        """Map token ids ``x`` to logits with a trailing axis of size ``vocab_size``."""
        hidden = self.embed_drop(self.embedding_layer(x))
        hidden = self.transformers_blks(hidden)
        # Final layer norm, then the vocabulary projection.
        return self.output_layer(self.ln(hidden))

## Step5 测试模型参数量及模型输出

In [19]:
# Seed the RNG so the random weight initialisation is reproducible.
torch.manual_seed(123)
model = GPTModel(GPT_CONFIG_124M)

# Total number of scalar weights across every parameter tensor of the model.
total_params = sum(weight.numel() for weight in model.parameters())
print(f"Total number of parameters: {total_params:,}")

# Parameter sharing (weight tying): leave out the output projection to get the
# count the model would have if that layer reused the token embedding matrix.
# The model built above keeps the two matrices separate.
total_params_gpt2 = total_params - sum(
    weight.numel() for weight in model.output_layer.parameters()
)
print(f"Number of trainable parameters considering weight tying: {total_params_gpt2:,}")

Total number of parameters: 163,009,536
Number of trainable parameters considering weight tying: 124,412,160


In [20]:
def generate_text_simple(model, idx, max_new_tokens, context_size):
    """Greedily append ``max_new_tokens`` tokens to ``idx``.

    Args:
        model: callable mapping a (batch, n_tokens) tensor of token ids to
            logits of shape (batch, n_tokens, vocab_size).
        idx: (batch, n_tokens) tensor holding the token ids of the prompt.
        max_new_tokens: number of decoding steps to run.
        context_size: maximum number of trailing tokens fed to the model.

    Returns:
        A (batch, n_tokens + max_new_tokens) tensor: the prompt followed by
        the generated token ids. ``idx`` itself is not modified.
    """
    sequence = idx
    for _step in range(max_new_tokens):
        # Feed only the most recent `context_size` tokens so the input never
        # exceeds what the model supports.
        window = sequence[:, -context_size:]

        # No backward pass happens during generation, so skip graph building.
        with torch.no_grad():
            # Only the prediction at the final position is needed:
            # (batch, n_tokens, vocab_size) -> (batch, vocab_size).
            last_logits = model(window)[:, -1, :]

        # Normalise to probabilities and take the most likely token: (batch, 1).
        next_token = torch.softmax(last_logits, dim=-1).argmax(dim=-1, keepdim=True)

        # Grow the running sequence by one column: (batch, n_tokens + 1).
        sequence = torch.cat((sequence, next_token), dim=1)

    return sequence

In [24]:
import tiktoken
# Load the tokenizer registered in tiktoken under the name "gpt2".
tokenizer = tiktoken.get_encoding("gpt2")

# Prompt used to try out the randomly initialised model.
start_context = "我是，无敌jjy"

# Turn the prompt into a list of integer token ids.
encoded = tokenizer.encode(start_context)
print("encoded:", encoded)

# Nest the id list once so the tensor gets a leading batch axis of size 1.
encoded_tensor = torch.tensor([encoded])
print("encoded_tensor.shape:", encoded_tensor.shape)

encoded: [22755, 239, 42468, 171, 120, 234, 33768, 254, 46763, 234, 41098, 88]
encoded_tensor.shape: torch.Size([1, 12])


In [25]:
# Evaluation mode turns the dropout layers into no-ops for generation.
model.eval()

# Greedily extend the prompt by six tokens; the result holds the prompt ids
# followed by the six generated ids.
out = generate_text_simple(
    model,
    encoded_tensor,
    max_new_tokens=6,
    context_size=GPT_CONFIG_124M["context_length"],
)

print("Output:", out)
print("Output length:", out.shape[1])

Output: tensor([[22755,   239, 42468,   171,   120,   234, 33768,   254, 46763,   234,
         41098,    88,  8549, 37595, 43749, 47275, 39866, 16217]])
Output length: 18


In [26]:
# Take the single sequence out of the batch and map its ids back to text.
decoded_text = tokenizer.decode(out[0].tolist())
print(decoded_text)

我是，无敌jjy Managementarf CLI revocationPolit immun
