# 基础GPT的实现

本节将手写一个简单的 GPT，并使用 torch.rand 生成的随机数据进行测试。为了让模型能够跑通并验证其可以收敛，
本节包含以下内容：

- [x] 模型结构
- [x] 损失函数
- [x] 优化器
- [x] 模型收敛验证

## 1.模型结构

### 1.1 实现单头注意力

输入 x：[seq_len, d_model]
K Q V线性矩阵：K[d_model, d_k], Q[d_model, d_k], V[d_model, d_v], W[d_v, d_model]

输出：y[seq_len, d_model]

In [2]:
import torch
import torch.nn as nn

class SelfAttention(nn.Module):
    """Single-head scaled dot-product self-attention with an output projection.

    Args:
        d_model: feature size of the input and of the returned tensor.
        d_k: width of the key and query projections.
        d_v: width of the value projection.
    """

    def __init__(self, d_model, d_k, d_v):
        super().__init__()
        # Four linear maps: keys, queries, values and the final projection
        # that brings the d_v-wide attention output back to d_model.
        self.K = nn.Linear(d_model, d_k)
        self.Q = nn.Linear(d_model, d_k)
        self.V = nn.Linear(d_model, d_v)
        self.W = nn.Linear(d_v, d_model)

    def forward(self, x):
        """Attend over ``x`` of shape [seq_len, d_model]; returns [seq_len, d_model]."""
        keys, queries, values = self.K(x), self.Q(x), self.V(x)

        # Scale by sqrt(d_k), read from the key width, before the softmax.
        scale = torch.sqrt(torch.tensor(keys.shape[-1], dtype=torch.float32))
        # Swapping the last two axes of the keys makes the product a square
        # [seq_len, seq_len] matrix: one row of weights per query position.
        scores = (queries @ keys.transpose(-2, -1)) / scale
        weights = scores.softmax(dim=-1)

        # Weighted sum of the values, then project [seq_len, d_v] -> [seq_len, d_model].
        return self.W(weights @ values)

代码分析：
1. `__init__`方法中，我们定义了四个线性层，分别用于计算K、Q、V和W。
2. `forward`方法中，我们首先计算K、Q、V，然后计算注意力权重，最后计算注意力输出。
3. 注意力权重计算公式为：`attention_weights = torch.matmul(Q, K.transpose(-2, -1)) / torch.sqrt(torch.tensor(d_k, dtype=torch.float32))`，其中`torch.matmul`表示矩阵乘法，`K.transpose(-2, -1)`表示将K的最后一个维度和倒数第二个维度交换位置，这样可以使得K和Q的点积结果是一个方阵。
4. 注意力输出计算公式为：`y = torch.matmul(attention_weights, V)`，其中`torch.matmul`表示矩阵乘法，`attention_weights`表示注意力权重，`V`表示V矩阵。
5. 注意力输出经过线性层W后得到最终的输出。

测试

In [3]:
# Smoke test: push one random sequence through SelfAttention.
d_model, d_k, d_v = 512, 64, 64
seq_len = 10
x = torch.rand(seq_len, d_model)
attention = SelfAttention(d_model, d_k, d_v)
y = attention(x)
print(y.shape)  # [seq_len, d_model]

# Expected output: torch.Size([10, 512])
# Shape walk-through:
#   x [seq_len, d_model]; Wk, Wq [d_model, d_k]; Wv [d_model, d_v]; W [d_v, d_model]
#   K, Q [seq_len, d_k]; V [seq_len, d_v]
#   attention_weights [seq_len, seq_len]
#   y = attention_weights @ V @ W
#     = [seq_len, seq_len] @ [seq_len, d_v] @ [d_v, d_model] = [seq_len, d_model]

torch.Size([10, 512])


### 1.2 实现多头注意力（非batch版本）

输入 x：[seq_len, d_model]

K Q V 线性矩阵：K[d_model,  num_heads * d_k], Q[d_model, num_heads * d_k], V[d_model, num_heads * d_v], W[num_heads * d_v, d_model]

输出：y[seq_len, d_model]

