### GPT-1

![image](../data/image/GPT-1.png)

In [1]:
import torch
import torch.nn as nn
import math
from collections import Counter
import torch.optim as optim

### 注意力机制 & 多头注意力计算
![image](../data/image/gpt-Attention.jpg)

In [2]:
def attention(query, key, value, mask):
    '''
    计算注意力
    query, key, value = [batch.head,seq_len,d_k]
    '''
    d_k = query.size(-1)
    # (Q K^T)/srqt(d)
    # [batch,head,seq_len,d_k]*[batch,head,d_k, seq_len] = [batch,head,seq_len, seq_len]
    score = torch.matmul(query, key.transpose(-1,-2)) / math.sqrt(d_k)
    # masked_fill返回修改后的张量，masked_fill_原张量会被修改
    # [batch,head,seq_len, seq_len]
    score = score.masked_fill(mask, -1e9)
    # [batch,head,seq_len, seq_len]
    score = torch.softmax(score, dim=-1)
    return torch.matmul(score, value)

In [3]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, head):
        '''
        d_model:Embedding的特征维度
        head:多头注意力中的头数
        '''
        super(MultiHeadAttention, self).__init__()
        self.head = head
        self.d_k = d_model // head
        self.queryLinear = nn.Linear(d_model, d_model)
        self.keyLinear = nn.Linear(d_model, d_model)
        self.valueLinear = nn.Linear(d_model, d_model)
        
        self.OutLinear = nn.Linear(d_model, d_model)
        self.Layer_norm = nn.LayerNorm(normalized_shape=d_model)
    
    def forward(self, query, key, value, mask):
        '''
        query,key,value:[batch, seq_len, d_model]
        mask:mask层
        '''
        batch = query.size(0)
        query_clone = query.clone()
        # [batch, seq_len, d_model] -> [batch,seq_len,head,d_k] -> [batch,head,seq_len,d_k]
        query = self.queryLinear(query).view(batch, -1, self.head, self.d_k).transpose(1,2)
        key = self.keyLinear(key).view(batch, -1, self.head, self.d_k).transpose(1,2)
        value = self.valueLinear(value).view(batch, -1, self.head, self.d_k).transpose(1,2)
        # [bacth,head,seq_len,d_k]
        score = attention(query, key, value, mask)
        score = score.transpose(1,2).contiguous().view(batch,-1, self.head * self.d_k)
        Atten = self.OutLinear(score)
        return self.Layer_norm(Atten + query_clone)

### FFN层 & 残差 归一化
![image](../data/image/GPT-FFN.png)

In [4]:
class FeedForwardNetwork(nn.Module):
    def __init__(self, d_embedding, d_ff):
        '''
        d_embedding:embedding特征大小
        d_ff:线性层升高维度
        '''
        super(FeedForwardNetwork, self).__init__()
        self.FFN = nn.Sequential(
            nn.Linear(d_embedding, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_embedding)
        )
        self.Layer_norm = nn.LayerNorm(normalized_shape=d_embedding)
    
    def forward(self, x):
        #残差网络
        x_clone = x.clone()
        ffn = self.FFN(x)
        return self.Layer_norm(x_clone+ffn)
    

### GPT Decoder
在GPT中只有解码器部分,采用的mask是后续掩码

![image](../data/image/GPT-DecoderLayer.png)



In [5]:
class GPT_Decoder_Layer(nn.Module):
    def __init__(self, d_embedding, head, d_ff):
        '''
        d_embedding:embedding的维度
        head:多头注意力的头数
        d_ff:前馈网络中，升高的维度
        '''
        super(GPT_Decoder_Layer, self).__init__()
        self.multihead = MultiHeadAttention(d_embedding, head)
        self.ffn = FeedForwardNetwork(d_embedding, d_ff)
    
    def forward(self, x, mask):
        atten = self.multihead(x, x, x, mask)
        ffn = self.ffn(atten)
        return ffn

In [6]:
class GPT_Decoder(nn.Module):
    def __init__(self, n_layers, d_embedding, head, d_ff):
        '''
        n_layers:GPT中的解码器层数
        d_embedding:embedding的维度
        head:多头注意力的头数
        d_ff:前馈网络中，升高的维度
        '''
        super(GPT_Decoder, self).__init__()
        self.layers = nn.ModuleList(
            [
              GPT_Decoder_Layer(d_embedding, head, d_ff) for _ in range(n_layers)
            ]
        )
    
    def forward(self, src, src_mask):
        for layer in self.layers:
            src = layer(src, src_mask)
        return src

