# 生成式预训练语言模型：理论与实战
深蓝学院 课程 
课程链接：https://www.shenlanxueyuan.com/course/620

作者 **黄佳**

In [1]:
# 构建训练句子集，每一个句子包含中文，英文（解码器输入）和翻译成英文后目标输出三个部分
sentences = [
    ['咖哥 喜欢 小冰 <pad> <pad>', '<sos> KaGe likes XiaoBing <pad>', 'KaGe likes XiaoBing <pad> <eos>'],
    ['我 爱 学习 人工智能 <pad>', '<sos> I love studying AI', 'I love studying AI <eos>'],
    ['深度 学习 改变 世界 <pad>', '<sos> Deep learning YYDS <pad>', 'Deep learning YYDS <pad> <eos>'],
    ['自然 语言 处理 很 强大', '<sos> NLP is so powerful', 'NLP is so powerful <eos>'],
    ['神经网络 非常 复杂 <pad> <pad>', '<sos> Neural Net are complex', 'Neural Net are complex <eos>']]

word_list_cn, word_list_en = [], []  # 初始化中英文单词列表
# 遍历每一个句子并将单词添加到单词列表中
for s in sentences:
    word_list_cn.extend(s[0].split())
    word_list_en.extend(s[1].split())
    word_list_en.extend(s[2].split())

# 去重得到不重复的单词列表
word_list_cn = list(set(word_list_cn))
word_list_en = list(set(word_list_en))

# 构建单词到索引的映射
word2idx_cn = {w: i for i, w in enumerate(word_list_cn)}
word2idx_en = {w: i for i, w in enumerate(word_list_en)}

# 构建索引到单词的映射
idx2word_cn = {i: w for i, w in enumerate(word_list_cn)}
idx2word_en = {i: w for i, w in enumerate(word_list_en)}

# 计算词汇表的大小
voc_size_cn = len(word_list_cn)
voc_size_en = len(word_list_en)

print("句子数量：", len(sentences)) # 打印句子数
print("中文词汇表大小：", voc_size_cn) #打印中文词汇表大小
print("英文词汇表大小：", voc_size_en) #打印英文词汇表大小
print("中文词汇到索引的字典：", word2idx_cn) # 中文词汇到索引
print("英文词汇到索引的字典：", word2idx_en) # 英文词汇到索引

句子数量： 5
中文词汇表大小： 19
英文词汇表大小： 21
中文词汇到索引的字典： {'改变': 0, '语言': 1, '小冰': 2, '处理': 3, '很': 4, '爱': 5, '我': 6, '复杂': 7, '深度': 8, '学习': 9, '世界': 10, '喜欢': 11, '<pad>': 12, '非常': 13, '自然': 14, '人工智能': 15, '咖哥': 16, '神经网络': 17, '强大': 18}
英文词汇到索引的字典： {'XiaoBing': 0, 'is': 1, 'NLP': 2, 'Net': 3, 'Deep': 4, '<sos>': 5, 'learning': 6, 'so': 7, 'are': 8, 'YYDS': 9, 'love': 10, 'AI': 11, '<eos>': 12, 'complex': 13, 'likes': 14, 'KaGe': 15, 'I': 16, 'studying': 17, 'Neural': 18, '<pad>': 19, 'powerful': 20}


In [10]:
import numpy as np # 导入numpy
import torch # 导入torch
import random # 导入random库
# 定义一个函数，随机选择一个句子和词典生成输入、输出和目标数据
def make_data(sentences):
    # 随机选择一个句子进行训练
    random_sentence = random.choice(sentences)    
    # 将输入句子中的单词转换为对应的索引
    encoder_input = np.array([[word2idx_cn[n] for n in random_sentence[0].split()]])
    # 将输出句子中的单词转换为对应的索引
    decoder_input = np.array([[word2idx_en[n] for n in random_sentence[1].split()]])
    # 将目标句子中的单词转换为对应的索引
    target = np.array([[word2idx_en[n] for n in random_sentence[2].split()]])
    # 将输入、输出和目标批次转换为LongTensor
    encoder_input = torch.LongTensor(encoder_input)
    decoder_input = torch.LongTensor(decoder_input)
    target = torch.LongTensor(target)
    return encoder_input, decoder_input, target

# 使用make_data函数生成输入、输出和目标张量
encoder_input, decoder_input, target = make_data(sentences)
for s in sentences: # 获取原始句子
    if all([word2idx_cn[w] in encoder_input[0] for w in s[0].split()]):
        original_sentence = s
        break
