In [1]:
import math
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as Data


In [2]:
# Training parameters
# Use the GPU when one is available, otherwise fall back to the CPU so that
# "Restart Kernel -> Run All" also works on machines without CUDA.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
epochs = 100

src_len = 8  # maximum source sentence length (encoder_input max seq len)
tgt_len = 7  # decoder_input (= decoder_output) max seq len

# Transformer network parameters
d_model = 512  # embedding size (dimension of both token and position embeddings)
d_ff = 2048  # feed-forward dimension (512 -> 2048 -> 512)
d_k = d_v = 64  # dimension of K (= Q) and V; V is set equal to Q for convenience
n_layers = 6  # number of encoder and decoder layer blocks
n_heads = 8  # number of heads in multi-head attention


Data

In [3]:
# Build the dataset

# Training set
sentences = [
    # The Chinese and English sides do not need the same number of tokens
    # encoder_input           decoder_input               decoder_output
    ['我 有 一 个 好 朋 友 P', 'S i have a good friend .', 'i have a good friend . E'],
    ['我 有 零 个 女 朋 友 P', 'S i have zero girl friend .', 'i have zero girl friend . E']
]

# Test set
# Input:  "我 有 一 个 女 朋 友"
# Output: "i have a girlfriend"

# Source (Chinese) vocabulary; index 0 ('P') is the value that
# get_attention_padding_mask treats as padding.
src_vocab = {'P': 0, '我': 1, '有': 2, '一': 3,
             '个': 4, '好': 5, '朋': 6, '友': 7, '零': 8, '女': 9}
# Invert using the stored indices instead of relying on dict insertion order,
# so the reverse map stays correct if the vocabulary is edited or reordered.
src_idx2word = {index: word for word, index in src_vocab.items()}
src_vocab_size = len(src_vocab)

# Target (English) vocabulary; 'S' starts every decoder input and 'E' ends
# every decoder output in `sentences`.
tgt_vocab = {'P': 0, 'i': 1, 'have': 2, 'a': 3, 'good': 4,
             'friend': 5, 'zero': 6, 'girl': 7, 'S': 8, 'E': 9, '.': 10}
tgt_idx2word = {index: word for word, index in tgt_vocab.items()}
tgt_vocab_size = len(tgt_vocab)

# 构建数据


def make_data(sentences):
    """Turn whitespace-separated token strings into tensors of vocabulary indices.

    Args:
        sentences: sequence of rows; row[0] is the encoder input (looked up in
            ``src_vocab``), row[1] the decoder input and row[2] the decoder
            output (both looked up in ``tgt_vocab``).

    Returns:
        Tuple ``(encoder_inputs, decoder_inputs, decoder_outputs)`` of
        ``torch.LongTensor``, one row per sentence.

    Raises:
        KeyError: if a token is missing from the corresponding vocabulary.
    """
    def to_indices(text, vocab):
        # Map every space-separated token to its vocabulary index.
        return [vocab[token] for token in text.split()]

    source_rows, target_in_rows, target_out_rows = [], [], []
    for row in sentences:
        source_rows.append(to_indices(row[0], src_vocab))
        target_in_rows.append(to_indices(row[1], tgt_vocab))
        target_out_rows.append(to_indices(row[2], tgt_vocab))

    return (
        torch.LongTensor(source_rows),
        torch.LongTensor(target_in_rows),
        torch.LongTensor(target_out_rows),
    )


encoder_inputs, decoder_inputs, decoder_outputs = make_data(sentences)
# Quick inspection: encoder_inputs.shape[0], encoder_inputs, decoder_inputs, decoder_outputs


DataSet and DataLoader

In [4]:
class SentenceDataSet(Data.Dataset):
    """Dataset of aligned (encoder_input, decoder_input, decoder_output) samples.

    The three containers are stored as given and indexed with the same ``idx``.
    """

    def __init__(self, encoder_inputs, decoder_inputs, decoder_outputs):
        super().__init__()
        self.encoder_inputs = encoder_inputs
        self.decoder_inputs = decoder_inputs
        self.decoder_outputs = decoder_outputs

    def __len__(self):
        # The number of samples is the first dimension of the encoder inputs.
        sample_count = self.encoder_inputs.shape[0]
        return sample_count

    def __getitem__(self, idx):
        # Return the three views of the same sample as one tuple.
        sample = (
            self.encoder_inputs[idx],
            self.decoder_inputs[idx],
            self.decoder_outputs[idx],
        )
        return sample


# Serve the training pairs in batches of two, with shuffling enabled.
loader = Data.DataLoader(
    dataset=SentenceDataSet(encoder_inputs, decoder_inputs, decoder_outputs),
    batch_size=2,
    shuffle=True,
)


Positional Encoding

