
## Scaled Dot-product Attention
代码实现了Scaled Dot-product Attention，并进行了测试。

In [19]:
import warnings
warnings.filterwarnings('ignore')

Too many parameters - -A


In [21]:
# 导入必要的库
from torch import nn  # 导入PyTorch的nn模块，用于定义神经网络层
from transformers import AutoConfig  # 导入自动配置类，用于获取预训练模型的配置
from transformers import AutoTokenizer  # 导入自动分词器类，用于文本的分词和编码

# 指定预训练的BERT模型
cache_dir = './pretrained_model'
model_ckpt = "bert-base-uncased"

# 初始化分词器
tokenizer = AutoTokenizer.from_pretrained(model_ckpt, cache_dir=cache_dir)  # 从预训练模型加载分词器

# 准备输入文本
text = "hello world"

# 使用分词器处理文本，返回特殊的tensor格式
inputs = tokenizer(text, return_tensors="pt", add_special_tokens=False)  # 不添加特殊标记
print(inputs.input_ids)  # 打印输入的token ids

# 获取模型的配置
config = AutoConfig.from_pretrained(model_ckpt)  # 从预训练模型加载配置

# 创建一个嵌入层，用于将token ids转换为词向量
# vocab_size是词汇表的大小，hidden_size是嵌入向量的维度
token_emb = nn.Embedding(config.vocab_size, config.hidden_size)
print(token_emb)  # 打印嵌入层

# 将输入的token ids通过嵌入层转换为词向量
inputs_embeds = token_emb(inputs.input_ids)  # 调用嵌入层的forward方法
print(inputs_embeds.size())  # 打印词向量的尺寸


tensor([[7592, 2088]])
Embedding(30522, 768)
torch.Size([1, 2, 768])


In [25]:
import torch  # 导入PyTorch库，用于进行张量运算
from math import sqrt  # 导入math库中的sqrt函数，用于计算平方根

# 假设inputs_embeds是之前步骤中得到的词嵌入向量，这里同时作为查询（Q）、键（K）和值（V）
Q = K = V = inputs_embeds  # 这里的Q, K, V是相同的词嵌入向量，但在实际应用中它们可能来自不同的输入

# 获取键（K）的维度大小，即键的嵌入维度
dim_k = K.size(-1)  # -1表示最后一个维度，这里是嵌入向量的维度

# 计算注意力分数
'''
为了计算Q和K的点积，我们需要调整K的维度，以便它与Q的维度兼容。原始的K矩阵具有形状(batch_size, sent_len, emb_dim)，
为了执行矩阵乘法，我们需要将K的第二个和第三个维度交换，这里我们可以使用转置操作transpose(1, 2)将K的维度变为(batch_size, emb_dim, sent_len)。

现在，Q和K的形状都是(batch_size, sent_len, emb_dim)，我们可以执行矩阵乘法。

Q 的形状：(batch_size, sent_len, emb_dim)

K 的转置的形状：(batch_size, emb_dim, sent_len)

注意力分数的形状：(batch_size, sent_len, sent_len)
'''
# 每个分数表示一个查询向量与所有键向量之间的相似度
scores = torch.bmm(Q, K.transpose(1, 2)) / sqrt(dim_k)  # 计算得到的分数矩阵并缩放
print(scores)  # 打印
# 这个矩阵中的每个元素表示一个查询向量与一个键向量之间的相似度分数。

# 打印注意力分数的形状
print(scores.size())  # 应该输出(batch_size, sent_len, sent_len)
# 具体来说，scores 的每个元素 scores[b, i, j] 表示第 b 个批次中第 i 个查询向量与第 j 个键向量之间的相似度分数。


tensor([[[25.9001, -0.7132],
         [-0.7132, 25.8847]]], grad_fn=<DivBackward0>)
torch.Size([1, 2, 2])


In [26]:
import torch.nn.functional as F  # 导入PyTorch的nn.functional模块，包含了许多神经网络操作的函数

