<a href="https://colab.research.google.com/github/bluetinue/transforner/blob/main/%E6%89%8B%E6%92%95Tansformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [67]:
import torch.nn as nn
import torch
import torch.nn.functional as F
import copy
# 数学计算工具包
import math

# 输入部分

In [153]:
#@title 词嵌入层
class Embed(nn.Module):
  """Token embedding layer whose output is scaled by sqrt(embed_dim).

  Args:
    vocab: number of entries in the embedding table (vocabulary size).
    embed_dim: width of each embedding vector.
  """

  def __init__(self, vocab, embed_dim):
    super().__init__()
    self.vocab, self.embed_dim = vocab, embed_dim
    # Lookup table mapping integer token ids to embed_dim-wide vectors.
    self.lcut = nn.Embedding(vocab, embed_dim)

  def forward(self, x):
    """Look up the vectors for token ids `x` and multiply them by sqrt(embed_dim)."""
    scale = math.sqrt(self.embed_dim)
    looked_up = self.lcut(x)
    return looked_up * scale

In [154]:
#@title 位置编码器
class PostionalEncoding(nn.Module):
  """Add fixed sinusoidal position encodings to embeddings, then apply dropout.

  Args:
    d_model: width of each embedding vector.
    dropout: dropout probability applied after the encodings are added.
    max_len: number of positions precomputed in the encoding table.
  """

  def __init__(self,d_model,dropout,max_len=60):
    super().__init__()
    self.d_model = d_model
    # Dropout applied to (embedding + position encoding).
    self.dropout = nn.Dropout(p=dropout)
    # Encoding table, one row per position: [max_len, d_model].
    pe = torch.zeros(max_len,d_model)
    # Column of position indices: [max_len, 1].
    position = torch.arange(0,max_len,dtype=torch.float).unsqueeze(1)

    # Per-column frequencies 10000^(-2i/d_model): [ceil(d_model / 2)].
    div_term = torch.exp(torch.arange(0,d_model,2) * -(math.log(10000.0) / d_model))

    # Broadcast product giving the angle for every (position, frequency) pair.
    angles = position * div_term
    # Even columns get sin, odd columns get cos.
    pe[:,0::2] = torch.sin(angles)
    # With an odd d_model there is one fewer odd column than even columns,
    # so trim the angle table to fit.
    pe[:,1::2] = torch.cos(angles[:,:d_model // 2])

    # Add a leading batch axis: [max_len, d_model] -> [1, max_len, d_model].
    pe = pe.unsqueeze(0)

    # Buffer: saved with the module and moved by .to(), but not trained.
    self.register_buffer("pe",pe)

  def forward(self,x):
    """Return dropout(x + pe[:, :seq_len]) where seq_len is x.size(1).

    Raises:
      ValueError: if the sequence is longer than the precomputed table.
    """
    seq_len = x.size(1)
    if seq_len > self.pe.size(1):
      raise ValueError(
          f"sequence length {seq_len} exceeds max_len {self.pe.size(1)}")
    # Slice the first seq_len rows so position i receives encoding row i.
    # Indexing a single row here would add the same vector to every position.
    x = x + self.pe[:,:seq_len]
    return self.dropout(x)

# 编码器部分

In [138]:
#@title 自注意力层
def attention(q,k,v,mask=None,dropout=None):
  """Scaled dot-product attention.

  Args:
    q, k, v: query, key and value tensors; the last axis of q and k is the
      feature width used for scaling.
    mask: optional tensor broadcastable to the score tensor; positions where
      it equals 0 are filled with -1e9 before the softmax.
    dropout: optional callable applied to the attention weights.

  Returns:
    A tuple (weights @ v, weights).
  """
  scale = math.sqrt(q.size(-1))
  logits = torch.matmul(q, k.transpose(-2, -1)) / scale

  if mask is not None:
    # A very negative score makes the softmax weight of masked slots vanish.
    logits = logits.masked_fill(mask == 0, -1e9)

  weights = F.softmax(logits, dim=-1)
  if dropout is not None:
    weights = dropout(weights)

  context = torch.matmul(weights, v)
  return context, weights

In [151]:
#@title 多头自注意力层
def clones(module,N):
  """Return an nn.ModuleList holding N independent deep copies of `module`."""
  copies = nn.ModuleList()
  for _ in range(N):
    copies.append(copy.deepcopy(module))
  return copies

class MultiHeadAttention(nn.Module):
  """Multi-head attention: project q/k/v, attend per head, merge, project out.

  Args:
    head: number of attention heads; must divide embed_dim evenly.
    embed_dim: width of the input and output vectors.
    dropout_p: dropout probability applied to the attention weights.

  Attributes:
    attn: attention weights produced by the most recent forward call, or
      None before the first call. `atten` is a read-only alias kept for
      the older spelling.
  """

  def __init__(self,head,embed_dim,dropout_p=0.1):
    super().__init__()
    # Each head works on an equal slice of the embedding.
    assert embed_dim % head == 0, "embed_dim must be divisible by head"
    # Width handled by a single head.
    self.d_k = embed_dim // head
    self.head = head
    self.dropout = nn.Dropout(p=dropout_p)
    # Four same-shaped projections: q, k, v, and the final output projection.
    self.linears = clones(nn.Linear(embed_dim,embed_dim),4)
    # Same name that forward() assigns, so the attribute exists before the
    # first call and holds the weights afterwards.
    self.attn = None

  @property
  def atten(self):
    """Alias of `attn` for code that used the earlier attribute name."""
    return self.attn

  def forward(self,q,k,v,mask=None):
    """Run attention over q, k, v and return the output projection.

    `mask`, when given, receives one leading axis of size 1 and must then
    broadcast against the [batch, head, q_len, k_len] score tensor.
    """
    if mask is not None:
      mask = mask.unsqueeze(0)
    batch_size = q.size(0)
    # Project, split the last axis into (head, d_k), and move the head axis
    # forward: [batch, len, embed_dim] -> [batch, head, len, d_k].
    q,k,v = [proj(t).view(batch_size,-1,self.head,self.d_k).transpose(1,2)
             for proj, t in zip(self.linears,(q,k,v))]
    # Attention is computed independently per head; keep the weights around.
    x,self.attn = attention(q,k,v,mask=mask,dropout=self.dropout)
    # Merge heads back: [batch, head, len, d_k] -> [batch, len, head * d_k].
    x = x.transpose(1,2).contiguous().view(batch_size,-1,self.head*self.d_k)
    # The last of the four linears is the output projection.
    return self.linears[-1](x)

In [140]:
#@title 前馈连接层
class FeedForward(nn.Module):
  """Position-wise feed-forward block: Linear -> ReLU -> Dropout -> Linear.

  Args:
    d_model: input and output width.
    d_ff: width of the hidden layer.
    dropout: dropout probability applied after the activation.
  """

  def __init__(self, d_model, d_ff, dropout=0.1):
    super().__init__()
    # Expand to d_ff, then project back to d_model so the shape is unchanged.
    self.w_1 = nn.Linear(in_features=d_model, out_features=d_ff)
    self.w_2 = nn.Linear(in_features=d_ff, out_features=d_model)
    self.dropout = nn.Dropout(dropout)

  def forward(self, x):
    """Apply the two linear layers with ReLU and dropout in between."""
    hidden = self.w_1(x)
    activated = F.relu(hidden)
    regularised = self.dropout(activated)
    return self.w_2(regularised)

In [141]:
#@title 规范化层
class LayerNorm(nn.Module):
  """Normalise the last axis to zero mean, then apply a learned scale and shift.

  Args:
    features: size of the last axis of the input.
    eps: constant added to the standard deviation to avoid division by zero.
  """

  def __init__(self, features, eps=1e-6):
    super().__init__()
    # Learned scale, initialised to ones.
    self.a1 = nn.Parameter(torch.ones(features))
    # Learned shift, initialised to zeros.
    self.w1 = nn.Parameter(torch.zeros(features))
    self.eps = eps

  def forward(self, x):
    """Return a1 * (x - mean) / (std + eps) + w1, with statistics over the last axis."""
    centred = x - x.mean(-1, keepdim=True)
    spread = x.std(-1, keepdim=True) + self.eps
    return self.a1 * centred / spread + self.w1


In [142]:
#@title 拼接两个子层起来形成编码器层

# 子层连接结构 子层(前馈全连接层 或者 注意力机制层)+ norm层 + 残差连接
# SublayerConnection实现思路分析
# 1 init函数  (self, size, dropout=0.1):
# 定义self.norm层 self.dropout层, 其中LayerNorm(size)
# 2 forward(self, x, sublayer) 返回+以后的结果
# Data flow: sublayer(x) -> self.norm() -> self.dropout(), then add the residual x
class SublayerConnection(nn.Module):
  """Residual wrapper computing x + dropout(norm(sublayer(x))).

  Args:
    size: feature width handed to the LayerNorm.
    dropout: dropout probability applied to the normalised sublayer output.
  """

  def __init__(self, size, dropout):
    super().__init__()
    self.norm = LayerNorm(size)
    self.dropout = nn.Dropout(dropout)

  def forward(self, x, sublayer):
    """Apply `sublayer` to x, normalise, drop out, and add x back."""
    transformed = sublayer(x)
    normalised = self.norm(transformed)
    return x + self.dropout(normalised)

#编码器层
class EncoderLayer(nn.Module):
  """One encoder layer: self-attention then feed-forward, each in a SublayerConnection.

  Args:
    size: feature width, exposed as `self.size`.
    self_atten: attention module called as self_atten(q, k, v, mask).
    feed_forward: module called as feed_forward(x).
    dropout: dropout probability for both sublayer connections.
  """

  def __init__(self, size, self_atten, feed_forward, dropout):
    super().__init__()
    self.self_atten = self_atten
    self.feed_forward = feed_forward
    self.size = size
    # Two independent residual wrappers: [0] for attention, [1] for feed-forward.
    self.sublayers = clones(SublayerConnection(size, dropout), 2)

  def forward(self, x, mask):
    """Run x through the attention sublayer, then the feed-forward sublayer."""
    attn_wrap, ff_wrap = self.sublayers

    def self_attend(a):
      # Query, key and value are all the same tensor in self-attention.
      return self.self_atten(a, a, a, mask)

    attended = attn_wrap(x, self_attend)
    return ff_wrap(attended, self.feed_forward)

In [143]:
#@title 拼接6个编码层起来形成编码器
#编码器
class Encoder(nn.Module):
  """Stack of N cloned encoder layers followed by a final LayerNorm.

  Args:
    layer: template layer; it must expose `.size` and be callable as
      layer(x, mask).
    N: number of clones to stack.

  NOTE(review): `layer` itself is also stored as `self.layer`, so it is
  registered as a submodule next to the N clones even though forward()
  only iterates over `self.layers`.
  """

  def __init__(self, layer, N):
    super().__init__()
    self.layer = layer
    self.N = N

    self.layers = clones(layer, N)
    # Normalisation applied once, after the last layer.
    self.norm = LayerNorm(layer.size)

  def forward(self, x, mask):
    """Feed x through every cloned layer in order, then normalise."""
    hidden = x
    for block in self.layers:
      hidden = block(hidden, mask)
    return self.norm(hidden)

In [149]:
#@title 测试
# Fixture values for the test cell: embedding width, vocabulary size, and a
# batch of two 4-token id sequences.
embed_dim, vocab = 512, 1000
x = torch.tensor([
    [1, 2, 3, 4],
    [40, 50, 60, 70],
])

# 解码器层


In [145]:
#@title 解码器层
class DecoderLayer(nn.Module):
  """One decoder layer: self-attention, attention over the encoder output, feed-forward.

  Args:
    size: feature width, exposed as `self.size`.
    self_attn: attention module used on the target sequence itself.
    src_attn: attention module whose keys/values are the encoder output.
    feed_forward: module called as feed_forward(x).
    dropout: dropout probability for the three sublayer connections.
  """

  def __init__(self, size, self_attn, src_attn, feed_forward, dropout):
    super().__init__()
    self.size = size
    self.self_attn = self_attn
    self.src_attn = src_attn
    self.feed_forward = feed_forward
    # Three independent residual wrappers, one per sublayer.
    self.sublayers = clones(SublayerConnection(size, dropout), 3)

  def forward(self, y, encoder_output, source_mask, target_mask):
    """Apply the three sublayers in order and return the result."""
    self_wrap, cross_wrap, ff_wrap = self.sublayers

    def attend_to_target(a):
      # Query, key and value are all the target-side tensor.
      return self.self_attn(a, a, a, target_mask)

    def attend_to_source(a):
      # Queries come from the decoder; keys and values from the encoder.
      return self.src_attn(a, encoder_output, encoder_output, source_mask)

    after_self = self_wrap(y, attend_to_target)
    after_cross = cross_wrap(after_self, attend_to_source)
    return ff_wrap(after_cross, self.feed_forward)

In [146]:
#@title 解码器由若干个解码器层堆叠而成
class Decoder(nn.Module):
  """Stack of N cloned decoder layers followed by a final LayerNorm.

  Args:
    layer: template layer; it must expose `.size` and be callable as
      layer(y, encoder_output, source_mask, target_mask).
    N: number of clones to stack.

  NOTE(review): `layer` itself is also stored as `self.layer`, so it is
  registered as a submodule next to the N clones even though forward()
  only iterates over `self.layers`.
  """

  def __init__(self, layer, N):
    super().__init__()
    self.layer = layer
    self.N = N

    self.layers = clones(layer, N)
    # Normalisation applied once, after the last layer.
    self.norm = LayerNorm(layer.size)

  def forward(self, y, encoder_output, source_mask, target_mask):
    """Feed y through every cloned layer in order, then normalise."""
    hidden = y
    for block in self.layers:
      hidden = block(hidden, encoder_output, source_mask, target_mask)
    return self.norm(hidden)

# 输出层

In [88]:
#@title 构建线性层和softmax层作为输出层
# 输出层类 Generator 实现思路分析
# init函数 (self, d_model, vocab_size)
# 定义线性层self.project
# forward函数 (self, x)
# 数据 F.log_softmax(self.project(x), dim=-1)
class Output(nn.Module):
  """Output head: a linear projection to vocab_size followed by log-softmax.

  Args:
    d_model: width of the incoming vectors.
    vocab_size: number of output classes.
  """

  def __init__(self, d_model, vocab_size):
    super().__init__()
    # Projects each d_model-wide vector to one score per vocabulary entry.
    self.linear = nn.Linear(in_features=d_model, out_features=vocab_size)

  def forward(self, x):
    """Return log-probabilities over the last axis."""
    logits = self.linear(x)
    return F.log_softmax(logits, dim=-1)

# 将多个层构建成模型类

In [105]:
#@title 编码器-解码器
#思路
# 编码解码内部函数类 EncoderDecoder 实现分析
# init函数 (self, encoder, decoder, source_embed, target_embed, generator)
# 5个成员属性赋值 encoder 编码器对象 decoder 解码器对象 source_embed source端词嵌入层对象
# target_embed target端词嵌入层对象 generator 输出层对象
# forward函数 (self, source,  target, source_mask, target_mask)
# 1 编码 s.encoder(self.src_embed(source), source_mask)
# 2 解码 s.decoder(self.target_embed(target), memory, source_mask, target_mask)
# 3 输出 s.output()

class EncoderDecoder(nn.Module):
  """Full encoder-decoder model assembled from five sub-modules.

  Args:
    encoder: module called as encoder(embedded_x, x_mask).
    decoder: module called as
      decoder(embedded_y, encoder_output, x_mask, y_mask).
    x_embed: source-side input module (embedding plus position encoding).
    y_embed: target-side input module (embedding plus position encoding).
    output: output layer applied to the decoder result.
  """

  def __init__(self,encoder,decoder,x_embed,y_embed,output):
    super().__init__()
    self.encoder = encoder
    self.decoder = decoder
    self.x_embed = x_embed
    self.y_embed = y_embed
    self.output = output

  def forward(self,x,y,x_mask,y_mask):
    """Encode x, decode y against the encoding, and apply the output layer."""
    encoder_output = self.encode(x,x_mask)
    return self.output(self.decode(y,y_mask,encoder_output,x_mask))

  def encode(self,x,x_mask):
    """Embed the source tokens and run them through the encoder."""
    return self.encoder(self.x_embed(x),x_mask)

  def decode(self,y,y_mask,encoder_output=None,x_mask=None):
    """Embed the target tokens and run the decoder against `encoder_output`.

    Args:
      y: target-side input passed to y_embed.
      y_mask: mask for the target sequence.
      encoder_output: result of encode(); required.
      x_mask: mask for the source sequence, forwarded to the decoder.

    Raises:
      ValueError: if encoder_output is not supplied.
    """
    if encoder_output is None:
      raise ValueError("decode() requires encoder_output (the result of encode())")
    # Argument order follows the decoder's forward signature:
    # (embedded target, encoder output, source mask, target mask).
    return self.decoder(self.y_embed(y),encoder_output,x_mask,y_mask)


In [121]:
#@title Transformer模型构建过程
# make_model函数实现思路分析
# 函数原型(x_vocab, y_vocab, N=6, d_model=512, d_ff=2048, head=8, dropout=0.1)
# 实例化多头注意力层对象 attn
# 实例化前馈全连接对象ff
# 实例化位置编码器对象position
# 构建 EncoderDecoder对象(Encoder对象, Decoder对象,)
# x端输入部分nn.Sequential(),
# y端输入部分nn.Sequential(),
# 线性层输出Generator)
# 对模型参数初始化 nn.init.xavier_uniform_(p)
# 注意使用 c = copy.deepcopy
# 返回model

def MakeTransformer(x_vocab, y_vocab, N=6, d_model=512, d_ff=2048, head=8, dropout=0.1):
  """Build an EncoderDecoder model and Xavier-initialise its weight matrices.

  Args:
    x_vocab: source vocabulary size.
    y_vocab: target vocabulary size.
    N: number of encoder layers and of decoder layers.
    d_model: embedding / model width used by every sub-module.
    d_ff: hidden width of the feed-forward blocks.
    head: number of attention heads.
    dropout: dropout probability used by every sub-module.

  Returns:
    The assembled EncoderDecoder instance.
  """
  c = copy.deepcopy
  # Templates that are deep-copied wherever they are needed. They follow
  # d_model and dropout rather than fixed values, so non-default settings
  # reach the attention and feed-forward layers as well.
  attn = MultiHeadAttention(head=head,embed_dim=d_model,dropout_p=dropout)
  ff = FeedForward(d_model,d_ff,dropout)
  position = PostionalEncoding(d_model,dropout)
  model = EncoderDecoder(
      # Encoder: N layers of self-attention + feed-forward.
      Encoder(EncoderLayer(d_model,c(attn),c(ff),dropout),N),
      # Decoder: N layers of self-attention + source attention + feed-forward.
      Decoder(DecoderLayer(d_model,c(attn),c(attn),c(ff),dropout),N),
      # Source-side input: embedding followed by position encoding.
      nn.Sequential(Embed(x_vocab,d_model),c(position)),
      # Target-side input: embedding followed by position encoding.
      nn.Sequential(Embed(y_vocab,d_model),c(position)),
      # Output layer over the target vocabulary.
      Output(d_model,y_vocab))

  # Re-initialise every parameter with more than one dimension (weight
  # matrices and embedding tables); 1-D parameters are left as created.
  for p in model.parameters():
    if p.dim() > 1:
      nn.init.xavier_uniform_(p)

  return model

In [108]:
# Print the module tree of `mytransformer`. The name is not defined in the
# visible cells; presumably it is the result of MakeTransformer(...) — TODO confirm.
print(mytransformer)

EncoderDecoder(
  (encoder): Encoder(
    (layer): EncoderLayer(
      (self_atten): MultiHeadAttention(
        (dropout): Dropout(p=0.1, inplace=False)
        (linears): ModuleList(
          (0-3): 4 x Linear(in_features=512, out_features=512, bias=True)
        )
      )
      (feed_forward): FeedForward(
        (w_1): Linear(in_features=512, out_features=2048, bias=True)
        (w_2): Linear(in_features=2048, out_features=512, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
      )
      (sublayers): ModuleList(
        (0-1): 2 x SublayerConnection(
          (norm): LayerNorm()
          (dropout): Dropout(p=0.1, inplace=False)
        )
      )
    )
    (layers): ModuleList(
      (0-5): 6 x EncoderLayer(
        (self_atten): MultiHeadAttention(
          (dropout): Dropout(p=0.1, inplace=False)
          (linears): ModuleList(
            (0-3): 4 x Linear(in_features=512, out_features=512, bias=True)
          )
        )
        (feed_forward): FeedForward(
