# transformer

    - 位置编码：使得输入序列并行训练
    - 多头自注意力机制：
    - 多头注意力机制：
    - add & Nom : 残差连接、层归一化
    - 

# 位置编码

## 正弦、余弦函数编码

In [2]:
# 正弦、余弦函数编码
import numpy as np
import torch
import torch.nn as nn

class PositionalEncodeing(nn.Module):
    def __init__(self,d_model,max_length = 1000):
        '''初始化方法
        
        parameter
        --------------
        d_model ：int
            嵌入向量维度
        max_length ：int
            最大序列长度    
        '''
        super().__init__()
        # 创建位置编码矩阵
        pe = torch.zeros(max_length,d_model)
        # 创建一个一维张量，其元素为从0到max_length-1，便是序列中的各个位置
        # 将形状转（max_length,)换为（max_length,1)，便于后续计算
        position = torch.arange(0,max_length,dtype=torch.float).unsqueeze(1)
        # exp(log(a)*b) = a^b

        div_trem = torch.exp(torch.arange(0,d_model,2) * (-np.log(10000.0)/d_model))
        # d_model必须为偶数，保证奇数长度与偶数长度相同
        # Position*div_trem.shape = (max_length,d_model/2)
        pe[:,0::2] = torch.sin(position * div_trem)
        pe[:,1::2] = torch.cos(position * div_trem)
        # 将pe注册为模型的缓冲区
        # 缓冲区时pytorch中的一种特殊属性，其不会被计算图追踪，不会更新梯度
        # 但是，成为缓冲区后，会成为state_dict的一部分，会随着模型一起保存和加载
        # 当注册缓冲区后，变量就会绑定当前对象，成为当前对象属性
        # 注册属性与绑定属性的区别:
            # 1、缓冲区会随着模型一起保存和加载，但是绑定属性无此功能
            # 2、缓冲区与模型参数一样，会随着模型一起迁移，但绑定属性无此功能
        self.register_buffer('pe',pe)
    
    def forward(self,x):
        # x.shape = (batch_size,seq_length,d_model)
        # 将词嵌入向量与位置张量相加
        x + self.pe[:x.size(1)]
        return x
    
def test():
    d_model = 2
    max_length = 5
    batch_szie = 2
    pos = PositionalEncodeing(d_model,max_length)
    print('位置编码张量形状',pos.pe.shape)

    x = torch.zeros(batch_szie,max_length,d_model)
    x_with_pos = pos(x)
    print(x_with_pos)

test()

位置编码张量形状 torch.Size([5, 2])
tensor([[[0., 0.],
         [0., 0.],
         [0., 0.],
         [0., 0.],
         [0., 0.]],

        [[0., 0.],
         [0., 0.],
         [0., 0.],
         [0., 0.],
         [0., 0.]]])


## 可训练位置编码  
优势：  
    - 灵活性，更具任务自适应调整  
    - 模型性能，可训练编码可以带给模型更好的性能

缺点：  
    - 外推能力，由于编码可训练，无法更好的泛化到训练时为见过的序列长度  
    - 计算成本

In [3]:
class PositionalEncodeing2(nn.Module):
    '''可训练位置编码
    通过可训练的嵌入层，实现位置编码
    '''

    def __init__(self,d_model,max_length):
        super().__init__()
        self.d_model = d_model
        self.max_length = max_length
        # 创建可学习的位置嵌入
        self.position_embedding = nn.Embedding(max_length,d_model)

    def forward(self,x):
        seq_len = x.size(1)
        # 生成位置索引
        pos_indices = torch.arange(seq_len,device =x.device,dtype=torch.long).unsqueeze(0)
        # 从嵌入层中获取位置编码
        pos_embedding = self.position_embedding(pos_indices)
        return x+pos_embedding
    
def test():
    d_model = 2
    max_length = 5
    batch_szie = 2
    pos = PositionalEncodeing2(d_model,max_length)
    print('位置编码张量形状',pos.position_embedding.weight.shape)

    x = torch.zeros(batch_szie,max_length,d_model)
    x_with_pos = pos(x)
    print(x_with_pos)

