# RAG 的分层索引

在这个笔记本中，我实现了一种用于 RAG 系统的分层索引方法。这种技术通过使用两层搜索方法来改进检索：首先通过摘要识别相关的文档部分，然后从这些部分检索具体细节。

传统的 RAG 方法平等对待所有文本块，这可能导致：

- 当块太小时丢失上下文
- 当文档集合很大时产生不相关的结果
- 在整个语料库中进行低效搜索

分层检索通过以下方式解决这些问题：

- 为较大的文档部分创建简洁的摘要
- 首先搜索这些摘要以识别相关部分
- 然后仅从这些部分检索详细信息
- 在保持具体细节的同时维护上下文

## 设置环境
我们首先导入必要的库。

In [1]:
import os
import numpy as np
import json
import fitz
from openai import OpenAI
import re
import pickle

## 设置 OpenAI API 客户端
我们初始化 OpenAI 客户端来生成嵌入向量和响应。

In [None]:
# 使用基础 URL 和 API 密钥初始化 OpenAI 客户端
client = OpenAI(
    base_url="https://api.studio.nebius.com/v1/",
    api_key=os.getenv("OPENAI_API_KEY")  # 从环境变量中获取 API 密钥
)

## 文档处理函数

In [3]:
def extract_text_from_pdf(pdf_path):
    """
    从 PDF 文件中提取文本内容，并按页面分离。
    
    Args:
        pdf_path (str): PDF 文件路径
        
    Returns:
        List[Dict]: 包含文本内容和元数据的页面列表
    """
    print(f"正在从 {pdf_path} 提取文本...")  # 打印正在处理的 PDF 路径
    pdf = fitz.open(pdf_path)  # 使用 PyMuPDF 打开 PDF 文件
    pages = []  # 初始化空列表来存储包含文本内容的页面
    
    # 遍历 PDF 中的每一页
    for page_num in range(len(pdf)):
        page = pdf[page_num]  # 获取当前页面
        text = page.get_text()  # 从当前页面提取文本
        
        # 跳过文本很少的页面（少于 50 个字符）
        if len(text.strip()) > 50:
            # 将页面文本和元数据追加到列表中
            pages.append({
                "text": text,
                "metadata": {
                    "source": pdf_path,  # 源文件路径
                    "page": page_num + 1  # 页码（从 1 开始的索引）
                }
            })
    
    print(f"提取了 {len(pages)} 页内容")  # 打印提取的页面数量
    return pages  # 返回包含文本内容和元数据的页面列表

In [4]:
def chunk_text(text, metadata, chunk_size=1000, overlap=200):
    """
    将文本分割为重叠的块，同时保留元数据。
    
    Args:
        text (str): 要分块的输入文本
        metadata (Dict): 要保留的元数据
        chunk_size (int): 每个块的字符大小
        overlap (int): 块之间的重叠字符数
        
    Returns:
        List[Dict]: 包含元数据的文本块列表
    """
    chunks = []  # 初始化空列表来存储块
    
    # 以指定的块大小和重叠遍历文本
    for i in range(0, len(text), chunk_size - overlap):
        chunk_text = text[i:i + chunk_size]  # 提取文本块
        
        # 跳过非常小的块（少于 50 个字符）
        if chunk_text and len(chunk_text.strip()) > 50:
            # 创建元数据副本并添加块特定信息
            chunk_metadata = metadata.copy()
            chunk_metadata.update({
                "chunk_index": len(chunks),  # 块的索引
                "start_char": i,  # 块的起始字符索引
                "end_char": i + len(chunk_text),  # 块的结束字符索引
                "is_summary": False  # 标志表示这不是摘要
            })
            
            # 将块及其元数据追加到列表中
            chunks.append({
                "text": chunk_text,
                "metadata": chunk_metadata
            })
    
    return chunks  # 返回包含元数据的块列表

## 简单向量存储实现

