#### GPT2的手写代码实现

In [22]:
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import Dataset
from torch.utils.data import dataloader
from dataclasses import dataclass

# 设置随机数种子，保证随机数相同
torch.manual_seed(1024)

<torch._C.Generator at 0x1d51c523370>

In [12]:
@dataclass
class GPTconfig():
    block_size: int = 512
    batch_size: int = 64
    n_layer : int = 8
    n_head : int = 12 
    n_embd: int = 768 # 这个和hidden_dim一样
    head_size : int = n_head // n_head
    dropout : int = 0.1
    vocab_size : int = 50257

## 模型的结构

In [None]:
import math
class SingleHeadAttention(nn.Module):
    def __init__(self, config)->None:
        super().__init__()
        self.head_size = config.n_head_size
        self.q_proj = nn.Linear(config.n_embd,config.n_head_size)
        self.k_proj = nn.Linear(config.n_embd,config.n_head_size)
        self.v_proj = nn.Linear(config.n_embd,config.n_head_size)

        #注册器创建mask，不会梯度回传，减少计算量
        self.register_buffer(
            'attention_mask',
            torch.tril(config.block_size,config.block_size)
        )
        self.dropout = nn.Dropout(config.dropout)       

    def forward(self,x):
        q = self.q_proj(x)
        k = self.k_proj(x)
        v = self.v_proj(x)
        # (batch,seq,head_dim)

        middle_value = q @ k.transpose(-2,-1)
        weight = middle_value.masked_fill(
            self.attention_mask == 0,
            float('-inf'),
            ) / math.sqrt(self.head_size) 
        weight = self.dropout(torch.softmax(weight,dim=-1)) @ v
        return final


class MultiHeadAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.blocks = nn.ModuleList([
            SingleHeadAttention(config)
            for _ in range(config.n_head)
        ])
        # 做一次空间变换
        self.linear_proj = nn.Linear(config.n_embd,config.n_embd)
        self.dropout = nn.Dropout(config.dropout)
    def forward(self,x):
        # x -> (batch_size,seq,head_size) -> (batch_size,seq,n_head * head_size)
        output = torch.cat(
            [h(x) for h in range(self.blocks)],dim=-1
        )
        output = self.linear_proj(output)
        output = self.dropout(output)

        return output

