In [None]:
# The code is written in Python 3.6.8
# -*- coding: utf-8 -*-
# @Time    : 2025/09/28 16:24
# @Author  : zhen Gao
# Transformers
# conda activate pytorch_GPU_cuda #
#  pip install  # 官方推荐：Hugging Face 官方文档和 GitHub 都首推 pip install transformers

In [1]:
%pwd

'c:\\Users\\zhen-\\Documents\\Code\\Python_Scripts\\code_snippets\\Transformers_LLM\\Sheng Xu'

In [2]:
from torch import nn
from transformers import AutoConfig 
from transformers import AutoTokenizer

In [3]:
model_ckpt = "bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)

text = "time flies like an arrow" # 词语序列 <<<<<  =========== =========================
inputs = tokenizer(text, return_tensors="pt", add_special_tokens=False) #pt:     pytorch tensor  format 
print(inputs.input_ids)

config = AutoConfig.from_pretrained(model_ckpt)
token_emb = nn.Embedding(config.vocab_size, config.hidden_size)
print(token_emb)

inputs_embeds = token_emb(inputs.input_ids)
print(inputs_embeds.size())


RepositoryNotFoundError: 401 Client Error. (Request ID: Root=1-68e08727-7995b12a3138b8c208010a57;797a622e-9359-44dd-8820-03ad63e1f125)

Repository Not Found for url: https://huggingface.co/api/models/bert-base-uncased/tree/main/additional_chat_templates?recursive=False&expand=False.
Please make sure you specified the correct `repo_id` and `repo_type`.
If you are trying to access a private or gated repo, make sure you are authenticated. For more details, see https://huggingface.co/docs/huggingface_hub/authentication
User Access Token "Iowa" is expired

In [4]:
import torch
from math import sqrt

In [5]:
Q = K = V = inputs_embeds
dim_k = K.size(-1)
scores = torch.bmm(Q, K.transpose(1,2)) / sqrt(dim_k)
print(scores.size())

torch.Size([1, 5, 5])


这里的序列长度都为5，因此生成了一个5*5的注意力分数矩阵，接下来就是应用 Softmax 标准化注意力权重：

In [6]:
import torch.nn.functional as F

weights = F.softmax(scores, dim=-1)
print(weights.sum(dim=-1))

tensor([[1., 1., 1., 1., 1.]], grad_fn=<SumBackward1>)


最后将注意力权重与 value 序列相乘：

In [7]:
attn_outputs = torch.bmm(weights, V)
print(attn_outputs.shape)

torch.Size([1, 5, 768])


In [9]:

print(attn_outputs.size())

torch.Size([1, 5, 768])


至此就实现了一个简化版的 Scaled Dot-product Attention。可以将上面这些操作封装为函数以方便后续调用：

In [10]:
import torch
import torch.nn.functional as F
from math import sqrt

def scaled_dot_product_attention(query, key, value, query_mask=None, key_mask=None, mask=None):
    dim_k = query.size(-1)
    scores = torch.bmm(query, key.transpose(1, 2)) / sqrt(dim_k)
    if query_mask is not None and key_mask is not None:
        mask = torch.bmm(query_mask.unsqueeze(-1), key_mask.unsqueeze(1))
    if mask is not None:
        scores = scores.masked_fill(mask == 0, -float("inf"))
    weights = F.softmax(scores, dim=-1)
    return torch.bmm(weights, value)

上面的代码还考虑了 
 序列的 Mask。填充 (padding) 字符不应该参与计算，因此将对应的注意力分数设置为 $\infty$
，这样 softmax 之后其对应的注意力权重就为 0 了

Markdown cells are used for text and rich formatting. To render the infinity symbol with LaTeX syntax.  

注意！上面的做法会带来一个问题：注意力机制会为上下文中的相同单词分配非常大的分数（点积为 1），而在实践中，相关词往往比相同词更重要。例如对于上面的例子，只有关注“time”和“arrow”才能够确认“flies”的含义。

因此，多头注意力 (Multi-head Attention) 出现了！

注意！上面的做法会带来一个问题：当 
 和 
 序列相同时，注意力机制会为上下文中的相同单词分配非常大的分数（点积为 1），而在实践中，相关词往往比相同词更重要。例如对于上面的例子，只有关注“time”和“arrow”才能够确认“flies”的含义。

因此，多头注意力 (Multi-head Attention) 出现了！


Multi-head Attention 首先通过线性映射将 
 序列映射到特征空间，每一组线性投影后的向量表示称为一个头 (head)，然后在每组映射后的序列上再应用 
 

