# 手工实现Scaled Dot-product Attention
首先需要将文本分词为token序列，然后将每个词语转为对应的embedding。

我们利用`torch.nn.Embedding`层来创建一个从token ID到token embedding的映射表：


In [2]:
from torch import nn
from transformers import AutoConfig
from transformers import AutoTokenizer

# Name of the pretrained checkpoint whose tokenizer is loaded below.
model_ckpt = "bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)

# Display the tokenizer summary (vocab size, max length, special tokens).
print(tokenizer)



PreTrainedTokenizerFast(name_or_path='bert-base-uncased', vocab_size=30522, model_max_len=512, is_fast=True, padding_side='right', truncation_side='right', special_tokens={'unk_token': '[UNK]', 'sep_token': '[SEP]', 'pad_token': '[PAD]', 'cls_token': '[CLS]', 'mask_token': '[MASK]'})


In [3]:
text = "time flies like an arrow"
# return_tensors="pt" requests PyTorch tensors; add_special_tokens=False leaves
# out the tokenizer's special tokens so only the tokens of `text` are encoded.
inputs = tokenizer(text, return_tensors="pt", add_special_tokens=False)
print(inputs.input_ids)
print(inputs)

tensor([[ 2051, 10029,  2066,  2019,  8612]])
{'input_ids': tensor([[ 2051, 10029,  2066,  2019,  8612]]), 'token_type_ids': tensor([[0, 0, 0, 0, 0]]), 'attention_mask': tensor([[1, 1, 1, 1, 1]])}


In [4]:
# Load the checkpoint's configuration and size a token-ID -> vector lookup
# table from it. The table is newly constructed here, so its weights are
# freshly initialised rather than taken from the checkpoint.
config = AutoConfig.from_pretrained(model_ckpt)
token_emb = nn.Embedding(config.vocab_size, config.hidden_size)
print(token_emb)

Embedding(30522, 768)


In [5]:
# Look up one embedding vector per token ID
# (recorded output shape: [1, 5, 768]).
inputs_embeds = token_emb(inputs.input_ids)
print(inputs_embeds.size())

torch.Size([1, 5, 768])


In [6]:
import torch
from math import sqrt

# Self-attention: query, key and value all alias the same embedding tensor.
Q = K = V = inputs_embeds
dim_K = K.size(-1)
# Batched Q @ K^T, divided by sqrt of the key dimension.
scores = torch.bmm(Q, K.transpose(1, 2)) / sqrt(dim_K)
print(scores)
print(scores.size())

tensor([[[29.9866, -0.0933, -0.7198, -2.8823, -0.4788],
         [-0.0933, 30.1352,  0.0881, -1.5191, -0.1235],
         [-0.7198,  0.0881, 26.9140,  0.8421, -0.2792],
         [-2.8823, -1.5191,  0.8421, 25.2472,  0.2540],
         [-0.4788, -0.1235, -0.2792,  0.2540, 25.7618]]],
       grad_fn=<DivBackward0>)
torch.Size([1, 5, 5])


In [7]:
# Normalise the attention scores into weights with softmax.
import torch.nn.functional as F

# Softmax over the last axis, so the weights of each query row sum to 1.
weights = F.softmax(scores, dim=-1)
print(weights.sum(dim = -1))


tensor([[1., 1., 1., 1., 1.]], grad_fn=<SumBackward1>)


In [8]:
# Multiply the attention weights with the value sequence (batched matmul).
attn_outputs = torch.bmm(weights, V)
print(attn_outputs.shape)

torch.Size([1, 5, 768])


In [9]:
# 简化版的Scaled Dot-product Attention

import torch
import torch.nn.functional as F
from math import sqrt

