In [None]:
import torch
import torch.nn as nn
import math

# Transformer Pytorch 实现
https://github.com/hyunwoongko/transformer

# Models

## Embedding

### Positional Encoding
- positional encoding 公式计算

In [89]:
class PostionalEncoding(nn.Module):
    """Fixed (non-learned) sinusoidal positional encoding.

    A table of shape (max_len, d_model) is pre-computed once in ``__init__``;
    ``forward`` only slices the first ``seq_len`` rows out of it.
    """

    def __init__(self, d_model, max_len, device):
        """
        :param d_model: size of the embedding (feature) dimension
        :param max_len: maximum sequence length the table is built for
        :param device: device on which the table is initially created
        """
        super(PostionalEncoding, self).__init__()

        # Same layout as one sequence of input embeddings: (max_len, d_model).
        encoding = torch.zeros(max_len, d_model, device=device)

        # Column vector of positions: (max_len, 1).
        pos = torch.arange(0, max_len, device=device).float().unsqueeze(dim=1)

        # Even feature indices 0, 2, 4, ...: (ceil(d_model / 2),).
        _2i = torch.arange(0, d_model, step=2, device=device).float()

        # pos / 10000^(2i / d_model), shared by the sine and cosine halves.
        angles = pos / (10000 ** (_2i / d_model))

        # Even columns take the sine, odd columns take the cosine.
        encoding[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd column than even columns,
        # so the cosine source is trimmed to d_model // 2 columns.
        encoding[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # A buffer carries no gradient and follows the module through
        # .to()/.cuda(); persistent=False keeps it out of state_dict so the
        # set of checkpoint keys is the same as with a plain attribute.
        self.register_buffer("encoding", encoding, persistent=False)

    def forward(self, x):
        """
        :param x: [batch_size, seq_len] token ids (before embedding); only the
                  sequence length is read from it
        :return: [seq_len, d_model] the encoding is identical for every batch
                 element, so a single copy is returned
        """
        seq_len = x.size(1)
        return self.encoding[:seq_len, :]

### Transformer Embedding
- Input Embedding: 这个是根据数据集不同自行决定，这里不实现
- Positional Encoding：对输入序列加上 position 信息
- Dropout

In [80]:
class TransformerEmbedding(nn.Module):
    """Token embedding plus positional encoding, followed by dropout."""

    def __init__(self, vocab_size, d_model, max_len, drop_prob, device):
        """
        :param vocab_size: number of distinct tokens
        :param d_model: size of the embedding (feature) dimension
        :param max_len: maximum sequence length
        :param drop_prob: dropout probability applied to the summed embedding
        :param device: device handed to the positional encoding table
        """
        super().__init__()
        # Learned lookup table: token id -> d_model vector.
        self.tok_emb = nn.Embedding(vocab_size, d_model)
        # Pre-computed position table.
        self.pos_emb = PostionalEncoding(d_model, max_len, device)
        self.drop_out = nn.Dropout(p=drop_prob)

    def forward(self, x):
        """
        :param x: [batch_size, seq_len] token ids
        :return: [batch_size, seq_len, d_model]
        """
        token_vectors = self.tok_emb(x)      # (batch_size, seq_len, d_model)
        position_vectors = self.pos_emb(x)   # (seq_len, d_model)

        # The position table broadcasts over the batch dimension.
        summed = token_vectors + position_vectors
        return self.drop_out(summed)

## Layer



### Layer Normalization
- feature normalization

In [102]:
class LayerNorm(nn.Module):
    """Layer normalization over the last (feature) dimension."""

    def __init__(self, d_model, eps=1e-12):
        """
        :param d_model: size of the feature dimension being normalized
        :param eps: value added to the variance for numerical stability.
                    Default: 1e-12
        """
        super(LayerNorm, self).__init__()
        # Scale and shift must be nn.Parameter: plain tensors are not
        # returned by parameters(), so they are never updated by an
        # optimizer, never moved by .to(device) and never saved in
        # state_dict.
        self.gamma = nn.Parameter(torch.ones(d_model))
        self.beta = nn.Parameter(torch.zeros(d_model))
        self.eps = eps

    def forward(self, x):
        """
        :param x: [batch_size, seq_len, d_model]
        :return: tensor of the same shape, normalized along the last dimension
        """
        # -1 selects the last dimension; keepdim=True keeps the statistics
        # broadcastable against x.
        mean = x.mean(-1, keepdim=True)
        # Biased (population) variance.
        var = x.var(-1, unbiased=False, keepdim=True)

        out = (x - mean) / torch.sqrt(var + self.eps)
        out = self.gamma * out + self.beta
        return out

### Feed-Forward Network
- linear layer 升维
- ReLU activation 激活
- dropout
- linear layer 降维

In [95]:
class PositionwiseFeedForward(nn.Module):
    """Two-layer feed-forward network: expand, ReLU, dropout, project back."""

    def __init__(self, d_model, hidden, drop_prob=0.1):
        """
        :param d_model: size of the input and output feature dimension
        :param hidden: size of the intermediate (expanded) dimension
        :param drop_prob: dropout probability applied after the activation
        """
        super().__init__()
        self.linear1 = nn.Linear(d_model, hidden)   # d_model -> hidden
        self.linear2 = nn.Linear(hidden, d_model)   # hidden -> d_model
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=drop_prob)

    def forward(self, x):
        """
        :param x: [batch_size, seq_len, d_model]
        :return: [batch_size, seq_len, d_model]
        """
        expanded = self.linear1(x)
        activated = self.dropout(self.relu(expanded))
        return self.linear2(activated)

### Self-Attention
- attentionMatrix = queryMatrix * keyMatrix.T
- scale attentionMatrix
- softmax attentionMatrix
- output = attentionMatrix * valueMatrix

In [94]:
class ScaleDotProductAttention(nn.Module):
    """Scaled dot-product attention: softmax(Q K^T / sqrt(d)) V."""

    def __init__(self):
        super(ScaleDotProductAttention, self).__init__()
        # Normalize over the key axis (last dimension of the score matrix).
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, q, k, v, mask=None, e=1e-12):
        """
        :param q: [batch_size, n_head, seq_len, d_tensor] queries
        :param k: [batch_size, n_head, seq_len, d_tensor] keys
        :param v: [batch_size, n_head, seq_len, d_tensor] values
        :param mask: optional tensor broadcastable to the score matrix;
                     positions where ``mask == 0`` receive no attention
        :param e: kept only so existing call sites keep working; it is
                  ignored. Masked scores used to be filled with ``-e``
                  (-1e-12, effectively zero), which left them unmasked.
        :return: tuple ``(output, attention)`` where output is
                 [batch_size, n_head, seq_len, d_tensor] and attention is
                 the softmax-normalized score matrix
        """
        d_tensor = k.size(-1)

        # Q . K^T gives the score matrix [batch_size, n_head, seq_len, seq_len].
        # Dividing by sqrt(d_tensor) keeps the dot products from growing with
        # the head size and saturating the softmax.
        k_t = k.transpose(-2, -1)
        score = (q @ k_t) / math.sqrt(d_tensor)

        if mask is not None:
            # Use the most negative finite value of the score dtype so that
            # masked positions get zero weight after softmax. A finite value
            # (rather than -inf) keeps fully masked rows free of NaN: they
            # come out as a uniform distribution instead.
            score = score.masked_fill(mask == 0, torch.finfo(score.dtype).min)

        # Turn scores into attention weights in [0, 1] that sum to 1 per query.
        score = self.softmax(score)

        # Weighted sum of the values.
        v = score @ v

        # The attention weights are returned as well so callers can inspect
        # or visualize them.
        return v, score

