<br>
<a href="https://www.nvidia.cn/training/">
    <div style="width: 55%; background-color: white; margin-top: 50px;">
    <img src="https://dli-lms.s3.amazonaws.com/assets/general/nvidia-logo.png"
         width="400"
         height="186"
         style="margin: 0px -25px -5px; width: 300px"/>
</a>
<h1 style="line-height: 1.4;"><font color="#76b900"><b>使用大语言模型（LLM）构建 AI 智能体程序</h1>
<h2><b>评估热身：</b> 创建一个基本的检索节点</h2>
<br>

我们现在已经了解了 ReAct 这个概念，能够构建一个展示此属性的玩具系统。不过，把它当作理想范式不一定总是正确的。它非常灵活，也的确有其用途。当通过一个强大的 LLM 进行组织时，这种循环可以持续相当长的时间，因为工具可以用来隐藏主循环中的细节。将这个系统与某些上下文重归类的步骤整合，理论上您可以无限制地进行下去。

从实现的角度来看，创建一个连贯的系统其实原则上相当简单。这是个不错的练习，但在本课程中不值得花费精力来构建，因为它没有展示任何新功能：

> **提示：** 这是来自第三部分的智能体循环，但 LLM 绑定到调用功能，停止条件是未调用任何工具，并且需要一些努力来确保工具响应确实有助于强有力的提示策略。

这个范式非常适合***横向智能体（horizontal agents）***和***主管风格节点（supervisor-style nodes）***，您可以不断地将更多的功能（“与环境交互的方式”）扔给 LLM，并期望它会有所收获。因此，这种模式在更强的 LLM 上表现更佳，因为很容易“让它好用”。

在这个 Notebook 中，我们将尝试实现一个***工具型智能体***系统，针对特定问题进行调整，旨在将其运行时的细节隐藏于任何可能监督它的主要事件循环（即用户，整体的 ReAct 循环，或其它一些主管等）之外。在此过程中，我们将重新发现 RAG 课程中的一些接口，同时将它们重新融入到 LangGraph 工作流中。

**这个练习旨在为评估做准备！**

In [None]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_nvidia import ChatNVIDIA

from transformers import PreTrainedTokenizerFast
llama_tokenizer = PreTrainedTokenizerFast(tokenizer_file="tokenizer.json", clean_up_tokenization_spaces=True)
def token_len(text):
    return len(llama_tokenizer.encode(text=text))

# !pip install --upgrade langgraph colorama
llm = ChatNVIDIA(base_url="http://nim-llm:8000/v1", model="meta/llama-3.1-8b-instruct")

<hr><br>

## **第一部分：** 引入一些基本样板

首先引入我们可靠的基本规范，用于构建一个简单的多轮系统。为了使这个及后续过程更容易，我们将切换使用完全基于命令的路由方案，并尝试在需要整合时复用组件。

In [None]:
import uuid
from typing import Annotated, Optional
from typing_extensions import TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
from langgraph.graph.message import add_messages
from functools import partial
from colorama import Fore, Style
from copy import deepcopy
import operator
from course_utils import stream_from_app

##################################################################

class State(TypedDict):
    messages: Annotated[list, add_messages]
    
##################################################################

def user(state: State):
    update = {"messages": [("user", interrupt("[User]:"))]}
    return Command(update=update, goto="agent")
    
def agent(state: State, config=None):
    update = {"messages": [llm.invoke(state.get("messages"), config=config)]}
    if "stop" in state.get("messages")[-1].content: 
        return update
    return Command(update=update, goto="start")
    
##################################################################

builder = StateGraph(State)
builder.add_node("start", lambda state: {})
builder.add_node("user", user)
builder.add_node("agent", agent)
builder.add_edge(START, "start")
builder.add_edge("start", "user")
app = builder.compile(checkpointer=MemorySaver())
config = {"configurable": {"thread_id": uuid.uuid4()}}
app_stream = partial(app.stream, config=config)

for token in stream_from_app(app_stream, verbose=False, debug=False):
    print(token, end="", flush=True)

<br>

