# 使用 Milvus 和 DeepSeek 构建 RAG

DeepSeek 帮助开发者使用高性能语言模型构建和扩展 AI 应用。它提供高效的推理、灵活的 API 以及先进的专家混合 (MoE) 架构，用于强大的推理和检索任务。

在本教程中，我们将展示如何使用 Milvus 和 DeepSeek 构建一个检索增强生成 (RAG) 管道。

## 准备工作

### 依赖与环境

In [1]:
!pip install "pymilvus[model]==2.5.10" openai==1.82.0 requests==2.32.3 tqdm==4.67.1 torch==2.7.0

Looking in indexes: http://mirrors.aliyun.com/pypi/simple/



[notice] A new release of pip available: 22.2.2 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


---

In [2]:
import os

# 从环境变量获取 DeepSeek API Key
api_key = os.getenv("DEEPSEEK_API_KEY")

### 准备数据

我们使用 Milvus 文档 2.4.x 中的 FAQ 页面作为我们 RAG 中的私有知识库，这是一个简单 RAG 管道的良好数据源。

下载 zip 文件并将文档解压到 `milvus_docs` 文件夹。

**建议在命令行执行下面命令**

In [3]:
#!wget https://github.com/milvus-io/milvus-docs/releases/download/v2.4.6-preview/milvus_docs_2.4.x_en.zip
#!unzip -q milvus_docs_2.4.x_en.zip -d milvus_docs

我们从 `milvus_docs/en/faq` 文件夹加载所有 markdown 文件。对于每个文档，我们简单地使用 "# " 来分割文件中的内容，这样可以大致分离出 markdown 文件中每个主要部分的内容。

In [10]:
from glob import glob
import re

def split_civil_code(file_path):
    chunks = []
    current_headings = [""] * 3  # 存储三级标题路径
    current_article = []
    
    with open(file_path, "r", encoding='UTF-8') as file:
        for line in file:
            line = line.strip()
            if not line:
                continue
                
            # 检测标题层级
            if line.startswith('## '):
                current_headings[0] = line[3:]
                current_headings[1] = ""
                current_headings[2] = ""
            elif line.startswith('### '):
                current_headings[1] = line[4:]
                current_headings[2] = ""
            elif line.startswith('#### '):
                current_headings[2] = line[5:]
                
            # 检测法律条文（**条号**格式）
            elif re.match(r"^\*\*第[零一二三四五六七八九十百千]+条\*\*", line):
                if current_article:  # 保存上一条文
                    save_chunk(chunks, current_headings, current_article)
                    current_article = []
                current_article.append(line)
                
            # 非条文内容但属于当前条文
            elif current_article:
                current_article.append(line)
    
    # 保存最后一个条文
    if current_article:
        save_chunk(chunks, current_headings, current_article)
        
    return chunks

def save_chunk(chunks, headings, article_lines):
    # 拼接层级标题（过滤空标题）
    context = " > ".join([h for h in headings if h])
    # 拼接条文内容
    article = "\n".join(article_lines)
    
    # 组合完整段落
    if context:
        chunk = f"{context}\n{article}"
    else:
        chunk = article
        
    chunks.append(chunk)

# 处理文件
all_chunks = []
for file_path in glob("mfd.md", recursive=True):
    all_chunks.extend(split_civil_code(file_path))

# 示例输出
print(f"生成{len(all_chunks)}个段落")
for i, chunk in enumerate(all_chunks[:3], 1):
    print(f"\n段落{i}:\n{'-'*30}\n{chunk}\n")

生成387个段落

段落1:
------------------------------
中华人民共和国民法典 > （二）物权编 > 第一章 一般规定
**第二百零四条** 为了明确物的归属，充分发挥物的效用，保护权利人的合法权益，维护社会经济秩序，制定本编。


段落2:
------------------------------
中华人民共和国民法典 > （二）物权编 > 第一章 一般规定
**第二百零五条** 本编调整因物的归属和利用产生的民事关系。