In [5]:
class SimpleVectorStore:
    """
    使用 NumPy 的简单向量存储实现。
    """
    def __init__(self):
        self.vectors = []  # 存储向量嵌入的列表
        self.texts = []  # 存储文本内容的列表
        self.metadata = []  # 存储元数据的列表
    
    def add_item(self, text, embedding, metadata=None):
        """
        向向量存储中添加项目。
        
        Args:
            text (str): 文本内容
            embedding (List[float]): 向量嵌入
            metadata (Dict, optional): 附加元数据
        """
        self.vectors.append(np.array(embedding))  # 将嵌入作为 numpy 数组追加
        self.texts.append(text)  # 追加文本内容
        self.metadata.append(metadata or {})  # 追加元数据或空字典（如果为 None）
    
    def similarity_search(self, query_embedding, k=5, filter_func=None):
        """
        查找与查询嵌入最相似的项目。
        
        Args:
            query_embedding (List[float]): 查询嵌入向量
            k (int): 要返回的结果数量
            filter_func (callable, optional): 过滤结果的函数
            
        Returns:
            List[Dict]: 前 k 个最相似的项目
        """
        if not self.vectors:
            return []  # 如果没有向量，返回空列表
        
        # 将查询嵌入转换为 numpy 数组
        query_vector = np.array(query_embedding)
        
        # 使用余弦相似度计算相似性
        similarities = []
        for i, vector in enumerate(self.vectors):
            # 如果不通过过滤器则跳过
            if filter_func and not filter_func(self.metadata[i]):
                continue
                
            # 计算余弦相似度
            similarity = np.dot(query_vector, vector) / (np.linalg.norm(query_vector) * np.linalg.norm(vector))
            similarities.append((i, similarity))  # 追加索引和相似度分数
        
        # 按相似度排序（降序）
        similarities.sort(key=lambda x: x[1], reverse=True)
        
        # 返回前 k 个结果
        results = []
        for i in range(min(k, len(similarities))):
            idx, score = similarities[i]
            results.append({
                "text": self.texts[idx],  # 添加文本内容
                "metadata": self.metadata[idx],  # 添加元数据
                "similarity": float(score)  # 添加相似度分数
            })
        
        return results  # 返回前 k 个结果列表

## 创建嵌入向量

In [6]:
def create_embeddings(texts, model="BAAI/bge-en-icl"):
    """
    为给定的文本创建嵌入向量。
    
    Args:
        texts (List[str]): 输入文本
        model (str): 嵌入模型名称
        
    Returns:
        List[List[float]]: 嵌入向量
    """
    # 处理空输入
    if not texts:
        return []
        
    # 如果需要，分批处理（OpenAI API 限制）
    batch_size = 100
    all_embeddings = []
    
    # 分批遍历输入文本
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]  # 获取当前批次的文本
        
        # 为当前批次创建嵌入向量
        response = client.embeddings.create(
            model=model,
            input=batch
        )
        
        # 从响应中提取嵌入向量
        batch_embeddings = [item.embedding for item in response.data]
        all_embeddings.extend(batch_embeddings)  # 将批次嵌入向量添加到列表中
    
    return all_embeddings  # 返回所有嵌入向量

## 摘要生成函数

In [7]:
def generate_page_summary(page_text):
    """
    生成页面的简洁摘要。
    
    Args:
        page_text (str): 页面的文本内容
        
    Returns:
        str: 生成的摘要
    """
    # 定义指导摘要模型的系统提示
    system_prompt = """您是一个专业的摘要系统。
    为提供的文本创建详细摘要。
    专注于捕获主要主题、关键信息和重要事实。
    您的摘要应该足够全面以理解页面包含的内容，
    但比原文更简洁。"""

    # 如果输入文本超过最大令牌限制，则截断
    max_tokens = 6000
    truncated_text = page_text[:max_tokens] if len(page_text) > max_tokens else page_text

    # 向 OpenAI API 发出请求以生成摘要
    response = client.chat.completions.create(
        model="meta-llama/Llama-3.2-3B-Instruct",  # 指定要使用的模型
        messages=[
            {"role": "system", "content": system_prompt},  # 指导助手的系统消息
            {"role": "user", "content": f"请总结这段文本：\n\n{truncated_text}"}  # 包含要总结文本的用户消息
        ],
        temperature=0.3  # 设置响应生成的温度
    )
    
    # 返回生成的摘要内容
    return response.choices[0].message.content

