In [1]:
import numpy as np
import torch
import torch.nn as nn
from torch.cuda import device

d_k = 64 # K（=Q）维度
d_v = 64 # V维度
# 定义缩放点积注意力类
class ScaledDotProductAttention(nn.Module):
    def __init__(self):
        super(ScaledDotProductAttention, self).__init__()
    def forward(self, Q, K, V, attn_mask):
        #----------------------维度信息------------------------
        # Q、K、V [batch_size, n_heads, len_q/k/v, dim_k/v] (dim_q=dim_k)
        # attn_mask [batch_size, n_heads, len_q, len_k]
        #-------------------------------------------------------
        # 计算注意力分数（原始权重）[batch_size, n_heads, len_q, dim_q] * [batch_size, n_heads, dim_k, len_q] = [batch_size, n_heads, len_q, len_k]
        scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k)
        #----------------------维度信息------------------------
        # scores [batch_size, n_heads, len_q, len_k]
        #-------------------------------------------------------
        # 使用注意力掩码，将attn_mask中值为1的位置的权重替换为极小值
        #----------------------维度信息------------------------
        # attn_mask [batch_size, n_heads, len_q, len_k]
        #-------------------------------------------------------
        scores.masked_fill_(attn_mask, -1e9)
        # 用softmax函数对scores进行归一化，得到注意力权重
        weights = nn.Softmax(dim=-1)(scores)  # 创建了一个指定最后一个维度的softmax层，并对scores进行softmax操作
        #----------------------维度信息------------------------
        # weights [batch_size, n_heads, len_q, len_k]
        #-------------------------------------------------------
        # 计算上下文向量（也就是注意力值），是上下文信息的紧凑表示 [batch_size, n_heads, len_q, len_k] * [batch_size, n_heads, len_k, dim_v] = [batch_size, n_heads, len_q, dim_v]
        context = torch.matmul(weights, V)
        #----------------------维度信息------------------------
        # context [batch_size, n_heads, len_q, dim_v]
        #-------------------------------------------------------
        return context, weights # 返回上下文信息和注意力权重

In [2]:
# 定义多头注意力类
d_embedding = 512 # 词嵌入维度
n_heads = 8 # 多头注意力个数
batch_size = 3 # 批次大小
class MultiHeadAttention(nn.Module):
    def __init__(self):
        super(MultiHeadAttention, self).__init__()
        # 确保d_model能被n_heads整除
        assert d_embedding % n_heads == 0
        self.W_Q = nn.Linear(d_embedding, d_k * n_heads)  # Q的线性变换层
        self.W_K = nn.Linear(d_embedding, d_k * n_heads)  # K的线性变换层
        self.W_V = nn.Linear(d_embedding, d_v * n_heads)  # V的线性变换层
        self.linear = nn.Linear(d_v * n_heads, d_embedding)  # 最后的线性变换层
        self.layer_norm = nn.LayerNorm(d_embedding)  # Layer Norm层
    def forward(self, Q, K, V, attn_mask):
        #----------------------维度信息------------------------
        # Q、K、V [batch_size, len_q/k/v, embedding_dim]
        #-------------------------------------------------------
        residual, batch_size = Q, Q.size(0) # 残差连接
        # 将输入进行线性变换和重塑，以便后续处理，[batch_size, len_q/k/v, embedding_dim] -> [batch_size, n_heads, len_q/k/v, d_k/d_v]
        q_s = self.W_Q(Q).view(batch_size, -1, n_heads, d_k).transpose(1, 2)  # Q [batch_size, n_heads, len_q, d_k]
        k_s = self.W_K(K).view(batch_size, -1, n_heads, d_k).transpose(1, 2)  # K [batch_size, n_heads, len_k, d_k]
        v_s = self.W_V(V).view(batch_size, -1, n_heads, d_v).transpose(1, 2)  # V [batch_size, n_heads, len_v, d_v]
        # 将注意力掩码复制多到头 attn_mask [batch_size, 1, len_q, len_k] -> [batch_size, n_heads, len_q, len_k]
        attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1)
        #----------------------维度信息------------------------
        # attn_mask [batch_size, n_heads, len_q, len_k]
        #-------------------------------------------------------
        # 使用缩放点积注意力计算上下文和注意力权重
        context, weights = ScaledDotProductAttention()(q_s, k_s, v_s, attn_mask)
        #----------------------维度信息------------------------
        # context [batch_size, n_heads, len_q, d_v]
        # weights [batch_size, n_heads, len_q, len_k]
        #-------------------------------------------------------
        # 通过调整维度将多个头的上下文向量连接在一起，[batch_size, n_heads, len_q, d_v] -> [batch_size, len_q, n_heads * d_v]
        context = context.transpose(1, 2).contiguous().view(batch_size, -1, n_heads * d_v)
        #----------------------维度信息------------------------
        # context [batch_size, len_q, n_heads * d_v]
        #-------------------------------------------------------
        # 用一个线性层把连接后的在多头注意力结果转换，原始地嵌入维度，[batch_size, len_q, n_heads * d_v] -> [batch_size, len_q, embedding_dim]
        output = self.linear(context)
        #----------------------维度信息------------------------
        # output [batch_size, len_q, embedding_dim]
        #-------------------------------------------------------
        # 与输入(Q)进行残差连接，并进行层归一化后处理
        output = self.layer_norm(output + residual)
        #----------------------维度信息------------------------
        # output [batch_size, len_q, embedding_dim]
        #-------------------------------------------------------
        return output, weights  # 返回层归一化的输出和和注意力权重

