图节点测试用例

In [4]:
from typing import TypedDict, Annotated, Literal
from langchain_ollama import ChatOllama
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.constants import START, END
from langgraph.graph import add_messages, StateGraph
from langgraph.types import interrupt, Command


# 定义 Agent 的状态结构，包含消息列表
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]


# 初始化本地大语言模型，配置模型名称和推理模式
llm = ChatOllama(
    model="deepseek-r1:32b",
    base_url="http://localhost:12356"
)


# 聊天机器人函数，用于处理对话状态并生成回复
def chatbot(state: AgentState):
    return {"messages": [llm.invoke(state['messages'])]}


def human_approval(state: AgentState) -> Command[Literal["chatbot", END]]:
    question = "是否同意调用大语言模型？(y/n): "
    while True:
        response = input(question).strip().lower()
        if response in ("y", "yes"):
            return Command(goto="chatbot")
        elif response in ("n", "no"):
            print("❌ 已拒绝，流程结束。")
            return Command(goto=END)
        else:
            print("⚠️ 请输入 y 或 n。")


# 构建状态图结构
graph_builder = StateGraph(AgentState)

# 每个节点都与对应的处理函数进行绑定，构成工作流的基本单元
graph_builder.add_node("human_approval", human_approval)
graph_builder.add_node("chatbot", chatbot)

# 添加边：从 START 到 chatbot，然后到 END
graph_builder.add_edge(START, "human_approval")

checkpointer = InMemorySaver()
# 编译图结构，并绘制可视化图表
graph = graph_builder.compile(checkpointer=checkpointer)
# print(graph.get_graph())
graph.get_graph().draw_png('./graph.png')
config = {"configurable": {"thread_id": "chat-1"}}
response1 = graph.invoke({"messages": ["当前模型"]}, config)
print(response1["messages"][-1].content)
# 确认执行
final_result = graph.invoke(Command(resume=True), config)
print(final_result["messages"][-1].content)
# 取消执行
# final_result = graph.invoke(Command(resume=False),config)
# print(final_result["messages"][-1].content)

<think>

</think>

您好！我是由中国的深度求索（DeepSeek）公司开发的智能助手DeepSeek-R1。有关模型和产品的详细内容请参考官方文档。
<think>

</think>

您好！我是由中国的深度求索（DeepSeek）公司开发的智能助手DeepSeek-R1。有关模型和产品的详细内容请参考官方文档。


状态管理聊天

In [6]:
from typing import TypedDict, Annotated
from langchain_ollama import ChatOllama
from langgraph.constants import START, END
from langgraph.graph import add_messages, StateGraph


# 定义 Agent 的状态结构，包含消息列表
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]


# 初始化本地大语言模型，配置模型名称和推理模式
llm = ChatOllama(
    model="deepseek-r1:32b",
    base_url="http://localhost:12356"
)


# 聊天机器人函数，用于处理对话状态并生成回复
def chatbot(state: AgentState):
    return {"messages": [llm.invoke(state['messages'])]}


# 构建状态图结构
graph_builder = StateGraph(AgentState)

# 每个节点都与对应的处理函数进行绑定，构成工作流的基本单元
graph_builder.add_node("chatbot", chatbot)

# 添加边：从 START 到 chatbot，然后到 END
graph_builder.add_edge(START, "chatbot")
graph_builder.add_edge("chatbot", END)

# 编译图结构，并绘制可视化图表
graph = graph_builder.compile()
# print(graph.get_graph())
graph.get_graph().draw_png('./graph02.png')

response1 = graph.invoke({"messages": ["当前模型"]})

print(response1["messages"][-1].content)

Graph(nodes={'__start__': Node(id='__start__', name='__start__', data=RunnableCallable(tags=None, recurse=True, explode_args=False, func_accepts={}), metadata=None), 'chatbot': Node(id='chatbot', name='chatbot', data=chatbot(tags=None, recurse=True, explode_args=False, func_accepts={}), metadata=None), '__end__': Node(id='__end__', name='__end__', data=None, metadata=None)}, edges=[Edge(source='__start__', target='chatbot', data=None, conditional=False), Edge(source='chatbot', target='__end__', data=None, conditional=False)])
<think>

</think>

您好！我是由中国的深度求索（DeepSeek）公司开发的智能助手DeepSeek-R1。有关模型和产品的详细内容请参考官方文档。


