## basic Transformer code

[资料](https://uvadlc-notebooks.readthedocs.io/en/latest/tutorial_notebooks/tutorial6/Transformers_and_MHAttention.html)

https://www.kaggle.com/code/arunmohan003/transformer-from-scratch-using-pytorch

The model figures are from *Attention Is All You Need*.

![](../doc_images/learn/model.png)

In [94]:
import torch
import torch.nn as nn
import math
import numpy as np

### Positional Encoding

In [95]:
class PositionalEncoding(nn.Module):
    """Scale input embeddings and add fixed sinusoidal position encodings.

    The encoding table has shape ``(1, max_len, d_model)``; even feature
    indices hold ``sin`` values and odd indices hold ``cos`` values.
    """

    def __init__(self, d_model, max_len, dropout=0.0):
        """
        :param d_model: length of vector
        :param max_len: max sequence length
        :param dropout: dropout rate
        """
        super().__init__()

        self.d_model = d_model
        self.dropout = nn.Dropout(dropout)

        pos = torch.arange(0, max_len).float().unsqueeze(dim=1)  # (max_len, 1)
        _2i = torch.arange(0, d_model, step=2).float()
        angles = pos / (10000 ** (_2i / d_model))  # (max_len, ceil(d_model / 2))

        encoding = torch.zeros(1, max_len, d_model)
        encoding[:, :, 0::2] = torch.sin(angles)
        # For an odd d_model there is one fewer odd index than even index,
        # so trim the angle table to the number of cosine slots.
        encoding[:, :, 1::2] = torch.cos(angles[:, : d_model // 2])

        # A buffer follows the module across .to()/.cuda() and never receives
        # gradients. persistent=False keeps it out of the state_dict, so the
        # set of checkpoint keys is the same as with a plain attribute.
        self.register_buffer("encoding", encoding, persistent=False)

    def forward(self, x):
        """Return ``dropout(x * sqrt(d_model) + encoding[:, :seq_len])``.

        :param x: tensor whose dim 1 is the sequence axis and whose last dim
            is ``d_model`` (it is added to the ``(1, seq_len, d_model)`` table)
        :raises ValueError: if the sequence is longer than ``max_len``
        """
        seq_len = x.size(1)
        max_len = self.encoding.size(1)
        if seq_len > max_len:
            raise ValueError(
                f"sequence length {seq_len} exceeds max_len {max_len}"
            )
        x = x * math.sqrt(self.d_model)
        # Only the first seq_len positions are used, so sequences shorter
        # than max_len are accepted.
        x = x + self.encoding[:, :seq_len].to(x.device)
        return self.dropout(x)

这个模块完成的是模型的输入部分，主要包括编码向量与位置编码的相加，同时也考虑了dropout层和batch_size。

### Scaled Dot-Product Attention
![](../doc_images/learn/Scaled_Dot_Product_attention.png)

注意力机制中是需要相应的计算方式的，Transformer使用的是点积注意力机制。

In [96]:
def Scale_Dot_Produce_attention(query, key, value, mask=None):
    """Compute scaled dot-product attention.

    Scores are ``query @ key^T / sqrt(d_k)`` where ``d_k`` is the size of the
    last dimension of ``query``. A softmax over the last axis turns the scores
    into weights, which are then multiplied with ``value``.

    :param query: tensor of shape ``(..., l_q, d_k)``
    :param key: tensor of shape ``(..., l_k, d_k)``
    :param value: tensor of shape ``(..., l_k, d_v)``
    :param mask: optional tensor broadcastable to the ``(..., l_q, l_k)``
        scores; positions where ``mask == 0`` get no attention weight
    :return: attended values of shape ``(..., l_q, d_v)``
    """
    d_k = query.size(-1)
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)

    if mask is not None:
        # masked_fill needs a numeric fill value; -inf becomes weight 0
        # after the softmax.
        scores = scores.masked_fill(mask == 0, float("-inf"))

    weights = torch.softmax(scores, dim=-1)
    return torch.matmul(weights, value)

### multi head attention

![](../doc_images/learn/multihead_attention.png)

论文的描述是通过多个W（线性层）映射到子空间里，但是实际操作时是通过一个映射然后切分，并且在代码操作中也没有切分，而是直接通过改变形状实现了并行运算。

In [97]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    Inputs are projected once with full-width linear layers and then reshaped
    into ``num_heads`` heads of width ``d_model // num_heads``, so all heads
    are computed in a single batched matmul.

    q, k, v shape: (b: batch_size, l: seq_len, d_m: dim of input)
    """

    def __init__(self, d_model, num_heads):
        """
        :param d_model: feature size of the inputs and of the output
        :param num_heads: number of attention heads; must divide ``d_model``
        """
        super().__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads

        self.query_linear = nn.Linear(d_model, d_model)
        self.key_linear = nn.Linear(d_model, d_model)
        self.value_linear = nn.Linear(d_model, d_model)
        self.output_linear = nn.Linear(d_model, d_model)

    def _split_heads(self, x, batch_size):
        """Reshape (b, l, d_m) -> (b, l, h, d_m/h) -> (b, h, l, d_m/h)."""
        return x.view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)

    def forward(self, query, key, value, mask=None):
        """Attend from ``query`` to ``key``/``value``.

        :param query: tensor of shape (b, l_q, d_m)
        :param key: tensor of shape (b, l_k, d_m)
        :param value: tensor of shape (b, l_k, d_m)
        :param mask: optional tensor broadcastable to (b, h, l_q, l_k);
            positions where ``mask == 0`` are filled with -inf before softmax
        :return: tuple ``(output, attention)`` with ``output`` of shape
            (b, l_q, d_m) and ``attention`` of shape (b, h, l_q, l_k)
        """
        batch_size = query.size(0)

        # Linear projections: (b, l, d_m) -> (b, l, d_m)
        query = self.query_linear(query)
        key = self.key_linear(key)
        value = self.value_linear(value)

        # Split into heads: (b, l, d_m) -> (b, h, l, d_m/h)
        query = self._split_heads(query, batch_size)
        key = self._split_heads(key, batch_size)
        value = self._split_heads(value, batch_size)

        # q @ k^T: (b, h, l_q, d_m/h) @ (b, h, d_m/h, l_k) = (b, h, l_q, l_k)
        scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(self.head_dim)

        if mask is not None:
            # Masked positions become -inf and get zero weight after softmax.
            scores = scores.masked_fill(mask == 0, float('-inf'))

        attention = torch.softmax(scores, dim=-1)
        # (b, h, l_q, l_k) @ (b, h, l_k, d_m/h) = (b, h, l_q, d_m/h)
        output = torch.matmul(attention, value)

        # Merge heads: (b, h, l_q, d_m/h) -> (b, l_q, h, d_m/h) -> (b, l_q, d_m).
        # transpose() returns a non-contiguous view, and view() rejects that
        # layout, so the tensor is made contiguous first.
        output = output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        output = self.output_linear(output)

        return output, attention


### Encoder block

接下来实现它的Encoder部分，这部分要考虑的就是残差连接，层标准化和前馈神经网络。

先进行残差连接，然后再进行层标准化。

In [98]:
class EncoderBlock(nn.Module):
    """One encoder layer: self-attention and a feed-forward network.

    Each sub-layer is wrapped as ``LayerNorm(x + Dropout(sublayer(x)))``,
    i.e. residual connection first, then layer normalization.
    """

    def __init__(self, d_model, num_heads, dim_FFN, dropout=0.0):
        """
        :param d_model: feature size of the block input and output
        :param num_heads: number of attention heads
        :param dim_FFN: hidden width of the feed-forward network
        :param dropout: dropout rate used in the FFN and on both sub-layer outputs
        """
        super().__init__()
        self.self_attention = MultiHeadAttention(d_model, num_heads)

        # Feed forward
        self.FFW = torch.nn.Sequential(
            nn.Linear(d_model, dim_FFN),
            nn.Dropout(dropout),
            nn.ReLU(),
            nn.Linear(dim_FFN, d_model))

        # LayerNorm
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Run the block on ``x``.

        :param x: input tensor whose last dimension is ``d_model``
        :param mask: optional attention mask passed to the self-attention
        :return: tensor with the same shape as ``x``
        """
        # Sub-layer 1: self-attention with residual connection.
        atten_out, _ = self.self_attention(x, x, x, mask)
        x = self.norm1(x + self.dropout(atten_out))

        # Sub-layer 2: feed-forward with residual connection. The residual
        # adds the FFN output to the FFN *input*, not the attention output.
        ffn_out = self.FFW(x)
        x = self.norm2(x + self.dropout(ffn_out))

        return x

In [99]:
class TransformerEncoder(nn.Module):
    """Positional encoding followed by a stack of ``EncoderBlock`` layers."""

    def __init__(self, n_layers, max_len, d_model, num_heads, dim_FNN, dropout):
        """
        :param n_layers: number of encoder blocks
        :param max_len: max sequence length for the positional encoding
        :param d_model: feature size of the inputs
        :param num_heads: number of attention heads per block
        :param dim_FNN: hidden width of each block's feed-forward network
        :param dropout: dropout rate passed to every encoder block
        """
        super().__init__()
        # NOTE(review): dropout is not forwarded to the positional encoding,
        # which therefore uses its default rate of 0.0 - confirm this is intended.
        self.input = PositionalEncoding(d_model, max_len)
        self.layers = nn.ModuleList(
            [EncoderBlock(d_model, num_heads, dim_FNN, dropout) for _ in range(n_layers)]
        )

    def forward(self, x, mask=None):
        """Encode ``x`` and return the output of the last block.

        :param x: input embeddings
        :param mask: optional attention mask passed to every block
        """
        x = self.input(x)
        for layer in self.layers:
            x = layer(x, mask=mask)
        return x

    def get_attention_maps(self, x, mask=None):
        """Return the self-attention weights of every block, in layer order.

        The input goes through the same pipeline as ``forward`` (positional
        encoding, then each block with ``mask``), so the maps correspond to
        what the model actually computes.

        :param x: input embeddings
        :param mask: optional attention mask passed to every block
        :return: list with one attention tensor per block
        """
        attention_maps = []
        x = self.input(x)
        for layer in self.layers:
            _, atten_map = layer.self_attention(x, x, x, mask)
            attention_maps.append(atten_map)
            x = layer(x, mask=mask)
        return attention_maps

### Decoder block

Decoder相较于Encoder比较麻烦，存在一个mask操作。mask的含义是：在t时刻，decoder的输入无法看到t+1时刻及之后的内容。在实际操作时，是通过一个三角矩阵（右上角为0）将后面的内容变为0，在代码中替换为-inf。~~因此好像这一步操作不是在模型代码中构建的，而是在训练过程中构造的。~~

在attention模块中，已经实现了mask的操作，即对query，key计算得到的权重进行mask:
```
if mask is not None:
    scores = scores.masked_fill(mask == 0, float('-inf'))
```
它使用的是一个下三角矩阵，相当于覆盖在权重矩阵上，将三角矩阵中为零的位置所对应的权重替换为负无穷（需要理解masked_fill的原理）。

因此在构建Decoder时直接堆叠已经建好的模块即可。

在Decoder中的decoder-encoder attention的q,k,v：其中query来自于Decoder的self attention，k,v来自于encoder的输出

In [100]:
class DecoderBlock(nn.Module):
    """One decoder layer: self-attention, encoder-decoder attention, FFN.

    Each sub-layer is wrapped as ``LayerNorm(x + Dropout(sublayer(x)))``.
    """

    def __init__(self, d_model, num_heads, dim_FFN, dropout=0.0):
        """
        :param d_model: feature size of the block input and output
        :param num_heads: number of attention heads
        :param dim_FFN: hidden width of the feed-forward network
        :param dropout: dropout rate used in the FFN and on every sub-layer output
        """
        super().__init__()
        self.self_attention = MultiHeadAttention(d_model, num_heads)
        # Encoder-decoder attention gets its own projections so it does not
        # share weights with the self-attention.
        self.cross_attention = MultiHeadAttention(d_model, num_heads)

        # Feed forward
        self.FFW = torch.nn.Sequential(
            nn.Linear(d_model, dim_FFN),
            nn.Dropout(dropout),
            nn.ReLU(),
            nn.Linear(dim_FFN, d_model))

        # LayerNorm
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, memory, src_mask=None, tar_mask=None):
        """Run the block on the target sequence ``x``.

        :param x: decoder input whose last dimension is ``d_model``
        :param memory: encoder output, used as key and value in the
            encoder-decoder attention
        :param src_mask: optional mask for the encoder-decoder attention
        :param tar_mask: optional mask for the decoder self-attention
        :return: tensor with the same shape as ``x``
        """
        # Sub-layer 1: self-attention over the target sequence.
        self_atten_out, _ = self.self_attention(x, x, x, tar_mask)
        x = self.norm1(x + self.dropout(self_atten_out))

        # Sub-layer 2: queries from the decoder, keys/values from the encoder.
        encoder_decoder_atten, _ = self.cross_attention(x, memory, memory, src_mask)
        x = self.norm2(x + self.dropout(encoder_decoder_atten))

        # Sub-layer 3: feed-forward; the residual adds the FFN output to the
        # FFN input, so the input must not be overwritten first.
        ffn_out = self.FFW(x)
        x = self.norm3(x + self.dropout(ffn_out))

        return x

In [101]:
class TransformerDecoder(nn.Module):
    """Positional encoding followed by a stack of ``DecoderBlock`` layers."""

    def __init__(self, n_layers, max_len, d_model, num_heads, dim_FNN, dropout):
        """
        :param n_layers: number of decoder blocks
        :param max_len: max sequence length for the positional encoding
        :param d_model: feature size of the inputs
        :param num_heads: number of attention heads per block
        :param dim_FNN: hidden width of each block's feed-forward network
        :param dropout: dropout rate passed to every decoder block
        """
        super().__init__()
        self.input = PositionalEncoding(d_model, max_len)

        blocks = []
        for _ in range(n_layers):
            blocks.append(DecoderBlock(d_model, num_heads, dim_FNN, dropout))
        self.layers = nn.ModuleList(blocks)

    def forward(self, x, memory, src_mask=None, tar_mask=None):
        """Decode ``x`` against ``memory`` and return the last block's output.

        :param x: target-side input embeddings
        :param memory: encoder output handed to every block
        :param src_mask: optional mask handed to every block
        :param tar_mask: optional mask handed to every block
        """
        hidden = self.input(x)
        for block in self.layers:
            hidden = block(hidden, memory, src_mask, tar_mask)
        return hidden

### 最终的Transformer

In [102]:
class Transformer(nn.Module):
    """Encoder-decoder wrapper: encode the source, then decode the target."""

    def __init__(self, encoder, decoder):
        """
        :param encoder: callable invoked as ``encoder(x, src_mask)``
        :param decoder: callable invoked as
            ``decoder(tar, memory, src_mask, tar_mask)``
        """
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, x, src_mask, tar_mask, tar):
        """Return the decoder output for target ``tar`` given source ``x``.

        :param x: source-side input
        :param src_mask: mask passed to both the encoder and the decoder
        :param tar_mask: mask passed to the decoder
        :param tar: target-side input
        """
        memory = self.encoder(x, src_mask)
        decoded = self.decoder(tar, memory, src_mask, tar_mask)
        return decoded
            

