# 使用 Milvus 和 DeepSeek 构建 RAG

DeepSeek 帮助开发者使用高性能语言模型构建和扩展 AI 应用。它提供高效的推理、灵活的 API 以及先进的专家混合 (MoE) 架构，用于强大的推理和检索任务。

在本教程中，我们将展示如何使用 Milvus 和 DeepSeek 构建一个检索增强生成 (RAG) 管道。

## 准备工作

### 依赖与环境

In [None]:
!pip install "pymilvus[model]==2.5.10" openai==1.82.0 requests==2.32.3 tqdm==4.67.1 torch==2.7.0

---

In [2]:
import os

# 从环境变量获取 DeepSeek API Key
api_key = os.getenv("DEEPSEEK_API_KEY")

### 准备数据

我们使用 Milvus 文档 2.4.x 中的 FAQ 页面作为我们 RAG 中的私有知识库，这是一个简单 RAG 管道的良好数据源。

下载 zip 文件并将文档解压到 `milvus_docs` 文件夹。

**建议在命令行执行下面命令**

In [3]:
#!wget https://github.com/milvus-io/milvus-docs/releases/download/v2.4.6-preview/milvus_docs_2.4.x_en.zip
#!unzip -q milvus_docs_2.4.x_en.zip -d milvus_docs

我们从 `milvus_docs/en/faq` 文件夹加载所有 markdown 文件。对于每个文档，我们简单地使用 "# " 来分割文件中的内容，这样可以大致分离出 markdown 文件中每个主要部分的内容。

In [4]:
from glob import glob

text_lines = []

for file_path in glob("milvus_docs/en/faq/*.md", recursive=True):
    with open(file_path, "r") as file:
        file_text = file.read()

    text_lines += file_text.split("# ")

In [4]:
len(text_lines)

72

### 准备 LLM 和 Embedding 模型

DeepSeek 支持 OpenAI 风格的 API，您可以使用相同的 API 进行微小调整来调用 LLM。

In [5]:
from openai import OpenAI

deepseek_client = OpenAI(
    api_key=api_key,
    base_url="https://api.deepseek.com/v1",  # DeepSeek API 的基地址
)

定义一个 embedding 模型，使用 `milvus_model` 来生成文本嵌入。我们以 `DefaultEmbeddingFunction` 模型为例，这是一个预训练的轻量级嵌入模型。

In [None]:
# from pymilvus import model as milvus_model

# embedding_model = milvus_model.DefaultEmbeddingFunction()

from pymilvus import model as milvus_model

# OpenAI国内代理 https://api.apiyi.com/token 
embedding_model = milvus_model.dense.OpenAIEmbeddingFunction(
    model_name='text-embedding-3-large', # Specify the model name
    api_key='sk-XXX', # Provide your OpenAI API key
    base_url='https://api.apiyi.com/v1',
    dimensions=512
)

生成一个测试嵌入并打印其维度和前几个元素。

In [14]:
test_embedding = embedding_model.encode_queries(["This is a test"])[0]
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])

512
[-0.02814663  0.00428726 -0.01852599  0.08190062 -0.03156214 -0.05275258
 -0.04885425  0.12481797 -0.0208328   0.03966279]


In [15]:
test_embedding_0 = embedding_model.encode_queries(["That is a test"])[0]
print(test_embedding_0[:10])

[-0.00578664  0.02242682 -0.01892621  0.12811586 -0.01249751 -0.07321841
 -0.00281971  0.08617394 -0.04377401  0.03073668]


## 将数据加载到 Milvus

### 创建 Collection

In [16]:
from pymilvus import MilvusClient

milvus_client = MilvusClient(uri="./milvus_demo.db")

collection_name = "my_rag_collection"

  from pkg_resources import DistributionNotFound, get_distribution


关于 `MilvusClient` 的参数：

*   将 `uri` 设置为本地文件，例如 `./milvus.db`，是最方便的方法，因为它会自动利用 Milvus Lite 将所有数据存储在此文件中。
*   如果您有大规模数据，可以在 Docker 或 Kubernetes 上设置性能更高的 Milvus 服务器。在此设置中，请使用服务器 URI，例如 `http://localhost:19530`，作为您的 `uri`。
*   如果您想使用 Zilliz Cloud（Milvus 的完全托管云服务），请调整 `uri` 和 `token`，它们对应 Zilliz Cloud 中的 Public Endpoint 和 Api key。