### Multi-Head Attention
- multi-head split
- self-attention
- multi-head concat
- linear transformation

In [93]:
class MultiHeadAttention(nn.Module):
    """Multi-head attention: project, split into heads, attend, merge, project."""

    def __init__(self, d_model, n_head):
        """
        :param d_model: size of the embedding (feature) dimension
        :param n_head: number of attention heads; must divide ``d_model``
        :raises ValueError: if ``n_head`` is not positive or does not divide
                            ``d_model`` evenly
        """
        super(MultiHeadAttention, self).__init__()
        # Fail at construction time: otherwise the mismatch only surfaces
        # later as a shape error inside ``split`` during the forward pass.
        if n_head <= 0 or d_model % n_head != 0:
            raise ValueError(
                "d_model ({}) must be divisible by a positive n_head ({})".format(d_model, n_head)
            )
        self.n_head = n_head
        self.attention = ScaleDotProductAttention()
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_concat = nn.Linear(d_model, d_model)

    def forward(self, q, k, v, mask=None):
        """
        :param q: [batch_size, q_len, d_model] query input
        :param k: [batch_size, k_len, d_model] key input
        :param v: [batch_size, k_len, d_model] value input
        :param mask: optional mask forwarded unchanged to the attention module
        :return: [batch_size, q_len, d_model]
        """
        # Linear projections into the query / key / value spaces.
        # Each stays [batch_size, length, d_model].
        q, k, v = self.w_q(q), self.w_k(k), self.w_v(v)

        # Split the feature dimension into heads:
        # [batch_size, n_head, length, d_tensor] with d_tensor = d_model // n_head.
        q, k, v = self.split(q), self.split(k), self.split(v)

        # Scaled dot-product attention per head; the attention weights are
        # returned by the module but not used here.
        out, attention = self.attention(q, k, v, mask=mask)

        # Merge the heads back together, then mix them with a final linear
        # layer whose output size equals the input size.
        out = self.concat(out)
        out = self.w_concat(out)

        return out

    def split(self, tensor):
        """
        Split the feature dimension into ``n_head`` equally sized chunks.

        :param tensor: [batch_size, seq_len, d_model]
        :return: [batch_size, n_head, seq_len, d_tensor]
        """
        batch_size, length, d_model = tensor.size()

        d_tensor = d_model // self.n_head
        # view() cuts each token's features into n_head chunks; transpose()
        # moves the head axis in front of the sequence axis.
        tensor = tensor.view(batch_size, length, self.n_head, d_tensor).transpose(1, 2)

        return tensor

    def concat(self, tensor):
        """
        Inverse of ``split``: merge the per-head outputs back together.

        :param tensor: [batch_size, n_head, seq_len, d_tensor]
        :return: [batch_size, seq_len, d_model]
        """
        batch_size, head, length, d_tensor = tensor.size()
        d_model = head * d_tensor

        # contiguous() is needed because view() cannot be applied directly to
        # the transposed (non-contiguous) tensor.
        tensor = tensor.transpose(1, 2).contiguous().view(batch_size, length, d_model)
        return tensor

