# transformer 原理

In [1]:
import torch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
print(torch.__version__)

cpu
1.12.0+cu113


  from .autonotebook import tqdm as notebook_tqdm


## encoder 部分

In [2]:
# 1. wordEmbedding
# 2. PositionEncoding
# 3. encoding self-attention mask
#             (来得到表征有效， 不希望吸收到padding的key)
# 4. deocder 的masked Multi-head Attention
#             (保证因果性)
# 5. cross-attention 的mask 与encoder的mask类似，但是涉及到两个序列的mask
 
import torch.nn as nn
import numpy as np
import torch.nn.functional as F

### word embedding 

In [3]:
# 关于wordembedding, 以序列建模为例
# 考虑source sentense 和 target sentence

# 构建序列， 序列的字符以其在词表中的索引的形式表示
batch_size = 2
# src_len = torch.randint(2, 5, (batch_size, ))  # min, max, shape
# tgt_len = torch.randint(2, 5, (batch_size, ))

# 单词表大小
max_num_src_words = 8
max_num_tgt_words = 8

# 模型维度，embedding维度
model_dim = 8

# 序列最大长度
max_src_seq_len = 5
max_tgt_seq_len = 5

src_len = torch.Tensor([2, 4]).to(torch.int32)
tgt_len = torch.Tensor([4, 3]).to(torch.int32)

# 单词索引序列表示句子，源句子和目标句子
## pad 使一个batch的序列长度一样 在右边pad 默认pad 0.
src_seq = torch.cat(
    [torch.unsqueeze(F.pad(torch.randint(1, max_num_src_words, size=(L, )), (0, max(src_len)-L)), 0) 
    for L in src_len]
)
tgt_seq = torch.cat(
    [torch.unsqueeze(F.pad(torch.randint(1, max_num_tgt_words, size=(L, )), (0, max(tgt_len)-L)), 0)
    for L in tgt_len]

)


# 构造embedding
## （num_embedding, embed_dim）
 
src_embedding_table = \
    nn.Embedding(max_num_src_words+1, model_dim, padding_idx=0)
tgt_embedding_table = \
    nn.Embedding(max_num_tgt_words+1, model_dim, padding_idx=0)
## 每一行代表一个embedding 向量， 第0行是padding对应

## 尝试调用src_embedding_table求embedding(相当于调用forward方法)
src_embedding = src_embedding_table(src_seq)
tgt_embedding = tgt_embedding_table(tgt_seq)

### postion embedding

In [4]:
# 构造position_embedding
## sin/cos 1. 泛化能力比较强， 2. 具有确定性， 3. 可以用前面的构造后面的
## 奇数列用cos, 偶数列sin 
## gradient不用计算
max_position_len = 5
## pos维度是序列不同位置
## i_mat是特征维度
pos_mat = torch.arange(max_position_len).reshape((-1, 1))
i_mat = torch.pow(10000, torch.arange(0, 8, 2).reshape((1, -1))/model_dim)

# 计算position embedding
pe_embedding_table = torch.zeros(max_position_len, model_dim)
## 偶数特征维
pe_embedding_table[:, 0::2] = torch.sin(pos_mat / i_mat)
## 奇数特征维
pe_embedding_table[:, 1::2] = torch.cos(pos_mat / i_mat)

# 用nn.Embedding()封装 
pe_embedding = nn.Embedding(max_position_len, model_dim)
pe_embedding.weight = nn.Parameter(pe_embedding_table, requires_grad=False)

# 生成位置编码 就是 0， 1， 2， ... pos_idx
src_pos = torch.cat(
    [torch.unsqueeze(torch.arange(max(src_len)), 0) for _ in src_len], 0
).to(torch.int32)

tgt_pos = torch.cat(
    [torch.unsqueeze(torch.arange(max(tgt_len)), 0) for _ in tgt_len], 0
).to(torch.int32)

# 生成pe向量
src_pe = pe_embedding(src_pos)
tgt_pe = pe_embedding(tgt_pos)

# print(tgt_pe, tgt_pe.shape) # (bs, sl, fs)

## 为什么用sin/cos？
### 有泛化能力： 当inference阶段我们得到的序列长度更大，我们可以用较小的pe
### 线性组合构造出pos更大的pe

## 构造encoder的self-attn mask

In [7]:
# 难点！ 
# 构造encoder的self-attn mask
# mask的shape: [batch_size, max_src_len, max_src_len], 值为1或者-inf
# ···· 因为 我们要用 mask 与 (Q@K.T)/sqrt(d_k) 在softmax之前做elemet-wise
# .... 的乘法，乘到-inf的元素的值softmax之后将变成0，于是注意力为0 最后@V