检查 collection 是否已存在，如果存在则删除它。

In [17]:
if milvus_client.has_collection(collection_name):
    milvus_client.drop_collection(collection_name)

创建一个具有指定参数的新 collection。

如果我们不指定任何字段信息，Milvus 将自动创建一个默认的 `id` 字段作为主键，以及一个 `vector` 字段来存储向量数据。一个保留的 JSON 字段用于存储非 schema 定义的字段及其值。

`metric_type` (距离度量类型):
     作用：定义如何计算向量之间的相似程度。
     例如：`IP` (内积) - 值越大通常越相似；`L2` (欧氏距离) - 值越小越相似；`COSINE` (余弦相似度) - 通常转换为距离，值越小越相似。
     选择依据：根据你的嵌入模型的特性和期望的相似性定义来选择。

 `consistency_level` (一致性级别):
     作用：定义数据写入后，读取操作能多快看到这些新数据。
     例如：
         `Strong` (强一致性): 总是读到最新数据，可能稍慢。
         `Bounded` (有界过期): 可能读到几秒内旧数据，性能较好 (默认)。
         `Session` (会话一致性): 自己写入的自己能立刻读到。
         `Eventually` (最终一致性): 最终会读到新数据，但没时间保证，性能最好。
     选择依据：在数据实时性要求和系统性能之间做权衡。

简单来说：
 `metric_type`：怎么算相似。
 `consistency_level`：新数据多久能被读到。

In [19]:
milvus_client.create_collection(
    collection_name=collection_name,
    dimension=embedding_dim,
    metric_type="IP",  # 内积距离
    consistency_level="Strong",  # 支持的值为 (`"Strong"`, `"Session"`, `"Bounded"`, `"Eventually"`)。更多详情请参见 https://milvus.io/docs/consistency.md#Consistency-Level。
)

### 插入数据

遍历文本行，创建嵌入，然后将数据插入 Milvus。

这里有一个新字段 `text`，它是在 collection schema 中未定义的字段。它将自动添加到保留的 JSON 动态字段中，该字段在高级别上可以被视为普通字段。

In [42]:
from tqdm import tqdm

data = []

doc_embeddings = embedding_model.encode_documents(text_lines)

for i, line in enumerate(tqdm(text_lines, desc="Creating embeddings")):
    data.append({"id": i, "vector": doc_embeddings[i], "text": line})

milvus_client.insert(collection_name=collection_name, data=data)

Creating embeddings: 100%|██████████| 72/72 [00:00<00:00, 159277.37it/s]


{'insert_count': 72, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71], 'cost': 0}

## 构建 RAG

### 检索查询数据

我们指定一个关于 Milvus 的常见问题。

In [31]:
question = "why we should use milvus?"

在 collection 中搜索该问题，并检索语义上最匹配的前3个结果。

In [32]:
search_res = milvus_client.search(
    collection_name=collection_name,
    data=embedding_model.encode_queries(
        [question]
    ),  # 将问题转换为嵌入向量
    limit=3,  # 返回前3个结果
    search_params={"metric_type": "IP", "params": {}},  # 内积距离
    output_fields=["text"],  # 返回 text 字段
)

让我们看一下查询的搜索结果

In [33]:
import json

retrieved_lines_with_distances = [
    (res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=4))