## Blocks

### Encoder Block
- Multi-Head Attention
- Residual & Add & Norm & Dropout
- Feed Forward
- Residual & Add & Norm & Dropout

In [99]:
class EncoderLayer(nn.Module):
    """One encoder block: self-attention and feed-forward sub-layers.

    Each sub-layer output is added to its input (residual), layer-normalized
    and then passed through dropout.
    """

    def __init__(self, d_model, ffn_hidden, n_head, drop_prob):
        """
        :param d_model: size of the embedding (feature) dimension
        :param ffn_hidden: intermediate size of the feed-forward network
        :param n_head: number of attention heads
        :param drop_prob: dropout probability used after each sub-layer
        """
        super().__init__()
        # Sub-layer 1: multi-head self-attention.
        self.attention = MultiHeadAttention(d_model=d_model, n_head=n_head)
        self.norm1 = LayerNorm(d_model=d_model)
        self.dropout1 = nn.Dropout(p=drop_prob)

        # Sub-layer 2: position-wise feed-forward network.
        self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.norm2 = LayerNorm(d_model=d_model)
        self.dropout2 = nn.Dropout(p=drop_prob)

    def forward(self, x, s_mask):
        """
        :param x: input to the block; used as query, key and value
        :param s_mask: mask passed to the self-attention
        :return: output of the block
        """
        # Self-attention, then residual add -> norm -> dropout.
        residual = x
        attended = self.attention(q=x, k=x, v=x, mask=s_mask)
        hidden = self.dropout1(self.norm1(attended + residual))

        # Feed-forward, then residual add -> norm -> dropout.
        residual = hidden
        transformed = self.ffn(hidden)
        return self.dropout2(self.norm2(transformed + residual))