# 使用softmax函数对注意力分数进行归一化，得到注意力权重
# softmax函数将查询向量与键向量的相似度分数转换成一个概率分布。
# 这里的dim=-1指定了softmax操作的维度，即沿着最后一个维度（sent_len）进行，这个维度表示的是键向量的索引。
'''
简单讲下，假设
scores = [[[1.0, 2.0, 3.0],
           [4.0, 5.0, 6.0],
           [7.0, 8.0, 9.0]]]
1.计算每个元素的指数值：
exp_scores = [[[exp(1.0), exp(2.0), exp(3.0)],
               [exp(4.0), exp(5.0), exp(6.0)],
               [exp(7.0), exp(8.0), exp(9.0)]]]
2.计算每个查询向量的指数值之和：
sum_exp_scores = [[exp(1.0) + exp(2.0) + exp(3.0),
                   exp(4.0) + exp(5.0) + exp(6.0),
                   exp(7.0) + exp(8.0) + exp(9.0)]]
3.计算 softmax 值：
softmax_scores = [[[exp(1.0) / sum_exp_scores[0][0], exp(2.0) / sum_exp_scores[0][0], exp(3.0) / sum_exp_scores[0][0]],
                    [exp(4.0) / sum_exp_scores[0][1], exp(5.0) / sum_exp_scores[0][1], exp(6.0) / sum_exp_scores[0][1]],
                    [exp(7.0) / sum_exp_scores[0][2], exp(8.0) / sum_exp_scores[0][2], exp(9.0) / sum_exp_scores[0][2]]]]

至于为什么不能在查询向量上面做，主要是因为查询向量和键向量的作用不同。查询向量（Q）用于表示我们想要获取信息的请求，而键向量（K）用于表示与查询向量进行比较的键。
'''
weights = F.softmax(scores, dim=-1)
print(weights)  # 打印权重
# 打印权重的和，dim=-1表示沿着最后一个维度（即每个词的权重和）进行求和
# 由于softmax函数的输出是概率分布，每个维度的和应该接近1（如果不是1，可能是由于浮点数精度问题）
print(weights.sum(dim=-1))  # 打印每个查询词的权重和，理论上应该接近1

tensor([[[1.0000e+00, 2.7667e-12],
         [2.8098e-12, 1.0000e+00]]], grad_fn=<SoftmaxBackward0>)
tensor([[1., 1.]], grad_fn=<SumBackward1>)


In [28]:
attn_outputs = torch.bmm(weights, V)
print(attn_outputs)
print(attn_outputs.shape)


tensor([[[-0.1071, -1.8374, -0.2551,  ..., -0.2630,  0.3523,  0.1242],
         [-0.8179,  0.8697, -1.4724,  ...,  0.6556,  0.3608,  0.0020]]],
       grad_fn=<BmmBackward0>)
torch.Size([1, 2, 768])


## Scaled Dot-product Attention代码整合/封装

In [29]:
import torch  # 导入PyTorch库
import torch.nn.functional as F  # 导入PyTorch的nn.functional模块，包含了许多神经网络操作的函数
from math import sqrt  # 导入math库中的sqrt函数，用于计算平方根

# 定义Scaled Dot-product Attention函数
def scaled_dot_product_attention(query, key, value, query_mask=None, key_mask=None, mask=None):
    # 获取查询（query）的最后一个维度大小，即键（key）的维度
    dim_k = query.size(-1)
    
    # 计算查询和键的点积，并缩放，得到未归一化的注意力分数
    scores = torch.bmm(query, key.transpose(1, 2)) / sqrt(dim_k)
    
    # 如果提供了查询掩码（query_mask）和键掩码（key_mask），则计算掩码矩阵
    if query_mask is not None and key_mask is not None:
        mask = torch.bmm(query_mask.unsqueeze(-1), key_mask.unsqueeze(1))
    else:
        # 如果没有提供掩码，则使用之前传入的掩码（如果有的话）
        mask = mask
    
    # 如果存在掩码，则将分数矩阵中与掩码对应位置为0的分数替换为负无穷
    # 这样在应用softmax时，这些位置的权重会接近于0
    if mask is not None:
        scores = scores.masked_fill(mask == 0, -float("inf"))
    
    # 使用softmax函数对分数进行归一化，得到注意力权重
    weights = F.softmax(scores, dim=-1)
    
    # 计算加权后的输出，即将注意力权重与值（value）相乘
    # 这里的输出是经过注意力加权后的值向量，用于下游任务
    return torch.bmm(weights, value)


