# 使用 Milvus 和 DeepSeek 构建 RAG

DeepSeek 帮助开发者使用高性能语言模型构建和扩展 AI 应用。它提供高效的推理、灵活的 API 以及先进的专家混合 (MoE) 架构，用于强大的推理和检索任务。

在本教程中，我们将展示如何使用 Milvus 和 DeepSeek 构建一个检索增强生成 (RAG) 管道。

## 准备工作

### 依赖与环境

In [1]:
# !pip install "pymilvus[model]==2.5.10" openai==1.82.0 requests==2.32.3 tqdm==4.67.1 torch==2.7.0

---

In [2]:
import os

# 从环境变量获取 DeepSeek API Key
api_key = os.getenv("DEEPSEEK_API_KEY")

### 准备数据

我们使用 Milvus 文档 2.4.x 中的 FAQ 页面作为我们 RAG 中的私有知识库，这是一个简单 RAG 管道的良好数据源。

下载 zip 文件并将文档解压到 `milvus_docs` 文件夹。

**建议在命令行执行下面命令**

In [3]:
# !wget https://github.com/milvus-io/milvus-docs/releases/download/v2.4.6-preview/milvus_docs_2.4.x_en.zip
#!unzip -q milvus_docs_2.4.x_en.zip -d milvus_docs

我们从 `milvus_docs/en/faq` 文件夹加载所有 markdown 文件。对于每个文档，我们简单地使用 "# " 来分割文件中的内容，这样可以大致分离出 markdown 文件中每个主要部分的内容。

In [4]:
# from glob import glob

# text_lines = []

# for file_path in glob("mfd.md", recursive=True):
#     with open(file_path, "r") as file:
#         file_text = file.read()

#     text_lines += file_text.split("# ")

In [5]:
from glob import glob
import re

def smart_chunk_text(text, min_length=100, max_length=1000, overlap=50):
    """
    智能文本切分函数
    
    Args:
        text: 输入文本
        min_length: 最小块长度
        max_length: 最大块长度  
        overlap: 重叠长度
    
    Returns:
        list: 切分后的文本块列表
    """
    chunks = []
    
    # 首先按照原有逻辑用 "# " 分割
    sections = text.split("# ")
    
    for section in sections:
        section = section.strip()
        if not section:
            continue
            
        # 如果段落长度合适，直接使用
        if min_length <= len(section) <= max_length:
            chunks.append(section)
        # 如果段落太短，尝试与下一段合并（这里简化处理）
        elif len(section) < min_length:
            if section:  # 确保不是空字符串
                chunks.append(section)
        # 如果段落太长，进一步切分
        else:
            # 按段落分割（两个换行符）
            paragraphs = re.split(r'\n\s*\n', section)
            current_chunk = ""
            
            for para in paragraphs:
                para = para.strip()
                if not para:
                    continue
                    
                # 如果加上当前段落不会超过最大长度
                if len(current_chunk) + len(para) + 2 <= max_length:
                    if current_chunk:
                        current_chunk += "\n\n" + para
                    else:
                        current_chunk = para
                else:
                    # 保存当前块（如果不为空且长度足够）
                    if current_chunk and len(current_chunk.strip()) >= min_length:
                        chunks.append(current_chunk.strip())
                    
                    # 如果单个段落就超过最大长度，按句子分割
                    if len(para) > max_length:
                        sentences = re.split(r'[。！？.!?]', para)
                        temp_chunk = ""
                        
                        for sentence in sentences:
                            sentence = sentence.strip()
                            if not sentence:
                                continue
                                
                            if len(temp_chunk) + len(sentence) + 1 <= max_length:
                                if temp_chunk:
                                    temp_chunk += "。" + sentence
                                else:
                                    temp_chunk = sentence
                            else:
                                if temp_chunk and len(temp_chunk.strip()) >= min_length:
                                    chunks.append(temp_chunk.strip())
                                temp_chunk = sentence
                        
                        if temp_chunk and len(temp_chunk.strip()) >= min_length:
                            chunks.append(temp_chunk.strip())
                        current_chunk = ""
                    else:
                        current_chunk = para
            
            # 保存最后一个块
            if current_chunk and len(current_chunk.strip()) >= min_length:
                chunks.append(current_chunk.strip())
    
    return chunks

text_lines = []

# for file_path in glob("milvus_docs/en/faq/*.md", recursive=True):
for file_path in glob("mfd.md", recursive=True):
    with open(file_path, "r") as file:
        file_text = file.read()

    # 使用智能切分函数
    text_lines += smart_chunk_text(file_text, min_length=100, max_length=800, overlap=50)