### Decoder Block
- Multi-Head Attention (target sequence)
- Residual & Add & Norm & Dropout
- Multi-Head Attention (source sequence)
- Residual & Add & Norm & Dropout
- Feed Forward
- Residual & Add & Norm & Dropout

In [100]:
class DecoderLayer(nn.Module):
    """One decoder block: self-attention, encoder-decoder attention, feed-forward.

    Each sub-layer output is added to its input (residual), layer-normalized
    and then passed through dropout.
    """

    def __init__(self, d_model, ffn_hidden, n_head, drop_prob):
        """
        :param d_model: size of the embedding (feature) dimension
        :param ffn_hidden: intermediate size of the feed-forward network
        :param n_head: number of attention heads
        :param drop_prob: dropout probability used after each sub-layer
        """
        super().__init__()
        # Sub-layer 1: self-attention over the decoder input.
        self.self_attention = MultiHeadAttention(d_model=d_model, n_head=n_head)
        self.norm1 = LayerNorm(d_model=d_model)
        self.dropout1 = nn.Dropout(p=drop_prob)

        # Sub-layer 2: attention over the encoder output.
        self.enc_dec_attention = MultiHeadAttention(d_model=d_model, n_head=n_head)
        self.norm2 = LayerNorm(d_model=d_model)
        self.dropout2 = nn.Dropout(p=drop_prob)

        # Sub-layer 3: position-wise feed-forward network.
        self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.norm3 = LayerNorm(d_model=d_model)
        self.dropout3 = nn.Dropout(p=drop_prob)

    def forward(self, dec, enc, t_mask, s_mask):
        """
        :param dec: decoder-side input; query, key and value of the first attention
        :param enc: encoder output used as key and value of the second
                    attention; when None that sub-layer is skipped
        :param t_mask: mask for the decoder self-attention
        :param s_mask: mask for the encoder-decoder attention
        :return: output of the block
        """
        # Self-attention with t_mask, then residual add -> norm -> dropout.
        residual = dec
        hidden = self.self_attention(q=dec, k=dec, v=dec, mask=t_mask)
        hidden = self.dropout1(self.norm1(hidden + residual))

        if enc is not None:
            # Queries come from the decoder, keys and values from the encoder.
            residual = hidden
            hidden = self.enc_dec_attention(q=hidden, k=enc, v=enc, mask=s_mask)
            hidden = self.dropout2(self.norm2(hidden + residual))

        # Feed-forward, then residual add -> norm -> dropout.
        residual = hidden
        hidden = self.ffn(hidden)
        return self.dropout3(self.norm3(hidden + residual))

## Model

### Encoder Model
- Input Embedding
- Encoder Block 多个

