# 使用上下文检索增强RAG

> 注意：有关上下文检索的更多背景信息，包括在各种数据集上的附加性能评估，我们建议阅读我们的配套[博客文章](https://www.anthropic.com/news/contextual-retrieval)。

检索增强生成（RAG）使Claude能够在提供响应时利用您的内部知识库、代码库或任何其他文档语料库。企业越来越多地构建RAG应用程序来改善客户支持、内部公司文档问答、财务和法律分析、代码生成等领域的流程。

在[单独指南](https://github.com/anthropics/anthropic-cookbook/blob/main/capabilities/retrieval_augmented_generation/guide.ipynb)中，我们介绍了如何设置基本检索系统，演示了如何评估其性能，然后概述了几种改进性能的技巧。在本指南中，我们介绍了一种改进检索性能的技术：上下文嵌入。

在传统的RAG中，文档通常被分割成更小的块以便高效检索。虽然这种方法对许多应用程序都很有效，但当单独的块缺乏足够的上下文时，可能会导致问题。上下文嵌入通过在嵌入之前为每个块添加相关上下文来解决这个问题。这种方法提高了每个嵌入块的质量，从而实现更准确的检索和更好的整体性能。在我们测试的所有数据源中平均计算，上下文嵌入将前20块检索失败率降低了35%。

相同的块特定上下文也可以与BM25搜索结合使用，进一步提高检索性能。我们在"上下文BM25"部分中介绍这种技术。

在本指南中，我们将演示如何使用包含9个代码库的数据集作为知识库来构建和优化上下文检索系统。我们将介绍：

1) 设置基本检索管道以建立性能基线。

2) 上下文嵌入：它是什么，为什么有效，以及提示缓存如何使其在实际生产用例中实用。

3) 实现上下文嵌入并演示性能改进。

4) 上下文BM25：使用*上下文* BM25混合搜索改进性能。

5) 通过重新排序改进性能。

### 评估指标和数据集：

我们使用9个代码库的预分块数据集 - 所有这些都是根据基本字符分割机制进行分块的。我们的评估数据集包含248个查询 - 每个查询都包含一个'黄金块'。我们将使用称为Pass@k的指标来评估性能。Pass@k检查每个查询的前k个检索文档中是否存在'黄金文档'。在这种情况下，上下文嵌入帮助我们将Pass@10性能从~87%提高到~95%。

您可以在`data/codebase_chunks.json`中找到代码文件及其块，在`data/evaluation_set.jsonl`中找到评估数据集

#### 附加说明：

在使用此检索方法时，提示缓存有助于管理成本。此功能目前在Anthropic的第一方API上可用，即将很快在AWS Bedrock和GCP Vertex的第三方合作伙伴环境中提供。我们知道许多客户在构建RAG解决方案时利用AWS Knowledge Bases和GCP Vertex AI API，通过一些定制，此方法可以在任一平台上使用。考虑联系Anthropic或您的AWS/GCP账户团队以获取此方面的指导！

为了更容易在Bedrock上使用此方法，AWS团队为我们提供了代码，您可以使用它来实现向每个文档添加上下文的Lambda函数。如果您部署此Lambda函数，则可以在配置[Bedrock Knowledge Base](https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-create.html)时选择它作为自定义分块选项。您可以在`contextual-rag-lambda-function`中找到此代码。主要lambda函数代码在`lambda_function.py`中。

## 目录

1) 设置

2) 基本RAG

3) 上下文嵌入

4) 上下文BM25

5) 重新排序

## 设置

在开始本指南之前，请确保您具备：

**技术技能：**
- 中级Python编程
- 对RAG（检索增强生成）的基本理解
- 熟悉向量数据库和嵌入
- 基本命令行熟练度

**系统要求：**
- Python 3.8+
- 已安装并运行Docker（可选，用于BM25搜索）
- 4GB+可用RAM
- ~5-10GB向量数据库磁盘空间

**API访问：**
- [Anthropic API密钥](https://console.anthropic.com/)（免费套餐足够）
- [Voyage AI API密钥](https://www.voyageai.com/)
- [Cohere API密钥](https://cohere.com/)

**时间和成本：**
- 预计完成时间：30-45分钟
- API成本：在整个数据集上运行约需$5-10

### 库

我们需要一些库，包括：

1) `anthropic` - 与Claude交互

2) `voyageai` - 生成高质量嵌入

3) `cohere` - 用于重新排序

4) `elasticsearch` 用于高性能BM25搜索

3) `pandas`、`numpy`、`matplotlib`和`scikit-learn`用于数据操作和可视化

### 环境变量

确保设置了以下环境变量：

```
- VOYAGE_API_KEY
- ANTHROPIC_API_KEY
- COHERE_API_KEY
```

In [6]:
%%capture
!pip install --upgrade anthropic voyageai cohere elasticsearch pandas numpy

我们提前定义模型名称，以便在新模型发布时更容易更改模型

In [None]:
MODEL_NAME = "claude-haiku-4-5"

我们将通过初始化Anthropic客户端开始，我们将使用它来生成上下文描述。

In [None]:
client = anthropic.Anthropic(
    # 这是默认值，可以省略
    api_key=os.getenv("ANTHROPIC_API_KEY"),
)

## 初始化向量数据库类

我们将创建一个VectorDB类来处理嵌入存储和相似性搜索。该类在我们的RAG管道中发挥三个关键功能：

1. **嵌入生成**：使用Voyage AI的嵌入模型将文本块转换为向量表示
2. **存储和缓存**：将嵌入保存到磁盘以避免重新计算（节省时间和API成本）
3. **相似性搜索**：使用余弦相似性检索与给定查询最相关的块

对于本指南，我们使用一个简单的内存向量数据库与pickle序列化。这使得代码易于理解且不需要外部依赖。该类在生成后自动将嵌入保存到磁盘，因此您只需支付一次嵌入成本。

对于生产使用，请考虑托管向量数据库解决方案。

下面的VectorDB类遵循与您在生产解决方案中使用的相同接口模式，使其易于稍后交换。主要功能包括批处理（一次128块）、使用tqdm进行进度跟踪，以及查询缓存在评估期间加速重复搜索。

In [None]:
import pickle
import json
import numpy as np
import voyageai
from typing import List, Dict, Any
from tqdm import tqdm


