## 使用 Milvus 和 Deepseek 构建 RAG

In [3]:
!pip install "pymilvus[model]==2.5.10" openai==1.82.0 requests==2.32.3 tqdm==4.67.1 torch==2.7.0



###  1.准备数据集
使用 Milvus 文档 2.4.x 中的 FAQ 页面作为我们 RAG 中的私有知识库
```bash
wget https://github.com/milvus-io/milvus-docs/releases/download/v2.4.6-preview/milvus_docs_2.4.x_en.zip
unzip -q milvus_docs_2.4.x_en.zip -d milvus_docs
```

### 2.加载数据和分割文档
从 `milvus_docs/en/faq` 文件夹加载所有 markdown 文件。
对于每个文档，简单使用 "# " 来分割文件中的内容，这样可以大致分离出 markdown 文件中每个主要部分的内容。

In [6]:
#glob库是Python标准库中用于文件路径匹配的模块，支持通配符模式查找文件 支持递归查找
from glob import glob
text_line = []
file_regex_path = "../rag_resources/milvus_docs/en/faq/*.md"
for file_path in glob(file_regex_path,recursive=True):
    with open(file_path,"r") as f:
        file_text = f.read()
    text_line+=file_text.split("# ")

In [7]:
import os
deepseek_key = os.getenv("DEEPSEEK_API_KEY")


In [8]:
# 准备 LLM 和 Embeding 模型
from openai import OpenAI

deepseek_api_base_address="https://api.deepseek.com/v1"

deepseek_client = OpenAI(
    api_key=deepseek_key,
    base_url=deepseek_api_base_address
)


In [9]:
# 定义 embedding 模型 ，使用 milvus_model 做文本嵌入(embedding)  默认使用 DefaultEmbeddingfunction 为例， 这是一个预训练的轻量embedding 模型
from pymilvus import model as milvus_model
# 嵌入方法
embedding_model = milvus_model.DefaultEmbeddingFunction()

  from .autonotebook import tqdm as notebook_tqdm


In [63]:
test_question_list = ["This is test question"]
# 结果数组为二维数组  第一维的大小与问题个数有关  如果只有一个问题那么第一维大小只有 1  其中第一维度长度只有 1 ，第二维度为高维向量
embedding_arr = embedding_model.encode_queries(test_question_list)
embedding_vector = embedding_arr[0]
embedding_dim = len(embedding_vector)
# 向量维度(768维)
print(embedding_dim)
# 打印向量前 10 个元素
print(embedding_vector[:10])

768
[-0.06585765  0.09031744 -0.00271127 -0.0488881   0.00344155 -0.01846086
  0.00613829 -0.01427601  0.0023624   0.01040119]


### 加载数据到 milvus 

MilvusClient 参数：
- 如果 uri 设置为本地文件，调用客户端时候 会使用 milvus-lite (milvus 轻量版 ) 将 所有数据存储到文件中
- 生产环境上，可以使用 docker  或者 k8s 搭建 milvus 服务，那url = http://xxxx:19530 作为 uri
- 如果使用云服务，例如 zilliz Cloud (Milvus 托管服务) 调整`uri`和`token` 对应  `public endpoint` 和 `API key`

In [12]:
# 创建collection  collection 等同于关系型数据库表的概念
from pymilvus import MilvusClient
local_save_file = "./milvus_demo.db"
collection_name = "test_rag_collection1"
milvus_client = MilvusClient(uri=local_save_file)
if milvus_client.has_collection(collection_name):
    milvus_client.drop_collection(collection_name)



huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


### 创建一个具有指定参数的 collection
如果不指定任何字段信息，milvus 会自动创建一个 默认的`id` 字段作为主键，以及一个`vector` 字段来存储向量。一个**保留的** `json`字段用于存储非`schema`定义的字段及其值
- `metric_type` :计算向量之间的相似程度的方法
  作用：定义如何计算向量之间的相似程度。
  例如：`IP` (内积) - 值越大通常越相似；`L2` (欧氏距离) - 值越小越相似；`COSINE` (余弦相似度) - 通常转换为距离，值越小越相似。
  选择依据：根据你的嵌入模型的特性和期望的相似性定义来选择。
- `consistency_level`：数据一致性级别
  作用：定义数据写入后，读取操作能多快看到这些新数据。
  例如：
   `Strong` (强一致性): 总是读到最新数据，可能稍慢。
   `Bounded` (有界过期): 可能读到几秒内旧数据，性能较好 (默认)。
   `Session` (会话一致性): 自己写入的自己能立刻读到。
   `Eventually` (最终一致性): 最终会读到新数据，但没时间保证，性能最好。
  选择依据：在数据实时性要求和系统性能之间做权衡。


