# ONE-DATA-STUDIO RAG Pipeline 示例

本 Notebook 演示如何使用 LangChain 与 ONE-DATA-STUDIO (Bisheng) 构建完整的 RAG (Retrieval-Augmented Generation) 流水线。

## 架构概览

```
文档 → 分块 → 向量化 → Milvus 存储
                              ↓
查询 → 向量检索 → 上下文增强 → LLM 生成 → 回答
```

## 前置条件

1. ONE-DATA-STUDIO 服务已启动
2. Milvus 向量数据库可用
3. 已部署 LLM 和 Embedding 模型

In [None]:
# 安装依赖
# !pip install langchain langchain-core langchain-openai requests tiktoken

## 1. 配置连接

In [None]:
import os
import sys

# 添加父目录到路径
sys.path.insert(0, os.path.dirname(os.getcwd()))

# 配置
BISHENG_API_BASE = os.getenv("BISHENG_API_BASE", "http://localhost:8000")
BISHENG_API_KEY = os.getenv("BISHENG_API_KEY", "")
MODEL_NAME = os.getenv("MODEL_NAME", "qwen-7b-chat")
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "bge-large-zh")
COLLECTION_NAME = "demo_rag_collection"

print(f"API Base: {BISHENG_API_BASE}")
print(f"LLM Model: {MODEL_NAME}")
print(f"Embedding Model: {EMBEDDING_MODEL}")
print(f"Collection: {COLLECTION_NAME}")

## 2. 初始化组件

In [None]:
from bisheng_llm import BishengLLM
from bisheng_vectorstore import BishengVectorStore

# 创建 LLM
llm = BishengLLM(
    api_base=BISHENG_API_BASE,
    model_name=MODEL_NAME,
    api_key=BISHENG_API_KEY,
    temperature=0.3,  # RAG 场景使用较低温度
    max_tokens=2048,
)

# 创建向量存储
vectorstore = BishengVectorStore(
    api_base=BISHENG_API_BASE,
    collection_name=COLLECTION_NAME,
    api_key=BISHENG_API_KEY,
)

print("Components initialized!")

## 3. 准备示例文档

In [None]:
# 示例文档 - ONE-DATA-STUDIO 介绍
documents = [
    {
        "content": """ONE-DATA-STUDIO 是一个企业级数据 + AI + LLM 融合平台。
它整合了三个核心平台：Alldata（数据治理）、Cube Studio（MLOps）和 Bisheng（LLMOps）。
平台采用四层架构设计，从下到上分别是：基础设施层、数据底座层、算法引擎层和应用编排层。""",
        "metadata": {"source": "overview", "section": "introduction"}
    },
    {
        "content": """Alldata 是数据治理与开发平台，负责数据集成、ETL、数据治理、特征存储和向量存储。
它支持多种数据源接入，包括 MySQL、PostgreSQL、Hive、Kafka 等。
Alldata 的元数据管理功能可以自动发现数据资产，追踪数据血缘关系。""",
        "metadata": {"source": "alldata", "section": "features"}
    },
    {
        "content": """Cube Studio 是云原生 MLOps 平台，提供 Notebook 开发、分布式训练和模型服务能力。
它支持 TensorFlow、PyTorch、Hugging Face 等主流框架。
模型服务采用 vLLM 或 TGI 进行部署，通过 OpenAI 兼容 API 对外暴露。""",
        "metadata": {"source": "cube_studio", "section": "features"}
    },
    {
        "content": """Bisheng 是大模型应用开发平台，专注于 RAG 流水线、Agent 编排和 Prompt 管理。
它提供可视化工作流编辑器，支持拖拽式构建复杂的 AI 应用。
Bisheng 内置了多种节点类型：LLM 调用、向量检索、条件分支、循环、Agent 等。""",
        "metadata": {"source": "bisheng", "section": "features"}
    },
    {
        "content": """Text-to-SQL 是 ONE-DATA-STUDIO 的核心功能之一。
它将 Alldata 的元数据（表结构、字段说明、关系）注入到 Prompt 中，
让 LLM 能够根据用户的自然语言查询生成准确的 SQL 语句。
系统还支持 SQL 解释和结果可视化。""",
        "metadata": {"source": "text2sql", "section": "features"}
    },
    {
        "content": """ONE-DATA-STUDIO 的工作流节点包括：
- 输入/输出节点：定义工作流的入口和出口
- LLM 节点：调用大语言模型生成内容
- 检索节点：从向量数据库检索相关文档
- 条件节点：根据条件进行分支处理
- Agent 节点：自主决策和工具调用
- 并行节点：同时执行多个分支""",
        "metadata": {"source": "workflow", "section": "nodes"}
    },
    {
        "content": """平台支持多种 Agent 工具：
- WebBrowserTool：网页内容抓取
- FileReaderTool：读取 CSV、Excel、JSON 文件
- CodeExecutorTool：安全沙箱中执行 Python 代码
- NotificationTool：发送 Slack、钉钉通知
所有工具都有安全限制，如 URL 白名单、执行超时等。""",
        "metadata": {"source": "agent", "section": "tools"}
    },
    {
        "content": """部署架构采用 Kubernetes 容器编排。
数据库使用 MySQL 主从复制 + ProxySQL 实现读写分离。
缓存使用 Redis Sentinel 模式保证高可用。
向量数据库使用 Milvus，支持十亿级向量检索。
服务通过 Istio 进行流量管理和服务发现。""",
        "metadata": {"source": "deployment", "section": "infrastructure"}
    }
]