In [15]:
import json
import dotenv
from loguru import logger
from pydantic import Field, BaseModel
from langchain_core.tools import tool

# 加载环境变量配置
dotenv.load_dotenv()


class WeatherQuery(BaseModel):
    """
    天气查询参数模型类，用于定义天气查询工具的输入参数结构。

    :param city: 城市名称，字符串类型，表示要查询天气的城市
    """
    city: str = Field(description="城市名称")


class WriteQuery(BaseModel):
    """
    写入查询模型类

    用于定义需要写入文档的内容结构，继承自BaseModel基类

    属性:
        content (str): 需要写入文档的具体内容，包含详细的描述信息
    """
    content: str = Field(description="需要写入文档的具体内容")


@tool(args_schema=WeatherQuery)
def get_weather(city):
    """
    查询指定城市的即时天气信息。

    :param city: 必要参数，字符串类型，表示要查询天气的城市名称。
                 注意：中国城市需使用其英文名称，如 "Beijing" 表示北京。
    :return: 返回 OpenWeather API 的响应结果，URL 为
             https://api.openweathermap.org/data/2.5/weather。
             响应内容为 JSON 格式的字符串，包含详细的天气数据。
    """

    # 发送 GET 请求并获取响应
    # response = httpx.get(url, params=params)
    if city.lower() == "beijing":
        response = {
            "coord": {"lon": 116.4074, "lat": 39.9042},
            "weather": [{"id": 801, "main": "Clouds", "description": "few clouds", "icon": "02d"}],
            "base": "stations",
            "main": {"temp": 12.5, "feels_like": 11.9, "temp_min": 12.5, "temp_max": 12.5, "pressure": 1012,
                     "humidity": 52},
            "visibility": 6000,
            "wind": {"speed": 3.6, "deg": 80},
            "clouds": {"all": 23},
            "dt": 1671673146,
            "sys": {"type": 1, "id": 9246, "country": "CN", "sunrise": 1671650426, "sunset": 1671693807},
            "timezone": 28800,
            "id": 1816670,
            "name": "Beijing",
            "cod": 200
        }

    elif city.lower() == "shanghai":
        response = {
            "coord": {"lon": 121.4737, "lat": 31.2304},
            "weather": [{"id": 802, "main": "Clouds", "description": "scattered clouds", "icon": "03d"}],
            "base": "stations",
            "main": {"temp": 18.2, "feels_like": 17.8, "temp_min": 18.2, "temp_max": 18.2, "pressure": 1010, "humidity": 72},
            "visibility": 5000,
            "wind": {"speed": 4.1, "deg": 150},
            "clouds": {"all": 38},
            "dt": 1671673146,
            "sys": {"type": 1, "id": 9261, "country": "CN", "sunrise": 1671650044, "sunset": 1671692578},
            "timezone": 28800,
            "id": 1796236,
            "name": "Shanghai",
            "cod": 200
        }
    else:
        return {
            "error": "city_not_found",
            "message": "城市未找到，请输入有效城市名称"
        }

    # 将响应解析为 JSON 并序列化为字符串返回
    # data = response.json()
    # logger.info(f"查询天气结果：{json.dumps(data)}")
    logger.info(f"查询天气结果：{response}")
    return response


@tool(args_schema=WriteQuery)
def write_file(content):
    """
    将指定内容写入本地文件

    参数:
        content (str): 要写入文件的文本内容

    返回值:
        str: 表示写入操作成功完成的提示信息
    """
    # 将内容写入res.txt文件，使用utf-8编码确保中文字符正确保存
    with open('res.txt', 'w', encoding='utf-8') as f:
        f.write(content)
        logger.info(f"已成功写入本地文件，写入内容：{content}")
        return "已成功写入本地文件。"

In [1]:
from typing import TypedDict, Annotated
from langchain_core.messages import SystemMessage
from langchain_ollama import ChatOllama
from langgraph.constants import START, END
from langgraph.graph import add_messages, StateGraph
from langgraph.prebuilt import ToolNode

from tools import get_weather, write_file


# 定义 Agent 的状态结构，包含消息列表
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]


# 初始化本地大语言模型，配置模型名称和推理模式
llm = ChatOllama(
    # model="deepseek-r1:32b",
    model="llama3.1:8b",
    base_url="http://localhost:12356"
)
tools = [get_weather, write_file]
llm_with_tools = llm.bind_tools(tools)