## 要用到src_len, max_src_seq_len
### 构建有效的编码器的位置
valid_encoder_pos = torch.cat(
[torch.unsqueeze(F.pad(torch.ones(L), (0, max(src_len)-L)), 0)
 for L in src_len]
) # 得到有效位置的序列信息
## 扩充维度
valid_encoder_pos = torch.unsqueeze(valid_encoder_pos, -1)
## 做外积得到有效相关性的矩阵
valid_encoder_mat = torch.bmm(valid_encoder_pos, valid_encoder_pos.transpose(-2, -1))
# print(valid_encoder_mat)
invalid_encoder_mat = (1 - valid_encoder_mat).to(torch.bool)
## 假想一个score(Q@K.T)/sqrt(d_k)
score = torch.randn(batch_size, max(src_len), max(src_len))
## 在score的invalid_encoder_mat为True的地方填充-inf(1e-9)
score = score.masked_fill(invalid_encoder_mat, -1e9)
prob = F.softmax(score, -1)
## pad的地方score被填充成-1e9经过softmax之后注意力变成0
# print(src_len)
# print(invalid_encoder_mat)
# print(score)
# print(prob)
## 如果每一个位置都被mask掉，那么输出概率是一个均匀分布，
## 但是在计算loss的时候，我们也会传入一个mask。



In [6]:
# softmax 演示
# scaled dot-product attn 中 scale的重要性

## q 和 k 的点乘， 方差变小一点 1. softmax出来的概率分布就不会特别尖锐
##                2. 反向求导的jacobbi矩阵的数值会比较稳定，不会接近于0

alpha1 = 0.1
alpha2 = 10

score = torch.randn(2, 5)

# 概率分布
prob1 = F.softmax(score*alpha1, -1)
prob2 = F.softmax(score*alpha2, -1)
# print (prob1)
# print (prob2) # 发现概率分布差距变得很大（尖锐）

# jacobbi 矩阵
def softmax_func(score):
    return F.softmax(score, -1)

jaco_mat1 = torch.autograd.functional.jacobian(softmax_func, score*alpha1)
jaco_mat2 = torch.autograd.functional.jacobian(softmax_func, score*alpha2)
# print(jaco_mat1)
# print(jaco_mat2) # 发现jaco_mat2的数值变得很小

## Decoder 部分

## intro-mask的decoder 的mask

In [16]:
# 交叉attn mask
## decoder的输出作为query，encoder的内容作为key和value


# 构造intro-attn的mask部分, 需要考虑encoder和decoder的mask部分
## Q@K.T shape：[batch_size, tgt_seq, src_seq]

### 构建有效的编码器的位置
valid_encoder_pos = torch.cat(
[torch.unsqueeze(F.pad(torch.ones(L), (0, max(src_len)-L)), 0)
 for L in src_len]
) # 得到有效位置的序列信息
## 扩充维度
valid_encoder_pos = torch.unsqueeze(valid_encoder_pos, -1)

valid_decoder_pos = torch.cat(
[torch.unsqueeze(F.pad(torch.ones(L), (0, max(tgt_len)-L)), 0)
 for L in tgt_len]
)
valid_decoder_pos = torch.unsqueeze(valid_decoder_pos, -1)
# print(valid_encoder_pos)
# print(valid_decoder_pos)

#做bmm 做矩阵相乘
valid_cross_pos = torch.bmm(valid_decoder_pos, valid_encoder_pos.transpose(-2, -1))
# print(valid_cross_pos)


invalid_cross_pos_mat = 1-valid_cross_pos
mask_cross_attn = invalid_cross_pos_mat.to(torch.bool)

## 假想有一个score shape:[batch_size, tgt_seq, src_seq]
score = torch.randn(batch_size, max(tgt_len), max(src_len))
masked_score = score.masked_fill(mask_cross_attn, -1e9)
prob = F.softmax(masked_score, -1)
print(score)
print(masked_score)
print(prob)

tensor([[[-1.6556, -1.4813,  0.0426, -0.3118],
         [ 0.2175,  0.5590, -1.3931, -0.3189],
         [-1.1638, -0.0730, -1.0885, -0.6744],
         [-0.6843,  0.6765, -1.0622, -0.4397]],

        [[ 0.8388,  0.2593,  0.1865, -0.0054],
         [-1.3871,  1.5679,  0.0312, -1.0996],
         [-0.0787, -0.7787,  0.3698,  0.3719],
         [-1.3029, -1.0814,  0.4262,  0.8410]]])