test()

位置编码张量形状 torch.Size([5, 2])
tensor([[[-1.3116, -0.7478],
         [-0.5468,  0.0461],
         [-0.2335,  1.6065],
         [-0.7880,  0.4872],
         [ 0.4613,  0.4113]],

        [[-1.3116, -0.7478],
         [-0.5468,  0.0461],
         [-0.2335,  1.6065],
         [-0.7880,  0.4872],
         [ 0.4613,  0.4113]]], grad_fn=<AddBackward0>)


# 注意力机制
q,k,v

## 自注意力机制
当q,k,v都是同一个张量，就是自注意力机制    
为什么需要自注意力，因为输入的词，经过词嵌入、位置编码，但依然没有彼此之间的关系，自注意力机制，就是，词与词之间的关系  
self_attention.shape = (seq_length,seq_length)  
例：  
seq = [x1,x3,x3,x4].shape = (n)  
embedding_seq.shape = (n,d_model)  
self_attention_seq = [[w1*x1+w2*x2+w3*x3+w4*x4],
                      [w1*x1+w2*x2+w3*x3+w4*x4],
                      [w1*x1+w2*x2+w3*x3+w4*x4].
                      [w1*x1+w2*x2+w3*x3+w4*x4]]  
self_attention_seq.shape = (n,d_model)                         
序列x.shape = (n,d_model)  
q = x*wq wq.shape = (d_model,dq)  
k = x*wk wk.shape = (d_model,dk)  
v = x*wv wv.shape = (d_model,dv)

scores = q*k.T/sqrt(dk)  
weigths = softmax(scores)  
context = weigths*v


## 多头自注意力机制
将注意力矩阵中的QKV，切分成多份  
q_h = x*wq wq.shape = (d_model/h,dq)  
k_h = x*wk wk.shape = (d_model/h,dk)  
v_h = x*wv wv.shape = (d_model/h,dv)

In [4]:
class MultiHeadAttention(nn.Module):
    def __init__(self,d_model,num_head,p = 0.1):
        super().__init__()
        if d_model % num_head != 0:
            raise ValueError(f'd_model({d_model})需要能被num_head({num_head})整除')
        self.d_model = d_model
        self.num_head =num_head
        self.head_dim = d_model//num_head
        self.q_proj = nn.Linear(d_model,d_model)
        self.k_proj = nn.Linear(d_model,d_model)
        self.v_proj = nn.Linear(d_model,d_model)
        self.out_proj = nn.Linear(d_model,d_model)
        self.dropout = nn.Dropout(p)

    def forward(self,query,keys,values,atten_mask = None,key_padding_mask = None):
        '''parameters
        -----------------
        query:torh.tensor shape = (batch_size,traget_seq_len,d_model)
            查询张量，在编码器中，target就是srcoe
        keys:torh.tensor shape = (batch_size,src_seq_len,d_model)
        values:torh.tensor shape = (batch_size,src_seq_len,d_model)

        return
        ---------------------
        output : torch.tensor shape = (batch_szie,tgt_seq_length,d_model)
        atten_weigths : torch.tensor shape = (batch_szie,num_head,tgt_seq_length,src_seq_length)
        '''
        batch_szie = query.size(0)
        # 线性变换
        q = self.q_proj(query)
        k = self.k_proj(keys)
        v = self.v_proj(values)

        # 将qkv拆分为多个头
        # q.shape = (batch_szie,num_head,tgt_seq_length,head_dim)
        q = q.view(batch_szie,-1,self.num_head,self.head_dim).transpose(1,2)
        k = k.view(batch_szie,-1,self.num_head,self.head_dim).transpose(1,2)
        v = v.view(batch_szie,-1,self.num_head,self.head_dim).transpose(1,2)

        # 计算注意力权重
        scores = torch.matmul(q,k.transpose(-2,-1))/self.head_dim**0.5
        atten_weigths = torch.softmax(scores,dim=-1)
        atten_weigths = self.dropout(atten_weigths)

        # 计算注意力输出
        atten_output = torch.matmul(atten_weigths,v)

        # 合并多头
        atten_output = atten_output.transpose(1,2).contiguous().view(batch_szie,-1,self.d_model)
        # 线性变换输出
        output = self.out_proj(atten_output)
        return output,atten_weigths

