# 微调 LLM：实现抽取式问答

> 指导文章：[22a. 微调 LLM：实现抽取式问答](../Guide/22a.%20微调%20LLM：实现抽取式问答.md) | 作业文章：[22b. 作业 - Bert 微调抽取式问答](../Guide/22b.%20作业%20-%20Bert%20微调抽取式问答.md)

**在线链接**：[Kaggle](https://www.kaggle.com/code/aidemos/21a-llm) | [Colab](https://colab.research.google.com/drive/1jgdoO7fKk7Tsn2yi28ytsDQ8VXdthnIm?usp=sharing)

## 前言

**预训练 + 微调**是一个非常主流的范式，适用于各种下游任务，如文本分类、命名实体识别、机器翻译等。在这篇文章中，我们将以**抽取式问答任务**为例，再次尝试微调预训练模型。

首先，了解什么是**抽取式问答**：根据「给定的问题」和「**包含**答案的文本」，从中**抽取**出对应的答案片段，**不需要生成新的词语**。

**举例说明**：

- **文本**：`BERT 是由 Google 提出的预训练语言模型，它在多个 NLP 任务上取得了 SOTA 的成绩。`
- **问题**：`谁提出了 BERT？`
- **答案**：`Google`

> 如果去掉“抽取式”的限定，广义上的“问答”更接近于**生成式问答（Generative Question Answering）**，即答案并非固定的文本片段，模型基于理解进行**生成**，最终的答案不拘泥于特定的文本。
>
> **举例说明**：
>
> - **文本**：同上。
> - **问题**：同上。
> - **答案**：`BERT 是由 Google 提出的预训练语言模型。具体来说，它是由 Jacob Devlin 等研究人员在 2018 年的论文《BERT: Pre-training of Deep Bidirectional Transformers for Language Understanding》中首次介绍的。BERT 在多个 NLP 任务上取得了 SOTA（State-of-the-Art）的成绩，推动了自然语言处理领域的快速发展。`（该答案由 GPT-4o 生成）

#### Q: 模型怎么完成抽取式问答任务？输出是什么？

停下来思考一下，是直接生成答案对应的词或句子吗？

**不是**，输出的是**答案在文本中的起始和结束位置**。通过下图进行理解：

> ![Extractive-QA-model](../Guide/assets/Extractive-QA-model.png)

模型的最终输出为两个向量：起始位置得分向量 $\mathbf{s} \in \mathbb{R}^N$ 和结束位置得分向量 $\mathbf{e} \in \mathbb{R}^N$，其中 $N$ 是输入序列的长度。

对于每个位置 $i$，模型计算其作为答案起始位置和结束位置的得分：

$$
\begin{aligned}
s_i &= \mathbf{w}_{\text{start}} \mathbf{h}_i + b_{\text{start}} \\
e_i &= \mathbf{w}_{\text{end}} \mathbf{h}_i + b_{\text{end}}
\end{aligned}
$$

其中, $\mathbf{h}_i \in \mathbb{R}^H$ 是编码器在位置 $i$ 的隐藏状态输出 ($\mathbf{h}$ 就是 BERT 模型的最终输出), $H$ 是隐藏层的维度。$\mathbf{w}_{\text{start}} \in \mathbb{R}^H$ 和 $\mathbf{w}_{\text{end}} \in \mathbb{R}^H$ 是权重向量（对应于 `nn.Linear(H, 1)`，这里写成了常见的数学形式，了解线性层代码的同学可以当做 $\mathbf{h}\mathbf{w}^\top$）, $b_{\text{start}}$ 和 $b_{\text{end}}$ 是偏置项。

然后，对得分向量进行 softmax 操作，得到每个位置作为起始和结束位置的概率分布：

$$
\begin{aligned}
P_{\text{start}}(i) &= \frac{e^{s_i}}{\sum_{j=1}^{N} e^{s_j}} \\
P_{\text{end}}(i) &= \frac{e^{e_i}}{\sum_{j=1}^{N} e^{e_j}}
\end{aligned}
$$

在推理时，选择具有最高概率的起始位置 $\hat{s}$ 和结束位置 $\hat{e}$。为了保证答案的合理性，通常要求 $\hat{s} \leq \hat{e}$，并且答案的长度不超过预设的最大长度 $L_{\text{max}}$。此时的行为称为后处理（Postprocessing），根据实际需求进行。

最终，答案就是输入序列中从位置 $\hat{s}$ 到 $\hat{e}$ 的片段，即：

$$
\text{Answer} = \text{Input}[\hat{s}:\hat{e}]
$$


## 前置准备

### 下载数据集

In [None]:
# 通过以下命令下载数据集
!wget https://github.com/Hoper-J/HUNG-YI_LEE_Machine-Learning_Homework/raw/refs/heads/master/HW07/ml2023spring-hw7.zip
!unzip ml2023spring-hw7.zip

### 安装库 

In [None]:
!uv add transformers
!uv add accelerate

### 导入库



In [None]:
# ========== 标准库模块 ==========
import json
import random

# ========== 第三方库 ==========
import numpy as np
from tqdm.auto import tqdm

# ========== 深度学习相关库 ==========
import torch
from torch.utils.data import DataLoader, Dataset
from torch.optim.lr_scheduler import LambdaLR

# Transformers (Hugging Face)
from transformers import (
    AdamW,
    AutoTokenizer,
    AutoModelForQuestionAnswering,
    get_linear_schedule_with_warmup
)

# 加速库
from accelerate import Accelerator

### 设置设备和随机数种子

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"
# For Mac M1, M2...
# device = "mps" if torch.backends.mps.is_available() else ("cuda" if torch.cuda.is_available() else "cpu")

def same_seeds(seed):
    """
    设置随机种子以确保结果的可复现性。

    参数：
    - seed (int): 要设置的随机种子值。
    """
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

## 加载模型和分词器


In [None]:
model = AutoModelForQuestionAnswering.from_pretrained("bert-base-chinese").to(device)
tokenizer = AutoTokenizer.from_pretrained("bert-base-chinese")

# 预训练模型也可以换成其他的
# model = AutoModelForQuestionAnswering.from_pretrained("luhua/chinese_pretrain_mrc_macbert_large").to(device)
# tokenizer = AutoTokenizer.from_pretrained("luhua/chinese_pretrain_mrc_macbert_large")

# 你可以忽略警告消息（它弹出是因为新的 QA 预测头是随机初始化的）

### Q: 什么是 `AutoModelForQuestionAnswering`？

`AutoModelForQuestionAnswering` 是 Hugging Face 提供的自动模型加载类，除了加载指定模型的预训练权重之外，**它会在模型的顶层添加一个用于问答任务的输出层**，用于预测答案的起始位置和结束位置。如果模型本身已经包含用于问答任务的输出层，则会加载相应的权重；如果没有，则会在顶层添加一个**新的输出层**，随机初始化权重（这时候会有警告信息）。

不妨打印看看这个输出层到底是什么：

In [None]:
print(model.qa_outputs)
# print(model)  # 如果感兴趣，也可以打印整个模型

## 数据部分

> 两个繁体中文阅读理解数据集：[DRCD](https://github.com/DRCKnowledgeTeam/DRCD) 和 [ODSQA](https://github.com/Chia-Hsuan-Lee/ODSQA)。

- **训练集（DRCD + DRCD-backtrans）**：包含 15,329 个段落和 26,918 个问题。一个段落可能对应多个问题。
- **开发集（DRCD + DRCD-backtrans）**：包含 1,255 个段落和 2,863 个问题。用于验证。
- **测试集（DRCD + ODSQA）**：包含 1,606 个段落和 3,504 个问题。测试集的段落没有提供答案，需要模型进行预测。

所有数据集的格式相同：

- `id`：问题编号
- `paragraph_id`：段落编号
- `question_text`：问题文本
- `answer_text`：答案文本
- `answer_start`：答案在段落中的起始字符位置
- `answer_end`：答案在段落中的结束字符位置

![数据集格式](../Guide/assets/4215768313590de87aab01adcad78c90.png)

### 读取数据

In [None]:
def read_data(file):
    with open(file, 'r', encoding="utf-8") as reader:
        data = json.load(reader)
    return data["questions"], data["paragraphs"]

train_questions, train_paragraphs = read_data("hw7_train.json")
dev_questions, dev_paragraphs = read_data("hw7_dev.json")
test_questions, test_paragraphs = read_data("hw7_test.json")

### 分词处理

In [None]:
# 分别对问题和段落进行分词
# 设置 add_special_tokens=False，因为在自定义数据集 QA_Dataset 的 __getitem__ 中会手动添加特殊标记

train_questions_tokenized = tokenizer(
    [q["question_text"] for q in train_questions], add_special_tokens=False
)
dev_questions_tokenized = tokenizer(
    [q["question_text"] for q in dev_questions], add_special_tokens=False
)
test_questions_tokenized = tokenizer(
    [q["question_text"] for q in test_questions], add_special_tokens=False
)

train_paragraphs_tokenized = tokenizer(train_paragraphs, add_special_tokens=False)
dev_paragraphs_tokenized = tokenizer(dev_paragraphs, add_special_tokens=False)
test_paragraphs_tokenized = tokenizer(test_paragraphs, add_special_tokens=False)

### 自定义数据集处理

以下代码定义了一个 `QA_Dataset` 类，用于处理问答数据。

In [None]:
class QA_Dataset(Dataset):
    """
    自定义的问答数据集类，用于处理问答任务的数据。

    参数：
    - split (str): 数据集的类型，'train'、'dev' 或 'test'。
    - questions (list): 问题列表，每个元素是一个字典，包含问题的详细信息。
    - tokenized_questions (BatchEncoding): 分词后的问题，由 tokenizer 生成。
    - tokenized_paragraphs (BatchEncoding): 分词后的段落列表，由 tokenizer 生成。
    
    属性（即 __init_() 中的 self.xxx）：
    - max_question_len (int): 问题的最大长度（以分词后的 token 数计）。
    - max_paragraph_len (int): 段落的最大长度（以分词后的 token 数计）。
    - doc_stride (int): 段落窗口滑动步长。
    - max_seq_len (int): 输入序列的最大长度。
    """
    
    def __init__(self, split, questions, tokenized_questions, tokenized_paragraphs):
        self.split = split
        self.questions = questions
        self.tokenized_questions = tokenized_questions
        self.tokenized_paragraphs = tokenized_paragraphs
        self.max_question_len = 60
        self.max_paragraph_len = 150

        # 设置段落窗口滑动步长为段落最大长度的 10%
        self.doc_stride = int(self.max_paragraph_len * 0.1)

        # 输入序列长度 = [CLS] + question + [SEP] + paragraph + [SEP]
        self.max_seq_len = 1 + self.max_question_len + 1 + self.max_paragraph_len + 1

    def __len__(self):
        """
        返回数据集中样本的数量。

        返回：
        - (int): 数据集的长度。
        """
        return len(self.questions)

    def __getitem__(self, idx):
        """
        获取数据集中指定索引的样本。

        参数：
        - idx (int): 样本的索引。

        返回：
        - 对于训练集，返回一个输入张量和对应的答案位置：
          (input_ids, token_type_ids, attention_mask, answer_start_token, answer_end_token)
        - 对于验证和测试集，返回包含多个窗口的输入张量列表：
          (input_ids_list, token_type_ids_list, attention_mask_list)
        """
        question = self.questions[idx]
        tokenized_question = self.tokenized_questions[idx]
        tokenized_paragraph = self.tokenized_paragraphs[question["paragraph_id"]]

        ##### 预处理 #####
        if self.split == "train":
            # 将答案在段落文本中的起始/结束位置转换为在分词后段落中的起始/结束位置
            answer_start_token = tokenized_paragraph.char_to_token(question["answer_start"])
            answer_end_token = tokenized_paragraph.char_to_token(question["answer_end"])

            # 防止模型学习到「答案总是位于中间的位置」，加入随机偏移
            mid = (answer_start_token + answer_end_token) // 2
            max_offset = self.max_paragraph_len // 2   # 最大偏移量为段落长度的1/2，这是可调的
            random_offset = np.random.randint(-max_offset, max_offset)  # 在 [-max_offset, +max_offset] 范围内随机选择偏移量
            paragraph_start = max(0, min(mid + random_offset - self.max_paragraph_len // 2, len(tokenized_paragraph) - self.max_paragraph_len))
            paragraph_end = paragraph_start + self.max_paragraph_len
            
            # 切片问题/段落，并添加特殊标记（101：CLS，102：SEP）
            input_ids_question = [101] + tokenized_question.ids[:self.max_question_len] + [102]
            # ... = [tokenizer.cls_token_id] + tokenized_question.ids[: self.max_question_len] + [tokenizer.sep_token_id]
            input_ids_paragraph = tokenized_paragraph.ids[paragraph_start : paragraph_end] + [102]
            # ... = ... + [tokenizer.sep_token_id]

            # 将答案在分词后段落中的起始/结束位置转换为窗口中的起始/结束位置
            answer_start_token += len(input_ids_question) - paragraph_start
            answer_end_token += len(input_ids_question) - paragraph_start

            # 填充序列，生成模型的输入
            input_ids, token_type_ids, attention_mask = self.padding(input_ids_question, input_ids_paragraph)
            
            return torch.tensor(input_ids), torch.tensor(token_type_ids), torch.tensor(attention_mask), answer_start_token, answer_end_token

        else:
            # 验证集和测试集的处理
            input_ids_list, token_type_ids_list, attention_mask_list = [], [], []

            # 段落被分割成多个窗口，每个窗口的起始位置由步长 "doc_stride" 分隔
            for i in range(0, len(tokenized_paragraph), self.doc_stride):
                # 切片问题/段落并添加特殊标记（101：CLS，102：SEP）
                input_ids_question = [101] + tokenized_question.ids[:self.max_question_len] + [102]
                # ... = [tokenizer.cls_token_id] + tokenized_question.ids[: self.max_question_len] + [tokenizer.sep_token_id]
                input_ids_paragraph = tokenized_paragraph.ids[i : i + self.max_paragraph_len] + [102]
                # ... = ... + [tokenizer.sep_token_id]

                # 填充序列，生成模型的输入
                input_ids, token_type_ids, attention_mask = self.padding(input_ids_question, input_ids_paragraph)

                input_ids_list.append(input_ids)
                token_type_ids_list.append(token_type_ids)
                attention_mask_list.append(attention_mask)

            return torch.tensor(input_ids_list), torch.tensor(token_type_ids_list), torch.tensor(attention_mask_list)

    def padding(self, input_ids_question, input_ids_paragraph):
        """
        对输入的序列进行填充，生成统一长度的模型输入。

        参数：
        - input_ids_question (list): 问题部分的输入 ID 列表。
        - input_ids_paragraph (list): 段落部分的输入 ID 列表。

        返回：
        - input_ids (list): 填充后的输入 ID 列表。
        - token_type_ids (list): 区分问题和段落的标记列表。
        - attention_mask (list): 注意力掩码列表，指示哪些位置是有效的输入。
        """
        # 计算需要填充的长度
        padding_len = self.max_seq_len - len(input_ids_question) - len(input_ids_paragraph)
        # 填充输入序列
        input_ids = input_ids_question + input_ids_paragraph + [0] * padding_len
        # 构造区分问题和段落的 token_type_ids
        token_type_ids = [0] * len(input_ids_question) + [1] * len(input_ids_paragraph) + [0] * padding_len
        # 构造注意力掩码，有效位置为 1，填充位置为 0
        attention_mask = [1] * (len(input_ids_question) + len(input_ids_paragraph)) + [0] * padding_len

        return input_ids, token_type_ids, attention_mask

train_set = QA_Dataset("train", train_questions, train_questions_tokenized, train_paragraphs_tokenized)
dev_set = QA_Dataset("dev", dev_questions, dev_questions_tokenized, dev_paragraphs_tokenized)
test_set = QA_Dataset("test", test_questions, test_questions_tokenized, test_paragraphs_tokenized)

**解释**：

- **训练集处理**：定位答案的起始和结束位置，将包含答案的段落部分截取为一个窗口（引入随机偏移，防止模型过拟合于答案总在中间的位置）。然后将问题和段落合并为一个输入序列，并进行填充。

- **验证/测试集处理**：将段落分成多个窗口，每个窗口之间的步长由 `self.doc_stride` 决定，然后将每个窗口作为模型的输入。验证和测试时不需要答案位置，因此只需生成多个窗口作为输入。

  - `self.doc_stride` 通过控制窗口之间的滑动步长。

  - **训练阶段**不需要使用 `doc_stride`，因为训练时我们已经知道答案的位置，可以直接截取包含答案的窗口。但在**验证和测试**阶段，由于模型并不知道答案的位置，`doc_stride` 保证每个窗口之间有足够的重叠（overlap），减少遗漏答案。

## 评估函数

In [None]:
def evaluate(data, output):
    """
    对模型的输出进行后处理，获取预测的答案文本。

    参数：
    - data (tuple): 包含输入数据的元组，(input_ids, token_type_ids, attention_mask)。
    - output (transformers.modeling_outputs.QuestionAnsweringModelOutput): 模型的输出结果。

    返回：
    - answer (str): 模型预测的答案文本。
    """
    answer = ''
    max_prob = float('-inf')
    num_of_windows = data[0].shape[1]

    for k in range(num_of_windows):
        # 通过选择最可能的起始位置/结束位置来获得答案
        start_prob, start_index = torch.max(output.start_logits[k], dim=0)
        end_prob, end_index = torch.max(output.end_logits[k], dim=0)

        # 确保起始位置索引小于或等于结束位置索引，避免选择错误的起始和结束位置对
        if start_index <= end_index:
            # 答案的概率计算为 start_prob 和 end_prob 的和
            prob = start_prob + end_prob

            # 如果当前窗口的答案具有更高的概率，则更新结果
            if prob > max_prob:
                max_prob = prob
                # 将标记转换为字符（例如，[1920, 7032] --> "大 金"）
                answer = tokenizer.decode(data[0][0][k][start_index : end_index + 1])
        else:
            # 如果起始位置索引 > 结束位置索引，则跳过此对（可能是错误情况）
            continue
    # 移除答案中的空格（例如，"大 金" --> "大金"）
    return answer.replace(' ','')

## 训练部分

### 设置超参数

In [None]:
# 超参数
num_epoch = 1  # 训练的轮数
validation = True  # 是否在每个 epoch 结束后进行验证
logging_step = 100  # 每隔多少步打印一次训练日志
learning_rate = 1e-5  # 学习率
train_batch_size = 8  # 训练时的批次大小

# 优化器
optimizer = AdamW(model.parameters(), lr=learning_rate)

# 数据加载器
# 注意：不要更改 dev_loader / test_loader 的批次大小！
# 虽然批次大小=1，但它实际上是由同一对 QA 的多个窗口组成的批次
train_loader = DataLoader(train_set, batch_size=train_batch_size, shuffle=True, pin_memory=True)
dev_loader = DataLoader(dev_set, batch_size=1, shuffle=False, pin_memory=True)
test_loader = DataLoader(test_set, batch_size=1, shuffle=False, pin_memory=True)

### 学习率调度器

#### 带有 Warmup 的线性衰减

使用调度器一般可以加速模型的收敛速度，不同的 Warmup 比例对学习率的影响：
![不同 Warmup 比例的学习率曲线](../Guide/assets/20240920020716.png)

In [None]:
# 总训练步数
total_steps = len(train_loader) * num_epoch
num_warmup_steps = int(0.2 * total_steps)  # TODO: 调整 warmup 步数的比率

# [Hugging Face] 应用带有 warmup 的线性学习率衰减
scheduler = get_linear_schedule_with_warmup(
    optimizer, num_warmup_steps=num_warmup_steps, num_training_steps=total_steps
)

# # [PyTorch] 替代方法：应用不带 warmup 的线性学习率衰减
# # lr_lambda 自定义学习率随时间衰减（此处是简单的线性衰减）
# lr_lambda = lambda step: max(0.0, 1.0 - step / total_steps)
# scheduler = LambdaLR(optimizer, lr_lambda=lr_lambda)

### 设置 Accelerator


In [None]:
#### 梯度累积（可选）####
# 注意：train_batch_size * gradient_accumulation_steps = 有效批次大小
# 如果 CUDA 内存不足，你可以降低 train_batch_size 并提高 gradient_accumulation_steps
# 文档：https://huggingface.co/docs/accelerate/usage_guides/gradient_accumulation
gradient_accumulation_steps = 1

# 将 "fp16_training" 更改为 True 以支持自动混合精度训练（fp16）
fp16_training = True
if fp16_training:
    accelerator = Accelerator(mixed_precision="fp16", gradient_accumulation_steps=gradient_accumulation_steps)
else:
    accelerator = Accelerator(gradient_accumulation_steps=gradient_accumulation_steps)

model, optimizer, train_loader, scheduler = accelerator.prepare(model, optimizer, train_loader, scheduler)

### 开始训练

In [None]:
model.train()

print("开始训练...")

for epoch in range(num_epoch):
    step = 1
    train_loss = train_acc = 0

    for data in tqdm(train_loader):
        with accelerator.accumulate(model):
            # 数据已经通过 accelerator.prepare() 移动到设备
            #data = [i.to(device) for i in data]

            # 模型输入：input_ids, token_type_ids, attention_mask, start_positions, end_positions（注意：只有 "input_ids" 是必需的）
            # 模型输出：start_logits, end_logits, loss（提供 start_positions/end_positions 时返回）
            output = model(
                input_ids=data[0],
                token_type_ids=data[1],
                attention_mask=data[2],
                start_positions=data[3],
                end_positions=data[4]
            )
            # 选择最可能的起始位置/结束位置
            start_index = torch.argmax(output.start_logits, dim=1)
            end_index = torch.argmax(output.end_logits, dim=1)

            # 只有当 start_index 和 end_index 都正确时，预测才正确
            train_acc += ((start_index == data[3]) & (end_index == data[4])).float().mean()

            train_loss += output.loss

            accelerator.backward(output.loss)

            step += 1
            optimizer.step()
            scheduler.step()
            optimizer.zero_grad()

            # 每经过 logging_step，打印训练损失和准确率（实际上，使用了梯度累积后的 step 应该除以对应的数量进行校正）
            if step % logging_step == 0:
                print(f"Epoch {epoch + 1} | Step {step} | loss = {train_loss.item() / logging_step:.3f}, acc = {train_acc / logging_step:.3f}")
                train_loss = train_acc = 0

    if validation:
        print("评估开发集...")
        model.eval()
        with torch.no_grad():
            dev_acc = 0
            for i, data in enumerate(tqdm(dev_loader)):
                # 这里保留了 device 的使用
                output = model(
                    input_ids=data[0].squeeze(0).to(device),
                    token_type_ids=data[1].squeeze(0).to(device),
                    attention_mask=data[2].squeeze(0).to(device),
                )
                # 只有当答案文本完全匹配时，预测才正确
                dev_acc += evaluate(data, output) == dev_questions[i]["answer_text"]
            print(f"Validation | Epoch {epoch + 1} | acc = {dev_acc / len(dev_loader):.3f}")
        model.train()

# 将模型及其配置文件保存到目录「saved_model」
# 即，在目录「saved_model」下有两个文件：「pytorch_model.bin」和「config.json」
# 可以使用「model = BertForQuestionAnswering.from_pretrained("saved_model")」重新加载保存的模型
print("保存模型...")
model_save_dir = "saved_model"
model.save_pretrained(model_save_dir)
#tokenizer.save_pretrained(model_save_dir)

## 测试部分

In [None]:
model.eval()

print("评估测试集...")

result = []

with torch.no_grad():
    for data in tqdm(test_loader):
        output = model(
            input_ids=data[0].squeeze(dim=0).to(device),
            token_type_ids=data[1].squeeze(dim=0).to(device),
            attention_mask=data[2].squeeze(dim=0).to(device)
        )
        result.append(evaluate(data, output))

result_file = "result.csv"
with open(result_file, 'w') as f:
    f.write("ID,Answer\n")
    for i, test_question in enumerate(test_questions):
        # 将答案中的逗号替换为空字符串（因为 csv 以逗号分隔），这一步处理只是为了作业提交到 Kaggle 上，实际上不一定需要保存为 .csv 文件
        f.write(f"{test_question['id']},{result[i].replace(',','')}\n")

print(f"完成！结果保存在 {result_file}")