In [3]:
# 定义逐位置前馈网络
class PoswiseFeedForwardNet(nn.Module):
    def __init__(self, d_ff=2048):
        super(PoswiseFeedForwardNet, self).__init__()
        # 定义一维卷积层1，用于将输入映射到更高维度
        self.conv1 = nn.Conv1d(in_channels=d_embedding, out_channels=d_ff, kernel_size=1)
        # 定义一维卷积层2，用于将映射后的向量映射回原始维度
        self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_embedding, kernel_size=1)
        # 定义层归一化
        self.layer_norm = nn.LayerNorm(d_embedding)
    def forward(self, inputs):
        #----------------------维度信息------------------------
        # inputs [batch_size, len_q, embedding_dim]
        #-------------------------------------------------------
        residual = inputs  # 保留残差连接
        # 在第一个卷积层1后使用ReLU函数，[batch_size, len_q, embedding_dim] -> [batch_size, embedding_dim, len_q] -> [batch_size, len_q, d_ff]
        output = nn.ReLU()(self.conv1(inputs.transpose(1, 2)))
        #----------------------维度信息------------------------
        # output [batch_size, d_ff, len_q]
        #-------------------------------------------------------
        # 使用卷积2进行降维
        output = self.conv2(output).transpose(1, 2)
        #----------------------维度信息------------------------
        # output [batch_size, len_q, embedding_dim]
        #-------------------------------------------------------
        # 与输入进行残差连接，并进行层归一化
        output = self.layer_norm(output + residual)
        #----------------------维度信息------------------------
        # output [batch_size, len_q, embedding_dim]
        #-------------------------------------------------------
        return output  # 返回层归一化的输出

In [5]:
# 定义填充注意力掩码函数
def get_attn_pad_mask(seq_q, seq_k):
    #----------------------维度信息------------------------
    # seq_q的维度是[batch_size, len_q]
    # seq_k的维度是[batch_size, len_k]
    #-------------------------------------------------------
    batch_size, len_q = seq_q.size()
    batch_size, len_k = seq_k.size()
    # 生成布尔类型张量
    pad_attn_mask = seq_k.data.eq(0).unsqueeze(1)  # <PAD>token的编码值为0
    #----------------------维度信息------------------------
    # pad_attn_mask [batch_size, 1, len_k]
    #-------------------------------------------------------
    # 变形为与注意力分数相同的张量
    pad_attn_mask = pad_attn_mask.expand(batch_size, len_q, len_k)
    #----------------------维度信息------------------------
    # pad_attn_mask [batch_size, len_q, len_k]
    #-------------------------------------------------------
    return pad_attn_mask  # 返回注意力掩码张量

In [6]:
# 生成后续注意力掩码的函数，用于在多头自注意力计算中忽略未来信息
def get_attn_subsequence_mask(seq):
    #----------------------维度信息------------------------
    # seq的维度是[batch_size, seq_len(Q)=seq_len(K)]
    #-------------------------------------------------------
    # 获取输入序列的形状
    attn_shape = [seq.size(0), seq.size(1), seq.size(1)]
    #----------------------维度信息------------------------
    # attn_shape [batch_size, seq_len, seq_len]
    #-------------------------------------------------------
    # 生成一个下三角矩阵
    subsequent_mask = np.triu(np.ones(attn_shape), k=1)
    #----------------------维度信息------------------------
    # subsequent_mask [batch_size, seq_len, seq_len]
    #-------------------------------------------------------
    # 将numpy数组转换为Tensor，并将其转换为布尔类型
    subsequent_mask = torch.from_numpy(subsequent_mask).bool()
    #----------------------维度信息------------------------
    # subsequent_mask [batch_size, seq_len, seq_len]
    #-------------------------------------------------------
    return subsequent_mask  # 返回后续注意力掩码张量

