## 数据预处理

In [None]:
# https://www.kaggle.com/competitions/icaif-24-finance-rag-challenge/data
def translate_FinanceRAG(from_dir, to_dir, llm_config):
    """Translate the FinanceRAG queries and corpus into Chinese.

    Reads ``queries.jsonl`` and ``corpus.jsonl`` (JSON Lines) from
    ``from_dir``, adds a ``text_zh`` column holding the LLM translation of
    each ``text`` value, and writes ``queries.json`` / ``corpus.json``
    (record-oriented JSON, non-ASCII kept as-is) into ``to_dir``.

    Args:
        from_dir: Directory containing the raw ``*.jsonl`` files.
        to_dir: Output directory; created if it does not exist.
        llm_config: Mapping with ``base_url``, ``api_key`` and ``model`` for
            an OpenAI-compatible endpoint. An optional ``max_tokens`` entry
            overrides the default completion budget of 1000 tokens.
    """
    import pandas as pd
    import os
    from openai import OpenAI
    from functools import partial
    # Kept so this cell leaves sys.path in the same state as the other cells,
    # which import the project's `utils` package from the parent directory.
    import sys; sys.path.append("..")

    client = OpenAI(base_url=llm_config["base_url"], api_key=llm_config["api_key"])
    gen_resp = partial(
        client.chat.completions.create,
        model=llm_config["model"],
        temperature=0.1,
        top_p=1,
        max_tokens=llm_config.get("max_tokens", 1000),
    )

    def translate_text(text):
        """Return the Chinese translation of ``text``."""
        # Blank or missing cells (e.g. NaN) have nothing to translate; sending
        # them to the model would either fail or produce made-up text.
        if not isinstance(text, str) or not text.strip():
            return text
        response = gen_resp(
            messages=[
                dict(role="system", content="将文字翻译成中文，直接输出翻译结果"),
                dict(role="user", content=text),
            ]
        )
        return response.choices[0].message.content

    os.makedirs(to_dir, exist_ok=True)
    # Both files share the same schema and get the same treatment.
    for stem in ("queries", "corpus"):
        df = pd.read_json(os.path.join(from_dir, f"{stem}.jsonl"), lines=True)
        df["text_zh"] = df["text"].apply(translate_text)
        df.to_json(
            os.path.join(to_dir, f"{stem}.json"),
            orient="records",
            indent=4,
            force_ascii=False,
        )
    

In [None]:
# The ablation loop in this notebook reads its stage-1 input from the
# "0_source" directory, so the translated data must be written there.
translate_FinanceRAG(
    "resources/data/_raw/FinanceBench",
    "resources/data/ablation_rag/financebench_zh/0_source",
    dict(model="judger", base_url="http://localhost:12235/v1",api_key="empty")
)

## 消融实验

In [None]:
## 文档主题分割
async def split_corpus(work_dirpath, spliter_config):
    """Split every translated document into chunks and save them.

    Loads ``corpus.json`` from ``work_dirpath``, runs the configured spliter
    over each document's ``text_zh`` and writes the flattened list of chunks
    to ``chunk.json`` in the same directory.

    Args:
        work_dirpath: Working directory holding ``corpus.json``; also the
            destination of ``chunk.json``.
        spliter_config: Keyword arguments forwarded to ``init_spliter``.
    """
    import pandas as pd
    import os
    import sys; sys.path.append("..")
    from utils.spliters import init_spliter
    import json

    # Load the translated corpus, then build the text spliter.
    corpus_df = pd.read_json(os.path.join(work_dirpath, "corpus.json"))
    spliter = init_spliter(**spliter_config)

    def chunk_records(doc):
        """Return one record per chunk produced for a single corpus row."""
        sentences = spliter.split_text_to_sentences(doc["text_zh"])
        buffered = spliter.add_buffered_sentences(sentences)
        clustered = spliter.cluster(buffered)
        return [
            {
                "doc_id": doc["_id"],
                "doc_text": doc["text_zh"],
                "chunk_text": chunk_row["chunk"],
            }
            for _, chunk_row in clustered.iterrows()
        ]

    chunk_list = []
    for _, doc in corpus_df.iterrows():
        chunk_list.extend(chunk_records(doc))

    # Persist all chunks as chunk.json.
    chunk_path = os.path.join(work_dirpath, "chunk.json")
    with open(chunk_path, "w", encoding="utf-8") as f:
        f.write(json.dumps(chunk_list, ensure_ascii=False, indent=4))

    print(f"切分完成，共生成 {len(chunk_list)} 个 chunk。")

