<a href="https://colab.research.google.com/github/Dominique-Yiu/ColabCode/blob/main/Transformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Transformer Realization
**Transformer Architecture** \
1. Encoder
- Input words embedding
  - Turn the sparse one-hot vector into a dense, continuous vector using a linear layer (FFN) without bias.
- Position encoding
- Multi-head self-attention
- Feed-forward network
2. Decoder
- Output words embedding
- Masked multi-head self-attention
- Multi-head cross-attention
- Feed-forward network
- Softmax

实现的难点： \
1. Word Embedding
2. Position Embedding
3. Encoder self-attention mask
4. Intra-attention mask
5. Decoder self-attention mask
6. Multi-head self-attention

In [None]:
import torch
import numpy
import torch.nn as nn
import torch.nn.functional as F

In [None]:
# ---- Toy configuration shared by the cells below ----
batch_size = 2
# vocabulary size of the source / target side (word ids are drawn from [1, size))
max_num_src_words = 8
max_num_tgt_words = 8
# width of every embedding vector
model_dim = 8
# the maximum sequence length
max_src_seq_len = 5
max_tgt_seq_len = 5
# maximum position index
max_position_len = 5

# Per-sample sequence lengths. They are fixed here; a random alternative would be
#   src_len = torch.randint(2, 5, (batch_size,))
#   tgt_len = torch.randint(2, 5, (batch_size,))
src_len = torch.tensor([2, 4], dtype=torch.int32)
tgt_len = torch.tensor([4, 3], dtype=torch.int32)

# Build the src/tgt sentences: draw random word ids for each sample, right-pad
# with the default value 0 up to the longest sample, then stack into a batch.
src_seq = torch.stack(
    [F.pad(torch.randint(1, max_num_src_words, (L,)), (0, max(src_len) - L)) for L in src_len]
)
tgt_seq = torch.stack(
    [F.pad(torch.randint(1, max_num_tgt_words, (L,)), (0, max(tgt_len) - L)) for L in tgt_len]
)

"""Word Embedding"""
# Build the word embeddings: one extra row so that id 0 (padding) has an entry too.
src_embedding_table = nn.Embedding(max_num_src_words + 1, model_dim)
tgt_embedding_table = nn.Embedding(max_num_tgt_words + 1, model_dim)
# Look up the padded id batches.
src_embedding = src_embedding_table(src_seq)
tgt_embedding = tgt_embedding_table(tgt_seq)

print(src_embedding_table.weight, src_seq, src_embedding, sep="\n")

Parameter containing:
tensor([[ 1.3525, -0.4454,  1.6057,  0.0904, -0.6940, -1.2095, -1.9863, -1.7586],
        [-0.6995, -0.6308,  0.6180, -0.7553,  2.1051,  0.4115, -0.8706, -0.7519],
        [ 1.7582,  1.5953, -1.1571, -0.7387, -0.3730, -0.2318, -1.1671,  1.8609],
        [-1.0063, -1.6284, -1.3913,  2.3693,  0.7328, -0.9040,  0.3566, -0.5074],
        [-0.5125,  1.3462, -0.2684,  0.1615,  0.8660, -0.5704, -1.0948,  1.6697],
        [ 0.7128, -0.2651, -0.4819, -0.4780,  2.1755,  0.4100, -0.7463, -1.3425],
        [ 0.0381,  1.3084, -0.6551, -1.2019,  1.9547, -0.8416, -0.3264,  0.1672],
        [ 0.3851,  0.8669,  1.4315, -0.7066, -2.2423, -0.7346,  1.0852, -0.4502],
        [ 0.2387, -0.4505, -0.8245, -0.3745,  0.3566,  0.3393, -0.4981,  0.4199]],
       requires_grad=True)
tensor([[1, 6, 0, 0],
        [1, 3, 2, 6]])