class FeedForward(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.ffn = nn.Sequential(
            nn.Linear(config.n_embd,4 * config.n_embd),
            nn.GELU(),
            nn.Linear(4 * config.n_embd,config.n_embd),
            nn.Dropout(config.dropout)
        )
    
    def forward(self,x):
        return self.ffn(x)
# 把多头和ffn合并成一个大的block

class Block(nn.Module):
    def __init__(self,config):
        super().__init__()
        self.ffn = FeedForward(config)
        self.layer_norm = nn.LayerNorm(config.n_embd)
        self.muti_head = MultiHeadAttention(config)
    
    def forward(self,x):
        x = x+self.layer_norm(self.muti_head(x))
        x = x+self.layer_norm(self.ffn(x))
        return x
    
class GPT2(nn.Module):
    def __init__(self,config):
        super().__init__()
        self.n_embd = config.n_embd

        # x -> (batch,seq)  to -> (batch,seq,n_embedding_dim) 在此表vocabulary_size 中找到对应的n_embd个特征
        self.word_embedding = nn.Embedding(config.vocab_size,config.n_embd)
        self.position_embedding = nn.Embedding(config.block_size,config.n_embd)
        # 定义有多少个block 块运行
        self.Block = nn.Sequential(
            *[Block(config) for _ in range(config.n_layer)]
        )
        self.layer_norm = nn.LayerNorm(config.n_embd)
        # 转化为最终的预测
        # 如果有weight tie 需要关掉linear的bias更新，因为Embedding中没有bias
        self.linear_proj = nn.Linear(config.n_embd,config.vocab_size,bias=False)

        self.apply(_init_weight)

    def _init_weight(self,module):
            # 正态分布初始化
            # isinstance 识别是否为Linear层，用于判断
            if isinstance(module,nn.Linear):
                torch.nn.init.normal(module.weight,mean=0.0,std=0.02)
                if module.bias is not None: #有bias情况下
                    torch.nn.init.zeros_(module.bias)
            elif isinstance(module,nn.Embedding):
                torch.nn.init.normal(module.weight,mean=0.0,std=0.02)
        
    def forward(self,x,target = 0):

        # 要确保 词信息和位置信息长度相同
        tokens_word = self.word_embedding(x)
        tokens_pos = self.position_embedding(torch.arange(seq,device=x.device))
        x = tokens_word + tokens_pos
        
        #送到中间的block当中
        x = self.Block(x)
        x = self.layer_norm(x)
        x = self.linear_proj(x)
        x = torch.softmax(x)
        batch,seq,vocab_size = x.size()
        if target is None:
            loss = None
        else:
            # final -> (batch,seq,vocab_size)
            # 做预测需要变为(batch * seq,n_embd)
            # 这里会将最后所有的tokens转化为 词典表里面每一个词典的概率，查看target的值是否为vocab_size的最大值，最后计算其交叉熵值，最后不断梯度下降更新这个loss
            x = x.view(batch * seq ,vocab_size)

            # target -> (batch,seq)
            # 做预测需要变为 (batch * seq)
            target = target.view(batch * seq)

            # 做交叉熵计算
            # 自带softmax
            loss = F.cross_entropy(x,target=target)
            return x,loss
    
    def generate(self,): #todo
        return 

## 数据集整理


In [None]:
class MyDataset(Dataset):
    def __init__(self,path,block_size = 512):
        super().__init__()
        import tiktoken
        self.enc = tiktoken.get_encoding("gpt2")
        # 定义长度大小
        self.block_size = block_size
        # 结束符 允许这个编码值被输出，如果不加allowed会被视为普通的tokens
        # 如果没有加allow，就没有办法被作为分割符号，并且输出的值也不会是加了allow的token，因为它会进行字符的拆分，再进行编码，和每个不同编码效果相关
        self.end_tokens = self.enc.encode(
            "<|endoftext|>",
            allowed_special={"<|endoftext|>"}
        )[0]

        # 用于存放最终用分割符切片好的，带有eos的block_size大小的数组
        self.encode_data = []


        import json
        # 首先读取文本的前1000条
        self.max_line = 1000
        # 临时存放数据
        raw_data = []

        with open(path,'r') as f:
            for i, line in enumerate(f):
                if i >=self.max_line:
                    break
                try:
                    # 读取每一行text后的文本，并且去掉空格
                    text = json.load(line.strip())['text']
                    raw_data.append(text)
                except json.JSONDecodeError:
                    continue
                # 所有异常都跳过不报错
                except Exception as e:
                    continue
        
        # 为读取到的每个数据增加分隔符
        full_data = []
        for text in raw_data:
            data = self.encode_data(text)
            # 这里用extend是为了把所有句子都连起来，用分隔符号划分句子
            full_data.extend(data + [self.end_tokens])
        
        # 训练时，可以每一个block_size为长度进行训练，因为有分割符号eos 模型不会跨句子学习信息
        
        for i in range(0,len(full_data),block_size):
            # 如果超过边界，则会进行切片操作，python不会报错
            chunk = full_data[i:i+block_size+1] # 偏移一位，用于预测 inputs 和label不应该相等
            # 如果在最后长度不够，则使用eos_tokens填充
            if chunk < block_size + 1 :
                chunk = chunk + [self.end_tokens] * (block_size + 1 - len(chunk))
            self.encode_data.append(chunk)

    def __len__(self):
        return len(self.encode_data)
    
    def __getitem__(self,x):
        chunk = self.encode_data(x)
        x = torch.tensor(chunk[:-1], dtype=torch.long)
        y = torch.tensor(chunk[1:], dtype=torch.long)
        return x,y
    #  GPT编码与解码
    def encode(self,x):
        return self.enc.encode(x)
    
    def decode(self,x):
        return self.enc.decode(x)

In [33]:
import tiktoken

# 获取 GPT-2 的编码器
enc = tiktoken.get_encoding("gpt2")

# 计算 <|endoftext|> 的 token ID
eos_token = enc.encode("<|endoftext|>")

print("End-of-text token ID:", eos_token)  # 输出: 50256

ValueError: Encountered text corresponding to disallowed special token '<|endoftext|>'.
If you want this text to be encoded as a special token, pass it to `allowed_special`, e.g. `allowed_special={'<|endoftext|>', ...}`.
If you want this text to be encoded as normal text, disable the check for this token by passing `disallowed_special=(enc.special_tokens_set - {'<|endoftext|>'})`.
To disable this check for all special tokens, pass `disallowed_special=()`.