## 片段摘要生成
async def summarize_chunks(work_dirpath, llm_config=None):
    """Generate a summary for every chunk in ``chunk.json``.

    Chunks shorter than 50 characters, or all chunks when ``llm_config`` is
    not given, are used verbatim as their own summary. When the LLM call
    fails or returns no usable text, the first 50 characters of the chunk are
    used instead. Results are written to ``chunk_sum.json`` in
    ``work_dirpath``; each record carries a ``chunk_id`` hashed from the
    stripped summary text.

    Args:
        work_dirpath: Working directory holding ``chunk.json``; also the
            destination of ``chunk_sum.json``.
        llm_config: Optional mapping with ``base_url``, ``api_key`` and
            ``model`` for an OpenAI-compatible endpoint. An optional
            ``max_tokens`` entry overrides the default budget of 1000.
    """
    import json
    import os
    import asyncio
    from openai import AsyncOpenAI
    from minirag.utils import compute_mdhash_id

    # Set up the OpenAI client; without a config no LLM is used at all.
    client = AsyncOpenAI(base_url=llm_config['base_url'], api_key=llm_config["api_key"]) if llm_config else None
    # At most four requests are in flight at any time.
    semaphore = asyncio.Semaphore(4)

    async def summarize_text(text):
        """Return the summary for one chunk, falling back to a text prefix."""
        if client is None or len(text) < 50:
            return text
        async with semaphore:
            try:
                response = await client.chat.completions.create(
                    model=llm_config['model'],
                    messages=[
                        {"role": "system", "content": "为以下内容生成摘要，直接输出结果"},
                        {"role": "user", "content": text}
                    ],
                    temperature=0.1,
                    top_p=1,
                    max_tokens=llm_config.get("max_tokens", 1000),
                )
                summary = response.choices[0].message.content
            except Exception as e:
                # Best effort: one failed request must not abort the batch.
                print(f"摘要失败: {e}")
                return text[:50]
        # The API may return no content; a None or blank summary would crash
        # the chunk_id computation below or produce an empty index entry.
        if summary is None or not summary.strip():
            print("摘要失败: 模型返回空内容")
            return text[:50]
        return summary

    # Load the chunks produced by the splitting stage.
    with open(os.path.join(work_dirpath, "chunk.json"), "r", encoding="utf-8") as f:
        chunk_data = json.load(f)

    # gather() preserves input order, so summaries line up with chunk_data.
    summaries = await asyncio.gather(
        *(summarize_text(item["chunk_text"]) for item in chunk_data)
    )

    chunk_sum_list = [
        {
            "doc_id": meta["doc_id"],
            "chunk_id": compute_mdhash_id(summary.strip(), prefix="chunk-"),
            "doc_text": meta["doc_text"],
            "chunk_text": meta["chunk_text"],
            "chunk_sum_text": summary,
        }
        for meta, summary in zip(chunk_data, summaries)
    ]

    with open(os.path.join(work_dirpath, "chunk_sum.json"), "w", encoding="utf-8") as f:
        json.dump(chunk_sum_list, f, ensure_ascii=False, indent=4)

    print(f"摘要完成，共处理 {len(chunk_sum_list)} 个 chunk。")

