## todo_list
1. [ ] 是否已经实现了post_norm

# 1. 导入相关的库

In [19]:
import math
from copy import deepcopy
import torch
import torch.nn as nn
import torch.nn.functional as F

# 2. 实现相关的函数

In [20]:
def clones(module, N: int):
	"""Return an ``nn.ModuleList`` holding ``N`` independent deep copies of ``module``."""
	copies = nn.ModuleList()
	for _ in range(N):
		# deepcopy so that every copy owns its own parameters.
		copies.append(deepcopy(module))
	return copies


class LayerNorm(nn.Module):
	"""Layer normalisation over the last dimension with a learnable gain and bias."""

	def __init__(self, size, eps: float = 1e-5):
		"""
		Args:
			size: Shape of the learnable gain (``a_2``) and bias (``b_2``).
			eps: Constant added to the standard deviation in the denominator.
				Per the original author's note, the value follows PyTorch's
				transformer implementation.
		"""
		super().__init__()
		self.a_2 = nn.Parameter(torch.ones(size))
		self.b_2 = nn.Parameter(torch.zeros(size))
		self.eps = eps

		# The reference code did not initialise a_2 / b_2 explicitly, so it is
		# done here as well (gain -> 1, bias -> 0).
		nn.init.ones_(self.a_2)
		nn.init.zeros_(self.b_2)

	def forward(self, x):
		"""Normalise ``x`` along its last dimension, then apply gain and bias."""
		centered = x - x.mean(-1, keepdim=True)
		# eps is added to the std (not the variance) before dividing.
		denom = x.std(-1, keepdim=True) + self.eps
		return self.a_2 * centered / denom + self.b_2


class SublayerConnection(nn.Module):
	"""Residual wrapper: normalise the input, run a sub-layer, apply dropout, add the input back."""

	def __init__(self, size, dropout):
		"""
		Args:
			size: Passed to ``LayerNorm`` as the parameter shape.
			dropout: Dropout probability applied to the sub-layer output.
		"""
		super().__init__()
		self.norm = LayerNorm(size)
		self.dropout = nn.Dropout(dropout)

	def forward(self, x, sublayer):
		"""Return ``x + dropout(sublayer(norm(x)))``.

		``sublayer`` is any callable taking the normalised tensor.
		"""
		# Pre-norm arrangement: the norm is applied before the sub-layer,
		# not after the residual addition.
		normed = self.norm(x)
		branch = self.dropout(sublayer(normed))
		return x + branch


class Generator(nn.Module):
	"""Final projection from model features to log-probabilities over the vocabulary."""

	def __init__(self, d_model: int, vocab_size: int):
		"""
		Args:
			d_model: Size of the last dimension of the incoming features.
			vocab_size: Number of output classes (size of the projected last dimension).
		"""
		super(Generator, self).__init__()
		self.proj = nn.Linear(d_model, vocab_size)

	def forward(self, x):
		"""Return ``log_softmax(proj(x))`` taken over the last (vocabulary) dimension."""
		# log_softmax instead of softmax helps avoid numerical overflow; the
		# reference implementation does the same.
		# The layer is stored as ``self.proj``, and the normalisation must run
		# over the last axis, which is the one ``proj`` maps to ``vocab_size``.
		return F.log_softmax(self.proj(x), dim=-1)



# 3. 实现transformer

In [21]:
class Transformer(nn.Module):
	"""Top-level encoder-decoder container.

	All components are built by the caller and passed in ready-made, which
	differs from the more common "pass hyper-parameters" constructor style.
	Author's note (also applies to EncoderLayer / DecoderLayer, whose design
	is essentially the same): this way of passing and composing objects,
	taken from the reference code, is preferred over torch's built-in
	implementation; a larger refactor may follow later.
	"""

	def __init__(self, encoder, decoder, src_emb, tgt_emb, generator):
		"""Store the five sub-components; nothing is constructed here."""
		super().__init__()
		self.encoder = encoder
		self.decoder = decoder
		self.src_emb = src_emb
		self.tgt_emb = tgt_emb
		self.generator = generator

	def forward(self, src, tgt, src_mask, tgt_mask):
		"""Encode ``src`` and then decode ``tgt`` against the result.

		Note that ``self.generator`` is not applied here; the decoder output
		is returned as-is.

		TODO: consider giving ``src_mask`` and ``tgt_mask`` default values.
		"""
		memory = self.encode(src, src_mask)
		return self.decode(memory, src_mask, tgt, tgt_mask)

	def encode(self, src, src_mask):
		"""Embed ``src`` and run it through the encoder."""
		embedded = self.src_emb(src)
		return self.encoder(embedded, src_mask)

	def decode(self, memory, src_mask, tgt, tgt_mask):
		"""Embed ``tgt`` and run the decoder over it together with ``memory``."""
		embedded = self.tgt_emb(tgt)
		return self.decoder(embedded, memory, src_mask, tgt_mask)