In [4]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product self-attention over one unbatched sequence.

    Each head attends across the sequence positions. No attention mask is
    applied, so every position can see every other position.

    Args:
        d_model: feature size of the input and of the returned tensor.
        d_k: per-head width of the key and query projections.
        d_v: per-head width of the value projection.
        num_heads: number of attention heads.
    """

    def __init__(self, d_model, d_k, d_v, num_heads):
        super().__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        self.d_k = d_k
        self.d_v = d_v

        # One wide projection per role; the result is split into heads in forward().
        self.K = nn.Linear(d_model, d_k * num_heads)
        self.Q = nn.Linear(d_model, d_k * num_heads)
        self.V = nn.Linear(d_model, d_v * num_heads)
        self.W = nn.Linear(d_v * num_heads, d_model)

    def forward(self, x):
        """Attend over ``x`` of shape [seq_len, d_model]; returns [seq_len, d_model].

        All rows of ``x`` are treated as positions of a single sequence, so
        batched input must not be passed to this version.
        """
        # Split the projected features into heads, then move the head axis in
        # front of the sequence axis. Multiplying in the [seq_len, num_heads, d]
        # layout would give [seq_len, num_heads, num_heads] weights, i.e. heads
        # attending to each other inside one position with no mixing of tokens.
        K = self.K(x).view(-1, self.num_heads, self.d_k).transpose(0, 1)  # [num_heads, seq_len, d_k]
        Q = self.Q(x).view(-1, self.num_heads, self.d_k).transpose(0, 1)  # [num_heads, seq_len, d_k]
        V = self.V(x).view(-1, self.num_heads, self.d_v).transpose(0, 1)  # [num_heads, seq_len, d_v]

        # [num_heads, seq_len, d_k] @ [num_heads, d_k, seq_len] = [num_heads, seq_len, seq_len]
        # Dividing by sqrt(d_k) keeps the dot products from growing with d_k.
        attention_weights = torch.matmul(Q, K.transpose(-2, -1)) / self.d_k ** 0.5
        attention_weights = torch.softmax(attention_weights, dim=-1)

        # [num_heads, seq_len, seq_len] @ [num_heads, seq_len, d_v] = [num_heads, seq_len, d_v]
        y = torch.matmul(attention_weights, V)

        # Back to [seq_len, num_heads, d_v] and concatenate the heads. reshape()
        # is used because the transposed tensor is no longer contiguous.
        y = y.transpose(0, 1).reshape(-1, self.num_heads * self.d_v)  # [seq_len, d_v * num_heads]

        # Final projection mixes the heads back into d_model features.
        y = self.W(y)  # [seq_len, d_model]
        return y

代码分析：
1. 区别于单头注意力，多头注意力在计算K、Q、V时，对多头的处理体现在view函数中，将d_model维度拆分为num_heads个d_k和d_v维度。
2. 在计算注意力权重时，将K和Q的点积结果除以根号下d_k，以防止点积结果过大。
3. 在计算注意力输出时， y 也是先 经过view函数将num_heads和d_v维度合并，再经过线性层W得到最终的输出。

In [5]:
# Smoke test: unbatched multi-head attention on one random sequence.
d_model, d_k, d_v = 512, 64, 64
seq_len = 10
num_heads = 8
x = torch.rand(seq_len, d_model)
attention = MultiHeadAttention(d_model, d_k, d_v, num_heads)
y = attention(x)
print(y.shape)  # [seq_len, d_model]

# Shape notes:
#   x [seq_len, d_model]
#   Wk, Wq [d_model, d_k * num_heads]; Wv [d_model, d_v * num_heads]; W [d_v * num_heads, d_model]
#   y [seq_len, d_model]
# The per-head intermediate shapes are annotated inside MultiHeadAttention.forward.


torch.Size([10, 512])


### 1.3 实现多头注意力（batch）

输入 x：[batch_size, seq_len, d_model]

K Q V 线性矩阵：K[d_model,  num_heads * d_k], Q[d_model, num_heads * d_k], V[d_model, num_heads * d_v], W[num_heads * d_v, d_model]

输出：y[batch_size, seq_len, d_model]


In [6]:
class MultiHeadAttention(nn.Module):
    """Batched multi-head scaled dot-product self-attention.

    Each head attends across the positions of its own sequence; sequences in
    the batch do not interact. No attention mask is applied.

    Args:
        d_model: feature size of the input and of the returned tensor.
        d_k: per-head width of the key and query projections.
        d_v: per-head width of the value projection.
        num_heads: number of attention heads.
    """

    def __init__(self, d_model, d_k, d_v, num_heads):
        super().__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        self.d_k = d_k
        self.d_v = d_v

        # One wide projection per role; the result is split into heads in forward().
        self.K = nn.Linear(d_model, d_k * num_heads)
        self.Q = nn.Linear(d_model, d_k * num_heads)
        self.V = nn.Linear(d_model, d_v * num_heads)
        self.W = nn.Linear(d_v * num_heads, d_model)

    def forward(self, x):
        """Attend over ``x`` of shape [batch_size, seq_len, d_model].

        Returns a tensor of shape [batch_size, seq_len, d_model].
        """
        batch_size, seq_len, _ = x.shape

        # Keep batch and sequence as separate axes and move the head axis in
        # front of the sequence axis. Flattening to [batch * seq_len, heads, d]
        # and multiplying there would give [.., num_heads, num_heads] weights:
        # heads attending to each other with no information flow between tokens.
        K = self.K(x).view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)  # [batch_size, num_heads, seq_len, d_k]
        Q = self.Q(x).view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)  # [batch_size, num_heads, seq_len, d_k]
        V = self.V(x).view(batch_size, seq_len, self.num_heads, self.d_v).transpose(1, 2)  # [batch_size, num_heads, seq_len, d_v]

        # [.., seq_len, d_k] @ [.., d_k, seq_len] = [batch_size, num_heads, seq_len, seq_len]
        attention_weights = torch.matmul(Q, K.transpose(-2, -1)) / self.d_k ** 0.5
        attention_weights = torch.softmax(attention_weights, dim=-1)

        y = torch.matmul(attention_weights, V)  # [batch_size, num_heads, seq_len, d_v]

        # Back to [batch_size, seq_len, num_heads, d_v] and concatenate the heads.
        # reshape() is used because the transposed tensor is no longer contiguous.
        y = y.transpose(1, 2).reshape(batch_size, seq_len, self.num_heads * self.d_v)

        y = self.W(y)  # [batch_size, seq_len, d_model]
        return y

In [7]:
# Smoke test: batched multi-head attention on random input.
d_model, d_k, d_v = 512, 64, 64
num_heads = 8
seq_len = 10

batch_size = 32

# Random tensor of shape [batch_size, seq_len, d_model]
x = torch.rand(batch_size, seq_len, d_model)
attention = MultiHeadAttention(d_model, d_k, d_v, num_heads)
y = attention(x)
print(y.shape)  # [batch_size, seq_len, d_model]

# Shape notes:
#   x [batch_size, seq_len, d_model]
#   Wk, Wq [d_model, d_k * num_heads]; Wv [d_model, d_v * num_heads]; W [d_v * num_heads, d_model]
#   y [batch_size, seq_len, d_model]
# The per-head intermediate shapes are annotated inside MultiHeadAttention.forward.


torch.Size([32, 10, 512])


In [8]:
# Next refinement: add a residual connection and layer normalisation.
class MultiHeadAttention(nn.Module):
    """Batched multi-head self-attention with a residual connection and LayerNorm.

    Computes ``LayerNorm(x + Attention(x))``. Each head attends across the
    positions of its own sequence; sequences in the batch do not interact.
    No attention mask is applied.

    Args:
        d_model: feature size of the input and of the returned tensor.
        d_k: per-head width of the key and query projections.
        d_v: per-head width of the value projection.
        num_heads: number of attention heads.
    """

    def __init__(self, d_model, d_k, d_v, num_heads):
        super().__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        self.d_k = d_k
        self.d_v = d_v
        # One wide projection per role; the result is split into heads in forward().
        self.K = nn.Linear(d_model, d_k * num_heads)
        self.Q = nn.Linear(d_model, d_k * num_heads)
        self.V = nn.Linear(d_model, d_v * num_heads)
        self.W = nn.Linear(d_v * num_heads, d_model)
        self.layer_norm = nn.LayerNorm(d_model)

    def forward(self, x):
        """Attend over ``x`` of shape [batch_size, seq_len, d_model].

        Returns a tensor of shape [batch_size, seq_len, d_model].
        """
        batch_size, seq_len, _ = x.shape

        # Keep batch and sequence as separate axes and move the head axis in
        # front of the sequence axis. Flattening to [batch * seq_len, heads, d]
        # and multiplying there would give [.., num_heads, num_heads] weights:
        # heads attending to each other with no information flow between tokens.
        K = self.K(x).view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)  # [batch_size, num_heads, seq_len, d_k]
        Q = self.Q(x).view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)  # [batch_size, num_heads, seq_len, d_k]
        V = self.V(x).view(batch_size, seq_len, self.num_heads, self.d_v).transpose(1, 2)  # [batch_size, num_heads, seq_len, d_v]

        # [.., seq_len, d_k] @ [.., d_k, seq_len] = [batch_size, num_heads, seq_len, seq_len]
        attention_weights = torch.matmul(Q, K.transpose(-2, -1)) / self.d_k ** 0.5
        attention_weights = torch.softmax(attention_weights, dim=-1)

        y = torch.matmul(attention_weights, V)  # [batch_size, num_heads, seq_len, d_v]

        # Back to [batch_size, seq_len, num_heads, d_v] and concatenate the heads.
        # reshape() is used because the transposed tensor is no longer contiguous.
        y = y.transpose(1, 2).reshape(batch_size, seq_len, self.num_heads * self.d_v)
        y = self.W(y)  # [batch_size, seq_len, d_model]

        # Residual connection around the attention, then layer normalisation.
        y = self.layer_norm(x + y)  # [batch_size, seq_len, d_model]
        return y

In [9]:
# Smoke test: multi-head attention with residual connection and LayerNorm.
d_model, d_k, d_v = 512, 64, 64
num_heads = 8
seq_len = 10

batch_size = 32

# Random tensor of shape [batch_size, seq_len, d_model]
x = torch.rand(batch_size, seq_len, d_model)
attention = MultiHeadAttention(d_model, d_k, d_v, num_heads)
y = attention(x)
print(y.shape)  # [batch_size, seq_len, d_model]

torch.Size([32, 10, 512])


### 1.4 实现 Block
Block 在 GPT 中由两个部分组成：一个 MultiHeadAttention 层和一个前馈神经网络层。前馈神经网络层由两个线性层和一个激活函数组成。



In [10]:
class Block(nn.Module):
    """One transformer block: an attention sub-layer, then a feed-forward sub-layer.

    The attention sub-layer is a ``MultiHeadAttention`` module, which (as
    defined alongside this class) already returns its own residual +
    layer-normalised output. This class adds the residual connection and
    LayerNorm around the feed-forward sub-layer.

    Args:
        d_model: feature size of the input and of the returned tensor.
        d_k: per-head key/query width, passed to the attention module.
        d_v: per-head value width, passed to the attention module.
        num_heads: number of attention heads.
        d_ff: hidden width of the feed-forward network.
    """

    def __init__(self, d_model, d_k, d_v, num_heads, d_ff):
        super().__init__()
        self.attention = MultiHeadAttention(d_model, d_k, d_v, num_heads)
        # Position-wise feed-forward network: expand to d_ff, ReLU, project back.
        self.feed_forward = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model)
        )
        self.layer_norm = nn.LayerNorm(d_model)

    def forward(self, x):
        """Transform ``x`` of shape [batch_size, seq_len, d_model]; shape is preserved."""
        h = self.attention(x)

        # The residual wraps the feed-forward sub-layer itself, so the skip
        # path carries the tensor the feed-forward consumed (the attention
        # output h), not the block input x.
        y = self.layer_norm(h + self.feed_forward(h))
        return y
# Smoke test: run a single Block on random input.
d_model, d_k, d_v = 512, 64, 64
num_heads = 8
d_ff = 2048
seq_len = 10

batch_size = 32

# Random tensor of shape [batch_size, seq_len, d_model]
x = torch.rand(batch_size, seq_len, d_model)
block = Block(d_model, d_k, d_v, num_heads, d_ff)
y = block(x)

# Print the shape of the output tensor
print(y.shape)  # [batch_size, seq_len, d_model]

torch.Size([32, 10, 512])


### 1.6 实现 GPT
GPT 由多个 Block 组成，每个 Block 包含一个 MultiHeadAttention 层和一个前馈神经网络层。GPT 的输入是一个序列，输出是序列的每个位置的隐藏状态。

In [None]:
class FGPT(nn.Module):
    """A stack of ``num_layers`` Block modules followed by a final LayerNorm.

    Args:
        d_model: feature size of the input and of the returned tensor.
        d_k: per-head key/query width, passed to every Block.
        d_v: per-head value width, passed to every Block.
        num_heads: number of attention heads in every Block.
        d_ff: feed-forward hidden width in every Block.
        num_layers: number of Blocks in the stack.
    """

    def __init__(self, d_model, d_k, d_v, num_heads, d_ff, num_layers):
        super().__init__()
        self.num_layers = num_layers
        # Blocks are registered in a ModuleList so their parameters are tracked.
        self.layers = nn.ModuleList()
        for _ in range(num_layers):
            self.layers.append(Block(d_model, d_k, d_v, num_heads, d_ff))
        self.layer_norm = nn.LayerNorm(d_model)

    def forward(self, x):
        """Run ``x`` ([batch_size, seq_len, d_model]) through every Block in order.

        The result of the last Block is layer-normalised once more and returned.
        """
        hidden = x
        for block in self.layers:
            hidden = block(hidden)
        return self.layer_norm(hidden)

In [None]:
# Smoke test: forward pass through the full FGPT stack.
d_model, d_k, d_v = 768, 64, 64
num_heads = 12
d_ff = 3072
num_layers = 12
seq_len = 10

batch_size = 32

# Random tensor of shape [batch_size, seq_len, d_model]
x = torch.rand(batch_size, seq_len, d_model)
gpt = FGPT(d_model, d_k, d_v, num_heads, d_ff, num_layers)
y = gpt(x)

# Print the shape of the output tensor
print(y.shape)  # [batch_size, seq_len, d_model]



torch.Size([32, 10, 768])


### 1.7 收敛测试
检查模型是否能正常收敛


In [15]:
# Hyper-parameters of the model under test
d_model = 768
d_k = 64
d_v = 64
num_heads = 12
d_ff = 3072
num_layers = 12
seq_len = 10

batch_size = 32  # not used below: every step trains on the whole data set at once

# Random inputs and random regression targets of the same shape
data_leng = 100
x = torch.rand(data_leng, seq_len, d_model)
y = torch.rand(data_leng, seq_len, d_model)

# Build the model. The class defined in this file is FGPT; the name `GPT`
# is not defined anywhere and raised a NameError here.
gpt = FGPT(d_model, d_k, d_v, num_heads, d_ff, num_layers)

# Loss function: mean squared error between prediction and target
criterion = nn.MSELoss()

# Optimizer
optimizer = torch.optim.Adam(gpt.parameters(), lr=0.001)

# Training loop: 20 full-batch steps, logging the loss every 10th step
for epoch in range(20):
    optimizer.zero_grad()
    y_pred = gpt(x)
    loss = criterion(y_pred, y)
    loss.backward()
    optimizer.step()
    if epoch % 10 == 0:
        print(f"Epoch {epoch}, Loss: {loss.item()}")
###PATH:./FGPT/01 GPT.ipynb


Epoch 0, Loss: 1.3334763050079346
Epoch 10, Loss: 1.1047908067703247