## Multi-Head Attention

In [30]:
from torch import nn

# 定义AttentionHead类，继承自nn.Module
class AttentionHead(nn.Module):
    # 初始化函数
    def __init__(self, embed_dim, head_dim):
        super().__init__()  # 调用基类的初始化方法
        # 定义线性层，用于将输入的词嵌入向量转换为查询（q）、键（k）和值（v）向量
        # embed_dim是输入嵌入的维度，head_dim是每个头输出的维度
        self.q = nn.Linear(embed_dim, head_dim)
        self.k = nn.Linear(embed_dim, head_dim)
        self.v = nn.Linear(embed_dim, head_dim)

    # 前向传播函数
    def forward(self, query, key, value, query_mask=None, key_mask=None, mask=None):
        # 调用scaled_dot_product_attention函数，传入通过线性层转换后的查询、键和值
        # 同时传入可选的掩码参数
        attn_outputs = scaled_dot_product_attention(
            self.q(query),  # 经过查询线性层转换的query
            self.k(key),     # 经过键线性层转换的key
            self.v(value),   # 经过值线性层转换的value
            query_mask,      # 查询掩码
            key_mask,        # 键掩码
            mask             # 已有的掩码（如果有的话）
        )
        # 返回注意力机制的输出
        return attn_outputs


In [31]:
from torch import nn

# 定义MultiHeadAttention类，继承自nn.Module
class MultiHeadAttention(nn.Module):
    # 初始化函数
    def __init__(self, config):
        super().__init__()  # 调用基类的初始化方法
        # 从配置中获取嵌入维度和注意力头的数量
        embed_dim = config.hidden_size
        num_heads = config.num_attention_heads
        # 计算每个头的维度大小
        head_dim = embed_dim // num_heads
        
        # 创建一个包含多个AttentionHead模块的列表
        # 每个头都使用相同的嵌入维度和头维度
        self.heads = nn.ModuleList(
            [AttentionHead(embed_dim, head_dim) for _ in range(num_heads)]
        )
        
        # 定义输出线性层，用于将多头注意力的输出合并
        self.output_linear = nn.Linear(embed_dim, embed_dim)

    # 前向传播函数
    def forward(self, query, key, value, query_mask=None, key_mask=None, mask=None):
        # 并行通过每个注意力头处理输入
        # 使用torch.cat将所有头的输出在最后一个维度上拼接起来
        x = torch.cat([
            h(query, key, value, query_mask, key_mask, mask) for h in self.heads
        ], dim=-1)
        
        # 通过输出线性层处理拼接后的输出
        x = self.output_linear(x)
        
        # 返回最终的输出
        return x

In [33]:
from transformers import AutoConfig
from transformers import AutoTokenizer

model_ckpt = "bert-base-uncased"
cache_dir = "./pretrained_model"
tokenizer = AutoTokenizer.from_pretrained(model_ckpt, cache_dir=cache_dir)

text = "hello world"
inputs = tokenizer(text, return_tensors="pt", add_special_tokens=False)
config = AutoConfig.from_pretrained(model_ckpt)
token_emb = nn.Embedding(config.vocab_size, config.hidden_size)
inputs_embeds = token_emb(inputs.input_ids)

multihead_attn = MultiHeadAttention(config)
query = key = value = inputs_embeds
attn_output = multihead_attn(query, key, value)
print(attn_output.size()) #torch.Size([1, 5, 768])

torch.Size([1, 2, 768])


In [35]:
from torch import nn

# 定义FeedForward类，继承自nn.Module
class FeedForward(nn.Module):
    # 初始化函数
    def __init__(self, config):
        super().__init__()  # 调用基类的初始化方法
        # 定义第一个线性层，将输入的隐藏状态映射到中间维度
        self.linear_1 = nn.Linear(config.hidden_size, config.intermediate_size)
        # 定义第二个线性层，将中间维度的表示映射回原始的隐藏状态维度
        self.linear_2 = nn.Linear(config.intermediate_size, config.hidden_size)
        # 定义GELU激活函数
        self.gelu = nn.GELU()
        # 定义Dropout层，用于防止过拟合
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    # 前向传播函数
    def forward(self, x):
        # 应用第一个线性层
        x = self.linear_1(x)
        # 应用GELU激活函数
        x = self.gelu(x)
        # 应用第二个线性层
        x = self.linear_2(x)
        # 应用Dropout
        x = self.dropout(x)
        # 返回最终的输出
        return x