每个注意力头负责关注某一方面的语义相似性，多个头就可以让模型同时关注多个方面。因此与简单的 Scaled Dot-product Attention 相比，Multi-head Attention 可以捕获到更加复杂的特征信息。

形式化表示为：

首先实现一个注意力头：每个头都会初始化三个独立的线性层，负责将序列映射到尺寸为 [batch_size, seq_len, head_dim] 的张量，其中 head_dim 是映射到的向量维度。

最后只需要拼接多个注意力头的输出就可以构建出 Multi-head Attention 层了（这里在拼接后还通过一个线性变换来生成最终的输出张量）：

In [11]:
from torch import nn

class AttentionHead(nn.Module):
    def __init__(self, embed_dim, head_dim):
        super().__init__()
        self.q = nn.Linear(embed_dim, head_dim)
        self.k = nn.Linear(embed_dim, head_dim)
        self.v = nn.Linear(embed_dim, head_dim)

    def forward(self, query, key, value, query_mask=None, key_mask=None, mask=None):
        attn_outputs = scaled_dot_product_attention(
            self.q(query), self.k(key), self.v(value), query_mask, key_mask, mask)
        return attn_outputs

In [12]:
class MultiHeadAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        embed_dim = config.hidden_size
        num_heads = config.num_attention_heads
        head_dim = embed_dim // num_heads
        self.heads = nn.ModuleList(
            [AttentionHead(embed_dim, head_dim) for _ in range(num_heads)]
        )
        self.output_linear = nn.Linear(embed_dim, embed_dim)

    def forward(self, query, key, value, query_mask=None, key_mask=None, mask=None):
        x = torch.cat([
            h(query, key, value, query_mask, key_mask, mask) for h in self.heads
        ], dim=-1)
        x = self.output_linear(x)
        return x


In [13]:
# 这里使用 BERT-base-uncased 模型的参数初始化 Multi-head Attention 层，并且将之前构建的输入送入模型以验证是否工作正常：

from transformers import AutoConfig
from transformers import AutoTokenizer

model_ckpt = "bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)

text = "time flies like an arrow"
inputs = tokenizer(text, return_tensors="pt", add_special_tokens=False)
config = AutoConfig.from_pretrained(model_ckpt)
token_emb = nn.Embedding(config.vocab_size, config.hidden_size)
inputs_embeds = token_emb(inputs.input_ids)

multihead_attn = MultiHeadAttention(config)
query = key = value = inputs_embeds
attn_output = multihead_attn(query, key, value)
print(attn_output.size())

torch.Size([1, 5, 768])


输入的词语首先被转换为词向量。由于注意力机制无法捕获词语之间的位置关系，因此还通过 positional embeddings 向输入中添加位置信息；
Encoder 由一堆 encoder layers (blocks) 组成，类似于图像领域中的堆叠卷积层。同样地，在 Decoder 中也包含有堆叠的 decoder layers；
Encoder 的输出被送入到 Decoder 层中以预测概率最大的下一个词，然后当前的词语序列又被送回到 Decoder 中以继续生成下一个词，重复直至出现序列结束符 EOS 或者超过最大输出长度

In [14]:
# Transformer Encoder/Decoder 前馈子层实际上就是两层全连接神经网络，它单独地处理序列中的每一个词向量，也被称为 position-wise feed-forward layer。常见做法是让第一层的维度是词向量大小的 4 倍，然后以 GELU 作为激活函数。
# 下面实现一个简单的 Feed-Forward Layer：

class FeedForward(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.linear_1 = nn.Linear(config.hidden_size, config.intermediate_size)
        self.linear_2 = nn.Linear(config.intermediate_size, config.hidden_size)
        self.gelu = nn.GELU()
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, x):
        x = self.linear_1(x)
        x = self.gelu(x)
        x = self.linear_2(x)
        x = self.dropout(x)
        return x

In [15]:
feed_forward = FeedForward(config)
ff_outputs = feed_forward(attn_output)
print(ff_outputs.size())

torch.Size([1, 5, 768])


In [17]:
# Pre layer normalization：目前主流的做法，将 Layer Normalization 放置于 Skip Connections 的范围内。这种做法通常训练过程会更加稳定，并且不需要任何学习率预热。