In [5]:
def test():
    d_model = 10
    batch_szie = 2
    src_seq_length = 4
    tgt_seq_length = 3
    num_head = 2


    q = torch.randn(batch_szie,tgt_seq_length,d_model)
    k = torch.randn(batch_szie,src_seq_length,d_model)
    v = torch.randn(batch_szie,src_seq_length,d_model)

    atten = MultiHeadAttention(d_model,num_head)
    output,atten_weigths = atten(q,k,v)
    print('输出形状',output.shape)
    print('注意力权重形状',atten_weigths.shape)
test()
    

输出形状 torch.Size([2, 3, 10])
注意力权重形状 torch.Size([2, 2, 3, 4])


## 掩码多头自注意力机制

    解码器中，当输入第一个开始字符后，无法全部计算注意力，因此，需要设置掩码。  
    在计算得到scores后，加一个M矩阵（上三角矩阵，值为-∞）  
    然后进行，softmax计算

      作用：
        1、防止信息泄露
        2、捕捉序列内部依赖关系
        3、      

In [26]:
class MultiHeadAttention(nn.Module):
    def __init__(self,d_model,num_head,p = 0.1):
        super().__init__()
        if d_model % num_head != 0:
            raise ValueError(f'd_model({d_model})需要能被num_head({num_head})整除')
        self.d_model = d_model
        self.num_head =num_head
        self.head_dim = d_model//num_head
        self.q_proj = nn.Linear(d_model,d_model)
        self.k_proj = nn.Linear(d_model,d_model)
        self.v_proj = nn.Linear(d_model,d_model)
        self.out_proj = nn.Linear(d_model,d_model)
        self.dropout = nn.Dropout(p)

    def forward(self,query,keys,values,atten_mask = None,key_padding_mask = None):
        '''parameters
        -----------------
        query:torh.tensor shape = (batch_size,traget_seq_len,d_model)
            查询张量，在编码器中，target就是srcoe
        keys:torh.tensor shape = (batch_size,src_seq_len,d_model)

        values:torh.tensor shape = (batch_size,src_seq_len,d_model)

        atten_mask : torch.tensor shape = (tgt_seq_len,src_seq_len)
            注意力掩码张量
        key_padding_mask : torch.tensor shape = (batch_size,src_seq_len) 


        return
        ---------------------
        output : torch.tensor shape = (batch_szie,tgt_seq_length,d_model)
        atten_weigths : torch.tensor shape = (batch_szie,num_head,tgt_seq_length,src_seq_length)
        '''
        batch_szie = query.size(0)
        # 线性变换
        q = self.q_proj(query)
        k = self.k_proj(keys)
        v = self.v_proj(values)

        # 将qkv拆分为多个头
        # q.shape = (batch_szie,num_head,tgt_seq_length,head_dim)
        q = q.view(batch_szie,-1,self.num_head,self.head_dim).transpose(1,2)
        k = k.view(batch_szie,-1,self.num_head,self.head_dim).transpose(1,2)
        v = v.view(batch_szie,-1,self.num_head,self.head_dim).transpose(1,2)

        # 计算注意力权重
        scores = torch.matmul(q,k.transpose(-2,-1))/self.head_dim**0.5

        # 掩码矩阵,在pytorch中，有float类型和bool类型
        # 填充掩码
        if key_padding_mask is not None:
           # 扩展与主力scores具有相同的形状
            mask = key_padding_mask.view(batch_szie,1,1,-1)
            if mask.dtype == torch.bool :
               scores = scores.masked_fill(mask,float('-inf'))
            else:
               scores += mask.to(scores.dtype)
        # 注意力掩码 
        if atten_mask is not None:
            # 省去对齐形状，因为可以自动广播
            # mask = key_padding_mask(1,1,tgt_seq_len,src_seq_len)
            if atten_mask.dtype == torch.bool:
                scores = scores.masked_fill(atten_mask,float('-inf'))
            else:
                scores += atten_mask.to(scores.dtype)


        atten_weigths = torch.softmax(scores,dim=-1)
        atten_weigths = self.dropout(atten_weigths)

        # 计算注意力输出
        atten_output = torch.matmul(atten_weigths,v)

        # 合并多头
        atten_output = atten_output.transpose(1,2).contiguous().view(batch_szie,-1,self.d_model)
        # 线性变换输出
        output = self.out_proj(atten_output)
        return output,atten_weigths