## 构建图索引
async def build_rag_index(work_dirpath, embed_model_path, rag_llm_config):
    """Build the MiniRAG index from the chunk summaries of a working directory.

    Creates ``<work_dirpath>/rag_data`` as the MiniRAG working directory,
    loads the embedding model, and inserts every ``chunk_sum_text`` from
    ``chunk_sum.json`` in batches of four.

    Args:
        work_dirpath: Working directory holding ``chunk_sum.json``.
        embed_model_path: Path passed to ``AutoTokenizer`` / ``AutoModel``.
        rag_llm_config: Keyword arguments forwarded to
            ``openai_complete_if_cache``; must contain ``model``.

    Returns:
        The ``MiniRAG`` instance after all summaries were inserted.
    """
    import logging
    import os
    from functools import partial

    import pandas as pd
    from transformers import AutoTokenizer, AutoModel
    from minirag import MiniRAG
    from minirag.utils import EmbeddingFunc
    from minirag.llm import openai_complete_if_cache, hf_embedding
    from minirag.prompt import PROMPTS

    import sys; sys.path.append("..")
    from utils.rag import prompts as custom_prompts

    # Overlay the project's prompts on MiniRAG's prompt table.
    PROMPTS.update(custom_prompts)

    # Logging setup.
    logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.INFO)

    # Prepare the index directory.
    rag_data_dir = os.path.join(work_dirpath, "rag_data")
    os.makedirs(rag_data_dir, exist_ok=True)

    # Load the embedding model.
    tokenizer = AutoTokenizer.from_pretrained(embed_model_path, model_max_length=512)
    encoder = AutoModel.from_pretrained(embed_model_path)

    def complete(prompt, **kwargs):
        """Forward a prompt to the configured OpenAI-compatible endpoint."""
        return openai_complete_if_cache(prompt=prompt, **rag_llm_config, **kwargs)

    embedding_func = EmbeddingFunc(
        embedding_dim=encoder.config.hidden_size,
        max_token_size=encoder.config.max_position_embeddings,
        func=partial(hf_embedding, embed_model=encoder, tokenizer=tokenizer),
    )

    # Initialise MiniRAG.
    rag = MiniRAG(
        working_dir=rag_data_dir,
        llm_model_func=complete,
        llm_model_name=rag_llm_config["model"],
        embedding_func=embedding_func,
    )

    # Load the chunk summary texts.
    summary_df = pd.read_json(os.path.join(work_dirpath, "chunk_sum.json"))
    chunk_texts = summary_df["chunk_sum_text"].tolist()

    # Insert the summaries a few at a time.
    batch_size = 4
    start = 0
    while start < len(chunk_texts):
        await rag.ainsert(chunk_texts[start:start + batch_size])
        start += batch_size

    print(f"✅ 异步 RAG 索引构建完成，共插入 {len(chunk_texts)} 个 chunk。")
    return rag