In [5]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to embeddings, then apply dropout.

    Even embedding columns hold ``sin(pos / 10000^(2i / d_model))`` and odd
    columns hold the matching ``cos``. The table is registered as the buffer
    ``pe`` with shape ``[max_len, 1, d_model]`` so it follows the module across
    devices without being a trainable parameter.

    Args:
        d_model: embedding dimension (even or odd).
        dropout: dropout probability applied after adding the encodings.
        max_len: number of positions precomputed in the table.
    """

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # [max_len, 1]
        # Explicit float arange keeps the exponent a true (non-integer) division.
        i_mat = torch.pow(10000, torch.arange(0, d_model, 2, dtype=torch.float) / d_model)
        angles = position / i_mat  # [max_len, ceil(d_model / 2)]

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd column than even columns,
        # so only the first d_model // 2 frequencies get a cosine column.
        pe[:, 1::2] = torch.cos(angles[:, :d_model // 2])
        pe = pe.unsqueeze(1)  # [max_len, 1, d_model], broadcast over the batch axis
        self.register_buffer('pe', pe)  # constant stored with the model, not trained

    def forward(self, x):
        """Add the encodings for the first ``x.size(0)`` positions.

        x: [seq_len, batch_size, d_model]
        Returns a tensor of the same shape.
        """
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)


Attention Padding Mask

In [6]:
def get_attention_padding_mask(seq):
    """Mark padding key positions so their attention weights can be zeroed out.

    seq: [batch_size, seq_len] tensor of token indices, where index 0 is padding.

    Returns a [batch_size, seq_len, seq_len] mask in which entry [b, i, j] is
    set when seq[b, j] == 0, i.e. every query row i shares the same key mask.
    """
    n_batch, n_tokens = seq.size()

    # [batch_size, seq_len] -> [batch_size, 1, seq_len]
    is_padding = torch.eq(seq.detach(), 0)[:, None, :]

    # Broadcast the key mask across all query positions (a view, no copy).
    return is_padding.expand(n_batch, n_tokens, n_tokens)

# seq_k = torch.Tensor([[1,2,3,4,0], [1,0,3,5,0]])
# print(get_attention_padding_mask(seq_k))


Attention Subsequence Mask

In [7]:
def get_attention_subsequence_mask(seq):
    """Build the strictly upper-triangular mask used on decoder inputs.

    seq: [batch_size, tgt_len]; only its shape and device are used.

    Returns a uint8 tensor of shape [batch_size, tgt_len, tgt_len] in which
    entry [b, i, j] is 1 when j > i (a later position) and 0 otherwise. The
    mask is created directly on ``seq.device`` instead of going through a
    NumPy array on the CPU.
    """
    # attention_shape: [batch_size, tgt_len, tgt_len]
    attention_shape = [seq.size(0), seq.size(1), seq.size(1)]

    ones = torch.ones(attention_shape, dtype=torch.uint8, device=seq.device)

    # diagonal=1 zeroes the main diagonal and everything below it in each
    # [tgt_len, tgt_len] matrix of the batch.
    subsequence_mask = torch.triu(ones, diagonal=1)

    return subsequence_mask

seq_k = torch.Tensor([[1,2,3,4,0], [1,0,3,5,0]])
get_attention_subsequence_mask(seq_k)


tensor([[[0, 1, 1, 1, 1],
         [0, 0, 1, 1, 1],
         [0, 0, 0, 1, 1],
         [0, 0, 0, 0, 1],
         [0, 0, 0, 0, 0]],

        [[0, 1, 1, 1, 1],
         [0, 0, 1, 1, 1],
         [0, 0, 0, 1, 1],
         [0, 0, 0, 0, 1],
         [0, 0, 0, 0, 0]]], dtype=torch.uint8)

Scaled Dot-Product Attention

In [8]:
class ScaledDotProductAttention(nn.Module):
    """Compute softmax(Q K^T / sqrt(d_k)) V with masked positions suppressed."""

    def __init__(self):
        super(ScaledDotProductAttention, self).__init__()

    def forward(self, Q, K, V, attn_mask=None):
        """
        Q: [batch_size, n_heads, len_q, d_k]
        K: [batch_size, n_heads, len_k, d_k]
        V: [batch_size, n_heads, len_v(=len_k), d_v]
        attn_mask: [batch_size, n_heads, len_q, len_k]; nonzero/True entries are
            masked out. Boolean and uint8 masks are both accepted; None
            disables masking.

        Note: in the encoder-decoder attention layer len_q (q1..qt) and
        len_k (k1..km) may differ.

        Returns:
            context: [batch_size, n_heads, len_q, d_v], the weighted sum of V.
            attn: [batch_size, n_heads, len_q, len_k], the attention weights
                (useful for visualisation).
        """
        # Scale by the real key dimension rather than a notebook-level constant,
        # so the layer stays correct for any head size.
        scale = math.sqrt(K.size(-1))
        scores = torch.matmul(Q, K.transpose(-1, -2)) / scale  # [batch_size, n_heads, len_q, len_k]

        if attn_mask is not None:
            # Masked positions get -1e9 so softmax gives them ~0 weight; the
            # mask is cast to bool because byte masks are deprecated for masked_fill.
            scores = scores.masked_fill(attn_mask.bool(), -1e9)

        attn = torch.softmax(scores, dim=-1)

        # attn: [batch_size, n_heads, len_q, len_k]
        # V:    [batch_size, n_heads, len_v(=len_k), d_v]
        context = torch.matmul(attn, V)  # [batch_size, n_heads, len_q, d_v]

        return context, attn

        
