In [2]:
import math
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as Data

In [21]:
# S: 表示解码输入开始的符号
# E: 表示解码输出开始的符号
# P: 如果当前批次数据长度小于时间步长时，用来填充空白序列的符号
sentences = [
        # enc_input           dec_input         dec_output
        ['ich mochte ein bier P', 'S i want a beer .', 'i want a beer . E'],
        ['ich mochte ein cola P', 'S i want a coke .', 'i want a coke . E']
]

# 填充应为零
src_vocab = {'P' : 0, 'ich' : 1, 'mochte' : 2, 'ein' : 3, 'bier' : 4, 'cola' : 5}
src_vocab_size = len(src_vocab)

# 填充应为零
src_vocab = {'P' : 0, 'ich' : 1, 'mochte' : 2, 'ein' : 3, 'bier' : 4, 'cola' : 5}
src_vocab_size = len(src_vocab)

tgt_vocab = {'P' : 0, 'i' : 1, 'want' : 2, 'a' : 3, 'beer' : 4, 'coke' : 5, 'S' : 6, 'E' : 7, '.' : 8}
idx2word = {i: w for i, w in enumerate(tgt_vocab)}
tgt_vocab_size = len(tgt_vocab)




In [34]:
a = nn.Embedding(src_vocab_size, 512)

In [3]:
src_len = 5 # 编码输入的最大序列长度
tgt_len = 6 # 解码输入(=解码输出)的最大序列长度

# Transformer 参数
d_model = 512  # 嵌入大小
d_ff = 2048 # 前馈网络维度
d_k = d_v = 64  # K(=Q), V的维度
n_layers = 6  # 编码器或解码器的层数
n_heads = 8  # 多头注意力中的头数

In [22]:
def make_data(sentences):
    enc_inputs, dec_inputs, dec_outputs = [], [], []
    for i in range(len(sentences)):
        enc_input = [[src_vocab[n] for n in sentences[i][0].split()]]
        dec_input =  [[tgt_vocab[n] for n in sentences[i][1].split()]]
        dec_output = [[tgt_vocab[n] for n in sentences[i][2].split()]]
        
        enc_inputs.append(enc_input)
        dec_inputs.append(dec_input)
        dec_outputs.append(dec_output)
        
    return torch.LongTensor(enc_inputs), torch.LongTensor(dec_inputs), torch.LongTensor(dec_inputs)

In [24]:
enc_inputs, dec_inputs, dec_outputs  = make_data(sentences)

In [25]:
class MyDataSet(Data.Dataset):
    def __init__(self, enc_inputs, dec_inputs, dec_outputs):
        self.enc_inputs = enc_inputs
        self.dec_inputs = dec_inputs
        self.dec_outputs =dec_outputs
        
    def __len__(self):
        return self.enc_inputs.shape[0]
    
    def __getitem__(self, idx):
        return self.enc_inputs[idx], self.dec_inputs[idx], self.dec_outputs[idx]
loader = Data.DataLoader(MyDataSet(enc_inputs, dec_inputs, dec_outputs ), batch_size=2, shuffle=True)



In [26]:
class PositionalEncoding(nn.Module):
    '''
    Positional Encoding类用于为输入序列添加位置编码。
    '''
    def __init__(self, d_model, dropout=0, max_len=5000):
        '''
        初始化Positional Encoding对象。

        参数:
            d_model (int): 嵌入向量的维度大小。
            dropout (float): dropout概率，默认值为0。
            max_len (int): 序列的最大长度，默认值为5000。
        '''
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        # 创建一个形状为[max_len, d_model]的零张量
        pe = torch.zeros(max_len, d_model)
        # 创建一个表示位置的张量，形状为[max_len, 1]
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        # 创建一个除数项，用于计算正弦和余弦值，形状为[d_model/2]
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        # 计算位置编码的正弦部分并填充到pe张量中
        pe[:, 0::2] = torch.sin(position * div_term)
        # 计算位置编码的余弦部分并填充到pe张量中
        pe[:, 1::2] = torch.cos(position * div_term)
        # 在第0维度上添加一个维度，并将张量转置，形状变为[1, max_len, d_model]
        pe = pe.unsqueeze(0).transpose(0, 1)
        # 将位置编码张量注册为模型的缓冲区，这样在模型保存和加载时，位置编码张量也会被保存和加载
        self.register_buffer('pe', pe)

    def forward(self, x):
        '''
        前向传播函数，用于将位置编码添加到输入序列的嵌入向量中。

        参数:
            x (Tensor): 输入序列的嵌入向量，维度为[seq_len, batch_size, d_model]。

        返回:
            Tensor: 添加了位置编码的嵌入向量。
        '''
        # 将位置编码张量添加到输入序列的嵌入向量中，并返回结果
        x = x + self.pe[:x.size(0), :]
        # 对结果进行dropout操作
        return self.dropout(x)