段落3:
------------------------------
中华人民共和国民法典 > （二）物权编 > 第一章 一般规定
**第二百零六条** 国家坚持和完善社会主义公有制为主体、多种所有制经济共同发展的基本经济制度。
国家巩固和发展公有制经济，鼓励、支持和引导非公有制经济的发展。
国家实行社会主义市场经济，保障一切市场主体的平等法律地位和发展权利。



In [11]:
len(all_chunks)

387

In [12]:
print(all_chunks)

['中华人民共和国民法典 > （二）物权编 > 第一章 一般规定\n**第二百零四条** 为了明确物的归属，充分发挥物的效用，保护权利人的合法权益，维护社会经济秩序，制定本编。', '中华人民共和国民法典 > （二）物权编 > 第一章 一般规定\n**第二百零五条** 本编调整因物的归属和利用产生的民事关系。', '中华人民共和国民法典 > （二）物权编 > 第一章 一般规定\n**第二百零六条** 国家坚持和完善社会主义公有制为主体、多种所有制经济共同发展的基本经济制度。\n国家巩固和发展公有制经济，鼓励、支持和引导非公有制经济的发展。\n国家实行社会主义市场经济，保障一切市场主体的平等法律地位和发展权利。', '中华人民共和国民法典 > （二）物权编 > 第一章 一般规定\n**第二百零七条** 国家、集体、私人的物权和其他权利人的物权受法律平等保护，任何组织或者个人不得侵犯。', '中华人民共和国民法典 > （二）物权编 > 第一章 一般规定\n**第二百零八条** 不动产权利的设立、变更、转让和消灭，应当依照法律规定登记。动产物权的设立和转让，应当依照法律规定交付。', '中华人民共和国民法典 > （二）物权编 > 第一章 一般规定\n**第二百零九条** 不动产物权的设立、变更、转让和消灭，经依法登记，发生效力；未经登记，不发生效力，但是法律另有规定的除外。\n依法属于国家所有的自然资源，所有权可以不登记。', '中华人民共和国民法典 > （二）物权编 > 第一章 一般规定\n**第二百一十条** 不动产登记，由不动产所在地的登记机构办理。\n国家对不动产实行统一登记制度。统一登记的范围、登记机构和登记办法，由法律、行政法规规定。', '中华人民共和国民法典 > （二）物权编 > 第一章 一般规定\n**第二百一十一条** 当事人申请登记，应当根据不同登记事项提供材料。\n申请登记材料以及登记事项相关信息，可以公开查询。', '中华人民共和国民法典 > （二）物权编 > 第一章 一般规定\n**第二百一十二条** 登记机构应当履行下列职责：\n（一）审查申请人提供的材料；\n（二）询问申请人；\n（三）如实、及时登记；\n（四）法律、行政法规规定的其他职责。\n申请登记的不动产存在尚未解决的权属争议的，登记机构应当不予登记，并书面告知申请人。', '中华人

### 准备 LLM 和 Embedding 模型

DeepSeek 支持 OpenAI 风格的 API，您可以使用相同的 API 进行微小调整来调用 LLM。

In [13]:
from openai import OpenAI

deepseek_client = OpenAI(
    api_key=api_key,
    base_url="https://api.deepseek.com/v1",  # DeepSeek API 的基地址
)

定义一个 embedding 模型，使用 `milvus_model` 来生成文本嵌入。我们以 `DefaultEmbeddingFunction` 模型为例，这是一个预训练的轻量级嵌入模型。

In [14]:
from pymilvus import model as milvus_model

embedding_model = milvus_model.DefaultEmbeddingFunction()

  from .autonotebook import tqdm as notebook_tqdm


生成一个测试嵌入并打印其维度和前几个元素。

In [15]:
test_embedding = embedding_model.encode_queries(["This is a test"])[0]
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])