In [97]:
class Encoder(nn.Module):
    """Encoder: input embedding followed by a stack of encoder blocks."""

    def __init__(self, enc_voc_size, max_len, d_model, ffn_hidden, n_head, n_layers, drop_prob, device):
        """
        :param enc_voc_size: number of tokens in the encoder vocabulary
        :param max_len: maximum sequence length
        :param d_model: size of the embedding (feature) dimension
        :param ffn_hidden: intermediate size of the feed-forward network
        :param n_head: number of attention heads
        :param n_layers: number of stacked encoder blocks
        :param drop_prob: dropout probability
        :param device: device handed to the embedding's positional encoding
        """
        super().__init__()
        self.emb = TransformerEmbedding(vocab_size=enc_voc_size,
                                        d_model=d_model,
                                        max_len=max_len,
                                        drop_prob=drop_prob,
                                        device=device)

        # Every block is built from the same hyper-parameters.
        block_kwargs = dict(d_model=d_model,
                            ffn_hidden=ffn_hidden,
                            n_head=n_head,
                            drop_prob=drop_prob)
        blocks = []
        for _ in range(n_layers):
            blocks.append(EncoderLayer(**block_kwargs))
        self.layers = nn.ModuleList(blocks)

    def forward(self, x, s_mask):
        """
        :param x: [batch_size, seq_len] token ids
        :param s_mask: mask passed to the self-attention of every block
        :return: [batch_size, seq_len, d_model]
        """
        # Embedding + positional encoding + dropout.
        hidden = self.emb(x)

        # Run the encoder blocks in order.
        for block in self.layers:
            hidden = block(hidden, s_mask)

        return hidden

In [None]:
# test
# Encoder(10, 12, 4, 8, 2, 4, 0.1, None)

### Decoder Model
- Input Embedding
- Decoder Block 多个
- Linear layer 输出

In [None]:
class Decoder(nn.Module):
    """Decoder: input embedding, a stack of decoder blocks, output projection."""

    def __init__(self, dec_voc_size, max_len, d_model, ffn_hidden, n_head, n_layers, drop_prob, device):
        """
        :param dec_voc_size: number of tokens in the decoder vocabulary
        :param max_len: maximum sequence length
        :param d_model: size of the embedding (feature) dimension
        :param ffn_hidden: intermediate size of the feed-forward network
        :param n_head: number of attention heads
        :param n_layers: number of stacked decoder blocks
        :param drop_prob: dropout probability
        :param device: device handed to the embedding's positional encoding
        """
        super().__init__()
        self.emb = TransformerEmbedding(vocab_size=dec_voc_size,
                                        d_model=d_model,
                                        max_len=max_len,
                                        drop_prob=drop_prob,
                                        device=device)

        # Every block is built from the same hyper-parameters.
        block_kwargs = dict(d_model=d_model,
                            ffn_hidden=ffn_hidden,
                            n_head=n_head,
                            drop_prob=drop_prob)
        blocks = []
        for _ in range(n_layers):
            blocks.append(DecoderLayer(**block_kwargs))
        self.layers = nn.ModuleList(blocks)

        # Projects each position's features onto the decoder vocabulary.
        self.linear = nn.Linear(d_model, dec_voc_size)

    def forward(self, trg, enc_src, trg_mask, src_mask):
        """
        :param trg: [batch_size, seq_len] target-side token ids
        :param enc_src: output of the encoder
        :param trg_mask: mask for the self-attention of every block
        :param src_mask: mask for the encoder-decoder attention of every block
        :return: [batch_size, seq_len, dec_voc_size]
        """
        hidden = self.emb(trg)

        # Run the decoder blocks in order.
        for block in self.layers:
            hidden = block(hidden, enc_src, trg_mask, src_mask)

        return self.linear(hidden)

### Transformer