def scaled_dot_product_attention(query, key, value, query_mask=None, key_mask=None, mask=None):
    """Compute scaled dot-product attention for a batch of sequences.

    Args:
        query: Tensor of shape [batch_size, num_queries, d_k].
        key: Tensor of shape [batch_size, num_keys, d_k].
        value: Tensor of shape [batch_size, num_keys, d_v].
        query_mask: Optional tensor of shape [batch_size, num_queries]; a zero
            marks a query position whose whole score row is blocked.
        key_mask: Optional tensor of shape [batch_size, num_keys]; a zero
            marks a key position that no query may attend to.
        mask: Optional tensor broadcastable to
            [batch_size, num_queries, num_keys]; zero entries are blocked.
            It is replaced by the combined query/key mask when both
            ``query_mask`` and ``key_mask`` are given.

    Returns:
        Tensor of shape [batch_size, num_queries, d_v]: for every query, the
        attention-weighted sum of the value vectors.

    Note:
        A row in which every position is blocked becomes all ``-inf`` and
        softmax then yields NaN for that row.
    """
    dim_k = query.size(-1)
    # Batched Q @ K^T, divided by sqrt(d_k): this is the "scaled" part of the
    # name and keeps the softmax inputs from growing with the feature size.
    scores = torch.bmm(query, key.transpose(1, 2)) / sqrt(dim_k)
    if query_mask is not None and key_mask is not None:
        # Per-batch outer product of the two 1-D masks, giving
        # [batch_size, num_queries, num_keys]. Broadcasting multiplication is
        # used so that integer and boolean masks are accepted as well.
        mask = query_mask.unsqueeze(-1) * key_mask.unsqueeze(1)
    if mask is not None:
        # Blocked positions get -inf so that softmax assigns them zero weight.
        scores = scores.masked_fill(mask == 0, -float("inf"))
    weights = F.softmax(scores, dim=-1)
    return torch.bmm(weights, value)


当Q和K的序列相同的时候，注意力机制会为上下文中的相同单词分配非常大的分数。

但在实践中，相关词往往比相同词更加重要，因此需要多头注意力机制。



# 多头注意力


In [10]:
from torch import nn 

class AttentionHead(nn.Module):
    """One attention head: linear projections followed by attention.

    Args:
        embed_dim: Size of the last dimension of the incoming tensors.
        head_dim: Size of the last dimension produced by each projection.
    """

    def __init__(self, embed_dim, head_dim):
        super().__init__()
        # Separate learned projections for queries, keys and values.
        self.q = nn.Linear(in_features=embed_dim, out_features=head_dim)
        self.k = nn.Linear(in_features=embed_dim, out_features=head_dim)
        self.v = nn.Linear(in_features=embed_dim, out_features=head_dim)

    def forward(self, query, key, value, query_mask=None, key_mask=None, mask=None):
        """Project the three inputs and attend over the projected tensors."""
        projected_query = self.q(query)
        projected_key = self.k(key)
        projected_value = self.v(value)
        return scaled_dot_product_attention(
            projected_query,
            projected_key,
            projected_value,
            query_mask,
            key_mask,
            mask,
        )

将各个头的输出拼接起来，然后通过一个从embed_dim到embed_dim的线性层进行变换，得到最终的Multi-head Attention的输出

In [11]:
class MultiHeadAttention(nn.Module):
    """Multi-head attention built from independent ``AttentionHead`` modules.

    The head outputs are concatenated along the last dimension and passed
    through a final ``embed_dim -> embed_dim`` linear layer.

    Args:
        config: Object exposing ``hidden_size`` (embedding width) and
            ``num_attention_heads``.

    Raises:
        ValueError: If ``hidden_size`` is not a multiple of
            ``num_attention_heads``.
    """

    def __init__(self, config):
        super().__init__()
        embed_dim = config.hidden_size  # width of the embedding vectors
        num_heads = config.num_attention_heads
        # Each head produces embed_dim // num_heads features. Unless the
        # division is exact, the concatenated heads would not add up to
        # embed_dim and output_linear would fail with a shape error at forward
        # time, so the configuration is rejected up front.
        if embed_dim % num_heads != 0:
            raise ValueError(
                f"hidden_size ({embed_dim}) must be a multiple of "
                f"num_attention_heads ({num_heads})"
            )
        head_dim = embed_dim // num_heads

        self.heads = nn.ModuleList(
            [AttentionHead(embed_dim, head_dim) for _ in range(num_heads)]
        )

        self.output_linear = nn.Linear(embed_dim, embed_dim)

    def forward(self, query, key, value, query_mask=None, key_mask=None, mask=None):
        """Run every head on the same inputs and merge their outputs.

        The mask arguments are forwarded unchanged to each head.
        """
        head_outputs = [
            h(query, key, value, query_mask, key_mask, mask) for h in self.heads
        ]
        # Concatenate along the feature axis, then mix the heads linearly.
        x = torch.cat(head_outputs, dim=-1)
        return self.output_linear(x)

