# 使用 Milvus 和 DeepSeek 构建 RAG

DeepSeek 帮助开发者使用高性能语言模型构建和扩展 AI 应用。它提供高效的推理、灵活的 API 以及先进的专家混合 (MoE) 架构，用于强大的推理和检索任务。

在本教程中，我们将展示如何使用 Milvus 和 DeepSeek 构建一个检索增强生成 (RAG) 管道。

## 准备工作

### 依赖与环境

In [16]:
!pip install -i https://pypi.tuna.tsinghua.edu.cn/simple "pymilvus[model]==2.5.11" openai==1.82.0 requests==2.32.3 tqdm==4.67.1 torch==2.7.0

Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple


---

In [25]:
import os

# 从环境变量获取 DeepSeek API Key
api_key = os.getenv("DEEPSEEK_API_KEY")

### 准备数据

我们使用 Milvus 文档 2.4.x 中的 FAQ 页面作为我们 RAG 中的私有知识库，这是一个简单 RAG 管道的良好数据源。

下载 zip 文件并将文档解压到 `milvus_docs` 文件夹。

**建议在命令行执行下面命令**

我们从 `milvus_docs/en/faq` 文件夹加载所有 markdown 文件。对于每个文档，我们简单地使用 "# " 来分割文件中的内容，这样可以大致分离出 markdown 文件中每个主要部分的内容。

In [11]:
from glob import glob

text_lines = []

for file_path in glob("mfd.md", recursive=True):
    with open(file_path, "r") as file:
        file_text = file.read()

    text_lines += file_text.split("# ")

In [12]:
len(text_lines)

30

### 准备 LLM 和 Embedding 模型

DeepSeek 支持 OpenAI 风格的 API，您可以使用相同的 API 进行微小调整来调用 LLM。

In [26]:
from openai import OpenAI

deepseek_client = OpenAI(
    api_key=api_key,
    base_url="https://api.deepseek.com/v1",  # DeepSeek API 的基地址
)

定义一个 embedding 模型，使用 `milvus_model` 来生成文本嵌入。我们以 `DefaultEmbeddingFunction` 模型为例，这是一个预训练的轻量级嵌入模型。

In [1]:
import os
os.environ["TRANSFORMERS_OFFLINE"] = "1"  # 强制离线

try:
    from sentence_transformers import SentenceTransformer
    
    # 加载本地模型
    model = SentenceTransformer(
        "/home/fiona/data/hf_cache/all-MiniLM-L6-v2",
        device="cpu"
    )
    
    # 测试
    embeddings = model.encode(["测试文本"])
    print(f"嵌入向量维度: {embeddings.shape}")
    
except ModuleNotFoundError:
    print("错误：sentence-transformers库未安装！")
    print("请运行: pip install sentence-transformers")
except Exception as e:
    print(f"加载失败: {str(e)}")
    print("请检查模型路径是否正确")

  from .autonotebook import tqdm as notebook_tqdm


嵌入向量维度: (1, 384)


生成一个测试嵌入并打印其维度和前几个元素。

In [4]:
# 正确使用方法
test_embedding = model.encode(["This is a test"])[0]  # 使用encode而不是encode_queries
embedding_dim = len(test_embedding)
print(f"嵌入维度: {embedding_dim}")
print(f"前10个值: {test_embedding[:10]}")

嵌入维度: 384
前10个值: [ 0.03061248  0.01383147 -0.0208438   0.01632785 -0.01023149 -0.04798422
 -0.0173133   0.03728747  0.04588731  0.03440501]


In [5]:
test_embedding_0 = model.encode(["That is a test"])[0]
print(test_embedding_0[:10])

[ 0.04992693  0.01530657 -0.01478047  0.01978593 -0.00713997 -0.03579964
  0.00823074  0.03300444  0.02352727  0.03255139]


## 将数据加载到 Milvus

### 创建 Collection

In [6]:
from pymilvus import MilvusClient

milvus_client = MilvusClient(uri="./milvus_demo.db")

collection_name = "my_rag_collection"

关于 `MilvusClient` 的参数：

*   将 `uri` 设置为本地文件，例如 `./milvus.db`，是最方便的方法，因为它会自动利用 Milvus Lite 将所有数据存储在此文件中。
*   如果您有大规模数据，可以在 Docker 或 Kubernetes 上设置性能更高的 Milvus 服务器。在此设置中，请使用服务器 URI，例如 `http://localhost:19530`，作为您的 `uri`。
*   如果您想使用 Zilliz Cloud（Milvus 的完全托管云服务），请调整 `uri` 和 `token`，它们对应 Zilliz Cloud 中的 Public Endpoint 和 Api key。

检查 collection 是否已存在，如果存在则删除它。

In [7]:
if milvus_client.has_collection(collection_name):
    milvus_client.drop_collection(collection_name)

创建一个具有指定参数的新 collection。

如果我们不指定任何字段信息，Milvus 将自动创建一个默认的 `id` 字段作为主键，以及一个 `vector` 字段来存储向量数据。一个保留的 JSON 字段用于存储非 schema 定义的字段及其值。

