# langgraph入门

使用图数据结构来定义流程走向

# 根据用户输入来处理
user_inpit = input("User")


graph.stream这个是用于流式处理对话的。拿到的event，实际是有响应的node

# 工具使用

如果需要自定义一个工具使用的逻辑的话，可以自定一个基工具类，然后实现__call__函数即可
这样就可以把一些列的工作，当作一个节点使用

In [None]:
class BasicToolNode:
    def __init__(self, tools: list):
        self.tools_by_name = {tool.name: tool for tool in tools}

    def __call__(self, input: dict):
        if messages := input.get("messages", []):
            message = messages[-1]
        else:
            raise ValueError("No messages provided")

        outputs = []
        # 大模型只要确认调用工具，就会把需要调用的工具信息给返回
        for tool_call in message.tool_calls:
            tool_result = self.tools_by_name[tool_call["name"]].invoke(
                tool_call["args"]
            )
            outputs.append(
                ToolMessage(
                    content=json.dumps(tool_result),
                    name=tool_call["name"],
                    tool_call_id=tool_call["id"]
                )
            )

        return {"messages": outputs}


# 记忆

在langgraph中，支持内存、sqlite、pg进行内存存储
内存的话，断电就失效，sqlite和pg支持持久化

同时，可以根据不同的线程id进行会话记忆隔离

In [None]:
# 使用sqlite进行持久化
conn = sqlite3.connect("checkpoints.sqlite", check_same_thread=False)
memory = SqliteSaver(conn)

def build_graph():
    graph_builder = StateGraph(State)
    
    graph_builder.add_node("chatbot", chatbot)
    
    graph_builder.add_edge(START, "chatbot")
    graph_builder.add_edge("chatbot", END)
    
    # 编译图时，带上
    return graph_builder.compile(checkpointer=memory)


    
def main():
    while True:
        user_input = input("User: ")
        if user_input.lower() in ["exit", "quit", "q"]:
            print("goodbye!")
            break
        
        graph = build_graph()
        # 设置会话的线程id，进行隔离
        config = {"configurable": {"thread_id": "1"}}
        events = graph.stream(
            {"messages": [("user", user_input)]},
            config,
            stream_mode="values"
        )


# 人为介入

有几种情况
1、某个节点调用前   interrupt_before
2、在某个节点中     interrupt
3、某个节点调用后   interruppt_after


## 原理：

    langgraph会保存每个node的全局状态，当发现有interrupt时，整个状态进行保存，然后等待人为输入的完成以后，从中断节点开始重新执行

这里会存在一个问题，中断节点中这种时候，前面执行过的逻辑，都会再执行一边


支持人为的修改执行结果




In [None]:
# 编译状态图，指定在工具节点之前进行中断
graph = graph_builder.compile(
    checkpointer=memory,  # 使用 MemorySaver 作为检查点系统
    interrupt_before=["tools"],  # 在进入 "tools" 节点前进行中断
)

# 查询对话历史

可以通过graph获取


In [None]:
history = graph.get_state_history({"configurable": {"thread_id": "1"}})

# 遍历历史记录，打印每个状态中的所有消息
for state in history:
    print("=== 对话历史 ===")
    # 遍历每个状态中的消息记录
    for message in state.values["messages"]:
        if isinstance(message, BaseMessage):
            # 根据消息类型区分用户与机器人
            if "user" in message.content.lower():
                print(f"User: {message.content}")
            else:
                print(f"Assistant: {message.content}")


这里是支持根据不同线程id，进行会话区分的

## events只能获取当前对话的步骤

由于langgraph的设定，graph.stream只会获取当前会话的历史记录

## 如何对于历史会话去重？

- 原因：重复的状态快照和无消息的中间状态导致了多次重复的对话历史和空对话历史。
- 解决办法：通过检查消息类型和ID来过滤无效和重复的消息，只输出真正有对话内容的状态快照

In [None]:
# 获取指定线程 ID 的所有历史状态
history = graph.get_state_history({"configurable": {"thread_id": "3"}})

# 使用集合存储已处理过的消息 ID
seen_message_ids = set()

# 遍历历史记录，打印每个状态中的所有消息
for state in history:
    # 获取状态中的消息列表
    messages = state.values.get("messages", [])
    
    # 检查是否存在至少一条未处理的 BaseMessage 类型的消息
    valid_messages = [msg for msg in messages if isinstance(msg, BaseMessage) and msg.id not in seen_message_ids]
    
    if valid_messages:
        print("=== 对话历史 ===")
        for message in valid_messages:
            seen_message_ids.add(message.id)  # 记录消息 ID，避免重复处理
            if "user" in message.content.lower():
                print(f"User: {message.content}")
            else:
                print(f"Assistant: {message.content}")
    else:
        print("=== 空对话历史（无有效消息） ===")