## GPT模型
前面详细介绍了Transformer结构, 现在要介绍下GPT结构, 其与Transformer结构类似, 且只包含Decoder结构  

<img src="./images/GPT结构.png" alt="examples" style="zoom:45%;" />  

如上图所示, GPT包含的关键组件有:  
1.多头自注意力：通过ScaledDotProductAttention类实现缩放点积注意力机制，然后通过MultiHeadAttention类实现多头自注意力机制  
2.逐位置前馈网络：通过PoswiseFeedForwardNet类实现逐位置前馈网络  
3.正弦位置编码表：通过get_sin_code_table函数生成正弦位置编码表  
4.填充掩码：通过get_attn_pad_mask函数为填充令牌`<pad>`生成注意力掩码，避免注意力机制关注无用的信息  
5.后续掩码：通过get_attn_subsequent_mask函数为后续令牌(当前位置后面的信息)生成注意力掩码，避免解码器中的注意力机制"偷窥"未来的目标数据  
6.解码器层：通过DecoderLayer类定义解码器的单层  
7.解码器：通过Decoder类定义Transformer完整的解码器部分
8.GPT：在上述结构的基础上增加一个投影层，将解码器层输出的特征向量转换为预测输出，实现文本生成  

其中，组件1和组件5直接从前文复制过来即可，剩余组件需要重新定义

In [188]:
import numpy as np
import torch
import torch.nn as nn
d_k = 64 
d_v = 64

# 定义缩放点积注意力函数
class ScaledDotProductAttention(nn.Module):
    def __init__(self):
        super(ScaledDotProductAttention, self).__init__()
    def forward(self, Q, K, V, attn_mask):
        #------------------------- 维度信息 --------------------------------
        # Q K V [batch_size, n_heads, seq_len, seq_dim] -> fea_dim = n_heads * seq_dim
        # attn_mask [batch_size, n_heads, len_q, len_k]
        #------------------------------------------------------------------
        
        # 计算注意力分数（原始权重）[batch_size, n_heads, len_q, len_k]
        scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k)
        
        # 使用注意力掩码，将attn_mask中值为1(或者是布尔类型的true)的位置(填充位置)的权重替换为极小值
        scores.masked_fill_(attn_mask, -1e9)
        
        # 对注意力分数进行softmax归一化处理
        # weight -> [batch_size, n_heads, len_q, len_k]
        weight = nn.Softmax(dim=-1)(scores)
        
        # 再次点积操作，计算上下文向量(注意力的输出)
        # context -> [batch_size, n_heads, len_q, dim_v]
        context = torch.matmul(weight, V)
        return context, weight      

In [189]:
# 定义多头自注意力类
d_embedding = 512 # Embedding维度
n_heads = 8 # 多头注意力的头数 fea_dim = 512 / 8 = 64
batch_size = 3
class MultiHeadAttention(nn.Module):
    def __init__(self):
        super(MultiHeadAttention, self).__init__()
        self.W_Q = nn.Linear(d_embedding, d_k * n_heads) # Q的线性变换层
        self.W_K = nn.Linear(d_embedding, d_k * n_heads) # K的线性变换层
        self.W_V= nn.Linear(d_embedding, d_v * n_heads) # V的线性变换层
        self.linear = nn.Linear(d_v * n_heads, d_embedding)
        self.layer_norm = nn.LayerNorm(d_embedding)
    
    def forward(self, Q, K, V, attn_mask):
        #------------------------- 维度信息 --------------------------------
        # Q K V -> [batch_size, len_q/len_k,len_v, embedding_dim]
        #------------------------------------------------------------------
        residual, batch_size = Q, Q.size(0) # 保留残差连接
        
        # 将输入进行线性变换和重塑，以便后续处理
        q_s = self.W_Q(Q).view(batch_size, -1, n_heads, d_k).transpose(1, 2)
        k_s = self.W_K(K).view(batch_size, -1, n_heads, d_k).transpose(1, 2)
        v_s = self.W_V(V).view(batch_size, -1, n_heads, d_v).transpose(1, 2)
        
        #------------------------- 维度信息 -------------------------------- 
        # q_s k_s v_s -> [batch_size, n_heads, len_q/k/v, d_k/k/v]
        #------------------------------------------------------------------
        # 将注意力掩码复制到多头
        # 先通过unsqueeze来增加一个维度(batch_size, 1, len_q, len_k)
        # 然后复制n_heads次数据，保证在每个头上的掩码数据都一致
        attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1)
        
        #------------------------- 维度信息 -------------------------------- 
        # attn_mask -> [batch_size, n_heads, len_q, len_k]
        #------------------------------------------------------------------
        # 使用缩放点积注意力计算上下文和注意力权重
        context, weights = ScaledDotProductAttention()(q_s, k_s, v_s, attn_mask)
        
        #------------------------- 维度信息 -------------------------------- 
        # context -> [batch_size, n_heads, len_q, dim_v]
        # weights -> [batch_size, n_heads, len_q, len_k]
        #------------------------------------------------------------------
        # 将多个头的上下文向量连接在一起
        # context -> [batch_size, len_q, n_heads * dim_v]
        context = context.transpose(1, 2).contiguous().view(batch_size, -1, n_heads * d_v)
        
        # 线性变换，转换为embedding_size
        output = self.linear(context)
        #------------------------- 维度信息 -------------------------------- 
        # output -> [batch_size, len_q, embedding_dim]
        #------------------------------------------------------------------

        # 与输入残差连接，并进行层一化处理
        output = self.layer_norm(output + residual)
        
        # 返回层归一化的结果和注意力权重
        return output, weights        