In [8]:
# 定义解码器层类
class DecoderLayer(nn.Module):
    def __init__(self):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention()  # 多头自注意力
        self.feed_forward = PoswiseFeedForwardNet()  # 逐位置前馈网络
        self.norm1 = nn.LayerNorm(d_embedding)  # 第一个层归一化
        self.norm2 = nn.LayerNorm(d_embedding)  # 第二个层归一化
    def forward(self, dec_inputs, self_attn_mask=None):
        # 使用多头注意力处理输入
        dec_outputs, _ = self.self_attn(dec_inputs, dec_inputs, dec_inputs, self_attn_mask)
        # 将注意力输出与输入相加并进行第一个层归一化
        norm1_outputs = self.norm1(dec_outputs + dec_inputs)
        # 将第一个层归一化的输出输入到逐位置前馈网络
        ff_outputs = self.feed_forward(norm1_outputs)
        # 将前馈网络的输出与第一个层归一化的输出相加并进行第二个层归一化
        dec_outputs = self.norm2(ff_outputs + norm1_outputs)
        return dec_outputs  # 返回解码器层的输出

In [10]:
# 定义解码器类
n_layers = 6 # 解码器层数
class Decoder(nn.Module):
    def __init__(self, vocab_size, max_seq_len):
        super(Decoder, self).__init__()
        # 词嵌入层（参数为字典维度）
        self.src_emb = nn.Embedding(vocab_size, d_embedding)
        # 位置编码层 （参数为最大序列长度）
        self.pos_emb = nn.Embedding(max_seq_len, d_embedding)
        # 初始化N个解码器层
        self.layers = nn.ModuleList([DecoderLayer() for _ in range(n_layers)])
    def forward(self, dec_inputs):
        # 创建位置信息
        positions = torch.arange(len(dec_inputs), device=dec_inputs.device).unsqueeze(-1)  # [seq_len, 1]
        # 将输入序列的词嵌入和位置编码相加
        inputs_embedding = self.src_emb(dec_inputs) + self.pos_emb(positions)
        # 生成解码器自注意力掩码
        attn_mask = get_attn_subsequence_mask(inputs_embedding).to(device)
        # 初始化解码器输入，这是第一个解码器层的输入
        dec_outputs = inputs_embedding
        for layer in self.layers:
            # 逐层调用解码器层
            dec_outputs = layer(dec_outputs, attn_mask)
        return dec_outputs  # 返回解码器的输出

In [11]:
# 定义GPT模型
class GPT(nn.Module):
    def __init__(self, vocab_size, max_seq_len):
        super(GPT, self).__init__()
        self.decoder = Decoder(vocab_size, max_seq_len)  # 解码器，用于学习文本生成能力
        self.projection = nn.Linear(d_embedding, vocab_size, bias=False)  # 全连接层，输出预测成果
    def forward(self, dec_inputs):
        # 通过解码器获取输出
        dec_outputs = self.decoder(dec_inputs)
        # 通过全连接层获取预测结果
        dec_logits = self.projection(dec_outputs)
        return dec_logits