In [6]:
# Print the length of each chunk
print("Length of each text chunk:")
for i, chunk in enumerate(text_lines):
    print(f"Chunk {i}: {len(chunk)} characters") 

Length of each text chunk:
Chunk 0: 1 characters
Chunk 1: 14 characters
Chunk 2: 11 characters
Chunk 3: 766 characters
Chunk 4: 751 characters
Chunk 5: 773 characters
Chunk 6: 701 characters
Chunk 7: 301 characters
Chunk 8: 13 characters
Chunk 9: 743 characters
Chunk 10: 736 characters
Chunk 11: 753 characters
Chunk 12: 775 characters
Chunk 13: 766 characters
Chunk 14: 778 characters
Chunk 15: 170 characters
Chunk 16: 791 characters
Chunk 17: 553 characters
Chunk 18: 767 characters
Chunk 19: 630 characters
Chunk 20: 304 characters
Chunk 21: 14 characters
Chunk 22: 286 characters
Chunk 23: 640 characters
Chunk 24: 746 characters
Chunk 25: 339 characters
Chunk 26: 362 characters
Chunk 27: 758 characters
Chunk 28: 14 characters
Chunk 29: 498 characters
Chunk 30: 759 characters
Chunk 31: 785 characters
Chunk 32: 682 characters
Chunk 33: 13 characters
Chunk 34: 762 characters
Chunk 35: 372 characters
Chunk 36: 643 characters
Chunk 37: 780 characters
Chunk 38: 597 characters
Chunk 39: 11 cha

In [7]:
len(text_lines)

53

In [8]:
print(text_lines[4])

**第二百一十四条** 不动产物权的设立、变更、转让和消灭，依照法律规定应当登记的，自记载于不动产登记簿时发生效力。

**第二百一十五条** 不动产登记簿由登记机构管理。
不动产登记簿应当采用纸质形式或者电子形式。
不动产登记簿采用电子形式的，应当备份。

**第二百一十六条** 不动产登记簿是物权归属和内容的根据。
不动产登记簿记载的事项与不动产权属证书记载的事项不一致的，除有证据证明不动产登记簿确有错误外，以不动产登记簿为准。

**第二百一十七条** 不动产权属证书是权利人享有该不动产物权的证明。不动产权属证书记载的事项，应当与不动产登记簿一致；不一致的，除有证据证明不动产登记簿确有错误外，以不动产登记簿为准。

**第二百一十八条** 权利人、利害关系人可以申请查询、复制不动产登记资料，登记机构应当提供。

**第二百一十九条** 利害关系人可以申请查询不动产登记资料。申请查询的，登记机构应当提供。

**第二百二十条** 权利人、利害关系人认为不动产登记簿记载的事项错误的，可以申请更正登记。不动产登记簿记载的权利人书面同意或者有证据证明登记确有错误的，登记机构应当予以更正。
不动产登记簿记载的权利人不同意更正的，利害关系人可以申请异议登记。登记机构予以异议登记的，申请人在异议登记之日起十五日内不提起诉讼的，异议登记失效。异议登记不当，造成权利人损害的，权利人可以请求损害赔偿。

**第二百二十一条** 当事人签订买卖房屋的协议或者签订其他不动产物权的协议，为保障将来实现物权，可以依照约定向登记机构申请预告登记。预告登记后，未经预告登记的权利人同意，处分该不动产的，不发生物权效力。
预告登记后，债权消灭或者自能够进行不动产登记之日起九十日内未申请登记的，预告登记失效。


### 准备 LLM 和 Embedding 模型

DeepSeek 支持 OpenAI 风格的 API，您可以使用相同的 API 进行微小调整来调用 LLM。

In [9]:
from openai import OpenAI

deepseek_client = OpenAI(
    api_key=api_key,
    base_url="https://api.deepseek.com/v1",  # DeepSeek API 的基地址
)

定义一个 embedding 模型，使用 `milvus_model` 来生成文本嵌入。我们以 `DefaultEmbeddingFunction` 模型为例，这是一个预训练的轻量级嵌入模型。

In [10]:
from pymilvus import model as milvus_model

embedding_model = milvus_model.DefaultEmbeddingFunction()

  from .autonotebook import tqdm as notebook_tqdm


生成一个测试嵌入并打印其维度和前几个元素。

In [11]:
test_embedding = embedding_model.encode_queries(["This is a test"])[0]
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])

768
[-0.04836056  0.07163018 -0.01130064 -0.03789344 -0.03320646 -0.01318444
 -0.03041711 -0.02269505 -0.02317867 -0.00426023]