In [None]:
a = nn.Embedding(src_vocab_size, 512)

In [27]:
def get_attn_pad_mask(seq_q, seq_k):
    '''
    获取填充掩码的函数,填充掩码的作用是将用于处理用于填充序列长度的P

    参数:
        seq_q (Tensor): 查询序列，维度为[batch_size, seq_len]。
        seq_k (Tensor): 键序列，维度为[batch_size, seq_len]。
    
    返回:
        Tensor: 填充掩码，维度为[batch_size, len_q, len_k]。
    '''
    # 获取查询序列和键序列的形状
    batch_size, len_q =seq_q.size()
    batch_size, len_k =seq_k.size()
    
    # eq(zero) is PAD token
    # 创建填充掩码，将seq_k中的零值位置设为True，形状为[batch_size, 1, len_k]
    pad_attn_mask = seq_k.data.eq(0).unsqueeze(1)# [batch_size, 1, len_k], False is masked
    
    return pad_attn_mask.expand(batch_size, len_q, len_k)
    
    

In [32]:
a = torch.rand((10,2))
b = torch.rand((10,2))

In [34]:
a

tensor([[0.3707, 0.7805],
        [0.3071, 0.6394],
        [0.1127, 0.1164],
        [0.2482, 0.0459],
        [0.7505, 0.2949],
        [0.2671, 0.4495],
        [0.5754, 0.5310],
        [0.3152, 0.0175],
        [0.6638, 0.1781],
        [0.4984, 0.0015]])

In [38]:
def get_attn_subsequence_mask(seq):
    '''
    获取子序列掩码的函数。

    参数:
        seq (Tensor): 序列，维度为[batch_size, tgt_len]。
    
    返回:
        Tensor: 子序列掩码，维度为[batch_size, tgt_len, tgt_len]。
    '''
    # 获取输入序列的形状
    attn_shape = [seq.size(0), seq.size(1), seq.size(1)]
    # 创建一个上三角矩阵，形状为[batch_size, tgt_len, tgt_len]
    subsequence_mask = np.triu(np.ones(attn_shape), k=1)
    # 将NumPy数组转换为PyTorch张量，并设定数据类型为byte
    subsequence_mask = torch.from_numpy(subsequence_mask).byte()
    # 返回子序列掩码张量
    return subsequence_mask # [batch_size, tgt_len, tgt_len]
    
        

In [None]:
#测试子序列掩码函数
b = torch.ones((10,5))
a = get_attn_subsequence_mask(b)
a

In [44]:

torch.from_numpy(np.triu(np.ones((3,3,3)), k=1)).byte()
    

tensor([[[0, 1, 1],
         [0, 0, 1],
         [0, 0, 0]],

        [[0, 1, 1],
         [0, 0, 1],
         [0, 0, 0]],

        [[0, 1, 1],
         [0, 0, 1],
         [0, 0, 0]]], dtype=torch.uint8)