768
[-0.04836059  0.07163021 -0.01130063 -0.03789341 -0.03320651 -0.01318453
 -0.03041721 -0.02269495 -0.02317858 -0.00426026]


In [16]:
test_embedding_0 = embedding_model.encode_queries(["That is a test"])[0]
print(test_embedding_0[:10])

[-0.02752976  0.0608853   0.00388525 -0.00215193 -0.02774976 -0.0118618
 -0.04020916 -0.06023417 -0.03813156  0.0100272 ]


## 将数据加载到 Milvus

### 创建 Collection

In [17]:
from pymilvus import connections, utility
import pymilvus

def test_milvus_connection_fixed():
    """修复版本兼容性问题的连接测试"""
    
    print(f"📦 PyMilvus版本: {pymilvus.__version__}")
    print("🔍 开始测试Milvus连接...")
    
    try:
        # 先断开可能存在的连接
        try:
            connections.disconnect("default")
            print("🔄 断开旧连接")
        except:
            pass
        
        # 尝试连接
        print("🔗 正在连接Milvus服务器...")
        connections.connect(
            alias="default",
            host="localhost",
            port="19530",
            timeout=10
        )
        
        print("✅ 连接命令执行成功")
        
        # 测试连接是否真正有效 - 尝试执行一个简单操作
        try:
            collections = utility.list_collections()
            print(f"✅ 连接验证成功！当前集合数量: {len(collections)}")
            if collections:
                print(f"📋 现有集合: {collections}")
            return True
            
        except Exception as e:
            print(f"❌ 连接验证失败: {e}")
            return False
            
    except Exception as e:
        print(f"❌ 连接失败: {e}")
        return False

# 测试连接
success = test_milvus_connection_fixed()

if success:
    print("\n🎉 Milvus连接成功！现在可以创建和使用集合了")
    
    # 创建集合的示例代码
    def create_rag_collection():
        """创建RAG集合"""
        try:
            from pymilvus import Collection, FieldSchema, CollectionSchema, DataType
            
            collection_name = "my_rag_collection"
            
            # 检查集合是否已存在
            existing_collections = utility.list_collections()
            if collection_name in existing_collections:
                print(f"✅ 集合 '{collection_name}' 已存在")
                collection = Collection(collection_name)
                print(f"📊 集合信息: {collection.schema}")
                return collection
            
            # 创建新集合
            print(f"🔨 正在创建集合 '{collection_name}'...")
            
            # 定义字段
            fields = [
                FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False),
                FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=768),
                FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535)
            ]
            
            # 创建集合schema
            schema = CollectionSchema(fields, "RAG collection for document search")
            
            # 创建集合
            collection = Collection(collection_name, schema)
            
            print(f"✅ 集合 '{collection_name}' 创建成功")
            print(f"📊 集合schema: {collection.schema}")
            
            return collection
            
        except Exception as e:
            print(f"❌ 创建集合失败: {e}")
            return None
    
    # 创建集合
    collection = create_rag_collection()
    
    if collection:
        print(f"\n🎯 集合 '{collection.name}' 准备就绪，可以开始插入数据！")
        
        # 简单的数据插入示例
        print("\n💡 数据插入示例：")
        print("""
# 示例数据格式
data = [
    {"id": 1, "vector": [0.1] * 768, "text": "文档内容1"},
    {"id": 2, "vector": [0.2] * 768, "text": "文档内容2"}
]

# 插入数据
collection.insert(data)
collection.flush()  # 确保数据持久化

# 创建索引（用于搜索）
index_params = {
    "metric_type": "IP",
    "index_type": "IVF_FLAT",
    "params": {"nlist": 128}
}
collection.create_index("vector", index_params)

# 加载集合到内存
collection.load()
        """)

else:
    print("\n❌ 连接失败，请检查：")
    print("1. Docker容器是否正常运行: docker-compose ps")
    print("2. Milvus日志: docker-compose logs milvus")
    print("3. 端口是否正确监听: netstat -ano | findstr :19530")