class Encoder(nn.Module):
	"""Stack of ``N`` copies of an encoder layer followed by a final ``LayerNorm``."""

	def __init__(self, layer, N: int):
		"""
		Args:
			layer: Prototype layer; it is deep-copied ``N`` times and must
				expose a ``size`` attribute used to build the final norm.
			N: Number of stacked layers.
		"""
		super(Encoder, self).__init__()
		self.layers = clones(layer, N)
		self.norm = LayerNorm(layer.size)

	def forward(self, x, mask):
		"""Feed ``x`` (with ``mask``) through every layer in order, then normalise."""
		# Iterate the registered stack on the instance; a bare ``layers`` is
		# not defined in this scope.
		for layer in self.layers:
			x = layer(x, mask)
		return self.norm(x)


class EncoderLayer(nn.Module):
	"""One encoder layer: self-attention, then feed-forward, each inside a ``SublayerConnection``."""

	def __init__(self, size, self_attn, feed_forward, dropout=0.1):
		"""
		Args:
			size: Stored as ``self.size`` and passed to each ``SublayerConnection``.
			self_attn: Callable invoked as ``self_attn(q, k, v, mask)``.
			feed_forward: Callable invoked with a single tensor.
			dropout: Dropout probability for both sub-layer connections.
		"""
		super().__init__()
		self.size = size
		self.self_attn = self_attn
		self.feed_forward = feed_forward
		self.sublayer = clones(SublayerConnection(size, dropout), 2)

	def forward(self, x, mask):
		"""Apply the attention sub-layer, then the feed-forward sub-layer."""
		attn_block = self.sublayer[0]
		ffn_block = self.sublayer[1]

		def attend(src):
			# Self-attention: the same tensor serves as query, key and value.
			return self.self_attn(src, src, src, mask)

		x = attn_block(x, attend)
		return ffn_block(x, self.feed_forward)


class Decoder(nn.Module):
	"""Stack of ``N`` copies of a decoder layer followed by a final ``LayerNorm``."""

	def __init__(self, layer, N):
		"""
		Args:
			layer: Prototype layer; it is deep-copied ``N`` times and must
				expose a ``size`` attribute used to build the final norm.
			N: Number of stacked layers.
		"""
		super(Decoder, self).__init__()
		self.layers = clones(layer, N)
		self.norm = LayerNorm(layer.size)

	def forward(self, x, memory, src_mask, tgt_mask):
		"""Feed ``x`` through every layer (each also receives ``memory`` and both masks), then normalise."""
		# Iterate the registered stack on the instance; a bare ``layers`` is
		# not defined in this scope.
		for layer in self.layers:
			x = layer(x, memory, src_mask, tgt_mask)
		return self.norm(x)


class DecoderLayer(nn.Module):
	"""One decoder layer: self-attention, cross-attention over ``memory``, then feed-forward."""

	def __init__(self, size, self_attn, cross_attn, feed_forward, dropout=0.1):
		"""
		Args:
			size: Stored as ``self.size`` and passed to each ``SublayerConnection``.
			self_attn: Callable invoked as ``self_attn(q, k, v, mask)`` on the target.
			cross_attn: Callable invoked as ``cross_attn(q, k, v, mask)`` with
				``memory`` as key and value.
			feed_forward: Callable invoked with a single tensor.
			dropout: Dropout probability for the three sub-layer connections.
		"""
		super().__init__()
		self.size = size
		self.self_attn = self_attn
		self.cross_attn = cross_attn
		self.feed_forward = feed_forward
		self.sublayer = clones(SublayerConnection(size, dropout), 3)

	def forward(self, x, memory, cross_mask, tgt_mask):
		"""Run the three sub-layers in order and return the result."""

		def attend_self(tgt):
			# Target attends to itself under ``tgt_mask``.
			return self.self_attn(tgt, tgt, tgt, tgt_mask)

		def attend_memory(tgt):
			# Target queries ``memory`` under ``cross_mask``.
			return self.cross_attn(tgt, memory, memory, cross_mask)

		x = self.sublayer[0](x, attend_self)
		x = self.sublayer[1](x, attend_memory)
		return self.sublayer[2](x, self.feed_forward)

# 4. 实现attention

In [22]:
def subsequent_mask(seq_size: int):
	"""Build the mask used on the target side.

	Returns a boolean tensor of shape ``(1, seq_size, seq_size)`` that is
	``True`` on and below the main diagonal and ``False`` strictly above it.
	"""
	ones = torch.ones((1, seq_size, seq_size))
	# Entries strictly above the diagonal are the ones to be hidden.
	above_diagonal = torch.triu(ones, diagonal=1).type(torch.uint8)
	return above_diagonal == 0