class VectorDB:
    def __init__(self, name: str, api_key=None):
        if api_key is None:
            api_key = os.getenv("VOYAGE_API_KEY")
        self.client = voyageai.Client(api_key=api_key)
        self.name = name
        self.embeddings = []
        self.metadata = []
        self.query_cache = {}
        self.db_path = f"./data/{name}/vector_db.pkl"

    def load_data(self, dataset: List[Dict[str, Any]]):
        if self.embeddings and self.metadata:
            print("Vector database is already loaded. Skipping data loading.")
            return
        if os.path.exists(self.db_path):
            print("Loading vector database from disk.")
            self.load_db()
            return

        texts_to_embed = []
        metadata = []
        total_chunks = sum(len(doc["chunks"]) for doc in dataset)

        with tqdm(total=total_chunks, desc="Processing chunks") as pbar:
            for doc in dataset:
                for chunk in doc["chunks"]:
                    texts_to_embed.append(chunk["content"])
                    metadata.append(
                        {
                            "doc_id": doc["doc_id"],
                            "original_uuid": doc["original_uuid"],
                            "chunk_id": chunk["chunk_id"],
                            "original_index": chunk["original_index"],
                            "content": chunk["content"],
                        }
                    )
                    pbar.update(1)

        self._embed_and_store(texts_to_embed, metadata)
        self.save_db()

        print(f"Vector database loaded and saved. Total chunks processed: {len(texts_to_embed)}")

    def _embed_and_store(self, texts: List[str], data: List[Dict[str, Any]]):
        batch_size = 128
        with tqdm(total=len(texts), desc="Embedding chunks") as pbar:
            result = []
            for i in range(0, len(texts), batch_size):
                batch = texts[i : i + batch_size]
                batch_result = self.client.embed(batch, model="voyage-2").embeddings
                result.extend(batch_result)
                pbar.update(len(batch))

        self.embeddings = result
        self.metadata = data

    def search(self, query: str, k: int = 20) -> List[Dict[str, Any]]:
        if query in self.query_cache:
            query_embedding = self.query_cache[query]
        else:
            query_embedding = self.client.embed([query], model="voyage-2").embeddings[0]
            self.query_cache[query] = query_embedding

        if not self.embeddings:
            raise ValueError("No data loaded in the vector database.")

        similarities = np.dot(self.embeddings, query_embedding)
        top_indices = np.argsort(similarities)[::-1][:k]

        top_results = []
        for idx in top_indices:
            result = {
                "metadata": self.metadata[idx],
                "similarity": float(similarities[idx]),
            }
            top_results.append(result)

        return top_results

    def save_db(self):
        data = {
            "embeddings": self.embeddings,
            "metadata": self.metadata,
            "query_cache": json.dumps(self.query_cache),
        }
        os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
        with open(self.db_path, "wb") as file:
            pickle.dump(data, file)

    def load_db(self):
        if not os.path.exists(self.db_path):
            raise ValueError(
                "Vector database file not found. Use load_data to create a new database."
            )
        with open(self.db_path, "rb") as file:
            data = pickle.load(file)
        self.embeddings = data["embeddings"]
        self.metadata = data["metadata"]
        self.query_cache = json.loads(data["query_cache"])

现在我们可以使用此类来加载我们的数据集

In [None]:
# 加载您转换后的数据集
with open("data/codebase_chunks.json", "r") as f:
    transformed_dataset = json.load(f)

# 初始化VectorDB
base_db = VectorDB("base_db")

# 加载和处理数据
base_db.load_data(transformed_dataset)

## 基本RAG

首先，我们将使用一个简单的方法设置基本的RAG管道。这在业界有时被称为'朴素RAG'。基本RAG管道包括以下3个步骤：

1) 按标题分块文档 - 仅包含每个子标题的内容

2) 嵌入每个文档

3) 使用余弦相似性检索文档以回答查询

In [None]:
def load_jsonl(file_path: str) -> List[Dict[str, Any]]:
    """加载JSONL文件并返回字典列表。"""
    with open(file_path, "r") as file:
        return [json.loads(line) for line in file]


def evaluate_retrieval(
    queries: List[Dict[str, Any]], retrieval_function: Callable, db, k: int = 20
) -> Dict[str, float]:
    total_score = 0
    total_queries = len(queries)

    for query_item in tqdm(queries, desc="评估检索"):
        query = query_item["query"]
        golden_chunk_uuids = query_item["golden_chunk_uuids"]

        # 查找所有黄金块内容
        golden_contents = []
        for doc_uuid, chunk_index in golden_chunk_uuids:
            golden_doc = next(
                (doc for doc in query_item["golden_documents"] if doc["uuid"] == doc_uuid), None
            )
            if not golden_doc:
                print(f"警告：未找到UUID为{doc_uuid}的黄金文档")
                continue

            golden_chunk = next(
                (chunk for chunk in golden_doc["chunks"] if chunk["index"] == chunk_index), None
            )
            if not golden_chunk:
                print(
                    f"警告：在文档{doc_uuid}中未找到索引{chunk_index}的黄金块"
                )
                continue

            golden_contents.append(golden_chunk["content"].strip())

        if not golden_contents:
            print(f"警告：未找到查询的黄金内容: {query}")
            continue

        retrieved_docs = retrieval_function(query, db, k=k)

        # 计算前k个检索文档中有多少黄金块
        chunks_found = 0
        for golden_content in golden_contents:
            for doc in retrieved_docs[:k]:
                retrieved_content = (
                    doc["metadata"]
                    .get("original_content", doc["metadata"].get("content", ""))
                    .strip()
                )
                if retrieved_content == golden_content:
                    chunks_found += 1
                    break

        query_score = chunks_found / len(golden_contents)
        total_score += query_score

    average_score = total_score / total_queries
    pass_at_n = average_score * 100
    return {"pass_at_n": pass_at_n, "average_score": average_score, "total_queries": total_queries}


def retrieve_base(query: str, db, k: int = 20) -> List[Dict[str, Any]]:
    """
    使用VectorDB或ContextualVectorDB检索相关文档。

    :param query: 查询字符串
    :param db: VectorDB或ContextualVectorDB实例
    :param k: 检索的顶部结果数
    :return: 检索文档列表
    """
    return db.search(query, k=k)