📦 PyMilvus版本: 2.5.10
🔍 开始测试Milvus连接...
🔄 断开旧连接
🔗 正在连接Milvus服务器...
✅ 连接命令执行成功
✅ 连接验证成功！当前集合数量: 1
📋 现有集合: ['my_rag_collection']

🎉 Milvus连接成功！现在可以创建和使用集合了
✅ 集合 'my_rag_collection' 已存在
📊 集合信息: {'auto_id': False, 'description': '', 'fields': [{'name': 'id', 'description': '', 'type': <DataType.INT64: 5>, 'is_primary': True, 'auto_id': False}, {'name': 'vector', 'description': '', 'type': <DataType.FLOAT_VECTOR: 101>, 'params': {'dim': 768}}], 'enable_dynamic_field': True}

🎯 集合 'my_rag_collection' 准备就绪，可以开始插入数据！

💡 数据插入示例：

# 示例数据格式
data = [
    {"id": 1, "vector": [0.1] * 768, "text": "文档内容1"},
    {"id": 2, "vector": [0.2] * 768, "text": "文档内容2"}
]

# 插入数据
collection.insert(data)
collection.flush()  # 确保数据持久化

# 创建索引（用于搜索）
index_params = {
    "metric_type": "IP",
    "index_type": "IVF_FLAT",
    "params": {"nlist": 128}
}
collection.create_index("vector", index_params)

# 加载集合到内存
collection.load()
        


In [18]:
# milvus_lite不支持windows，改用docker部署Milvus
# from pymilvus import MilvusClient

# milvus_client = MilvusClient(uri="./milvus_demo.db")

# collection_name = "my_rag_collection"

from pymilvus import MilvusClient

# 修改为连接你的 Docker Milvus 服务
milvus_client = MilvusClient(uri="http://localhost:19530")

collection_name = "my_rag_collection"

关于 `MilvusClient` 的参数：

*   将 `uri` 设置为本地文件，例如 `./milvus.db`，是最方便的方法，因为它会自动利用 Milvus Lite 将所有数据存储在此文件中。
*   如果您有大规模数据，可以在 Docker 或 Kubernetes 上设置性能更高的 Milvus 服务器。在此设置中，请使用服务器 URI，例如 `http://localhost:19530`，作为您的 `uri`。
*   如果您想使用 Zilliz Cloud（Milvus 的完全托管云服务），请调整 `uri` 和 `token`，它们对应 Zilliz Cloud 中的 Public Endpoint 和 Api key。

检查 collection 是否已存在，如果存在则删除它。

In [19]:
if milvus_client.has_collection(collection_name):
    milvus_client.drop_collection(collection_name)

创建一个具有指定参数的新 collection。

如果我们不指定任何字段信息，Milvus 将自动创建一个默认的 `id` 字段作为主键，以及一个 `vector` 字段来存储向量数据。一个保留的 JSON 字段用于存储非 schema 定义的字段及其值。

`metric_type` (距离度量类型):
     作用：定义如何计算向量之间的相似程度。
     例如：`IP` (内积) - 值越大通常越相似；`L2` (欧氏距离) - 值越小越相似；`COSINE` (余弦相似度) - 通常转换为距离，值越小越相似。
     选择依据：根据你的嵌入模型的特性和期望的相似性定义来选择。

 `consistency_level` (一致性级别):
     作用：定义数据写入后，读取操作能多快看到这些新数据。
     例如：
         `Strong` (强一致性): 总是读到最新数据，可能稍慢。
         `Bounded` (有界过期): 可能读到几秒内旧数据，性能较好 (默认)。
         `Session` (会话一致性): 自己写入的自己能立刻读到。
         `Eventually` (最终一致性): 最终会读到新数据，但没时间保证，性能最好。
     选择依据：在数据实时性要求和系统性能之间做权衡。