In [None]:
class Transformer(nn.Module):
    """Full encoder-decoder Transformer that builds its own attention masks."""

    def __init__(self, src_pad_idx, trg_pad_idx, trg_sos_idx, enc_voc_size, dec_voc_size, d_model, n_head, max_len,
                 ffn_hidden, n_layers, drop_prob, device):
        """
        :param src_pad_idx: id of the padding token in source sequences
        :param trg_pad_idx: id of the padding token in target sequences
        :param trg_sos_idx: id of the target start-of-sequence token (stored only)
        :param enc_voc_size: number of tokens in the encoder vocabulary
        :param dec_voc_size: number of tokens in the decoder vocabulary
        :param d_model: size of the embedding (feature) dimension
        :param n_head: number of attention heads
        :param max_len: maximum sequence length
        :param ffn_hidden: intermediate size of the feed-forward network
        :param n_layers: number of blocks in the encoder and in the decoder
        :param drop_prob: dropout probability
        :param device: device passed down to the encoder and decoder
        """
        super().__init__()
        self.src_pad_idx = src_pad_idx
        self.trg_pad_idx = trg_pad_idx
        self.trg_sos_idx = trg_sos_idx
        self.device = device
        self.encoder = Encoder(d_model=d_model,
                               n_head=n_head,
                               max_len=max_len,
                               ffn_hidden=ffn_hidden,
                               enc_voc_size=enc_voc_size,
                               drop_prob=drop_prob,
                               n_layers=n_layers,
                               device=device)

        self.decoder = Decoder(d_model=d_model,
                               n_head=n_head,
                               max_len=max_len,
                               ffn_hidden=ffn_hidden,
                               dec_voc_size=dec_voc_size,
                               drop_prob=drop_prob,
                               n_layers=n_layers,
                               device=device)

    def forward(self, src, trg):
        """
        :param src: [batch_size, src_len] source token ids
        :param trg: [batch_size, trg_len] target token ids
        :return: [batch_size, trg_len, dec_voc_size]
        """
        # Each sequence is tested against its own padding id: source
        # positions against src_pad_idx, target positions against trg_pad_idx.

        # Encoder self-attention: source queries over source keys.
        src_mask = self.make_pad_mask(src, src, self.src_pad_idx, self.src_pad_idx)

        # Encoder-decoder attention: target queries over source keys.
        src_trg_mask = self.make_pad_mask(trg, src, self.trg_pad_idx, self.src_pad_idx)

        # Decoder self-attention: padding mask combined with the
        # lower-triangular mask from make_no_peak_mask.
        trg_mask = (self.make_pad_mask(trg, trg, self.trg_pad_idx, self.trg_pad_idx)
                    & self.make_no_peak_mask(trg, trg))

        enc_src = self.encoder(src, src_mask)
        output = self.decoder(trg, enc_src, trg_mask, src_trg_mask)
        return output

    def make_pad_mask(self, q, k, q_pad_idx=None, k_pad_idx=None):
        """
        Build a padding mask for a (query, key) pair of token sequences.

        An entry is True only when both the query token and the key token are
        non-padding, so the row of a padded query is False everywhere.

        :param q: [batch_size, len_q] query-side token ids
        :param k: [batch_size, len_k] key-side token ids
        :param q_pad_idx: padding id for ``q``; defaults to ``self.src_pad_idx``
        :param k_pad_idx: padding id for ``k``; defaults to ``self.src_pad_idx``
        :return: bool tensor [batch_size, 1, len_q, len_k]
        """
        if q_pad_idx is None:
            q_pad_idx = self.src_pad_idx
        if k_pad_idx is None:
            k_pad_idx = self.src_pad_idx

        # ne() is True wherever the token differs from the padding id.
        # batch_size x 1 x 1 x len_k
        k_valid = k.ne(k_pad_idx).unsqueeze(1).unsqueeze(2)
        # batch_size x 1 x len_q x 1
        q_valid = q.ne(q_pad_idx).unsqueeze(1).unsqueeze(3)

        # Broadcasting expands both operands to batch_size x 1 x len_q x len_k.
        return k_valid & q_valid

    def make_no_peak_mask(self, q, k):
        """
        Build the lower-triangular mask used by the decoder self-attention.

        :param q: [batch_size, len_q] query-side token ids (size and device are read)
        :param k: [batch_size, len_k] key-side token ids (only the size is read)
        :return: bool tensor [len_q, len_k] with True on and below the diagonal
        """
        len_q, len_k = q.size(1), k.size(1)

        # Created on the input's device so it can always be combined with the
        # padding mask, regardless of what ``self.device`` was set to.
        return torch.ones(len_q, len_k, device=q.device).tril().bool()