用bert-base-uncased模型的配置（config中的超参数）来构建MHA层（权重为随机初始化，并未加载预训练参数），并将之前构建的输入送入模型来验证是否工作正常

In [12]:
from transformers import AutoConfig
from transformers import AutoTokenizer

# Re-create the tokenizer, config and embeddings (same steps as earlier cells).
model_ckpt = "bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)

text = "time flies like an arrow"
inputs = tokenizer(text, return_tensors="pt", add_special_tokens=False)
config = AutoConfig.from_pretrained(model_ckpt)
token_emb = nn.Embedding(config.vocab_size, config.hidden_size)
inputs_embeds = token_emb(inputs.input_ids)

# Self-attention: the same embeddings are used as query, key and value.
multihead_attn = MultiHeadAttention(config)
query = key = value = inputs_embeds
attn_outputs = multihead_attn(query, key, value)
print(attn_outputs.size())

torch.Size([1, 5, 768])


# 实现FFN层
该层单独地处理sequence中的每一个embedding

In [13]:
class FeedForwar(nn.Module):
    """Feed-forward block: Linear -> GELU -> Linear -> Dropout.

    Args:
        config: Object exposing ``hidden_size``, ``intermediate_size`` and
            ``hidden_dropout_prob``.
        *args, **kwargs: Forwarded to ``nn.Module.__init__``.
    """

    def __init__(self, config, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        model_width = config.hidden_size
        inner_width = config.intermediate_size
        # Expand to the intermediate width, then project back.
        self.linear1 = nn.Linear(model_width, inner_width)
        self.linear2 = nn.Linear(inner_width, model_width)
        self.gelu = nn.GELU()
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, x):
        """Apply the block to ``x``; the linear layers act on the last axis."""
        expanded = self.gelu(self.linear1(x))
        return self.dropout(self.linear2(expanded))

In [14]:
# Feed the multi-head attention output through the feed-forward block;
# the recorded output shape is unchanged: [1, 5, 768].
feed_forward = FeedForwar(config)
ff_outputs = feed_forward(attn_outputs)
print(ff_outputs.size())

torch.Size([1, 5, 768])


# layer Normalization

通常的做法是，先进行LayerNorm，再进行MHA和残差连接（即Pre-LN结构）。