async def do_rag(work_dirpath, top_k, embed_model_path, rag_llm_config):
    """Answer every query of a working directory with the prebuilt RAG index.

    Opens the MiniRAG working directory ``<work_dirpath>/rag_data``, answers
    each ``text_zh`` entry of ``queries.json`` one after another through
    ``get_rag_answer``, and writes ``answer.json`` (input, retrievals, output)
    plus ``answer_stats.csv`` (query count, total and average answering time)
    into ``work_dirpath``.

    Args:
        work_dirpath: Working directory holding ``chunk_sum.json``,
            ``queries.json`` and the ``rag_data`` index.
        top_k: Retrieval setting passed through to ``get_rag_answer``.
        embed_model_path: Path passed to ``AutoTokenizer`` / ``AutoModel``.
        rag_llm_config: Keyword arguments forwarded to
            ``openai_complete_if_cache``; must contain ``model``.

    Returns:
        None. All results are written to disk.
    """
    import os
    import pandas as pd
    import logging
    import time
    from functools import partial
    from transformers import AutoTokenizer, AutoModel
    from minirag import MiniRAG
    from minirag.utils import EmbeddingFunc
    from minirag.llm import openai_complete_if_cache, hf_embedding
    from minirag.prompt import PROMPTS

    import sys; sys.path.append("..")
    from utils.rag import prompts as custom_prompts
    from utils.rag import get_rag_answer

    # Overlay the project's prompts on MiniRAG's prompt table.
    PROMPTS.update(custom_prompts)

    # Logging setup.
    logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.WARNING)

    # Prepare the index directory.
    rag_data_dir = os.path.join(work_dirpath, "rag_data")
    os.makedirs(rag_data_dir, exist_ok=True)

    # Load the embedding model.
    embed_tokenizer = AutoTokenizer.from_pretrained(embed_model_path, model_max_length=512)
    embed_model = AutoModel.from_pretrained(embed_model_path)

    # Initialise MiniRAG.
    rag = MiniRAG(
        working_dir=rag_data_dir,
        llm_model_func=lambda prompt, **kwargs: openai_complete_if_cache(
            prompt=prompt, **rag_llm_config, **kwargs
        ),
        llm_model_name=rag_llm_config["model"],
        embedding_func=EmbeddingFunc(
            embedding_dim=embed_model.config.hidden_size,
            max_token_size=embed_model.config.max_position_embeddings,
            func=partial(hf_embedding, embed_model=embed_model, tokenizer=embed_tokenizer),
        )
    )

    # Map chunk_id -> summary text for get_rag_answer.
    chunk_sum_df = pd.read_json(os.path.join(work_dirpath, "chunk_sum.json"))
    chunk_mapper = dict(zip(chunk_sum_df['chunk_id'], chunk_sum_df['chunk_sum_text']))

    # Load the query list.
    query_df = pd.read_json(os.path.join(work_dirpath, "queries.json"))

    answer_list = []
    start_time = time.perf_counter()
    for _, row in query_df.iterrows():
        retrievals, answer = await get_rag_answer(rag, row['text_zh'], chunk_mapper, top_k)
        answer_list.append(dict(input=row['text_zh'], retrievals=retrievals, output=answer))
    # Stop the clock before serialising so the stats cover answering only.
    elapsed = time.perf_counter() - start_time

    # orient="records" never writes the index; passing index=False on top of
    # it raises ValueError on pandas < 2.1, so it is left out.
    pd.DataFrame(answer_list).to_json(
        os.path.join(work_dirpath, "answer.json"),
        orient="records",
        indent=4,
        force_ascii=False,
    )

    query_num = len(query_df)
    avg_time = elapsed / query_num if query_num > 0 else 0
    answer_stats = dict(query_num=query_num, tot_time=elapsed, avg_time=avg_time)
    pd.DataFrame([answer_stats]).to_csv(os.path.join(work_dirpath, "answer_stats.csv"), index=False, encoding="utf-8")
    print(f"✅ RAG执行完成，共回复 {len(query_df)} 个 query，耗时 {elapsed:.2f} 秒，平均响应时长为 {avg_time:.2f} 秒")

    return None

In [None]:
import itertools
import gc
import os
import shutil
import logging
logging.getLogger("httpx").setLevel(logging.WARNING)

stage_funcs = [split_corpus,summarize_chunks,build_rag_index,do_rag]
embed_model_path = "resources/open_models/bge-large-zh-v1.5"

# 各阶段方法配置项
split_methods = {
    "seq": (dict(method="doc_seq_model_spliter", model_path="resources/open_models/nlp_bert_document-segmentation_chinese-base")),
    "sim": (dict(method="cos_sim_spliter", model_path="resources/open_models/bge-large-zh-v1.5"))
}