tensor([[[-1.6556e+00, -1.4813e+00, -1.0000e+09, -1.0000e+09],
         [ 2.1748e-01,  5.5904e-01, -1.0000e+09, -1.0000e+09],
         [-1.1638e+00, -7.3021e-02, -1.0000e+09, -1.0000e+09],
         [-6.8430e-01,  6.7645e-01, -1.0000e+09, -1.0000e+09]],

        [[ 8.3881e-01,  2.5926e-01,  1.8649e-01, -5.3785e-03],
         [-1.3871e+00,  1.5679e+00,  3.1219e-02, -1.0996e+00],
         [-7.8655e-02, -7.7867e-01,  3.6984e-01,  3.7194e-01],
         [-1.0000e+09, -1.0000e+09, -1.0000e+09, -1.0000e+09]]])
tensor([[[0.4565, 0.5435, 0.0000, 0.0000],
         [0.4154, 0.5846, 0.0000, 0.0000],
         [0.2515, 0.7485, 

### decoder mask 实现

In [42]:
## decoder self-attn的mask
### output 不是一次性的把所有目标序列输出出来，而是自回归的

### 为了在训练阶段和inference阶段保持一致，
### 所以要在训练阶段的每一个位置的答案mask，每次只放出要预测的时间步之前的
### 所以这个mask应该是一个三角形的
valid_decoder_tril_mat = torch.cat(
[ 
    torch.unsqueeze(F.pad(torch.tril(torch.ones(L, L)), 
                     (0, max(tgt_len)-L, 0, max(tgt_len)-L)), 0)
for L in tgt_len
])
# print(valid_decoder_tril_mat)
# print(valid_decoder_tril_mat.shape)
# 获得掩码矩阵
invalid_decoder_pos_mat = (1-valid_decoder_tril_mat).to(torch.bool)
# print(invalid_decoder_pos_mat)

# 假想score
score = torch.randn(batch_size, max(tgt_len), max(tgt_len))
masked_score = score.masked_fill(invalid_decoder_pos_mat, -1e9)
prob = F.softmax(masked_score, -1)
print(score)
print(masked_score)
print(prob)

tensor([[[-1.5939, -1.4200,  0.5034, -1.7348],
         [-2.1207,  1.1539,  0.2158, -0.3148],
         [-0.2288,  0.7570, -2.2965,  0.1419],
         [ 0.1953,  0.5820,  0.2122,  0.7060]],

        [[ 0.5149,  1.8388, -0.1670, -0.6262],
         [-1.2218, -1.0102, -1.7788, -0.1309],
         [ 1.4810, -0.9104,  0.3862, -0.3399],
         [-1.7931,  0.6619,  0.4371, -0.3657]]])
tensor([[[-1.5939e+00, -1.0000e+09, -1.0000e+09, -1.0000e+09],
         [-2.1207e+00,  1.1539e+00, -1.0000e+09, -1.0000e+09],
         [-2.2882e-01,  7.5699e-01, -2.2965e+00, -1.0000e+09],
         [ 1.9528e-01,  5.8200e-01,  2.1218e-01,  7.0597e-01]],

        [[ 5.1494e-01, -1.0000e+09, -1.0000e+09, -1.0000e+09],
         [-1.2218e+00, -1.0102e+00, -1.0000e+09, -1.0000e+09],
         [ 1.4810e+00, -9.1041e-01,  3.8615e-01, -1.0000e+09],
         [-1.0000e+09, -1.0000e+09, -1.0000e+09, -1.0000e+09]]])
tensor([[[1.0000, 0.0000, 0.0000, 0.0000],
         [0.0365, 0.9635, 0.0000, 0.0000],
         [0.2627, 0.7041, 

## attn 实现函数

In [None]:
# 构建self_attn
def scaled_dot_product_attn(Q, K, V, attn_mask):
    # shape of QKV : [batch_size*num_head, seq_len, model_dim/num_head]
    # shape of attn_mask : (sl, sl)
    # bs = batch_size*num_head
    score = torch.bmm(Q, K.transpose(-1, -2)) / torch.sqrt(model_dim) # (bs, sl, sl)
    masked_score = score.masked_fill(attn_mask, -1e9) # (bs, sl, sl)
    prob = F.softmax(masked_score, -1) # (bs, sl, sl)
    context = torch.bmm(prob, V) # (bs, sl, model_dim/num_head)
    return context



In [None]:
## pytorch 源码中
#### attn_mask用的加法加到attn上，attn_mask 有效的地方是0， 无效的地方是-1e9