print("原始句子:", original_sentence) # 打印原始句子
print("编码器输入张量的形状:", encoder_input.shape)  # 打印输入张量形状
print("解码器输入张量的形状:", decoder_input.shape) # 打印输出张量形状
print("目标张量的形状:", target.shape) # 打印目标张量形状
print("编码器输入张量:", encoder_input) # 打印输入张量
print("解码器输入张量:", decoder_input) # 打印输出张量
print("目标张量:", target) # 打印目标张量

原始句子: ['自然 语言 处理 很 强大', '<sos> NLP is so powerful', 'NLP is so powerful <eos>']
编码器输入张量的形状: torch.Size([1, 5])
解码器输入张量的形状: torch.Size([1, 5])
目标张量的形状: torch.Size([1, 5])
编码器输入张量: tensor([[14,  1,  3,  4, 18]])
解码器输入张量: tensor([[ 5,  2,  1,  7, 20]])
目标张量: tensor([[ 2,  1,  7, 20, 12]])


In [9]:
def make_batch(sentences, batch_size=2):
    # 随机选择batch_size个句子进行训练
    random_sentences = random.sample(sentences, batch_size)
    
    # 初始化encoder_input, decoder_input, target的列表
    encoder_input_list, decoder_input_list, target_list = [], [], []

    # 遍历随机选择的句子
    for random_sentence in random_sentences:
        # 将输入句子中的单词转换为对应的索引
        encoder_input_list.append([word2idx_cn[n] for n in random_sentence[0].split()])
        # 将输出句子中的单词转换为对应的索引
        decoder_input_list.append([word2idx_en[n] for n in random_sentence[1].split()])
        # 将目标句子中的单词转换为对应的索引
        target_list.append([word2idx_en[n] for n in random_sentence[2].split()])

    # 将列表转换为numpy数组
    encoder_input = np.array(encoder_input_list)
    decoder_input = np.array(decoder_input_list)
    target = np.array(target_list)

    # 将输入、输出和目标批次转换为LongTensor
    encoder_input = torch.LongTensor(encoder_input)
    decoder_input = torch.LongTensor(decoder_input)
    target = torch.LongTensor(target)
    return encoder_input, decoder_input, target