# 聊天机器人节点，用于处理对话状态并生成回复，并告诉模型可以调用哪些工具
def chat_node(state: AgentState):
    messages = state["messages"]
    system_prompt = """你是一个智能助手，具备以下能力：
                    1. 查询天气信息
                    2. 结果写入文件
                    请根据用户的需求，选择合适的工具来完成任务。回答要准确、友好、专业。"""
    # 构建完整的消息列表（系统提示词 + 用户消息）,如果第一条消息不是系统消息，则添加系统提示词
    if not any(isinstance(msg, SystemMessage) for msg in messages):
        messages = [SystemMessage(
            content=system_prompt)] + messages
    result = llm_with_tools.invoke(messages)
    return {"messages": [result]}


# 定义工具节点（系统预置 ToolNode 会自动解析 tool_calls）
tool_node = ToolNode(tools=tools)


# 动态路由：chat_node → tool_node 或 END
def route_after_chat(state: AgentState):
    """判断是否需要进入工具节点"""
    last_message = state["messages"][-1]
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "tool_node"
    return END


# 构建状态图结构
graph_builder = StateGraph(AgentState)

# 每个节点都与对应的处理函数进行绑定，构成工作流的基本单元
graph_builder.add_node("chat_node", chat_node)
graph_builder.add_node("tool_node", tool_node)

# 添加边：从 START 到 chatbot，然后到 END
graph_builder.add_edge(START, "chat_node")
# 添加条件边：根据是否有工具调用来判断是否需要进入工具节点
graph_builder.add_conditional_edges("chat_node", route_after_chat, ["tool_node", END])
# 工具节点执行完后回到 chat_node，继续多轮对话
graph_builder.add_edge("tool_node", "chat_node")

# 编译图结构，并绘制可视化图表
graph = graph_builder.compile()
graph.get_graph().draw_png('./graph3.png')

response1 = graph.invoke({"messages": ["北京天气怎么样"]})

print(response1["messages"][-1].content)