在这个 Notebook 中，我们将把简单的 LangGraph 应用与之前的深度学习培训中心（DLI）讲师的提示词逻辑结合起来。您可能还记得那种实现看起来像下面这样：

In [None]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_nvidia import ChatNVIDIA
from langchain_openai import ChatOpenAI
from functools import partial

## Back-and-forth loop
core_prompt = ChatPromptTemplate.from_messages([
    ("system",
         "You are a helpful instructor assistant for NVIDIA Deep Learning Institute (DLI). "
         " Please help to answer user questions about the course. The first message is your context."
         " Restart from there, and strongly rely on it as your knowledge base. Do not refer to your 'context' as 'context'."
    ),
    ("user", "<context>\n{context}</context>"),
    ("ai", "Thank you. I will not restart the conversation and will abide by the context."),
    ("placeholder", "{messages}")
])

## Am LCEL chain to pass into chat_with_generator
chat_chain = core_prompt | llm | StrOutputParser()

with open("simple_long_context.txt", "r") as f:
    full_context = f.read()

long_context_state = {
    "messages": [],
    "context": full_context,
}

from course_utils import chat_with_chain

chat = partial(chat_with_chain, chain=chat_chain)
chat(long_context_state)

<br>

您可能还会记得，这个系统一次只能处理几个问题，因为上下文长度很快就会超出部署模型的限制。我们会在这个练习中尝试修复这个问题！

<hr><br>

## **第二部分：** 过滤细节

理解到内容聊天机器人上下文长度限制太小，您可能会觉得有必要再进一步精炼，甚至找到更小的上下文，但我们的当前条目已经相对较短了。

In [None]:
from langchain_core.documents import Document

context_entries = full_context.split("\n\n")
context_docs = [Document(page_content=entry) for entry in context_entries if len(entry.split("\n")) > 2]
context_lens = [token_len(d.page_content) for d in context_docs]
print(f"Context Token Length: {sum(context_lens)} ({sum(context_lens)/len(context_lens):.2f} * {len(context_lens)})")
print(f"Document Token Range: [{min(context_lens)}, {max(context_lens)}]")

或许可以调用一些启发式的方法来帮助我们知道在任何给定问题上集中注意哪些内容。幸运的是，有几种可行的启发式方法，形式为**嵌入模型**！这些在其它课程中已经详细讲解过，这里给个简要概述：

**与其从另一个序列进行*自回归*的输出作为响应/续篇，不如用编码器将序列嵌入到每个 token 的嵌入中，采用一个子集（零次条目 zero-th entry、子集、整个序列）作为输入的语义编码。** 让我们看看可以使用哪些模型选项。

- 一个**重排序模型**默认情况下按相关性对一组文档对进行排序。此类模型通常使用*交叉编码器*实现，能够同时将两个序列作为输入，直接预测一个相关性得分，并积极考虑两个序列。
- 一个**嵌入模型**默认情况下将文档嵌入到语义嵌入空间。此类模型通常使用*双编码器*实现，逐个处理一个序列以生成嵌入。然而，可以使用某些相似性度量（即余弦相似度）比较两个嵌入条目。

这两种模型理论上都可以用于检索，所以让我们尝试这两种选项！

In [None]:
from langchain_nvidia import NVIDIAEmbeddings
from langchain.vectorstores import FAISS

## First, we can try out the embedding model, which is commonly used by first constructing a vectorstore.
## - Pros: If you have m documents and n queries, you need n inference-time embeddings and m*n similarity comparisons. 
## - Cons: Prediction of d_i sim q_j uses learned embeddings Emb_D(d_i) and Emb_Q(q_i),
##         not a joint learned representation Emb(d_i, q_j). In other words, somewhat less accurate.

question = "Can you tell me about multi-turn agents?"

embed_d = NVIDIAEmbeddings(model="nvidia/nv-embedqa-e5-v5", base_url='http://nim-embedding:8000/v1', truncate='END', max_batch_size=128)
embed_q = NVIDIAEmbeddings(model="nvidia/nv-embedqa-e5-v5", base_url='http://nim-embedding:8000/v1', truncate='END', max_batch_size=128) ## Not necessary
vectorstore = FAISS.from_documents(context_docs, embed_d)
vectorstore.embedding_function = embed_q
retriever = vectorstore.as_retriever()
%time retriever.invoke(question, k=5)
# %time retriever.invoke(question, k=1)