In [61]:
def test():
    batch_size = 1
    d_model = 10
    src_seq_len = 4
    tgt_seq_len = 3
    num_layer = 2

    query = torch.randn(batch_size,tgt_seq_len,d_model)
    keys = torch.randn(batch_size,src_seq_len,d_model)
    values = torch.randn(batch_size,src_seq_len,d_model)
    key_padding_mask = torch.zeros(batch_size,src_seq_len,dtype = torch.bool)
    key_padding_mask[:,-1] = True
    attn = MultiHeadAttention(d_model,num_layer)
    output,atten_weigth = attn(query,keys,values,key_padding_mask=key_padding_mask)
    print('输出形状',output.shape)
    print('注意力权重的形状',atten_weigth.shape)
    print('注意力权重',atten_weigth)

    # 生产上三角矩阵
    cansal_mask = torch.triu(torch.full((tgt_seq_len,src_seq_len),float('-inf')),diagonal = 1)
    print('因果掩码',cansal_mask)
    _,atten_weigths = attn(query,keys,values,atten_mask=cansal_mask,)
    print('加因果掩码的注意力权重',atten_weigths)

test()

输出形状 torch.Size([1, 3, 10])
注意力权重的形状 torch.Size([1, 2, 3, 4])
注意力权重 tensor([[[[0.1402, 0.5501, 0.0000, 0.0000],
          [0.5057, 0.0000, 0.3098, 0.0000],
          [0.5500, 0.2913, 0.2698, 0.0000]],

         [[0.3939, 0.3612, 0.3561, 0.0000],
          [0.3050, 0.5067, 0.2994, 0.0000],
          [0.4453, 0.3484, 0.3174, 0.0000]]]], grad_fn=<MulBackward0>)
因果掩码 tensor([[0., -inf, -inf, -inf],
        [0., 0., -inf, -inf],
        [0., 0., 0., -inf]])
加因果掩码的注意力权重 tensor([[[[0.0000, 0.0000, 0.0000, 0.0000],
          [0.7012, 0.4099, 0.0000, 0.0000],
          [0.5500, 0.2913, 0.2698, 0.0000]],

         [[0.0000, 0.0000, 0.0000, 0.0000],
          [0.4175, 0.6937, 0.0000, 0.0000],
          [0.4453, 0.0000, 0.0000, 0.0000]]]], grad_fn=<MulBackward0>)


In [23]:
cansal_mask = torch.triu(torch.full((3,4),float('inf')),diagonal=1)
cansal_mask

tensor([[0., inf, inf, inf],
        [0., 0., inf, inf],
        [0., 0., 0., inf]])

# add&norm

LayerNormalization:层归一化，更多用于序列处理  
batchNormalization:批归一化，更多用于图像处理  

作用：  
1、稳定训练过程  
2、提高模型泛化能力  
3、减轻内部协变量便宜

In [6]:
# add & normalization