[32m2025-12-17 10:19:44.619[0m | [1mINFO    [0m | [36mtools[0m:[36mget_weather[0m:[36m89[0m - [1m查询天气结果：{'coord': {'lon': 116.4074, 'lat': 39.9042}, 'weather': [{'id': 801, 'main': 'Clouds', 'description': 'few clouds', 'icon': '02d'}], 'base': 'stations', 'main': {'temp': 12.5, 'feels_like': 11.9, 'temp_min': 12.5, 'temp_max': 12.5, 'pressure': 1012, 'humidity': 52}, 'visibility': 6000, 'wind': {'speed': 3.6, 'deg': 80}, 'clouds': {'all': 23}, 'dt': 1671673146, 'sys': {'type': 1, 'id': 9246, 'country': 'CN', 'sunrise': 1671650426, 'sunset': 1671693807}, 'timezone': 28800, 'id': 1816670, 'name': 'Beijing', 'cod': 200}[0m


北京天气很好，今天是多云的，温度为12.5度。


In [1]:
from typing import TypedDict, Annotated
import json
from langchain_core.messages import SystemMessage, ToolMessage
from langchain_ollama import ChatOllama
from langgraph.constants import START, END
from langgraph.graph import add_messages, StateGraph
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.memory import MemorySaver
from tools import get_weather, write_file


# 定义 Agent 的状态结构
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]


# 初始化本地大语言模型
llm = ChatOllama(
    # model="deepseek-r1:32b",
    model="llama3.1:8b",
    base_url="http://localhost:12356"
)

tools = [get_weather, write_file]
llm_with_tools = llm.bind_tools(tools)


# 聊天机器人节点
def chat_node(state: AgentState):
    messages = state["messages"]
    system_prompt = """你是一个智能助手，具备以下能力：
                    1. 查询天气信息
                    2. 结果写入文件
                    请根据用户的需求，选择合适的工具来完成任务。回答要准确、友好、专业。"""

    if not any(isinstance(msg, SystemMessage) for msg in messages):
        messages = [SystemMessage(content=system_prompt)] + messages

    result = llm_with_tools.invoke(messages)
    return {"messages": [result]}


# 定义工具节点
tool_node = ToolNode(tools=tools)


# 动态路由：chat_node 之后
def route_after_chat(state: AgentState):
    """判断是否需要调用工具"""
    last_message = state["messages"][-1]
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "tool_node"
    return END


# 构建状态图
graph_builder = StateGraph(AgentState)

# 添加节点
graph_builder.add_node("chat_node", chat_node)
graph_builder.add_node("tool_node", tool_node)

# 添加边
graph_builder.add_edge(START, "chat_node")
graph_builder.add_conditional_edges("chat_node", route_after_chat, ["tool_node", END])
graph_builder.add_edge("tool_node", "chat_node")

# 编译图结构 - 关键：使用 interrupt_before 在工具节点前中断
memory = MemorySaver()
graph = graph_builder.compile(
    checkpointer=memory,
    interrupt_before=["tool_node"]  # 在执行工具前中断，等待人工确认
)

# 绘制可视化图表
graph.get_graph().draw_png('./graph4.png')


def run_with_approval():
    """运行带人工确认的工作流"""
    config = {"configurable": {"thread_id": "1"}}

    # 第一步：发送用户消息，执行到中断点
    print("【用户】北京天气怎么样\n")
    result = graph.invoke({"messages": ["北京天气怎么样"]}, config)

    # 检查是否中断（等待人工确认）
    snapshot = graph.get_state(config)

    if snapshot.next:  # 如果有下一个节点，说明被中断了
        print("工具调用需要人工确认")

        # 获取待执行的工具调用信息
        last_message = snapshot.values["messages"][-1]
        if hasattr(last_message, "tool_calls") and last_message.tool_calls:
            for idx, tool_call in enumerate(last_message.tool_calls, 1):
                print(f"\n[{idx}] 工具名称: {tool_call['name']}")
                print(f"    调用参数: {json.dumps(tool_call['args'], ensure_ascii=False, indent=4)}")

            approval = input("是否批准执行？(yes/no): ").strip().lower()

            if approval in ['yes', 'y']:
                print("工具调用已批准，继续执行...\n")
                # 继续执行（resume）
                result = graph.invoke(None, config)

            elif approval in ['no', 'n']:
                print("工具调用已被拒绝\n")
                # 手动添加拒绝消息，然后继续
                tool_messages = []
                for tool_call in last_message.tool_calls:
                    tool_messages.append(
                        ToolMessage(
                            content="工具调用被用户拒绝，请询问用户是否需要调整方案或提供更多信息。",
                            tool_call_id=tool_call["id"]
                        )
                    )
                # 更新状态并跳过工具节点
                graph.update_state(config, {"messages": tool_messages})
                result = graph.invoke(None, config)

    # 输出最终结果
    print("最终回复:")
    final_message = result["messages"][-1]
    print(final_message.content if hasattr(final_message, 'content') else str(final_message))


# 测试运行
if __name__ == "__main__":
    run_with_approval()

【用户】北京天气怎么样

工具调用需要人工确认

[1] 工具名称: get_weather
    调用参数: {
    "city": "Beijing"
}
工具调用已被拒绝

最终回复:
工具调用被用户拒绝，请询问用户是否需要调整方案或提供更多信息。


In [None]:
from typing import TypedDict, Annotated
from langchain_core.messages import SystemMessage, AIMessage
from langchain_ollama import ChatOllama
from langgraph.checkpoint.memory import MemorySaver
from langgraph.constants import START, END
from langgraph.graph import add_messages, StateGraph
from langgraph.prebuilt import ToolNode
from tools import get_weather


# 定义 Agent 的状态结构，包含消息列表和审核状态
class AgentState(TypedDict):
    """
    描述 Agent 当前状态的数据结构。

    属性:
        messages (Annotated[list, add_messages]): 包含历史交互信息的消息列表，
                                                  使用 `add_messages` 合并新旧消息。
        user_approved (bool): 标记用户是否同意执行工具调用
    """
    messages: Annotated[list, add_messages]
    user_approved: bool


# 初始化本地大语言模型，配置基础URL、模型名称和推理模式
llm = ChatOllama(
    # model="deepseek-r1:32b",
    model="llama3.1:8b",
    base_url="http://localhost:12356"
)
tools = [get_weather]
model = llm.bind_tools(tools)


def call_model(state: AgentState):
    """
    调用绑定工具的大语言模型以生成响应。

    参数:
        state (AgentState): 包含当前会话中所有消息的状态对象。

    返回:
        dict: 新增模型响应后的更新状态（仅追加最新一条回复）。
    """
    system_prompt = SystemMessage("你是一个AI助手，可以依据用户提问产生回答，你还具备调用天气函数的能力")
    response = model.invoke([system_prompt] + state["messages"])
    return {"messages": [response]}


# --- 人在闭环 (HITL) 节点 ---
def human_review(state: AgentState):
    """
    在执行工具调用之前请求人工审核确认。

    如果最后一条消息包含待执行的工具调用，则提示用户进行确认。
    若用户拒绝，则终止流程；否则允许进入工具执行阶段。

    参数:
        state (AgentState): 包含当前会话状态的对象。

    返回:
        dict: 根据用户选择决定下一步操作：
              - 用户拒绝时返回系统提示消息并标记为未批准；
              - 允许继续则标记为已批准。
    """
    last_message = state["messages"][-1]

    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        call = last_message.tool_calls[0]
        tool_name = call["name"]
        tool_args = call["args"]

        print(f"[HITL] 模型计划调用工具 `{tool_name}`，参数：{tool_args}")
        confirm = input("[HITL] 是否确认执行？(y/n): ")

        if confirm.lower() != "y":
            # 用户拒绝，返回提示消息并标记为未批准
            return {
                "messages": [AIMessage(content="用户拒绝了工具调用，无法获取相关信息。")],
                "user_approved": False
            }

        # 用户同意，标记为已批准
        return {"user_approved": True}

    # 没有工具调用，直接标记为已批准
    return {"user_approved": True}


def should_review(state: AgentState):
    """
    判断是否需要进行人工审核。

    参数:
        state (AgentState): 当前状态

    返回:
        str: 下一个节点名称
    """
    last_message = state["messages"][-1]
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "human_review"
    return END


def should_execute_tools(state: AgentState):
    """
    判断是否应该执行工具。

    参数:
        state (AgentState): 当前状态

    返回:
        str: 下一个节点名称
    """
    if state.get("user_approved", False):
        return "tools"
    return END


# 创建工具节点，用于执行工具调用
tool_node = ToolNode(tools)

# 构建状态图结构
graph_builder = StateGraph(AgentState)

# 每个节点都与对应的处理函数进行绑定，构成工作流的基本单元
graph_builder.add_node("agent", call_model)
graph_builder.add_node("human_review", human_review)
graph_builder.add_node("tools", tool_node)

# 添加边：从 START 到 agent
graph_builder.add_edge(START, "agent")

# 添加条件边：根据是否有工具调用来判断是否需要人工审核
graph_builder.add_conditional_edges(
    "agent",
    should_review,
    {"human_review": "human_review", END: END}
)

# 添加条件边：根据用户是否同意来决定是否执行工具或结束
graph_builder.add_conditional_edges(
    "human_review",
    should_execute_tools,
    {"tools": "tools", END: END}
)

# 工具执行完成后重新回到 agent 继续对话循环
graph_builder.add_edge("tools", "agent")

# 创建内存保存器
memory = MemorySaver()

# 编译图结构，并绘制可视化图表
graph = graph_builder.compile(checkpointer=memory)
graph.get_graph().draw_png('./graph5.png')

# 配置对话线程ID
config = {"configurable": {"thread_id": "chat-1"}}

# 运行第一轮：问北京天气
print("\n" + "=" * 50)
print("第一轮对话：询问北京天气")
print("=" * 50)
response1 = graph.invoke({"messages": ["北京天气怎么样"]}, config=config)

print("\n=== 第一次结果 ===")
print(response1["messages"][-1].content)

# 打印已保存的检查点
print("\n" + "=" * 50)
print("检查点历史")
print("=" * 50)
states = list(graph.get_state_history(config))

for i, state in enumerate(states):
    print(f"\n=== 检查点 {i} (next: {state.next}) ===")
    print(f"Checkpoint ID: {state.config['configurable']['checkpoint_id']}")
    if state.values.get("messages"):
        print(f"Messages count: {len(state.values['messages'])}")

# 从第二个检查点恢复并注入新问题
print("\n" + "=" * 50)
print("第二轮对话：从检查点恢复并询问上海天气")
print("=" * 50)
new_config = graph.update_state(
    states[1].config,
    values={"messages": [{"role": "user", "content": "上海天气怎么样"}]}
)

response2 = graph.invoke(None, config=new_config)

print("\n=== 第二次结果 ===")
print(response2["messages"][-1].content)