## 分层文档处理

In [8]:
def process_document_hierarchically(pdf_path, chunk_size=1000, chunk_overlap=200):
    """
    将文档处理为分层索引。
    
    Args:
        pdf_path (str): PDF 文件路径
        chunk_size (int): 每个详细块的大小
        chunk_overlap (int): 块之间的重叠
        
    Returns:
        Tuple[SimpleVectorStore, SimpleVectorStore]: 摘要和详细向量存储
    """
    # 从 PDF 中提取页面
    pages = extract_text_from_pdf(pdf_path)
    
    # 为每个页面创建摘要
    print("正在生成页面摘要...")
    summaries = []
    for i, page in enumerate(pages):
        print(f"正在总结第 {i+1}/{len(pages)} 页...")
        summary_text = generate_page_summary(page["text"])
        
        # 创建摘要元数据
        summary_metadata = page["metadata"].copy()
        summary_metadata.update({"is_summary": True})
        
        # 将摘要文本和元数据追加到摘要列表中
        summaries.append({
            "text": summary_text,
            "metadata": summary_metadata
        })
    
    # 为每个页面创建详细块
    detailed_chunks = []
    for page in pages:
        # 对页面文本进行分块
        page_chunks = chunk_text(
            page["text"], 
            page["metadata"], 
            chunk_size, 
            chunk_overlap
        )
        # 用当前页面的块扩展 detailed_chunks 列表
        detailed_chunks.extend(page_chunks)
    
    print(f"创建了 {len(detailed_chunks)} 个详细块")
    
    # 为摘要创建嵌入向量
    print("正在为摘要创建嵌入向量...")
    summary_texts = [summary["text"] for summary in summaries]
    summary_embeddings = create_embeddings(summary_texts)
    
    # 为详细块创建嵌入向量
    print("正在为详细块创建嵌入向量...")
    chunk_texts = [chunk["text"] for chunk in detailed_chunks]
    chunk_embeddings = create_embeddings(chunk_texts)
    
    # 创建向量存储
    summary_store = SimpleVectorStore()
    detailed_store = SimpleVectorStore()
    
    # 将摘要添加到摘要存储
    for i, summary in enumerate(summaries):
        summary_store.add_item(
            text=summary["text"],
            embedding=summary_embeddings[i],
            metadata=summary["metadata"]
        )
    
    # 将块添加到详细存储
    for i, chunk in enumerate(detailed_chunks):
        detailed_store.add_item(
            text=chunk["text"],
            embedding=chunk_embeddings[i],
            metadata=chunk["metadata"]
        )
    
    print(f"创建了包含 {len(summaries)} 个摘要和 {len(detailed_chunks)} 个块的向量存储")
    return summary_store, detailed_store

## 分层检索