class TransformerEncoderLayer(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.layer_norm_1 = nn.LayerNorm(config.hidden_size)
        self.layer_norm_2 = nn.LayerNorm(config.hidden_size)
        self.attention = MultiHeadAttention(config)
        self.feed_forward = FeedForward(config)

    def forward(self, x, mask=None):
        # Apply layer normalization and then copy input into query, key, value
        hidden_state = self.layer_norm_1(x)
        # Apply attention with a skip connection
        x = x + self.attention(hidden_state, hidden_state, hidden_state, mask=mask)
        # Apply feed-forward layer with a skip connection
        x = x + self.feed_forward(self.layer_norm_2(x))
        return x

In [18]:
# 同样地，这里将之前构建的输入送入到该层中进行测试：

encoder_layer = TransformerEncoderLayer(config)
print(inputs_embeds.shape)
print(encoder_layer(inputs_embeds).size())

torch.Size([1, 5, 768])
torch.Size([1, 5, 768])


In [19]:
# 前面讲过，由于注意力机制无法捕获词语之间的位置信息，因此 Transformer 模型还使用 Positional Embeddings 添加了词语的位置信息。

# Positional Embeddings 基于一个简单但有效的想法：使用与位置相关的值模式来增强词向量。

# 如果预训练数据集足够大，那么最简单的方法就是让模型自动学习位置嵌入。下面本章就以这种方式创建一个自定义的 Embeddings 模块，它同时将词语和位置映射到嵌入式表示，最终的输出是两个表示之和：

class Embeddings(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.token_embeddings = nn.Embedding(config.vocab_size,
                                             config.hidden_size)
        self.position_embeddings = nn.Embedding(config.max_position_embeddings,
                                                config.hidden_size)
        self.layer_norm = nn.LayerNorm(config.hidden_size, eps=1e-12)
        self.dropout = nn.Dropout()

    def forward(self, input_ids):
        # Create position IDs for input sequence
        seq_length = input_ids.size(1)
        position_ids = torch.arange(seq_length, dtype=torch.long).unsqueeze(0)
        # Create token and position embeddings
        token_embeddings = self.token_embeddings(input_ids)
        position_embeddings = self.position_embeddings(position_ids)
        # Combine token and position embeddings
        embeddings = token_embeddings + position_embeddings
        embeddings = self.layer_norm(embeddings)
        embeddings = self.dropout(embeddings)
        return embeddings

embedding_layer = Embeddings(config)
print(embedding_layer(inputs.input_ids).size())

torch.Size([1, 5, 768])


In [20]:
# 下面将所有这些层结合起来构建完整的 Transformer Encoder：

class TransformerEncoder(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.embeddings = Embeddings(config)
        self.layers = nn.ModuleList([TransformerEncoderLayer(config)
                                     for _ in range(config.num_hidden_layers)])

    def forward(self, x, mask=None):
        x = self.embeddings(x)
        for layer in self.layers:
            x = layer(x, mask=mask)
        return x

In [25]:
# 同样地，我们对该层进行简单的测试：

encoder = TransformerEncoder(config)
print(encoder(inputs.input_ids).size())

torch.Size([1, 5, 768])


In [21]:
# Masked multi-head self-attention layer：确保在每个时间步生成的词语仅基于过去的输出和当前预测的词，否则 Decoder 相当于作弊了；
# Encoder-decoder attention layer：以解码器的中间表示作为 queries，对 encoder stack 的输出 key 和 value 向量执行 Multi-head Attention。通过这种方式，Encoder-Decoder Attention Layer 就可以学习到如何关联来自两个不同序列的词语，例如两种不同的语言。 解码器可以访问每个 block 中 Encoder 的 keys 和 values。
# 与 Encoder 中的 Mask 不同，Decoder 的 Mask 是一个下三角矩阵：

seq_len = inputs.input_ids.size(-1)
mask = torch.tril(torch.ones(seq_len, seq_len)).unsqueeze(0)
print(mask[0])

tensor([[1., 0., 0., 0., 0.],
        [1., 1., 0., 0., 0.],
        [1., 1., 1., 0., 0.],
        [1., 1., 1., 1., 0.],
        [1., 1., 1., 1., 1.]])


In [22]:
# 这里使用 PyTorch 自带的 tril() 函数来创建下三角矩阵，然后同样地，通过 Tensor.masked_fill() 将所有零替换为负无穷大来防止注意力头看到未来的词语而造成信息泄露：

scores.masked_fill(mask == 0, -float("inf"))

tensor([[[25.8183,    -inf,    -inf,    -inf,    -inf],
         [ 1.7578, 24.9309,    -inf,    -inf,    -inf],
         [ 0.8195, -1.0490, 25.2169,    -inf,    -inf],
         [-0.3305,  1.2924, -0.5281, 28.2054,    -inf],
         [ 0.1386,  0.8815,  0.5444,  0.4177, 29.1536]]],
       grad_fn=<MaskedFillBackward0>)