# Reference
* [熬了一晚上，我从零实现了Transformer模型，把代码讲给你听](https://mp.weixin.qq.com/s/q30K-W4v853rEcDsP4SgyA)
* ![](https://miro.medium.com/max/958/1*2KrICIr3FUjUj1ukBvbNKw.png)![](https://mmbiz.qpic.cn/sz_mmbiz_jpg/gYUsOT36vfrc46CPgjmcu492RZKlbMIeKcBYEayBkpjfx2gBAe81a1WicMMBLeicjH2KR46KzkUgOgZd37K9ZXfQ/640?wx_fmt=jpeg&wxfrom=5&wx_lazy=1&wx_co=1)

In [24]:
import torch
import torch.nn as nn
import numpy as np
import math

# Probe for a GPU; `cuda0` is only bound when CUDA is actually usable.
if torch.cuda.is_available():
    cuda0 = torch.device('cuda:0')
    print('CUDA is available')

CUDA is available


# Config

In [77]:
class Config(object):
    """Plain container for the Transformer hyper-parameters.

    Args:
        vocab_size: stored unchanged on ``vocab_size``.
        d_model: embedding size; must be a multiple of ``n_heads``.
        n_heads: number of attention heads.
        padding_size: padded sequence length (also referred to as seq_size).
        N: number of encoder and decoder layers.
        dropout: stored unchanged on ``dropout``.
        mask: whether masking is used (defaults to ``True``).

    Raises:
        AssertionError: if ``d_model`` is not divisible by ``n_heads``.
    """

    def __init__(self, vocab_size, d_model, n_heads, padding_size, N, dropout,
                 mask=True):
        super().__init__()
        # Every head must receive an equal slice of the embedding.
        assert d_model % n_heads == 0

        # Model dimensions.
        self.vocab_size = vocab_size
        self.d_model = d_model
        self.n_heads = n_heads
        self.padding_size = padding_size
        self.N = N

        # Reserved token indices.
        self.PAD = 0  # padding index
        self.UNK = 1  # unknown-token index

        # Regularisation / masking switches.
        self.dropout = dropout
        self.mask = mask

In [78]:
# Small demo configuration shared by the example cells in this notebook.
config = Config(
    vocab_size=6,
    d_model=20,
    n_heads=5,
    padding_size=8,
    N=10,
    dropout=0.5,
)

# Embedding
* input: [batch_size * seq_len]
* output: [batch_size * seq_len * d_model]
* Adds a token embedding layer and a positional embedding layer.

In [34]:
class Embedding(nn.Module):
    """Token embedding that pads/truncates every sequence to a fixed length.

    Args:
        padding_size: length every sequence is padded or truncated to.
        PAD: padding token index; its embedding row is kept at zero via
            ``padding_idx``.
        UNK: unknown-token index (stored only; not used by this layer).
        vocab_size: number of rows in the embedding table.
        d_model: embedding width.
        seq_len: stored for reference; defaults to ``padding_size`` when
            omitted.
    """

    def __init__(self, padding_size, PAD, UNK, vocab_size, d_model, seq_len=None):
        super(Embedding, self).__init__()
        self.padding_size = padding_size
        # Keep the historical (misspelled) attribute so existing readers of
        # it keep working.
        self.padding_szie = padding_size
        self.PAD = PAD
        self.UNK = UNK
        self.vocab_size = vocab_size
        self.d_model = d_model
        self.seq_len = padding_size if seq_len is None else seq_len
        # nn.Embedding names its table size `num_embeddings`.
        self.embedding = nn.Embedding(
            num_embeddings=self.vocab_size,
            embedding_dim=self.d_model,
            padding_idx=self.PAD,
        )

    def forward(self, x):
        """Embed a batch of token-id sequences.

        Args:
            x: iterable of token-id sequences (lists may be ragged), or any
                object exposing ``tolist()`` such as a 2-D tensor/array.

        Returns:
            Float tensor of shape ``[batch_size, padding_size, d_model]``.
            The caller's input is never modified.
        """
        if hasattr(x, "tolist"):
            x = x.tolist()

        padded = []
        for seq in x:
            # Copy first so that padding never leaks back into the input.
            ids = list(seq)[:self.padding_size]
            ids.extend([self.PAD] * (self.padding_size - len(ids)))
            padded.append(ids)

        token_ids = torch.tensor(
            padded, dtype=torch.long, device=self.embedding.weight.device
        ).reshape(len(padded), self.padding_size)  # keeps 2-D for empty batches
        return self.embedding(token_ids)

# Positional Embedding
![](https://mmbiz.qpic.cn/sz_mmbiz_jpg/gYUsOT36vfrc46CPgjmcu492RZKlbMIe0MYRn97p9ffMJQhuOGqPmQteX8JmS68FMbZ0aW1WicKLRWmzniaxC1Ng/640?wx_fmt=jpeg&wxfrom=5&wx_lazy=1&wx_co=1)

In [79]:
class PositionalEmbedding(nn.Module):
    """Fixed sinusoidal position table.

    Column pair ``(2k, 2k + 1)`` shares the frequency
    ``1 / 10000 ** (2k / d_model)``; the even column holds the sine and the
    odd column the cosine of ``pos * frequency``.

    Args:
        d_model: width of each position vector.
    """

    def __init__(self, d_model):
        super(PositionalEmbedding, self).__init__()
        self.d_model = d_model

    def forward(self, seq_len):
        """Build the table for positions ``0 .. seq_len - 1``.

        Args:
            seq_len: number of positions to encode.

        Returns:
            ``torch.float64`` tensor of shape ``[seq_len, d_model]`` (the
            dtype follows the NumPy default; cast it before adding it
            out-of-place to float32 activations).
        """
        positions = np.arange(seq_len, dtype=np.float64)[:, None]
        columns = np.arange(self.d_model)[None, :]
        # `columns // 2` is the pair index k, so both columns of a pair use
        # the same exponent 2k / d_model (not 2 * column / d_model).
        angles = positions / np.power(10000.0, (2 * (columns // 2)) / self.d_model)

        position_embed = np.zeros((seq_len, self.d_model))
        position_embed[:, 0::2] = np.sin(angles[:, 0::2])
        position_embed[:, 1::2] = np.cos(angles[:, 1::2])
        return torch.from_numpy(position_embed)

# Multi-head Self-attention
![](https://camo.githubusercontent.com/9ebfaedcab26d98abf2520743e52a3eefb56811036d843811d38b55c5d8668c1/68747470733a2f2f706963322e7a68696d672e636f6d2f38302f76322d65643932336462613966336333663332646563643262386136323833646134315f373230772e6a7067)

In [71]:
# Multi-head attention layer (used by the Encoder)
class MultiHeadAttention(nn.Module):
    """Scaled dot-product attention split across ``n_heads`` heads.

    Queries are projected from ``x``; keys and values are projected from
    ``y``. Passing the same tensor for both gives self-attention.

    Args:
        d_model: width of the input and output vectors.
        seq_len: stored for reference only; the actual lengths are read from
            the inputs at call time.
        n_heads: number of attention heads.
        dim_k: total query/key projection width (split evenly across heads).
        dim_v: total value projection width (split evenly across heads).

    Raises:
        AssertionError: if ``dim_k`` or ``dim_v`` is not divisible by
            ``n_heads``.
    """

    def __init__(self, d_model, seq_len, n_heads, dim_k, dim_v):
        super(MultiHeadAttention, self).__init__()
        self.d_model = d_model
        self.seq_len = seq_len
        self.dim_k = dim_k
        self.dim_v = dim_v
        self.n_heads = n_heads
        # dim_v is also the input width of the output projection.
        assert self.dim_k % self.n_heads == 0 and self.dim_v % self.n_heads == 0

        self.q = nn.Linear(self.d_model, self.dim_k, bias=True)
        self.k = nn.Linear(self.d_model, self.dim_k, bias=True)
        self.v = nn.Linear(self.d_model, self.dim_v, bias=True)
        # Dot products are taken per head, so scale by the per-head key width.
        self.norm_factor = 1 / math.sqrt(self.dim_k // self.n_heads)
        self.softmax = nn.Softmax(dim=-1)
        self.o = nn.Linear(self.dim_v, self.d_model)

    def generate_mask(self, dim):
        """Return a ``[dim, dim]`` boolean lower-triangular mask.

        This is the sequence mask that prevents the decoder from peeking at
        later time steps: ``True`` marks positions that may be attended to.
        """
        return torch.tril(torch.ones(dim, dim, dtype=torch.bool))

    def _split_heads(self, t, head_dim):
        """Reshape ``[batch, len, heads * head_dim]`` to ``[batch, heads, len, head_dim]``."""
        return t.reshape(t.shape[0], t.shape[1], self.n_heads, head_dim).transpose(1, 2)

    def forward(self, x, y, requires_mask=False):
        """Attend from ``x`` over ``y``.

        Args:
            x: query source, ``[batch_size, q_len, d_model]``.
            y: key/value source, ``[batch_size, kv_len, d_model]``.
            requires_mask: when true, position ``i`` may only attend to
                positions ``<= i``.

        Returns:
            Tensor of shape ``[batch_size, q_len, d_model]``.
        """
        batch, q_len, kv_len = x.shape[0], x.shape[1], y.shape[1]

        Q = self._split_heads(self.q(x), self.dim_k // self.n_heads)
        # Keys must index the same sequence as the values they weight.
        K = self._split_heads(self.k(y), self.dim_k // self.n_heads)
        V = self._split_heads(self.v(y), self.dim_v // self.n_heads)

        # [batch, heads, q_len, kv_len]: one score per (query, key) position.
        scaled_attention_score = (Q @ K.transpose(-2, -1)) * self.norm_factor
        if requires_mask:
            allowed = self.generate_mask(max(q_len, kv_len))[:q_len, :kv_len]
            allowed = allowed.to(scaled_attention_score.device)
            # masked_fill writes where the mask is True, so invert "allowed".
            scaled_attention_score = scaled_attention_score.masked_fill(
                ~allowed, float("-inf")
            )

        context = self.softmax(scaled_attention_score) @ V
        # Merge the heads back: [batch, q_len, dim_v].
        context = context.transpose(1, 2).reshape(batch, q_len, self.dim_v)
        return self.o(context)


# Feed Forward
![](https://mmbiz.qpic.cn/sz_mmbiz_jpg/gYUsOT36vfrc46CPgjmcu492RZKlbMIeXs8Btdy2AKetj5jeRKnLdRQKJU1nxwmrYazXuMTFtKGF63udn9oAdA/640?wx_fmt=jpeg&wxfrom=5&wx_lazy=1&wx_co=1)

In [73]:
class FeedForward(nn.Module):
    """Position-wise feed-forward network: Linear -> ReLU -> Linear.

    Args:
        input: width of the incoming (and outgoing) vectors. The name shadows
            the builtin but is kept because callers pass it by keyword.
        hidden_dim: width of the hidden layer.
    """

    def __init__(self, input, hidden_dim=2048):
        super(FeedForward, self).__init__()
        self.L1 = nn.Linear(input, hidden_dim, bias=True)
        self.L2 = nn.Linear(hidden_dim, input, bias=True)
        # nn.ReLU is a module: build it once here and *call* it in forward.
        self.relu = nn.ReLU()

    def forward(self, x):
        """Apply the two-layer MLP to the last dimension of ``x``."""
        return self.L2(self.relu(self.L1(x)))

# Add & LayerNorm
![](https://mmbiz.qpic.cn/sz_mmbiz_jpg/gYUsOT36vfrc46CPgjmcu492RZKlbMIemOsN9yqOQNdS8a4A6lQXDTfB2ey09cD5uUHexdiaSJYrdqnYwMsmSng/640?wx_fmt=jpeg&wxfrom=5&wx_lazy=1&wx_co=1)

In [None]:
class AddLayerNorm(nn.Module):
    """Residual connection followed by layer normalisation.

    Computes ``LayerNorm(x + Dropout(sublayer(x)))``.

    Args:
        dropout: drop probability applied to the sub-layer output.
        d_model: optional feature width. When given, a learnable
            ``nn.LayerNorm(d_model)`` over the last dimension is registered.
            When omitted, a parameter-free normalisation over every
            non-batch dimension is used (the historical behaviour).
    """

    def __init__(self, dropout, d_model=None):
        super(AddLayerNorm, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        self.norm = nn.LayerNorm(d_model) if d_model is not None else None

    def forward(self, x, sublayer, **kwarg):
        """Run ``sublayer`` on ``x``, add the residual and normalise.

        Args:
            x: input tensor; the first dimension is treated as the batch.
            sublayer: callable such as the FFN or multi-head attention layer.
            **kwarg: forwarded verbatim to ``sublayer``.

        Returns:
            Tensor with the same shape as ``x``.
        """
        subout = sublayer(x, **kwarg)
        # Drop only the sub-layer branch so the skip connection stays intact.
        residual = x + self.dropout(subout)
        if self.norm is not None:
            return self.norm(residual)
        # Functional form: no module is allocated per call and the result
        # stays on the input's device.
        return nn.functional.layer_norm(residual, tuple(residual.shape[1:]))

# Encoder
![](https://miro.medium.com/max/958/1*2KrICIr3FUjUj1ukBvbNKw.png)

In [None]:
class Encoder(nn.Module):
    """Single encoder layer: positional encoding, self-attention, then FFN.

    Each sub-layer is wrapped in ``AddLayerNorm`` (residual + layer norm).

    Args:
        d_model: feature width of the inputs and outputs.
        seq_size: nominal (padded) sequence length, passed to the attention
            layer as ``seq_len``.
        dim_k: total query/key projection width.
        dim_v: total value projection width.
        n_heads: number of attention heads.
        mask: stored only; ``forward`` does not read it.
        PAD: stored only; ``forward`` does not read it.
        UNK: stored only; ``forward`` does not read it.
        vocab_size: stored only; ``forward`` does not read it.
        dropout: drop probability handed to ``AddLayerNorm``.
    """

    def __init__(self, d_model, seq_size, dim_k, dim_v, n_heads, mask, PAD, UNK, vocab_size, dropout):
        super(Encoder, self).__init__()
        self.d_model = d_model
        self.seq_size = seq_size
        self.dim_k = dim_k
        self.dim_v = dim_v
        self.n_heads = n_heads
        self.mask = mask
        self.PAD = PAD
        self.UNK = UNK
        self.vocab_size = vocab_size
        self.dropout = dropout

        self.position_embedding = PositionalEmbedding(d_model=self.d_model)
        self.attention = MultiHeadAttention(
            d_model=self.d_model,
            seq_len=self.seq_size,
            n_heads=self.n_heads,
            dim_k=self.dim_k,
            dim_v=self.dim_v
        )
        # The FFN's linear layers act on the last (feature) dimension, so its
        # width is d_model, not the sequence length.
        self.ffn = FeedForward(input=self.d_model)
        self.add_norm = AddLayerNorm(dropout=self.dropout)

    def forward(self, x):
        """Encode a batch of already-embedded sequences.

        Args:
            x: tensor of shape ``[batch_size, seq_len, d_model]``.

        Returns:
            Tensor of the same shape. ``x`` itself is left untouched.
        """
        # Size the table from the input, and match its dtype/device so the
        # addition neither promotes the activations nor crosses devices.
        positions = self.position_embedding(x.shape[1]).to(dtype=x.dtype, device=x.device)
        # Out-of-place add: never mutate the caller's tensor (an in-place add
        # also fails on leaf tensors that require grad).
        x = x + positions
        output = self.add_norm(x, self.attention, y=x)
        output = self.add_norm(output, self.ffn)

        return output

# Decoder

In [None]:
class Decoder(nn.Module):
    """Decoder placeholder.

    The constructor accepts the same arguments as ``Encoder`` but currently
    ignores all of them; no sub-modules or attributes are created yet.
    """

    def __init__(self, d_model, seq_size, dim_k, dim_v, n_heads, mask, PAD, UNK, vocab_size, dropout):
        # Only the nn.Module base initialisation runs for now.
        super().__init__()

In [None]:
# NOTE(review): scratch fragment. `self` is not defined at module level, so
# this cell cannot run on its own; presumably it was meant to live inside a
# model's __init__ (Encoder/Decoder) -- TODO confirm.
# NOTE(review): the call does not pass `seq_len`, which Embedding.__init__
# lists as a parameter.
self.embedding = Embedding(
            padding_size=self.seq_size, PAD=self.PAD,
            UNK=self.UNK,
            vocab_size=self.vocab_size,
            d_model=self.d_model,
        )

In [69]:
# Random stand-in for an embedded batch: [batch=2, padding_size, d_model].
x = torch.randn((2, config.padding_size, config.d_model))
# Bare expression so the notebook displays the shape.
x.shape

torch.Size([2, 8, 20])