sum_methods = {
    "q25_b3_sft_sum": (dict(model="lora", base_url="http://localhost:12239/v1", api_key="empty", max_tokens=1000)),
    "q25_b3_base_sum": (dict(model="base", base_url="http://localhost:12239/v1", api_key="empty", max_tokens=1000)),
    "q25_b14_sum": (dict(model="judger", base_url="http://localhost:12235/v1", api_key="empty", max_tokens=1000)),
}

index_methods = {
    "q25_b3_base_index": (
        embed_model_path,
        dict(model="base", base_url="http://localhost:12239/v1", api_key="empty", max_tokens=1000)
    ),
}

r_methods = {
    "ours": ({"entity": 1, "chunk": 5, "final": 5},),
    "naive": ({"entity": 0, "chunk": 5, "final": 5},),
    "lightrag" : ({"entity": 5, "chunk": 5, "final": 5},)
}

g_methods = {
    "q25_b3_base_gen": (
        embed_model_path,
        dict(model="base", base_url="http://localhost:12239/v1", api_key="empty", max_tokens=1000)
    ),
    "q25_b14_gen":  (
        embed_model_path,
        dict(model="judger", base_url="http://localhost:12235/v1", api_key="empty", max_tokens=1000)
    ),
}

rag_methods = {
    f"{r_tag}@{g_tag}": r_conf + g_conf
    for r_tag, r_conf in r_methods.items() 
    for g_tag, g_conf in g_methods.items()
}

core_group = ("seq", "q25_b3_sft_sum","q25_b3_base_index", "ours","q25_b3_base_gen")

for method_ls in itertools.product(
        split_methods.items(), sum_methods.items(), index_methods.items(), rag_methods.items()
    ):
    current_group_name = "@".join([m[0] for m in method_ls])
    current_group = tuple(current_group_name.split("@"))
    core_diff = sum(a != b for a, b in zip(current_group, core_group))
    if core_diff > 1: continue  
    
    print(f"\nRunning: {current_group}")
    for i in range(len(method_ls)):
        print(f"stage{i+1}")
        base_dirpath = "resources/data/ablation_rag/financebench_zh"
        input_dirname = "@".join([m[0] for m in method_ls[:i]]) if i > 0 else "0_source"
        output_dirname = "@".join([m[0] for m in method_ls[:i+1]])
        input_dirpath = os.path.join(base_dirpath, input_dirname)
        output_dirpath = os.path.join(base_dirpath, output_dirname)
        if os.path.exists(output_dirpath):
            print(f"已存在{output_dirpath}，跳过")
            continue
        else:
            print(f"复制上阶段数据到{output_dirpath}")
            shutil.copytree(input_dirpath, output_dirpath,dirs_exist_ok=True)
        print(method_ls[i])
        await stage_funcs[i](output_dirpath,*method_ls[i][1])
        gc.collect()


Running: ('seq', 'q25_b3_sft_sum', 'q25_b3_base_index', 'ours', 'q25_b3_base_gen')
stage1
已存在resources/data/ablation_rag/financebench_zh/seq，跳过
stage2
已存在resources/data/ablation_rag/financebench_zh/seq@q25_b3_sft_sum，跳过
stage3
已存在resources/data/ablation_rag/financebench_zh/seq@q25_b3_sft_sum@q25_b3_base_index，跳过
stage4
已存在resources/data/ablation_rag/financebench_zh/seq@q25_b3_sft_sum@q25_b3_base_index@ours@q25_b3_base_gen，跳过

Running: ('seq', 'q25_b3_sft_sum', 'q25_b3_base_index', 'ours', 'q25_b14_gen')
stage1
已存在resources/data/ablation_rag/financebench_zh/seq，跳过
stage2
已存在resources/data/ablation_rag/financebench_zh/seq@q25_b3_sft_sum，跳过
stage3
已存在resources/data/ablation_rag/financebench_zh/seq@q25_b3_sft_sum@q25_b3_base_index，跳过
stage4
已存在resources/data/ablation_rag/financebench_zh/seq@q25_b3_sft_sum@q25_b3_base_index@ours@q25_b14_gen，跳过

