# Transformer实现

In [3]:
import torch
from torch import nn
import torch.nn.functional as F
import math


### 1. TransformerEmbedding

In [9]:
# Vocabulary indices ----> embedding vectors of the requested dimension
class TokenEmbedding(nn.Embedding):
    """Token lookup table built directly on ``nn.Embedding``.

    Args:
        vocab_size: number of rows in the embedding table.
        d_model: width of each embedding vector.

    Index 1 is always passed to ``nn.Embedding`` as ``padding_idx``.
    """

    def __init__(self, vocab_size, d_model):
        super().__init__(
            num_embeddings=vocab_size,
            embedding_dim=d_model,
            padding_idx=1,
        )


In [10]:
# The encoding scheme here can be swapped for a different one.
class PositionEmbedding(nn.Module):
    """Fixed sinusoidal position table.

    Even columns hold ``sin(pos / 10000 ** (2i / d_model))`` and odd columns
    hold the matching ``cos`` term.

    Args:
        d_model: width of each position vector (odd values are supported).
        max_len: number of positions precomputed in the table.
        device: device on which the table is created.
    """

    def __init__(self, d_model, max_len, device):
        # Plain zero-argument super(): the previous call passed the class and
        # instance to nn.Module.__init__ and then chained a second __init__
        # on its return value, which raised at construction time.
        super().__init__()

        pos = torch.arange(0, max_len, device=device).float().unsqueeze(dim=1)
        _2i = torch.arange(0, d_model, step=2, device=device).float()
        angles = pos / (10000 ** (_2i / d_model))

        encoding = torch.zeros(max_len, d_model, device=device)
        encoding[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer odd column than even columns,
        # so the cosine block is trimmed to fit.
        encoding[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Non-persistent buffer: follows module.to(...) but stays out of the
        # state_dict, and it never requires grad.
        self.register_buffer("encoding", encoding, persistent=False)

    def forward(self, x):
        """Return the first ``x.size(1)`` rows of the table.

        Only the second dimension of ``x`` is read; the result has shape
        ``(seq_len, d_model)``. Slicing caps the result at ``max_len`` rows.
        """
        seq_len = x.size(1)
        return self.encoding[:seq_len, :]



In [11]:
class TransformerEmbedding(nn.Module):
    """Sum of token embedding and positional encoding, followed by dropout.

    Args:
        vocab_size: forwarded to ``TokenEmbedding``.
        d_model: embedding width, forwarded to both sub-embeddings.
        max_len: forwarded to ``PositionEmbedding``.
        drop_prob: dropout probability applied to the summed embedding.
        device: forwarded to ``PositionEmbedding``.
    """

    def __init__(self, vocab_size, d_model, max_len, drop_prob, device):
        super().__init__()
        self.tok_embedding = TokenEmbedding(vocab_size, d_model)
        self.pos_embedding = PositionEmbedding(d_model, max_len, device)
        self.drop_out = nn.Dropout(p=drop_prob)

    def forward(self, x):
        """Embed ``x`` and add the positional term before dropout."""
        combined = self.tok_embedding(x) + self.pos_embedding(x)
        return self.drop_out(combined)
    
    

### 2. Multi-Head Attention

In [12]:
# Random sample input used by the attention demo further down.
x=torch.rand(128,32,512)
# Layout: batch size, sequence length, feature dimension

# Model width and head count passed to MultiHeadAttention in the demo.
d_model=512
n_head=8

In [13]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention (self- or cross-attention).

    Args:
        d_model: feature width of the inputs and of the output.
        n_head: number of heads; must divide ``d_model`` evenly.

    Raises:
        ValueError: if ``d_model`` is not divisible by ``n_head``.
    """

    def __init__(self, d_model, n_head):
        super(MultiHeadAttention, self).__init__()
        if d_model % n_head != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by n_head ({n_head})"
            )
        self.n_head = n_head
        self.d_model = d_model
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_combine = nn.Linear(d_model, d_model)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, q, k, v, mask=None):
        """Attend from ``q`` over ``k``/``v``.

        Args:
            q: queries, shape ``(batch, len_q, d_model)``.
            k: keys, shape ``(batch, len_k, d_model)``.
            v: values, shape ``(batch, len_k, d_model)``.
            mask: optional tensor broadcastable to
                ``(batch, n_head, len_q, len_k)``; positions equal to 0 are
                filled with -10000 before the softmax.

        Returns:
            Tensor of shape ``(batch, len_q, d_model)``.
        """
        batch, len_q, dimension = q.shape
        # Keys/values may have a different length from the queries
        # (cross-attention), so they are reshaped with their own length.
        len_k = k.shape[1]
        n_d = self.d_model // self.n_head

        q, k, v = self.w_q(q), self.w_k(k), self.w_v(v)

        # Split into heads:
        # (batch, len, n_head, head_dim) -> (batch, n_head, len, head_dim)
        q = q.view(batch, len_q, self.n_head, n_d).permute(0, 2, 1, 3)
        k = k.view(batch, len_k, self.n_head, n_d).permute(0, 2, 1, 3)
        v = v.view(batch, len_k, self.n_head, n_d).permute(0, 2, 1, 3)

        score = q @ k.transpose(2, 3) / math.sqrt(n_d)
        if mask is not None:
            score = score.masked_fill(mask == 0, -10000)
        score = self.softmax(score) @ v

        # Merge heads back: (batch, n_head, len_q, head_dim) -> (batch, len_q, d_model)
        score = score.permute(0, 2, 1, 3).contiguous().view(batch, len_q, dimension)
        output = self.w_combine(score)
        return output
    
# Smoke run: self-attention, with the same tensor x used as query, key and value.
attention=MultiHeadAttention(d_model,n_head)
out=attention(x,x,x)


In [14]:
# Inspect the shape of the attention output.
out.shape

torch.Size([128, 32, 512])

### 3.LayerNorm

In [45]:
class LayerNorm(nn.Module):
    """Layer normalisation over the last dimension with a learnable scale and shift.

    Args:
        d_model: size of the last dimension; ``gamma`` starts at ones and
            ``beta`` at zeros, both of this length.
        eps: constant added to the variance before the square root.
    """

    def __init__(self, d_model, eps=1e-12):
        super().__init__()
        self.gamma = nn.Parameter(torch.ones(d_model))
        self.beta = nn.Parameter(torch.zeros(d_model))
        self.eps = eps

    def forward(self, x):
        """Normalise ``x`` along its last axis, then apply ``gamma`` and ``beta``."""
        mu = x.mean(dim=-1, keepdim=True)
        # Biased (population) variance, i.e. divided by N rather than N - 1.
        sigma_sq = x.var(dim=-1, unbiased=False, keepdim=True)
        normalised = (x - mu) / torch.sqrt(sigma_sq + self.eps)
        return self.gamma * normalised + self.beta
    
# Random sample input for the LayerNorm demo.
x=torch.rand(128,32,512)
# Layout: batch size, sequence length, feature dimension

d_model=512
# n_head is not passed to LayerNorm in this cell.
n_head=8
# Build a LayerNorm over the last dimension and apply it to x.
ln = LayerNorm(d_model)
ln(x)

    

tensor([[[ 0.5022, -1.1630,  0.6301,  ...,  0.3080,  1.1756, -1.3340],
         [ 0.9980,  1.0208, -0.1344,  ..., -0.2779,  0.1045, -1.6126],
         [ 0.0100,  1.1663, -0.7668,  ..., -0.8234,  0.5684,  1.2563],
         ...,
         [ 0.0997, -0.5090, -1.6685,  ..., -1.3417,  0.9572, -1.2438],
         [ 1.3855, -1.5912, -0.4749,  ..., -0.7076, -1.3256,  1.3456],
         [-1.6904,  1.2213, -1.0067,  ...,  0.6749,  0.6811, -0.7741]],

        [[ 0.0595,  1.1538,  0.7556,  ...,  1.6057, -0.9945, -0.4999],
         [ 1.2773, -0.8449,  1.5972,  ..., -0.6990,  0.1230, -0.7146],
         [-1.3044, -0.2335,  0.4480,  ..., -0.7836, -0.0044,  0.3501],
         ...,
         [-0.1649, -0.2601,  0.1181,  ...,  0.3799,  0.6809,  1.3704],
         [ 0.1422,  1.2184,  0.6697,  ...,  0.3020, -1.6362, -1.5822],
         [-0.6141,  0.8250, -0.5338,  ..., -0.9620,  0.8839,  0.1555]],

        [[ 1.0771,  0.3747,  0.0286,  ..., -0.1873, -0.1480, -0.0377],
         [ 1.6353, -0.6508, -1.6860,  ...,  1

### 4.Encoder

In [None]:
# Position-wise feed-forward network
class PositionwiseFeedForward(nn.Module):
    """Two linear layers with ReLU and dropout in between.

    Args:
        d_model: input and output feature width.
        hidden: width of the intermediate layer.
        dropoutprob: dropout probability applied after the ReLU.
    """

    def __init__(self, d_model, hidden, dropoutprob=0.1):
        super().__init__()
        self.fc1 = nn.Linear(d_model, hidden)
        self.fc2 = nn.Linear(hidden, d_model)
        self.dropout = nn.Dropout(dropoutprob)

    def forward(self, x):
        """Compute ``fc2(dropout(relu(fc1(x))))``."""
        activated = F.relu(self.fc1(x))
        return self.fc2(self.dropout(activated))
        



In [None]:
class EncoderLayer(nn.Module):
    """One encoder block: self-attention and feed-forward sub-layers.

    Each sub-layer output goes through dropout, is added to the sub-layer
    input (residual connection) and is then layer-normalised.

    Args:
        d_model: feature width.
        ffn_hidden: hidden width of the feed-forward sub-layer.
        n_head: number of attention heads.
        device: accepted for signature compatibility; not used in this class.
        dropoutprob: dropout probability used by all dropout layers here.
    """

    def __init__(self, d_model, ffn_hidden, n_head, device, dropoutprob=0.1):
        super().__init__()
        self.attention = MultiHeadAttention(d_model=d_model, n_head=n_head)
        self.norm1 = LayerNorm(d_model=d_model)
        self.dropout1 = nn.Dropout(dropoutprob)
        self.ffn = PositionwiseFeedForward(d_model, ffn_hidden, dropoutprob)
        self.norm2 = LayerNorm(d_model=d_model)
        self.dropout2 = nn.Dropout(dropoutprob)

    def forward(self, x, mask=None):
        """Run self-attention then the feed-forward net, each with residual + LayerNorm."""
        # Self-attention sub-layer: residual connection followed by LayerNorm.
        attended = self.dropout1(self.attention(x, x, x, mask))
        x = self.norm1(attended + x)

        # Feed-forward sub-layer: residual connection followed by LayerNorm.
        transformed = self.dropout2(self.ffn(x))
        return self.norm2(transformed + x)




In [None]:
class Encoder(nn.Module):
    """Embedding followed by a stack of ``n_layer`` encoder layers.

    Args:
        enc_voc_size: source vocabulary size.
        max_len: maximum sequence length, forwarded to the embedding.
        d_model: feature width.
        ffn_hidden: hidden width of each feed-forward sub-layer.
        n_head: number of attention heads per layer.
        n_layer: number of stacked ``EncoderLayer`` modules.
        device: forwarded to the embedding and to each layer.
        dropoutprob: dropout probability forwarded to the embedding and layers.
    """

    def __init__(self, enc_voc_size, max_len, d_model, ffn_hidden, n_head, n_layer, device, dropoutprob=0.1):
        # Zero-argument super(): the one-argument form super(Encoder) is an
        # unbound proxy, so nn.Module.__init__ never ran and the submodule
        # assignments below raised.
        super().__init__()
        # Embedding: maps vocabulary indices into the d_model-dimensional space.
        self.embedding = TransformerEmbedding(enc_voc_size, d_model, max_len, dropoutprob, device)
        # Stack of n_layer encoder layers.
        self.layers = nn.ModuleList(
            [
                EncoderLayer(d_model, ffn_hidden, n_head, device, dropoutprob)
                for _ in range(n_layer)
            ]
        )

    def forward(self, x, s_mask=None):
        """Embed ``x`` and pass it through every layer with mask ``s_mask``."""
        x = self.embedding(x)
        for layer in self.layers:
            x = layer(x, s_mask)
        return x



### 4.Decoder

In [None]:
class DecoderLayer(nn.Module):
    """One decoder block: masked self-attention, cross-attention, feed-forward.

    Each sub-layer output goes through dropout, is added to the sub-layer
    input (residual connection) and is then layer-normalised.

    Args:
        d_model: feature width.
        ffn_hidden: hidden width of the feed-forward sub-layer.
        n_head: number of attention heads.
        dropoutprob: dropout probability used by all dropout layers here.
    """

    def __init__(self, d_model, ffn_hidden, n_head, dropoutprob=0.1):
        # Zero-argument super(): super(DecoderLayer) alone is an unbound proxy
        # and never initialised nn.Module, so submodule assignment raised.
        super().__init__()
        self.attention1 = MultiHeadAttention(d_model, n_head)
        self.norm1 = LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropoutprob)
        self.cross_attention = MultiHeadAttention(d_model, n_head)
        self.norm2 = LayerNorm(d_model)
        self.dropout2 = nn.Dropout(dropoutprob)
        self.ffn = PositionwiseFeedForward(d_model, ffn_hidden, dropoutprob)
        self.norm3 = LayerNorm(d_model)
        self.dropout3 = nn.Dropout(dropoutprob)

    def forward(self, dec, enc, t_mask, s_mask):
        """Run the three sub-layers and return the transformed decoder state.

        Args:
            dec: decoder-side input features.
            enc: encoder output, used as keys and values in cross-attention.
            t_mask: mask for the decoder self-attention.
            s_mask: mask for the cross-attention over ``enc``.
        """
        # Masked self-attention sub-layer.
        _x = dec
        x = self.attention1(dec, dec, dec, t_mask)
        x = self.dropout1(x)
        x = self.norm1(x + _x)

        # Cross-attention sub-layer: queries from the decoder, keys/values from the encoder.
        _x = x
        x = self.cross_attention(x, enc, enc, s_mask)
        x = self.dropout2(x)
        x = self.norm2(x + _x)

        # Feed-forward sub-layer. The residual must be refreshed here;
        # previously the pre-cross-attention tensor was reused.
        _x = x
        x = self.ffn(x)
        x = self.dropout3(x)
        x = self.norm3(x + _x)
        # Previously missing: without it the layer returned None.
        return x


In [None]:
class Decoder(nn.Module):
    """Embedding, a stack of ``n_layer`` decoder layers and an output projection.

    Args:
        dec_voc_size: target vocabulary size (also the output width).
        max_len: maximum sequence length, forwarded to the embedding.
        d_model: feature width.
        ffn_hidden: hidden width of each feed-forward sub-layer.
        n_head: number of attention heads per layer.
        n_layer: number of stacked ``DecoderLayer`` modules.
        device: forwarded to the embedding.
        dropoutprob: dropout probability forwarded to the embedding and layers.
    """

    def __init__(self, dec_voc_size, max_len, d_model, ffn_hidden, n_head, n_layer, device, dropoutprob=0.1):
        super(Decoder, self).__init__()
        self.embedding = TransformerEmbedding(dec_voc_size, d_model, max_len, dropoutprob, device)
        # Stack of n_layer decoder layers. DecoderLayer takes no device
        # argument, so passing one raised a TypeError.
        self.layers = nn.ModuleList(
            [
                DecoderLayer(d_model, ffn_hidden, n_head, dropoutprob)
                for _ in range(n_layer)
            ]
        )
        # Projects decoder features to scores over the target vocabulary.
        self.fc = nn.Linear(d_model, dec_voc_size)

    def forward(self, dec, enc, t_mask, s_mask):
        """Decode target tokens ``dec`` against encoder output ``enc``.

        Args:
            dec: target token indices, fed to the embedding.
            enc: encoder output passed to every layer.
            t_mask: mask for decoder self-attention.
            s_mask: mask for cross-attention.

        Returns:
            Output of the final linear layer, with ``dec_voc_size`` as last dimension.
        """
        # Embed the target tokens (previously the encoder output was embedded by mistake).
        dec = self.embedding(dec)
        for layer in self.layers:
            dec = layer(dec, enc, t_mask, s_mask)
        dec = self.fc(dec)
        return dec


### 5.Transformer

In [None]:
class Transformer(nn.Module):
    """Encoder-decoder model that builds its own attention masks.

    Args:
        src_pad_idx: padding token index in the source vocabulary.
        trg_pad_idx: padding token index in the target vocabulary.
        enc_voc_size: source vocabulary size.
        dec_voc_size: target vocabulary size.
        d_model: feature width.
        max_len: maximum sequence length.
        n_head: number of attention heads.
        ffn_hidden: hidden width of the feed-forward sub-layers.
        n_layer: number of layers in both encoder and decoder.
        device: device used for the submodules and the causal mask.
        dropoutprob: dropout probability.
    """

    def __init__(self,
                 src_pad_idx,
                 trg_pad_idx,
                 enc_voc_size,
                 dec_voc_size,
                 d_model,
                 max_len,
                 n_head,
                 ffn_hidden,
                 n_layer,
                 device,
                 dropoutprob=0.1,
                 ):
        super(Transformer, self).__init__()
        self.encoder = Encoder(enc_voc_size, max_len, d_model, ffn_hidden, n_head, n_layer, device, dropoutprob)
        self.decoder = Decoder(dec_voc_size, max_len, d_model, ffn_hidden, n_head, n_layer, device, dropoutprob)
        self.src_pad_idx = src_pad_idx
        self.trg_pad_idx = trg_pad_idx
        self.device = device

    # Padding mask: hides PAD positions inside attention.
    def make_pad_mask(self, q, k, pad_idx_q, pad_idx_k):
        """Return a bool mask of shape ``(batch, 1, len_q, len_k)``.

        An entry is True only when both the query token and the key token
        differ from their respective padding index.
        """
        q_keep = q.ne(pad_idx_q).unsqueeze(1).unsqueeze(3)  # (batch, 1, len_q, 1)
        k_keep = k.ne(pad_idx_k).unsqueeze(1).unsqueeze(2)  # (batch, 1, 1, len_k)
        # Broadcasting produces the same result as explicitly repeating both.
        return q_keep & k_keep

    # Causal (look-ahead) mask.
    def make_causal_mask(self, q, k):
        """Return a lower-triangular bool mask of shape ``(len_q, len_k)`` on ``self.device``."""
        len_q, len_k = q.size(1), k.size(1)
        return torch.ones(len_q, len_k, device=self.device).tril().bool()

    def forward(self, src, trg):
        """Encode ``src`` and decode ``trg`` against the encoder output.

        Args:
            src: source token indices, shape ``(batch, len_src)``.
            trg: target token indices, shape ``(batch, len_trg)``.

        Returns:
            Whatever ``self.decoder`` returns.
        """
        src_mask = self.make_pad_mask(src, src, self.src_pad_idx, self.src_pad_idx)
        # Cross-attention has target queries and source keys, so its mask
        # must be (len_trg, len_src) rather than the encoder's (len_src, len_src).
        src_trg_mask = self.make_pad_mask(trg, src, self.trg_pad_idx, self.src_pad_idx)
        trg_mask = (
            self.make_pad_mask(trg, trg, self.trg_pad_idx, self.trg_pad_idx)
            & self.make_causal_mask(trg, trg)
        )
        enc = self.encoder(src, src_mask)
        # The decoder attends over the encoder output, not the raw source tokens.
        out = self.decoder(trg, enc, trg_mask, src_trg_mask)
        return out