In [9]:
def retrieve_hierarchically(query, summary_store, detailed_store, k_summaries=3, k_chunks=5):
    """
    使用分层索引检索信息。
    
    Args:
        query (str): 用户查询
        summary_store (SimpleVectorStore): 文档摘要存储
        detailed_store (SimpleVectorStore): 详细块存储
        k_summaries (int): 要检索的摘要数量
        k_chunks (int): 每个摘要要检索的块数量
        
    Returns:
        List[Dict]: 带有相关性分数的检索块
    """
    print(f"正在对查询进行分层检索：{query}")
    
    # 创建查询嵌入向量
    query_embedding = create_embeddings(query)
    
    # 首先，检索相关摘要
    summary_results = summary_store.similarity_search(
        query_embedding, 
        k=k_summaries
    )
    
    print(f"检索到 {len(summary_results)} 个相关摘要")
    
    # 从相关摘要中收集页面
    relevant_pages = [result["metadata"]["page"] for result in summary_results]
    
    # 创建过滤函数，只保留来自相关页面的块
    def page_filter(metadata):
        return metadata["page"] in relevant_pages
    
    # 然后，仅从这些相关页面检索详细块
    detailed_results = detailed_store.similarity_search(
        query_embedding, 
        k=k_chunks * len(relevant_pages),
        filter_func=page_filter
    )
    
    print(f"从相关页面检索到 {len(detailed_results)} 个详细块")
    
    # 为每个结果添加它来自哪个摘要/页面
    for result in detailed_results:
        page = result["metadata"]["page"]
        matching_summaries = [s for s in summary_results if s["metadata"]["page"] == page]
        if matching_summaries:
            result["summary"] = matching_summaries[0]["text"]
    
    return detailed_results

## 响应生成

In [10]:
def generate_response(query, retrieved_chunks):
    """
    基于检索到的块生成响应。
    
    Args:
        query (str): 用户查询
        retrieved_chunks (List[Dict]): 检索到的文本块
        
    Returns:
        str: 生成的响应
    """
    # 构建上下文
    context_parts = []
    for chunk in retrieved_chunks:
        # 包含页面信息和摘要（如果可用）
        page_info = f"页面 {chunk['metadata']['page']}"
        if 'summary' in chunk:
            context_parts.append(f"[{page_info} - 摘要: {chunk['summary'][:200]}...]")
        context_parts.append(f"[{page_info}] {chunk['text']}")
    
    context = "\n\n".join(context_parts)
    
    # 限制上下文长度
    max_context = 12000
    if len(context) > max_context:
        context = context[:max_context] + "... [已截断]"
    
    # 生成响应
    system_message = """您是一个有用的 AI 助手。基于提供的上下文回答用户的问题。
上下文包含来自不同页面的信息，每个部分都标有页面号。
在您的答案中引用特定页面，并确保您的响应准确且有帮助。"""

    response = client.chat.completions.create(
        model="meta-llama/Llama-3.2-3B-Instruct",
        messages=[
            {"role": "system", "content": system_message},
            {"role": "user", "content": f"上下文：\n{context}\n\n问题：{query}"}
        ],
        temperature=0.2
    )
    
    return response.choices[0].message.content

## 完整的分层检索 RAG 流水线

In [11]:
def hierarchical_rag(query, pdf_path, chunk_size=1000, chunk_overlap=200, 
                    k_summaries=3, k_chunks=5, regenerate=False):
    """
    完整的分层 RAG 流水线。
    
    Args:
        query (str): 用户查询
        pdf_path (str): PDF 文档路径
        chunk_size (int): 每个详细块的大小
        chunk_overlap (int): 块之间的重叠
        k_summaries (int): 要检索的摘要数量
        k_chunks (int): 每个摘要要检索的块数量
        regenerate (bool): 是否重新生成向量存储
        
    Returns:
        Dict: 包括响应和检索块的结果
    """
    # 为缓存创建存储文件名
    summary_store_file = f"{os.path.basename(pdf_path)}_summary_store.pkl"
    detailed_store_file = f"{os.path.basename(pdf_path)}_detailed_store.pkl"
    
    # 如果需要，处理文档并创建存储
    if regenerate or not os.path.exists(summary_store_file) or not os.path.exists(detailed_store_file):
        print("正在处理文档并创建向量存储...")
        # 处理文档以创建分层索引和向量存储
        summary_store, detailed_store = process_document_hierarchically(
            pdf_path, chunk_size, chunk_overlap
        )
        
        # 将摘要存储保存到文件以供将来使用
        with open(summary_store_file, 'wb') as f:
            pickle.dump(summary_store, f)
        
        # 将详细存储保存到文件以供将来使用
        with open(detailed_store_file, 'wb') as f:
            pickle.dump(detailed_store, f)
    else:
        # 从文件加载现有的摘要存储
        print("正在加载现有的向量存储...")
        with open(summary_store_file, 'rb') as f:
            summary_store = pickle.load(f)
        
        # 从文件加载现有的详细存储
        with open(detailed_store_file, 'rb') as f:
            detailed_store = pickle.load(f)
    
    # 使用查询分层检索相关块
    retrieved_chunks = retrieve_hierarchically(
        query, summary_store, detailed_store, k_summaries, k_chunks
    )
    
    # 基于检索到的块生成响应
    response = generate_response(query, retrieved_chunks)
    
    # 返回包括查询、响应、检索块以及摘要和详细块计数的结果
    return {
        "query": query,
        "response": response,
        "retrieved_chunks": retrieved_chunks,
        "summary_count": len(summary_store.texts),
        "detailed_count": len(detailed_store.texts)
    }