class layernorm(nn.Module):
    def __init__(self,normalized_shape,eps = 1e-5):
        '''
        parameter
        ---------------
        normalized_shape:int or tuple
            需要归一化的特征维度，例如，d_model或（seq_length,d_model)
        eps :float
            一个很小的值，防止分母为0
        '''
        super().__init__()
        if isinstance(normalized_shape,int):
            normalized_shape = (normalized_shape, )
        self.normalized_shape = tuple(normalized_shape)
        self.eps = eps
        # 初始化可学习的参数
        self.gamma = nn.Parameter(torch.ones(*self.normalized_shape))
        self.beta = nn.Parameter(torch.zeros(*self.normalized_shape))
    
    def forward(self,x):
        # 确定需要归一化的维度（最后len(normalized_shape)个维度）。
        dims = list(range(-len(self.normalized_shape),0))
        # 沿着特征维度，计算均值、标准差
        # mean,var,shape = (batch_szie,seq_len,1) [假设，normalized_shape长度为1]
        mean = x.mean(dim = dims,keepdim = True)
        # unbiase,是否进行无偏差估计。（x_i - x^）/(n)为有偏差，（n-1）为无偏差
        var = x.var(dim = dims,keepdim = True , unbiased = False)

        # x_normalized shape = (batch_size,seq_len,d_model)
        x_normalized = (x-mean)/torch.sqrt(var+self.eps)

        return self.gamma * x_normalized + self.beta


In [7]:
def test():
    batch_size = 2
    seq_len = 3
    d_model = 5

    x = torch.randint(100,200,size=(batch_size,seq_len,d_model)).float()
    print(x)

    ln = layernorm(d_model)
    print(ln(x))

test()

tensor([[[179., 155., 158., 110., 110.],
         [126., 188., 100., 101., 103.],
         [180., 175., 102., 158., 174.]],

        [[137., 126., 134., 103., 107.],
         [124., 100., 198., 107., 191.],
         [108., 123., 123., 164., 101.]]])
tensor([[[ 1.3205,  0.4546,  0.5628, -1.1690, -1.1690],
         [ 0.0714,  1.9166, -0.7024, -0.6726, -0.6131],
         [ 0.7692,  0.5960, -1.9334,  0.0069,  0.5613]],

        [[ 1.1205,  0.3304,  0.9050, -1.3216, -1.0343],
         [-0.4759, -1.0470,  1.2850, -0.8805,  1.1184],
         [-0.7232, -0.0366, -0.0366,  1.8399, -1.0435]]],
       grad_fn=<AddBackward0>)


In [8]:
# add
class addNorm(nn.Module):
    '''残差链接，层归一化'''
    def __init__(self,d_model,p=0.1):
        super().__init__()
        self.norm = layernorm(d_model)
        self.dropout = nn.Dropout(p)

    def forward(self,x,sublayer_output):
        '''
        parameter:
        --------------
        x ：toech.tensor shape = (batch_size,seq_len,d_model)
            上一个子层的输入(自注意力层、前馈神经网络层)
        sublayer_output ：torch.tensor shape = (batch_size,seq_len,d_model)
            上一个子层的输出
        '''
        # add
        output = x + self.dropout(sublayer_output)
        # norm
        output = self.norm(output)
        return output

# 前馈神经网络层（feed forward）

ffn(x) = relu(xW1 + b1)W2 + b2  
x:输入矩阵
w1和w2:线性变换权重矩阵，w1.shape =（d_model,dff）,w2.shape = (dff,d_model),要对原始数据进行升维和降维操作，因此，dff = 4*d_model  
b1,b2:偏执，（1，dff）,(1,d_model)  

提神模型的表达能力，通过引入更高维度的中间维度，可以捕捉到输入的更多复杂特征


In [9]:
class FFN(nn.Module):
    def __init__(self,d_model,dff,p=0.1):
        '''
        parameter
        --------------
        d_model : int

        '''
        super().__init__()
        if dff is None:
            dff = 4*d_model

        self.ffn = nn.Sequential(nn.Linear(d_model,dff),
                                 nn.ReLU(),
                                 nn.Dropout(p),
                                 nn.Linear(dff,d_model)
        )

    def forward(self,x):
        output = self.ffn(x)
        return output

# 编码器-解码器注意力子层