In [36]:
from torch import nn

# 定义TransformerEncoderLayer类，继承自nn.Module
class TransformerEncoderLayer(nn.Module):
    # 初始化函数
    def __init__(self, config):
        super().__init__()  # 调用基类的初始化方法
        # 定义第一个层归一化，用于注意力机制之前
        self.layer_norm_1 = nn.LayerNorm(config.hidden_size)
        # 定义第二个层归一化，用于前馈网络之前
        self.layer_norm_2 = nn.LayerNorm(config.hidden_size)
        # 定义多头注意力机制
        self.attention = MultiHeadAttention(config)
        # 定义前馈神经网络
        self.feed_forward = FeedForward(config)

    # 前向传播函数
    def forward(self, x, mask=None):
        # 应用第一个层归一化
        hidden_state = self.layer_norm_1(x)
        # 应用注意力机制，并将结果与输入进行残差连接
        # 注意力机制的输出将与输入x相加，得到更新后的x
        x = x + self.attention(hidden_state, hidden_state, hidden_state, mask=mask)
        # 应用第二个层归一化
        # 注意这里的self.layer_norm_2(x)实际上是对更新后的x进行归一化
        hidden_state = self.layer_norm_2(x)
        # 应用前馈网络，并将结果与更新后的x进行残差连接
        x = x + self.feed_forward(hidden_state)
        # 返回最终的输出x
        return x


In [37]:
encoder_layer = TransformerEncoderLayer(config)
print(inputs_embeds.shape)
print(encoder_layer(inputs_embeds).size())
#torch.Size([1, 5, 768])
#torch.Size([1, 5, 768])

torch.Size([1, 2, 768])
torch.Size([1, 2, 768])


## 绝对位置编码

In [40]:
from torch import nn, LongTensor, arange

# 定义Embeddings类，继承自nn.Module
class Embeddings(nn.Module):
    # 初始化函数
    def __init__(self, config):
        super().__init__()  # 调用基类的初始化方法
        # 定义词嵌入层，将词ID映射到词向量
        self.token_embeddings = nn.Embedding(config.vocab_size, config.hidden_size)
        # 定义位置嵌入层，为序列中的每个位置生成一个唯一的位置向量
        self.position_embeddings = nn.Embedding(config.max_position_embeddings, config.hidden_size)
        # 定义层归一化，用于稳定训练过程
        self.layer_norm = nn.LayerNorm(config.hidden_size, eps=1e-12)
        # 定义Dropout层，用于防止过拟合
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    # 前向传播函数
    def forward(self, input_ids):
        # 根据输入序列的长度创建位置ID
        seq_length = input_ids.size(1)  # 获取序列长度
        position_ids = torch.arange(seq_length, dtype=torch.long).unsqueeze(0)  # 创建位置ID序列
        # 创建词嵌入和位置嵌入
        token_embeddings = self.token_embeddings(input_ids)  # 通过词嵌入层获取词嵌入
        position_embeddings = self.position_embeddings(position_ids)  # 通过位置嵌入层获取位置嵌入
        # 将词嵌入和位置嵌入相加，得到最终的嵌入表示
        embeddings = token_embeddings + position_embeddings
        # 应用层归一化
        embeddings = self.layer_norm(embeddings)
        # 应用Dropout
        embeddings = self.dropout(embeddings)
        # 返回最终的嵌入表示
        return embeddings

# 创建Embeddings层的实例，并使用config配置
embedding_layer = Embeddings(config)

# 使用embedding_layer处理输入的词ID，并打印输出的大小
# 这里假设inputs.input_ids是之前通过tokenizer得到的词ID序列
print(embedding_layer(inputs.input_ids).size()) #torch.Size([1, 5, 768])


torch.Size([1, 2, 768])
