In [2]:
import math
import inspect
from dataclasses import dataclass

import torch
import torch.nn as nn
from torch.nn import functional as F

# 0.数据（用于调试

In [22]:
X = torch.load('X.tensor').to('cpu')
Y = torch.load('Y.tensor').to('cpu')

print("X: ", X.shape)
print("Y: ", Y.shape)

X:  torch.Size([16, 256])
Y:  torch.Size([16, 256])


# 1.GPTConfig

In [14]:
@dataclass
class GPTConfig:
    block_size: int = 1024
    #  GPT-2 vocab_size of 50257, padded up to nearest multiple of 64 for efficiency
    #  怎么对vocab进行pad? 没有直观感受
    vocab_size: int = 50304
    n_layer: int = 12
    n_head: int = 12
    n_embd: int = 768
    dropout: float = 0.0
    bias: bool = True

# 2.LayerNorm

    ndim: 隐藏层维度，也就是num_hiddens
    
    weights和bias的形状与PyTorch官方实现的一样吗，都是normalized_shape
    正确理解就是你之前的理解，对每个特征维度分布一对，而不是小冬瓜讲的那样
    

In [3]:
class LayerNorm(nn.Module):
    """
        pytorch中自带的LayerNorm类默认带有bias，并且不支持通过指定参数bias=False禁用偏置项，
        所以这里自己定义一个LayerNorm层
    """
    def __init__(self, ndim, bias):
        # python2中为super(LayerNorm, self),即传入子类和对象实例作为参数
        # 但是python3可以通过内部机制推断出正确的类层级关系并调用父类的初始化方法，直接使用这种简洁形式就可以了
        super().__init__()
        self.weight = nn.Parameter(torch.ones(ndim))
        self.bias = nn.Parameter(torch.zeros(ndim)) if bias else None
        
    def forward(self, input):
        # 更底层的实现，比LayerNorm的实例化多传入weight和bias和eps
        return F.layer_norm(input, self.weight.shape, self.weight, self.bias, 1e-5)

# 3.SelfAttention

    比transformer的attention多一个dropout操作（多了个resid_dropout）:
        原来的attention只在masked attention matrix计算出来后进行dropout，
        而这里对W_0的结果也进行了一次dropout

## 3.1 torch2.0以上自带的torch.nn.functional.scaled_dot_product方法的底层逻辑
    
    1.is_causal和attn_mask的关系
            is_causal为True  和  attn_mask不为None   不能同时满足
            从if is_causal:  assert attn_mask is None这里可以看出来
            
            虽然传入的是attn_mask，但是最后起作用的mask其实是attn_bias而不是attn_mask
            is_causal和attn_mask都只是用于形成最终的attn_bias
            
            is_causal为True时（此时attn_mask必须为None）,attn_bias下三角（不包括对角线）为0，其它位置为-inf
            is_causal为False,attn_mask不为None时：
                attn_mask可能为torch.bool,也可能为0和-inf
                不过最后也是行程0和-ing组成的attn_bias张量
            （默认情况）is_causal为False并且attn_mask也为None，那么attn_bias全为0,也就是全都看得到（包括padding)
            
            最后把attn_bias和未经softmax的attention_weights相加，然后再softmax就可以了

In [24]:
temp_mask = torch.ones(3, 4, dtype=torch.bool).tril(diagonal=0)
temp_mask

tensor([[ True, False, False, False],
        [ True,  True, False, False],
        [ True,  True,  True, False]])