def attention(query, key, value, mask=None, dropout=None):
	"""Scaled dot-product attention.

	Args:
		query, key, value: Tensors whose last two dimensions are multiplied
			with ``torch.matmul`` (leading dimensions are batched automatically).
		mask: Optional tensor broadcastable to the score matrix; positions
			where ``mask == 0`` are excluded from attention.
		dropout: Optional callable applied to the attention weights.

	Returns:
		A tuple ``(output, p_attn)`` with the weighted sum of ``value`` and
		the attention weights.
	"""
	d_k = query.size(-1)
	# matmul batches over the leading dimensions automatically.
	scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
	if mask is not None:
		# A large negative score drives the softmax weight of masked
		# positions to ~0. A tiny positive fill such as 1e-9 would behave
		# like a score of 0 and still receive weight.
		scores = scores.masked_fill(mask == 0, -1e9)
	p_attn = scores.softmax(dim=-1)
	if dropout is not None:
		p_attn = dropout(p_attn)
	return torch.matmul(p_attn, value), p_attn


class MHA(nn.Module):
	"""Multi-head attention built from four ``d_model -> d_model`` linear layers."""

	def __init__(self, n_head: int, d_model: int, dropout: float = 0.1):
		"""
		Args:
			n_head: Number of attention heads; must divide ``d_model``.
			d_model: Feature size of the inputs and outputs.
			dropout: Dropout probability applied to the attention weights.

		Raises:
			ValueError: If ``d_model`` is not divisible by ``n_head``.
		"""
		super(MHA, self).__init__()
		if d_model % n_head != 0:
			raise ValueError(
				f"d_model ({d_model}) must be divisible by n_head ({n_head})"
			)
		self.d_k = d_model // n_head
		self.n_head = n_head
		# Q: why clones(..., 4)?
		# A: four linear maps - the first three project q, k and v, the last
		#    one is the output projection.
		# Q: one big linear that is then split into heads - is that equivalent
		#    to n_head separate linears?
		# A: yes; each d_k-wide slice of the output depends only on its own
		#    rows of the weight matrix, so the big layer is the n_head small
		#    ones stacked together.
		self.linears = clones(nn.Linear(d_model, d_model), 4)
		self.attn = None  # attention weights of the most recent forward pass
		self.dropout = nn.Dropout(p=dropout)

	def forward(self, query, key, value, mask=None):
		"""Project q/k/v, attend per head, merge the heads and project the result."""
		if mask is not None:
			# Insert a head axis so the same mask broadcasts over all heads.
			mask = mask.unsqueeze(1)
		batch_num = query.size(0)

		# (batch, seq, d_model) -> (batch, n_head, seq, d_k); zip stops after
		# the first three linears, leaving the fourth for the output.
		query, key, value = [
			linear(x).view(batch_num, -1, self.n_head, self.d_k).transpose(1, 2)
			for linear, x in zip(self.linears, (query, key, value))
		]

		x, self.attn = attention(query, key, value, mask=mask, dropout=self.dropout)

		# Merge heads back: (batch, n_head, seq, d_k) -> (batch, seq, d_model).
		x = (
			x.transpose(1, 2)
			.contiguous()
			.view(batch_num, -1, self.n_head * self.d_k)
		)

		return self.linears[-1](x)

# 5. 实现FFN
> 如果看源码的话,可以看到linear的参数都已经做了初始化而且bias默认为True

In [23]:
class FFN(nn.Module):
	"""Position-wise feed-forward block: linear -> ReLU -> dropout -> linear."""

	def __init__(self, d_model: int, d_ffn: int, dropout=0.1):
		"""
		Args:
			d_model: Input and output feature size.
			d_ffn: Hidden feature size between the two linear layers.
			dropout: Dropout probability applied after the activation.
		"""
		super().__init__()
		# Both linears keep nn.Linear's default bias=True.
		self.w_1 = nn.Linear(d_model, d_ffn)
		self.w_2 = nn.Linear(d_ffn, d_model)
		self.dropout = nn.Dropout(dropout)

	def forward(self, x):
		"""Return ``w_2(dropout(relu(w_1(x))))``."""
		hidden = F.relu(self.w_1(x))
		hidden = self.dropout(hidden)
		return self.w_2(hidden)

# 6. 实现 embedding 和 softmax 
> 参考源码，这里调整为log_softmax

