# 使用 Milvus 和 DeepSeek 构建 RAG

DeepSeek 帮助开发者使用高性能语言模型构建和扩展 AI 应用。它提供高效的推理、灵活的 API 以及先进的专家混合 (MoE) 架构，用于强大的推理和检索任务。

在本教程中，我们将展示如何使用 Milvus 和 DeepSeek 构建一个检索增强生成 (RAG) 管道。

## 准备工作

### 依赖与环境

---

In [1]:
import os

# 从环境变量获取 DeepSeek API Key
api_key = os.getenv("DEEPSEEK_API_KEY")

In [2]:
api_key

'sk-6a8d2f0e50a14e31bc151d5dca1b6f2b'

### 准备数据

我们使用 Milvus 文档 2.4.x 中的 FAQ 页面作为我们 RAG 中的私有知识库，这是一个简单 RAG 管道的良好数据源。

下载 zip 文件并将文档解压到 `milvus_docs` 文件夹。

**建议在命令行执行下面命令**

In [3]:
import re
import os
import time
from pprint import pprint
from pymilvus import MilvusClient, model as milvus_model
# --- 配置 ---
# Milvus Lite (本地文件数据库) 配置
MILVUS_DB_PATH = "./milvus_mfd_demo.db"
COLLECTION_NAME = "civil_code_rag_lite"

# 使用 Milvus 内置模型的维度
DIMENSION = 768

  from .autonotebook import tqdm as notebook_tqdm