### GPT_1 网络

![image](../data/image/GPT_1.png)

In [35]:
class GPT_1(nn.Module):
    def __init__(self, n_layers ,vocab_size, d_embedding, seq_len):
        '''
        n_layers:解码器的层数
        vocab_size:数据集是总词语个数
        d_embedding:embedding的特征长度
        seq_len:一个序列的最大长度
        '''
        super(GPT_1, self).__init__()
        # 计算词向量 [batch, seq_len] - > [batch, seq_len, d_model]
        self.src_emb = nn.Embedding(vocab_size, d_embedding)
        self.pos_emb = nn.Embedding(seq_len, d_embedding)
        self.decoder = GPT_Decoder(n_layers, d_embedding, 4, 1024)
        self.projection = nn.Linear(d_embedding, vocab_size)
    
    def forward(self, x, device):
        '''
        x = [batch,seq_len]
        '''
        position = torch.arange(x.size(0), device=device).unsqueeze(-1)
        inputs_embedding = self.src_emb(x) + self.pos_emb(position)
        # [batch.seq_len, d_model]
        # print(inputs_embedding.shape)
        attn_mask = create_subsequent_mask(inputs_embedding.size(1)).to(device)
        dec_outputs = self.decoder(inputs_embedding, attn_mask)
        # 传递给全连接层以生成预测
        logits = self.projection(dec_outputs)
        return logits   
        
        

In [36]:
def create_padding_mask(input_ids, padding_token_id):
    '''
    计算PAD变量
    data:[batch, seq_len]
    return:[batch, 1, seq_len] -> [batch, head, seq_len, seq_len]
    '''
    padding_mask = (input_ids == padding_token_id)
    return padding_mask

In [37]:
def create_subsequent_mask(seq_length):
    '''
    seq_length:词的长度
    rerturn [seq_length, seq_length]
    '''
    # 创建后续掩码，上三角矩阵，保留对角线及以下元素，其余置为True
    subsequent_mask = torch.triu(torch.ones(seq_length, seq_length), diagonal=1).bool()
    
    return subsequent_mask

In [38]:
def create_padding_subsequent_masks(input_ids, padding_token_id):
    '''
    input:
        input_ids:[batch, seq_length]
        padding_token_id:pad的tokenid
    return:
        batch, 1, seq_length, seq_length
     pad的列是true时，意味着任何词对pad的注意力都是0 ,但是pad本身对其他词的注意力并不为0，所以pad行不是true
    '''
    seq_length = input_ids.size(1)
    # return [batch, 1, seq_length]
    padding_mask = create_padding_mask(input_ids, padding_token_id)
    # rerturn [seq_length, seq_length]
    subsequent_mask = create_subsequent_mask(seq_length)
    combined_mask = padding_mask.unsqueeze(1) | subsequent_mask
    return combined_mask

In [39]:
class LanguageCorpus:
    def __init__(self, sentences):
        self.sentences = sentences
        # 计算语言的最大句子长度，并加 2 以容纳特殊符号 <sos> 和 <eos>
        self.seq_len = max([len(sentence.split()) for sentence in sentences]) + 2
        self.word2idx = self.create_vocabulary() # 创建源语言和目标语言的词汇表
        self.idx2word = {v: k for k, v in self.word2idx.items()} # 创建索引到单词的映射
        # 词汇表大小
        self.vocab_size = len(self.word2idx)
        
    def create_vocabulary(self):
        word2idx = {'<pad>': 0, '<sos>': 1, '<eos>': 2}
        counter = Counter()
        # 统计语料库的单词频率
        for sentence in self.sentences:
            words = sentence.split()
            counter.update(words)
        # 创建词汇表，并为每个单词分配一个唯一的索引
        for word in counter:
            if word not in word2idx:
                word2idx[word] = len(word2idx)
        return word2idx
    
    def make_batch(self, batch_size, test_batch=False):
        input_batch, output_batch = [], [] # 初始化批数据
        sentence_indices = torch.randperm(len(self.sentences))[:batch_size] # 随机选择句子索引
        for index in sentence_indices:
            sentence = self.sentences[index]
            # 将句子转换为索引序列
            seq = [self.word2idx['<sos>']] + [self.word2idx[word] for word in sentence.split()] + [self.word2idx['<eos>']]
            seq += [self.word2idx['<pad>']] * (self.seq_len - len(seq)) # 对序列进行填充
            # 将处理好的序列添加到批次中
            input_batch.append(seq[:-1])
            output_batch.append(seq[1:])
        return torch.LongTensor(input_batch), torch.LongTensor(output_batch)