In [None]:
# 是这个逻辑，但是具体实现更高效，毕竟flashattention
def scaled_dot_product_attention(query, key, value, attn_mask=None, dropout_p=0.0, is_causal=False, scale=None) -> torch.Tensor:
    # Efficient implementation equivalent to the following:
    L, S = query.size(-2), key.size(-2)
    scale_factor = 1 / math.sqrt(query.size(-1)) if scale is None else scale
    attn_bias = torch.zeros(L, S, dtype=query.dtype)
    if is_causal:
        assert attn_mask is None
        temp_mask = torch.ones(L, S, dtype=torch.bool).tril(diagonal=0)
        attn_bias.masked_fill_(temp_mask.logical_not(), float("-inf"))
        attn_bias.to(query.dtype)

    if attn_mask is not None:
        if attn_mask.dtype == torch.bool:
            attn_bias.masked_fill_(attn_mask.logical_not(), float("-inf"))
        else:
            attn_bias += attn_mask
    attn_weight = query @ key.transpose(-2, -1) * scale_factor
    attn_weight += attn_bias
    attn_weight = torch.softmax(attn_weight, dim=-1)
    attn_weight = torch.dropout(attn_weight, dropout_p, train=True)
    return attn_weight @ value

## 3.2借助scaled_dot_product实现selfattention

In [8]:
class CausalSelfAttention(nn.Module):
    
    def __init__(self, config):
        super().__init__()
        # 确保num_hiddens除以num_heads除得开
        assert config.n_embd % config.n_head == 0
        # 
        self.c_attn = nn.Linear(config.n_embd, 3 * config.n_embd, bias = config.bias) # 分qkv用
        self.c_proj = nn.Linear(config.n_embd, config.n_embd, bias=config.bias) # W_o
        self.attn_dropout = nn.Dropout(config.dropout) # 给手动实现的attention用（slow attention)
        self.resid_dropout = nn.Dropout(config.dropout) # 给W_o用
        self.dropout = config.dropout # 给flash attention用
        self.n_head = config.n_head
        self.n_embd = config.n_embd
        # hasattr方法不只可以检查类的实例中有没有属性，还可以检查模块中有没有属性/变量，方法，类等，用处非常广泛
        self.flash = hasattr(torch.nn.functional, 'scaled_dot_product_attention')
        # 如果torch>=2.0.0,使用自带的scaled_dot_product_attention方法
        # 模型默认使用强制教学，自带方法可以通过设置is_causal为True来实现下三角掩码
        # 而手动实现却没有下三角掩码，所以这里需要自己定义一个大小为max_len * max_len的下三角矩阵，以满足所有可能长度的强制教学
        # 而且这个下三角矩阵是不需要更新的，所以就存在缓冲区即可
        if not self.flash:
            print("WARNING: using slow attention. Flash Attention requires PyTorch >= 2.0")
            # 缓冲区注册的bias，也是模型的一部分，并且与模型生命周期相同，强绑定
            # 也可以通过causal_attention.bias.shape这种方式来访问
            # 区别是不会被优化器作为可训练参数，常用于这种不会变的变量
            # torch.tril的diagonal参数默认为0,表示包含对角线元素的下三角矩阵
            self.register("bias", torch.tril(torch.ones(config.block_size, config.block_size))).view(1, 1, config.block_size, config.block_size)
    
    def forward(self, x):
        B, T, C = x.size()  # batch_size, seq_len, num_hiddens
        
        # 获得形状为 B, num_heads, T, C/num_heads 的 QKV
        q, k, v = self.c_attn(x).split(self.n_embd, dim=2)
        k = k.view(B, T, self.n_head, C // self.n_head).transpose(1, 2) # (B, nh, T, hs)
        q = q.view(B, T, self.n_head, C // self.n_head).transpose(1, 2) # (B, nh, T, hs)
        v = v.view(B, T, self.n_head, C // self.n_head).transpose(1, 2) # (B, nh, T, hs)
        
        # dot_product_attention
        if self.flash:
            # pytorch自带实现
            y = torch.nn.functional.scaled_dot_product_attention(q, k, v, attn_mask=None, dropout_p = self.dropout if self.training else 0, is_causal=True)
        else:
            # 手动实现
            att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1)))
            att = att.masked_fill(self.bias[:, :, :T, :T] == 0, float('-inf'))
            att = F.softmax(att, dim=-1)
            att = self.attn_dropout(att)
            y = att @ v
        y = y.transpose(1, 2).contiguouus().view(B, T, C)
        
        # W_o + dropout
        y = self.resid_dropout(self.c_proj(y))
        return y