In [13]:
# 构建语料库
from collections import Counter
class LanguageCorpus:
    def __init__(self, sentences):
        self.sentences = sentences
        # 计算语言的最大句子长度，并加2以容纳特殊符号<sos>和<eos>
        self.seq_len = max([len(sentences.split()) for sentences in sentences]) + 2  # seq_len是最长句子长度+2，为了容纳<sos>和<eos>
        self.vocab = self.create_vocabulary()  # 创建源语言和目标语言的词汇表
        self.idx2word = {v: k for k, v in self.vocab.items()}  # 索引到单词的映射
    def create_vocabulary(self):
        vocab = {'<pad>': 0, '<sos>': 1, '<eos>': 2}
        counter = Counter()
        # 统计语料库的代码频率
        for sentence in self.sentences:
            counter.update(sentence.split())
        # 创建词汇表，并为每个单词分配一个唯一的索引
        for word in counter:
            if word not in vocab:
                vocab[word] = len(vocab)
        return vocab
    def make_batch(self, batch_size, test_batch=False):
        input_batch, out_batch = [], []  # 输入和输出批次
        sentence_indices = torch.randperm(len(self.sentences))[:batch_size]  # 生成随即索引，个数为batch_size，最大为len(self.sentences)
        for idx in sentence_indices:
            sentence = self.sentences[idx]
            # 将句子转化为索引序列
            seq = [self.vocab['<sos>']] + [self.vocab[word] for word in sentence.split()] + [self.vocab['<eos>']]
            seq += [self.vocab['<pad>']] * (self.seq_len - len(seq))  # 填充序列
            # 将处理好的序列添加到批次中
            input_batch.append(seq[:-1])  # 输入序列不包括<eos>
            out_batch.append(seq[1:])  # 输出序列不包括<sos>
        return torch.LongTensor(input_batch), torch.LongTensor(out_batch)

In [31]:
# 显式语料库信息
with open("lang.txt", "r") as files:  # 从文件中读入语料
    sentences = [line.strip() for line in files]
corpus = LanguageCorpus(sentences)  # 创建语料库对象
vocab_size = len(corpus.vocab)  # 词汇表大小
max_seq_len = corpus.seq_len  # 最大句子长度（用于设置位置编码）
print(f"语料库词汇表大小： {vocab_size}")  # 打印词汇表大小
print(f"最大句子长度： {max_seq_len}")  # 打印最大句子长度

语料库词汇表大小： 133
最大句子长度： 17


In [19]:
import torch.optim as optim
device = "cuda" if torch.cuda.is_available() else "cpu"  # 设置设备
model = GPT(vocab_size, max_seq_len).to(device)  # 创建GPT模型
criterion = nn.CrossEntropyLoss()  # 交叉熵损失函数
optimizer = optim.Adam(model.parameters(), lr=0.0001)  # Adam优化器
epochs = 500
for epoch in range(epochs):
    optimizer.zero_grad()  # 梯度清零
    input_batch, target_batch = corpus.make_batch(batch_size)  # 生成输入和输出批次
    input_batch, target_batch = input_batch.to(device), target_batch.to(device)  # 数据移动到设备
    outputs = model(input_batch)  # 前向传播
    loss = criterion(outputs.view(-1, vocab_size), target_batch.view(-1))  # 计算损失
    if (epoch + 1) % 100 == 0:
        print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item()}")
    loss.backward()  # 反向传播
    optimizer.step()  # 更新参数

Epoch [100/500], Loss: 0.5027474761009216
Epoch [200/500], Loss: 0.25911128520965576
Epoch [300/500], Loss: 0.2140609622001648
Epoch [400/500], Loss: 0.2459731101989746
Epoch [500/500], Loss: 0.2189064770936966


In [21]:
# 测试文本生成
def generate_text(model, input_str, max_len=50):
    model.eval()  # 设置为评估模式，关闭dropout和batch normalization等训练相关层
    # 将输入字符串中的每个token转换为其在词汇表中的索引
    input_tokens = [corpus.vocab[token] for token in input_str]
    # 创建一个新列表，将输入的token复制到输出tokens中，目前只需要输入词
    output_tokens = input_tokens.copy()
    with torch.no_grad():  # 禁用梯度计算，以节省内存并加快计算速度
        for _ in range(max_len):
            # 将输出的token转换为张量，并添加一个批次维度 [1, len(output_tokens)]
            inputs = torch.LongTensor(output_tokens).unsqueeze(0).to(device)
            outputs = model(inputs)  # 输出logits形状为 [1, len(output_tokens), vocab_size]
            # 在最后一个维度上获取logits中的最大值，并返回索引（即下一个token）
            next_token = outputs.argmax(-1)[:, -1].item()
            if next_token == corpus.vocab['<eos>']:
                break  # 如果生成的token是<eos>，则停止生成
            output_tokens.append(next_token)  # 将生成的token添加到输出tokens中
    # 将输出tokens转换回字符串
    output_str = ' '.join([corpus.idx2word[token] for token in output_tokens])
    return output_str

input_str = ["Python"]
generated_text = generate_text(model, input_str)  # 生成文本
print(f"输入: {input_str[0]}")
print(f"生成: {generated_text}")

输入: Python
生成: Python is a popular programming language.
