In [1]:
import torch
from torch import nn
from d2l import torch as d2l

# BERT 来自Transformer的双向编码器表示

## 1. 输入表示

In [2]:
# 获取输入tokens和对应片段编号（A段为0，B段为1），适配BERT输入结构
def get_tokens_and_segments(tokens_a, tokens_b=None):
    """获取输入序列的词元及其片段索引"""
    tokens = ['<cls>'] + tokens_a + ['<sep>']  # 开头加<cls>，结尾加<sep>
    segments = [0] * (len(tokens_a) + 2)  # 对应tokens_a的全部是0
    if tokens_b is not None:  # 如果有第二句（B段）
        tokens += tokens_b + ['<sep>']  # 加B段和分隔符
        segments += [1] * (len(tokens_b) + 1)  # B段的全部用1表示
    return tokens, segments


## 2. BERT编码器实现

In [3]:
# BERT编码器，输入词元索引和片段信息，输出每个位置的上下文特征
class BERTEncoder(nn.Module):
    """BERT编码器"""

    def __init__(self, vocab_size, num_hiddens, norm_shape, ffn_num_input,
                 ffn_num_hiddens, num_heads, num_layers, dropout,
                 max_len=1000, key_size=768, query_size=768, value_size=768,
                 **kwargs):
        super(BERTEncoder, self).__init__(**kwargs)
        # 词嵌入层：把词编号映射到稠密向量
        self.token_embedding = nn.Embedding(vocab_size, num_hiddens)
        # 片段嵌入：区分A/B句
        self.segment_embedding = nn.Embedding(2, num_hiddens)
        # 编码器堆叠多层Transformer block
        self.blks = nn.Sequential()
        for i in range(num_layers):
            self.blks.add_module(f"{i}", d2l.EncoderBlock(
                key_size, query_size, value_size, num_hiddens, norm_shape,
                ffn_num_input, ffn_num_hiddens, num_heads, dropout, True))
        # 可学习的位置嵌入（不像transformer原版那样直接用三角函数）
        self.pos_embedding = nn.Parameter(torch.randn(1, max_len, num_hiddens))

    def forward(self, tokens, segments, valid_lens):
        # 加和词嵌入、片段嵌入、位置嵌入
        X = self.token_embedding(tokens) + self.segment_embedding(segments)
        X = X + self.pos_embedding.data[:, :X.shape[1], :]
        # 经过多层transformer block编码
        for blk in self.blks:
            X = blk(X, valid_lens)
        return X


## 3. 运行实例：初始化BERTEncoder并编码输入

In [4]:
# 设定超参数并实例化编码器
vocab_size, num_hiddens, ffn_num_hiddens, num_heads = 10000, 768, 1024, 4
norm_shape, ffn_num_input, num_layers, dropout = [768], 768, 2, 0.2
encoder = BERTEncoder(vocab_size, num_hiddens, norm_shape, ffn_num_input,
                      ffn_num_hiddens, num_heads, num_layers, dropout)

# 伪造一批tokens和片段索引输入（batch_size=2，长度=8）
tokens = torch.randint(0, vocab_size, (2, 8))
segments = torch.tensor([[0, 0, 0, 0, 1, 1, 1, 1], [0, 0, 0, 1, 1, 1, 1, 1]])
# 编码输出，每个token都获得一个特征向量
encoded_X = encoder(tokens, segments, None)
encoded_X.shape  # [2, 8, 768]


# 2. 预训练任务
## 4. 掩蔽语言模型（MLM，Masked Language Modeling）

In [6]:
# BERT的MLM头，用于预测被掩蔽的词
class MaskLM(nn.Module):
    """BERT的掩蔽语言模型任务"""
    def __init__(self, vocab_size, num_hiddens, num_inputs=768, **kwargs):
        super(MaskLM, self).__init__(**kwargs)
        # 一个两层MLP，最后输出为vocab_size分类
        self.mlp = nn.Sequential(
            nn.Linear(num_inputs, num_hiddens),  # 隐藏层
            nn.ReLU(),
            nn.LayerNorm(num_hiddens),  # 层归一化
            nn.Linear(num_hiddens, vocab_size))  # 输出vocab_size

    def forward(self, X, pred_positions):
        # X: [batch, seq_len, hidden]; pred_positions: [batch, num_pred]
        num_pred_positions = pred_positions.shape[1]
        pred_positions = pred_positions.reshape(-1)  # 展平成一维
        batch_size = X.shape[0]
        # batch_idx生成每个预测位置对应的batch号
        batch_idx = torch.arange(0, batch_size)
        batch_idx = torch.repeat_interleave(batch_idx, num_pred_positions)
        # 挑选所有被mask的位置的特征
        masked_X = X[batch_idx, pred_positions]
        masked_X = masked_X.reshape((batch_size, num_pred_positions, -1))
        # 送入MLP，输出词表上的概率
        mlm_Y_hat = self.mlp(masked_X)
        return mlm_Y_hat