In [4]:
class ScaledDotProductAttention(nn.Module):
    '''
    缩放点积注意力机制。
    '''
    def __init__(self):
        '''
        初始化缩放点积注意力机制。
        '''
        super(ScaledDotProductAttention, self).__init__()
    def forward(self, Q,K,V, atten_mask):
        '''
        计算缩放点积注意力机制的前向传播。

        参数:
            Q (Tensor): 查询张量，形状为[batch_size, n_heads, len_q, d_k]。
            K (Tensor): 键张量，形状为[batch_size, n_heads, len_k, d_k]。
            V (Tensor): 值张量，形状为[batch_size, n_heads, len_v(=len_k), d_v]。
            attn_mask (Tensor): 注意力掩码张量，形状为[batch_size, n_heads, seq_len, seq_len]。

        返回:
            Tensor: 上下文张量，形状为[batch_size, n_heads, len_q, d_v]。
            Tensor: 注意力张量，形状为[batch_size, n_heads, len_q, len_k]。
        '''
        # 计算注意力分数，scores : [batch_size, n_heads, len_q, len_k]
        scores = torch.matmul(Q, K.transpose(-1,-2))/np.sqrt(d_k) #批次矩阵乘法,对每个批次中的对应样本执行矩阵乘法
        #对注意力分数应用注意力掩码,，scores 是一个注意力分数张量，atten_mask 是一个与 scores 具有相同形状的掩码张量。
        scores.masked_fill(atten_mask, -1e9)
        #计算注意力权重
        attn = nn.Softmax(dim=-1)(scores)
        #计算上下文权重
        context = torch.matmul(attn,V) # [batch_size, n_heads, len_q, d_v]
        return context, attn

In [14]:
Q = torch.ones(10,8,5,64)
K =torch.ones(10,8,5,64)
V =  torch.ones(10,8,5,64)
attn_mask = torch.ones(10,8,5,5)
a = ScaledDotProductAttention()
q,w = a(Q,K,V,attn_mask == 1)
q.shape

torch.Size([10, 8, 5, 64])

In [82]:
class MultiHeadAttention(nn.Module):
    '''
    多头注意力机制。
    '''
    def __init__(self):
        '''
        初始化多头注意力机制。
        '''
        super(MultiHeadAttention, self).__init__()
        #查询张量线性变换层
        self.W_Q = nn.Linear(d_model, d_k*n_heads, bias=False)
        #键张量线性变换层
        self.W_K = nn.Linear(d_model, d_k*n_heads, bias=False)
         # 值张量线性变换层
        self.W_V = nn.Linear(d_model, d_v * n_heads, bias=False)
        #全连接层
        self.fc = nn.Linear(n_heads * d_v, d_model, bias=False)
        
        
    def forward(self, input_Q, input_K, input_V, attn_mask):
        '''
        多头注意力机制的前向传播。

        参数:
            input_Q (Tensor): 查询输入张量，形状为[batch_size, len_q, d_model]。
            input_K (Tensor): 键输入张量，形状为[batch_size, len_k, d_model]。
            input_V (Tensor): 值输入张量，形状为[batch_size, len_v(=len_k), d_model]。
            attn_mask (Tensor): 注意力掩码张量，形状为[batch_size, seq_len, seq_len]。

        返回:
            Tensor: 多头注意力机制输出张量，形状为[batch_size, len_q, d_model]。
            Tensor: 注意力张量，形状为[batch_size, n_heads, len_q, len_k]。
        '''
        # 保存残差张量
        residual, batch_size = input_Q, input_Q.size(0)
        Q = self.W_Q(input_Q).view(batch_size, -1, n_heads, d_k).transpose(1,2)
        K = self.W_K(input_K).view(batch_size, -1, n_heads, d_k).transpose(1,2)  # K: [batch_size, n_heads, len_k, d_k]
        V = self.W_V(input_V).view(batch_size, -1, n_heads, d_v).transpose(1,2)  # V: [batch_size, n_heads, len_v(=len_k), d_v]
        
        #将注意力掩码张量进行扩展
        
        attn_mask = attn_mask.unsqueeze(1).repeat(1,n_heads,1,1)# attn_mask : [batch_size, n_heads, seq_len, seq_len]
        #调用缩放点积注意力机制进行计算
        context, attn = ScaledDotProductAttention()(Q, K, V, attn_mask)
        #将张量形状进行调整
        context = context.transpose(1,2).reshape(batch_size, -1, n_heads * d_v)
        #将上下文张量通过全连接层进行映射
        output = self.fc(context)  # [batch_size, len_q, d_model]
        
        #返回加上残差张量的结果，并进行LayerNorm处理
        return nn.LayerNorm(d_model)(output + residual), attn
        

