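# Vectorization and Efficient Retrieval in Practice

This notebook shows how to build a searchable vector knowledge base. It covers loading and splitting a document, embedding the chunks, and retrieving them with several strategies.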

## 1. Environment Setup

In [38]:
# Import the required libraries
from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from dotenv import load_dotenv
import os

# Load environment variables from a .env file
load_dotenv()

True

## 2. Loading and Splitting Documents

In [6]:
# Load the PDF document (one Document per page)
pdf_loader = PyPDFLoader("files/ddia.pdf")
raw_docs = pdf_loader.load()

print(f"Loaded {len(raw_docs)} pages")

Loaded 613 pages


In [7]:
# Split the pages into overlapping chunks
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=800,
    chunk_overlap=150,
    separators=["\n\n", "\n", "。", ".", " ", ""]
)

text_chunks = text_splitter.split_documents(raw_docs)
print(f"Split into {len(text_chunks)} text chunks")
print(f"\nSample chunk:\n{text_chunks[0].page_content[:200]}...")

Split into 2340 text chunks

Sample chunk:
Martin Kleppmann
Designing 
Data-Intensive 
Applications
THE BIG IDEAS BEHIND RELIABLE, SCALABLE,  
AND MAINTAINABLE SYSTEMS...
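
To make the effect of `chunk_size` and `chunk_overlap` concrete, here is a simplified sketch of fixed-size windowing with overlap. It is not the algorithm `RecursiveCharacterTextSplitter` uses (that splitter also tries the separators in order, so that chunks break at natural boundaries). The function name `chunk_with_overlap` and the toy input are assumptions made for illustration.

```python
def chunk_with_overlap(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Split text into fixed-size windows; consecutive windows share chunk_overlap characters."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")

    step = chunk_size - chunk_overlap  # how far the window advances each time
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the window already reached the end of the text
    return chunks


sample = "abcdefghijklmnopqrstuvwxyz"
chunks = chunk_with_overlap(sample, chunk_size=10, chunk_overlap=3)
for chunk in chunks:
    print(chunk)
```

Each chunk starts with the last three characters of the previous one, so a sentence that straddles a boundary still appears whole in at least one chunk as long as it is shorter than the overlap.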


## 3. Initializing the Embedding Model


In [15]:
# Initialize the embedding model
# embedding_model = OpenAIEmbeddings(
#     model="text-embedding-3-small"
# )

# Alternative: another provider behind an OpenAI-compatible endpoint
embedding_model = OpenAIEmbeddings(
    base_url=os.getenv("OPENAI_API_BASE"),
    api_key=os.getenv("OPENAI_API_KEY"),
    model="Qwen/Qwen3-Embedding-4B"
)

# Smoke-test the embedding call
test_vector = embedding_model.embed_query("深度学习")  # "deep learning"
print(f"Vector dimension: {len(test_vector)}")
print(f"Vector sample (first 5 dimensions): {test_vector[:5]}")

Vector dimension: 2560
Vector sample (first 5 dimensions): [-0.0004277588741388172, -0.03691500797867775, 0.020067891106009483, 0.031092843040823936, -0.0027407535817474127]


## 4. Building the Vector Database

In [17]:
# Create the Chroma vector database and persist it to disk
vector_db = Chroma.from_documents(
    documents=text_chunks,
    embedding=embedding_model,
    persist_directory="./files/vector_storage"
)

print(f"Vector database created with {vector_db._collection.count()} records")

Vector database created with 2340 records


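## 5. Similarity Search

Similarity search returns the chunks whose embeddings lie closest to the query embedding. The metric depends on how the collection is configured: Chroma defaults to squared L2 distance unless cosine is selected, and for unit-length embeddings both metrics produce the same ranking.

In [21]:
# Run a similarity search
query_text = "what is data intensive？"

retrieved_docs = vector_db.similarity_search(
    query_text,
    k=3
)

print(f"Query: {query_text}\n")
print(f"Retrieved {len(retrieved_docs)} relevant documents:\n")

for idx, doc in enumerate(retrieved_docs, 1):
    print(f"--- Result {idx} ---")
    # PyPDFLoader stores a 0-based page index in the metadata
    print(f"Source: page {doc.metadata.get('page', 'N/A')}")
    print(f"Content: {doc.page_content[:150]}...\n")

Query: what is data intensive？

Retrieved 3 relevant documents:

--- Result 1 ---
Source: page 512
Content: usefulness of a technology. The range of different things you might want to do with
data is dizzyingly wide. What one person considers to be an obscur...

--- Result 2 ---
Source: page 16
Content: the job, and different technologies each have their own strengths and weaknesses. As
we shall see, relational databases are important but not the fina...

--- Result 3 ---
Source: page 276
Content: current date and time, for example, it must do so through special deterministic APIs. 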
Partitioning
Executing all transactions serially makes concurre...
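
Conceptually, a similarity search scores every stored vector against the query vector and keeps the best `k`. The sketch below does this by brute force with cosine similarity on toy 2-D vectors. It only illustrates the idea: Chroma answers queries through an approximate nearest-neighbor index rather than a full scan, and the names `cosine_similarity` and `top_k` are assumptions introduced here.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between a and b; 0.0 if either vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(query_vec: list[float], doc_vecs: list[list[float]], k: int) -> list[tuple[int, float]]:
    """Return (index, similarity) pairs for the k most similar vectors, best first."""
    scored = [(i, cosine_similarity(query_vec, vec)) for i, vec in enumerate(doc_vecs)]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


# Toy "document embeddings" and a query that points mostly along the x axis
toy_docs = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0], [-1.0, 0.0]]
query = [1.0, 0.1]

hits = top_k(query, toy_docs, k=2)
for index, similarity in hits:
    print(f"doc {index}: similarity {similarity:.3f}")
```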



## 6. Retrieval with Scores

In [22]:
# Retrieve results together with their scores.
# The score is a distance, so a lower value means a closer match.
docs_with_scores = vector_db.similarity_search_with_score(
    query_text,
    k=3
)

print("Results with distance scores:\n")
for idx, (doc, score) in enumerate(docs_with_scores, 1):
    print(f"Result {idx} - distance score: {score:.4f}")
    print(f"Content: {doc.page_content[:100]}...\n")

Results with distance scores:

Result 1 - distance score: 0.8561
Content: usefulness of a technology. The range of different things you might want to do with
data is dizzying...

Result 2 - distance score: 0.8584
Content: the job, and different technologies each have their own strengths and weaknesses. As
we shall see, r...

Result 3 - distance score: 0.8616
Content: current date and time, for example, it must do so through special deterministic APIs. 
Partitioning
...
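
The scores above are distances, so they can be hard to read as "how similar". Under two assumptions that this notebook does not verify (the embeddings are unit-normalized, and the collection uses Chroma's default squared L2 metric), the identity ‖a − b‖² = 2 − 2·cos(a, b) converts a distance into a cosine similarity. The helper name `squared_l2_to_cosine` is hypothetical.

```python
import math


def squared_l2(a: list[float], b: list[float]) -> float:
    """Squared Euclidean distance between two vectors."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


def dot(a: list[float], b: list[float]) -> float:
    return sum(x * y for x, y in zip(a, b))


def squared_l2_to_cosine(distance: float) -> float:
    """Valid only for unit-length vectors: d^2 = 2 - 2*cos, so cos = 1 - d^2 / 2."""
    return 1.0 - distance / 2.0


# Check the identity on two unit vectors that are 1 radian apart
u = [1.0, 0.0]
v = [math.cos(1.0), math.sin(1.0)]
recovered = squared_l2_to_cosine(squared_l2(u, v))
print(f"cosine from dot product: {dot(u, v):.6f}")
print(f"cosine from distance:    {recovered:.6f}")

# Apply the same conversion to the distances reported above (assumptions as stated)
for distance in (0.8561, 0.8584, 0.8616):
    print(f"distance {distance} -> cosine of roughly {squared_l2_to_cosine(distance):.3f}")
```

If the collection were configured with the cosine metric instead, the reported score would already be a cosine distance (1 minus the similarity) and this conversion would not apply.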



## 7. Using the Retriever Interface

In [24]:
# Create a retriever
retriever = vector_db.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 3}
)

# Run a query through the retriever
results = retriever.invoke(query_text)
print(f"Retrieved {len(results)} documents")

Retrieved 3 documents


## 8. Hybrid Retrieval (Semantic + Keyword)

In [35]:
from typing import List
from langchain_community.retrievers import BM25Retriever

# -------------------------
# 1. Define a custom hybrid retriever
# -------------------------
class HybridRetriever:
    """
    Combine several retrievers with per-retriever weights and de-duplication.
    """

    def __init__(self, retrievers, weights=None, k=5):
        self.retrievers = retrievers
        self.weights = weights or [1] * len(retrievers)
        if len(self.weights) != len(self.retrievers):
            raise ValueError("weights must have one entry per retriever")
        self.k = k

    def invoke(self, query: str) -> List:
        scored_docs = {}

        for retriever, weight in zip(self.retrievers, self.weights):
            # Current LangChain retrievers are called through invoke()
            docs = retriever.invoke(query)

            for rank, doc in enumerate(docs):
                # Fall back to str(doc) for results without a page_content attribute
                content = getattr(doc, "page_content", str(doc))
                # Weighted reciprocal rank: the top hit scores weight, the next weight / 2, ...
                score = weight * (1 / (rank + 1))

                # De-duplicate by content and accumulate scores across retrievers
                if content not in scored_docs:
                    scored_docs[content] = (doc, score)
                else:
                    scored_docs[content] = (doc, scored_docs[content][1] + score)

        # Sort by fused score, highest first
        sorted_docs = sorted(scored_docs.values(), key=lambda x: x[1], reverse=True)

        # Return the top k documents
        return [doc for doc, _ in sorted_docs[:self.k]]

# -------------------------
# 2. Initialize the individual retrievers
# -------------------------
# BM25 keyword retriever (requires the rank_bm25 package)
keyword_retriever = BM25Retriever.from_documents(text_chunks)
keyword_retriever.k = 3

# Vector retriever
vector_retriever = vector_db.as_retriever(search_kwargs={"k": 3})

# -------------------------
# 3. Create the hybrid retriever
# -------------------------
hybrid_retriever = HybridRetriever(
    retrievers=[keyword_retriever, vector_retriever],
    weights=[0.4, 0.6],  # BM25 40%, vector 60%
    k=5
)

# -------------------------
# 4. Run the retrieval
# -------------------------
hybrid_results = hybrid_retriever.invoke(query_text)

print(f"Hybrid retrieval returned {len(hybrid_results)} documents")

for i, doc in enumerate(hybrid_results):
    print(f"Doc {i+1}: {str(doc)[:200]}")  # show the first 200 characters


Hybrid retrieval returned 5 documents
Doc 1: page_content='usefulness of a technology. The range of different things you might want to do with
data is dizzyingly wide. What one person considers to be an obscure and pointless
feature may well be 
Doc 2: page_content='idea of using multiple differently partitioned stages is similar to what we discussed in
“Multi-partition data processing” on page 514 (see also “Concurrency control” on
page 462). 
Time
Doc 3: page_content='the job, and different technologies each have their own strengths and weaknesses. As
we shall see, relational databases are important but not the final word on dealing with
data.
Scope o
Doc 4: page_content='understanding.
Moreover, data is extracted from users through a one-way process, not a relationship
with true reciprocity, and not a fair value exchange. There is no dialog, no option fo
Doc 5: page_content='current date and time, for example, it must do so through special deterministic APIs. 
Partitioning
Executing all transactions serially
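
The scoring inside `HybridRetriever` can be followed by hand on a toy example. The sketch below applies the same weighted reciprocal-rank formula, `weight * 1 / (rank + 1)`, to two made-up rankings of document IDs. The function name `fuse_rankings` and the IDs are assumptions for illustration only.

```python
from collections import defaultdict


def fuse_rankings(rankings: list[list[str]], weights: list[float], k: int) -> list[tuple[str, float]]:
    """Sum weight / (rank + 1) per document across rankings and return the top k."""
    scores: dict[str, float] = defaultdict(float)
    for ranked_ids, weight in zip(rankings, weights):
        for rank, doc_id in enumerate(ranked_ids):
            scores[doc_id] += weight / (rank + 1)
    ordered = sorted(scores.items(), key=lambda item: item[1], reverse=True)
    return ordered[:k]


bm25_ranking = ["A", "B", "C"]    # hypothetical keyword results, best first
vector_ranking = ["C", "A", "D"]  # hypothetical vector results, best first

fused = fuse_rankings([bm25_ranking, vector_ranking], weights=[0.4, 0.6], k=2)
for doc_id, score in fused:
    print(f"{doc_id}: {score:.4f}")
```

Document `C` ends up first (0.4/3 + 0.6 ≈ 0.733), ahead of `A` (0.4 + 0.6/2 = 0.7): ranking first in the more heavily weighted list outweighs ranking first in the other. The commonly cited Reciprocal Rank Fusion formula adds a smoothing constant to the rank in the denominator, which flattens the gap between top positions; the class above does not do that.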

## 9. MMR Retrieval (Maximal Marginal Relevance)

MMR picks results one at a time, trading off relevance to the query against similarity to the results already selected. This reduces redundancy among the returned chunks.

In [36]:
# Retrieve with MMR
mmr_docs = vector_db.max_marginal_relevance_search(
    query_text,
    k=3,
    fetch_k=10  # fetch 10 candidates first, then select 3 of them
)

print("MMR results:\n")
for idx, doc in enumerate(mmr_docs, 1):
    print(f"Result {idx}: {doc.page_content[:100]}...\n")

MMR results:

Result 1: usefulness of a technology. The range of different things you might want to do with
data is dizzying...

Result 2: that requests from a particular user are always routed to the same datacenter and use
the leader in ...

Result 3: that it helps preserve causality. We have already seen several examples over the
course of this book...
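
The selection step behind MMR can be sketched in a few lines. This is a minimal pure-Python version on toy 2-D vectors, not the library's implementation. `lambda_mult` follows the parameter name LangChain uses for the relevance/diversity trade-off; the function names and vectors are assumptions.

```python
import math


def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))


def mmr_select(query_vec, candidate_vecs, k, lambda_mult=0.5):
    """Greedily pick k candidate indices, balancing relevance against redundancy."""
    selected: list[int] = []
    remaining = list(range(len(candidate_vecs)))
    while remaining and len(selected) < k:
        best_idx, best_score = remaining[0], -math.inf
        for idx in remaining:
            relevance = cosine(query_vec, candidate_vecs[idx])
            # Redundancy: similarity to the closest already-selected candidate
            redundancy = max(
                (cosine(candidate_vecs[idx], candidate_vecs[j]) for j in selected),
                default=0.0,
            )
            score = lambda_mult * relevance - (1 - lambda_mult) * redundancy
            if score > best_score:
                best_idx, best_score = idx, score
        selected.append(best_idx)
        remaining.remove(best_idx)
    return selected


query = [1.0, 0.3]
candidates = [
    [1.0, 0.25],  # 0: most relevant
    [1.0, 0.2],   # 1: near-duplicate of candidate 0
    [0.5, 1.0],   # 2: less relevant, but points in a different direction
]

balanced = mmr_select(query, candidates, k=2, lambda_mult=0.5)
relevance_only = mmr_select(query, candidates, k=2, lambda_mult=1.0)
print("lambda_mult=0.5:", balanced)
print("lambda_mult=1.0:", relevance_only)
```

With `lambda_mult=1.0` the redundancy term vanishes and the two most relevant candidates win, even though they are near-duplicates. With `lambda_mult=0.5` the second pick is the less relevant but more distinct candidate.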



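## 10. Persistence and Incremental Updates

In [39]:
# Load the existing database from disk
existing_db = Chroma(
    persist_directory="./files/vector_storage",
    embedding_function=embedding_model
)

print(f"Loaded existing database with {existing_db._collection.count()} records")

Loaded existing database with 2340 records


In [40]:
# Add new documents incrementally
new_texts = [
    "The Transformer architecture is the foundation of modern NLP",
    "Attention mechanisms can capture long-range dependencies"
]

existing_db.add_texts(new_texts)
print(f"Database now contains {existing_db._collection.count()} records")

Database now contains 2342 records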
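
One caveat with incremental updates: running the cell above a second time would most likely store the same two texts again, because no IDs are supplied and fresh ones are generated on every call. A common remedy is to derive a stable ID from the content and skip texts whose ID is already known. The sketch below shows only that bookkeeping. `content_id` and `dedupe_new_texts` are hypothetical helpers, and passing the result on through the `ids` argument of `add_texts` is an assumption to check against the installed LangChain version.

```python
import hashlib


def content_id(text: str) -> str:
    """Deterministic ID: the SHA-256 hex digest of the UTF-8 encoded text."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def dedupe_new_texts(new_texts: list[str], known_ids: set[str]) -> tuple[list[str], list[str]]:
    """Return the texts (and their IDs) that are not already known, dropping repeats."""
    seen = set(known_ids)
    fresh_texts, fresh_ids = [], []
    for text in new_texts:
        doc_id = content_id(text)
        if doc_id in seen:
            continue
        seen.add(doc_id)
        fresh_texts.append(text)
        fresh_ids.append(doc_id)
    return fresh_texts, fresh_ids


already_stored = {content_id("first note")}
incoming = ["first note", "second note", "second note"]

texts_to_add, ids_to_add = dedupe_new_texts(incoming, already_stored)
print(texts_to_add)

# Assumed usage with the vector store (not executed here):
# existing_db.add_texts(texts_to_add, ids=ids_to_add)
```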


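## 11. Retrieval with Metadata Filtering

In [41]:
# Similarity search restricted by a metadata filter
filtered_docs = vector_db.similarity_search(
    query_text,
    k=3,
    filter={"page": {"$gte": 5}}  # keep chunks whose page index is >= 5 (PyPDFLoader page indices are 0-based)
)

print(f"Filtered search returned {len(filtered_docs)} documents")

Filtered search returned 3 documents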
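
The filter dictionary reads as "field, then operator, then operand". The toy matcher below models how such a filter could be evaluated against a chunk's metadata. It illustrates the semantics of the comparison operators only and is not Chroma's implementation. The function name `matches` and the sample metadata are assumptions.

```python
import operator

# Comparison operators in the Mongo-style syntax used by the filter above
_OPERATORS = {
    "$eq": operator.eq,
    "$ne": operator.ne,
    "$gt": operator.gt,
    "$gte": operator.ge,
    "$lt": operator.lt,
    "$lte": operator.le,
}


def matches(metadata: dict, where: dict) -> bool:
    """True if every field condition in `where` holds for `metadata`."""
    for field, condition in where.items():
        if field not in metadata:
            return False
        value = metadata[field]
        if isinstance(condition, dict):
            # e.g. {"$gte": 5}: every operator listed must be satisfied
            for op_name, operand in condition.items():
                if not _OPERATORS[op_name](value, operand):
                    return False
        elif value != condition:
            # A bare value is shorthand for equality
            return False
    return True


chunks_metadata = [{"page": 0}, {"page": 4}, {"page": 5}, {"page": 512}]
kept = [m["page"] for m in chunks_metadata if matches(m, {"page": {"$gte": 5}})]
print(kept)
```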


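## 12. Batch Processing Example

In [45]:
# Add a large number of documents to the vector store in batches
def batch_process_documents(chunks, batch_size=50, limit=None):
    """Add chunks in batches to avoid request timeouts.

    limit caps how many chunks are processed; None processes all of them.
    """
    total = len(chunks) if limit is None else min(limit, len(chunks))
    for i in range(0, total, batch_size):
        end = min(i + batch_size, total)
        vector_db.add_documents(chunks[i:end])
        print(f"Processed {end}/{len(chunks)} documents")

# Example call: only the first 100 chunks, to keep the demo short.
# These chunks were already stored in section 4, so this call adds them a second time.
batch_process_documents(text_chunks, limit=100)

Processed 50/2340 documents
Processed 100/2340 documents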
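
The slicing logic is independent of the vector store, so it can be factored into a small generator and tested on its own. This is a minimal sketch; the name `iter_batches` is an assumption.

```python
from typing import Iterator, Sequence, TypeVar

T = TypeVar("T")


def iter_batches(items: Sequence[T], batch_size: int) -> Iterator[Sequence[T]]:
    """Yield consecutive slices of at most batch_size items; the last one may be shorter."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


batches = list(iter_batches(list(range(7)), batch_size=3))
print(batches)
```

Because the loop bound comes from `len(items)`, every item lands in exactly one batch regardless of how the collection size relates to the batch size.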


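## Summary

This notebook demonstrated:
- The complete document vectorization workflow
- Several retrieval strategies (similarity search, MMR, hybrid retrieval)
- Persistence and incremental updates for the vector database

As a next step, these components can be integrated into a complete RAG application.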