## 用于比较的标准（非分层）RAG

In [12]:
def standard_rag(query, pdf_path, chunk_size=1000, chunk_overlap=200, k=15):
    """
    不使用分层检索的标准 RAG 流水线。
    
    Args:
        query (str): 用户查询
        pdf_path (str): PDF 文档路径
        chunk_size (int): 每个块的大小
        chunk_overlap (int): 块之间的重叠
        k (int): 要检索的块数量
        
    Returns:
        Dict: 包括响应和检索块的结果
    """
    # 从 PDF 文档中提取页面
    pages = extract_text_from_pdf(pdf_path)
    
    # 直接从所有页面创建块
    chunks = []
    for page in pages:
        # 对页面文本进行分块
        page_chunks = chunk_text(
            page["text"], 
            page["metadata"], 
            chunk_size, 
            chunk_overlap
        )
        # 用当前页面的块扩展块列表
        chunks.extend(page_chunks)
    
    print(f"为标准 RAG 创建了 {len(chunks)} 个块")
    
    # 创建向量存储来保存块
    store = SimpleVectorStore()
    
    # 为块创建嵌入向量
    print("正在为块创建嵌入向量...")
    texts = [chunk["text"] for chunk in chunks]
    embeddings = create_embeddings(texts)
    
    # 将块添加到向量存储
    for i, chunk in enumerate(chunks):
        store.add_item(
            text=chunk["text"],
            embedding=embeddings[i],
            metadata=chunk["metadata"]
        )
    
    # 为查询创建嵌入向量
    query_embedding = create_embeddings(query)
    
    # 基于查询嵌入向量检索最相关的块
    retrieved_chunks = store.similarity_search(query_embedding, k=k)
    print(f"使用标准 RAG 检索了 {len(retrieved_chunks)} 个块")
    
    # 基于检索到的块生成响应
    response = generate_response(query, retrieved_chunks)
    
    # 返回包括查询、响应和检索块的结果
    return {
        "query": query,
        "response": response,
        "retrieved_chunks": retrieved_chunks
    }

## 评估函数

In [13]:
def compare_approaches(query, pdf_path, reference_answer=None):
    """
    比较分层和标准 RAG 方法。
    
    Args:
        query (str): 用户查询
        pdf_path (str): PDF 文档路径
        reference_answer (str, optional): 用于评估的参考答案
        
    Returns:
        Dict: 比较结果
    """
    print(f"\n=== 比较查询的 RAG 方法：{query} ===")
    
    # 运行分层 RAG
    print("\n运行分层 RAG...")
    hierarchical_result = hierarchical_rag(query, pdf_path)
    hier_response = hierarchical_result["response"]
    
    # 运行标准 RAG
    print("\n运行标准 RAG...")
    standard_result = standard_rag(query, pdf_path)
    std_response = standard_result["response"]
    
    # 比较分层和标准 RAG 的结果
    comparison = compare_responses(query, hier_response, std_response, reference_answer)
    
    # 返回包含比较结果的字典
    return {
        "query": query,  # 原始查询
        "hierarchical_response": hier_response,  # 分层 RAG 的响应
        "standard_response": std_response,  # 标准 RAG 的响应
        "reference_answer": reference_answer,  # 用于评估的参考答案
        "comparison": comparison,  # 比较分析
        "hierarchical_chunks_count": len(hierarchical_result["retrieved_chunks"]),  # 分层 RAG 检索的块数量
        "standard_chunks_count": len(standard_result["retrieved_chunks"])  # 标准 RAG 检索的块数量
    }