def evaluate_db(db, original_jsonl_path: str, k):
    # 加载查询和基本事实的原始JSONL数据
    original_data = load_jsonl(original_jsonl_path)

    # 评估检索
    results = evaluate_retrieval(original_data, retrieve_base, db, k)
    return results


def evaluate_and_display(db, jsonl_path: str, k_values: List[int] = [5, 10, 20], db_name: str = ""):
    """
    在多个k值上评估检索性能并显示格式化结果。

    Args:
        db: 向量数据库实例 (VectorDB或ContextualVectorDB)
        jsonl_path: 评估数据集路径
        k_values: 要评估的k值列表 (默认: [5, 10, 20])
        db_name: 被评估数据库的可选名称

    Returns:
        将k值映射到其结果的字典
    """
    results = {}

    print(f"{'=' * 60}")
    if db_name:
        print(f"评估结果: {db_name}")
    else:
        print("评估结果")
    print(f"{'=' * 60}\n")

    for k in k_values:
        print(f"评估Pass@{k}...")
        results[k] = evaluate_db(db, jsonl_path, k)
        print()  # 在评估之间添加间距

    # 打印汇总表
    print(f"{'=' * 60}")
    print(f"{'指标':<15} {'通过率':<15} {'分数':<15}")
    print(f"{'-' * 60}")
    for k in k_values:
        pass_rate = f"{results[k]['pass_at_n']:.2f}%"
        score = f"{results[k]['average_score']:.4f}"
        print(f"{'Pass@' + str(k):<15} {pass_rate:<15} {score:<15}")
    print(f"{'=' * 60}\n")

    return results

现在让我们通过评估基本RAG系统来建立基线性能。我们将在k=5、10和20进行测试，以查看有多少黄金块出现在检索结果的前面。这为我们提供了衡量改进的基准。

In [None]:
results = evaluate_and_display(
    base_db, "data/evaluation_set.jsonl", k_values=[5, 10, 20], db_name="基线RAG"
)

这些结果显示了我们的基线RAG性能。系统在顶部5个结果中成功检索正确块的81%，在顶部10个结果中提高到87%，在顶部20个结果中达到90%。

## 上下文嵌入

对于基本RAG，单独的块在孤立嵌入时通常缺乏足够的上下文。上下文嵌入通过使用Claude生成一个简短的描述来"定位"每个块在源文档中的位置来解决这个问题。然后我们将块与此上下文一起嵌入，创建更丰富的向量表示。

对于我们代码库数据集中的每个块，我们将块及其完整源文件都传递给Claude。Claude生成一个简明解释，说明块包含什么以及它在整体文件中的位置。此上下文在嵌入之前被预置到块上。

### 成本和延迟考虑

**何时产生此成本？** 上下文化在摄取时发生一次，而不是在每次查询时发生。与像HyDE（假设文档嵌入）这样的技术不同，后者为每个搜索增加延迟，上下文嵌入在构建向量数据库时是一次性成本。提示缓存使其实用。由于我们按顺序处理来自同一文档的所有块，我们可以利用提示缓存来节省大量成本。

1. 第一个块：我们将完整文档写入缓存（支付少量溢价）
2. 后续块：从缓存读取文档（这些令牌90%折扣）
3. 缓存持续5分钟，有足够时间处理文档中的所有块

**成本示例**：对于8k-token文档中800-token块和100令牌生成上下文，总成本为每百万文档令牌$1.02。当您运行下面的代码时，您将在日志中看到缓存节省。

**注意：** 一些嵌入模型有固定的输入令牌限制。如果您看到上下文嵌入的性能更差，您的上下文块可能被截断 - 考虑使用具有更大上下文窗口的嵌入模型。

--- 

让我们通过为单个块生成上下文来看一个上下文嵌入如何工作的示例。我们将使用Claude来创建定位上下文，您还将看到提示缓存指标的实际效果。

In [None]:
DOCUMENT_CONTEXT_PROMPT = """
<document>
{doc_content}
</document>
"""

CHUNK_CONTEXT_PROMPT = """
Here is the chunk we want to situate within the whole document
<chunk>
{chunk_content}
</chunk>

Please give a short succinct context to situate this chunk within the overall document for the purposes of improving search retrieval of the chunk.
Answer only with the succinct context and nothing else.
"""


def situate_context(doc: str, chunk: str) -> str:
    response = client.messages.create(
        model=MODEL_NAME,
        max_tokens=1024,
        temperature=0.0,
        messages=[
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": DOCUMENT_CONTEXT_PROMPT.format(doc_content=doc),
                        "cache_control": {
                            "type": "ephemeral"
                        },  # 我们将对完整文档利用提示缓存
                    },
                    {
                        "type": "text",
                        "text": CHUNK_CONTEXT_PROMPT.format(chunk_content=chunk),
                    },
                ],
            }
        ],
    )
    return response


jsonl_data = load_jsonl("data/evaluation_set.jsonl")
# 示例用法
doc_content = jsonl_data[0]["golden_documents"][0]["content"]
chunk_content = jsonl_data[0]["golden_chunks"][0]["content"]

response = situate_context(doc_content, chunk_content)
print(f"Situated context: {response.content[0].text}")
print("-" * 10)
# 打印缓存性能指标
print(f"输入令牌: {response.usage.input_tokens}")
print(f"输出令牌: {response.usage.output_tokens}")
print(f"缓存创建输入令牌: {response.usage.cache_creation_input_tokens}")
print(f"缓存读取输入令牌: {response.usage.cache_read_input_tokens}")

### 构建上下文向量数据库

现在我们已经了解了如何为单个块生成上下文描述，让我们将其扩展到处理整个数据集。下面的`ContextualVectorDB`类扩展了我们的基本`VectorDB`，在摄取期间自动进行上下文化。

**主要功能：**

- **并行处理**：使用ThreadPoolExecutor同时对多个块进行上下文化（可配置线程数）
- **自动提示缓存**：逐文档处理块以最大化缓存命中
- **令牌跟踪**：监控缓存性能并计算实际成本节省
- **持久存储**：将嵌入和上下文化元数据保存到磁盘

