In [None]:
# 目标 构建语义搜索引擎
# 1.文档和文档加载
# 2.文本拆分器使用
# 3.嵌入模型使用
# 4.向量存储和召回（就是根据语义检索相似文档块）

In [1]:
# 依赖安装
%uv add langchain-community pypdf

Note: you may need to restart the kernel to use updated packages.


[2mResolved [1m142 packages[0m [2min 1.38s[0m[0m
[36m[1mDownloading[0m[39m numpy [2m(12.3MiB)[0m
[36m[1mDownloading[0m[39m langchain-community [2m(2.4MiB)[0m
 [32m[1mDownloaded[0m[39m langchain-community
 [32m[1mDownloaded[0m[39m numpy
[2mPrepared [1m17 packages[0m [2min 14.16s[0m[0m
         If the cache and target directories are on different filesystems, hardlinking may not be supported.
[2mInstalled [1m17 packages[0m [2min 3.07s[0m[0m
 [32m+[39m [1maiohappyeyeballs[0m[2m==2.6.1[0m
 [32m+[39m [1maiohttp[0m[2m==3.11.14[0m
 [32m+[39m [1maiosignal[0m[2m==1.3.2[0m
 [32m+[39m [1mdataclasses-json[0m[2m==0.6.7[0m
 [32m+[39m [1mfrozenlist[0m[2m==1.5.0[0m
 [32m+[39m [1mhttpx-sse[0m[2m==0.4.0[0m
 [32m+[39m [1mlangchain-community[0m[2m==0.3.20[0m
 [32m+[39m [1mmarshmallow[0m[2m==3.26.1[0m
 [32m+[39m [1mmultidict[0m[2m==6.2.0[0m
 [32m+[39m [1mmypy-extensions[0m[2m==1.0.0[0m
 [32m+[39m [1mnumpy[

In [None]:
# 相关环境变量设置
import sys 
sys.path.append("..") 
from config import config_loader

config_loader.load_env()

In [3]:
# 文档和文档加载器
# 文档（Document）是 langchain 的一个抽象概念，代表文本单元和相关元数据，有三个属性
# page_content 文档内容字符串
# metadata 包含任意元数据的字典，比如说获取相关文档来源，与其他文档的关系及其它额外信息
# id 文档的字符串标识符
# 一般单个 Document 对象代表较大文档的一部分
# 文档使用示例
from langchain_core.documents import Document

documents = [
    Document(
        page_content="Dogs are great companions, known for their loyalty and friendliness.",
        metadata={"source": "mammal-pets-doc"},
    ),
    Document(
        page_content="Cats are independent pets that often enjoy their own space.",
        metadata={"source": "mammal-pets-doc"},
    ),
]
documents

[Document(metadata={'source': 'mammal-pets-doc'}, page_content='Dogs are great companions, known for their loyalty and friendliness.'),
 Document(metadata={'source': 'mammal-pets-doc'}, page_content='Cats are independent pets that often enjoy their own space.')]

In [4]:
# 加载pdf文件为 Document 对象
# 使用基于 pypdf 的 pdf 加载器 pdf一个页面会加载成为一个 Document 对象
from langchain_community.document_loaders import PyPDFLoader

file_path = "../data/Kafka权威指南-22-52.pdf"
loader = PyPDFLoader(file_path)

docs = loader.load()

print(len(docs))
print('='*100)
# document 对象有原始文档字符串和元数据
print(f"{docs[0].page_content[:200]}\n")
print(docs[0].metadata)

31
第1 章
初识Kafka
数据为企业的发展提供动力。我们从数据中获取信息，对它们进行分析处理，然后生成更
多的数据。每个应用程序都会产生数据，包括日志消息、度量指标、用户活动记录、响应
消息等。数据的点点滴滴都在暗示一些重要的事情，比如下一步行动的方向。我们把数据
从源头移动到可以对它们进行分析处理的地方，然后把得到的结果应用到实际场景中，这
样才能够确切地知道这些数据要告诉我们什么。例如，我们每

{'producer': 'iLovePDF', 'creator': 'PyPDF', 'creationdate': '', 'moddate': '2025-03-22T07:33:14+00:00', 'source': '../data/Kafka权威指南-22-52.pdf', 'total_pages': 31, 'page': 0, 'page_label': '1'}


In [5]:
# 对于信息检索来说，一个页面一个 Document 太粗略了，所以需要进一步细致的拆分
# 这里就用上了文本分割器 RecursiveCharacterTextSplitter，这个分割器是使用常用分隔符（比如说换行符）递归分割文档，直到每个块的大小合适
# 这也是针对一般文本用例推荐的分割器
# 我们将文档分割成1000个字符的块，块之间有200个字符的重叠，重叠有助于减轻将语句与与其相关的重要上下文分离的问题
# 我们设置 add_start_index=True 每个分割文档在初始文档中开始的字符索引作为元数据属性 “start_index” 保存。
from langchain_text_splitters import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000, chunk_overlap=200, add_start_index=True
)
# 基于上面的文档集合 再次通过文本拆分成更多的小文档块 返回一个 List[Document]
all_splits = text_splitter.split_documents(docs)

print(type(all_splits[0]))
print(len(all_splits))
print("="*100)
print(f'第一个chunk：{all_splits[0]}')

<class 'langchain_core.documents.base.Document'>
50
第一个chunk：page_content='第1 章
初识Kafka
数据为企业的发展提供动力。我们从数据中获取信息，对它们进行分析处理，然后生成更
多的数据。每个应用程序都会产生数据，包括日志消息、度量指标、用户活动记录、响应
消息等。数据的点点滴滴都在暗示一些重要的事情，比如下一步行动的方向。我们把数据
从源头移动到可以对它们进行分析处理的地方，然后把得到的结果应用到实际场景中，这
样才能够确切地知道这些数据要告诉我们什么。例如，我们每天在Amazon 网站上浏览感
兴趣的商品，浏览信息被转化成商品推荐，并在稍后展示给我们。
这个过程完成得越快，组织的反应就越敏捷。花费越少的精力在数据移动上，就越能专注
于核心业务。这就是为什么在一个以数据为驱动的企业里，数据管道会成为关键性组件。
如何移动数据，几乎变得与数据本身一样重要。
每一次科学家们发生分歧，都是因为掌握的数据不够充分。所以我们可以先就获
取哪一类数据达成一致。只要获取了数据，问题也就迎刃而解了。要么我是对
的，要么你是对的，要么我们都是错的。然后我们继续研究。
——Neil deGrasse Tyson
1.1　发布与订阅消息系统
在正式讨论 Apache Kafka（以下简称 Kafka）之前，先来了解发布与订阅消息系统的概念，
并认识这个系统的重要性。数据（消息）的发送者（发布者）不会直接把消息发送给接收
者，这是发布与订阅消息系统的一个特点。发布者以某种方式对消息进行分类，接收者
（订阅者）订阅它们，以便接收特定类型的消息。发布与订阅系统一般会有一个 broker，也
就是发布消息的中心点。
1' metadata={'producer': 'iLovePDF', 'creator': 'PyPDF', 'creationdate': '', 'moddate': '2025-03-22T07:33:14+00:00', 'source': '../data/Kafka权威指南-22-52.pdf', 'total_pages': 31, 'page': 0, 'page_label': '1', 'start_index': 0}


In [6]:
# 嵌入模型使用
from langchain_google_genai import GoogleGenerativeAIEmbeddings

# 初始化
embeddings = GoogleGenerativeAIEmbeddings(model="models/text-embedding-004")

# 使用嵌入模型 将文本块生成向量嵌入 List[float]
vector_1 = embeddings.embed_query(all_splits[0].page_content)
vector_2 = embeddings.embed_query(all_splits[1].page_content)

assert len(vector_1) == len(vector_2)
print(f"生成的向量嵌入长度 {len(vector_1)}\n")
print(vector_1[:10])

生成的向量嵌入长度 768

[0.0535992830991745, 0.04645499959588051, -0.06358546018600464, 0.001913669635541737, 0.02215440571308136, 0.022496016696095467, 0.03912724554538727, -0.012047704309225082, -0.03164248913526535, -0.007910564541816711]


In [None]:
# 向量存储 有多种方式可以内存，也可以用 FAISS 第三方向量库 这里方便起见 用内存
from langchain_core.vectorstores import InMemoryVectorStore

# 初始化向量存储
vector_store = InMemoryVectorStore(embeddings)
# 把文档做索引操作 加到库中
ids = vector_store.add_documents(documents=all_splits)
# 语义检索 返回语义相似的文档对象list 会自动对输入的文本进行嵌入
results = vector_store.similarity_search(
    "Kafka Broker 配置"
)

print(len(results))

print(results)
print(results[0])

# 返回带分数的
results = vector_store.similarity_search_with_score(
    "Kafka Broker 配置"
)
doc, score = results[0]
print(f"Score: {score}\n")
print(doc)

4
[Document(id='05086e2c-e5ae-4250-a9d5-36e3075bddb1', metadata={'producer': 'iLovePDF', 'creator': 'PyPDF', 'creationdate': '', 'moddate': '2025-03-22T07:33:14+00:00', 'source': '../data/Kafka权威指南-22-52.pdf', 'total_pages': 31, 'page': 24, 'page_label': '25', 'start_index': 0}, page_content='安装Kafka   ｜   25\n生产者\n生产者\nKafka集群\n主题A\n主题A\n主题B\n分区0\n分区1\n分区0\n消费者\n图 2-2：一个简单的 Kafka 集群\n2.6.1\u3000需要多少个broker\n一个 Kafka 集群需要多少个 broker 取决于以下几个因素。首先，需要多少磁盘空间来保\n留数据，以及单个 broker 有多少空间 可用。如果整个集群需要保留 10TB 的数据，每个\nbroker 可以存储 2TB，那么至少需要 5 个 broker。如果启用了数据复制，那么至少还需要\n一倍的空间，不过这要取决于配置的复制系数是多少 （将在第 6 章介绍）\n。也就是说，如\n果启用了数据复制，那么这个集群至少需要 10 个 broker。\n第二\n个要考虑的因素是集群处理请求的能力。这通常与网络接口处理客户端流量的能力有\n关，特别是当有多个消费者存在或者在数据保留期间流量发生波动（比如高峰时段的流量\n爆发）时。如果单个 broker 的网络接口在 高峰时段可以达到 80% 的使用量，并且有两个\n消费者，那么消费者就无法保持峰值，除非有两个 broker。如果集群启用了复制功能，则\n要把这个额外的消费者考虑在内。因磁盘吞吐量低和系统内存不足造成的性能问题，也可\n以通过扩展多个 broker 来解决。\n2.6.2\u3000broker 配置\n要把一个 broker 加入到集群里，只需要修改两个配置参数。首先，所有 broker 都必须配\n置相同的 zookeeper.connect，该参数指定了用于保存元数据的 Zookeep

In [None]:
# 根据向量嵌入来查询相似性 和上面是一个意思 只不过这个是根据向量来查询，上面是自动嵌入
query_embedding = embeddings.embed_query("Kafka Broker 配置")
results = vector_store.similarity_search_with_score_by_vector(query_embedding)

print(len(results))
doc, score = results[0]
print(f"Score: {score}\n")
print(doc)

4
Score: 0.7773890246747375

page_content='安装Kafka   ｜   25
生产者
生产者
Kafka集群
主题A
主题A
主题B
分区0
分区1
分区0
消费者
图 2-2：一个简单的 Kafka 集群
2.6.1　需要多少个broker
一个 Kafka 集群需要多少个 broker 取决于以下几个因素。首先，需要多少磁盘空间来保
留数据，以及单个 broker 有多少空间 可用。如果整个集群需要保留 10TB 的数据，每个
broker 可以存储 2TB，那么至少需要 5 个 broker。如果启用了数据复制，那么至少还需要
一倍的空间，不过这要取决于配置的复制系数是多少 （将在第 6 章介绍）
。也就是说，如
果启用了数据复制，那么这个集群至少需要 10 个 broker。
第二
个要考虑的因素是集群处理请求的能力。这通常与网络接口处理客户端流量的能力有
关，特别是当有多个消费者存在或者在数据保留期间流量发生波动（比如高峰时段的流量
爆发）时。如果单个 broker 的网络接口在 高峰时段可以达到 80% 的使用量，并且有两个
消费者，那么消费者就无法保持峰值，除非有两个 broker。如果集群启用了复制功能，则
要把这个额外的消费者考虑在内。因磁盘吞吐量低和系统内存不足造成的性能问题，也可
以通过扩展多个 broker 来解决。
2.6.2　broker 配置
要把一个 broker 加入到集群里，只需要修改两个配置参数。首先，所有 broker 都必须配
置相同的 zookeeper.connect，该参数指定了用于保存元数据的 Zookeeper 群组和路径。
其次，每个 broker 都必须为 broker.id 参数设置唯一的值。如果两个 broker 使用相同的
broker.id，那么第二个 broker 就无法启动。在运行集群时，还可以配置其他一些参数，特
别是那些用于控制数据复制的参数，这些将在后续的章节介绍。' metadata={'producer': 'iLovePDF', 'creator': 'PyPDF', 'creationdate': '', 'moddate': '2025-03-22T07:33:14+00:00', 'source': '../data/Kafka权威指南-2