In [4]:
import torch.nn as nn # 导入torch.nn库
# 定义编码器类，继承自nn.Module
class Encoder(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(Encoder, self).__init__()       
        self.hidden_size = hidden_size # 设置隐藏层大小       
        self.embedding = nn.Embedding(input_size, hidden_size) # 创建词嵌入层       
        self.rnn = nn.RNN(hidden_size, hidden_size, batch_first=True) # 创建RNN层
    # 定义前向传播函数
    def forward(self, inputs, hidden):
        embedded = self.embedding(inputs) # 将输入转换为嵌入向量       
        output, hidden = self.rnn(embedded, hidden) # 将嵌入向量输入RNN层并获取输出
        return output, hidden

# 定义解码器类，继承自nn.Module
class Decoder(nn.Module):
    def __init__(self, hidden_size, output_size):
        super(Decoder, self).__init__()       
        self.hidden_size = hidden_size # 设置隐藏层大小       
        self.embedding = nn.Embedding(output_size, hidden_size) # 创建词嵌入层
        self.rnn = nn.RNN(hidden_size, hidden_size, batch_first=True)  # 创建RNN层       
        self.out = nn.Linear(hidden_size, output_size) # 创建线性输出层
    # 定义前向传播函数
    def forward(self, inputs, hidden):       
        embedded = self.embedding(inputs) # 将输入转换为嵌入向量       
        output, hidden = self.rnn(embedded, hidden) # 将嵌入向量输入RNN层并获取输出       
        output = self.out(output) # 使用线性层生成最终输出
        return output, hidden

device = 'cuda' if torch.cuda.is_available() else 'cpu' # 设置设备，检查是否有GPU
n_hidden = 128 # 设置隐藏层数量

# 创建编码器和解码器
encoder = Encoder(voc_size_cn, n_hidden)
decoder = Decoder(n_hidden, voc_size_en)
print('编码器结构：', encoder)  # 打印编码器的结构
print('解码器结构：', decoder)  # 打印解码器的结构

编码器结构： Encoder(
  (embedding): Embedding(19, 128)
  (rnn): RNN(128, 128, batch_first=True)
)
解码器结构： Decoder(
  (embedding): Embedding(21, 128)
  (rnn): RNN(128, 128, batch_first=True)
  (out): Linear(in_features=128, out_features=21, bias=True)
)


In [5]:
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super(Seq2Seq, self).__init__()
        # 初始化编码器和解码器
        self.encoder = encoder
        self.decoder = decoder
    # 定义前向传播函数
    def forward(self, encoder_input, hidden, decoder_input):
        # 将输入序列通过编码器并获取输出和隐藏状态
        encoder_output, encoder_hidden = self.encoder(encoder_input, hidden)
        # 将编码器的隐藏状态传递给解码器作为初始隐藏状态
        decoder_hidden = encoder_hidden
        # 将目标序列通过解码器并获取输出
        decoder_output, _ = self.decoder(decoder_input, decoder_hidden)
        return decoder_output

# 创建Seq2Seq模型
model = Seq2Seq(encoder, decoder)
print('S2S模型结构：', model)  # 打印模型的结构

S2S模型结构： Seq2Seq(
  (encoder): Encoder(
    (embedding): Embedding(19, 128)
    (rnn): RNN(128, 128, batch_first=True)
  )
  (decoder): Decoder(
    (embedding): Embedding(21, 128)
    (rnn): RNN(128, 128, batch_first=True)
    (out): Linear(in_features=128, out_features=21, bias=True)
  )
)


In [6]:
# 定义训练函数
def train_seq2seq(model, criterion, optimizer, epochs):
    for epoch in range(epochs):
        encoder_input, decoder_input, target = make_batch(sentences) # 训练数据的创建
        hidden = torch.zeros(1, encoder_input.size(0), n_hidden) # 初始化隐藏状态      
        optimizer.zero_grad()# 梯度清零        
        output = model(encoder_input, hidden, decoder_input) # 获取模型输出        
        loss = criterion(output.view(-1, voc_size_en), target.view(-1)) # 计算损失        
        if (epoch + 1) % 100 == 0: # 打印损失
            print(f"Epoch: {epoch + 1:04d} cost = {loss:.6f}")         
        loss.backward()# 反向传播        
        optimizer.step()# 更新参数

# 训练模型
epochs = 1000 # 训练轮次
criterion = nn.CrossEntropyLoss(ignore_index=word2idx_en['<pad>']) # 损失函数
optimizer = torch.optim.Adam(model.parameters(), lr=0.001) # 优化器
train_seq2seq(model, criterion, optimizer, epochs) # 调用函数训练模型

Epoch: 0100 cost = 0.032659
Epoch: 0200 cost = 0.011341
Epoch: 0300 cost = 0.006474
Epoch: 0400 cost = 0.004237
Epoch: 0500 cost = 0.002764
Epoch: 0600 cost = 0.002077
Epoch: 0700 cost = 0.001579
Epoch: 0800 cost = 0.001268
Epoch: 0900 cost = 0.001135
Epoch: 1000 cost = 0.001027


In [7]:
# 定义测试函数
def test_seq2seq(model, source_sentence, word_dict, number_dict):
    # 将输入句子转换为索引
    encoder_input = np.array([[word2idx_cn[n] for n in source_sentence.split()]])
    # 构建输出句子的索引，以'<sos>'开始，后面跟'<eos>'，长度与输入句子相同
    decoder_input = np.array([word2idx_en['<sos>']] + [word_dict['<eos>']]*(len(encoder_input[0])-1))
    # 转换为LongTensor
    encoder_input = torch.LongTensor(encoder_input)
    decoder_input = torch.LongTensor(decoder_input).unsqueeze(0) # 增加一维    
    hidden = torch.zeros(1, encoder_input.size(0), n_hidden) # 初始化隐藏状态    
    predict = model(encoder_input, hidden, decoder_input) # 获取模型输出    
    predict = predict.data.max(2, keepdim=True)[1] # 获取最大概率的索引
    # 打印输入句子和预测的句子
    print(source_sentence, '->', [number_dict[n.item()] for n in predict.squeeze()])

# 测试模型
test_seq2seq(model, '咖哥 喜欢 小冰', word2idx_en, idx2word_en)  
test_seq2seq(model, '自然 语言 处理 很 强大', word2idx_en, idx2word_en)  

咖哥 喜欢 小冰 -> ['Neural', 'Net', 'so']
自然 语言 处理 很 强大 -> ['NLP', 'is', 'so', 'so', '<eos>']
