# 用于处理多模态RAG图文问答挑战赛训练集JSON文件，提取问题与答案构建QA对
本notebook将演示如何将原始训练集转换为标准QA格式，便于后续微调。

In [1]:
#

In [None]:
import json
import os
import pandas as pd
import re
from typing import Dict, List, Any
from transformers import pipeline

# 金融领域实体词表（可根据实际数据扩展）
FINANCIAL_ENTITIES = {
    "指标": ["营收", "净利润", "毛利率", "资产负债率", "ROE", "同比增长率", "环比增长率"],
    "主体": ["公司", "企业", "银行", "基金", "股票", "债券"],
    "文档类型": ["财报", "研报", "年报", "季报", "公告"]
}

def extract_finance_entities(text: str) -> Dict[str, List[str]]:
    """提取文本中的金融实体，增强领域对齐"""
    entities = {"指标": [], "主体": [], "文档类型": []}
    for category, keywords in FINANCIAL_ENTITIES.items():
        for keyword in keywords:
            if re.search(rf"\b{re.escape(keyword)}\b", text):
                entities[category].append(keyword)
    # 去重并保持顺序
    for k in entities:
        entities[k] = list(dict.fromkeys(entities[k]))
    return entities

def is_answer_consistent(question: str, answer: str, context: str = "") -> bool:
    """增强版答案校验：确保答案非空、含来源且与上下文一致"""
    if not answer.strip():
        return False

    # 检查是否包含来源引用（匹配文件名和页码格式）
    if not re.search(r"(.+?\.pdf).*?(第\d+页)", answer):
        # 若未直接引用，检查是否能从上下文中找到依据
        if context and not any(ent in answer for ent in extract_finance_entities(context).get("指标", [])):
            return False
        return True  # 允许无显式引用但有实体关联的情况
    return True

def augment_data(context):
    """基于财报内容生成问题，增强训练数据"""
    try:
        qg_pipeline = pipeline("text2text-generation", model="lmqg/t5-base-finance-qg")
        questions = qg_pipeline(context, max_length=100, num_return_sequences=2)  # 生成2个问题
        return [{"context": context, "question": q["generated_text"], "answer": ""} for q in questions]
    except Exception as e:
        print(f"问题生成失败: {str(e)}")
        return []

def build_finance_sft_data(raw_data: List[Dict[str, Any]], chunk_data_path: str = "all_pdf_page_chunks.json") -> List[Dict[str, Any]]:
    """构建增强版金融SFT数据，关联PDF上下文"""
    # 加载PDF分页内容（从步骤1处理结果获取）
    chunk_context = {}
    if os.path.exists(chunk_data_path):
        with open(chunk_data_path, "r", encoding="utf-8") as f:
            chunks = json.load(f)
            for chunk in chunks:
                key = f"{chunk['metadata']['file_name']}_page_{chunk['metadata']['page']}"
                chunk_context[key] = chunk['content']
        print(f"已加载 {len(chunk_context)} 条PDF分页上下文")

    sft_data = []
    # 先处理原始数据
    for item in raw_data:
        # 1. 金融助手角色强化
        system_prompt = (
            "你是专业金融分析师助手，需基于财报、研报等金融文档内容回答问题。"
            "要求：1. 严格使用规范金融术语；2. 数据需标注来源（如XX.pdf第X页）；"
            "3. 区分事实陈述与分析观点；4. 图表相关问题需结合数据趋势解读。"
        )

        # 2. 细化金融任务类型
        question = item.get("question", "").lower()
        task_type = "金融问答"
        if any(ind in question for ind in FINANCIAL_ENTITIES["指标"]):
            task_type = "财务指标提取"
        elif "同比" in question or "环比" in question or "趋势" in question:
            task_type = "数据趋势分析"
        elif "解释" in question or "含义" in question:
            task_type = "金融术语解读"
        elif "图表" in question or "图形" in question or "表格" in question:
            task_type = "图文分析"  # 适配多模态场景

        # 3. 获取关联的PDF上下文
        file_name = item.get("file_name", "未知")
        page = item.get("page", "未知")
        context_key = f"{file_name}_page_{page}"
        context = chunk_context.get(context_key, "")

        # 4. 答案一致性校验（结合上下文）
        answer = item.get("answer", "")
        if not is_answer_consistent(question, answer, context):
            print(f"过滤不一致样本: {question[:30]}...")
            continue

        # 5. 提取金融实体
        entities = extract_finance_entities(question + " " + answer)

        sft_data.append({
            "system": system_prompt,
            "instruction": question,
            "context": context,  # 新增：关联的PDF上下文
            "output": answer,
            "task_type": task_type,
            "entities": entities,  # 新增：提取的金融实体
            "metadata": {
                "file_name": file_name,
                "page": page,
                "has_image": "图表" in question or "图形" in question  # 标记多模态样本
            }
        })

    # 数据增强：基于上下文生成新问题
    print("开始进行数据增强...")
    augmented_samples = []
    for item in sft_data:
        if item["context"]:  # 只有有上下文的样本才进行增强
            generated = augment_data(item["context"])
            for gen in generated:
                # 复用原样本的元数据和上下文
                augmented_samples.append({
                    "system": item["system"],
                    "instruction": gen["question"],
                    "context": item["context"],
                    "output": item["output"],  # 使用原样本答案作为参考
                    "task_type": item["task_type"],
                    "entities": item["entities"],
                    "metadata": item["metadata"]
                })

    print(f"数据增强完成，新增 {len(augmented_samples)} 个样本")
    sft_data.extend(augmented_samples)

    return sft_data

