In [None]:
def reciprocal_rank_fusion(results: list[list], k=60):
    """Reciprocal_rank_fusion that takes multiple lists of ranked documents
    and an optional parameter k used in the RRF formula"""

    # Initialize a dictionary to hold fused scores for each unique document
    fused_scores = {}

    # Iterate through each list of ranked documents
    for docs in results:
        # Iterate through each document in the list, with its rank (position in the list)
        for rank, doc in enumerate(docs):
            # Convert the document to a string format to use as a key (assumes documents can be serialized to JSON)
            doc_str = doc.page_content
            # If the document is not yet in the fused_scores dictionary, add it with an initial score of 0
            if doc_str not in fused_scores:
                fused_scores[doc_str] = 0
            # Update the score of the document using the RRF formula: 1 / (rank + k)
            fused_scores[doc_str] += 1 / (rank + k)

    # Sort the documents based on their fused scores in descending order to get the final reranked results
    reranked_results = [
        (doc, score)
        for doc, score in sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)
    ]

    # Return the reranked results as a list of tuples, each containing the document and its fused score
    return reranked_results

# 向量存储实现

这个单元格实现了SimpleVectorStore类，它是一个基于NumPy的轻量级向量存储系统，支持文档添加和相似性搜索功能。

In [None]:
import numpy as np


class SimpleVectorStore:
    """
    一个使用 NumPy 的轻量级向量存储实现。
    """

    def __init__(self, dimension=1536):
        """
        初始化向量存储。
        参数:
            dimension (int): 嵌入向量的维度
        """
        self.dimension = dimension
        self.vectors = []  # 存储向量数据
        self.documents = []  # 存储文档内容
        self.metadata_list = []  # 存储元数据

    def add_documents(self, documents, vectors=None, metadata_list=None):
        """
        向向量存储中添加文档。
        参数:
            documents (List[str]): 文档分块列表
            vectors (List[List[float]], 可选): 嵌入向量列表
            metadata_list (List[Dict], 可选): 元数据字典列表
        """
        if vectors is None:
            vectors = [None] * len(documents)
        if metadata_list is None:
            metadata_list = [{} for _ in range(len(documents))]
        for doc, vec, metadata in zip(documents, vectors, metadata_list):
            self.documents.append(doc)
            self.vectors.append(vec)
            self.metadata_list.append(metadata)

    def search(self, query_vector, top_k=5):
        """
        搜索最相似的文档。
        参数:
            query_vector (List[float]): 查询嵌入向量
            top_k (int): 返回的结果数量
        返回:
            List[Dict]: 包含文档、相似度分数和元数据的结果列表
        """
        if not self.vectors or not self.documents:
            return []
        # 将查询向量转换为 NumPy 数组
        query_array = np.array(query_vector)
        # 计算相似度
        similarities = []
        for index, vector in enumerate(self.vectors):
            if vector is not None:
                # 计算余弦相似度
                similarity = np.dot(query_array, vector) / (
                    np.linalg.norm(query_array) * np.linalg.norm(vector)
                )
                similarities.append((index, similarity))
        # 根据相似度排序（降序）
        similarities.sort(key=lambda x: x[1], reverse=True)
        # 获取 top-k 结果
        results = []
        for index, score in similarities[:top_k]:
            results.append(
                {
                    "document": self.documents[index],
                    "score": float(score),
                    "metadata": self.metadata_list[index],
                }
            )
        return results

# 文档处理函数

这个单元格包含了处理文档的函数，用于将PDF文档转换为文本块并生成相应的向量表示。

In [None]:
from typing import List, Dict, Tuple


def process_document(
    pdf_path: str, chunk_size: int = 800
) -> Tuple[List[str], SimpleVectorStore, Dict]:
    """
    处理文档以便与 RSE（检索增强生成）一起使用。

    参数:
        pdf_path(str): PDF 文档的路径
        chunk_size(int): 每个文本块的大小（字符数）

    返回:
        Tuple[List[str], SimpleVectorStore, Dict]: 包含文本块列表、向量存储实例和文档信息的元组
    """
    print("从文档中提取文本...")
    # 从 PDF 文件中提取文本内容
    document_text = extract_text_from_pdf(pdf_path)

    print("将文本分割为非重叠片段...")
    # 将提取的文本分割为非重叠的文本块
    text_chunks = chunk_text(document_text, chunk_size=chunk_size, overlap=0)
    print(f"共创建了 {len(text_chunks)} 个文本块")

    print("为文本块生成嵌入向量...")
    # 为每个文本块生成嵌入向量
    chunk_embeddings = create_embeddings(text_chunks)

    # 创建一个 SimpleVectorStore 实例用于存储向量数据
    vector_store = SimpleVectorStore()

    # 添加带有元数据的文档（包含文本块索引，用于后续重建）
    metadata_list = [
        {"chunk_index": index, "source": pdf_path} for index in range(len(text_chunks))
    ]
    vector_store.add_documents(text_chunks, chunk_embeddings, metadata_list)

    # 记录原始文档结构以供后续拼接使用
    document_info = {
        "chunks": text_chunks,
        "source": pdf_path,
    }

    return text_chunks, vector_store, document_info

