# Langgraph（图驱动工作流引擎）

LangGraph 是 LangChain 生态中专门用于 **构建复杂状态化 AI 应用** 的核心模块，其核心设计理念是通过 **图结构（Graph）** 管理多步骤任务流，并 **提供持久化状态管理** 能力。它解决了传统 LangChain 在处理循环、条件分支、多智能体协作等场景时的局限性，是构建企业级 AI 系统的关键工具。

LangGraph 通过图结构 + 状态管理，将复杂 AI 应用的开发从 “链式编程” 升级为 “可视化建模”。其核心价值在于：

1. 可维护性：图结构清晰展示执行逻辑，降低协作成本。

2. 鲁棒性：自动状态持久化和错误恢复，减少生产故障。

3. 扩展性：模块化设计支持增量开发，适配从原型到生产的全生命周期。

通过掌握 LangGraph，开发者能够构建真正具备 “长期记忆” 和 “动态决策” 能力的 AI 系统，为企业级应用提供坚实基础。后续的 Memory 模块正是基于 LangGraph 的状态管理机制实现的，理解上述内容是深入学习 Memory 的关键。

## 一、LangGraph 的核心定位与优势

**1. 与LangChain 的本质区别**

|**维度**|	**LangChain**|	**LangGraph**|
|--|--|--|
|**核心抽象**|	链式流程（Chains）|	图结构（Graph）|
|**状态管理**|	有限的内存模块（如 ConversationBufferMemory）|	原生支持持久化状态（Checkpointer+Store）|
|**控制流**|	线性执行（DAG）|	支持循环、条件分支、多智能体协作|
|**适用场景**|	简单问答、单步骤任务|	复杂对话、自动化工作流、多智能体系统|


**2. 核心优势**

- **动态流程控制**：支持条件分支（如`if-else`）和循环（如重试逻辑），适合需要多轮推理的场景。

- **状态持久化**：自动保存执行过程中的状态快照（Checkpoint），支持断点续传和时间回溯。

- **人机协作**：允许人工干预流程（如审核AI生成的内容），适合医疗、金融等需要合规性的场景。

- **可视化调试**：提供 `Mermaid` 格式的图结构可视化工具，帮助理解复杂的执行路径。

## 二、核心组件与原理

### 1. StateGraph：工作流的“蓝图”

- **定义**：用有向图描述任务流，节点（Node）代表原子操作，边（Edge）定义执行顺序。

- **关键特性**：

    - **节点类型**：支持LLM调用、工具执行、数据处理等自定义函数。
 
    - **条件边**：基于状态动态选择执行路径（如根据用户输入决定下一步操作）。
 
    - **循环支持**：通过 `add_edge` 连接节点形成闭环、实现重试或迭代逻辑。

In [1]:
from typing import TypedDict
from langgraph.graph import StateGraph, START, END

# 定义状态结构
class AgentState(TypedDict):
    input: str
    output: str

# 定义节点函数
def process_input(state: AgentState) -> AgentState:
    return {"output": f"处理输入：{state['input']}"}

def generate_response(state: AgentState) -> AgentState:
    return {"output": f"响应：{state['output']}"}

# 构建图：
graph = StateGraph(AgentState)
graph.add_node("process", process_input)
graph.add_node("respond", generate_response)
graph.add_edge(START, "process")
graph.add_edge("process", "respond")
graph.add_edge("respond", END)

# 编译并执行
compiled_graph = graph.compile()
result = compiled_graph.invoke({"input": "你好！"})
print(result["output"])  # 输出：响应：处理输入：你好！


响应：处理输入：你好！


### 2. State：贯穿全流程的数据载体

- **定义**：就是图中的全局数据，通常用于存储对话历史、中间结果等关键信息，用`TypedDict`或字典描述。

- **核心机制**：

    - **Reducer**：定义状态更新规则（如合并列表、覆盖至），避免数据冲突。
 
    - **注入与访问**：节点函数通过参数直接获取当前状态，更新后返回增量数据。

In [14]:
from typing import TypedDict, Annotated
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, RemoveMessage
from langgraph.graph import add_messages #官网提供的消息Reducer

# 定义图的状态
class AgentState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages] # 使用 add_messages 合并消息
    content: str # 新增： 存储用户输入的cnotent参数（供add_user_message读取）

def add_user_message(state: AgentState) -> AgentState:
    # 从状态中读取content参数
    user_content = state["content"]
    return {"messages": [HumanMessage(content=user_content)]} # 自动合并到原有messages

from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
import os

load_dotenv()