In [4]:
def parse_civil_code_md(file_path: str) -> list[dict]:
    """
    解析民法典Markdown文件，将其切割成以“法条”为单位的知识块。
    (此函数与之前版本基本相同，但优化了正则表达式)
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    chunks = []
    current_part = ""
    current_chapter = ""

    # 优化正则表达式以匹配 '##' 和 '###' 两种级别的标题
    part_regex = re.compile(r'^\s*#{2,3}\s*（.*?）(.*?)\s*$')
    chapter_regex = re.compile(r'^\s*####\s*(.*?)\s*$')
    article_regex = re.compile(r'^\s*\*\*(第.*?条)\*\*\s*(.*)\s*$')

    for line in lines:
        line = line.strip()
        if not line:
            continue

        part_match = part_regex.match(line)
        if part_match:
            current_part = part_match.group(1).strip()
            current_chapter = ""
            continue

        chapter_match = chapter_regex.match(line)
        if chapter_match:
            current_chapter = chapter_match.group(1).strip()
            continue

        article_match = article_regex.match(line)
        if article_match:
            article_number = article_match.group(1).strip()
            article_content = article_match.group(2).strip()

            content_for_embedding = f"{current_part} > {current_chapter} > {article_number} {article_content}"
            
            metadata = {
                'source': file_path,
                'part': current_part,
                'chapter': current_chapter,
                'article_number': article_number,
                'text': article_content
            }

            chunks.append({
                'content': content_for_embedding,
                'metadata': metadata
            })

    return chunks

In [5]:
def setup_and_insert(client: MilvusClient, collection_name: str, chunks: list[dict], embedder):
    """设置Collection并插入数据。"""
    if client.has_collection(collection_name=collection_name):
        print(f"Collection '{collection_name}' 已存在，将删除重建。")
        client.drop_collection(collection_name=collection_name)

    print(f"创建 Collection: '{collection_name}'")
    client.create_collection(
        collection_name=collection_name,
        dimension=DIMENSION,
        metric_type="IP",  # 使用内积作为相似度度量
        consistency_level="Strong",
        primary_field_name="pk",
        vector_field_name="embedding"
    )

    print("准备并插入数据...")
    contents_to_embed = [chunk['content'] for chunk in chunks]
    
    # Milvus 的默认模型可以批量处理文档
    doc_embeddings = embedder.encode_documents(contents_to_embed)

    data_to_insert = []
    for i, chunk in enumerate(chunks):
        data_to_insert.append({
            "pk": i,  # 提供主键
            "embedding": doc_embeddings[i],
            "text": chunk['metadata']['text'],
            "part": chunk['metadata']['part'],
            "chapter": chunk['metadata']['chapter'],
            "article_number": chunk['metadata']['article_number']
        })

    client.insert(collection_name=collection_name, data=data_to_insert)
    print(f"成功插入 {len(data_to_insert)} 条数据。")

def search(client: MilvusClient, collection_name: str, questions: list[str], embedder, top_k: int = 3):
    """在Milvus中搜索并打印结果。"""
    for question in questions:
        print("\n" + "="*50)
        print(f"查询: {question}")
        print("="*50)

        # 编码查询问题
        query_embeddings = embedder.encode_queries([question])
        
        results = client.search(
            collection_name=collection_name,
            data=query_embeddings,
            limit=top_k,
            output_fields=["text", "part", "chapter", "article_number"] # 指定需要返回的字段
        )
        
        if not results[0]:
            print("未找到相关结果。")
            continue

        print("查询结果:")
        for rank, hit in enumerate(results[0], 1):
            print(f"  [结果 {rank}] 相似度: {hit['distance']:.4f}")
            print(f"  - 所属编: {hit['entity']['part']}")
            print(f"  - 所属章: {hit['entity']['chapter']}")
            print(f"  - 法条号: {hit['entity']['article_number']}")
            print(f"  - 内  容: {hit['entity']['text']}")
            print("  ---")

In [7]:
# 1. 解析Markdown文件
file_name = 'mfd.md'
knowledge_chunks = parse_civil_code_md(file_name)
print(f"总共生成了 {len(knowledge_chunks)} 个知识块。")

print("\n切割后的知识块示例:")
pprint(knowledge_chunks[:5])

总共生成了 387 个知识块。

切割后的知识块示例:
[{'content': '物权编 > 第一章 一般规定 > 第二百零四条 '
             '为了明确物的归属，充分发挥物的效用，保护权利人的合法权益，维护社会经济秩序，制定本编。',
  'metadata': {'article_number': '第二百零四条',
               'chapter': '第一章 一般规定',
               'part': '物权编',
               'source': 'mfd.md',
               'text': '为了明确物的归属，充分发挥物的效用，保护权利人的合法权益，维护社会经济秩序，制定本编。'}},
 {'content': '物权编 > 第一章 一般规定 > 第二百零五条 本编调整因物的归属和利用产生的民事关系。',
  'metadata': {'article_number': '第二百零五条',
               'chapter': '第一章 一般规定',
               'part': '物权编',
               'source': 'mfd.md',
               'text': '本编调整因物的归属和利用产生的民事关系。'}},
 {'content': '物权编 > 第一章 一般规定 > 第二百零六条 国家坚持和完善社会主义公有制为主体、多种所有制经济共同发展的基本经济制度。',
  'metadata': {'article_number': '第二百零六条',
               'chapter': '第一章 一般规定',
               'part': '物权编',
               'source': 'mfd.md',
               'text': '国家坚持和完善社会主义公有制为主体、多种所有制经济共同发展的基本经济制度。'}},
 {'content': '物权编 > 第一章 一般规定 > 第二百零七条 '
             '国家、集体、私人的物权和其他权利人的物权受法律平等保护，任何组织或者个人不得侵犯。',
  'metadat

In [8]:
# 为保证每次运行都是全新的，先删除旧的数据库文件
if os.path.exists(MILVUS_DB_PATH):
    print(f"删除旧的数据库文件: '{MILVUS_DB_PATH}'")
    os.remove(MILVUS_DB_PATH)

删除旧的数据库文件: './milvus_mfd_demo.db'


### 准备 LLM 和 Embedding 模型

In [9]:
print("--> 步骤 1: 初始化嵌入模型...")
embedding_model = milvus_model.DefaultEmbeddingFunction()
print("--> 嵌入模型初始化成功。")

# 3. 然后初始化Milvus客户端，启动后台服务
print("\n--> 步骤 2: 初始化 MilvusClient...")
milvus_client = MilvusClient(uri=MILVUS_DB_PATH)
print("--> MilvusClient 初始化成功。")

--> 步骤 1: 初始化嵌入模型...
--> 嵌入模型初始化成功。

--> 步骤 2: 初始化 MilvusClient...
--> MilvusClient 初始化成功。


In [10]:
setup_and_insert(milvus_client, COLLECTION_NAME, knowledge_chunks, embedding_model)

创建 Collection: 'civil_code_rag_lite'
准备并插入数据...
成功插入 387 条数据。


In [16]:
def search(client: MilvusClient, collection_name: str, question: str, embedder, top_k: int = 3) -> list:
    """
    在Milvus中为单个问题进行向量搜索，并返回原始结果。
    """
    # 编码查询问题
    query_embeddings = embedder.encode_queries([question])
    
    results = client.search(
        collection_name=collection_name,
        data=query_embeddings,
        limit=top_k,
        output_fields=["text", "part", "chapter", "article_number"] # 指定需要返回的字段
    )
    
    # search v2.x 返回一个包含结果列表的列表
    return results[0] if results and results[0] else []

In [22]:
def rag_get_context(client: MilvusClient, collection_name: str, question: str, embedder):
    """
    执行完整的RAG流程：搜索相关文档，并使用LLM生成答案。
    """
    print("\n" + "="*80)
    print(f"开始处理问题: \"{question}\"")
    print("="*80)

    # 1. 在Milvus中搜索相关法条
    print("\n[步骤 1/3] 正在从Milvus中检索相关法条...")
    search_results = search(client, collection_name, question, embedder)

    if not search_results:
        print("检索完成：未找到与问题相关的法条。")
        return

    # 2. 准备上下文
    print(f"检索完成：找到 {len(search_results)} 条相关法条。正在准备上下文...")
    context_parts = []
    for rank, hit in enumerate(search_results, 1):
        entity = hit['entity']
        context_parts.append(
            f"【来源 {rank}】\n"
            f"相似度: {hit['distance']:.4f} \n"
            f"法条号: {entity['article_number']} ({entity['part']} > {entity['chapter']})\n"
            f"内  容: {entity['text']}"
        )
    context_str = "\n\n---\n\n".join(context_parts)
    
    print("\n--- [检索到的上下文] ---")
    print(context_str)
    print("------------------------\n")

    # # 3. 调用LLM进行综合回答
    # print("[步骤 3/3] 正在调用大语言模型生成最终答案...")
    # final_answer = get_llm_answer(question, context_str)
    
    # print("\n--- [AI助手回答] ---")
    # print(final_answer)
    # print("--------------------\n")
    return context_str

In [24]:
# 4. 构造问题并查询
question = "登记机构不得有下列行为"

context = rag_get_context(milvus_client, COLLECTION_NAME, question, embedding_model)



开始处理问题: "登记机构不得有下列行为"

[步骤 1/3] 正在从Milvus中检索相关法条...
检索完成：找到 3 条相关法条。正在准备上下文...

--- [检索到的上下文] ---
【来源 1】
相似度: 0.7712 
法条号: 第二百一十五条 (物权编 > 第一章 一般规定)
内  容: 不动产登记簿由登记机构管理。

---

【来源 2】
相似度: 0.7712 
法条号: 第二百零六条 (物权编 > 第一章 一般规定)
内  容: 国家坚持和完善社会主义公有制为主体、多种所有制经济共同发展的基本经济制度。

---

【来源 3】
相似度: 0.7712 
法条号: 第二百零五条 (物权编 > 第一章 一般规定)
内  容: 本编调整因物的归属和利用产生的民事关系。
------------------------



In [25]:
SYSTEM_PROMPT = """
Human: 你是一个 AI 助手。你能够从提供的上下文段落片段中找到问题的答案。
"""
USER_PROMPT = f"""
请使用以下用 <context> 标签括起来的信息片段来回答用 <question> 标签括起来的问题。
<context>
{context}
</context>
<question>
{question}
</question>
"""

DeepSeek 支持 OpenAI 风格的 API，您可以使用相同的 API 进行微小调整来调用 LLM。

In [26]:
from openai import OpenAI
deepseek_client = OpenAI(
    api_key=api_key,
    base_url="https://api.deepseek.com"
    # base_url="https://api.deepseek.com/v1",  # DeepSeek API 的基地址
)

In [27]:

response = deepseek_client.chat.completions.create(
    model="deepseek-chat",
    messages=[
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": USER_PROMPT},
    ],
)
print(response.choices[0].message.content)

根据提供的上下文信息，未包含关于"登记机构不得有下列行为"的具体规定。现有内容仅涉及：
1. 不动产登记簿的管理（第二百一十五条）
2. 国家基本经济制度（第二百零六条）
3. 物权编的调整范围（第二百零五条）

建议查阅其他相关法条以获取关于登记机构禁止行为的明确规定。