In [15]:
class TransformerEncoderLayer(nn.Module):
    """Encoder layer that normalises before each sub-layer (pre-layer-norm).

    Both sub-layers (self-attention, then feed-forward) are wrapped in a
    residual connection.

    Args:
        config: Object exposing ``hidden_size`` plus whatever
            ``MultiHeadAttention`` and ``FeedForwar`` read from it.
        *args, **kwargs: Forwarded to ``nn.Module.__init__``.
    """

    def __init__(self, config, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        width = config.hidden_size
        self.layer_norm_1 = nn.LayerNorm(width)
        self.layer_norm_2 = nn.LayerNorm(width)
        self.attention = MultiHeadAttention(config)
        self.feed_forward = FeedForwar(config)

    def forward(self, x, mask=None):
        """Return ``x`` after the attention and feed-forward sub-layers."""
        # Self-attention on the normalised input, added back onto the input.
        normed = self.layer_norm_1(x)
        attended = self.attention(normed, normed, normed, mask=mask)
        x = x + attended
        # Feed-forward on the re-normalised result, again with a skip path.
        transformed = self.feed_forward(self.layer_norm_2(x))
        return x + transformed

In [16]:
# Run one encoder layer on the embeddings; the input and output sizes are
# printed for comparison.
encoder_layer = TransformerEncoderLayer(config)
print(inputs_embeds.shape)
print(encoder_layer(inputs_embeds).size())

torch.Size([1, 5, 768])
torch.Size([1, 5, 768])


# Positional Embeddings
使用Positional Embeddings 来添加词语的位置信息。
基于一个简单的想法：**使用与位置相关的值模式来增强词向量**。

当预训练数据集够大的时候，最简单的方法就是让模型自动学习Positional Embeddings。

In [17]:
class Embeddings(nn.Module):
    """Token embeddings plus learned position embeddings.

    The two embeddings are summed, layer-normalised and passed through
    dropout.

    Args:
        config: Object exposing ``vocab_size``, ``hidden_size`` and
            ``max_position_embeddings``. ``hidden_dropout_prob`` is used as
            the dropout probability when present; otherwise 0.5 (the
            ``nn.Dropout`` default) is used.
        *args, **kwargs: Forwarded to ``nn.Module.__init__``.
    """

    def __init__(self, config, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.token_embeddings = nn.Embedding(config.vocab_size, config.hidden_size)
        self.position_embeddings = nn.Embedding(config.max_position_embeddings, config.hidden_size)
        self.layer_norm = nn.LayerNorm(config.hidden_size, eps=1e-12)
        # Take the dropout probability from the config, as FeedForwar does,
        # instead of silently relying on nn.Dropout's default of 0.5.
        self.dropout = nn.Dropout(getattr(config, "hidden_dropout_prob", 0.5))

    def forward(self, input_ids):
        """Embed ``input_ids``, indexed as [batch_size, seq_length].

        Returns:
            Tensor with a trailing ``hidden_size`` axis appended to the shape
            of ``input_ids``.
        """
        # Position IDs 0..seq_length-1, created on the same device as the
        # input so the lookup also works when the module is not on the CPU.
        seq_length = input_ids.size(1)
        position_ids = torch.arange(
            seq_length, dtype=torch.long, device=input_ids.device
        ).unsqueeze(0)
        # Create token and position embeddings
        token_embeddings = self.token_embeddings(input_ids)
        position_embeddings = self.position_embeddings(position_ids)
        # Combine them; the [1, seq_length, hidden] position embeddings
        # broadcast over the batch axis.
        embeddings = token_embeddings + position_embeddings
        embeddings = self.layer_norm(embeddings)
        embeddings = self.dropout(embeddings)
        return embeddings


In [18]:
# Embed the token IDs (token + position embeddings) and print the result size.
embeddings_layer = Embeddings(config)
print(embeddings_layer(inputs.input_ids).size())

torch.Size([1, 5, 768])


In [19]:
# Putting the pieces together into a complete Transformer encoder:
class TransformerEncoder(nn.Module):
    """Embedding layer followed by a stack of encoder layers.

    Args:
        config: Object exposing ``num_hidden_layers`` plus whatever
            ``Embeddings`` and ``TransformerEncoderLayer`` read from it.
        *args, **kwargs: Forwarded to ``nn.Module.__init__``.
    """

    def __init__(self, config, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.embeddings = Embeddings(config)
        # One independently initialised layer per configured hidden layer.
        stack = []
        for _ in range(config.num_hidden_layers):
            stack.append(TransformerEncoderLayer(config))
        self.layers = nn.ModuleList(stack)

    def forward(self, x, mask=None):
        """Embed ``x`` and pass it through every layer in order.

        ``mask`` is handed unchanged to each layer.
        """
        hidden = self.embeddings(x)
        for encoder_layer in self.layers:
            hidden = encoder_layer(hidden, mask=mask)
        return hidden

In [20]:
# Build the full encoder and run it on the raw token IDs.
encoder = TransformerEncoder(config)
print(encoder(inputs.input_ids).size())


torch.Size([1, 5, 768])


In [21]:
seq_len = inputs.input_ids.size(-1)
# Lower-triangular matrix of ones with a leading batch axis:
# row i has ones in columns 0..i and zeros after that.
mask = torch.tril(torch.ones(seq_len, seq_len)).unsqueeze(0)
print(mask[0])

tensor([[1., 0., 0., 0., 0.],
        [1., 1., 0., 0., 0.],
        [1., 1., 1., 0., 0.],
        [1., 1., 1., 1., 0.],
        [1., 1., 1., 1., 1.]])


In [22]:
# Replace every score whose mask entry is 0 with -inf. masked_fill returns a
# new tensor, so `scores` itself is left unchanged; the result is only shown.
scores.masked_fill(mask==0, -float("inf"))


tensor([[[29.9866,    -inf,    -inf,    -inf,    -inf],
         [-0.0933, 30.1352,    -inf,    -inf,    -inf],
         [-0.7198,  0.0881, 26.9140,    -inf,    -inf],
         [-2.8823, -1.5191,  0.8421, 25.2472,    -inf],
         [-0.4788, -0.1235, -0.2792,  0.2540, 25.7618]]],
       grad_fn=<MaskedFillBackward0>)