In [None]:
attn_mask = torch.ones(10,5,5)
attn_mask = attn_mask.unsqueeze(1).repeat(1,n_heads,1,1)
attn_mask

In [None]:
#测试多头注意力机制
Q = torch.ones(10,5,512)
K =torch.ones(10,5,512)
V =  torch.ones(10,5,512)
attn_mask = torch.ones(10,5,5)
a = MultiHeadAttention()
q,w = a(Q,K,V,attn_mask ==1)
q.shape

In [6]:
class PoswiseFeedForwardNet(nn.Module):
    '''
    位置前馈网络。
    '''
    def __init__(self):
        super(PoswiseFeedForwardNet, self).__init__()
        #全连接层序列
        self.fc = nn.Sequential(
            nn.Linear(d_model, d_ff, bias=False),
            nn.ReLU(),
            nn.Linear(d_ff, d_model, bias=False)
        )
    def forward(self, inputs):
        '''
        位置前馈网络的前向传播。

        参数:
            inputs (Tensor): 输入张量，形状为[batch_size, seq_len, d_model]。

        返回:
            Tensor: 经过位置前馈网络后的张量，形状为[batch_size, seq_len, d_model]。
        '''
        #保存残差张量
        residual = inputs 
        #经过全连接层序列处理
        output = self.fc(inputs)
        #返回加上残差张量的结果，并进行LayerNorm处理
        return nn.LayerNorm(d_model)(output + residual) # [batch_size, seq_len, d_model]

In [36]:
#测试前馈神经网络
i = torch.ones(10,5,512)

a  = PoswiseFeedForwardNet()
a(i).shape


torch.Size([10, 5, 512])

In [83]:
class  EncoderLayer(nn.Module):
    '''
    编码器层。
    '''
    def __init__(self):
        '''
        初始化编码器层。
        '''
        super(EncoderLayer, self).__init__()
        #编码器自注意力机制
        self.enc_self_attn = MultiHeadAttention()
        #位置前馈网络
        self.pos_ffn = PoswiseFeedForwardNet()
    def forward(self, enc_inputs, enc_self_attn_mask):
        '''
        编码器层的前向传播。

        参数:
            enc_inputs (Tensor): 编码器输入张量，形状为[batch_size, src_len, d_model]。
            enc_self_attn_mask (Tensor): 编码器自注意力机制掩码张量，形状为[batch_size, src_len, src_len]。

        返回:
            Tensor: 编码器输出张量，形状为[batch_size, src_len, d_model]。
            Tensor: 注意力张量，形状为[batch_size, n_heads, src_len, src_len]。
        '''
        #使用编码器自注意力机制计算编码器输出张量和注意力张量
        #输入的分别是xQ, Xk, Xv, 掩码矩阵
        
        enc_outputs, attn = self.enc_self_attn(enc_inputs, enc_inputs, enc_inputs,enc_self_attn_mask)
        #经过位置前馈网络处理
        enc_outputs = self.pos_ffn(enc_outputs) # enc_outputs: [batch_size, src_len, d_model]
        return enc_outputs, attn
        

In [13]:
#测试编码器层
enc_i = torch.ones(10,5,512)
enc_s = torch.ones(10,5,5)
a = EncoderLayer()
b0 ,b1 = a(enc_i,enc_s==1)
b0.shape

torch.Size([10, 5, 512])