In [None]:
from langchain_nvidia import NVIDIARerank

## Next, we can try out the reranking model, which is queried directly to get predicted relevance scores.
## - Pros: Literally predicts Emb(d_i, q_i), so better joint relationships can be learned. 
## - Cons: If you have m documents and n queries, you need n*m inference-time embeddings. 

question = "Can you tell me about multi-turn agents?"

reranker = NVIDIARerank(model="nvidia/nv-rerankqa-mistral-4b-v3", base_url='http://nim-ranking:8000/v1', top_n=5, max_batch_size=128)
%time reranker.compress_documents(context_docs, question)

In [None]:
# reranker._client.last_inputs
# reranker._client.last_response.json()
# embed_d._client.last_inputs
# embed_d._client.last_response.json()
# embed_q._client.last_inputs
# embed_q._client.last_response.json()

<br>

如我们所见，这个过程在识别相似性方面非常快速，并且在这个小数据池中产生了相当不错的排名！更一般来说：
- **在处理小值池时，更偏好使用重排序模型，** 因为它利用联合条件。
- **在处理大文档池时，更偏好使用嵌入模型，** 因为我们可以将大部分嵌入负担转移到预处理阶段。

对于我们的有限用例，选择实际上没什么大的区别，您可以自由选择任何让您觉得最吸引的选项。话虽如此，接下来请定义一个 `retrieve` 函数来抽象这个决策。此外，为了简化我们后续的处理，尽量只返回最终的字符串内容，这样后面就能降低一些问题的麻烦。

In [None]:
def retrieve_via_query(query: str, k=5):
    reranker = NVIDIARerank(model="nvidia/nv-rerankqa-mistral-4b-v3", base_url='http://nim-ranking:8000/v1', top_n=k, max_batch_size=128)
    rets = reranker.compress_documents(context_docs, query)
    return [entry.page_content for entry in rets]

retrieve_via_query(question)

<br>

接下来，我们可以把它做成一个“模式函数”、“工具”或“节点”，之间的主要区别如下：
- **模式函数**可以绑定到一个 LLM，强制输出符合模式。
- **工具**也是一种模式函数，但它是隐式定义的（即输入的结构从签名中推断），更容易放入工具库。
- **节点**在图的状态缓冲区上进行操作并写入，因此它应接受 `state` + `config`，对状态变量进行操作，并输出一个状态缓冲区修改请求。

在这个练习中，我们实际上会将检索作为“检索”智能体的常开功能，因此我们可以跳过前两个，直接创建我们的节点函数。我们假设：
- 我们希望节点在用户提交消息后对之前的消息进行检索（即我们希望在用户发送任何内容后进行检索）。
- 我们希望将检索结果写入状态缓冲区中的一个值 `context`，以便下一个节点（生成 LLM）能够使用这个上下文。
    - 我们希望逐渐积累 `context`，包括所有相关的检索结果。这样，我们就可以将检索信息放入系统消息中，并持续影响所有后续的输出。这意味着我们想要将值存储在一个集合中...

In [None]:
def retrieval_node(state: State, config=None, out_key="context"):
    ## NOTE: Very Naive; Assumes user question is a good query
    ret = retrieve_via_query(state.get("messages")[-1].content, k=3)
    return {out_key: set(ret)}

## After we define the node, we can assess whether or not it would work.

## Given an initial empty state...
state = {
    "messages": [], 
    "context": set(),
}

## Given an update rule explaining how to handle state updates...
add_sets = (lambda x,y: x.union(y))

## Will the continued accumulation of messages, followed by a continued accumulation of retrievals, function properly?
state["messages"] = add_messages(state["messages"], [("user", "Can you tell me about agents?")])
state["context"] = add_sets(state["context"],  retrieval_node(state)["context"])
print(f"Retriever: {state['context']} ({len(state['context'])})")

