# 向量库知识智能体 (DB Agent RAG)

---

本教程演示如何将 `files` 目录下的 txt 文件按空行分段，写入 Chroma 内存向量库，并通过 ReAct Agent 检索回答。

### 数据准备提示
1. 请在当前目录下创建一个 `files` 文件夹。
2. 在其中放入 `.txt` 文件。
3. **注意**：若是 Excel 数据，需额外写脚本将列表头转为以下格式（空行分割），问答效果更准：

```text
问题：<问题内容>
答案：<答案内容>

（此处为空行）
```

## 1. 导入依赖与加载环境

确保目录下存在 `.env` 文件，包含 `DASHSCOPE_API_KEY` 和 `DASHSCOPE_BASE_URL`。

In [1]:
import os
import re
from pathlib import Path
from typing import Iterable

from dotenv import load_dotenv
from openai import OpenAI
from langchain_openai import ChatOpenAI
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_chroma import Chroma
import chromadb
from chromadb.config import Settings
from langchain.agents import create_agent
from langchain.tools import tool

# 加载模型配置
_ = load_dotenv()

## 2. 配置大模型与客户端

In [2]:
# 配置大模型 (LangChain)
llm = ChatOpenAI(
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    base_url=os.getenv("DASHSCOPE_BASE_URL"),
    model="qwen3-coder-plus",
    temperature=0,
)

# 创建 OpenAI 客户端 (用于 Embeddings)
client = OpenAI(
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    base_url=os.getenv("DASHSCOPE_BASE_URL"),
)

## 3. 定义 DashScope Embeddings

自定义 Embeddings 类以适配 LangChain 接口。

In [3]:
class DashScopeEmbeddings(Embeddings):
    """DashScope 兼容的 Embeddings 封装。"""

    def __init__(self, model: str = "text-embedding-v4", dimensions: int = 1024):
        self.model = model
        self.dimensions = dimensions

    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        vectors: list[list[float]] = []
        for i in range(0, len(texts), 10):
            chunk = texts[i : i + 10]
            response = client.embeddings.create(
                model=self.model,
                input=chunk,
                dimensions=self.dimensions,
            )
            vectors.extend([item.embedding for item in response.data])
        return vectors

    def embed_query(self, text: str) -> list[float]:
        response = client.embeddings.create(
            model=self.model,
            input=[text],
            dimensions=self.dimensions,
        )
        return response.data[0].embedding

## 4. 文档加载逻辑

读取目录下的 txt 文件，并使用**空行**进行切分。

In [4]:
def load_txt_documents(data_dir: Path) -> list[Document]:
    """读取目录下的 txt 文件并按空行分割为 Document。"""

    def split_on_blank(text: str) -> Iterable[str]:
        for block in re.split(r"\n\s*\n", text):
            cleaned = block.strip()
            if cleaned:
                yield cleaned

    documents: list[Document] = []
    # 检查目录是否存在
    if not data_dir.exists():
        print(f"Warning: 目录 {data_dir} 不存在")
        return []

    for path in sorted(data_dir.glob("*.txt")):
        content = path.read_text(encoding="utf-8")
        for idx, part in enumerate(split_on_blank(content)):
            documents.append(
                Document(
                    page_content=part,
                    metadata={"source": path.name, "chunk_id": idx},
                )
            )
    if not documents:
        print(f"目录 {data_dir} 下未找到 txt 文档")
    return documents

## 5. 构建向量数据库

连接到 Chroma 服务并写入数据。

> **注意**：此处配置连接到远程 Chroma 服务器 (`120.24.168.78:7020`)。如果需要本地运行，请注释掉 `client_settings` 参数。

In [5]:
def build_vector_store(data_dir: Path | None = None) -> Chroma:
    """读取 txt 文件并构建内存向量库。"""
    # 在 Notebook 中，默认指向当前目录下的 files
    target_dir = data_dir or (Path.cwd() / "files")
    documents = load_txt_documents(target_dir)

    print(f"成功加载 {len(documents)} 个文档到向量库")

    embeddings = DashScopeEmbeddings()
    
    # 配置 Chroma 设置
    chroma_settings = Settings(
        chroma_server_host="120.24.168.78",
        chroma_server_http_port=7020
    )
    
    vector_store = Chroma(
        collection_name="test_collection",
        embedding_function=embeddings,
        persist_directory=None,
        client_settings=chroma_settings
    )
    
    # 清空集合（可选，防止重复数据堆积）
    try:
        existing_ids = vector_store.get()["ids"]
        if existing_ids:
            vector_store.delete(ids=existing_ids)
    except Exception as e:
        print(f"连接或清理数据库时提示: {e}")
    
    # 添加文档
    if documents:
        _ = vector_store.add_documents(documents)
    
    return vector_store

## 6. 创建 ReAct Agent

定义检索工具并初始化 Agent。

In [6]:
def create_react_agent_wrapper(vector_store: Chroma):
    """基于给定向量库创建带检索工具的 ReAct Agent。"""

    @tool(response_format="content_and_artifact")
    def retrieve_context(query: str):
        """基于向量库检索与问题最相关的文本片段。"""
        retrieved = vector_store.similarity_search(query, k=3)
        serialized = "\n\n".join(
            f"[{doc.metadata['source']}#{doc.metadata['chunk_id']}] {doc.page_content}"
            for doc in retrieved
        )
        return serialized, retrieved

    return create_agent(
        llm,
        tools=[retrieve_context],
        system_prompt=(
            "你可以使用检索工具获得参考资料。回答时结合检索到的内容，"
            "如有必要可以在答案中简单引用来源标识。"
        ),
    )

## 7. 运行演示

执行以下代码块开始提问。

In [7]:
# 1. 准备并构建向量库
vector_store = build_vector_store()
print('嵌入完成' + '\n')

# 2. 创建 Agent
agent = create_react_agent_wrapper(vector_store)

# 3. 提问
query = "公司的考勤方式是什么？"
print(f"Query: {query}\n" + "-" * 20)

input_payload = {"messages": [{"role": "user", "content": query}]}

for event in agent.stream(input_payload, stream_mode="values"):
    event["messages"][-1].pretty_print()

成功加载 8 个文档到向量库
嵌入完成

Query: 公司的考勤方式是什么？
--------------------

公司的考勤方式是什么？

为了找到公司的考勤方式，我将采取以下步骤进行搜索：

1. 首先确定公司是否有明确的考勤制度及相关规定。
2. 然后查找具体的考勤方式，例如打卡、签到等。

现在我开始第一步，通过检索来确定公司的考勤制度和相关规定。
Tool Calls:
  retrieve_context (call_9f3fed5b0751468f8a2ffd1f)
 Call ID: call_9f3fed5b0751468f8a2ffd1f
  Args:
    query: 公司的考勤制度和相关规定
Name: retrieve_context

[question.txt#0] 问题:公司的考勤方式是什么？
答案:公司实行弹性打卡。
权限: IT组、运维组
关键词:考勤方式

[question.txt#1] 问题:公司的考勤方式是什么？
答案:公司实行固定世界打卡。
权限: 运营组
关键词:考勤方式

[question.txt#5] 问题:休息不固定的人员考勤打卡有什么要求？
答案:休息不固定的人员在当月休息天数在正常规定休息天数范围内，不需提交调休或请假申请，上班照常打2次卡（上/下班各一次）。
权限:
关键词:弹性休息、考勤打卡要求

公司的考勤方式是弹性打卡。