tensor([[[-0.6995, -0.6308,  0.6180, -0.7553,  2.1051,  0.4115, -0.8706,
          -0.7519],
         [ 0.0381,  1.3084, -0.6551, -1.2019,  1.9547, -0.8416, -0.3264,
 

In [None]:
"""Position Embedding"""
# Build the sinusoidal position-embedding table:
#   table[pos, 2i]   = sin(pos / 10000^(2i / model_dim))
#   table[pos, 2i+1] = cos(pos / 10000^(2i / model_dim))
# Column vector of positions 0 .. max_position_len - 1.
pos_mat  =  torch.arange(max_position_len).reshape((-1, 1))
# Row vector of the denominators 10000^(2i / model_dim). The even indices are
# derived from model_dim (rather than a literal 8) so the table stays
# consistent if model_dim is changed in the configuration cell.
i_mat = torch.pow(10000, torch.arange(0, model_dim, 2).reshape((1, -1)) / model_dim)
pe_embedding_table = torch.zeros(max_position_len, model_dim)
pe_embedding_table[:, 0::2] = torch.sin(pos_mat / i_mat)  # even columns
pe_embedding_table[:, 1::2] = torch.cos(pos_mat / i_mat)  # odd columns

# Wrap the fixed table in an embedding layer; requires_grad=False keeps it out of training.
pe_embedding = nn.Embedding(max_position_len, model_dim)
pe_embedding.weight = nn.Parameter(pe_embedding_table, requires_grad = False)
# Position ids 0 .. max_len - 1, repeated once per sample in the batch.
src_pos = torch.cat([torch.unsqueeze(torch.arange(max(src_len)), 0) for _ in src_len]).to(torch.int32)
tgt_pos = torch.cat([torch.unsqueeze(torch.arange(max(tgt_len)), 0) for _ in tgt_len]).to(torch.int32)

src_pos_embedding = pe_embedding(src_pos)
tgt_pos_embedding = pe_embedding(tgt_pos)
print(src_pos_embedding)
print(tgt_pos_embedding)

tensor([[[ 0.0000e+00,  1.0000e+00,  0.0000e+00,  1.0000e+00,  0.0000e+00,
           1.0000e+00,  0.0000e+00,  1.0000e+00],
         [ 8.4147e-01,  5.4030e-01,  9.9833e-02,  9.9500e-01,  9.9998e-03,
           9.9995e-01,  1.0000e-03,  1.0000e+00],
         [ 9.0930e-01, -4.1615e-01,  1.9867e-01,  9.8007e-01,  1.9999e-02,
           9.9980e-01,  2.0000e-03,  1.0000e+00],
         [ 1.4112e-01, -9.8999e-01,  2.9552e-01,  9.5534e-01,  2.9995e-02,
           9.9955e-01,  3.0000e-03,  1.0000e+00]],

        [[ 0.0000e+00,  1.0000e+00,  0.0000e+00,  1.0000e+00,  0.0000e+00,
           1.0000e+00,  0.0000e+00,  1.0000e+00],
         [ 8.4147e-01,  5.4030e-01,  9.9833e-02,  9.9500e-01,  9.9998e-03,
           9.9995e-01,  1.0000e-03,  1.0000e+00],
         [ 9.0930e-01, -4.1615e-01,  1.9867e-01,  9.8007e-01,  1.9999e-02,
           9.9980e-01,  2.0000e-03,  1.0000e+00],
         [ 1.4112e-01, -9.8999e-01,  2.9552e-01,  9.5534e-01,  2.9995e-02,
           9.9955e-01,  3.0000e-03,  1.0000e+00]

In [None]:
import numpy as np
"""Encoder: Self-Attention Mask"""
# Build the encoder self-attention mask.
# Mask shape: [batch_size, max_src_len, max_src_len]; it is boolean, with True
# marking the entries that get overwritten with -1e9 before the softmax.
# Per-sample validity vector (1 for real tokens, 0 for padding), as a column: [batch, max_src_len, 1].
valid_encoder_pos = torch.stack(
    [F.pad(torch.ones(L), (0, max(src_len) - L)) for L in src_len]
).unsqueeze(2)
# Outer product per sample: entry (i, j) is 1 only when both positions i and j are valid.
valid_encoder_pos_matrix = torch.bmm(valid_encoder_pos, valid_encoder_pos.transpose(1, 2))
invalid_encoder_pos_matrix = 1 - valid_encoder_pos_matrix
mask_encoder_self_attention = invalid_encoder_pos_matrix.bool()
# Random scores stand in for Q @ K^T to demonstrate the effect of the mask.
score = torch.randn(batch_size, max(src_len), max(src_len))
masked_score = score.masked_fill(mask_encoder_self_attention, -1e9)
prob = torch.softmax(masked_score, dim=-1)

print(score, masked_score, prob, sep="\n")

tensor([[[ 1.7397, -0.5672,  1.3452, -0.3177],
         [-0.4246,  1.7285,  0.1224, -0.8866],
         [ 0.7788, -1.4620,  1.6338, -0.9117],
         [-0.8076, -0.8892, -1.3644, -1.0709]],

        [[ 1.6645,  1.1345, -0.1216, -1.1251],
         [-0.7066,  0.6371,  1.2838,  1.0437],
         [ 0.1213, -1.7839,  0.8772,  0.6051],
         [-1.6138,  1.5712, -1.2977,  2.5202]]])
tensor([[[ 1.7397e+00, -5.6720e-01, -1.0000e+09, -1.0000e+09],
         [-4.2460e-01,  1.7285e+00, -1.0000e+09, -1.0000e+09],
         [-1.0000e+09, -1.0000e+09, -1.0000e+09, -1.0000e+09],
         [-1.0000e+09, -1.0000e+09, -1.0000e+09, -1.0000e+09]],

        [[ 1.6645e+00,  1.1345e+00, -1.2160e-01, -1.1251e+00],
         [-7.0658e-01,  6.3712e-01,  1.2838e+00,  1.0437e+00],
         [ 1.2126e-01, -1.7839e+00,  8.7725e-01,  6.0507e-01],
         [-1.6138e+00,  1.5712e+00, -1.2977e+00,  2.5202e+00]]])
tensor([[[0.9094, 0.0906, 0.0000, 0.0000],
         [0.1040, 0.8960, 0.0000, 0.0000],
         [0.2500, 0.2500, 

以上主要实现了词向量，位置编码，编码器自注意力的掩码

In [None]:
"""Corss-Attention"""
# Cross-attention mask.
# Q @ K^T shape: [batch_size, tgt_seq_len, src_seq_len] (queries come from the
# decoder, keys from the encoder), so the mask must have the same layout.
# Per-sample validity column vectors (1 for real tokens, 0 for padding): [batch, max_len, 1].
valid_encoder_pos = torch.unsqueeze(torch.cat([torch.unsqueeze(F.pad(torch.ones(L), (0, max(src_len) - L)), 0) for L in src_len]), 2)
valid_decoder_pos = torch.unsqueeze(torch.cat([torch.unsqueeze(F.pad(torch.ones(L), (0, max(tgt_len) - L)), 0) for L in tgt_len]), 2)
# Validity of every (target position, source position) pair.
# The decoder vector must be the left operand: decoder @ encoder^T yields
# [batch, tgt, src]. The reverse order yields the transposed [batch, src, tgt]
# matrix, which only fits the score by accident when both max lengths are equal
# and then masks whole target rows instead of padded source columns.
valid_cross_pos_matrix = torch.bmm(valid_decoder_pos, valid_encoder_pos.transpose(1, 2))
invalid_cross_pos_matrix = 1 - valid_cross_pos_matrix
mask_cross_attention = invalid_cross_pos_matrix.to(torch.bool)
# Random scores stand in for Q @ K^T to demonstrate the effect of the mask.
score = torch.randn(batch_size, max(tgt_len), max(src_len))
masked_score = score.masked_fill(mask_cross_attention, -1e9)
prob = F.softmax(masked_score, -1)

print(valid_encoder_pos)
print(valid_decoder_pos)
print(valid_cross_pos_matrix)
print(masked_score)
print(prob)

tensor([[[1.],
         [1.],
         [0.],
         [0.]],

        [[1.],
         [1.],
         [1.],
         [1.]]])
tensor([[[1.],
         [1.],
         [1.],
         [1.]],

        [[1.],
         [1.],
         [1.],
         [0.]]])
tensor([[[1., 1., 1., 1.],
         [1., 1., 1., 1.],
         [0., 0., 0., 0.],
         [0., 0., 0., 0.]],

        [[1., 1., 1., 0.],
         [1., 1., 1., 0.],
         [1., 1., 1., 0.],
         [1., 1., 1., 0.]]])
tensor([[[ 9.2010e-01, -9.7814e-01, -1.2373e+00, -5.1125e-01],
         [ 6.7158e-01, -1.3036e+00, -6.0340e-01,  6.7199e-01],
         [-1.0000e+09, -1.0000e+09, -1.0000e+09, -1.0000e+09],
         [-1.0000e+09, -1.0000e+09, -1.0000e+09, -1.0000e+09]],

        [[-1.6189e-01,  1.2645e-01, -1.0282e-02, -1.0000e+09],
         [ 2.8165e-01,  1.7807e+00,  1.8699e-02, -1.0000e+09],
         [ 2.4964e+00,  4.9022e-01, -4.9626e-01, -1.0000e+09],
         [-1.3185e+00,  4.3000e-01, -1.3165e+00, -1.0000e+09]]])
tensor([[[0.6647, 0.0996

In [None]:
"""Decoder Self-Attention Mask"""
# Causal mask.
# For each sample, take an L x L lower-triangular block of ones (position i may
# look at positions <= i) and zero-pad it on the right and bottom up to the
# longest target sequence; then stack the samples into one batch.
valid_decoder_tri_matrix = torch.stack(
    [
        F.pad(torch.ones((L, L)).tril(), (0, max(tgt_len) - L, 0, max(tgt_len) - L))
        for L in tgt_len
    ]
)
invalid_decoder_tri_matrix = 1 - valid_decoder_tri_matrix
# True marks the entries that get overwritten with -1e9 before the softmax.
mask_invalid_decoder_tri_matrix = invalid_decoder_tri_matrix.bool()

# Random scores stand in for Q @ K^T to demonstrate the effect of the mask.
score = torch.randn(batch_size, max(tgt_len), max(tgt_len))
masked_score = score.masked_fill(mask_invalid_decoder_tri_matrix, -1e9)
prob = torch.softmax(masked_score, dim=-1)
print(tgt_len, prob, sep="\n")

tensor([4, 3], dtype=torch.int32)
tensor([[[1.0000, 0.0000, 0.0000, 0.0000],
         [0.2886, 0.7114, 0.0000, 0.0000],
         [0.3299, 0.4095, 0.2605, 0.0000],
         [0.2985, 0.1735, 0.2143, 0.3137]],

        [[1.0000, 0.0000, 0.0000, 0.0000],
         [0.4326, 0.5674, 0.0000, 0.0000],
         [0.3997, 0.1348, 0.4655, 0.0000],
         [0.2500, 0.2500, 0.2500, 0.2500]]])


$Attention(Q,K,V)=softmax(\frac{QK^T}{\sqrt{d_k}})V$

In [None]:
"""Scaled Self-Attention"""
def scaled_dot_product_attention(Q, K, V, attn_mask):
    """Compute softmax(Q @ K^T / sqrt(d_k)) @ V for batched 3-D inputs.

    Args:
        Q: query tensor of shape [batch, q_len, d_k].
        K: key tensor of shape [batch, k_len, d_k].
        V: value tensor of shape [batch, k_len, d_v].
        attn_mask: tensor broadcastable to [batch, q_len, k_len] whose
            True / non-zero entries mark positions to hide, or None to
            apply no mask.

    Returns:
        Context tensor of shape [batch, q_len, d_v].
    """
    # Scale by the key/query width taken from the input itself. torch.sqrt only
    # accepts tensors, so a plain Python power is used for the integer size.
    d_k = Q.size(-1)
    score = torch.bmm(Q, K.transpose(-2, -1)) / (d_k ** 0.5)
    if attn_mask is not None:
        # Overwrite hidden positions with a large negative value so that they
        # receive (almost) zero probability after the softmax.
        score = score.masked_fill(attn_mask.to(torch.bool), -1e9)
    prob = F.softmax(score, -1)
    context = torch.bmm(prob, V)
    return context


**多头注意力**
希望模型可以基于相同的注意力机制学习到不同的行为，然后将不同的行为作为知识组合起来，捕获序列内各种范围的依赖关系。
可以独立学习得到h组不同的线性投影来变换Q,K和V。然后，这h组变换后的Q,K,V将并行送到注意力汇聚中。最后将这h个注意力汇聚的输出拼接在一起，并且通过一个可以学习的线性投影进行变换产生最终的输出。
**自注意力机制**
查询、键和值都来自于同一组输入。