state["messages"] = add_messages(state["messages"], [("user", "How about earth simulations?")])
state["context"] = add_sets(state["context"],  retrieval_node(state)["context"])
print(f"\nContext: {state['context']} ({len(state['context'])})")

state["messages"] = add_messages(state["messages"], [("user", "How about earthly agents?")])
state["context"] = add_sets(state["context"],  retrieval_node(state)["context"])
print(f"\nContext: {state['context']} ({len(state['context'])})")

<hr><br>

## **第三部分：** 将检索添加到我们的图中 

现在我们有了一个适用的基节点假设，现在将其整合到之前的对话循环中，看看效果如何。

In [None]:
import uuid
from typing import Annotated, Optional
from typing_extensions import TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
from langgraph.graph.message import add_messages
from functools import partial
from colorama import Fore, Style
from copy import deepcopy
import operator

##################################################################
## Define the authoritative state system (environment) for your use-case

class State(TypedDict):
    """The Graph State for your Agent System"""
    messages: Annotated[list, add_messages]
    context: Annotated[set, (lambda x,y: x.union(y))]

agent_prompt = ChatPromptTemplate.from_messages([
    ("system",
         "You are a helpful instructor assistant for NVIDIA Deep Learning Institute (DLI). "
         " Please help to answer user questions about the course. The first message is your context."
         " Restart from there, and strongly rely on it as your knowledge base. Do not refer to your 'context' as 'context'."
    ),
    ("user", "<context>\n{context}</context>"),
    ("ai", "Thank you. I will not restart the conversation and will abide by the context."),
    ("placeholder", "{messages}")
])
    
##################################################################

def user(state: State):
    update = {"messages": [("user", interrupt("[User]:"))]}
    return Command(update=update, goto="retrieval_router")

## TODO: Add the retrieval between user and agent
def retrieval_router(state: State):
    return Command(update=retrieval_node(state), goto="agent")
    
def agent(state: State, config=None):
    update = {"messages": [(agent_prompt | llm).invoke(state, config=config)]}
    if "stop" in state.get("messages")[-1].content: 
        return update
    return Command(update=update, goto="start")
    
##################################################################

builder = StateGraph(State)
builder.add_node("start", lambda state: {})
builder.add_node("user", user)
## TODO: Register the new router to the nodepool
builder.add_node("retrieval_router", retrieval_router)
builder.add_node("agent", agent)
builder.add_edge(START, "start")
builder.add_edge("start", "user")
app = builder.compile(checkpointer=MemorySaver())
config = {"configurable": {"thread_id": uuid.uuid4()}}
app_stream = partial(app.stream, config=config)

for token in stream_from_app(app_stream, verbose=False, debug=False):
    print(token, end="", flush=True)import uuid
from typing import Annotated, Optional
from typing_extensions import TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
from langgraph.graph.message import add_messages
from functools import partial
from colorama import Fore, Style
from copy import deepcopy
import operator

##################################################################
## Define the authoritative state system (environment) for your use-case

class State(TypedDict):
    """The Graph State for your Agent System"""
    messages: Annotated[list, add_messages]
    context: Annotated[set, (lambda x,y: x.union(y))]

agent_prompt = ChatPromptTemplate.from_messages([
    ("system",
         "You are a helpful instructor assistant for NVIDIA Deep Learning Institute (DLI). "
         " Please help to answer user questions about the course. The first message is your context."
         " Restart from there, and strongly rely on it as your knowledge base. Do not refer to your 'context' as 'context'."
    ),
    ("user", "<context>\n{context}</context>"),
    ("ai", "Thank you. I will not restart the conversation and will abide by the context."),
    ("placeholder", "{messages}")
])
    
##################################################################

def user(state: State):
    update = {"messages": [("user", interrupt("[User]:"))]}
    return Command(update=update, goto="retrieval_router")

## TODO: Add the retrieval between user and agent
def retrieval_router(state: State):
    return Command(update=retrieval_node(state), goto="agent")
    
def agent(state: State, config=None):
    update = {"messages": [(agent_prompt | llm).invoke(state, config=config)]}
    if "stop" in state.get("messages")[-1].content: 
        return update
    return Command(update=update, goto="start")
    
##################################################################

