# 如何组合多个 LangGraph 示例

本示例展示如何将 `examples` 文件夹中的多个示例组合在一起，构建更强大的应用。

我们将展示几种常见的组合模式：

1. **子图组合**：将不同的图作为子图嵌入主图
2. **节点调用**：在节点中直接调用其他图
3. **工具集成**：将不同功能封装为工具，供 Agent 使用
4. **状态路由**：根据状态条件路由到不同的处理流程


## 安装依赖


In [None]:
! pip install -U langchain langchain-openai langgraph langchain-community chromadb


## 方法 1: 使用子图（Subgraphs）组合

这是最推荐的组合方式。可以将独立的图作为子图嵌入到主图中。


In [None]:
from typing import Annotated, TypedDict, Literal
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import AnyMessage, add_messages

# 定义状态
class MainState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]
    query_type: Literal["code", "rag", "general"]
    result: str

# 初始化 LLM
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)


In [None]:
# 示例：创建一个简单的代码助手节点（类似 code_assistant 示例）
def code_assistant_node(state: MainState):
    """代码助手节点"""
    from langchain_core.prompts import ChatPromptTemplate
    
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a helpful coding assistant. Provide clear, executable code."),
        ("placeholder", "{messages}")
    ])
    
    chain = prompt | llm
    response = chain.invoke({"messages": state["messages"]})
    
    return {
        "messages": [response],
        "result": response.content
    }

# 示例：创建一个简单的 RAG 节点（类似 adaptive_rag 示例）
def rag_node(state: MainState):
    """RAG 检索节点"""
    from langchain_core.prompts import ChatPromptTemplate
    
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a helpful assistant that answers questions based on retrieved context."),
        ("placeholder", "{messages}")
    ])
    
    chain = prompt | llm
    response = chain.invoke({"messages": state["messages"]})
    
    return {
        "messages": [response],
        "result": response.content
    }

# 通用助手节点
def general_assistant_node(state: MainState):
    """通用助手节点"""
    from langchain_core.prompts import ChatPromptTemplate
    
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a helpful general assistant."),
        ("placeholder", "{messages}")
    ])
    
    chain = prompt | llm
    response = chain.invoke({"messages": state["messages"]})
    
    return {
        "messages": [response],
        "result": response.content
    }


In [None]:
# 路由函数：决定使用哪个节点（类似 branching 示例）
def route_query(state: MainState) -> Literal["code_assistant", "rag", "general"]:
    """根据查询类型路由到不同的处理节点"""
    query_type = state.get("query_type")
    
    if query_type == "code":
        return "code_assistant"
    elif query_type == "rag":
        return "rag"
    else:
        return "general"

# 构建主图
builder = StateGraph(MainState)

# 添加节点（这些节点可以是从其他示例中提取的子图）
builder.add_node("code_assistant", code_assistant_node)
builder.add_node("rag", rag_node)
builder.add_node("general", general_assistant_node)

# 添加条件路由（类似 branching 示例）
builder.add_conditional_edges(
    START,
    route_query,
    {
        "code_assistant": "code_assistant",
        "rag": "rag",
        "general": "general"
    }
)
builder.add_edge("code_assistant", END)
builder.add_edge("rag", END)
builder.add_edge("general", END)

combined_graph = builder.compile()


## 方法 2: 在节点中直接调用其他图

可以在一个图的节点中直接调用另一个已编译的图。


In [None]:
# 创建一个简单的代码生成图（类似 code_assistant 示例）
def simple_code_generator(state: MainState):
    """简单的代码生成函数"""
    from langchain_core.prompts import ChatPromptTemplate
    
    prompt = ChatPromptTemplate.from_messages([
        ("system", "Generate Python code only."),
        ("placeholder", "{messages}")
    ])
    
    chain = prompt | llm
    response = chain.invoke({"messages": state["messages"]})
    return {"messages": [response]}

# 创建一个独立的代码生成图
code_graph_builder = StateGraph(MainState)
code_graph_builder.add_node("generate", simple_code_generator)
code_graph_builder.add_edge(START, "generate")
code_graph_builder.add_edge("generate", END)
code_graph = code_graph_builder.compile()

