## Transformer 学习，实现

还有 pytorch 库中一些函数的用法 tips  
[transformer](https://wmathor.com/index.php/archives/1438/)  
[大佬视频讲解](https://www.bilibili.com/video/BV1mk4y1q7eK/?spm_id_from=333.999.0.0&vd_source=744197c073f4828379c29fa20f3ea477)
![](./img/figure1.png)


In [49]:
import torch.utils.data as Data
import torch
import torch.nn as nn
import math
import numpy as np

sentences = [
    # enc_input           dec_input         dec_output
    ["ich mochte ein bier P", "S i want a beer .", "i want a beer . E"],
    ["ich mochte ein cola P", "S i want a coke .", "i want a coke . E"],
]

# Padding Should be Zero
src_vocab = {"P": 0, "ich": 1, "mochte": 2, "ein": 3, "bier": 4, "cola": 5}
src_vocab_size = len(src_vocab)

tgt_vocab = {
    "P": 0,
    "i": 1,
    "want": 2,
    "a": 3,
    "beer": 4,
    "coke": 5,
    "S": 6,
    "E": 7,
    ".": 8,
}
idx2word = {i: w for i, w in enumerate(tgt_vocab)}
tgt_vocab_size = len(tgt_vocab)

src_len = 5  # enc_input max sequence length
tgt_len = 6  # dec_input(=dec_output) max sequence length


def make_data(sentences):
    enc_inputs, dec_inputs, dec_outputs = [], [], []
    for i in range(len(sentences)):
        # [[1, 2, 3, 4, 0], [1, 2, 3, 5, 0]]
        enc_input = [[src_vocab[n] for n in sentences[i][0].split()]]
        # [[6, 1, 2, 3, 4, 8], [6, 1, 2, 3, 5, 8]]
        dec_input = [[tgt_vocab[n] for n in sentences[i][1].split()]]
        # [[1, 2, 3, 4, 8, 7], [1, 2, 3, 5, 8, 7]]
        dec_output = [[tgt_vocab[n] for n in sentences[i][2].split()]]

        enc_inputs.extend(enc_input)
        dec_inputs.extend(dec_input)
        dec_outputs.extend(dec_output)

    return (
        torch.LongTensor(enc_inputs),
        torch.LongTensor(dec_inputs),
        torch.LongTensor(dec_outputs),
    )


enc_inputs, dec_inputs, dec_outputs = make_data(sentences)


class MyDataSet(Data.Dataset):
    def __init__(self, enc_inputs, dec_inputs, dec_outputs):
        super(MyDataSet, self).__init__()
        self.enc_inputs = enc_inputs
        self.dec_inputs = dec_inputs
        self.dec_outputs = dec_outputs

    def __len__(self):
        return self.enc_inputs.shape[0]

    def __getitem__(self, idx):
        return self.enc_inputs[idx], self.dec_inputs[idx], self.dec_outputs[idx]


loader = Data.DataLoader(MyDataSet(enc_inputs, dec_inputs, dec_outputs), 2, True)

### Step1 Positional Encoding

因为没有用循环神经网络，需要序列
在 transformer 里不训练，在 Bert 里会训练
[文章理解](https://wmathor.com/index.php/archives/1453/)

- 编码唯一
- 值有界
- 不同长度的句子之间，任何两个字之间的差值应该一致
- ![](./img/position2.png)
- ![](./img/position.png)


In [50]:
# 参数
d_model = 512
d_ff = 2048
d_k = d_v = 64
n_layers = 6  # number of encoder and decoder layers
n_heads = 8


class PositionalEncoding(nn.Module):
    def __init__(self, d_module, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_module)
        # unsqueeze => 在指定位置插入一个新的维度
        position = torch.arange(0, max_len, dtype=torch.float32).unsqueeze(1)
        # (seq_len, batch_size, d_model)
        # 比例因子
        div_term = torch.exp(
            torch.arange(0, d_module, 2).float() * (-math.log(10000.0) / d_module)
        )
        # 0::2 切片表达式，索引从零开始，步长为2（隔一个索引取一个）
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        pe = pe.unsqueeze(0).transpose(0, 1)
        # 将位置编码矩阵注册为一个缓冲区, 避免在每次前向传播时重新计算位置编码
        self.register_buffer("pe", pe)

    def forward(self, x):
        x = x + self.pe[0 : x.size(0), :]
        return self.dropout(x)

Tip1 unsqueeze


Tip2 transpose


Tip3 register_buffer


In [51]:
import torch
import math

x = torch.arange(0, 10)
print(x)
print(x[1::2])
print(torch.arange(0, 10, 2, dtype=torch.float32))
print(torch.arange(0, 10, dtype=torch.float32).unsqueeze(1))
print(torch.arange(0, 10, dtype=torch.float32).unsqueeze(0))
# x1 = torch.arange(0, 10, 2)
# x2 = math.log(10000.0) / 10
# print(x1)
# print(x2)
# print(torch.exp(x1)/-x2)

tensor([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
tensor([1, 3, 5, 7, 9])
tensor([0., 2., 4., 6., 8.])
tensor([[0.],
        [1.],
        [2.],
        [3.],
        [4.],
        [5.],
        [6.],
        [7.],
        [8.],
        [9.]])
tensor([[0., 1., 2., 3., 4., 5., 6., 7., 8., 9.]])


### Step2 Pad_Mask and Subsequence Mask

按照 mini-batch 中最大的句长对剩余的句子进行补齐，一般用 0 进行填充(padding)
mask 操作，让无效的区域不参与运算，一般是给无效区域加一个很大的负数偏置

![](./img/padding_mask.png)


In [52]:
def get_attn_pad_mask(seq_q, seq_k):
    batch_size, len_q = seq_q.size()
    print(seq_q.size())
    print(seq_k.size()[0])
    batch_size, len_k = seq_k.size()
    # data.eq(0) 是比较操作，找出序列中所有等于零的元素,返回一个True（即填充（PAD）token），False 表示其他非填充元素
    pad_attn_mask = seq_k.data.eq(0).unsqueeze(1)
    # 根据指定的形状参数沿着指定的维度扩展输入张量
    # print(pad_attn_mask.expand(batch_size, len_q, len_k))
    return pad_attn_mask.expand(batch_size, len_q, len_k)

Tip4 eq()


In [53]:
def get_attn_subsequence_mask(seq):
    # 在decoder中用到，屏蔽未来时刻的信息
    attn_shape = [seq.size(0), seq.size(1), seq.size(1)]
    subsequence_mask = np.triu(np.ones(attn_shape), k=1)
    subsequence_mask = torch.from_numpy(subsequence_mask).byte()
    # torch.from_numpy().byte() 将numpy数组转换为Tensor
    return subsequence_mask

Tip5 np.triu()


In [54]:
import torch

# 原始一维张量
a = torch.tensor([1, 2, 3])
print(a)
print(a.size())
# 在最后一个维度之前添加新维度
b = a.unsqueeze(0)
print(b.size())
print(b)
c = a.unsqueeze(1)
print(c)
print(c.size())

tensor([1, 2, 3])
torch.Size([3])
torch.Size([1, 3])
tensor([[1, 2, 3]])
tensor([[1],
        [2],
        [3]])
torch.Size([3, 1])


### Step3 ScaledDotProductAttention

- ![](./img/attention.png)
- ![](./img/self_attention.png)


In [55]:
class ScaledDotProductAttention(nn.Module):
    """缩放点积注意力 单词间的权重计算"""

    def __init__(self):
        super(ScaledDotProductAttention, self).__init__()

    """
        Q: [batch_size, n_heads, len_q, d_k]
        K: [batch_size, n_heads, len_k, d_k]
        V: [batch_size, n_heads, len_v(=len_k), d_v]
        attn_mask: [batch_size, n_heads, seq_len, seq_len]
    """

    def forward(self, Q, K: torch.Tensor, V, attn_mask):
        # 将Q和K的最后一个维度进行点积，在最后一个维度上进行的。
        scores: torch.Tensor = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k)
        # mask --- qt~qn => 很大的负数
        scores.masked_fill_(attn_mask, -1e9)
        # softmax()高得分接近1，低得分接近0，所有概率之和为1
        attn = nn.Softmax(dim=1)(scores)
        # 再乘值向量得到上下文的权重
        context = torch.matmul(attn, V)

        return context, attn

### Step4 MultiHeadAttention

增加可学习的参数 W_Q, W_K, W_V


In [56]:
class MultiHeadAttention(nn.Module):
    def __init__(self):
        super(MultiHeadAttention, self).__init__()
        self.W_Q = nn.Linear(d_model, d_k * n_heads, bias=False)
        self.W_K = nn.Linear(d_model, d_k * n_heads, bias=False)
        self.W_V = nn.Linear(d_model, d_v * n_heads, bias=False)
        # 将多头注意力的输出进行聚合和转换，将输入维度（batch_size,n_heads*d_v)转换为(~, d_model)
        self.fc = nn.Linear(n_heads * d_v, d_model, bias=False)

    def forward(self, input_Q, input_K, input_V, attn_mask):
        """
        input_Q: [batch_size, len_q, d_model]
        input_K: [batch_size, len_k, d_model]
        input_V: [batch_size, len_v(=len_k), d_model]
        attn_mask: [batch_size, seq_len, seq_len]
        """
        # 残差
        residual, batch_size = input_Q, input_Q.size(0)

        Q = self.W_Q(input_Q).view(batch_size, -1, n_heads, d_k).transpose(1, 2)
        K = self.W_K(input_K).view(batch_size, -1, n_heads, d_k).transpose(1, 2)
        V = self.W_V(input_V).view(batch_size, -1, n_heads, d_k).transpose(1, 2)

        attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1)
        # 实例化->传递参数
        contextm, attn = ScaledDotProductAttention()(Q, K, V, attn_mask)
        context = context.transpose(1, 2).reshape(batch_size, -1, n_heads * d_v)
        # 全连接映射成一维矩阵
        output = self.fc(context)
        # 残差
        return nn.LayerNorm(d_model).cuda()(output + residual), attn

Tip6 torch.matmul()


Tip7 masked_fill()


### Step5 FeedForward Layer

前馈神经网络  
两次线性变换，RELU 作激活层  
残差连接（防止原始数据丢失）


In [57]:
class PoswiseFeedForwardNet(nn.Module):
    def __init__(self):
        super(PoswiseFeedForwardNet, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(d_model, d_ff, bias=False),
            nn.ReLU(),
            nn.Linear(d_ff, d_model, bias=False),
        )

    def forward(self, inputs):
        # 残差保存原始输入
        residual = inputs
        output = self.fc(inputs)
        return nn.LayerNorm(d_model).cuda()(output + residual)

### Step6 Encoder

self-attention feedforward_layer


In [58]:
class EncoderLayer(nn.Module):
    def __init__(self):
        super(EncoderLayer, self).__init__()
        self.enc_self_attn = MultiHeadAttention()
        self.pos_ffn = PoswiseFeedForwardNet()

    def forward(self, enc_inputs, enc_self_attn_mask):
        # K, Q, V, attn_mask
        enc_outputs, attn = self.enc_self_attn(
            enc_inputs, enc_inputs, enc_inputs, enc_self_attn_mask
        )
        enc_inputs = self.pos_ffn(enc_outputs)

        return enc_outputs, attn

In [59]:
class Encoder(nn.Module):
    """Encoder Block"""

    def __init__(self):
        super(Encoder, self).__init__()
        # 词嵌入
        self.src_emb = nn.Embedding(src_vocab_size, d_model)
        # 位置编码
        self.pos_emb = PositionalEncoding(d_model)
        # ? 模块列表，包含多个编码器层
        self.layers = nn.ModuleList([EncoderLayer() for _ in range(n_layers)])

    def forward(self, enc_inputs):
        enc_outputs = self.src_emb(enc_inputs)
        enc_outputs = self.pos_emb(enc_outputs.transpose(0, 1)).transpose(0, 1)
        print(enc_outputs)
        enc_self_attn_mask = get_attn_pad_mask(enc_inputs, enc_outputs)
        enc_self_attns = []
        # 循环遍历每一个编码器层，将词向量和自注意力掩码传递给每一个层，获取该层的输出及自注意力权重，并存储在列表中
        for layer in self.layers:
            enc_outputs, enc_self_attn = layer(enc_outputs, enc_self_attn_mask)
            enc_self_attns.append(enc_self_attn)

        return enc_outputs, enc_self_attns

### Step7 Decoder

Masked Multihead attention
Multihead attention
Feedforward network


In [60]:
class DecoderLayer(nn.Module):
    def __init__(self):
        super(DecoderLayer, self).__init__()
        self.dec_self_attn = MultiHeadAttention()
        self.dec_enc_attn = MultiHeadAttention()
        self.pos_ffn = PoswiseFeedForwardNet()

    def forward(self, dec_inputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask):
        dec_outputs, dec_self_attn = self.dec_self_attn(
            dec_inputs, dec_inputs, dec_inputs, dec_self_attn_mask
        )
        # ?将 dec_outputs 作为生成 Q 的元素，enc_outputs 作为生成 K 和 V 的元素
        dec_outputs, dec_enc_attn = self.dec_enc_attn(
            dec_outputs, enc_outputs, enc_outputs, dec_enc_attn_mask
        )

        dec_outputs = self.pos_ffn(dec_outputs)

        return dec_outputs, dec_self_attn, dec_enc_attn

In [61]:
class Decoder(nn.Module):
    def __init__(self):
        super(Decoder, self).__init__()
        self.tgt_emb = nn.Embedding(tgt_vocab_size, d_model)
        self.pos_emb = PositionalEncoding(d_model)
        self.layers = nn.ModuleList([DecoderLayer() for _ in range(n_layers)])

    def forward(self, dec_inputs, enc_inputs, enc_outputs):
        dec_outputs = self.tgt_emb(dec_inputs)
        dec_outputs = self.pos_emb(dec_outputs.transpose(0, 1)).transpose(0, 1).cuda()

        dec_self_attn_pad_mask = get_attn_pad_mask(dec_inputs, dec_inputs).cuda()

        dec_self_attn_subsequence_mask = get_attn_subsequence_mask(dec_inputs).cuda()
        # torch.gt(a, value),将 a 中各个位置上的元素和 value 比较，若大于 value，则该位置取 1，否则取 0
        dec_self_attn_mask = torch.gt(
            (dec_self_attn_pad_mask + dec_self_attn_subsequence_mask), 0
        ).cuda()

        dec_enc_attn_mask = get_attn_pad_mask(dec_inputs, enc_inputs)

        dec_self_attns, dec_enc_attns = [], []
        for layer in self.layers:
            dec_outputs, dec_self_attn, dec_enc_attn = layer(
                dec_outputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask
            )
            dec_self_attns.append(dec_self_attn)
            dec_enc_attns.append(dec_enc_attn)
        return dec_outputs, dec_self_attns, dec_enc_attns

Tip torch.gt()  
torch.gt(a, value) 的意思是，将 a 中各个位置上的元素和 value 比较，若大于 value，则该位置取 1，否则取 0


### Step8 Transformer


In [62]:
class Transformer(nn.Module):
    def __init__(self):
        super(Transformer, self).__init__()
        self.encoder = Encoder().cuda()
        self.decoder = Decoder().cuda()
        self.projection = nn.Linear(d_model, tgt_vocab_size, bias=False).cuda()

    def forward(self, enc_inputs, dec_inputs):
        enc_outputs, enc_self_attns = self.encoder(enc_inputs)
        dec_outputs, dec_self_attns, dec_enc_attns = s = self.decoder(
            dec_inputs, enc_inputs, enc_outputs
        )
        dec_logits = self.projection(dec_outputs)

        return (
            dec_logits.view(-1, dec_logits.size(-1)),
            enc_self_attns,
            dec_self_attns,
            dec_enc_attns,
        )

### Step9 数据处理导入


### Step10 损失函数，优化器


In [63]:
model = Transformer()
# ignore_index=0,不计算pad的损失
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3, momentum=0.99)

### Step11 训练


In [64]:
for epoch in range(30):
    for enc_inputs, dec_inputs, dec_outputs in loader:
        # 存储到gpu中
        enc_inputs, dec_inputs, dec_outputs = (
            enc_inputs.cuda(),
            dec_inputs.cuda(),
            dec_outputs.cuda(),
        )
        outputs, enc_self_attns, dec_self_attns, dec_enc_attns = model(
            enc_inputs, dec_inputs
        )
        loss = criterion(outputs, dec_outputs.view(-1))
        print("Epoch:", "%04d" % (epoch + 1), "loss =", "{:.6f}".format(loss))

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

tensor([[[ 0.6063,  3.3110,  0.5439,  ..., -0.4228,  0.1899,  1.7755],
         [ 0.1267,  0.3140,  2.4090,  ...,  0.1230, -0.4121,  0.0000],
         [ 2.2552, -1.9409,  0.7567,  ..., -0.5302,  1.0599,  1.6775],
         [-0.0000, -0.0000, -1.0321,  ...,  2.2619,  0.6345,  1.4845],
         [-1.9242, -0.9691, -0.5569,  ...,  0.6674, -0.4525,  0.8443]],

        [[ 0.0000,  3.3110,  0.5439,  ..., -0.4228,  0.1899,  0.0000],
         [ 0.1267,  0.3140,  2.4090,  ...,  0.0000, -0.4121,  0.8895],
         [ 2.2552, -1.9409,  0.7567,  ..., -0.5302,  1.0599,  1.6775],
         [ 0.4689, -0.5463, -1.2471,  ...,  1.6937,  0.5087,  0.9944],
         [-1.9242, -0.9691, -0.5569,  ...,  0.6674, -0.4525,  0.0000]]],
       device='cuda:0', grad_fn=<TransposeBackward0>)
torch.Size([2, 5])
2


ValueError: too many values to unpack (expected 2)

### Step12 测试


In [None]:
def greedy_decoder(model, enc_input, start_symbol):
    """
    For simplicity, a Greedy Decoder is Beam search when K=1. This is necessary for inference as we don't know the
    target sequence input. Therefore we try to generate the target input word by word, then feed it into the transformer.
    Starting Reference: http://nlp.seas.harvard.edu/2018/04/03/attention.html#greedy-decoding
    :param model: Transformer Model
    :param enc_input: The encoder input
    :param start_symbol: The start symbol. In this example it is 'S' which corresponds to index 4
    :return: The target input
    """
    enc_outputs, enc_self_attns = model.encoder(enc_input)
    dec_input = torch.zeros(1, 0).type_as(enc_input.data)
    terminal = False
    next_symbol = start_symbol
    while not terminal:
        dec_input = torch.cat(
            [
                dec_input.detach(),
                torch.tensor([[next_symbol]], dtype=enc_input.dtype).cuda(),
            ],
            -1,
        )
        dec_outputs, _, _ = model.decoder(dec_input, enc_input, enc_outputs)
        projected = model.projection(dec_outputs)
        prob = projected.squeeze(0).max(dim=-1, keepdim=False)[1]
        next_word = prob.data[-1]
        next_symbol = next_word
        if next_symbol == tgt_vocab["."]:
            terminal = True
        print(next_word)
    return dec_input


# Test
enc_inputs, _, _ = next(iter(loader))
enc_inputs = enc_inputs.cuda()
for i in range(len(enc_inputs)):
    greedy_dec_input = greedy_decoder(
        model, enc_inputs[i].view(1, -1), start_symbol=tgt_vocab["S"]
    )
    predict, _, _, _ = model(enc_inputs[i].view(1, -1), greedy_dec_input)
    predict = predict.data.max(1, keepdim=True)[1]
    print(enc_inputs[i], "->", [idx2word[n.item()] for n in predict.squeeze()])