<center><a href="https://www.nvidia.cn/training/"><img src="https://dli-lms.s3.amazonaws.com/assets/general/DLI_Header_White.png" width="400" height="186" /></a></center>

# <font color="#76b900">**7.5:** LangGraph</font>

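**Congratulations on (almost) completing the course!**

We hope you've picked up valuable skills for building advanced language model applications along the way.
- **In notebook 8,** you will be able to put these skills into practice by building an integrated system that spans multiple domains.
- **In this notebook,** we will briefly introduce LangGraph, a popular multi-agent orchestration framework that makes some useful design decisions and is a great starting point for anyone who wants to go deeper into this space!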

### **Setup**

Before we begin, let's import the necessary libraries and initialize our language model.

In [None]:
import requests
from langchain_nvidia_ai_endpoints import ChatNVIDIA
from langchain_openai import ChatOpenAI

## USE THIS ONE TO START OUT WITH. NOTE THAT IT IS INTENDED FOR USE AS A VISUAL LANGUAGE MODEL FIRST
# model_path="http://localhost:9000/v1"
## USE THIS ONE FOR GENERAL USE AS A SMALL-BUT-PURPOSE CHAT MODEL RUNNING LOCALLY VIA NIM
model_path="http://nim:8000/v1"
# ## USE THIS ONE FOR ACCESS TO CATALOG OF RUNNING NIM MODELS IN `build.nvidia.com`
# model_path="http://llm_client:9000/v1"

model_name = requests.get(f"{model_path}/models").json().get("data", [{}])[0].get("id")
%env NVIDIA_BASE_URL=$model_path
%env NVIDIA_DEFAULT_MODE=open

if "llm_client" in model_path:
    model_name = "meta/llama-3.1-70b-instruct"

llm = ChatNVIDIA(model=model_name, base_url=model_path, max_tokens=5000, temperature=0, streaming=True)

----

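Finally, let's load the notebook names along with the previously computed dictionary of notebook summaries. We will use these summaries by default throughout the notebook, but feel free to experiment.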

In [None]:
import json
import os

with open('notebook_chunks.json', 'r') as fp:
    nbsummary = json.load(fp)

filenames = nbsummary.get("filenames")
outlines = "\n\n".join([v.get("outline") for k,v in nbsummary.items() if isinstance(v, dict)])
# outlines

<hr>
<br>

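## **8.1: Intelligent Notebook Retrieval**

In the first part of the assessment, which is designed to showcase new abstractions for agentic workflows, we will create an agent that can retrieve information from notebooks and interact meaningfully with the user.

**Specifically, our agent will:**
- Interact with the user to understand their query.
- Access and retrieve information from a set of Jupyter notebooks.
- Provide concise and helpful responses based on the retrieved information.

To get a decent starting point, let's recreate the conversational agent from the previous notebook and keep our prompts here for easy customization: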

In [None]:
from chatbot.conv_tool_caller import ConversationalToolCaller
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain.pydantic_v1 import Field


tool_instruction: str = (
    "In addition to your directive, you have access to the tools listed in the toolbank."
    " Use tools only within the \n<function></function> tags."
    " Select tools to handle uncertain, imprecise, or complex computations that an LLM would find it hard to answer."
    " You can only call one tool at a time, and the tool cannot accept complex multi-step inputs."
    "\n\n<toolbank>{toolbank}</toolbank>\n"
    "Examples (WITH HYPOTHETICAL TOOLS):"
    "\nSure, let me call the tool in question.\n<function=\"foo\">[\"input\": \"hello world\"]</function>"
    "\nSure, first, I need to calculate the expression of 5 + 10\n<function=\"calculator\">[\"expression\": \"5 + 10\"]</function>"
    ## Keep every example in the same ["key": "value"] form so the model sees one consistent call format
    "\nSure! Let me look up the weather in Tokyo\n<function=\"weather\">[\"location\": \"Tokyo\"]</function>"
)

tool_prompt: str = (
    "You are an expert at selecting tools to answer questions. Consider the context of the problem,"
    " what has already been solved, and what the immediate next step to solve the problem should be."
    " Do not predict any arguments which are not present in the context; if there's any ambiguity, use no_tool."
    "\n\n<toolbank>{toolbank}</toolbank>\n"
    "\n\nSchema Instructions: The output should be formatted as a JSON instance that conforms to the JSON schema."
    "\n\nExamples (WITH HYPOTHETICAL TOOLS):"
    "\n<function=\"search\">[\"query\": \"current events in Japan\"]</function>"
    "\n<function=\"translation\">[\"text\": \"Hello, how are you?\", \"language\": \"French\"]</function>"
    "\n<function=\"calculator\">[\"expression\": \"5 + 10\"]</function>"
)

conv_llm = ConversationalToolCaller(
    tool_instruction=tool_instruction, 
    tool_prompt=tool_prompt, 
    llm=llm
).get_tooled_chain()

As you make changes, feel free to adjust these prompts, since many kinds of tweaks can improve the agent's performance.
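
To make the tag format used in these prompts concrete, here is a minimal sketch of how a call such as `<function="calculator">["expression": "5 + 10"]</function>` could be pulled out of a model reply. This is an assumption for illustration only: `parse_tool_call` and `FUNCTION_TAG` are hypothetical names, and the parsing that `ConversationalToolCaller` actually performs is not shown in this notebook and may differ.

```python
import json
import re

# Assumed tag format, copied from the prompt examples:
#   <function="name">["key": "value"]</function>
FUNCTION_TAG = re.compile(
    r'<function="(?P<name>[^"]+)">(?P<args>.*?)</function>', re.DOTALL
)


def parse_tool_call(text: str):
    """Return (tool_name, arguments) for the first tool call in `text`, or None."""
    match = FUNCTION_TAG.search(text)
    if match is None:
        return None
    raw_args = match.group("args").strip()
    # The examples wrap arguments in [...] rather than {...}, so swap the outer brackets
    if raw_args.startswith("[") and raw_args.endswith("]"):
        raw_args = "{" + raw_args[1:-1] + "}"
    try:
        arguments = json.loads(raw_args)
    except json.JSONDecodeError:
        return None  # Malformed arguments: treat as "no usable tool call"
    return match.group("name"), arguments


reply = 'Sure, let me add those.\n<function="calculator">["expression": "5 + 10"]</function>'
print(parse_tool_call(reply))
print(parse_tool_call("No tools needed here."))
```

A parser like this is strict about the example format, which is one reason to keep the examples inside the prompt consistent with each other.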

<br>

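### **7.5.1: Introducing LangGraph**

The **[LangGraph framework](https://github.com/langchain-ai/langgraph)** is a recent addition to our toolkit that lets us manage conversational flow with a state graph. With LangGraph, we can define an agent's states, transitions, and actions in a structured way, which removes the need for a fully custom event loop. The framework improves scalability and maintainability, especially for multi-agent systems or complex workflows.

#### How LangGraph enhances our workflow:
- **State management:** LangGraph clearly separates the different states in a conversation, making it easier to track and manage the agent's progress and decisions.
- **Conditional transitions:** With LangGraph, we can define conditional edges that decide where the conversation flows based on specific triggers or conditions.
- **Modularity:** The framework promotes modularity by letting different nodes (functions) handle specific tasks, which makes updates and extensions easier.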
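
To make these three ideas concrete, here is a minimal pure-Python sketch of a state graph. It is a toy and not LangGraph's API: `run_graph`, `draft`, `review`, `route`, and the `END` sentinel are hypothetical names used only for illustration. Nodes are plain functions that return state updates, fixed edges name the next node, and a conditional edge is a function that picks the next node from the current state.

```python
END = "__end__"  # Hypothetical sentinel marking the end of the toy graph


def draft(state: dict) -> dict:
    """Node: append one more word to the draft."""
    return {"words": state["words"] + ["word"]}


def review(state: dict) -> dict:
    """Node: record that one more review has happened."""
    return {"reviews": state["reviews"] + 1}


def route(state: dict) -> str:
    """Conditional edge: keep drafting until there are three words."""
    return "draft" if len(state["words"]) < 3 else END


def run_graph(nodes, edges, conditional_edges, state, start):
    """Walk the graph from `start`, merging each node's update into the state."""
    current = start
    while current != END:
        state = {**state, **nodes[current](state)}
        if current in conditional_edges:
            current = conditional_edges[current](state)
        else:
            current = edges[current]
    return state


final_state = run_graph(
    nodes={"draft": draft, "review": review},
    edges={"draft": "review"},
    conditional_edges={"review": route},
    state={"words": [], "reviews": 0},
    start="draft",
)
print(final_state)
```

Swapping a node or rerouting an edge only touches the dictionaries passed to `run_graph`, which is the kind of modularity the list above describes.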

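#### Why is LangGraph better than a custom solution?
- **Designed for multi-agent systems:** Unlike a while loop that we could adapt into a workable multi-state system, LangGraph models the agent's traversal with a state graph. As a result, it incorporates design patterns that extend naturally to non-sequential and even dynamic routines.
- **Simplified integration:** As a relatively popular framework, LangGraph has accumulated many free and paid integrations that greatly improve the development and deployment experience. The development team has released integrations such as LangServe, LangSmith, and LangGraph-Studio, and the wider community has contributed a variety of example applications that showcase domain-specific uses and modular plug-and-play components. If you are looking for a relatively novel agent paradigm, there is a good chance someone has implemented it in LangGraph.

#### When is LangGraph worse than a custom solution?
- **Potentially over-engineered:** To account for a wide range of multi-agent feature sets and edge cases, LangGraph enforces some strong assumptions, which steepens its learning curve considerably. If you can implement your solution in basic LangChain, the runnable paradigm simplifies your workflow enough that you can avoid the layers of complexity LangGraph introduces. If, on the other hand, you know you want to scale your application and can benefit from its well-considered features and examples, it may be worth diving in and getting familiar with it.
- **Restrictive abstractions:** As good as LangGraph is, there is still room outside of it for deeper optimization and stronger modularity. Those who want to build highly specialized microservices may be interested in custom multi-threading/multi-processing schemes, advanced graph algorithms, and advanced resource management strategies, which LangGraph may not provide. If this interests you, see [**Knowledge-Graph-RAG**](https://github.com/NVIDIA/GenerativeAIExamples/tree/main/community/knowledge_graph_rag).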

----

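In what follows, we will use LangGraph to manage the flow between the agent and its toolset in order to reproduce the earlier manual loop. Our application may not be complex enough to strictly need it, but practicing with LangGraph will help you get up to speed with the larger multi-agent ecosystem.

Next, let's define a typical graph that connects a human input node to an agent response node: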

In [None]:
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, StateGraph, START
from langgraph.prebuilt import tools_condition
from langgraph.graph.message import AnyMessage, add_messages
from langchain_core.runnables import RunnableConfig
from typing import Annotated
from typing_extensions import TypedDict
import operator
import uuid
import datetime
from IPython.display import Image, display

##################################################################

class State(TypedDict):
    ## Dictates what kind of buffer the agent nodes can write to to pass information
    ## This one says "nodes can write to messages buffer, writing is equivalent to adding a message"
    ## NOTE: To override a message, you can add a message with the target message's ID. 
    ## NOTE: To delete a message, you can add a RemoveMessage with the target's message ID. 
    messages: Annotated[list[AnyMessage], add_messages]
    directives: Annotated[list[AnyMessage], add_messages]

def create_graph(nodes, edges, conditional_edges=(), state=State, thread_id="42", plot=True):
    graph = StateGraph(state)
    ## Register every node, edge, and conditional edge with the graph builder
    for node in nodes:
        graph.add_node(*node)
    for edge in edges:
        graph.add_edge(*edge)
    for cedge in conditional_edges:
        graph.add_conditional_edges(*cedge)
    
    ## The checkpointer lets the graph persist its state
    ## Thread used to select buffer / memory compartment / etc to operate on 
    config = {"configurable": {"thread_id": thread_id}}
    memory = MemorySaver()
    app = graph.compile(checkpointer=memory)

    if plot: display(Image(app.get_graph(xray=True).draw_mermaid_png()))
    return app, memory, config

#################################################################

chat_prompt = ChatPromptTemplate.from_messages([
    ("system", (
        "You are a helpful DLI (Deep Learning Institute) Chatbot who can request and reason about notebooks."
        " Be as concise as necessary, but follow directions as best as you can."
        " Please help the user out by answering any of their questions and following their instructions."
    )),
    ## {outlines} is filled in by .partial() below, so any literal braces inside the outlines
    ## (e.g. from code snippets) are not parsed as template variables
    ("human", "Here is the info I want you to work with for all future correspondences: {outlines}"),
    ("ai", "Awesome! I will proceed from scratch with this understanding."),
    ("placeholder", "{messages}")
]).partial(outlines=outlines)


def human_fn(state):
    ## Simple function to get user input and publish it to LangGraph message buffer
    return {"messages": ("human", input("[Human]"))} ## Adds to the message buffer associated with current thread


async def assistant_fn(state, config: RunnableConfig, **kwargs):
    ## Agent response function, which prompts the LLM with the message buffer and writes the results
    chain = chat_prompt | llm | StrOutputParser()
    response = await chain.ainvoke(state, config)    ## Config has callbacks which intercept stream generation
    return {"messages": [("ai", response)]}          ## Adds to the message buffer associated with current thread


app, memory, config = create_graph(
    ## These are the functions we want to include in our graph.
    nodes = [
        ("assistant", assistant_fn), 
        ("human", human_fn)
    ],
    ## These are the edges that connect our nodes to define agentic flow.
    edges = [
        (START, "human"),
        ("human", "assistant"),
        ("assistant", END),
    ]
)
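
The `Annotated[list[AnyMessage], add_messages]` annotations in `State` attach a reducer that decides how a node's return value is merged into the buffer. The following is a simplified pure-Python sketch of the behavior described in the comments of `State` (append new messages, overwrite on a matching ID, delete on a removal marker). `merge_messages`, `Msg`, and `Remove` are hypothetical stand-ins rather than LangGraph classes, and the real `add_messages` handles more cases than this.

```python
from dataclasses import dataclass


@dataclass
class Msg:
    id: str
    content: str


@dataclass
class Remove:
    id: str  # ID of the message to delete


def merge_messages(existing, updates):
    """Merge `updates` into `existing`: append, overwrite by ID, or remove by ID."""
    merged = list(existing)
    for update in updates:
        index = next((i for i, m in enumerate(merged) if m.id == update.id), None)
        if isinstance(update, Remove):
            if index is not None:
                merged.pop(index)
        elif index is not None:
            merged[index] = update   # Same ID: overwrite in place
        else:
            merged.append(update)    # New ID: append to the end
    return merged


buffer = [Msg("1", "hi"), Msg("2", "hello!")]
buffer = merge_messages(buffer, [Msg("3", "how are you?")])  # Appended
buffer = merge_messages(buffer, [Msg("2", "hello there!")])  # Overwrites message "2"
buffer = merge_messages(buffer, [Remove("1")])               # Deletes message "1"
print([(m.id, m.content) for m in buffer])
```

Because nodes only return updates and the reducer performs the merge, a node never needs to read or rewrite the whole buffer itself.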

To stream from this compiled graph, you can *roughly* use it as a runnable, with some additional configuration and options:

In [None]:
## Invoking from compiled LG app
output = await app.ainvoke({"messages": []}, config=config)
print(output.get("messages")[-1].content)

## Streaming from compiled LG app
async for msg, meta in app.astream({"messages": []}, stream_mode="messages", config=config):
    print(msg.content, end="")

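**As you explore, note the following features:**
- The compiled graph retains the conversation history! This is because we have a **checkpointer** (specified when the graph is compiled) that tracks the conversation for thread "42" (as set by our config) in the background. Note that checkpointers are also very useful for integrations involving backtracking, archiving, human-in-the-loop, and so on.
- You'll notice that the metadata contains some nice buffer information, which can be helpful for output processing, filtering, archiving, and so on.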
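
As a rough mental model of what the checkpointer does with the thread ID, here is a small pure-Python sketch. `ToyCheckpointer` and `chat_turn` are hypothetical and far simpler than `MemorySaver`; the only point is that history is stored and looked up per `thread_id`, so separate threads never see each other's messages.

```python
from collections import defaultdict


class ToyCheckpointer:
    """Hypothetical in-memory store that keeps one message history per thread ID."""

    def __init__(self):
        self._threads = defaultdict(list)

    def load(self, config):
        thread_id = config["configurable"]["thread_id"]
        return list(self._threads[thread_id])

    def save(self, config, messages):
        thread_id = config["configurable"]["thread_id"]
        self._threads[thread_id] = list(messages)


def chat_turn(checkpointer, config, user_text):
    """Load the thread's history, add one exchange, and persist the result."""
    history = checkpointer.load(config)
    history.append(("human", user_text))
    history.append(("ai", f"echo: {user_text}"))  # Stand-in for a model reply
    checkpointer.save(config, history)
    return history


toy_memory = ToyCheckpointer()
thread_42 = {"configurable": {"thread_id": "42"}}
thread_7 = {"configurable": {"thread_id": "7"}}

chat_turn(toy_memory, thread_42, "Hello")
chat_turn(toy_memory, thread_42, "What did I just say?")
chat_turn(toy_memory, thread_7, "A separate conversation")

print(len(toy_memory.load(thread_42)))  # Both turns on thread "42" are retained
print(len(toy_memory.load(thread_7)))   # Thread "7" holds only its own turn
```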

<hr>
<br>

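### **7.5.2: Recreating Our ReAct Loop**

In the previous notebook, we implemented a basic ReAct loop using a custom buffering protocol over a state dictionary, a while loop, and a break condition that triggered when the response did not call a tool. In LangGraph, this is a common paradigm that is usually depicted with the following graph:

> <div><img src="imgs/lg_react.png" width="600"/></div>
>
> **Source: [ReAct agent with structured output | LangGraph How-to Guides](https://langchain-ai.github.io/langgraph/how-tos/react-agent-structured-output/)**

As you browse these resources, you'll find a variety of implementations that combine node logic, edge logic, and post-processing logic to build a coherent streaming system.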
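
As a reminder of the control flow being recreated, here is a minimal pure-Python sketch of the manual loop: call the model, stop if it requests no tool, otherwise run the tool and feed the result back. The model and tool here (`fake_llm`, `calculator`) are hypothetical stubs, not the course's `conv_llm` or toolset, and the loop is bounded by `max_steps` as a safety measure in place of a bare `while True`.

```python
def fake_llm(messages):
    """Stand-in for a model: request the calculator once, then answer."""
    if not any(role == "tool" for role, _ in messages):
        return {"content": "I need to compute this.", "tool_call": ("calculator", "5 + 10")}
    result = next(text for role, text in reversed(messages) if role == "tool")
    return {"content": f"The answer is {result}.", "tool_call": None}


def calculator(expression):
    """Toy tool that only understands 'a + b' with integer operands."""
    left, right = expression.split("+")
    return str(int(left) + int(right))


TOOLS = {"calculator": calculator}


def react_loop(question, llm=fake_llm, tools=TOOLS, max_steps=5):
    messages = [("human", question)]
    for _ in range(max_steps):
        response = llm(messages)
        messages.append(("ai", response["content"]))
        if response["tool_call"] is None:   # No tool requested: stop looping
            break
        name, argument = response["tool_call"]
        messages.append(("tool", tools[name](argument)))
    return messages


transcript = react_loop("What is 5 + 10?")
for role, text in transcript:
    print(f"[{role}] {text}")
```

In graph terms, the body of the loop corresponds to an agent node and a tools node, and the `break` check corresponds to a conditional edge leaving the agent node.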

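To complement these paradigms, we can reapply our earlier ideas here with a few key modifications:
- To stay consistent with our earlier while-loop approach, an all-in-one `react` example is provided.
- Because the while-loop approach loses flexibility by merging intermediate state nodes that could reasonably stand on their own, a more modular `agent + tools` option is also provided, and it is the one we recommend.
- To avoid re-specifying the streaming routine, a lightweight streaming function is defined.

After this cell, you won't need to implement these components again; just bring them in and parameterize them to fit your needs.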

In [None]:
from langchain.tools import tool
from typing import Literal
from langgraph.prebuilt import ToolNode
from langchain_core.messages import ToolMessage
from functools import partial

################################################################################################
# ## Combined agent + tools. Less flexible
# async def react_fn(state, config: RunnableConfig, llm = conv_llm, tool_node = None, **kwargs):
#     chain = chat_prompt | llm.bind(config=config)
#     out_msgs = []
#     while True:
#         new_state = {**state, "messages": state.get("messages") + out_msgs}
#         response = await chain.ainvoke(new_state)
#         out_msgs += [response]
#         if response.tool_calls:
#             out_msgs += [
#                 f"\n<RESULT>\n{result}\n</RESULT>" 
#                 for result in tool_node.invoke({"messages": [response]})["messages"]
#             ]
#         else: 
#             break
#     return {"messages": out_msgs}
################################################################################################

async def set_directive_fn(state, config: RunnableConfig):
    return {"directives": [state.get("messages")[-1]]}
    

async def agent_fn(
    state, config: RunnableConfig, 
    llm = conv_llm, chat_prompt = chat_prompt, **kwargs
):
    chain = chat_prompt | llm
    response = await chain.ainvoke(state, config=config)
    ## This invocation makes a new message, so this return is an appending of a new message
    return {"messages": [response]}

    
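async def tools_fn(
    state, config: RunnableConfig, 
    tool_node = None, **kwargs
):
    ## tool_node must expose .invoke (e.g. a ToolNode); if none is supplied, tool execution is skipped
    last_msg = state.get("messages")[-1]
    if last_msg.tool_calls and tool_node is not None: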
        results = tool_node.invoke({"messages": [last_msg]})["messages"]
        for result in results:
            last_msg.content += f"\n<RESULT>{result.content}</RESULT>"

    directive = state.get("directives")[-1].content
    new_msgs = [last_msg, (
        "human", f"Great! Now continue responding to the original user directive: {directive}."
            " You've executed at least one tool, so continue your thought process. DO NOT redo any past processes."
    )]
    return {"messages": new_msgs}

################################################################################################

def loop_or_end(state: State, config: RunnableConfig) -> Literal["loop", "end"]:
    ## Return the name of the branch to follow, based on whether the last message calls a tool
    return "loop" if state.get("messages")[-1].tool_calls else "end"

app, memory, config = create_graph(
    nodes = [
        ("enter", set_directive_fn), 
        ("agent", agent_fn), 
        ("tools", tools_fn), 
        # ("react", react_fn), 
    ],
    edges = [
        (START, "enter"),
        ("enter", "agent"),
        ("tools", "agent"),
        # (START, "react"), ("react", END),
    ],
    conditional_edges = [
        ("agent", loop_or_end, {"loop": "tools", "end": END})
    ]
)

################################################################################################

async def stream_response(
    new_message,
    app, config,
    print_stream=False,  ## If True, print the repr of each streamed message. Otherwise, print only content tokens.
    truncate=200,        ## Maximum length of each printed repr when print_stream=True (falsy = no limit)
    show_meta=True,      ## Whether to show message metadata, i.e. buffer, producing node, etc.
    silences_nodes=()    ## Nodes whose results you don't want to see
):
    buffers = {}
    new_messages = {"messages": [("human", new_message)]}
    async for msg, meta in app.astream(new_messages, stream_mode="messages", config=config):
        if meta.get("langgraph_node") in silences_nodes: continue
        if msg.id not in buffers:
            delim = "*" * 84
            print(f"\n\n{delim}\n** Found {msg.__class__.__name__} with id {msg.id}\n{delim}")
            if show_meta: print(f"{meta}\n{delim}")
        buffers[msg.id] = msg if not buffers.get(msg.id) else (buffers.get(msg.id) + msg)
        if print_stream: 
            print(repr(msg) if not truncate else str(repr(msg))[:truncate])
        elif not isinstance(msg, ToolMessage):
            print(msg.content, end="")

################################################################################################

await stream_response(
    input("[Human]"), 
    app, config, 
    print_stream=True
)
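
The buffering idea inside `stream_response` can be seen in isolation with a small pure-Python sketch. Here the stream is a hypothetical list of `(message_id, node, text)` tuples rather than real LangGraph message chunks and metadata, and `accumulate_chunks` is an illustrative helper: chunks that share a message ID are joined, and chunks from silenced nodes are skipped.

```python
def accumulate_chunks(chunks, silenced_nodes=()):
    """Group streamed (message_id, node, text) chunks into one string per message."""
    buffers = {}
    for message_id, node, text in chunks:
        if node in silenced_nodes:
            continue  # Skip output from nodes we don't want to see
        buffers[message_id] = buffers.get(message_id, "") + text
    return buffers


stream = [
    ("msg-1", "agent", "Let me "),
    ("msg-1", "agent", "check."),
    ("msg-2", "tools", "<RESULT>42</RESULT>"),
    ("msg-3", "agent", "The answer "),
    ("msg-3", "agent", "is 42."),
]
print(accumulate_chunks(stream))
print(accumulate_chunks(stream, silenced_nodes=("tools",)))
```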

<hr>
<br>

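### **7.5.3: Equipping Our Agent with Tools**

With all of our building blocks in place, we can now create an initial tool-equipped LLM agent without much code. As a starter example, we will provide one simple but powerful tool: `read_notebook`. This lets the agent enrich its context on demand by pulling in the full contents of a notebook.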

In [None]:
from functools import partial
from typing import Literal
from chatbot.jupyter_tools import FileLister

@tool
def read_notebook(
    filename: str, 
) -> str:
    """Displays a file to yourself and the end-user. These files are long, so only use it as a last resort."""
    return FileLister().to_string(files=[filename], workdir=".")

## Advanced Note: The schema can be strategically modified to tell the server how to grammar enforce
## In this case, specifying the finite options for the files. 
## To discover this, try type-hinting filename: Literal["file1", "file2"] and printing schema
read_notebook.args_schema.schema()["properties"]["filename"]["enum"] = filenames

################################################################################################

toolset = [read_notebook]
tooled_agent_fn = partial(agent_fn, llm = conv_llm.bind_tools(toolset), chat_prompt = chat_prompt)
tooled_tools_fn = partial(tools_fn, tool_node = ToolNode(toolset))

################################################################################################

app, memory, config = create_graph(
    nodes = [
        ("enter", set_directive_fn), 
        ("agent", tooled_agent_fn), 
        ("tools", tooled_tools_fn), 
    ],
    edges = [
        (START, "enter"),
        ("enter", "agent"),
        ("tools", "agent"),
    ],
    conditional_edges = [
        ("agent", loop_or_end, {"loop": "tools", "end": END})
    ],
    plot=False,
)

## Alternative question to try:
# question = "Give me an interesting code snippet from Notebook 5."
question = "Show me how the notebook explains diffusion. I believe it's part of the multimodal section."
await stream_response(question, app, config, print_stream=False, show_meta=False)

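While this is interesting, note that relying on this strategy alone will overload the context. Still, it is a solid step toward agents and a good practical reference for building interesting state-managed applications.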

<hr>
<br>

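### **7.5.4: Continuing with LangGraph?**

After this course, we hope you'll keep applying generative AI and agentic paradigms to build amazing and impactful systems! Although we didn't spend much time on LangGraph, we strongly encourage you to read through some of the [**tutorials**](https://langchain-ai.github.io/langgraph/tutorials/) and keep an eye out for interesting new agent paradigms!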

<br>
<hr>