# 在主图的节点中调用另一个图
def orchestrator_node(state: MainState):
    """编排节点：可以调用其他图"""
    
    # 方法 1: 直接调用其他图
    if "code" in state["messages"][-1].content.lower():
        # 调用代码生成图
        code_result = code_graph.invoke(state)
        return code_result
    
    # 方法 2: 调用其他函数/工具
    return general_assistant_node(state)

# 构建编排图
orchestrator_builder = StateGraph(MainState)
orchestrator_builder.add_node("orchestrator", orchestrator_node)
orchestrator_builder.add_edge(START, "orchestrator")
orchestrator_builder.add_edge("orchestrator", END)
orchestrator_graph = orchestrator_builder.compile()


## 方法 3: 使用工具封装功能

将不同示例的功能封装为工具，供 React Agent 使用（类似 create-react-agent 示例）。


In [None]:
from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent

# 将代码生成功能封装为工具（类似 code_assistant 示例）
@tool
def generate_code(question: str) -> str:
    """Generate Python code based on a question."""
    from langchain_core.prompts import ChatPromptTemplate
    
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a code generator. Return only valid Python code."),
        ("user", "{question}")
    ])
    
    chain = prompt | llm
    result = chain.invoke({"question": question})
    return result.content

# 将 RAG 功能封装为工具（类似 adaptive_rag 示例）
@tool
def search_knowledge_base(query: str) -> str:
    """Search the knowledge base for information."""
    # 这里可以调用实际的向量数据库检索
    # 简化示例
    return f"Retrieved information about: {query}"

# 创建带有多个工具的 React Agent（类似 create-react-agent 示例）
tools = [generate_code, search_knowledge_base]
agent = create_react_agent(llm, tools)


## 方法 4: 组合持久化、流式处理和记忆功能

结合多个特性：持久化（persistence）、流式处理（streaming）和记忆管理（memory）。


In [None]:
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import MessagesState
import uuid

# 创建一个带持久化的图（类似 persistence 示例）
memory = MemorySaver()

# 定义状态（类似 memory 示例）
class AgentState(MessagesState):
    context: dict

def agent_node(state: AgentState):
    from langchain_core.prompts import ChatPromptTemplate
    
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a helpful assistant with access to conversation history."),
        ("placeholder", "{messages}")
    ])
    
    chain = prompt | llm
    response = chain.invoke({"messages": state["messages"]})
    return {"messages": [response]}

# 构建图（带持久化）
persistent_builder = StateGraph(AgentState)
persistent_builder.add_node("agent", agent_node)
persistent_builder.add_edge(START, "agent")
persistent_builder.add_edge("agent", END)

# 编译时添加持久化检查点（类似 persistence 示例）
persistent_graph = persistent_builder.compile(checkpointer=memory)

# 使用 thread_id 进行持久化会话（类似 memory 示例）
thread_id = str(uuid.uuid4())
config = {"configurable": {"thread_id": thread_id}}

# 流式处理示例（类似 streaming 示例）
def stream_example():
    events = persistent_graph.stream(
        {"messages": [("user", "Hello!")]},
        config,
        stream_mode="values"  # 或 "updates", "messages"
    )
    
    for event in events:
        print(event["messages"][-1].content)

# stream_example()  # 取消注释以运行


## 完整示例：组合代码助手 + RAG + 持久化 + 流式处理

这是一个完整的示例，展示如何组合多个功能。


In [None]:
from typing import Annotated, TypedDict, Literal
from langchain_core.prompts import ChatPromptTemplate
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import AnyMessage, add_messages
from langgraph.checkpoint.memory import MemorySaver
import uuid

# 定义组合状态
class CombinedState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]
    query_type: str
    code_result: str
    rag_result: str
    final_answer: str

# 节点 1: 查询分类（类似 branching 示例）
def classify_query(state: CombinedState):
    """分类查询类型"""
    from langchain_core.pydantic_v1 import BaseModel, Field
    
    class QueryType(BaseModel):
        type: Literal["code", "rag", "general"] = Field(description="Query type")
    
    prompt = ChatPromptTemplate.from_template(
        "Classify this query as 'code', 'rag', or 'general': {query}"
    )
    
    classifier = prompt | llm.with_structured_output(QueryType)
    query_type = classifier.invoke({"query": state["messages"][-1].content})
    
    return {"query_type": query_type.type}