简单来说：
 `metric_type`：怎么算相似。
 `consistency_level`：新数据多久能被读到。

In [14]:
metric_type="IP"
consistency_level = "Strong"
milvus_client.create_collection(
    collection_name=collection_name,
    #向量维度 与问题个数无关，只有单个向量的维度有关 bert 模型向量维度 768
    dimension=embedding_dim,
    # 计算向量相似度方法
    metric_type=metric_type,
    consistency_level=consistency_level
)


### 插入数据
遍历文本行，创建 embedding，将数据插入到 milvus 中
新添加一个字段`text` 他是在 collection schema 中未定义的字段、它将自动添加到保存的 `json` 动态字段中,该字段在高级别上可以被视为普通字段

In [16]:
# tqdm 为一个可视化的进度条模块
from tqdm import tqdm
# 定义插入到 milvus 中的数据列表
data = []
#调用模型 encode_document 将数据encode 成 vector
# text_line 中为二维数组，对应第一维 为文本块个数，第二维 为每个文本块映射的 768 维向量
doc_embeddings =embedding_model.encode_documents(text_line)
text_line_size =  len(text_line)
print(f"文本块个数为: {text_line_size}")
print(f"二维数组 第一维 大小为 :{len(doc_embeddings)}")
# enumerate() 是 Python 中的一个内置函数，它用于将一个可遍历的数据对象（如列表、元组或字符串）组合为一个索引序列，同时列出数据和数据下标，一般用在 for 循环当中
for index,line in enumerate(tqdm(text_line,desc="create embeddings!")):
    item={"id":index,"vector":doc_embeddings[index],"text":line}
    data.append(item)
milvus_client.insert(collection_name=collection_name,data=data)


文本块个数为: 72
二维数组 第一维 大小为 :72


huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
create embeddings!: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████| 72/72 [00:00<00:00, 236298.82it/s]


{'insert_count': 72, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71], 'cost': 0}

# 构建 RAG
## 检索查询数据
指定一个关于 Milvus 的常见问题。

In [18]:
question = "How is data stored in milvus?"

在构建的 milvus 中搜索该问题，并检索语义上最匹配的前三个结果

In [20]:
# 将问题转换为嵌入向量
question_embedding_vector = embedding_model.encode_queries([question])
# 查询参数包含 搜索方式  搜索方式和文档写入 milvus 的方式要保持一致 否则会搜索不精准
search_params={"metric_type":"IP","params":{}}
# milvus 返回的字段
output_fields  = ["text"]
search_result = milvus_client.search(
    collection_name=collection_name,
    # data 指的是 query 的问题 vector
    data=question_embedding_vector,
    limit=3, # 返回前 3 个 结果,
    search_params=search_params,
    output_fields=output_fields
)


In [37]:
import json
# search_result为 返回 search 到的所有结果，结果为二维数组   第一维只有一个元素。

print(len(search_result))

retrieved_lines_with_distince = [
    (res['entity']["text"],res["distance"]) for res in search_result[0]
]
# indent：指定缩进的空格数或字符串，用于结构化显示JSON数据
print(json.dumps(retrieved_lines_with_distince,indent=4))