llm = ChatOpenAI(
    api_key=os.getenv("UIUIAPI_API_KEY"),
    base_url=os.getenv("UIUIAPI_BASE_URL"),
    model="gpt-3.5-turbo",
    temperature=0.3
)

def ai_respond(state: AgentState) -> AgentState:
    response = llm.invoke(state["messages"]) # 使用当前对话历史调用LLM
    return {"messages": [response]}

### 3. Checkpointer：短期记忆的“保险箱”

- **作用**：按对话线程（`thread_id`）保存状态快照，支持多轮对话上下文连续性。

- **核心实现**：

    - **内存级**：`InMemorySaver`（开发测试用）。
 
    - **生产级**： `MongoDBSaver`、`PostgresSaver`等。

In [18]:
from langgraph.checkpoint.memory import InMemorySaver

# 初始化Checkpointer
checkpointer = InMemorySaver()

# 构建图并绑定Checkpointer
graph = StateGraph(AgentState)
graph.add_node("add_message", add_user_message)
graph.add_node("get_response", ai_respond)

graph.add_edge(START, "add_message")
graph.add_edge("add_message", "get_response")
graph.add_edge("get_response", END)

# 绑定Checkpointer 并 编译图
compiled_graph = graph.compile(checkpointer=checkpointer)

# 多轮对话
config = {"configurable": {"thread_id": "user_changyun_001"}}

result = compiled_graph.invoke(
    input={
        "content": "Python是世界上最好的语言", # 传入content参数（存入state）
        "messages": [] # 初始的对话历史为空
    },
    config=config
)

result2 = compiled_graph.invoke(
    input={
        "content": "请重复一下我刚才的提问内容",
    },
    config=config
)

print(result2['messages'][-1].content)


您刚才提问的内容是：Python是世界上最好的语言。


### 4. Store：长期存储的“知识库”

- **作用**：跨对话线程 **存储持久化数据** （如用户偏好、历史记录）支持向量检索

- **核心接口**：

    - `put/get`：按键值对读写数据。
 
    - `seach`：基于内容相似度检索（需配合嵌入函数）。

In [26]:
from langgraph.store.memory import InMemoryStore

# 初始化Store
store = InMemoryStore()

# help(store)

store.put(
    namespace=("user_changyun", "preferences"),
    key="language",
    value={"preferred_lang": "English"}
)

store.put(
    namespace=("user_changyun", "info"),
    key="last_name",
    value="changyun"
)

# 跨线程读取
result = store.get(namespace=("user_changyun", "preferences"), key="language")
print(result.value["preferred_lang"])
result2 = store.get(namespace=("user_changyun", "info"), key="last_name")
print(result2.value)

English
changyun


## 三、关键实践：从基础到高级

### 1. 条件分支

In [47]:
from typing import TypedDict, Annotated
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langgraph.graph.message import add_messages
from langchain_core.prompts import ChatPromptTemplate
from langgraph.graph import StateGraph, START, END

# 定义图的状态
class BranchState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    content: str

def router(state: BranchState) -> str:
    user_input = state["content"].lower()  # 转为小写，使判断更健壮
    return "hello_case" if "hello" in user_input else "other_case"

# 定义节点处理函数
def process_a(state: BranchState) -> BranchState:
    """处理包含hello的输入"""
    return {
        "messages": [AIMessage(content="Hello! How can I help you?")],
        "content": state["content"]
    }

def process_b(state: BranchState) -> BranchState:
    """处理不包含hello的输入"""
    return {
        "messages": [AIMessage(content="Goodbye! Have a nice day!")],
        "content": state["content"]  # 保留原始内容
    }

# 创建状态图
graph = StateGraph(BranchState)

# 添加节点
graph.add_node("router", router)
graph.add_node("process_a", process_a)
graph.add_node("process_b", process_b)

# 定义边
# graph.add_edge(START, "router")
# help(graph.add_conditional_edges)
graph.add_conditional_edges(
    source=START,  # 源节点
    path=router,   # 条件处理函数: 使用路由函数返回值作为条件
    path_map={
        "hello_case": "process_a",
        "other_case": "process_b",
    }
)
graph.add_edge("process_a", END)
graph.add_edge("process_b", END)

# 编译图
compiled_graph = graph.compile()

config = {"configurable": {"thread_id": "user_changyun_002"}}

try:
    result = compiled_graph.invoke(
        input={
            "content": "hello! gpt!",
            "messages": []
        },
        config=config
    )
    
    print(result["messages"][-1].content)
except Exception as e:
    print(f"Error occurred: {str(e)}")


Hello! How can I help you?