简单来说：
 `metric_type`：怎么算相似。
 `consistency_level`：新数据多久能被读到。

In [20]:
milvus_client.create_collection(
    collection_name=collection_name,
    dimension=embedding_dim,
    metric_type="IP",  # 内积距离
    consistency_level="Strong",  # 支持的值为 (`"Strong"`, `"Session"`, `"Bounded"`, `"Eventually"`)。更多详情请参见 https://milvus.io/docs/consistency.md#Consistency-Level。
)

### 插入数据

遍历文本行，创建嵌入，然后将数据插入 Milvus。

这里有一个新字段 `text`，它是在 collection schema 中未定义的字段。它将自动添加到保留的 JSON 动态字段中，该字段在高级别上可以被视为普通字段。

In [21]:
from tqdm import tqdm

data = []

doc_embeddings = embedding_model.encode_documents(all_chunks)

for i, line in enumerate(tqdm(all_chunks, desc="Creating embeddings")):
    data.append({"id": i, "vector": doc_embeddings[i], "text": line})

milvus_client.insert(collection_name=collection_name, data=data)

Creating embeddings: 100%|███████████████████████████████████████████████████████| 387/387 [00:00<00:00, 385191.18it/s]


{'insert_count': 387, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 

## 构建 RAG

### 检索查询数据

我们指定一个关于 Milvus 的常见问题。

In [22]:
question = "跟权利人、利害关系人有关的法条有哪些?"

在 collection 中搜索该问题，并检索语义上最匹配的前3个结果。

In [23]:
search_res = milvus_client.search(
    collection_name=collection_name,
    data=embedding_model.encode_queries(
        [question]
    ),  # 将问题转换为嵌入向量
    limit=3,  # 返回前3个结果
    search_params={"metric_type": "IP", "params": {}},  # 内积距离
    output_fields=["text"],  # 返回 text 字段
)

让我们看一下查询的搜索结果

In [24]:
import json

retrieved_lines_with_distances = [
    (res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=4))

[
    [
        "\u4e2d\u534e\u4eba\u6c11\u5171\u548c\u56fd\u6c11\u6cd5\u5178 > \uff08\u4e8c\uff09\u7269\u6743\u7f16 > \u7b2c\u4e8c\u7ae0 \u6240\u6709\u6743\n**\u7b2c\u4e09\u767e\u4e00\u5341\u4e8c\u6761** \u5171\u540c\u5171\u6709\u4eba\u5bf9\u5171\u6709\u7684\u4e0d\u52a8\u4ea7\u6216\u8005\u52a8\u4ea7\u5171\u540c\u4eab\u6709\u6240\u6709\u6743\u3002\n\u5171\u540c\u5171\u6709\u4eba\u5bf9\u5171\u6709\u8d22\u4ea7\u4eab\u6709\u5e73\u7b49\u7684\u5360\u6709\u3001\u4f7f\u7528\u3001\u6536\u76ca\u548c\u5904\u5206\u7684\u6743\u5229\u3002",
        0.7064037322998047
    ],
    [
        "\u4e2d\u534e\u4eba\u6c11\u5171\u548c\u56fd\u6c11\u6cd5\u5178 > \uff08\u4e8c\uff09\u7269\u6743\u7f16 > \u7b2c\u4e09\u7ae0 \u7528\u76ca\u7269\u6743\n**\u7b2c\u4e09\u767e\u516d\u5341\u6761** \u571f\u5730\u627f\u5305\u7ecf\u8425\u6743\u4eba\u4f9d\u6cd5\u5bf9\u627f\u5305\u5730\u4eab\u6709\u5360\u6709\u3001\u4f7f\u7528\u3001\u6536\u76ca\u7684\u6743\u5229\u3002\n\u571f\u5730\u627f\u5305\u7ecf\u8425\u6743\u4eba\u53ef\u4ee

### 使用 LLM 获取 RAG 响应

将检索到的文档转换为字符串格式。

In [25]:
context = "\n".join(
    [line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)

In [26]:
context

'中华人民共和国民法典 > （二）物权编 > 第二章 所有权\n**第三百一十二条** 共同共有人对共有的不动产或者动产共同享有所有权。\n共同共有人对共有财产享有平等的占有、使用、收益和处分的权利。\n中华人民共和国民法典 > （二）物权编 > 第三章 用益物权\n**第三百六十条** 土地承包经营权人依法对承包地享有占有、使用、收益的权利。\n土地承包经营权人可以采取出租、入股或者其他方式流转土地经营权。\n中华人民共和国民法典 > （二）物权编 > 第二章 所有权\n**第二百八十四条** 业主应当遵守法律、法规以及管理规约。\n管理规约对全体业主具有约束力。'

In [27]:
question

'跟权利人、利害关系人有关的法条有哪些?'

为语言模型定义系统和用户提示。此提示是使用从 Milvus 检索到的文档组装而成的。

In [28]:
SYSTEM_PROMPT = """
Human: 你是一个 AI 助手。你能够从提供的上下文段落片段中找到问题的答案。
"""
USER_PROMPT = f"""
请使用以下用 <context> 标签括起来的信息片段来回答用 <question> 标签括起来的问题。最后追加原始回答的中文翻译，并用 <translated>和</translated> 标签标注。
<context>
{context}
</context>
<question>
{question}
</question>
<translated>
</translated>
"""

In [29]:
USER_PROMPT

'\n请使用以下用 <context> 标签括起来的信息片段来回答用 <question> 标签括起来的问题。最后追加原始回答的中文翻译，并用 <translated>和</translated> 标签标注。\n<context>\n中华人民共和国民法典 > （二）物权编 > 第二章 所有权\n**第三百一十二条** 共同共有人对共有的不动产或者动产共同享有所有权。\n共同共有人对共有财产享有平等的占有、使用、收益和处分的权利。\n中华人民共和国民法典 > （二）物权编 > 第三章 用益物权\n**第三百六十条** 土地承包经营权人依法对承包地享有占有、使用、收益的权利。\n土地承包经营权人可以采取出租、入股或者其他方式流转土地经营权。\n中华人民共和国民法典 > （二）物权编 > 第二章 所有权\n**第二百八十四条** 业主应当遵守法律、法规以及管理规约。\n管理规约对全体业主具有约束力。\n</context>\n<question>\n跟权利人、利害关系人有关的法条有哪些?\n</question>\n<translated>\n</translated>\n'

使用 DeepSeek 提供的 `deepseek-chat` 模型根据提示生成响应。

In [30]:
response = deepseek_client.chat.completions.create(
    model="deepseek-chat",
    messages=[
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": USER_PROMPT},
    ],
)
print(response.choices[0].message.content)

根据提供的上下文，与权利人、利害关系人相关的法条包括：

1. **第三百一十二条** - 规定了共同共有人对共有财产的权利，包括占有、使用、收益和处分的平等权利。  
2. **第三百六十条** - 规定了土地承包经营权人对承包地的权利，包括占有、使用、收益以及流转土地经营权的权利。  
3. **第二百八十四条** - 规定了业主的法律义务，强调管理规约对全体业主的约束力。  

这些法条分别涉及共有财产的权利人（共同共有人）、土地承包经营权人以及业主的权利和义务。

<translated>
根据提供的上下文，与权利人、利害关系人相关的法条包括：

1. **第三百一十二条** - 规定了共同共有人对共有财产的权利，包括占有、使用、收益和处分的平等权利。  
2. **第三百六十条** - 规定了土地承包经营权人对承包地的权利，包括占有、使用、收益以及流转土地经营权的权利。  
3. **第二百八十四条** - 规定了业主的法律义务，强调管理规约对全体业主的约束力。  

这些法条分别涉及共有财产的权利人（共同共有人）、土地承包经营权人以及业主的权利和义务。
</translated>