当您运行此程序时，请注意令牌使用统计 - 您将看到70-80%的输入令牌来自缓存，展示了提示缓存的巨大成本节省。在我们737块的数据集上，这将原本约15美元的摄取作业成本降至约3美元。

In [None]:
import os
import pickle
import json
import numpy as np
import voyageai
from typing import List, Dict, Any
from tqdm import tqdm
import anthropic
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


class ContextualVectorDB:
    def __init__(self, name: str, voyage_api_key=None, anthropic_api_key=None):
        if voyage_api_key is None:
            voyage_api_key = os.getenv("VOYAGE_API_KEY")
        if anthropic_api_key is None:
            anthropic_api_key = os.getenv("ANTHROPIC_API_KEY")

        self.voyage_client = voyageai.Client(api_key=voyage_api_key)
        self.anthropic_client = anthropic.Anthropic(api_key=anthropic_api_key)
        self.name = name
        self.embeddings = []
        self.metadata = []
        self.query_cache = {}
        self.db_path = f"./data/{name}/contextual_vector_db.pkl"

        self.token_counts = {"input": 0, "output": 0, "cache_read": 0, "cache_creation": 0}
        self.token_lock = threading.Lock()

    def situate_context(self, doc: str, chunk: str) -> tuple[str, Any]:
        DOCUMENT_CONTEXT_PROMPT = """
        <document>
        {doc_content}
        </document>
        """

        CHUNK_CONTEXT_PROMPT = """
        Here is the chunk we want to situate within the whole document
        <chunk>
        {chunk_content}
        </chunk>

        Please give a short succinct context to situate this chunk within the overall document for the purposes of improving search retrieval of the chunk.
        Answer only with the succinct context and nothing else.
        """

        response = self.anthropic_client.messages.create(
            model=MODEL_NAME,
            max_tokens=1000,
            temperature=0.0,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": DOCUMENT_CONTEXT_PROMPT.format(doc_content=doc),
                            "cache_control": {
                                "type": "ephemeral"
                            },  # we will make use of prompt caching for the full documents
                        },
                        {
                            "type": "text",
                            "text": CHUNK_CONTEXT_PROMPT.format(chunk_content=chunk),
                        },
                    ],
                },
            ],
            extra_headers={"anthropic-beta": "prompt-caching-2024-07-31"},
        )
        return response.content[0].text, response.usage

    def load_data(self, dataset: List[Dict[str, Any]], parallel_threads: int = 1):
        if self.embeddings and self.metadata:
            print("Vector database is already loaded. Skipping data loading.")
            return
        if os.path.exists(self.db_path):
            print("Loading vector database from disk.")
            self.load_db()
            return

        texts_to_embed = []
        metadata = []
        total_chunks = sum(len(doc["chunks"]) for doc in dataset)

        def process_chunk(doc, chunk):
            # for each chunk, produce the context
            contextualized_text, usage = self.situate_context(doc["content"], chunk["content"])
            with self.token_lock:
                self.token_counts["input"] += usage.input_tokens
                self.token_counts["output"] += usage.output_tokens
                self.token_counts["cache_read"] += usage.cache_read_input_tokens
                self.token_counts["cache_creation"] += usage.cache_creation_input_tokens

            return {
                # append the context to the original text chunk
                "text_to_embed": f"{chunk['content']}\n\n{contextualized_text}",
                "metadata": {
                    "doc_id": doc["doc_id"],
                    "original_uuid": doc["original_uuid"],
                    "chunk_id": chunk["chunk_id"],
                    "original_index": chunk["original_index"],
                    "original_content": chunk["content"],
                    "contextualized_content": contextualized_text,
                },
            }

        print(f"Processing {total_chunks} chunks with {parallel_threads} threads")
        with ThreadPoolExecutor(max_workers=parallel_threads) as executor:
            futures = []
            for doc in dataset:
                for chunk in doc["chunks"]:
                    futures.append(executor.submit(process_chunk, doc, chunk))

            for future in tqdm(as_completed(futures), total=total_chunks, desc="Processing chunks"):
                result = future.result()
                texts_to_embed.append(result["text_to_embed"])
                metadata.append(result["metadata"])

        self._embed_and_store(texts_to_embed, metadata)
        self.save_db()

        # logging token usage
        print(
            f"Contextual Vector database loaded and saved. Total chunks processed: {len(texts_to_embed)}"
        )
        print(f"Total input tokens without caching: {self.token_counts['input']}")
        print(f"Total output tokens: {self.token_counts['output']}")
        print(f"Total input tokens written to cache: {self.token_counts['cache_creation']}")
        print(f"Total input tokens read from cache: {self.token_counts['cache_read']}")

        total_tokens = (
            self.token_counts["input"]
            + self.token_counts["cache_read"]
            + self.token_counts["cache_creation"]
        )
        savings_percentage = (
            (self.token_counts["cache_read"] / total_tokens) * 100 if total_tokens > 0 else 0
        )
        print(
            f"Total input token savings from prompt caching: {savings_percentage:.2f}% of all input tokens used were read from cache."
        )
        print("Tokens read from cache come at a 90 percent discount!")

    # we use voyage AI here for embeddings. Read more here: https://docs.voyageai.com/docs/embeddings
    def _embed_and_store(self, texts: List[str], data: List[Dict[str, Any]]):
        batch_size = 128
        result = [
            self.voyage_client.embed(texts[i : i + batch_size], model="voyage-2").embeddings
            for i in range(0, len(texts), batch_size)
        ]
        self.embeddings = [embedding for batch in result for embedding in batch]
        self.metadata = data

    def search(self, query: str, k: int = 20) -> List[Dict[str, Any]]:
        if query in self.query_cache:
            query_embedding = self.query_cache[query]
        else:
            query_embedding = self.voyage_client.embed([query], model="voyage-2").embeddings[0]
            self.query_cache[query] = query_embedding

        if not self.embeddings:
            raise ValueError("No data loaded in the vector database.")

        similarities = np.dot(self.embeddings, query_embedding)
        top_indices = np.argsort(similarities)[::-1][:k]

        top_results = []
        for idx in top_indices:
            result = {
                "metadata": self.metadata[idx],
                "similarity": float(similarities[idx]),
            }
            top_results.append(result)
        return top_results

    def save_db(self):
        data = {
            "embeddings": self.embeddings,
            "metadata": self.metadata,
            "query_cache": json.dumps(self.query_cache),
        }
        os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
        with open(self.db_path, "wb") as file:
            pickle.dump(data, file)

    def load_db(self):
        if not os.path.exists(self.db_path):
            raise ValueError(
                "Vector database file not found. Use load_data to create a new database."
            )
        with open(self.db_path, "rb") as file:
            data = pickle.load(file)
        self.embeddings = data["embeddings"]
        self.metadata = data["metadata"]
        self.query_cache = json.loads(data["query_cache"])