In [85]:
class DecoderLayer(nn.Module):
    '''
    解码器层。
    '''
    def __init__(self):
        '''
        初始化解码器层。
        '''
        super(DecoderLayer, self).__init__()
        # 解码器自注意力机制
        self.dec_self_attn = MultiHeadAttention()
        #编码器-解码器注意力机制
        self.dec_enc_attn = MultiHeadAttention()
        # 位置前馈网络
        self.pos_ffn = PoswiseFeedForwardNet()
    def forward(self, dec_inputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask):
        '''
        解码器层的前向传播。

        参数:
            dec_inputs (Tensor): 解码器输入张量，形状为[batch_size, tgt_len, d_model]。
            enc_outputs (Tensor): 编码器输出张量，形状为[batch_size, src_len, d_model]。
            dec_self_attn_mask (Tensor): 解码器自注意力机制掩码张量，形状为[batch_size, tgt_len, tgt_len]。
            dec_enc_attn_mask (Tensor): 编码器-解码器注意力机制掩码张量，形状为[batch_size, tgt_len, src_len]。

        返回:
            Tensor: 解码器输出张量，形状为[batch_size, tgt_len, d_model]。
            Tensor: 解码器自注意力张量，形状为[batch_size, n_heads, tgt_len, tgt_len]。
            Tensor: 编码器-解码器注意力张量，形状为[batch_size, n_heads, tgt_len, src_len]。
        '''
        #使用解码器自注意力机制计算解码器第一层输出张量和解码器第一层自注意力张量
        
        dec_outputs, dec_self_attn = self.dec_self_attn(dec_inputs, dec_inputs, dec_inputs, dec_self_attn_mask)
        #使用编码器-解码器注意力机制计算解码器第一层输出张量编码器-解码器注意力张量（分别提供v和q,k)
        dec_outputs, dec_enc_attn = self.dec_enc_attn(dec_outputs,  enc_outputs, enc_outputs, dec_enc_attn_mask)
        #经过位置前馈网络处理
        dec_outputs = self.pos_ffn(dec_outputs)# [batch_size, tgt_len, d_model]
        return dec_outputs, dec_self_attn, dec_enc_attn
        
        

In [87]:
#测试解码器层
enc_i = torch.ones(10,5,512)
enc_s = torch.ones(10,5,5)
a = DecoderLayer()
b0 ,b1,b2 = a(enc_i,enc_i,enc_s==1,enc_s==1)
b0.shape


torch.Size([10, 5, 512])

In [30]:
class Encoder(nn.Module):
    '''
    编码器模块。
    '''
    def __init__(self):
        '''
        初始化编码器。
        '''
        super(Encoder, self).__init__()
        #源语言词嵌入层
        self.src_emb = nn.Embedding(src_vocab_size, d_model)
        #位置编码层
        self.pos_emb = PositionalEncoding(d_model)
        #编码器层列表
        self.layers = nn.ModuleList([EncoderLayer() for _ in range(n_layers)])
    def forward(self, enc_inputs):
        '''
        编码器的前向传播。

        参数:
            enc_inputs (Tensor): 编码器输入张量，形状为[batch_size, src_len]。

        返回:
            Tensor: 编码器输出张量，形状为[batch_size, src_len, d_model]。
            List: 编码器自注意力张量列表。
        '''
        #源语言词嵌入
        enc_outputs = self.src_emb(enc_inputs) # [batch_size, src_len, d_model]
        #加入位置编码
        enc_outputs = self.pos_emb(enc_outputs.transpose(0,1)).transpose(0,1)# [batch_size, src_len, d_model]
        #获取编码器自注意力掩码
        enc_self_attn_mask = get_attn_pad_mask(enc_inputs, enc_inputs) # [batch_size, src_len, src_len]
        #编码器自注意力机制
        enc_self_attns = []
        for layer in self.layers:
            #编码器层前向传播
            enc_outputs, enc_self_attn = layer(enc_outputs, enc_self_attn_mask)
            enc_self_attns.append(enc_self_attn)
        return enc_outputs, enc_self_attns

In [37]:
#编码器测试
s = torch.ones((10,5)).type(torch.LongTensor)
a = Encoder()
b0,b1 = a(s)
b0.shape

torch.Size([10, 5, 512])