1
[
    [
        " Where does Milvus store data?\n\nMilvus deals with two types of data, inserted data and metadata. \n\nInserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental log. Milvus supports multiple object storage backends, including [MinIO](https://min.io/), [AWS S3](https://aws.amazon.com/s3/?nc1=h_ls), [Google Cloud Storage](https://cloud.google.com/storage?hl=en#object-storage-for-companies-of-all-sizes) (GCS), [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs), [Alibaba Cloud OSS](https://www.alibabacloud.com/product/object-storage-service), and [Tencent Cloud Object Storage](https://www.tencentcloud.com/products/cos) (COS).\n\nMetadata are generated within Milvus. Each Milvus module has its own metadata that are stored in etcd.\n\n###",
        0.6572663187980652
    ],
    [
        "How does Milvus flush data?\n\nMilvus returns success when inserted data are loaded to

milvus 查询返回格式如下： 结果为一个二维数组 第一维只有一个元素
```python
[
    [
        {
            "id": 1024,
            "distance": 0.96,
            "entity": {"query 指定的 且 在milvus中定义的字段":"value"}
        },
        {
            "id": 2056,
            "distance": 0.93,
            "entity": {"title": "深度学习实战", "author": "李四"}
        }
    ]
]
```

In [None]:
### 使用LLM获取RAG响应
将检索到的文档转换为字符串格式

In [39]:
context_str = "\n".join(
    [line[0] for line in retrieved_lines_with_distince]
)
context_str

" Where does Milvus store data?\n\nMilvus deals with two types of data, inserted data and metadata. \n\nInserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental log. Milvus supports multiple object storage backends, including [MinIO](https://min.io/), [AWS S3](https://aws.amazon.com/s3/?nc1=h_ls), [Google Cloud Storage](https://cloud.google.com/storage?hl=en#object-storage-for-companies-of-all-sizes) (GCS), [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs), [Alibaba Cloud OSS](https://www.alibabacloud.com/product/object-storage-service), and [Tencent Cloud Object Storage](https://www.tencentcloud.com/products/cos) (COS).\n\nMetadata are generated within Milvus. Each Milvus module has its own metadata that are stored in etcd.\n\n###\nHow does Milvus flush data?\n\nMilvus returns success when inserted data are loaded to the message queue. However, the data are not yet flushed to the dis

In [41]:
question

'How is data stored in milvus?'

### 构建 LLM 的系统角色+ 用户提示词(prompt),  这个提示是从 milvus 检索到的文档组装成的。

In [47]:
SYSTEM_PROMPT="""
Human : 你是一个 AI 助手、你能从提供的上下文段落片段中和你的知识储备中找到问题的答案
"""

USER_PROMPT=f"""
请使用以下用<milvus-search> </milvus-search> 标签括起来的信息片段来回答<question> </question> 标签括起来的问题，最后追加原始回答的中文翻译,并用<translate> 和</translate>进行标记
<milvus-search>{context_str} </milvus-search>
<question>{question}</question>
<translate></translate>
"""

In [49]:
USER_PROMPT

"\n请使用以下用<milvus-search> </milvus-search> 标签括起来的信息片段来回答<question> </question> 标签括起来的问题，最后追加原始回答的中文翻译,并用<translate> 和</translate>进行标记\n<milvus-search> Where does Milvus store data?\n\nMilvus deals with two types of data, inserted data and metadata. \n\nInserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental log. Milvus supports multiple object storage backends, including [MinIO](https://min.io/), [AWS S3](https://aws.amazon.com/s3/?nc1=h_ls), [Google Cloud Storage](https://cloud.google.com/storage?hl=en#object-storage-for-companies-of-all-sizes) (GCS), [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs), [Alibaba Cloud OSS](https://www.alibabacloud.com/product/object-storage-service), and [Tencent Cloud Object Storage](https://www.tencentcloud.com/products/cos) (COS).\n\nMetadata are generated within Milvus. Each Milvus module has its own metadata that are stored in etcd.\n\n###\nHow do

使用 deepseek 提供的`deepseek-chat` 模型根据 prompt 生成响应

In [56]:
deepseek_model_type = "deepseek-chat"
messages= [
    {"role":"system","content":SYSTEM_PROMPT},
    {"role":"user","content":USER_PROMPT}
]
# chat 表示聊天模型,completions 表示补全
response = deepseek_client.chat.completions.create(
    model= deepseek_model_type,
    messages=messages
)
print(response.choices[0].message.content)

Milvus stores data in two main categories: inserted data and metadata.

1. **Inserted Data** (including vector data, scalar data, and collection schemas) is stored as incremental logs in persistent storage. Supported storage backends include:
   - MinIO
   - AWS S3
   - Google Cloud Storage (GCS)
   - Azure Blob Storage
   - Alibaba Cloud OSS
   - Tencent Cloud Object Storage (COS)

2. **Metadata** (generated internally by Milvus modules) is stored in etcd.

Data flushing occurs asynchronously: after insertion into the message queue, a `flush()` call forces immediate writing to persistent storage.

<translate>
Milvus以两种主要类型存储数据：插入数据和元数据。

1. **插入数据**（包括向量数据、标量数据和集合模式）以增量日志形式存储在持久化存储中，支持的存储后端包括：
   - MinIO
   - AWS S3
   - 谷歌云存储(GCS)
   - Azure Blob存储
   - 阿里云OSS
   - 腾讯云对象存储(COS)

2. **元数据**（由Milvus模块内部生成）存储在etcd中。

数据刷盘是异步进行的：数据先写入消息队列，调用`flush()`会强制立即写入持久化存储。
</translate>