In [190]:
# 定义逐位置前馈网络
class PoswiseFeedForwardNet(nn.Module):
    def __init__(self, d_ff=2048):
        super(PoswiseFeedForwardNet, self).__init__()
        # 定义一维卷积层，用于将输入映射到高维度
        self.conv1 = nn.Conv1d(in_channels=d_embedding, out_channels=d_ff, kernel_size=1)
        # 定义一维卷积层，用于将输入映射回原始维度
        self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_embedding, kernel_size=1)
        # 定义层归一化
        self.layer_norm = nn.LayerNorm(d_embedding)
    def forward(self, inputs):
        #------------------------- 维度信息 -------------------------------- 
        # input -> [batch_size, len_q, embedding_dim]
        #------------------------------------------------------------------
        residual = inputs # 保留残差连接
        
        # 将输入进行卷积运算之后的激活函数时ReLU
        output = nn.ReLU()(self.conv1(inputs.transpose(1, 2)))
        
        #------------------------- 维度信息 -------------------------------- 
        # output -> [batch_size, d_ff, len_q]
        #------------------------------------------------------------------
        residual = inputs # 保留残差连接
        
        # 使用第二个卷积层机进行降维
        output = self.conv2(output).transpose(1, 2)
        #------------------------- 维度信息 -------------------------------- 
        # output -> [batch_size, len_q, d_embedding]
        #------------------------------------------------------------------
        
        # 残差连接+归一化
        output = self.layer_norm(output + residual)
        
        return output    