## 5. 掩蔽任务前向、损失计算示例

In [7]:
mlm = MaskLM(vocab_size, num_hiddens)
mlm_positions = torch.tensor([[1, 5, 2], [6, 1, 5]])  # batch中各自mask的位置
mlm_Y_hat = mlm(encoded_X, mlm_positions)  # 预测结果 shape: [2, 3, vocab_size]
mlm_Y_hat.shape

mlm_Y = torch.tensor([[7, 8, 9], [10, 20, 30]])  # 真实标签（被mask位置的正确词）
loss = nn.CrossEntropyLoss(reduction='none')
mlm_l = loss(mlm_Y_hat.reshape((-1, vocab_size)), mlm_Y.reshape(-1))  # loss shape: [6]
mlm_l.shape


torch.Size([2, 3, 10000])

## 6. 下一句预测（NSP，Next Sentence Prediction）

In [9]:
# BERT的NSP头（简单的二分类器）
class NextSentencePred(nn.Module):
    """BERT的下一句预测任务"""

    def __init__(self, num_inputs, **kwargs):
        super(NextSentencePred, self).__init__(**kwargs)
        # 只需一层线性输出2类
        self.output = nn.Linear(num_inputs, 2)

    def forward(self, X):
        # X: (batch_size, num_hiddens) 只用CLS特征
        return self.output(X)

In [10]:
encoded_X = torch.flatten(encoded_X, start_dim=1)  # flatten保证输入合适
nsp = NextSentencePred(encoded_X.shape[-1])
nsp_Y_hat = nsp(encoded_X)
nsp_Y_hat.shape
nsp_y = torch.tensor([0, 1])  # 0/1为标签
nsp_l = loss(nsp_Y_hat, nsp_y)
nsp_l.shape

torch.Size([2, 2])

## 7. BERT完整模型整合

In [12]:
# 组合编码器、MLM头和NSP头，组成完整BERT
class BERTModel(nn.Module):
    """BERT模型"""

    def __init__(self, vocab_size, num_hiddens, norm_shape, ffn_num_input,
                 ffn_num_hiddens, num_heads, num_layers, dropout,
                 max_len=1000, key_size=768, query_size=768, value_size=768,
                 hid_in_features=768, mlm_in_features=768,
                 nsp_in_features=768):
        super(BERTModel, self).__init__()
        # 编码器
        self.encoder = BERTEncoder(vocab_size, num_hiddens, norm_shape,
                                   ffn_num_input, ffn_num_hiddens, num_heads, num_layers,
                                   dropout, max_len=max_len, key_size=key_size,
                                   query_size=query_size, value_size=value_size)
        # 对CLS位置做一个线性映射再激活，为NSP做准备
        self.hidden = nn.Sequential(
            nn.Linear(hid_in_features, num_hiddens),
            nn.Tanh())
        # 掩码任务和下一句任务
        self.mlm = MaskLM(vocab_size, num_hiddens, mlm_in_features)
        self.nsp = NextSentencePred(nsp_in_features)

    def forward(self, tokens, segments, valid_lens=None, pred_positions=None):
        # 先编码
        encoded_X = self.encoder(tokens, segments, valid_lens)
        # 掩码任务
        if pred_positions is not None:
            mlm_Y_hat = self.mlm(encoded_X, pred_positions)
        else:
            mlm_Y_hat = None
        # 下一句预测用CLS位置（0号）的特征做二分类
        nsp_Y_hat = self.nsp(self.hidden(encoded_X[:, 0, :]))
        return encoded_X, mlm_Y_hat, nsp_Y_hat