[
    [
        "Does Milvus support message engines other than Pulsar?\n\nYes. Kafka is supported in Milvus 2.1.0.\n\n###",
        0.5013245344161987
    ],
    [
        "How does Milvus handle vector data types and precision?\n\nMilvus supports Binary, Float32, Float16, and BFloat16 vector types.\n\n- Binary vectors: Store binary data as sequences of 0s and 1s, used in image processing and information retrieval.\n- Float32 vectors: Default storage with a precision of about 7 decimal digits. Even Float64 values are stored with Float32 precision, leading to potential precision loss upon retrieval.\n- Float16 and BFloat16 vectors: Offer reduced precision and memory usage. Float16 is suitable for applications with limited bandwidth and storage, while BFloat16 balances range and efficiency, commonly used in deep learning to reduce computational requirements without significantly impacting accuracy.\n\n###",
        0.4910174012184143
    ],
    [
        "What data types does Milvus sup

### 使用 LLM 获取 RAG 响应

将检索到的文档转换为字符串格式。

In [34]:
context = "\n".join(
    [line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)

In [35]:
context

'Does Milvus support message engines other than Pulsar?\n\nYes. Kafka is supported in Milvus 2.1.0.\n\n###\nHow does Milvus handle vector data types and precision?\n\nMilvus supports Binary, Float32, Float16, and BFloat16 vector types.\n\n- Binary vectors: Store binary data as sequences of 0s and 1s, used in image processing and information retrieval.\n- Float32 vectors: Default storage with a precision of about 7 decimal digits. Even Float64 values are stored with Float32 precision, leading to potential precision loss upon retrieval.\n- Float16 and BFloat16 vectors: Offer reduced precision and memory usage. Float16 is suitable for applications with limited bandwidth and storage, while BFloat16 balances range and efficiency, commonly used in deep learning to reduce computational requirements without significantly impacting accuracy.\n\n###\nWhat data types does Milvus support on the primary key field?\n\nIn current release, Milvus supports both INT64 and string.\n\n###'

In [36]:
question

'why we should use milvus?'

为语言模型定义系统和用户提示。此提示是使用从 Milvus 检索到的文档组装而成的。

In [49]:
SYSTEM_PROMPT = """
Human: 你是一个 AI 助手。你能够从提供的上下文段落片段中找到问题的答案。
"""
USER_PROMPT = f"""
请使用以下用 <context> 标签括起来的信息片段来回答用 <question> 标签括起来的问题。最后追加原始回答的中文翻译，并用 <translated>和</translated> 标签标注。
<context>
{context}
</context>
<question>
{question}
</question>
<translated>
</translated>
"""

In [40]:
USER_PROMPT

'\n请使用以下用 <context> 标签括起来的信息片段来回答用 <question> 标签括起来的问题。最后追加原始回答的中文翻译，并用 <translated>和</translated> 标签标注。\n<context>\nDoes Milvus support message engines other than Pulsar?\n\nYes. Kafka is supported in Milvus 2.1.0.\n\n###\nHow does Milvus handle vector data types and precision?\n\nMilvus supports Binary, Float32, Float16, and BFloat16 vector types.\n\n- Binary vectors: Store binary data as sequences of 0s and 1s, used in image processing and information retrieval.\n- Float32 vectors: Default storage with a precision of about 7 decimal digits. Even Float64 values are stored with Float32 precision, leading to potential precision loss upon retrieval.\n- Float16 and BFloat16 vectors: Offer reduced precision and memory usage. Float16 is suitable for applications with limited bandwidth and storage, while BFloat16 balances range and efficiency, commonly used in deep learning to reduce computational requirements without significantly impacting accuracy.\n\n###\nWhat data types does Milvus sup

使用 DeepSeek 提供的 `deepseek-chat` 模型根据提示生成响应。

In [41]:
response = deepseek_client.chat.completions.create(
    model="deepseek-chat",
    messages=[
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": USER_PROMPT},
    ],
)
print(response.choices[0].message.content)

Milvus is a powerful vector database that offers several advantages, making it a great choice for handling vector data:

1. **Flexible Message Engine Support**: Milvus supports multiple message engines like Pulsar and Kafka (from version 2.1.0), providing flexibility in data streaming and processing.

2. **Diverse Vector Data Type Support**: It accommodates various vector types, including Binary, Float32, Float16, and BFloat16, catering to different precision and memory requirements. This makes it suitable for applications like image processing, information retrieval, and deep learning.

3. **Efficient Storage and Retrieval**: Milvus optimizes storage and retrieval, even handling Float64 values with Float32 precision (with potential precision loss) to balance performance and accuracy.

4. **Primary Key Flexibility**: It supports both INT64 and string types for primary keys, offering versatility in data organization and indexing.

5. **Scalability and Performance**: Milvus is designed f

In [3]:
import re

# 假设 mfd.md 就在当前目录下
file_path = "mfd.md"
with open(file_path, "r", encoding="utf-8") as file: # 确保使用 utf-8 编码读取中文
    file_text = file.read()

text_lines2 = []

# # 方案一
# # 定义一个正则表达式，用于匹配以“**XXX条**”开头的条文
# # 但是，最简单且适用于你当前文件的策略是直接匹配“**第”和“条**”
# split_pattern = r"\*\*第[零一二三四五六七八九十百千万]+条\*\*|\*\*第\d+条\*\*"

# # 使用re.split进行分割，re.split会保留分隔符，所以我们需要一些后处理
# # re.split(pattern, string, flags=re.MULTILINE) 可以用来处理多行文本
# # re.split 会在匹配到的地方进行分割，并且把匹配到的内容（分隔符）也作为列表的一部分返回
# # 我们需要对返回的列表进行处理，将条文号与条文内容组合
# split_results = re.split(split_pattern, file_text)

# # 获取所有匹配的条文标题，以便后面组合
# article_titles = re.findall(split_pattern, file_text)

# # 重新组合文本，确保每个片段都以条文标题开头
# # 第一个 split_results[0] 是文件开头到第一个条文的内容，通常是空白或者不需要的引言
# # 从第二个元素开始，每个 split_results[i] 是一个条文的内容，
# # 并且 article_titles[i-1] 是对应的条文标题
# parsed_articles = []
# for i in range(len(article_titles)):
#     # 确保条文标题和内容正确拼接
#     # split_results 的长度会比 article_titles 的长度多1
#     # split_results[i+1] 是当前条文标题后的内容
#     article_content = article_titles[i] + split_results[i+1].strip()
#     parsed_articles.append(article_content.strip()) # 移除多余的空白符

# text_lines2 = parsed_articles

# 方案二
text_lines2 += file_text.split("#### ")


# 观察结果
print(len(text_lines2))

# 打印前几个片段看效果
for i, line in enumerate(text_lines2[:5]):
    print(f"--- Segment {i+1} ---")
    print(line)
    print("-" * 20)


27
--- Segment 1 ---
## 中华人民共和国民法典

### （二）物权编


--------------------
--- Segment 2 ---
第一章 一般规定

**第二百零四条** 为了明确物的归属，充分发挥物的效用，保护权利人的合法权益，维护社会经济秩序，制定本编。

**第二百零五条** 本编调整因物的归属和利用产生的民事关系。

**第二百零六条** 国家坚持和完善社会主义公有制为主体、多种所有制经济共同发展的基本经济制度。
国家巩固和发展公有制经济，鼓励、支持和引导非公有制经济的发展。
国家实行社会主义市场经济，保障一切市场主体的平等法律地位和发展权利。

**第二百零七条** 国家、集体、私人的物权和其他权利人的物权受法律平等保护，任何组织或者个人不得侵犯。

**第二百零八条** 不动产权利的设立、变更、转让和消灭，应当依照法律规定登记。动产物权的设立和转让，应当依照法律规定交付。

**第二百零九条** 不动产物权的设立、变更、转让和消灭，经依法登记，发生效力；未经登记，不发生效力，但是法律另有规定的除外。
依法属于国家所有的自然资源，所有权可以不登记。

**第二百一十条** 不动产登记，由不动产所在地的登记机构办理。
国家对不动产实行统一登记制度。统一登记的范围、登记机构和登记办法，由法律、行政法规规定。

**第二百一十一条** 当事人申请登记，应当根据不同登记事项提供材料。
申请登记材料以及登记事项相关信息，可以公开查询。

**第二百一十二条** 登记机构应当履行下列职责：
（一）审查申请人提供的材料；
（二）询问申请人；
（三）如实、及时登记；
（四）法律、行政法规规定的其他职责。
申请登记的不动产存在尚未解决的权属争议的，登记机构应当不予登记，并书面告知申请人。

**第二百一十三条** 登记机构不得有下列行为：
（一）要求对不动产进行评估；
（二）以不动产登记为条件收取其他费用；
（三）超出登记职责范围的其他行为。

**第二百一十四条** 不动产物权的设立、变更、转让和消灭，依照法律规定应当登记的，自记载于不动产登记簿时发生效力。

**第二百一十五条** 不动产登记簿由登记机构管理。
不动产登记簿应当采用纸质形式或者电子形式。
不动产登记簿采用电子形式的，应当备份。

**第二百一十六条** 不动

In [7]:
from pymilvus import model as milvus_model

embedding_model2 = milvus_model.DefaultEmbeddingFunction()

from pymilvus import MilvusClient

milvus_client2 = MilvusClient(uri="./milvus_demo2.db")

collection_name2 = "my_rag_collection2"

if milvus_client2.has_collection(collection_name2):
    milvus_client2.drop_collection(collection_name2)

milvus_client2.create_collection(
    collection_name=collection_name2,
    dimension=768,
    metric_type="IP",  # 内积距离
    consistency_level="Strong",  
)

# 插入数据
from tqdm import tqdm

data2 = []

doc_embeddings2 = embedding_model2.encode_documents(text_lines2)

data2.extend(
    {"id": i, "vector": doc_embeddings2[i], "text": line}
    for i, line in enumerate(tqdm(text_lines2, desc="Creating embeddings2"))
)
milvus_client2.insert(collection_name=collection_name2, data=data2)

question2 = "第三百二十五条内容是？"

search_res2 = milvus_client2.search(
    collection_name=collection_name2,
    data=embedding_model2.encode_queries(
        [question2]
    ),  # 将问题转换为嵌入向量
    limit=4,  # 返回前3个结果
    search_params={"metric_type": "IP", "params": {}},  # 内积距离
    output_fields=["text"],  # 返回 text 字段
)

import json

retrieved_lines_with_distances2 = [
    (res["entity"]["text"], res["distance"]) for res in search_res2[0]
]
print(json.dumps(retrieved_lines_with_distances2, indent=4))

context2 = "\n".join(
    [line_with_distance[0] for line_with_distance in retrieved_lines_with_distances2]
)

print(context2)
print(f"{len(context2)}........................")

SYSTEM_PROMPT2 = """
Human: 你是一个 AI 助手。你能够从提供的上下文段落片段中找到问题的答案。
"""
USER_PROMPT2 = f"""
请使用以下用 <context> 标签括起来的信息片段来回答用 <question> 标签括起来的问题。
<context>
{context2}
</context>
<question>
{question2}
</question>
"""

from openai import OpenAI
import os

deepseek_client2 = OpenAI(
    api_key=os.getenv("DEEPSEEK_API_KEY"),
    base_url="https://api.deepseek.com/v1",  # DeepSeek API 的基地址
)

response2 = deepseek_client2.chat.completions.create(
    model="deepseek-chat",
    messages=[
        {"role": "system", "content": SYSTEM_PROMPT2},
        {"role": "user", "content": USER_PROMPT2},
    ],
)
print(response2.choices[0].message.content)

Creating embeddings2: 100%|██████████| 27/27 [00:00<00:00, 79863.33it/s]


[
    [
        "\u4e8c\u3001\u6743\u5229\u8d28\u6743\n\n**\u7b2c\u56db\u767e\u56db\u5341\u4e5d\u6761** \u53ef\u4ee5\u51fa\u8d28\u7684\u6743\u5229\u5305\u62ec\uff1a\n\uff08\u4e00\uff09\u6c47\u7968\u3001\u672c\u7968\u3001\u652f\u7968\uff1b\n\uff08\u4e8c\uff09\u503a\u5238\u3001\u5b58\u6b3e\u5355\uff1b\n\uff08\u4e09\uff09\u4ed3\u5355\u3001\u63d0\u5355\uff1b\n\uff08\u56db\uff09\u53ef\u4ee5\u8f6c\u8ba9\u7684\u57fa\u91d1\u4efd\u989d\u3001\u80a1\u6743\uff1b\n\uff08\u4e94\uff09\u53ef\u4ee5\u8f6c\u8ba9\u7684\u6ce8\u518c\u5546\u6807\u4e13\u7528\u6743\u3001\u4e13\u5229\u6743\u3001\u8457\u4f5c\u6743\u7b49\u77e5\u8bc6\u4ea7\u6743\u4e2d\u7684\u8d22\u4ea7\u6743\uff1b\n\uff08\u516d\uff09\u5e94\u6536\u8d26\u6b3e\uff1b\n\uff08\u4e03\uff09\u6cd5\u5f8b\u3001\u884c\u653f\u6cd5\u89c4\u89c4\u5b9a\u53ef\u4ee5\u51fa\u8d28\u7684\u5176\u4ed6\u8d22\u4ea7\u6743\u5229\u3002\n\n**\u7b2c\u56db\u767e\u4e94\u5341\u6761** \u4ee5\u6c47\u7968\u3001\u672c\u7968\u3001\u652f\u7968\u3001\u503a\u5238\u3001\u5b58\u6b3e\u5355\u3