In [14]:
def compare_responses(query, hierarchical_response, standard_response, reference=None):
    """
    比较分层和标准 RAG 的响应。
    
    Args:
        query (str): 用户查询
        hierarchical_response (str): 分层 RAG 的响应
        standard_response (str): 标准 RAG 的响应
        reference (str, optional): 参考答案
        
    Returns:
        str: 比较分析
    """
    # 定义指导模型如何评估响应的系统提示
    system_prompt = """您是信息检索系统的专家评估员。
比较对同一查询的两个响应，一个使用分层检索生成，
另一个使用标准检索生成。

基于以下方面评估它们：
1. 准确性：哪个响应提供了更多事实正确的信息？
2. 全面性：哪个响应更好地涵盖了查询的所有方面？
3. 连贯性：哪个响应具有更好的逻辑流程和组织？
4. 页面引用：任一响应是否更好地使用了页面引用？

在分析每种方法的优缺点时要具体。"""

    # 创建包含查询和两个响应的用户提示
    user_prompt = f"""查询：{query}

分层 RAG 的响应：
{hierarchical_response}

标准 RAG 的响应：
{standard_response}"""

    # 如果提供了参考答案，将其包含在用户提示中
    if reference:
        user_prompt += f"""

参考答案：
{reference}"""

    # 向用户提示添加最终指令
    user_prompt += """

请提供这两个响应的详细比较，突出哪种方法表现更好以及原因。"""

    # 向 OpenAI API 发出请求以生成比较分析
    response = client.chat.completions.create(
        model="meta-llama/Llama-3.2-3B-Instruct",
        messages=[
            {"role": "system", "content": system_prompt},  # 指导助手的系统消息
            {"role": "user", "content": user_prompt}  # 包含查询和响应的用户消息
        ],
        temperature=0  # 设置响应生成的温度
    )
    
    # 返回生成的比较分析
    return response.choices[0].message.content

## 示例使用和评估

以下代码演示了如何使用分层 RAG 系统并与标准 RAG 进行比较。

In [15]:
# 示例使用
if __name__ == "__main__":
    # PDF 文档路径
    pdf_path = "data/AI_Information.pdf"
    
    # 测试查询
    query = "Transformer 模型在自然语言处理中的关键应用有哪些？"
    
    # 运行分层 RAG
    result = hierarchical_rag(query, pdf_path)
    
    print(f"查询：{result['query']}")
    print(f"响应：{result['response']}")
    print(f"检索的块数量：{len(result['retrieved_chunks'])}")
    print(f"摘要数量：{result['summary_count']}")
    print(f"详细块数量：{result['detailed_count']}")

## 总结

分层索引 RAG 通过以下方式改进了传统的检索方法：

1. **两层检索策略**：首先在摘要级别进行粗粒度搜索，然后在详细块级别进行细粒度搜索
2. **上下文保持**：通过页面级摘要保持更大的上下文，同时仍能访问具体细节
3. **效率提升**：通过首先过滤相关页面来减少搜索空间
4. **更好的相关性**：通过分层过滤提高检索内容的相关性

这种方法特别适用于：
- 大型文档集合
- 需要上下文感知的查询
- 多主题文档
- 需要在概述和细节之间平衡的应用

分层索引代表了 RAG 系统设计中的重要进步，为更智能、更高效的信息检索提供了框架。