In [75]:
class Decoder(nn.Module):
    '''
    解码器模块。
    '''
    def __init__(self):
        '''
        初始化解码器。
        '''
        super(Decoder, self).__init__()
        #目标语言嵌入层
        self.tgt_emb = nn.Embedding(tgt_vocab_size, d_model)
        #位置编码层
        self.pos_emb = PositionalEncoding(d_model)
        #解码器层列表
        self.layers =  nn.ModuleList([DecoderLayer() for _ in range(n_layers)])
    def forward(self, dec_inputs, enc_inputs, enc_outputs):
        '''
        解码器的前向传播。

        参数:
            dec_inputs (Tensor): 解码器输入张量，形状为[batch_size, tgt_len]。
            enc_inputs (Tensor): 编码器输入张量，形状为[batch_size, src_len]。
            enc_outputs (Tensor): 编码器输出张量，形状为[batch_size, src_len, d_model]。

        返回:
            Tensor: 解码器输出张量，形状为[batch_size, tgt_len, d_model]。
            List: 解码器自注意力张量列表。
            List: 编码器-解码器注意力张量列表。
        '''
        #目标语言词嵌入
        dec_outputs = self.tgt_emb(dec_inputs) # [batch_size, tgt_len, d_model]
        #加入位置编码
        dec_outputs = self.pos_emb(dec_outputs.transpose(0,1)).transpose(0,1) # [batch_size, tgt_len, d_model]
        #获取解码器自注意力掩码
        dec_self_attn_pad_mask = get_attn_pad_mask(dec_inputs, dec_inputs) # [batch_size, tgt_len, tgt_len]
        dec_self_attn_subsequence_mask = get_attn_subsequence_mask(dec_inputs) # [batch_size, tgt_len, tgt_len]
        dec_self_attn_mask = torch.gt((dec_self_attn_pad_mask + dec_self_attn_subsequence_mask), 0) # [batch_size, tgt_len, tgt_len]
        #获取编码器-解码器注意力掩码
        dec_enc_attn_mask = get_attn_pad_mask(dec_inputs, enc_inputs) # [batc_size, tgt_len, src_len]
        #解码器子注意力机制和编码器-解码器自注意力机制
        dec_self_attns, dec_enc_attns = [], [] 
        for layer in self.layers:
            #解码器层前向传播
            dec_outputs, dec_self_attn, dec_enc_attn = layer(dec_outputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask)
            dec_self_attns.append(dec_self_attn)
            dec_enc_attns.append(dec_enc_attn)
        return dec_outputs, dec_self_attns, dec_enc_attns
        
        

In [88]:
#解码器层测试

s = torch.ones((10,5))
s0 = s.type(torch.LongTensor)
e = torch.ones((10,5, 512))
a = Decoder()
b0,b1,b2 = a(s0,s0,e)
b0.shape

torch.Size([10, 5, 512])

In [92]:
class Transformer(nn.Module):
    '''
    Transformer模型。
    '''
    def __init__(self):
        super(Transformer, self).__init__()
        #编码器
        self.encoder = Encoder()
        #解码器
        self.decoder = Decoder()
        #线性层
        self.projection = nn.Linear(d_model, tgt_vocab_size, bias=False)
    def forward(self, enc_inputs, dec_inputs):
        '''
        Transformer的前向传播。

        参数:
            enc_inputs (Tensor): 编码器输入张量，形状为[batch_size, src_len]。
            dec_inputs (Tensor): 解码器输入张量，形状为[batch_size, tgt_len]。

        返回:
            Tensor: 解码器输出张量，形状为[batch_size * tgt_len, tgt_vocab_size]。
            List: 编码器自注意力张量列表。
            List: 解码器自注意力张量列表。
            List: 编码器-解码器注意力张量列表。
        '''
        #编码器和解码器的前向传播
        enc_outputs, enc_self_attns = self.encoder(enc_inputs)
        dec_outputs, dec_self_attns, dec_enc_attns = self.decoder(dec_inputs, enc_inputs, enc_outputs)
        #输出投影层
        dec_logits = self.projection(dec_outputs) # dec_logits: [batch_size, tgt_len, tgt_vocab_size]
        #将解码器输出张量展平为二维张量
        return dec_logits.view(-1,  dec_logits.size(-1)), enc_self_attns, dec_self_attns, dec_enc_attns

In [93]:
#transformer测试
b = torch.ones((10,5)).type(torch.LongTensor)

a = Transformer()
b0, b1, b2,b3 = a(b,b)

b0.shape

torch.Size([50, 9])

In [94]:
# 创建Transformer模型、损失函数和优化器
model = Transformer()
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.SGD(model.parameters(), lr=1e-3, momentum=0.99)