In [24]:
class Embedding(nn.Module):
	"""Token embedding lookup whose output is scaled by ``sqrt(d_model)``."""

	def __init__(self, vocab_size: int, d_model: int):
		"""
		Args:
			vocab_size: Number of rows in the embedding table.
			d_model: Size of each embedding vector.
		"""
		super().__init__()
		self.emb = nn.Embedding(vocab_size, d_model)
		self.d_model = d_model

	def forward(self, x):
		"""Look up ``x`` in the table and multiply the result by ``sqrt(d_model)``."""
		scale = math.sqrt(self.d_model)
		return self.emb(x) * scale


class PosEmb(nn.Module):
	"""Adds fixed sinusoidal position encodings to the input, then applies dropout."""

	def __init__(self, d_model: int, max_len: int = 5000, dropout: float = 0.1):
		"""
		Args:
			d_model: Feature size of the encoding table.
			max_len: Number of positions precomputed in the table.
			dropout: Dropout probability applied to the summed output.
		"""
		super().__init__()
		self.dropout = nn.Dropout(p=dropout)

		# One row per position, one frequency per pair of feature columns.
		position = torch.arange(0, max_len).unsqueeze(1)
		div_term = torch.exp(
			torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model)
		)
		angles = position * div_term

		table = torch.zeros(max_len, d_model)
		table[:, 0::2] = torch.sin(angles)  # even columns
		table[:, 1::2] = torch.cos(angles)  # odd columns
		# Stored as a buffer (not a parameter) with a leading axis of size 1.
		self.register_buffer("pe", table.unsqueeze(0))

	def forward(self, x):
		"""Add the first ``x.size(1)`` rows of the table to ``x`` and apply dropout."""
		seq_len = x.size(1)
		encoding = self.pe[:, :seq_len].requires_grad_(False)
		return self.dropout(x + encoding)

## 测试：PosEmb的可视化

In [25]:
import torch
import pandas as pd
import altair as alt


In [26]:
def example_positional():
	"""Build an Altair line chart of a few positional-encoding dimensions.

	Feeds an all-zero ``(1, 100, 20)`` tensor through ``PosEmb`` so that the
	output is the encoding itself, then plots feature dimensions 4-7 against
	position.
	"""
	# dropout=0 so that no values are zeroed out and the curves stay continuous.
	pe = PosEmb(20, dropout=0)
	y = pe.forward(torch.zeros(1, 100, 20))

	frames = []
	for dim in [4, 5, 6, 7]:
		frames.append(
			pd.DataFrame(
				{
					"embedding": y[0, :, dim],
					"dimension": dim,
					"position": list(range(100)),
				}
			)
		)
	data = pd.concat(frames)

	chart = alt.Chart(data).mark_line().properties(width=800)
	chart = chart.encode(x="position", y="embedding", color="dimension:N")
	return chart.interactive()

# 取消下一行的注释就可以运行以查看可视化的结果
# example_positional()

# 7. Full Model

In [27]:
def set_model(
		src_vocab_size, tgt_vocab_size, n_layer=6, d_model=512, d_ffn=2048, n_head=8, dropout=0.1
):
	"""Assemble a full ``Transformer`` from hyper-parameters.

	Args:
		src_vocab_size: Number of entries in the source embedding table.
		tgt_vocab_size: Number of entries in the target embedding table and
			number of output classes of the generator.
		n_layer: Number of layers in both the encoder and the decoder.
		d_model: Model feature size.
		d_ffn: Hidden size of the feed-forward blocks.
		n_head: Number of attention heads.
		dropout: Dropout probability used throughout.

	Returns:
		The constructed ``Transformer`` with its multi-dimensional parameters
		re-initialised by Xavier-uniform.
	"""
	attn = MHA(n_head, d_model, dropout)
	ffn = FFN(d_model, d_ffn, dropout)
	pe = PosEmb(d_model, dropout=dropout)
	# The prototypes are deep-copied at every use so no two slots share weights.
	model = Transformer(
		Encoder(EncoderLayer(d_model, deepcopy(attn), deepcopy(ffn), dropout), n_layer),
		Decoder(DecoderLayer(d_model, deepcopy(attn), deepcopy(attn), deepcopy(ffn), dropout), n_layer),
		# The vocab sizes are already integers (the generator below uses one
		# directly), so they go to Embedding as-is; the target embedding must
		# be sized by the target vocabulary.
		nn.Sequential(Embedding(src_vocab_size, d_model), deepcopy(pe)),
		nn.Sequential(Embedding(tgt_vocab_size, d_model), deepcopy(pe)),
		Generator(d_model, tgt_vocab_size),
	)

	# A simple blanket initialisation: every parameter with more than one
	# dimension is re-initialised, overriding whatever init the individual
	# modules applied; 1-D parameters are left untouched.
	for p in model.parameters():
		if p.dim() > 1:
			nn.init.xavier_uniform_(p)
	return model
	