In [12]:
test_embedding_0 = embedding_model.encode_queries(["That is a test"])[0]
print(test_embedding_0[:10])

[-0.0275297   0.06088526  0.00388529 -0.00215193 -0.02774976 -0.01186187
 -0.04020914 -0.06023425 -0.03813157  0.01002724]


## 将数据加载到 Milvus

### 创建 Collection

In [13]:
from pymilvus import MilvusClient

milvus_client = MilvusClient(uri="./milvus_demo.db")

collection_name = "my_rag_collection"

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


关于 `MilvusClient` 的参数：

*   将 `uri` 设置为本地文件，例如 `./milvus.db`，是最方便的方法，因为它会自动利用 Milvus Lite 将所有数据存储在此文件中。
*   如果您有大规模数据，可以在 Docker 或 Kubernetes 上设置性能更高的 Milvus 服务器。在此设置中，请使用服务器 URI，例如 `http://localhost:19530`，作为您的 `uri`。
*   如果您想使用 Zilliz Cloud（Milvus 的完全托管云服务），请调整 `uri` 和 `token`，它们对应 Zilliz Cloud 中的 Public Endpoint 和 Api key。

检查 collection 是否已存在，如果存在则删除它。

In [14]:
if milvus_client.has_collection(collection_name):
    milvus_client.drop_collection(collection_name)

创建一个具有指定参数的新 collection。

如果我们不指定任何字段信息，Milvus 将自动创建一个默认的 `id` 字段作为主键，以及一个 `vector` 字段来存储向量数据。一个保留的 JSON 字段用于存储非 schema 定义的字段及其值。

`metric_type` (距离度量类型):
     作用：定义如何计算向量之间的相似程度。
     例如：`IP` (内积) - 值越大通常越相似；`L2` (欧氏距离) - 值越小越相似；`COSINE` (余弦相似度) - 通常转换为距离，值越小越相似。
     选择依据：根据你的嵌入模型的特性和期望的相似性定义来选择。

 `consistency_level` (一致性级别):
     作用：定义数据写入后，读取操作能多快看到这些新数据。
     例如：
         `Strong` (强一致性): 总是读到最新数据，可能稍慢。
         `Bounded` (有界过期): 可能读到几秒内旧数据，性能较好 (默认)。
         `Session` (会话一致性): 自己写入的自己能立刻读到。
         `Eventually` (最终一致性): 最终会读到新数据，但没时间保证，性能最好。
     选择依据：在数据实时性要求和系统性能之间做权衡。

简单来说：
 `metric_type`：怎么算相似。
 `consistency_level`：新数据多久能被读到。

In [15]:
milvus_client.create_collection(
    collection_name=collection_name,
    dimension=embedding_dim,
    metric_type="IP",  # 内积距离
    consistency_level="Strong",  # 支持的值为 (`"Strong"`, `"Session"`, `"Bounded"`, `"Eventually"`)。更多详情请参见 https://milvus.io/docs/consistency.md#Consistency-Level。
)

### 插入数据

遍历文本行，创建嵌入，然后将数据插入 Milvus。

这里有一个新字段 `text`，它是在 collection schema 中未定义的字段。它将自动添加到保留的 JSON 动态字段中，该字段在高级别上可以被视为普通字段。

In [16]:
from tqdm import tqdm

data = []

doc_embeddings = embedding_model.encode_documents(text_lines)

for i, line in enumerate(tqdm(text_lines, desc="Creating embeddings")):
    data.append({"id": i, "vector": doc_embeddings[i], "text": line})

milvus_client.insert(collection_name=collection_name, data=data)

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
Creating embeddings: 100%|██████████| 53/53 [00:00<00:00, 499546.32it/s]


{'insert_count': 53, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52], 'cost': 0}

## 构建 RAG

### 检索查询数据

我们指定一个关于 Milvus 的常见问题。

In [17]:
question = "欠债必须还钱吗?不还怎样？"

在 collection 中搜索该问题，并检索语义上最匹配的前3个结果。

In [18]:
search_res = milvus_client.search(
    collection_name=collection_name,
    data=embedding_model.encode_queries(
        [question]
    ),  # 将问题转换为嵌入向量
    limit=3,  # 返回前3个结果
    search_params={"metric_type": "IP", "params": {}},  # 内积距离
    output_fields=["text"],  # 返回 text 字段
)

让我们看一下查询的搜索结果

In [19]:
import json

retrieved_lines_with_distances = [
    (res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=4))