print(f"Prepared {len(documents)} documents")

## 4. 文档向量化与存储

In [None]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document

# 创建文档分割器
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=200,
    chunk_overlap=50,
    separators=["\n\n", "\n", "。", "，", " "]
)

# 转换为 LangChain Document
langchain_docs = []
for doc in documents:
    chunks = text_splitter.split_text(doc["content"])
    for i, chunk in enumerate(chunks):
        langchain_docs.append(Document(
            page_content=chunk,
            metadata={**doc["metadata"], "chunk_id": i}
        ))

print(f"Split into {len(langchain_docs)} chunks")

# 显示前几个 chunk
for i, doc in enumerate(langchain_docs[:3]):
    print(f"\nChunk {i}:")
    print(f"  Content: {doc.page_content[:100]}...")
    print(f"  Metadata: {doc.metadata}")

In [None]:
# 添加文档到向量存储
texts = [doc.page_content for doc in langchain_docs]
metadatas = [doc.metadata for doc in langchain_docs]

ids = vectorstore.add_texts(texts, metadatas)
print(f"Added {len(ids)} documents to vector store")

## 5. 相似性搜索测试

In [None]:
# 测试相似性搜索
query = "什么是 Text-to-SQL?"
results = vectorstore.similarity_search_with_score(query, k=3)

print(f"Query: {query}\n")
for doc, score in results:
    print(f"Score: {score:.4f}")
    print(f"Content: {doc.page_content}")
    print(f"Metadata: {doc.metadata}")
    print("-" * 50)

## 6. 构建 RAG Chain

In [None]:
from langchain.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

# RAG Prompt 模板
RAG_TEMPLATE = """你是 ONE-DATA-STUDIO 平台的智能助手。请根据以下上下文信息回答用户的问题。

上下文信息：
{context}

用户问题：{question}

请基于上下文信息提供准确、有帮助的回答。如果上下文中没有相关信息，请诚实地说明。

回答："""

prompt = ChatPromptTemplate.from_template(RAG_TEMPLATE)

# 创建检索器
retriever = vectorstore.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 3}
)

# 格式化文档函数
def format_docs(docs):
    return "\n\n".join([doc.page_content for doc in docs])

# 构建 RAG Chain
rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