In [103]:
# Hyperparameters for the demo model built in the next cell.
n_layers = 4      # number of stacked encoder / decoder blocks
max_len = 30      # max sequence length for the positional encoding
d_model = 512     # feature size of every token vector
num_heads = 8     # attention heads per block (divides d_model)
dim_FNN = 128     # hidden width of the feed-forward network
dropout = 0.0     # dropout rate

In [104]:
# Build the encoder and decoder from the same hyperparameters, then combine them.
encoder = TransformerEncoder(
    n_layers=n_layers,
    max_len=max_len,
    d_model=d_model,
    num_heads=num_heads,
    dim_FNN=dim_FNN,
    dropout=dropout,
)
decoder = TransformerDecoder(
    n_layers=n_layers,
    max_len=max_len,
    d_model=d_model,
    num_heads=num_heads,
    dim_FNN=dim_FNN,
    dropout=dropout,
)
model = Transformer(encoder, decoder)
# A bare expression as the last line of a notebook cell displays the module structure.
model

Transformer(
  (encoder): TransformerEncoder(
    (input): PositionalEncoding(
      (dropout): Dropout(p=0.0, inplace=False)
    )
    (layers): ModuleList(
      (0-3): 4 x EncoderBlock(
        (self_attention): MultiHeadAttention(
          (query_linear): Linear(in_features=512, out_features=512, bias=True)
          (key_linear): Linear(in_features=512, out_features=512, bias=True)
          (value_linear): Linear(in_features=512, out_features=512, bias=True)
          (output_linear): Linear(in_features=512, out_features=512, bias=True)
        )
        (FFW): Sequential(
          (0): Linear(in_features=512, out_features=128, bias=True)
          (1): Dropout(p=0.0, inplace=False)
          (2): ReLU()
          (3): Linear(in_features=128, out_features=512, bias=True)
        )
        (norm1): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
        (dropout): Dropout(p=0.0, inplace=False)
      )
 

## Summary

基本上已经完全理清楚Transformer的模型框架了，上面的代码目前看起来没有问题，不过还需要实际测试才能知道。还有就是对于输入的嵌入并没有定义，因为直接使用`nn.Embedding`层可行是可行，不过直观觉得也需要自己来定义编码方式。其次，原始的模型框架是用来进行翻译的，会有source_sequence和target_sequence。但是我并不做机器翻译，现有的框架并不适合我解决问题，需要根据自己的内容来进行修改。