[
    [
        "\u4e8c\u3001\u6743\u5229\u8d28\u6743\n\n**\u7b2c\u56db\u767e\u56db\u5341\u4e5d\u6761** \u53ef\u4ee5\u51fa\u8d28\u7684\u6743\u5229\u5305\u62ec\uff1a\n\uff08\u4e00\uff09\u6c47\u7968\u3001\u672c\u7968\u3001\u652f\u7968\uff1b\n\uff08\u4e8c\uff09\u503a\u5238\u3001\u5b58\u6b3e\u5355\uff1b\n\uff08\u4e09\uff09\u4ed3\u5355\u3001\u63d0\u5355\uff1b\n\uff08\u56db\uff09\u53ef\u4ee5\u8f6c\u8ba9\u7684\u57fa\u91d1\u4efd\u989d\u3001\u80a1\u6743\uff1b\n\uff08\u4e94\uff09\u53ef\u4ee5\u8f6c\u8ba9\u7684\u6ce8\u518c\u5546\u6807\u4e13\u7528\u6743\u3001\u4e13\u5229\u6743\u3001\u8457\u4f5c\u6743\u7b49\u77e5\u8bc6\u4ea7\u6743\u4e2d\u7684\u8d22\u4ea7\u6743\uff1b\n\uff08\u516d\uff09\u5e94\u6536\u8d26\u6b3e\uff1b\n\uff08\u4e03\uff09\u6cd5\u5f8b\u3001\u884c\u653f\u6cd5\u89c4\u89c4\u5b9a\u53ef\u4ee5\u51fa\u8d28\u7684\u5176\u4ed6\u8d22\u4ea7\u6743\u5229\u3002\n\n**\u7b2c\u56db\u767e\u4e94\u5341\u6761** \u4ee5\u6c47\u7968\u3001\u672c\u7968\u3001\u652f\u7968\u3001\u503a\u5238\u3001\u5b58\u6b3e\u5355\u3

### 使用 LLM 获取 RAG 响应

将检索到的文档转换为字符串格式。

In [20]:
context = "\n".join(
    [line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)

In [21]:
context

'二、权利质权\n\n**第四百四十九条** 可以出质的权利包括：\n（一）汇票、本票、支票；\n（二）债券、存款单；\n（三）仓单、提单；\n（四）可以转让的基金份额、股权；\n（五）可以转让的注册商标专用权、专利权、著作权等知识产权中的财产权；\n（六）应收账款；\n（七）法律、行政法规规定可以出质的其他财产权利。\n\n**第四百五十条** 以汇票、本票、支票、债券、存款单、仓单、提单出质的，当事人应当订立书面合同。质权自权利凭证交付之日起设立。\n\n**第四百五十一条** 以记名股票出质的，当事人应当订立书面合同。质权自股票交付之日起设立。\n以未上市公司股权出质的，适用公司法有关股权转让的规定。\n\n**第四百五十二条** 以可以转让的基金份额、股权出质的，当事人应当订立书面合同。质权自基金份额、股权登记于证券登记结算机构或者公司章程载明的股权登记簿时设立。\n以未上市公司股权出质的，适用公司法有关股权转让的规定。\n\n**第四百五十三条** 以可以转让的注册商标专用权、专利权、著作权等知识产权中的财产权出质的，当事人应当订立书面合同。质权自权利质押登记于相关部门时设立。\n\n**第四百五十四条** 以应收账款出质的，当事人应当订立书面合同。质权自应收账款质押登记于中国人民银行征信中心时设立。\n\n**第四百五十五条** 以法律、行政法规规定可以出质的其他财产权利出质的，依照法律、行政法规的规定。\n\n**第四百五十六条** 权利质权除适用本节规定外，参照适用本章动产质权的有关规定。\n\n####\n第一章 一般规定\n\n**第四百八十一条** 为了保护合同当事人的合法权益，维护社会经济秩序，促进社会主义现代化建设，制定本编。\n\n**第四百八十二条** 本编调整因合同产生的民事关系。\n\n**第四百八十三条** 合同是民事主体之间设立、变更、终止民事法律关系的协议。\n\n**第四百八十四条** 当事人订立合同，可以采用书面形式、口头形式或者其他形式。\n法律、行政法规规定采用书面形式的，应当采用书面形式。当事人约定采用书面形式的，应当采用书面形式。\n\n**第四百八十五条** 当事人可以参照各类合同的示范文本订立合同。\n\n**第四百八十六条** 合同内容由当事人约定，一般包括下列条款：\n（一）当事人的姓名或者名称和住所；\n

In [22]:
question

'欠债必须还钱吗?不还怎样？'

为语言模型定义系统和用户提示。此提示是使用从 Milvus 检索到的文档组装而成的。

In [23]:
SYSTEM_PROMPT = """
Human: 你是一个 AI 助手。你能够从提供的上下文段落片段中找到问题的答案。
"""
USER_PROMPT = f"""
请使用以下用 <context> 标签括起来的信息片段来回答用 <question> 标签括起来的问题。最后追加原始回答的中文翻译，并用 <translated>和</translated> 标签标注。
<context>
{context}
</context>
<question>
{question}
</question>
<translated>
</translated>
"""

In [24]:
USER_PROMPT

'\n请使用以下用 <context> 标签括起来的信息片段来回答用 <question> 标签括起来的问题。最后追加原始回答的中文翻译，并用 <translated>和</translated> 标签标注。\n<context>\n二、权利质权\n\n**第四百四十九条** 可以出质的权利包括：\n（一）汇票、本票、支票；\n（二）债券、存款单；\n（三）仓单、提单；\n（四）可以转让的基金份额、股权；\n（五）可以转让的注册商标专用权、专利权、著作权等知识产权中的财产权；\n（六）应收账款；\n（七）法律、行政法规规定可以出质的其他财产权利。\n\n**第四百五十条** 以汇票、本票、支票、债券、存款单、仓单、提单出质的，当事人应当订立书面合同。质权自权利凭证交付之日起设立。\n\n**第四百五十一条** 以记名股票出质的，当事人应当订立书面合同。质权自股票交付之日起设立。\n以未上市公司股权出质的，适用公司法有关股权转让的规定。\n\n**第四百五十二条** 以可以转让的基金份额、股权出质的，当事人应当订立书面合同。质权自基金份额、股权登记于证券登记结算机构或者公司章程载明的股权登记簿时设立。\n以未上市公司股权出质的，适用公司法有关股权转让的规定。\n\n**第四百五十三条** 以可以转让的注册商标专用权、专利权、著作权等知识产权中的财产权出质的，当事人应当订立书面合同。质权自权利质押登记于相关部门时设立。\n\n**第四百五十四条** 以应收账款出质的，当事人应当订立书面合同。质权自应收账款质押登记于中国人民银行征信中心时设立。\n\n**第四百五十五条** 以法律、行政法规规定可以出质的其他财产权利出质的，依照法律、行政法规的规定。\n\n**第四百五十六条** 权利质权除适用本节规定外，参照适用本章动产质权的有关规定。\n\n####\n第一章 一般规定\n\n**第四百八十一条** 为了保护合同当事人的合法权益，维护社会经济秩序，促进社会主义现代化建设，制定本编。\n\n**第四百八十二条** 本编调整因合同产生的民事关系。\n\n**第四百八十三条** 合同是民事主体之间设立、变更、终止民事法律关系的协议。\n\n**第四百八十四条** 当事人订立合同，可以采用书面形式、口头形式或者其他形式。\n法律、行政法规规定采用书面形式的，应当采用书面形式。

使用 DeepSeek 提供的 `deepseek-chat` 模型根据提示生成响应。

In [25]:
response = deepseek_client.chat.completions.create(
    model="deepseek-chat",
    messages=[
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": USER_PROMPT},
    ],
)
print(response.choices[0].message.content)

根据《中华人民共和国民法典》第五百五十条至第五百五十六条规定：

1. 欠债必须还钱是基本原则，债权人有权要求债务人履行债务。

2. 如果债务人不还钱：
- 债权人可以将债权转让给第三人（除非存在不得转让的情形）
- 债权人可以依法向法院起诉要求偿还
- 债权人有权请求损害赔偿
- 债务人转让债务必须经债权人同意
- 新债务人需承担原债务及相关从债务

3. 特殊情形下债务可以免除：
- 债权人自愿免除
- 经协商一致变更合同内容
- 债权已过诉讼时效（但债务人自愿履行的除外）

<translated>
根据《中华人民共和国民法典》第五百五十条至第五百五十六条规定：

1. 欠债还钱是基本原则，债权人有权要求债务人履行债务。

2. 如果债务人不还款：
- 债权人可依法转让债权（除法律规定不可转让情形外）
- 债权人可向法院提起诉讼追讨
- 债权人有权主张损害赔偿
- 债务人转移债务需获债权人同意
- 债务受让人需承担原债务及附属债务

3. 特殊情况下可免除债务：
- 债权人主动免除
- 双方协商变更合同条款
- 超过诉讼时效（但债务人自愿偿还的除外）
</translated>