Running: ('seq', 'q25_b3_sft_sum', 'q25_b3_base_index', 'naive', 'q25_b3_base_gen')
stage1
已存在resources/data/ablation_rag/financebench_zh/seq，跳过
st

## 评估答案质量

In [None]:
async def do_eval(work_dirpath, judge_llm_config):
    """Score the answers of one finished run with the judge LLM.

    Loads ``answer.json`` from ``work_dirpath``, passes it to
    ``eval_rag_performance`` and writes the per-answer results to
    ``eval_result.json`` and the aggregate statistics to ``eval_stats.json``
    and ``eval_stats.csv`` in the same directory.

    Args:
        work_dirpath: Directory of a finished run containing ``answer.json``.
        judge_llm_config: Configuration handed unchanged to
            ``eval_rag_performance``.
    """
    import sys; sys.path.append("..")
    from utils.eval import eval_rag_performance
    import pandas as pd
    import os

    eval_input_df = pd.read_json(os.path.join(work_dirpath, "answer.json"))
    eval_result_df, eval_stats_sr = await eval_rag_performance(eval_input_df, judge_llm_config)

    # orient="records" never writes the index; passing index=False on top of
    # it raises ValueError on pandas < 2.1, so it is left out.
    eval_result_df.to_json(
        os.path.join(work_dirpath, "eval_result.json"),
        orient="records",
        indent=4,
        force_ascii=False,
    )
    eval_stats_sr.to_json(os.path.join(work_dirpath, "eval_stats.json"), force_ascii=False, indent=4)
    # The statistics are written as a single CSV row without a header line.
    eval_stats_sr.to_frame().T.to_csv(os.path.join(work_dirpath, "eval_stats.csv"), header=False, encoding="utf-8")

In [None]:
import os
import logging
logging.getLogger("httpx").setLevel(logging.WARNING)

base_dirpath = "resources/data/ablation_rag/financebench_zh"
eval_dirs = []
for d in os.listdir(base_dirpath):
    full_path = os.path.join(base_dirpath, d)
    if os.path.isdir(full_path) and d.count('@') == 4:
        eval_dirs.append(full_path)

for eval_dir in eval_dirs:
    if os.path.exists(os.path.join(eval_dir,"eval_result.json")): continue
    print("当前评估对象：", eval_dir)
    await do_eval(eval_dir, dict(model="judger", base_url="http://localhost:12235/v1", api_key="empty", max_tokens=1000))

当前评估对象： resources/data/ablation_rag/financebench_zh/seq@q25_b3_base_sum@q25_b3_base_index@naive@q25_b3_base_gen
Mean scores:
relevance_score       3.50
faithfulness_score    3.77
dtype: float64
当前评估对象： resources/data/ablation_rag/financebench_zh/seq@q25_b3_base_sum@q25_b3_base_index@ours@q25_b3_base_gen
Mean scores:
relevance_score       3.28
faithfulness_score    3.50
dtype: float64
当前评估对象： resources/data/ablation_rag/financebench_zh/seq@q25_b3_base_sum@q25_b3_base_index@lightrag@q25_b3_base_gen
Mean scores:
relevance_score       3.33
faithfulness_score    3.71
dtype: float64
当前评估对象： resources/data/ablation_rag/financebench_zh/seq@q25_b3_base_sum@q25_b3_base_index@naive@q25_b14_gen
Mean scores:
relevance_score       3.55
faithfulness_score    3.84
dtype: float64
当前评估对象： resources/data/ablation_rag/financebench_zh/seq@q25_b3_base_sum@q25_b3_base_index@ours@q25_b14_gen
Mean scores:
relevance_score       3.54
faithfulness_score    3.68
dtype: float64
当前评估对象： resources/data/ablation_rag/f