# 4. FFN(这里就叫做MLP)

In [10]:
class MLP(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.c_fc = nn.Linear(config.n_embd, 4*config.n_embd, bias=config.bias)
        self.gelu = nn.GELU() # 本质都是非线性变化，不必深究
        self.c_proj = nn.Linear(4*config.n_embd, config.n_embd, bias = config.bias)
        self.dropout = nn.Dropout(config.dropout)
    def forward(self, x):
        x = self.gelu(self.c_fc(x))
        return self.dropout(self.c_proj(x))

# 5.decoderBlock

    与transformer中decoder的区别：
         1.前者顺序为：
            attention → add → layernorm → ffn → add → layernorm
          后者顺序为：
             layernorm → attention → add → layernorm → mlp → add
         2.之前add&nom模块中有dropout操作，现在没有add&norm了，dropout并没有消失，而是转到了attention和mlp块中
          仔细看下：
              causalattention中对W_o的dropout
              mlp中对c_proj的dropout
          这在transformer中都是没有的，在gpt中的其实就是add&norm块中转移过去的

In [13]:
class Block(nn.Module):
    def __init__(self, config):
        super().__init__()
        # 没有dropout，dropout都被包含到了attn和mlp中
        self.ln_1 = LayerNorm(config.n_embd, bias=config.bias)
        self.attn = CausalSelfAttention(config)
        self.ln_2 = LayerNorm(config.n_embd, bias=config.bias)
        self.mlp = MLP(config)
    def forward(self, x):
        x = x + self.attn(self.ln_1(x))
        x = x + self.mlp(self.ln_2(x))
        return x

# 6.GPT

    1.对于 for pn, p in self.named_parameters():
            if pn.endswith('c_proj.weight'):
                torch.nn.init.normal_(p, mean=0.0, std=0.02/math.sqrt(2 * config.n_layer))
         的理解：
         这种初始化方式借鉴了GPT-2论文中所提出的建议，即对残差结构中最后的线性层（也称为分类器前的最后一层）
         应用一个特殊的缩放初始化策略，旨在更好地控制模型训练初期的信号传播和梯度更新。
    2.pos_encoding不再自己提前设定了，而是让模型自己学（非常奔放


In [21]:
class GPT(nn.Module):
    def __init__(self, config):
        super().__init__()
        assert config.vocab_size is not None
        assert config.block_size is not None
        self.config = config
        
        self.transformer = nn.ModuleDict(dict(
            wte = nn.Embedding(config.vocab_size, config.n_embd), # vocab embedding
            wpe = nn.Embedding(config.block_size, config.n_embd),  # pos embedding
            drop = nn.Dropout(config.dropout), # 用于两个embedding操作完成并相加后，进行drop
            h = nn.ModuleList([Block(config) for _ in range(config.n_layer)]),
            ln_f = LayerNorm(config.n_embd, bias=config.bias)  # 全部block结束之后，再layernorm一下
        ))
        
        # 输出头
        self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)
        # weight typing，一种权重共享的操作，可能会引起警告，但是不耽误实际运行
        # (lm_head): Linear(in_features=768, out_features=65, bias=False)
        # Linear层的weight形状不是768 x 65！！而是65 x 768!!
        self.transformer.wte.weight = self.lm_head.weight
        
        # 权重初始化
        # 其实仔细看下，GPT中的可更新权重全部都是Linear和embedding,layernorm的在类里面已经初始化过了，不需要了
        self.apply(self._init_weights)
        # Module()类的属性named_parameters()方法可以返回一个 生成器生成所有可更新参数的name和parameter
        for name, parameter in self.named_parameters():
            if name.endswith('c_proj.weight'):
                torch.nn.init.normal_(parameter, mean=0.0, std=0.02/math.sqrt(2*config.n_layer))
        # 顺便输出参数数量
        print("number of parameters: %.2fM" % (self.get_num_params()/1e6, ))
        
    def 