`metric_type` (距离度量类型):
     作用：定义如何计算向量之间的相似程度。
     例如：`IP` (内积) - 值越大通常越相似；`L2` (欧氏距离) - 值越小越相似；`COSINE` (余弦相似度) - 通常转换为距离，值越小越相似。
     选择依据：根据你的嵌入模型的特性和期望的相似性定义来选择。

 `consistency_level` (一致性级别):
     作用：定义数据写入后，读取操作能多快看到这些新数据。
     例如：
         `Strong` (强一致性): 总是读到最新数据，可能稍慢。
         `Bounded` (有界过期): 可能读到几秒内旧数据，性能较好 (默认)。
         `Session` (会话一致性): 自己写入的自己能立刻读到。
         `Eventually` (最终一致性): 最终会读到新数据，但没时间保证，性能最好。
     选择依据：在数据实时性要求和系统性能之间做权衡。

简单来说：
 `metric_type`：怎么算相似。
 `consistency_level`：新数据多久能被读到。

In [8]:
milvus_client.create_collection(
    collection_name=collection_name,
    dimension=embedding_dim,
    metric_type="IP",  # 内积距离
    consistency_level="Strong",  # 支持的值为 (`"Strong"`, `"Session"`, `"Bounded"`, `"Eventually"`)。更多详情请参见 https://milvus.io/docs/consistency.md#Consistency-Level。
)

### 插入数据

遍历文本行，创建嵌入，然后将数据插入 Milvus。

这里有一个新字段 `text`，它是在 collection schema 中未定义的字段。它将自动添加到保留的 JSON 动态字段中，该字段在高级别上可以被视为普通字段。

In [13]:
from tqdm import tqdm

data = []

doc_embeddings = model.encode(text_lines)

for i, line in enumerate(tqdm(text_lines, desc="Creating embeddings")):
    data.append({"id": i, "vector": doc_embeddings[i], "text": line})

milvus_client.insert(collection_name=collection_name, data=data)

Creating embeddings: 100%|████████████████████████████████████████████████████████████████████████████████| 30/30 [00:00<00:00, 255750.24it/s]


{'insert_count': 30, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29], 'cost': 0}

## 构建 RAG

### 检索查询数据

我们指定一个关于 Milvus 的常见问题。

In [14]:
question = "科学上网犯法吗"

在 collection 中搜索该问题，并检索语义上最匹配的前3个结果。

In [17]:
search_res = milvus_client.search(
    collection_name=collection_name,
    data=model.encode(
        [question]
    ),  # 将问题转换为嵌入向量
    limit=3,  # 返回前3个结果
    search_params={"metric_type": "IP", "params": {}},  # 内积距离
    output_fields=["text"],  # 返回 text 字段
)

让我们看一下查询的搜索结果

In [18]:
import json

retrieved_lines_with_distances = [
    (res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=4))

[
    [
        "\u4e2d\u534e\u4eba\u6c11\u5171\u548c\u56fd\u6c11\u6cd5\u5178\n\n##",
        0.4711516797542572
    ],
    [
        "\u7b2c\u56db\u7ae0 \u62c5\u4fdd\u7269\u6743\n\n####",
        0.42818528413772583
    ],
    [
        "\u7b2c\u4e09\u7ae0 \u7528\u76ca\u7269\u6743\n\n####",
        0.4100675582885742
    ]
]


### 使用 LLM 获取 RAG 响应

将检索到的文档转换为字符串格式。

In [19]:
context = "\n".join(
    [line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)

In [20]:
context

'中华人民共和国民法典\n\n##\n第四章 担保物权\n\n####\n第三章 用益物权\n\n####'

In [21]:
question

'科学上网犯法吗'

为语言模型定义系统和用户提示。此提示是使用从 Milvus 检索到的文档组装而成的。

In [22]:
SYSTEM_PROMPT = """
Human: 你是一个 AI 助手。你能够从提供的上下文段落片段中找到问题的答案。
"""
USER_PROMPT = f"""
请使用以下用 <context> 标签括起来的信息片段来回答用 <question> 标签括起来的问题。最后追加原始回答的中文翻译，并用 <translated>和</translated> 标签标注。
<context>
{context}
</context>
<question>
{question}
</question>
<translated>
</translated>
"""

In [23]:
USER_PROMPT

'\n请使用以下用 <context> 标签括起来的信息片段来回答用 <question> 标签括起来的问题。最后追加原始回答的中文翻译，并用 <translated>和</translated> 标签标注。\n<context>\n中华人民共和国民法典\n\n##\n第四章 担保物权\n\n####\n第三章 用益物权\n\n####\n</context>\n<question>\n科学上网犯法吗\n</question>\n<translated>\n</translated>\n'

使用 DeepSeek 提供的 `deepseek-chat` 模型根据提示生成响应。

In [27]:
response = deepseek_client.chat.completions.create(
    model="deepseek-chat",
    messages=[
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": USER_PROMPT},
    ],
)
print(response.choices[0].message.content)

根据提供的上下文信息，中华人民共和国民法典中关于"担保物权"和"用益物权"的章节内容并未涉及"科学上网"相关法律规定，因此无法从给定材料中得出明确结论。

关于"科学上网"是否违法的问题，需要参考其他法律法规（如《中华人民共和国网络安全法》）。根据中国现行法律，未经批准擅自建立或使用VPN进行国际联网可能构成违法行为。

<translated>
According to the provided context, the chapters on "security rights" and "usufructuary rights" in the Civil Code of the People's Republic of China do not mention regulations related to "scientific internet access", so no clear conclusion can be drawn from the given materials.

Regarding whether "scientific internet access" is illegal, other laws and regulations (such as the Cybersecurity Law of the People's Republic of China) need to be consulted. According to current Chinese laws, unauthorized establishment or use of VPNs for international networking may constitute illegal activity.
</translated>