builder = StateGraph(State)
builder.add_node("start", lambda state: {})
builder.add_node("user", user)
## TODO: Register the new router to the nodepool
builder.add_node("retrieval_router", retrieval_router)
builder.add_node("agent", agent)
builder.add_edge(START, "start")
builder.add_edge("start", "user")
app = builder.compile(checkpointer=MemorySaver())
config = {"configurable": {"thread_id": uuid.uuid4()}}
app_stream = partial(app.stream, config=config)

for token in stream_from_app(app_stream, verbose=False, debug=False):
    print(token, end="", flush=True)

<hr>

如您所见，将其与这个图形系统的定义集成并不困难。我们现在有了一个“常开”的检索系统，它会简单地获取我们的最后一条消息，并为我们的查询检索最相关的资源……理论上是这样的。不过，如果您稍微玩一玩，就会发现原始输入可能并不是最优的，所以大多数设置都喜欢首先将输入重新表述成嵌入模型的标准输入形式……但这会引入延迟，增加首次输出的时间，从而降低我们系统的响应速度。



<br><hr>

## **第四部分：** 添加“深思熟虑（Think Deeper）”机制

这一部分会借鉴一些 ReAct 的灵感，为系统提供多个深度思考的层次。由于之前的检索过程相对轻量，足以满足大多数用例，所以我们就保留它。但会添加一个更严格的思考过程，强制执行**查询细化**和**网络搜索**作为执行的一部分。

这种机制通常被称为“反思”机制，因为它能评估 LLM 的输出并尝试纠正执行流程。它主要的思路是，验证一个输出是否合理比第一次生成输出更容易。

我们可以通过一个结构化的输出架构实现查询逻辑，下面就试一试：

In [None]:
from course_utils import SCHEMA_HINT
from langchain_core.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
from typing import List, Dict

class Queries(BaseModel):
    """Queries to help you research across semantic and web resources for more information. Specifically focus on the most recent question."""
    big_questions: List[str] = Field(description="Outstanding questions that need research, in natural language")
    semantic_queries: List[str] = Field(description="Questions (3 or more) to ask an expert to get more info to help, expressed in different ways.")
    web_search_queries: List[str] = Field(description="Questions (3 or more) that will be sent over to a web-based search engine to gather info.")

def query_node(state: State):
    if not state.get("messages"): return {"queries": []}
    chat_msgs = [
        ("system", SCHEMA_HINT.format(schema_hint = Queries.model_json_schema())),
        ("user", "Corrent Conversation:\n" + "\n\n".join([f"[{msg.type}] {msg.content}" for msg in state.get("messages")])),
    ]
    schema_llm = llm.with_structured_output(schema=Queries.model_json_schema(), strict=True)
    response = Queries(**schema_llm.invoke(chat_msgs))
    return {"queries": [response]}

add_queries = (lambda l,x: l+x) 

state = {
    "messages": [], 
    "queries": [],
}
state["messages"] = add_messages(state["messages"], [("user", "Can you tell me about agents?")])
state["queries"] = add_queries(state["queries"],  query_node(state)["queries"])
print("Queries:", state["queries"])

state["messages"] = add_messages(state["messages"], [("user", "How about earth simulations?")])
state["queries"] = add_queries(state["queries"],  query_node(state)["queries"])
print("\nQueries:", state["queries"])

<br>

现在就来实际满足这些请求，所以我们引入本 Notebook 的检索函数和之前 Notebook 的 DuckDuckGo 搜索工具，来真正满足这些请求。

In [None]:
## HINT: You can paste the retrieval node and search tools directly and just resolve them in fulfill_query

# @tool
# def search_internet(user_question: List[str], context: List[str], final_query: str):
#     """Search the internet for answers. Powered by search engine, in Google search format."""
#     from langchain_community.utilities import DuckDuckGoSearchAPIWrapper
#     return DuckDuckGoSearchAPIWrapper(backend="html").results(final_query, max_results=10, source="text")

# def retrieval_node(state: State, config=None, out_key="context"):
#     ## NOTE: Very Naive; Assumes user question is a good query
#     ret = retrieve_via_query(get_nth_message(state, n=-1), k=3)
#     return {out_key: set(ret)}