def main():
    # 1. 定义文件路径（使用提供的绝对路径）
    train_data_path = r"D:\AI读pdf\spark_multi_rag\data\train.json"
    test_data_path = r"D:\AI读pdf\spark_multi_rag\data\test.json"
    chunk_data_path = r"D:\AI读pdf\spark_multi_rag\all_pdf_page_chunks.json"  # 步骤1处理结果
    output_dir = r"D:\AI读pdf\spark_multi_rag\data"

    # 2. 加载原始数据集（比赛训练集）
    if not os.path.exists(train_data_path):
        raise FileNotFoundError(f"未找到训练集: {train_data_path}，请检查文件路径")

    with open(train_data_path, "r", encoding="utf-8") as f:
        raw_data = json.load(f)
    print(f"原始样本数量: {len(raw_data)}")
    if raw_data:
        print("示例原始样本:", json.dumps(raw_data[0], ensure_ascii=False)[:200] + "...")

    # 3. 检查PDF上下文文件
    if not os.path.exists(chunk_data_path):
        print(f"警告：未找到PDF上下文文件 {chunk_data_path}，请先运行步骤1的PDF处理脚本")

    # 4. 构建金融SFT数据
    finance_sft_data = build_finance_sft_data(raw_data, chunk_data_path)
    print(f"处理后SFT样本数量: {len(finance_sft_data)}")
    if finance_sft_data:
        print("示例SFT样本:", json.dumps(finance_sft_data[0], ensure_ascii=False)[:300] + "...")

    # 5. 保存处理结果
    os.makedirs(output_dir, exist_ok=True)

    # 保存为JSON（供微调使用）
    sft_json_path = os.path.join(output_dir, "finance_sft_train.json")
    with open(sft_json_path, "w", encoding="utf-8") as f:
        json.dump(finance_sft_data, f, ensure_ascii=False, indent=2)
    print(f"已保存金融SFT JSON文件: {sft_json_path}")

    # 保存为CSV（便于数据分析）
    sft_df = pd.DataFrame(finance_sft_data)
    sft_csv_path = os.path.join(output_dir, "finance_sft_train.csv")
    sft_df.to_csv(sft_csv_path, index=False, encoding="utf-8-sig")  # 使用utf-8-sig避免中文乱码
    print(f"已保存金融SFT CSV文件: {sft_csv_path}")

if __name__ == "__main__":
    main()

## 2. 加载JSON数据集
读取多模态RAG图文问答挑战赛训练集.json文件，解析为Python对象。

## 4. 构建QA对列表
遍历数据集，将每条数据的question和answer字段提取出来，构建QA对列表。指令字段统一设置为“你是一名专业的财报数据问答助手”。

## 5. 保存处理后的QA数据
将处理后的QA对列表保存为新的JSON或CSV文件，便于后续模型训练或分析。