# 文档切片价值计算

这个单元格实现了计算文档切片价值值的函数，结合相关性得分和位置信息来评估每个文本块的重要性。

In [None]:
def calculate_chunk_values(
    query: str, chunks: List[str], vector_store, irrelevant_chunk_penalty: float = 0.2
) -> List[float]:
    """
    计算每个文档切片的价值值（value），结合其相关性得分和位置信息。

    参数:
        query(str): 用户输入的查询文本
        chunks(List[str]): 文档被切分后的文本块列表
        vector_store: 向量数据库，包含文档块的向量表示
        irrelevant_chunk_penalty(float): 对无关文档块施加的惩罚值，默认是0.2

    返回:
        List[float]: 每个文档块对应的价值值列表（浮点数）
    """

    # 将用户查询转换成嵌入向量以进行语义匹配
    query_embedding = create_embeddings([query])[0]

    # 获取所有文本块数量并搜索相似度结果
    total_chunks = len(chunks)
    search_results = vector_store.search(query_embedding, top_k=total_chunks)

    # 构建 chunk_index 到相关性得分的映射字典
    relevance_scores = {
        result["metadata"]["chunk_index"]: result["score"] for result in search_results
    }

    # 根据相关性得分计算价值值，并应用不相关块的惩罚机制
    chunk_values = []
    for i in range(total_chunks):
        score = relevance_scores.get(i, 0.0)
        value = score - irrelevant_chunk_penalty
        chunk_values.append(value)

    return chunk_values

# 段落重建与格式化

这个单元格包含了重建文本段落和格式化上下文的函数，用于将最佳文本块重新组合成连贯的段落并为语言模型生成合适的输入格式。

In [None]:
def reconstruct_segments(
    document_chunks: List[str], best_segment_indices: List[Tuple[int, int]]
) -> List[Dict]:
    """
    根据最佳切片索引重建文本段落。

    参数:
        document_chunks(List[str]): 原始文档的所有文本块
        best_segment_indices(List[Tuple[int, int]]): 最佳段落的起始和结束索引列表

    返回:
        List[Dict]: 包含重建段落及其范围的字典列表
    """
    reconstructed_segments = []

    for start_idx, end_idx in best_segment_indices:
        segment_text = " ".join(document_chunks[start_idx:end_idx])
        reconstructed_segments.append(
            {
                "text": segment_text,
                "segment_range": (start_idx, end_idx),
            }
        )

    return reconstructed_segments


def format_segments_for_context(segments: List[Dict]) -> str:
    """
    将文本段落格式化为语言模型可用的上下文字符串。

    参数:
        segments(List[Dict]): 包含段落文本和索引范围的字典列表

    返回:
        str: 格式化的上下文文本
    """
    context_lines = []

    for index, segment in enumerate(segments):
        header = f"SEGMENT {index + 1} (Chunks {segment['segment_range'][0]}-{segment['segment_range'][1] - 1}):"
        context_lines.append(header)
        context_lines.append(segment["text"])
        context_lines.append("-" * 80)

    return "\n\n".join(context_lines)

# 完整的RAG流程实现

这个单元格实现了完整的RAG（检索增强生成）流程，使用相关段落提取（RSE）策略来筛选最有用的文档内容并生成最终回答。

In [None]:
def rag_with_rse(pdf_path: str, query: str, chunk_size: int = 800, penalty: float = 0.2) -> Dict:
    """
    完整的 RAG 流程，使用相关段落提取（RSE）策略筛选最有用的文档内容。
    
    参数:
        pdf_path(str): PDF 文档路径
        query(str): 用户查询
        chunk_size(int): 文本切片大小
        penalty(float): 不相关切片的惩罚系数
        
    返回:
        Dict: 包含查询、选中的段落以及生成回答的结果字典
    """
    print("\n=== 开始执行基于相关段落提取的 RAG 流程 ===")
    print(f"查询内容: {query}")

    # 步骤 1：处理文档并生成向量存储
    text_chunks, vector_store, doc_info = process_document(pdf_path, chunk_size)

    # 步骤 2：计算每个文本块的相关性得分与价值值
    print("\n正在计算文本块相关性得分与价值值...")
    chunk_values = calculate_chunk_values(query, text_chunks, vector_store, penalty)

    # 步骤 3：根据价值值选择最优段落
    best_segments, scores = find_best_segments(
        chunk_values=chunk_values,
        max_segment_length=20,
        total_max_length=30,
        min_segment_value=0.2
    )

    # 步骤 4：重建最佳段落
    print("\n正在重建最佳文本段落...")
    selected_segments = reconstruct_segments(text_chunks, best_segments)

    # 步骤 5：格式化上下文供大模型使用
    formatted_context = format_segments_for_context(selected_segments)

    # 步骤 6：调用大模型生成最终回复
    response = generate_response(query, formatted_context)

    # 整理输出结果
    result = {
        "query": query,
        "segments": selected_segments,
        "response": response
    }

    print("\n=== 最终回复如下 ===")
    print(response)

    return result