def fulfill_queries(queries: Queries, verbose=False):
    # big_questions: List[str]
    # semantic_queries: List[str]
    # web_search_queries: List[str]
    from langchain_community.utilities import DuckDuckGoSearchAPIWrapper
    web_queries = queries.web_search_queries + queries.big_questions
    sem_queries = queries.semantic_queries + queries.big_questions
    # if verbose: print(f"Querying for retrievals via {web_queries = } and {sem_queries = }")
    web_ret_fn = lambda q: [
        str(f"{v.get('snippet')} [Snippet found from '{v.get('title')}' ({v.get('link')})]") 
        for v in DuckDuckGoSearchAPIWrapper(backend="html").results(q, max_results=4, source="text")
    ]
    sem_ret_fn = retrieve_via_query
    web_retrievals = [web_ret_fn(web_query) for web_query in web_queries]
    sem_retrievals = [sem_ret_fn(sem_query) for sem_query in sem_queries]
    # if verbose: print(f"Generated retrievals: {web_retrievals = } and {sem_retrievals = }")
    return set(sum(web_retrievals + sem_retrievals, []))

retrievals = set()
new_rets = fulfill_queries(state["queries"][0], verbose=True)
retrievals = retrievals.union(new_rets)
print(f"Retrieved {len(new_rets)} chunks from the internet and the knowledge base")
new_rets

<br>

完美！我们现在有了一个无法使用的超长上下文，虽然确实考虑得更周到，但长度几乎让人难以处理。幸运的是，有一种非常简化的方法可以通过我们的检索系统来子集化，如果能再进一步抽象化一点就好了。

在接下来的单元中，请实现一个 `format_retrieval` 函数，以创建系统的实际上下文。

In [None]:
def filter_retrieval(
    queries: Queries, 
    new_retrievals: list[str], 
    existing_retrievals: set[str] = set(), 
    k=5
):
    # big_questions: List[str]
    # semantic_queries: List[str]
    # web_search_queries: List[str]
    reranker = NVIDIARerank(model="nvidia/nv-rerankqa-mistral-4b-v3", base_url='http://nim-ranking:8000/v1', top_n=(k + len(existing_retrievals)), max_batch_size=128)
    docs = [Document(page_content = ret) for ret in new_retrievals]
    rets = reranker.compress_documents(docs, "\n".join(queries.big_questions))
    return [entry.page_content for entry in rets if entry.page_content not in existing_retrievals][:k]

filtered_retrieval = filter_retrieval(state["queries"][0], new_rets)
filtered_retrieval

<br>

为了总结所有内容，请继续发出一个统一的节点调用，将这个过程纳入常规执行中，最好是在最终生成的新检索结果之前，不要写入状态缓冲区。

**我们将最后的组合留作练习，但对感兴趣的人员提供了解决方案。**毕竟，这也应该是评估的准备。

<details>
    <summary><b>提示：</b></summary>
    <code>retrieval_router</code> 目前手动注入了一个上下文为 ""，可能只需用最少的包装运行我们的检索函数就可以了？  
</details>

<details>
    <summary><b>参考答案：</b></summary>

```python
## TODO: Add the retrieval between user and agent
def retrieval_router(state: State):
    return Command(update=retrieval_node(state), goto="agent")

def retrieval_node(state: State, config=None, out_key="context"):
    ## NOTE: Very Naive; Assumes user question is a good query
    ret = retrieve_via_query(state.get("messages")[-1].content, k=3)
    return {out_key: set(ret)}
```

</details>

<hr><br>

### **第五部分：** 对这次练习的反思

就这样，我们有了一个类似 ReAct 风格的循环，虽然能力有限。尽管这不是一种“工具池”方式，但肯定是一个具备内置路由的“反思系统”。它也并不是真正的“深度研究者”，因为它并没有阅读文章的完整内容，也无法进一步扩展材料，但确实展示了非常基础的检索简化，从而能够实现更长的交流窗口。

**在下一部分，准备好尝试评估，您将基于本 Notebook 中呈现的技术实现推理和搜索功能！**