print("RAG Chain created!")

## 7. RAG 问答测试

In [None]:
# 测试 RAG 问答
questions = [
    "ONE-DATA-STUDIO 是什么平台？",
    "平台支持哪些 Agent 工具？",
    "如何部署 ONE-DATA-STUDIO？",
    "Bisheng 提供了哪些工作流节点？",
]

for question in questions:
    print(f"\n{'='*60}")
    print(f"Q: {question}")
    print(f"{'='*60}")
    
    answer = rag_chain.invoke(question)
    print(f"\nA: {answer}")

## 8. 带来源的 RAG Chain

In [None]:
from langchain_core.runnables import RunnableParallel

# 带来源的 RAG Chain
rag_chain_with_source = RunnableParallel(
    {"context": retriever, "question": RunnablePassthrough()}
).assign(
    answer=lambda x: llm.invoke(
        prompt.format(
            context=format_docs(x["context"]),
            question=x["question"]
        )
    )
)

# 测试
question = "Cube Studio 支持哪些机器学习框架？"
result = rag_chain_with_source.invoke(question)

print(f"Question: {question}\n")
print(f"Answer: {result['answer']}\n")
print("Sources:")
for doc in result['context']:
    print(f"  - {doc.metadata.get('source', 'unknown')}: {doc.page_content[:80]}...")

## 9. 对话式 RAG

In [None]:
from langchain.memory import ConversationBufferWindowMemory

# 对话式 RAG Prompt
CONVERSATIONAL_RAG_TEMPLATE = """你是 ONE-DATA-STUDIO 平台的智能助手。

对话历史：
{chat_history}

上下文信息：
{context}

用户问题：{question}

请基于对话历史和上下文信息，提供准确、连贯的回答："""

conversational_prompt = ChatPromptTemplate.from_template(CONVERSATIONAL_RAG_TEMPLATE)

# 对话记忆 (保留最近 5 轮)
memory = ConversationBufferWindowMemory(
    k=5,
    return_messages=False,
    memory_key="chat_history"
)

def conversational_rag(question: str) -> str:
    # 检索相关文档
    docs = retriever.invoke(question)
    context = format_docs(docs)
    
    # 获取对话历史
    chat_history = memory.load_memory_variables({})["chat_history"]
    
    # 生成回答
    formatted_prompt = conversational_prompt.format(
        chat_history=chat_history,
        context=context,
        question=question
    )
    answer = llm.invoke(formatted_prompt)
    
    # 保存到记忆
    memory.save_context({"input": question}, {"output": answer})
    
    return answer

In [None]:
# 多轮对话测试
conversations = [
    "ONE-DATA-STUDIO 的架构是什么样的？",
    "其中的数据层是哪个组件？",
    "它支持哪些数据源？",
    "如何与 Bisheng 集成？"
]

for q in conversations:
    print(f"\nUser: {q}")
    answer = conversational_rag(q)
    print(f"Assistant: {answer}")

## 10. 清理资源

In [None]:
# 可选：删除测试集合中的数据
# vectorstore.delete(filter={"source": {"$exists": True}})
# print("Cleaned up test data")

## 总结

本 Notebook 演示了完整的 RAG 流水线：

1. **组件初始化** - 配置 LLM 和 VectorStore
2. **文档处理** - 分块和向量化
3. **相似性搜索** - 测试检索效果
4. **RAG Chain** - 构建检索增强生成链
5. **带来源 RAG** - 返回答案和引用来源
6. **对话式 RAG** - 支持多轮对话的 RAG

### 最佳实践

- **分块策略**：根据文档类型选择合适的分块大小和重叠
- **检索数量**：通常 3-5 个相关文档效果较好
- **温度设置**：RAG 场景建议使用较低温度 (0.1-0.3)
- **Prompt 优化**：明确指示 LLM 基于上下文回答
- **结果评估**：定期评估检索准确率和回答质量