In [None]:
# 加载转换后的数据集
with open("data/codebase_chunks.json", "r") as f:
    transformed_dataset = json.load(f)

# 初始化ContextualVectorDB
contextual_db = ContextualVectorDB("my_contextual_db")

# 加载和处理数据
# 注意：考虑增加并行线程数来更快运行此程序，或如果担心达到API速率限制而减少并行线程数
contextual_db.load_data(transformed_dataset, parallel_threads=5)

这些数字揭示了提示缓存对上下文嵌入的强大功能：

- 我们在9个代码库文件中处理了**737个块**
- **61.83%的输入令牌**来自缓存（2.27M令牌，90%折扣）
- 没有缓存，这将花费输入令牌约**$9.20**
- 有缓存，实际成本降至**$2.85**（69%节省）

缓存命中率取决于每个文档包含多少块。包含更多块的文档从缓存中受益更多，因为我们只将完整文档写入缓存一次，然后为该文件中的每个块重复读取它。这就是为什么顺序处理文档（而不是随机打乱块）对于最大化缓存效率至关重要。

现在让我们评估这种上下文化相比基线在多大程度上改善了我们的检索性能。

In [28]:
results = evaluate_and_display(
    contextual_db,
    "data/evaluation_set.jsonl",
    k_values=[5, 10, 20],
    db_name="Contextual Embeddings",
)

Evaluation Results: Contextual Embeddings

Evaluating Pass@5...


Evaluating retrieval: 100%|██████████| 248/248 [00:03<00:00, 64.58it/s]



Evaluating Pass@10...


Evaluating retrieval: 100%|██████████| 248/248 [00:03<00:00, 64.37it/s]



Evaluating Pass@20...


Evaluating retrieval: 100%|██████████| 248/248 [00:03<00:00, 64.14it/s]


Metric          Pass Rate       Score          
------------------------------------------------------------
Pass@5          88.12%          0.8812         
Pass@10         92.34%          0.9234         
Pass@20         94.29%          0.9429         






通过在嵌入之前为每个块添加上下文，我们在所有k值上将检索失败率降低了**~30-40%**。这意味着在您检索的顶部块中不相关的结果更少，当您将这些块传递给Claude进行最终响应生成时，会获得更好的答案。

这种改进在Pass@5最明显，此时精度最重要 - 表明上下文化块不仅更常被检索，而且在相关时排名更高。

## 上下文BM25：混合搜索

仅上下文嵌入就将我们的Pass@10从87%提高到92%。我们可以通过使用**上下文BM25**结合语义搜索和基于关键字的搜索来进一步推动性能 - 一种进一步降低检索失败率的混合方法。

### 为什么使用混合搜索？

语义搜索擅长理解和上下文，但可能错过精确的关键字匹配。BM25（概率关键字排名算法）擅长查找特定术语，但缺乏语义理解。通过结合两者，我们获得了两全其美的效果：

- **语义搜索**：捕获概念相似性和释义
- **BM25**：捕获精确术语、函数名称和特定短语
- **倒数排名融合**：智能合并来自两个源的结果

### 什么是BM25？