# 节点 2: 代码生成（类似 code_assistant 示例）
def generate_code_node(state: CombinedState):
    """生成代码"""
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a code assistant. Provide executable Python code."),
        ("placeholder", "{messages}")
    ])
    
    chain = prompt | llm
    response = chain.invoke({"messages": state["messages"]})
    
    return {"code_result": response.content, "messages": [response]}

# 节点 3: RAG 检索（类似 adaptive_rag 示例）
def rag_retrieval_node(state: CombinedState):
    """RAG 检索"""
    prompt = ChatPromptTemplate.from_messages([
        ("system", "Answer based on knowledge base."),
        ("placeholder", "{messages}")
    ])
    
    chain = prompt | llm
    response = chain.invoke({"messages": state["messages"]})
    
    return {"rag_result": response.content, "messages": [response]}

# 节点 4: 通用回答
def general_answer_node(state: CombinedState):
    """通用回答"""
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a helpful assistant."),
        ("placeholder", "{messages}")
    ])
    
    chain = prompt | llm
    response = chain.invoke({"messages": state["messages"]})
    
    return {"final_answer": response.content, "messages": [response]}

# 路由函数
def route_by_type(state: CombinedState) -> Literal["code", "rag", "general"]:
    return state["query_type"]

# 构建组合图
combined_builder = StateGraph(CombinedState)

# 添加节点
combined_builder.add_node("classify", classify_query)
combined_builder.add_node("code", generate_code_node)
combined_builder.add_node("rag", rag_retrieval_node)
combined_builder.add_node("general", general_answer_node)

# 添加边
combined_builder.add_edge(START, "classify")
combined_builder.add_conditional_edges(
    "classify",
    route_by_type,
    {
        "code": "code",
        "rag": "rag",
        "general": "general"
    }
)
combined_builder.add_edge("code", END)
combined_builder.add_edge("rag", END)
combined_builder.add_edge("general", END)

# 添加持久化（类似 persistence 示例）
memory = MemorySaver()
final_graph = combined_builder.compile(checkpointer=memory)

print("组合图构建完成！")


In [None]:
# 使用组合图（带流式处理，类似 streaming 示例）
thread_id = str(uuid.uuid4())
config = {"configurable": {"thread_id": thread_id}}

# 流式处理
events = final_graph.stream(
    {"messages": [("user", "Write a Python function to calculate fibonacci")]},
    config,
    stream_mode="values"
)

for event in events:
    if "messages" in event and event["messages"]:
        print(f"Message: {event['messages'][-1].content}\\n")
    if "query_type" in event:
        print(f"Query Type: {event['query_type']}\\n")


## 总结：组合模式最佳实践

1. **子图模式**：将独立的图作为子图，适合模块化设计
2. **节点调用**：在节点中调用其他图，适合复杂编排
3. **工具封装**：将功能封装为工具，适合 Agent 使用
4. **状态路由**：根据状态条件路由，适合多分支流程
5. **特性叠加**：持久化、流式、记忆等功能可以叠加使用

### 组合建议

- **RAG + Agent**：将 RAG 作为 Agent 的工具（adaptive_rag + create-react-agent）
- **代码助手 + 持久化**：保存代码生成历史（code_assistant + persistence）
- **多 Agent + 子图**：每个 Agent 是一个子图（multi_agent + subgraph）
- **流式 + 记忆**：实时反馈 + 历史记录（streaming + memory）
- **分支 + 路由**：根据查询类型动态选择处理路径（branching + state routing）

### 参考示例

- `subgraph.ipynb` - 了解子图的使用
- `branching.ipynb` - 了解条件路由
- `code_assistant/` - 了解代码生成图的结构
- `rag/adaptive_rag.ipynb` - 了解 RAG 图的构建
- `persistence.ipynb` - 了解如何添加持久化
- `streaming-*.ipynb` - 了解流式处理
- `memory/` - 了解记忆管理