In [40]:
with open('../data/traindata/lang.txt', 'r') as file:
    sentences = [line.strip() for line in file.readlines()]
corpus = LanguageCorpus(sentences)

# vocab_size = len(corpus.vocab) # 词汇表大小
# max_seq_len = corpus.seq_len # 最大句子长度（用于设置位置编码）

In [41]:
device = "cuda" if torch.cuda.is_available() else "cpu" # 设置设备

In [42]:
model = GPT_1(6,corpus.vocab_size, 512, corpus.seq_len).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)
epochs = 1500
batch_size = 8

In [45]:
model.train()
for epoch in range(epochs):
    optimizer.zero_grad() #梯度清零
    inputs, targets = corpus.make_batch(batch_size)
    inputs, targets = inputs.to(device), targets.to(device)
    outputs = model(inputs, device)
    loss = criterion(outputs.view(-1, corpus.vocab_size), targets.view(-1))
    if (epoch + 1) % 100 == 0: # 打印损失
        print(f"Epoch: {epoch + 1:04d} cost = {loss:.6f}")
    loss.backward() # 反向传播
    optimizer.step() # 更新参数

Epoch: 0100 cost = 0.272002
Epoch: 0200 cost = 0.204050
Epoch: 0300 cost = 0.214156
Epoch: 0400 cost = 0.213290
Epoch: 0500 cost = 0.185977
Epoch: 0600 cost = 0.211682
Epoch: 0700 cost = 0.188645
Epoch: 0800 cost = 0.193570
Epoch: 0900 cost = 0.198230
Epoch: 1000 cost = 0.197337
Epoch: 1100 cost = 0.187307
Epoch: 1200 cost = 0.199976
Epoch: 1300 cost = 0.205316
Epoch: 1400 cost = 0.185375
Epoch: 1500 cost = 0.191484


In [60]:
# 测试文本生成
def generate_text(model, input_str, max_len=50):
    model.eval()  # 将模型设置为评估（测试）模式，关闭 dropout 和 batch normalization 等训练相关的层
    # 将输入字符串中的每个 token 转换为其在词汇表中的索引
    input_tokens = [corpus.word2idx[token] for token in input_str]
    # 创建一个新列表，将输入的 tokens 复制到输出 tokens 中 , 目前只有输入的词
    output_tokens = input_tokens.copy()
    with torch.no_grad():  # 禁用梯度计算，以节省内存并加速测试过程
        for _ in range(max_len):  # 生成最多 max_len 个 tokens
            # 将输出的 token 转换为 PyTorch 张量，并增加一个代表批次的维度 [1, len(output_tokens)]
            inputs = torch.LongTensor(output_tokens).unsqueeze(0).to(device)
            outputs = model(inputs, device) # 输出 logits 形状为 [1, len(output_tokens), vocab_size]
            # 在最后一个维度上获取 logits 中的最大值，并返回其索引（即下一个 token）
            _, next_token = torch.max(outputs[:, -1, :], dim=-1)            
            next_token = next_token.item() # 将张量转换为 Python 整数            
            if next_token == corpus.word2idx["<eos>"]:
                break # 如果生成的 token 是 EOS（结束符），则停止生成过程           
            output_tokens.append(next_token) # 将生成的 tokens 添加到 output_tokens 列表
            
    # 将输出 tokens 转换回文本字符串
    output_str = " ".join([corpus.idx2word[token] for token in output_tokens])
    return output_str

In [61]:
input_str = ["Python",'libraries'] # 输入一个词：Python
generated_text = generate_text(model, input_str) # 模型跟着这个词生成后续文本
print(" 生成的文本 :", generated_text) # 打印预测文本

 生成的文本 : Python libraries like Pandas and Matplotlib.