BM25是一种概率排名函数，通过考虑文档长度和术语饱和来改进TF-IDF。它广泛应用于生产搜索引擎（包括Elasticsearch），因其有效性而用于排名关键字相关性。有关技术细节，请参阅[这篇博客文章](https://www.elastic.co/blog/practical-bm25-part-2-the-bm25-algorithm-and-its-variables)。

我们不仅搜索原始块内容，还搜索我们之前生成的块*和*上下文描述。这意味着BM25可以匹配原始文本或解释上下文中的关键字。

### 设置：运行Elasticsearch

在运行下面的代码之前，您需要本地运行Elasticsearch。最简单的方法是通过Docker：

```bash
docker run -d --name elasticsearch -p 9200:9200 -p 9300:9300 \
  -e "discovery.type=single-node" \
  -e "xpack.security.enabled=false" \
  elasticsearch:9.2.0
```

## 故障排除：
- 验证它正在运行：docker ps | grep elasticsearch
- 如果端口9200被占用：docker stop elasticsearch && docker rm elasticsearch
- 如果出现问题检查日志：docker logs elasticsearch

## 混合搜索如何工作

下面的retrieve_advanced函数实现了三步过程：

1. 检索候选：从语义搜索和BM25获取前150个结果
2. 分数融合：使用加权倒数排名融合组合排名
   - 默认：语义搜索80%权重，BM25 20%权重
   - 这些权重是可调的 - 实验以优化您的用例
3. 返回top-k：选择融合后得分最高的结果

权重系统让您可以根据数据特征在语义理解和关键字精度之间平衡。

In [None]:
import os
import json
from typing import List, Dict, Any
from tqdm import tqdm
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk


class ElasticsearchBM25:
    def __init__(self, index_name: str = "contextual_bm25_index"):
        self.es_client = Elasticsearch("http://localhost:9200")
        self.index_name = index_name
        self.create_index()

    def create_index(self):
        index_settings = {
            "settings": {
                "analysis": {"analyzer": {"default": {"type": "english"}}},
                "similarity": {"default": {"type": "BM25"}},
                "index.queries.cache.enabled": False,
            },
            "mappings": {
                "properties": {
                    "content": {"type": "text", "analyzer": "english"},
                    "contextualized_content": {"type": "text", "analyzer": "english"},
                    "doc_id": {"type": "keyword", "index": False},
                    "chunk_id": {"type": "keyword", "index": False},
                    "original_index": {"type": "integer", "index": False},
                }
            },
        }

        # Change this line - remove 'body=' parameter
        if not self.es_client.indices.exists(index=self.index_name):
            self.es_client.indices.create(
                index=self.index_name,
                settings=index_settings["settings"],
                mappings=index_settings["mappings"],
            )
            print(f"Created index: {self.index_name}")

    def index_documents(self, documents: List[Dict[str, Any]]):
        actions = [
            {
                "_index": self.index_name,
                "_source": {
                    "content": doc["original_content"],
                    "contextualized_content": doc["contextualized_content"],
                    "doc_id": doc["doc_id"],
                    "chunk_id": doc["chunk_id"],
                    "original_index": doc["original_index"],
                },
            }
            for doc in documents
        ]
        success, _ = bulk(self.es_client, actions)
        self.es_client.indices.refresh(index=self.index_name)
        return success

    def search(self, query: str, k: int = 20) -> List[Dict[str, Any]]:
        self.es_client.indices.refresh(index=self.index_name)

        # Change this - remove 'body=' and pass query directly
        response = self.es_client.search(
            index=self.index_name,
            query={
                "multi_match": {
                    "query": query,
                    "fields": ["content", "contextualized_content"],
                }
            },
            size=k,
        )

        return [
            {
                "doc_id": hit["_source"]["doc_id"],
                "original_index": hit["_source"]["original_index"],
                "content": hit["_source"]["content"],
                "contextualized_content": hit["_source"]["contextualized_content"],
                "score": hit["_score"],
            }
            for hit in response["hits"]["hits"]
        ]


def create_elasticsearch_bm25_index(db: ContextualVectorDB):
    es_bm25 = ElasticsearchBM25()
    es_bm25.index_documents(db.metadata)
    return es_bm25


def retrieve_advanced(
    query: str,
    db: ContextualVectorDB,
    es_bm25: ElasticsearchBM25,
    k: int,
    semantic_weight: float = 0.8,
    bm25_weight: float = 0.2,
):
    num_chunks_to_recall = 150

    # Semantic search
    semantic_results = db.search(query, k=num_chunks_to_recall)
    ranked_chunk_ids = [
        (result["metadata"]["doc_id"], result["metadata"]["original_index"])
        for result in semantic_results
    ]

    # BM25 search using Elasticsearch
    bm25_results = es_bm25.search(query, k=num_chunks_to_recall)
    ranked_bm25_chunk_ids = [
        (result["doc_id"], result["original_index"]) for result in bm25_results
    ]

    # Combine results
    chunk_ids = list(set(ranked_chunk_ids + ranked_bm25_chunk_ids))
    chunk_id_to_score = {}

    # Initial scoring with weights
    for chunk_id in chunk_ids:
        score = 0
        if chunk_id in ranked_chunk_ids:
            index = ranked_chunk_ids.index(chunk_id)
            score += semantic_weight * (1 / (index + 1))  # Weighted 1/n scoring for semantic
        if chunk_id in ranked_bm25_chunk_ids:
            index = ranked_bm25_chunk_ids.index(chunk_id)
            score += bm25_weight * (1 / (index + 1))  # Weighted 1/n scoring for BM25
        chunk_id_to_score[chunk_id] = score

    # Sort chunk IDs by their scores in descending order
    sorted_chunk_ids = sorted(
        chunk_id_to_score.keys(), key=lambda x: (chunk_id_to_score[x], x[0], x[1]), reverse=True
    )

    # Assign new scores based on the sorted order
    for index, chunk_id in enumerate(sorted_chunk_ids):
        chunk_id_to_score[chunk_id] = 1 / (index + 1)

    # Prepare the final results
    final_results = []
    semantic_count = 0
    bm25_count = 0
    for chunk_id in sorted_chunk_ids[:k]:
        chunk_metadata = next(
            chunk
            for chunk in db.metadata
            if chunk["doc_id"] == chunk_id[0] and chunk["original_index"] == chunk_id[1]
        )
        is_from_semantic = chunk_id in ranked_chunk_ids
        is_from_bm25 = chunk_id in ranked_bm25_chunk_ids
        final_results.append(
            {
                "chunk": chunk_metadata,
                "score": chunk_id_to_score[chunk_id],
                "from_semantic": is_from_semantic,
                "from_bm25": is_from_bm25,
            }
        )

        if is_from_semantic and not is_from_bm25:
            semantic_count += 1
        elif is_from_bm25 and not is_from_semantic:
            bm25_count += 1
        else:  # it's in both
            semantic_count += 0.5
            bm25_count += 0.5

    return final_results, semantic_count, bm25_count


def evaluate_db_advanced(
    db: ContextualVectorDB,
    original_jsonl_path: str,
    k_values: List[int] = [5, 10, 20],
    db_name: str = "Hybrid Search",
):
    """
    Evaluate hybrid search (semantic + BM25) at multiple k values with formatted results.

    Args:
        db: ContextualVectorDB instance
        original_jsonl_path: Path to evaluation dataset
        k_values: List of k values to evaluate (default: [5, 10, 20])
        db_name: Name for the evaluation display

    Returns:
        Dict mapping k values to their results and source breakdowns
    """
    original_data = load_jsonl(original_jsonl_path)
    es_bm25 = create_elasticsearch_bm25_index(db)
    results = {}

    print(f"{'=' * 70}")
    print(f"Evaluation Results: {db_name}")
    print(f"{'=' * 70}\n")

    try:
        # Warm-up queries
        warm_up_queries = original_data[:10]
        for query_item in warm_up_queries:
            _ = retrieve_advanced(query_item["query"], db, es_bm25, k_values[0])

        for k in k_values:
            print(f"Evaluating Pass@{k}...")

            total_score = 0
            total_semantic_count = 0
            total_bm25_count = 0
            total_results = 0

            for query_item in tqdm(original_data, desc=f"Pass@{k}"):
                query = query_item["query"]
                golden_chunk_uuids = query_item["golden_chunk_uuids"]

                golden_contents = []
                for doc_uuid, chunk_index in golden_chunk_uuids:
                    golden_doc = next(
                        (doc for doc in query_item["golden_documents"] if doc["uuid"] == doc_uuid),
                        None,
                    )
                    if golden_doc:
                        golden_chunk = next(
                            (
                                chunk
                                for chunk in golden_doc["chunks"]
                                if chunk["index"] == chunk_index
                            ),
                            None,
                        )
                        if golden_chunk:
                            golden_contents.append(golden_chunk["content"].strip())

                if not golden_contents:
                    continue

                retrieved_docs, semantic_count, bm25_count = retrieve_advanced(
                    query, db, es_bm25, k
                )

                chunks_found = 0
                for golden_content in golden_contents:
                    for doc in retrieved_docs[:k]:
                        retrieved_content = doc["chunk"]["original_content"].strip()
                        if retrieved_content == golden_content:
                            chunks_found += 1
                            break

                query_score = chunks_found / len(golden_contents)
                total_score += query_score

                total_semantic_count += semantic_count
                total_bm25_count += bm25_count
                total_results += len(retrieved_docs)

            total_queries = len(original_data)
            average_score = total_score / total_queries
            pass_at_n = average_score * 100

            semantic_percentage = (
                (total_semantic_count / total_results) * 100 if total_results > 0 else 0
            )
            bm25_percentage = (total_bm25_count / total_results) * 100 if total_results > 0 else 0

            results[k] = {
                "pass_at_n": pass_at_n,
                "average_score": average_score,
                "total_queries": total_queries,
                "semantic_percentage": semantic_percentage,
                "bm25_percentage": bm25_percentage,
            }

            print(f"Pass@{k}: {pass_at_n:.2f}%")
            print(f"Semantic: {semantic_percentage:.1f}% | BM25: {bm25_percentage:.1f}%\n")

        # Print summary table
        print(f"{'=' * 70}")
        print(f"{'Metric':<12} {'Pass Rate':<12} {'Score':<12} {'Semantic':<12} {'BM25':<12}")
        print(f"{'-' * 70}")
        for k in k_values:
            r = results[k]
            print(
                f"{'Pass@' + str(k):<12} {r['pass_at_n']:>10.2f}% {r['average_score']:>10.4f} "
                f"{r['semantic_percentage']:>10.1f}% {r['bm25_percentage']:>10.1f}%"
            )
        print(f"{'=' * 70}\n")

        return results

    finally:
        # Delete the Elasticsearch index
        if es_bm25.es_client.indices.exists(index=es_bm25.index_name):
            es_bm25.es_client.indices.delete(index=es_bm25.index_name)
            print(f"Deleted Elasticsearch index: {es_bm25.index_name}")

In [39]:
results = evaluate_db_advanced(
    contextual_db,
    "data/evaluation_set.jsonl",
    k_values=[5, 10, 20],
    db_name="Contextual BM25 Hybrid Search",
)

Created index: contextual_bm25_index
Evaluation Results: Contextual BM25 Hybrid Search

Evaluating Pass@5...


Pass@5: 100%|██████████| 248/248 [00:05<00:00, 41.79it/s]


Pass@5: 88.86%
Semantic: 54.6% | BM25: 45.4%

Evaluating Pass@10...


Pass@10: 100%|██████████| 248/248 [00:05<00:00, 42.20it/s]


Pass@10: 92.31%
Semantic: 57.6% | BM25: 42.4%

Evaluating Pass@20...


Pass@20: 100%|██████████| 248/248 [00:05<00:00, 42.15it/s]


Pass@20: 95.23%
Semantic: 60.8% | BM25: 39.2%

Metric       Pass Rate    Score        Semantic     BM25        
----------------------------------------------------------------------
Pass@5            88.86%     0.8886       54.6%       45.4%
Pass@10           92.31%     0.9231       57.6%       42.4%
Pass@20           95.23%     0.9523       60.8%       39.2%

Deleted Elasticsearch index: contextual_bm25_index


## 重新排序

我们通过混合搜索取得了强劲的结果（93.21% Pass@10），但还有一种技术可以挤出额外的性能：**重新排序**。

### 什么是重新排序？

重新排序是一种两阶段检索方法：

1. **阶段1 - 广泛检索**：通过检索比您需要的更多候选来撒大网（例如，检索100个块）
2. **阶段2 - 精确选择**：使用专门的重新排序模型对这些候选进行评分，只选择top-k最相关的

**为什么这有效？** 初始检索方法（嵌入、BM25）针对跨数百万文档的速度进行了优化。重新排序模型较慢但更准确 - 它们可以对较小的候选集进行更深入的分析。这创造了一个速度/精度权衡，在实践中效果很好。

### 我们的重新排序方法

对于这个例子，我们将使用一个更简单的重新排序管道，仅基于上下文嵌入（不是完整的混合搜索）。这是过程：

1. **过度检索**：获取比需要多10倍的结果（例如，当我们需要10个时检索100个块）
2. **使用Cohere重新排序**：使用Cohere的`rerank-english-v3.0`模型对所有候选进行评分
3. **选择top-k**：只返回得分最高的结果

重新排序模型可以访问原始块内容和我们生成的上下文描述，使其具有做出精确相关性判断的丰富信息。

### 预期性能

添加重新排序可带来适度但有意义的改进：
- **没有重新排序**：92.34% Pass@10（仅上下文嵌入）
- **有重新排序**：~95% Pass@10（额外2-3%增益）

这看起来可能很小，但在生产系统中，将失败率从7.66%降低到~5%可以显著改善用户体验。权衡是查询延迟 - 重新排序根据候选集大小每查询增加~100-200ms延迟。

In [None]:
import cohere
from typing import List, Dict, Any, Callable
import json
from tqdm import tqdm


def evaluate_db_rerank(
    db, original_jsonl_path: str, k_values: List[int] = [5, 10, 20], db_name: str = "Reranking"
):
    """
    Evaluate reranking performance at multiple k values with formatted results.

    Args:
        db: ContextualVectorDB instance
        original_jsonl_path: Path to evaluation dataset
        k_values: List of k values to evaluate (default: [5, 10, 20])
        db_name: Name for the evaluation display

    Returns:
        Dict mapping k values to their results
    """
    original_data = load_jsonl(original_jsonl_path)
    co = cohere.Client(os.getenv("COHERE_API_KEY"))
    results = {}

    print(f"{'=' * 60}")
    print(f"Evaluation Results: {db_name}")
    print(f"{'=' * 60}\n")

    for k in k_values:
        print(f"Evaluating Pass@{k} with reranking...")

        total_score = 0
        total_queries = len(original_data)

        for query_item in tqdm(original_data, desc=f"Pass@{k}"):
            query = query_item["query"]
            golden_chunk_uuids = query_item["golden_chunk_uuids"]

            # Find golden contents
            golden_contents = []
            for doc_uuid, chunk_index in golden_chunk_uuids:
                golden_doc = next(
                    (doc for doc in query_item["golden_documents"] if doc["uuid"] == doc_uuid), None
                )
                if golden_doc:
                    golden_chunk = next(
                        (chunk for chunk in golden_doc["chunks"] if chunk["index"] == chunk_index),
                        None,
                    )
                    if golden_chunk:
                        golden_contents.append(golden_chunk["content"].strip())

            if not golden_contents:
                continue

            # Retrieve and rerank
            semantic_results = db.search(query, k=k * 10)

            # Prepare documents for reranking
            documents = [
                f"{res['metadata']['original_content']}\n\nContext: {res['metadata']['contextualized_content']}"
                for res in semantic_results
            ]

            # Rerank
            rerank_response = co.rerank(
                model="rerank-english-v3.0", query=query, documents=documents, top_n=k
            )
            time.sleep(0.1)  # Rate limiting

            # Get final results
            retrieved_docs = []
            for r in rerank_response.results:
                original_result = semantic_results[r.index]
                retrieved_docs.append(
                    {"chunk": original_result["metadata"], "score": r.relevance_score}
                )

            # Check if golden chunks are in results
            chunks_found = 0
            for golden_content in golden_contents:
                for doc in retrieved_docs[:k]:
                    retrieved_content = doc["chunk"]["original_content"].strip()
                    if retrieved_content == golden_content:
                        chunks_found += 1
                        break

            query_score = chunks_found / len(golden_contents)
            total_score += query_score

        average_score = total_score / total_queries
        pass_at_n = average_score * 100

        results[k] = {
            "pass_at_n": pass_at_n,
            "average_score": average_score,
            "total_queries": total_queries,
        }

        print(f"Pass@{k}: {pass_at_n:.2f}%")
        print(f"Average Score: {average_score:.4f}\n")

    # Print summary table
    print(f"{'=' * 60}")
    print(f"{'Metric':<15} {'Pass Rate':<15} {'Score':<15}")
    print(f"{'-' * 60}")
    for k in k_values:
        pass_rate = f"{results[k]['pass_at_n']:.2f}%"
        score = f"{results[k]['average_score']:.4f}"
        print(f"{'Pass@' + str(k):<15} {pass_rate:<15} {score:<15}")
    print(f"{'=' * 60}\n")

    return results

In [48]:
results = evaluate_db_rerank(
    contextual_db,
    "data/evaluation_set.jsonl",
    k_values=[5, 10, 20],
    db_name="Contextual Embeddings + Reranking",
)

Evaluation Results: Contextual Embeddings + Reranking

Evaluating Pass@5 with reranking...


Pass@5: 100%|██████████| 248/248 [01:40<00:00,  2.47it/s]


Pass@5: 92.15%
Average Score: 0.9215

Evaluating Pass@10 with reranking...


Pass@10: 100%|██████████| 248/248 [02:29<00:00,  1.66it/s]


Pass@10: 95.26%
Average Score: 0.9526

Evaluating Pass@20 with reranking...


Pass@20: 100%|██████████| 248/248 [03:03<00:00,  1.35it/s]

Pass@20: 97.45%
Average Score: 0.9745

Metric          Pass Rate       Score          
------------------------------------------------------------
Pass@5          92.15%          0.9215         
Pass@10         95.26%          0.9526         
Pass@20         97.45%          0.9745         






重新排序提供了我们最强的结果，几乎消除了检索失败。让我们看看每种技术如何建立在之前的基础上来实现这种改进。

从我们基线RAG系统的87% Pass@10开始，我们通过系统地应用先进的检索技术攀升到95%以上。每种方法都解决了不同的弱点：上下文嵌入解决了"孤立块"问题，混合搜索捕获嵌入错过的关键字特定查询，重新排序应用更复杂的相关性评分来优化最终选择。

| 方法 | Pass@5 | Pass@10 | Pass@20 |
|----------|--------|---------|---------|
| **基线RAG** | 80.92% | 87.15% | 90.06% |
| **+ 上下文嵌入** | 88.12% | 92.34% | 94.29% |
| **+ 混合搜索（BM25）** | 86.43% | 93.21% | 94.99% |
| **+ 重新排序** | 92.15% | 95.26% | 97.45% |

**关键要点：**

1. **上下文嵌入提供了最大的单一改进**（+5-7个百分点），验证了向块添加文档级上下文显著提高检索质量。这种技术 alone就能让您获得90%的最佳性能。

2. **重新排序达到最高绝对性能**，达到95.26% Pass@10 - 意味着95%查询的正确块出现在前10个结果中。这代表了相比基线RAG**检索失败率降低47%**（从12.85%失败率降至4.74%）。

3. **权衡很重要**：每种技术都增加复杂性和成本：
   - 上下文嵌入：一次性摄取成本（使用提示缓存此数据集约需3美元）
   - 混合搜索：需要Elasticsearch基础设施和维护
   - 重新排序：增加100-200ms查询延迟和每查询API成本（约$0.002每查询）

4. **根据您的需求选择方法**：
   - **高容量、成本敏感**：仅上下文嵌入（92% Pass@10，无每查询成本）
   - **最大精度、延迟容忍**：完整重新排序管道（95% Pass@10，最佳精度）
   - **平衡生产系统**：混合搜索，无每查询成本的强大性能（93% Pass@10）

对于大多数生产RAG系统，**上下文嵌入提供了最佳性能成本比**，只需一次性摄取成本即可提供92% Pass@10。当您需要额外的2-3个百分点精度并能负担额外的基础设施或查询成本时，混合搜索和重新排序可用。

### 下一步和关键要点

1) 我们演示了如何使用上下文嵌入来改进检索性能，然后通过上下文BM25和重新排序提供了额外的改进。

2) 此示例使用代码库，但这些方法也适用于其他数据类型，如内部公司知识库、财务和法律内容、教育内容等等。

3) 如果您是AWS用户，您可以从`contextual-rag-lambda-function`中的Lambda函数开始，如果您的GCP用户，您可以启动自己的Cloud Run实例并遵循类似模式！