In [191]:
# 生成正弦位置编码表的函数，用于在 Transformer 中引入位置信息
def get_sin_enc_table(n_position, embedding_dim):
    #------------------------- 维度信息 --------------------------------
    # n_position: 输入序列的最大长度
    # embedding_dim: 词嵌入向量的维度
    #-----------------------------------------------------------------    
    # 根据位置和维度信息，初始化正弦位置编码表
    sinusoid_table = np.zeros((n_position, embedding_dim))    
    # 遍历所有位置和维度，计算角度值
    for pos_i in range(n_position):
        for hid_j in range(embedding_dim):
            angle = pos_i / np.power(10000, 2 * (hid_j // 2) / embedding_dim)
            sinusoid_table[pos_i, hid_j] = angle    
    # 计算正弦和余弦值
    sinusoid_table[:, 0::2] = np.sin(sinusoid_table[:, 0::2])  # dim 2i 偶数维
    sinusoid_table[:, 1::2] = np.cos(sinusoid_table[:, 1::2])  # dim 2i+1 奇数维    
    #------------------------- 维度信息 --------------------------------
    # sinusoid_table 的维度是 [n_position, embedding_dim]
    #----------------------------------------------------------------   
    return torch.FloatTensor(sinusoid_table)  # 返回正弦位置编码表

In [192]:
def get_attn_pad_mask(seq_q, seq_k):
    #------------------------- 维度信息 --------------------------------
    # seq_q 的维度是 [batch_size, len_q]
    # seq_k 的维度是 [batch_size, len_k]
    #-----------------------------------------------------------------

    batch_size, len_q = seq_q.size()
    batch_size, len_k = seq_k.size()
    
    # 生成boolean类型张量，即将seq_k中token为0的位置变为true
    pad_attn_mask = seq_k.data.eq(0).unsqueeze(1)
    
    #------------------------- 维度信息 --------------------------------
    # pad_attn_mask 的维度是 [batch_size, 1, len_k]
    #-----------------------------------------------------------------
    # 再次变形为与注意力分数相同形状的张量 -> [batch_size, len_q, len_k]
    pad_attn_mask = pad_attn_mask.expand(batch_size, len_q, len_k)
    
    return pad_attn_mask
    

In [193]:
import numpy as np
def get_attn_subsequent_mask(seq):
    #------------------------- 维度信息 --------------------------------
    # seq 的维度是 [batch_size, seq_len(Q)=seq_len(K)]
    #-----------------------------------------------------------------
    # 获取输入序列的形状
    attn_shape = [seq.size(0), seq.size(1), seq.size(1)]
    
    # 通过numpy创建一个上三角矩阵(triu = triangle upper)
    # subsequent_mask -> [batch_size, seq_len(Q), seq_len(K)]
    subsequent_mask = np.triu(np.ones(attn_shape), k=1)
    
    # 将numpy数组转换为Pytorch张量，并将数据类型设置为byte(布尔值)
    subsequent_mask = torch.from_numpy(subsequent_mask).byte()
    
    return subsequent_mask

### 组件6: 解码器层
GPT的解码器层比较简单，相比于Transformer少了解码器-编码器注意力，因此模型的训练速度也大大提升了。这主要还是因为GPT是一个单向生成式模型，只关注生成文本而不关注源文本

In [194]:
class DecoderLayer(nn.Module):
    def __init__(self):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention()
        self.feed_forward = PoswiseFeedForwardNet()
        # 原来的代码中还有nn.LayerNorm层归一化操作，这次是不需要，因为在自注意力和前馈网络中已经有了
    
    def forward(self, dec_inputs, attn_amsk=None):
        attn_output, _ = self.self_attn(dec_inputs, dec_inputs, dec_inputs, attn_amsk)
        dec_outputs = self.feed_forward(attn_output)
        return dec_outputs

### 组件7: 解码器类

In [195]:
n_layers = 6
class Decoder(nn.Module):
    def __init__(self, vocab_size, max_seq_len):
        super(Decoder, self).__init__()
        # 词嵌入层
        self.src_emb = nn.Embedding(vocab_size, d_embedding)
        # 位置编码层
        self.pos_emb = nn.Embedding(max_seq_len, d_embedding)
        # 初始化n个解码器层
        self.layers = nn.ModuleList(DecoderLayer() for _ in range(n_layers))
    
    def forward(self, dec_inputs):
        # 创建位置信息 数据将被保存在与dec_inputs相同的设备上
        positions = torch.arange(len(dec_inputs), device=dec_inputs.device).unsqueeze(-1)
        # 词嵌入与位置编码相加
        inputs_embedding = self.src_emb(dec_inputs) + self.pos_emb(positions)
        # 填充掩码
        attn_pad_mask = get_attn_pad_mask(dec_inputs, dec_inputs)
        # 后续掩码
        self_attn_mask = get_attn_subsequent_mask(dec_inputs)
        attn_mask = torch.gt((attn_pad_mask + self_attn_mask), 0).to(device)
        dec_outputs = inputs_embedding
        for layer in self.layers:
            dec_outputs = layer(dec_outputs, attn_mask)
        return dec_outputs

### 组件8: GPT模型

In [196]:
class GPT(nn.Module):
    def __init__(self, vocab_size, max_seq_len):
        super(GPT, self).__init__()
        self.decoder = Decoder(vocab_size, max_seq_len)
        self.projection = nn.Linear(d_embedding, vocab_size)
    def forward(self, dec_inputs):
        dec_output = self.decoder(dec_inputs)
        output = self.projection(dec_output)
        return output

## 模型训练和预测

### 处理语料

In [197]:
from collections import Counter
class LanguageCorpus:
    def __init__(self, sentences):
        self.sentences = sentences
        # 计算语料中句子的最大长度，并加2以容纳特殊符号
        self.seq_len = max([len(sentence.split()) for sentence in sentences]) + 2
        self.vocab = self.create_vocabulary() # 创建词汇表
        self.idx2word = {v: k for k, v in self.vocab.items()} # 创建索引到单词的映射关系
        
    def create_vocabulary(self):
        vocab = {'<pad>': 0, '<sos>': 1, '<eos>': 2}
        counter = Counter()
        # 统计语料库的单词频率
        for sentence in self.sentences:
            words = sentence.split()
            counter.update(words)
        # 创建词汇表，并给每个word分配唯一的索引值
        for word in counter:
            if word not in vocab:
                vocab[word] = len(vocab)
        
        return vocab
    
    def make_batch(self, batch_size, test_batch=False):
        input_batch, output_batch = [], []
        sentences_indices = torch.randperm(len(self.sentences))[:batch_size] # 随机选择句子索引
        for index in sentences_indices:
            sentence = self.sentences[index]
            # 将句子转换为索引序列
            seq = [self.vocab['<sos>']] + [self.vocab[word] for word in sentence.split()] + [self.vocab['<eos>']]
            seq += [self.vocab['<pad>']] * (self.seq_len - len(seq)) # 通过pad填充序列
            # 创建训练和目标数据
            input_batch.append(seq[:-1])
            output_batch.append(seq[1:])
        return torch.LongTensor(input_batch), torch.LongTensor(output_batch)

### 读取小批量文本数据

In [198]:
with open('lang.txt', 'r') as file:
    sentences = [line.strip() for line in file.readlines()]

corpus = LanguageCorpus(sentences)
max_seq_len = corpus.seq_len
vocab_size = len(corpus.vocab)

print(f"语料库的词汇表大小: {vocab_size}")
print(f"最长句子长度: {max_seq_len}")

语料库的词汇表大小: 133
最长句子长度: 17


### 模型训练

In [199]:
import torch.optim as optim
device = "cuda" if torch.cuda.is_available() else "cpu"
# 创建模型实例
model = GPT(vocab_size, max_seq_len).to(device)
# 定义损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)
epochs = 500
for epoch in range(epochs):
    optimizer.zero_grad()
    inputs, targets = corpus.make_batch(batch_size)
    inputs, targets = inputs.to(device), targets.to(device)
    outpus = model(inputs)
    loss = criterion(outpus.view(-1, vocab_size), targets.view(-1))
    if (epoch + 1) % 100 == 0:
        print(f"Epoch: {epoch + 1:04d} cost = {loss:.6f}")
    loss.backward()
    optimizer.step()

Epoch: 0100 cost = 0.298699
Epoch: 0200 cost = 0.331234
Epoch: 0300 cost = 0.305218
Epoch: 0400 cost = 0.209336
Epoch: 0500 cost = 0.206406


### 文本生成中的自回归(贪婪搜索)

In [200]:
# 测试文本生成
def generate_text(model, input_str, max_len=50):
    model.eval()  # 将模型设置为评估（测试）模式，关闭 dropout 和 batch normalization 等训练相关的层
    # 将输入字符串中的每个 token 转换为其在词汇表中的索引
    input_tokens = [corpus.vocab[token] for token in input_str]
    # 创建一个新列表，将输入的 tokens 复制到输出 tokens 中 , 目前只有输入的词
    output_tokens = input_tokens.copy()
    with torch.no_grad():  # 禁用梯度计算，以节省内存并加速测试过程
        for _ in range(max_len):  # 生成最多 max_len 个 tokens
            # 将输出的 token 转换为 PyTorch 张量，并增加一个代表批次的维度 [1, len(output_tokens)]
            inputs = torch.LongTensor(output_tokens).unsqueeze(0).to(device)
            print(inputs.shape)
            outputs = model(inputs) # 输出 logits 形状为 [1, len(output_tokens), vocab_size]
            print(outpus.shape)
            # 在最后一个维度上获取 logits 中的最大值，并返回其索引（即下一个 token）
            _, next_token = torch.max(outputs[:, -1, :], dim=-1)     
            print(next_token)       
            next_token = next_token.item() # 将张量转换为 Python 整数            
            if next_token == corpus.vocab["<eos>"]:
                break # 如果生成的 token 是 EOS（结束符），则停止生成过程           
            output_tokens.append(next_token) # 将生成的 tokens 添加到 output_tokens 列表
    # 将输出 tokens 转换回文本字符串
    output_str = " ".join([corpus.idx2word[token] for token in output_tokens])
    return output_str
input_str = ["Python"] # 输入一个词：Python
generated_text = generate_text(model, input_str) # 模型跟着这个词生成后续文本
print(" 生成的文本 :", generated_text) # 打印预测文本

torch.Size([1, 1])
torch.Size([3, 16, 133])
tensor([4])
torch.Size([1, 2])
torch.Size([3, 16, 133])
tensor([5])
torch.Size([1, 3])
torch.Size([3, 16, 133])
tensor([6])
torch.Size([1, 4])
torch.Size([3, 16, 133])
tensor([7])
torch.Size([1, 5])
torch.Size([3, 16, 133])
tensor([8])
torch.Size([1, 6])
torch.Size([3, 16, 133])
tensor([2])
 生成的文本 : Python is a popular programming language.
