# 使用 Profile Schema 的 Chatbot

## 回顾

我们介绍了 [LangGraph Memory Store](https://langchain-ai.github.io/langgraph/reference/store/#langgraph.store.base.BaseStore),这是一种保存和检索长期记忆的方式。

我们构建了一个简单的 chatbot,它同时使用 `短期记忆(线程内)` 和 `长期记忆(跨线程)`。

它将长期 [语义记忆](https://langchain-ai.github.io/langgraph/concepts/memory/#semantic-memory)(关于用户的事实)保存在["热路径"](https://langchain-ai.github.io/langgraph/concepts/memory/#writing-memories)中,即在用户与其聊天时实时保存。

## 目标

我们的 chatbot 将记忆保存为字符串。在实践中,我们通常希望记忆具有结构化的形式。
 
例如,记忆可以是一个 [单一的、持续更新的 schema]((https://langchain-ai.github.io/langgraph/concepts/memory/#profile))。
 
在我们的案例中,我们希望这是一个单一的用户 profile。
 
我们将扩展我们的 chatbot,将语义记忆保存到单一的 [用户 profile](https://langchain-ai.github.io/langgraph/concepts/memory/#profile) 中。

我们还将介绍一个库,[Trustcall](https://github.com/hinthornw/trustcall),用于使用新信息更新这个 schema。

In [7]:
%%capture --no-stderr
%pip install -U langchain_openai langgraph trustcall langchain_core

In [None]:
import os, getpass

def _set_env(var: str):
    """
    辅助函数 - 设置环境变量
    
    Python 知识点:
    - os.environ: Python 的环境变量字典,可以读取和设置系统环境变量
    - getpass.getpass(): 安全地提示用户输入密码,输入时不会显示在屏幕上
    
    工作流程:
        1. 检查环境变量是否已设置
        2. 如果未设置,提示用户输入
        3. 将值设置到当前进程的环境变量中
    """
    # 检查环境变量是否已在 OS 环境中设置
    env_value = os.environ.get(var)
    if not env_value:
        # 如果未设置,提示用户输入
        env_value = getpass.getpass(f"{var}: ")
    
    # 为当前进程设置环境变量
    os.environ[var] = env_value

# ================== LangSmith 配置 ==================
# LangSmith: LangChain 的追踪和调试平台
# 用于监控和调试 LangChain 应用的运行情况

# 设置 LangSmith API 密钥
_set_env("LANGSMITH_API_KEY")

# 启用 LangSmith 追踪功能
os.environ["LANGSMITH_TRACING"] = "true"

# 设置 LangSmith 项目名称,所有追踪数据将保存到此项目
os.environ["LANGSMITH_PROJECT"] = "langchain-academy"

## 定义用户 profile schema

Python 有许多不同的 [结构化数据](https://python.langchain.com/docs/concepts/structured_outputs/#schema-definition) 类型,例如 TypedDict、Dictionaries、JSON 和 [Pydantic](https://docs.pydantic.dev/latest/)。

让我们从使用 TypedDict 定义用户 profile schema 开始。

In [None]:
from typing import TypedDict, List

# ================== 定义用户 Profile Schema ==================

class UserProfile(TypedDict):
    """
    用户 profile schema - 使用 TypedDict 定义
    
    Python 知识点:
    - TypedDict: Python 3.8+ 引入的类型提示工具
    - 它定义了字典的结构,指定每个键的类型
    - 与普通 dict 不同,TypedDict 提供类型检查支持
    - 运行时仍然是普通字典,但 IDE 和类型检查器可以验证类型
    
    为什么使用 TypedDict:
    - 轻量级:比 Pydantic 更简单,适合简单的数据结构
    - 类型安全:提供类型提示,帮助捕获错误
    - 兼容性:可以直接作为普通字典使用
    
    Schema 字段说明:
    - user_name: 用户的首选名称
    - interests: 用户的兴趣爱好列表
    """
    user_name: str  # 用户的首选名称
    interests: List[str]  # 用户的兴趣爱好列表

## 将 schema 保存到 store

[LangGraph Store](https://langchain-ai.github.io/langgraph/reference/store/#langgraph.store.base.BaseStore) 接受任何 Python 字典作为 `value`。

In [None]:
# ================== 创建 TypedDict 实例 ==================

# Python 知识点:
# - TypedDict 实例实际上就是普通的 Python 字典
# - 但类型检查器会验证字典的结构是否符合 TypedDict 定义
# - 运行时没有额外的开销,只是提供了类型安全

# 创建符合 UserProfile schema 的字典实例
user_profile: UserProfile = {
    "user_name": "Lance",  # 字符串类型
    "interests": ["biking", "technology", "coffee"]  # 字符串列表
}

# 显示创建的 profile
user_profile

我们使用 [put](https://langchain-ai.github.io/langgraph/reference/store/#langgraph.store.base.BaseStore.put) 方法将 TypedDict 保存到 store。

In [None]:
import uuid
from langgraph.store.memory import InMemoryStore

# ================== 初始化 Store 并保存 Schema ==================

# LangGraph 知识点:
# - InMemoryStore: LangGraph 的内存存储实现
# - 用于保存和检索长期记忆(跨线程)
# - 数据保存在内存中,进程重启后会丢失
# - 生产环境可以替换为持久化的 Store 实现

# 初始化内存 store
in_memory_store = InMemoryStore()

# ================== 定义 Namespace ==================
# Namespace: Store 中的命名空间,用于组织和隔离数据
# 格式: tuple 类型,通常为 (user_id, memory_type)
# 作用: 
# - 为每个用户创建独立的存储空间
# - 避免不同用户的数据混淆

user_id = "1"
namespace_for_memory = (user_id, "memory")

# ================== 保存记忆到 Store ==================
# Store 的 put 方法参数:
# - namespace: 命名空间,用于组织数据
# - key: 记忆的唯一标识符
# - value: 要保存的数据(任何 Python 字典)

key = "user_profile"
value = user_profile

# 将 user_profile 保存到 store
# Python 知识点: TypedDict 实例本质上是普通字典,可以直接保存
in_memory_store.put(namespace_for_memory, key, value)

我们使用 [search](https://langchain-ai.github.io/langgraph/reference/store/#langgraph.store.base.BaseStore.search) 通过 namespace 从 store 检索对象。

In [6]:
# Search 
for m in in_memory_store.search(namespace_for_memory):
    print(m.dict())

{'value': {'user_name': 'Lance', 'interests': ['biking', 'technology', 'coffee']}, 'key': 'user_profile', 'namespace': ['1', 'memory'], 'created_at': '2024-11-04T23:37:34.871675+00:00', 'updated_at': '2024-11-04T23:37:34.871680+00:00'}


我们也可以使用 [get](https://langchain-ai.github.io/langgraph/reference/store/#langgraph.store.base.BaseStore.get) 通过 namespace 和 key 检索特定对象。

In [7]:
# Get the memory by namespace and key
profile = in_memory_store.get(namespace_for_memory, "user_profile")
profile.value

{'user_name': 'Lance', 'interests': ['biking', 'technology', 'coffee']}

## 使用 profile schema 的 Chatbot

现在我们知道如何为记忆指定 schema 并将其保存到 store。

现在,我们如何实际*创建*具有这个特定 schema 的记忆?

在我们的 chatbot 中,我们[希望从用户聊天中创建记忆](https://langchain-ai.github.io/langgraph/concepts/memory/#profile)。

这就是 [structured outputs](https://python.langchain.com/docs/concepts/structured_outputs/#recommended-usage) 概念发挥作用的地方。

LangChain 的 [chat model](https://python.langchain.com/docs/concepts/chat_models/) 接口有一个 [`with_structured_output`](https://python.langchain.com/docs/concepts/structured_outputs/#recommended-usage) 方法来强制执行结构化输出。

当我们想要强制输出符合 schema 时,这很有用,它会为我们解析输出。

In [7]:
_set_env("OPENAI_API_KEY")

让我们将我们创建的 `UserProfile` schema 传递给 `with_structured_output` 方法。

然后我们可以使用一个 [messages](https://python.langchain.com/docs/concepts/messages/) 列表调用 chat model,并获得符合我们 schema 的结构化输出。

In [None]:
from pydantic import BaseModel, Field

from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI

# ================== 使用 with_structured_output ==================

# LangChain 知识点:
# - with_structured_output: LangChain chat model 的方法
# - 作用: 强制 LLM 输出符合指定 schema 的结构化数据
# - 底层: 使用 tool calling 或 JSON mode 实现
# - 优势: 自动解析和验证输出,确保类型安全

# 初始化 model
model = ChatOpenAI(model="gpt-4o", temperature=0)

# 将 schema 绑定到 model
# Python 知识点:
# - TypedDict 可以直接传递给 with_structured_output
# - model_with_structure 是一个新的 Runnable,返回类型为 UserProfile
model_with_structure = model.with_structured_output(UserProfile)

# ================== 调用 Model 生成结构化输出 ==================
# 工作流程:
# 1. 将 HumanMessage 传递给 model
# 2. Model 理解消息内容并提取信息
# 3. Model 生成符合 UserProfile schema 的输出
# 4. with_structured_output 自动解析和验证输出

# 创建 HumanMessage
# LangChain 知识点: HumanMessage 表示用户发送的消息
message = HumanMessage("My name is Lance, I like to bike.")

# 调用 model 并获取结构化输出
# 返回值: 符合 UserProfile schema 的字典
structured_output = model_with_structure.invoke([message])

# 显示结果
structured_output

现在,让我们在我们的 chatbot 中使用它。

这只需要对 `write_memory` 函数进行少量修改。

我们使用如上定义的 `model_with_structure` 来生成符合我们 schema 的 profile。

In [None]:
from IPython.display import Image, display

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.store.base import BaseStore

from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from langchain_core.runnables.config import RunnableConfig

# ================== Chatbot 系统提示词 ==================

# Chatbot 指令 - 定义 chatbot 的行为和个性
MODEL_SYSTEM_MESSAGE = """You are a helpful assistant with memory that provides information about the user. 
If you have memory for this user, use it to personalize your responses.
Here is the memory (it may be empty): {memory}"""

# 记忆创建指令 - 指导 LLM 如何从对话中创建/更新记忆
CREATE_MEMORY_INSTRUCTION = """Create or update a user profile memory based on the user's chat history. 
This will be saved for long-term memory. If there is an existing memory, simply update it. 
Here is the existing memory (it may be empty): {memory}"""

# ================== call_model 节点 ==================

def call_model(state: MessagesState, config: RunnableConfig, store: BaseStore):
    """
    主对话节点 - 从 store 加载记忆并生成个性化响应
    
    LangGraph 知识点:
    - 这是 StateGraph 的一个节点函数
    - 节点函数签名: (state, config, store) -> dict
    - 返回的字典会更新 state
    
    工作流程:
        1. 从 config 获取 user_id
        2. 从 store 检索该用户的记忆
        3. 将记忆格式化并注入系统提示
        4. 使用记忆和对话历史生成响应
    
    参数说明:
    - state: MessagesState,包含对话历史 messages
    - config: RunnableConfig,包含配置信息(如 user_id, thread_id)
    - store: BaseStore,长期记忆存储
    """

    # ========== 1. 获取 user_id ==========
    # Python 知识点: config["configurable"] 是一个字典,存储用户自定义配置
    user_id = config["configurable"]["user_id"]

    # ========== 2. 从 store 检索记忆 ==========
    # LangGraph Store 知识点:
    # - namespace: (memory_type, user_id) 组织数据
    # - get(namespace, key): 检索特定记忆
    # - 返回 Item 对象或 None
    namespace = ("memory", user_id)
    existing_memory = store.get(namespace, "user_memory")

    # ========== 3. 格式化记忆 ==========
    # Python 知识点:
    # - existing_memory.value: 获取存储的字典数据
    # - dict.get(key, default): 安全地获取字典值,不存在时返回默认值
    if existing_memory and existing_memory.value:
        memory_dict = existing_memory.value
        # 将字典格式化为人类可读的字符串
        formatted_memory = (
            f"Name: {memory_dict.get('user_name', 'Unknown')}\n"
            f"Interests: {', '.join(memory_dict.get('interests', []))}"
        )
    else:
        # 如果没有记忆,设置为 None
        formatted_memory = None

    # ========== 4. 注入记忆到系统提示 ==========
    # Python 字符串格式化: .format() 方法替换 {memory} 占位符
    system_msg = MODEL_SYSTEM_MESSAGE.format(memory=formatted_memory)

    # ========== 5. 生成响应 ==========
    # LangChain 知识点:
    # - model.invoke(): 调用 LLM 生成响应
    # - 输入: [SystemMessage, ...历史消息]
    # - SystemMessage 在前,提供上下文和指令
    # - state["messages"] 包含完整的对话历史
    response = model.invoke([SystemMessage(content=system_msg)]+state["messages"])

    # ========== 6. 返回更新 ==========
    # LangGraph 知识点:
    # - 返回字典会与 state 合并
    # - {"messages": response} 将新消息添加到 state["messages"]
    return {"messages": response}

# ================== write_memory 节点 ==================

def write_memory(state: MessagesState, config: RunnableConfig, store: BaseStore):
    """
    记忆写入节点 - 反思对话并保存结构化记忆到 store
    
    关键改进:
    - 使用 with_structured_output 强制输出符合 UserProfile schema
    - 从头开始重新生成整个 profile(后续会改进为增量更新)
    
    工作流程:
        1. 从 config 获取 user_id
        2. 从 store 检索现有记忆
        3. 格式化现有记忆为提示
        4. 使用 model_with_structure 生成新的结构化 profile
        5. 覆盖保存到 store
    
    局限性:
    - 每次从头重新生成整个 profile,可能丢失信息
    - 对于大型 profile,浪费 tokens
    - 后续会使用 Trustcall 改进为增量更新
    """
    
    # ========== 1. 获取 user_id ==========
    user_id = config["configurable"]["user_id"]

    # ========== 2. 检索现有记忆 ==========
    namespace = ("memory", user_id)
    existing_memory = store.get(namespace, "user_memory")

    # ========== 3. 格式化现有记忆 ==========
    if existing_memory and existing_memory.value:
        memory_dict = existing_memory.value
        formatted_memory = (
            f"Name: {memory_dict.get('user_name', 'Unknown')}\n"
            f"Interests: {', '.join(memory_dict.get('interests', []))}"
        )
    else:
        formatted_memory = None
        
    # ========== 4. 创建提示 ==========
    system_msg = CREATE_MEMORY_INSTRUCTION.format(memory=formatted_memory)

    # ========== 5. 生成结构化 profile ==========
    # LangChain 知识点:
    # - model_with_structure: 之前用 with_structured_output 绑定的 model
    # - 输入: [SystemMessage, ...对话历史]
    # - 输出: 符合 UserProfile schema 的字典
    # - 自动解析和验证输出
    new_memory = model_with_structure.invoke([SystemMessage(content=system_msg)]+state['messages'])

    # ========== 6. 覆盖保存记忆 ==========
    # Store 知识点:
    # - put(namespace, key, value): 覆盖式保存
    # - 如果 key 已存在,会完全替换旧值
    # - value 必须是 Python 字典
    key = "user_memory"
    store.put(namespace, key, new_memory)

# ================== 构建 Graph ==================

# LangGraph 知识点:
# - StateGraph: 定义状态机的图结构
# - MessagesState: 内置状态类,包含 messages 列表
# - 图由节点(nodes)和边(edges)组成

# 定义图
builder = StateGraph(MessagesState)

# 添加节点
# - call_model: 主对话节点
# - write_memory: 记忆写入节点
builder.add_node("call_model", call_model)
builder.add_node("write_memory", write_memory)

# 添加边
# LangGraph 知识点:
# - add_edge(from, to): 添加有向边
# - START: 图的起始点
# - END: 图的终止点
# - 执行流程: START -> call_model -> write_memory -> END
builder.add_edge(START, "call_model")
builder.add_edge("call_model", "write_memory")
builder.add_edge("write_memory", END)

# ================== 初始化双记忆系统 ==================

# 长期记忆 (跨线程) - 使用 Store
# - 保存用户 profile
# - 不同线程间共享
# - 按 user_id 组织
across_thread_memory = InMemoryStore()

# 短期记忆 (线程内) - 使用 Checkpointer
# - 保存对话历史
# - 每个线程独立
# - 按 thread_id 组织
within_thread_memory = MemorySaver()

# ================== 编译 Graph ==================

# LangGraph 知识点:
# - compile(): 编译图为可执行的 Runnable
# - checkpointer: 启用短期记忆(对话历史)
# - store: 启用长期记忆(用户 profile)
# - 两个记忆系统协同工作,提供完整的记忆功能
graph = builder.compile(checkpointer=within_thread_memory, store=across_thread_memory)

# 可视化图结构
# xray=1: 展开节点内部细节
display(Image(graph.get_graph(xray=1).draw_mermaid_png()))

In [10]:
# We supply a thread ID for short-term (within-thread) memory
# We supply a user ID for long-term (across-thread) memory 
config = {"configurable": {"thread_id": "1", "user_id": "1"}}

# User input 
input_messages = [HumanMessage(content="Hi, my name is Lance and I like to bike around San Francisco and eat at bakeries.")]

# Run the graph
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()


Hi, my name is Lance and I like to bike around San Francisco and eat at bakeries.

Hi Lance! It's great to meet you. Biking around San Francisco sounds like a fantastic way to explore the city, and there are so many amazing bakeries to try. Do you have any favorite bakeries or biking routes in the city?


让我们检查 store 中的记忆。

我们可以看到记忆是一个符合我们 schema 的字典。

In [11]:
# Namespace for the memory to save
user_id = "1"
namespace = ("memory", user_id)
existing_memory = across_thread_memory.get(namespace, "user_memory")
existing_memory.value

{'user_name': 'Lance', 'interests': ['biking', 'bakeries', 'San Francisco']}

## 什么时候会失败?

[`with_structured_output`](https://python.langchain.com/docs/concepts/structured_outputs/#recommended-usage) 非常有用,但如果我们处理更复杂的 schema 会发生什么?

[这里](https://github.com/hinthornw/trustcall?tab=readme-ov-file#complex-schema)是一个更复杂的 schema 示例,我们将在下面测试。

这是一个 [Pydantic](https://docs.pydantic.dev/latest/) model,描述了用户对通信和 trust fall 的偏好。

In [18]:
from typing import List, Optional

class OutputFormat(BaseModel):
    preference: str
    sentence_preference_revealed: str

class TelegramPreferences(BaseModel):
    preferred_encoding: Optional[List[OutputFormat]] = None
    favorite_telegram_operators: Optional[List[OutputFormat]] = None
    preferred_telegram_paper: Optional[List[OutputFormat]] = None

class MorseCode(BaseModel):
    preferred_key_type: Optional[List[OutputFormat]] = None
    favorite_morse_abbreviations: Optional[List[OutputFormat]] = None

class Semaphore(BaseModel):
    preferred_flag_color: Optional[List[OutputFormat]] = None
    semaphore_skill_level: Optional[List[OutputFormat]] = None

class TrustFallPreferences(BaseModel):
    preferred_fall_height: Optional[List[OutputFormat]] = None
    trust_level: Optional[List[OutputFormat]] = None
    preferred_catching_technique: Optional[List[OutputFormat]] = None

class CommunicationPreferences(BaseModel):
    telegram: TelegramPreferences
    morse_code: MorseCode
    semaphore: Semaphore

class UserPreferences(BaseModel):
    communication_preferences: CommunicationPreferences
    trust_fall_preferences: TrustFallPreferences

class TelegramAndTrustFallPreferences(BaseModel):
    pertinent_user_preferences: UserPreferences

现在,让我们尝试使用 `with_structured_output` 方法提取这个 schema。

In [19]:
from pydantic import ValidationError

# Bind schema to model
model_with_structure = model.with_structured_output(TelegramAndTrustFallPreferences)

# Conversation
conversation = """Operator: How may I assist with your telegram, sir?
Customer: I need to send a message about our trust fall exercise.
Operator: Certainly. Morse code or standard encoding?
Customer: Morse, please. I love using a straight key.
Operator: Excellent. What's your message?
Customer: Tell him I'm ready for a higher fall, and I prefer the diamond formation for catching.
Operator: Done. Shall I use our "Daredevil" paper for this daring message?
Customer: Perfect! Send it by your fastest carrier pigeon.
Operator: It'll be there within the hour, sir."""

# Invoke the model
try:
    model_with_structure.invoke(f"""Extract the preferences from the following conversation:
    <convo>
    {conversation}
    </convo>""")
except ValidationError as e:
    print(e)

1 validation error for TelegramAndTrustFallPreferences
pertinent_user_preferences.communication_preferences.semaphore
  Input should be a valid dictionary or instance of Semaphore [type=model_type, input_value=None, input_type=NoneType]
    For further information visit https://errors.pydantic.dev/2.9/v/model_type


如果我们天真地提取更复杂的 schema,即使使用像 `gpt-4o` 这样的高容量模型,也容易失败。

## 使用 Trustcall 创建和更新 profile schemas

正如我们所看到的,处理 schemas 可能很棘手。

复杂的 schemas 可能难以提取。

此外,即使是简单的 schemas,更新也会带来挑战。

考虑我们上面的 chatbot。

我们每次选择保存新记忆时,都会*从头开始*重新生成 profile schema。

这是低效的,如果 schema 包含大量信息需要每次重新生成,可能会浪费 model tokens。

更糟糕的是,当从头开始重新生成 profile 时,我们可能会丢失信息。

解决这些问题正是 [TrustCall](https://github.com/hinthornw/trustcall) 的动机!

这是一个由 LangChain 团队的 [Will Fu-Hinthorn](https://github.com/hinthornw) 开发的用于更新 JSON schemas 的开源库。

它的动机正是在处理记忆时遇到的这些挑战。

让我们首先展示在这个 [messages](https://python.langchain.com/docs/concepts/messages/) 列表上使用 TrustCall 进行提取的简单用法。

In [20]:
# Conversation
conversation = [HumanMessage(content="Hi, I'm Lance."), 
                AIMessage(content="Nice to meet you, Lance."), 
                HumanMessage(content="I really like biking around San Francisco.")]

我们使用 `create_extractor`,传入 model 以及我们的 schema 作为 [tool](https://python.langchain.com/docs/concepts/tools/)。

使用 TrustCall,可以以各种方式提供 schema。

例如,我们可以传递 JSON 对象/Python 字典或 Pydantic model。

在底层,TrustCall 使用 [tool calling](https://python.langchain.com/docs/concepts/tool_calling/) 从输入的 [messages](https://python.langchain.com/docs/concepts/messages/) 列表中生成 [structured output](https://python.langchain.com/docs/concepts/structured_outputs/)。

要强制 Trustcall 生成 [structured output](https://python.langchain.com/docs/concepts/structured_outputs/),我们可以在 `tool_choice` 参数中包含 schema 名称。

我们可以使用上面的对话调用 extractor。

In [None]:
from trustcall import create_extractor

# ================== 定义 Schema ==================

# Python 知识点:
# - 这次使用 Pydantic BaseModel 而不是 TypedDict
# - Field: Pydantic 字段,可以添加描述、验证规则等
# - description: 帮助 LLM 理解字段含义,提高提取准确性

class UserProfile(BaseModel):
    """用户 profile schema - 使用 Pydantic 定义"""
    user_name: str = Field(description="The user's preferred name")
    interests: List[str] = Field(description="A list of the user's interests")

# 初始化 model
model = ChatOpenAI(model="gpt-4o", temperature=0)

# ================== 创建 Trustcall Extractor ==================

# Trustcall 知识点:
# - create_extractor: Trustcall 的核心函数
# - 作用: 创建一个可以提取和更新结构化数据的 extractor
# - 底层: 使用 tool calling 实现结构化输出

# 参数说明:
# - model: 要使用的 LLM
# - tools: Schema 列表,可以是 Pydantic/TypedDict/JSON
# - tool_choice: 强制使用特定 tool,确保生成结构化输出

trustcall_extractor = create_extractor(
    model,
    tools=[UserProfile],  # 传入 Pydantic model
    tool_choice="UserProfile"  # 强制使用 UserProfile tool
)

# ================== 准备系统指令 ==================

# 系统消息 - 指导 LLM 从对话中提取 profile
system_msg = "Extract the user profile from the following conversation"

# ================== 调用 Extractor ==================

# Trustcall 工作流程:
# 1. 接收 messages 列表(SystemMessage + 对话历史)
# 2. 使用 tool calling 生成 tool call
# 3. 解析 tool call 参数为 UserProfile 实例
# 4. 返回包含多个字段的结果字典

# 调用 extractor
# - 输入: {"messages": [SystemMessage, ...对话消息]}
# - 输出: 包含 messages, responses, response_metadata 的字典
result = trustcall_extractor.invoke({"messages": [SystemMessage(content=system_msg)]+conversation})

当我们调用 extractor 时,我们得到几样东西:

* `messages`: 包含 tool calls 的 `AIMessages` 列表。
* `responses`: 与我们 schema 匹配的解析后的 tool calls 结果。
* `response_metadata`: 适用于更新现有 tool calls。它表示哪些 responses 对应于哪些现有对象。

In [17]:
for m in result["messages"]: 
    m.pretty_print()

Tool Calls:
  UserProfile (call_spGGUsoaUFXU7oOrUNCASzfL)
 Call ID: call_spGGUsoaUFXU7oOrUNCASzfL
  Args:
    user_name: Lance
    interests: ['biking around San Francisco']


In [18]:
schema = result["responses"]
schema

[UserProfile(user_name='Lance', interests=['biking around San Francisco'])]

In [19]:
schema[0].model_dump()

{'user_name': 'Lance', 'interests': ['biking around San Francisco']}

In [20]:
result["response_metadata"]

[{'id': 'call_spGGUsoaUFXU7oOrUNCASzfL'}]

让我们看看如何使用它来*更新* profile。

对于更新,TrustCall 接受一组 messages 以及现有的 schema。

核心思想是它提示 model 生成 [JSON Patch](https://jsonpatch.com/) 来仅更新 schema 的相关部分。

这比天真地覆盖整个 schema 更不容易出错。

它也更高效,因为 model 只需要生成已更改的 schema 部分。

我们可以将现有的 schema 保存为 dict。

我们可以使用 `model_dump()` 将 Pydantic model 实例序列化为 dict。

我们将它连同 schema 名称 `UserProfile` 传递给 `"existing"` 参数。

In [None]:
# ================== 更新对话 ==================

# 新增两条消息,用于测试 Trustcall 的更新功能
updated_conversation = [
    HumanMessage(content="Hi, I'm Lance."), 
    AIMessage(content="Nice to meet you, Lance."), 
    HumanMessage(content="I really like biking around San Francisco."),
    AIMessage(content="San Francisco is a great city! Where do you go after biking?"),
    HumanMessage(content="I really like to go to a bakery after biking."),  # 新信息
]

# ================== 更新系统指令 ==================

# 关键变化: 从 "Extract" 改为 "Update"
# - 指示 LLM 这是更新操作,不是从头创建
# - LLM 会生成 JSON Patch 来更新现有 schema
system_msg = f"""Update the memory (JSON doc) to incorporate new information from the following conversation"""

# ================== 使用 Trustcall 更新 Profile ==================

# Trustcall 更新机制:
# - existing 参数: 传入现有 schema 的字典表示
# - 格式: {tool_name: schema_dict}
# - Trustcall 会:
#   1. 分析现有 schema 和新对话
#   2. 生成 JSON Patch 表示变化
#   3. 应用 patch 到现有 schema
#   4. 返回更新后的 schema

# Python 知识点:
# - schema[0]: result["responses"] 是列表,取第一个元素
# - model_dump(): Pydantic 方法,将 model 实例转换为字典

# 调用 extractor 进行更新
result = trustcall_extractor.invoke(
    {"messages": [SystemMessage(content=system_msg)]+updated_conversation},  # 新对话
    {"existing": {"UserProfile": schema[0].model_dump()}}  # 现有 schema
)

# Trustcall 的优势:
# - 只更新变化的部分,不重新生成整个 schema
# - 更高效,节省 tokens
# - 更不容易丢失信息
# - 使用 JSON Patch 标准,更可靠

In [22]:
for m in result["messages"]: 
    m.pretty_print()

Tool Calls:
  UserProfile (call_WeZl0ACfQStxblim0ps8LNKT)
 Call ID: call_WeZl0ACfQStxblim0ps8LNKT
  Args:
    user_name: Lance
    interests: ['biking', 'visiting bakeries']


In [23]:
result["response_metadata"]

[{'id': 'call_WeZl0ACfQStxblim0ps8LNKT'}]

In [24]:
updated_schema = result["responses"][0]
updated_schema.model_dump()

{'user_name': 'Lance', 'interests': ['biking', 'visiting bakeries']}

LangSmith trace:

https://smith.langchain.com/public/229eae22-1edb-44c6-93e6-489124a43968/r

现在,让我们也在之前看到的[复杂 schema](https://github.com/hinthornw/trustcall?tab=readme-ov-file#complex-schema) 上测试 Trustcall。

In [None]:
# ================== 测试复杂 Schema ==================

# Trustcall 知识点:
# - Trustcall 专门设计用于处理复杂 schema
# - 使用迭代策略和 JSON Patch,比 with_structured_output 更可靠

# 创建 extractor for 复杂 schema
bound = create_extractor(
    model,
    tools=[TelegramAndTrustFallPreferences],  # 嵌套很深的 Pydantic model
    tool_choice="TelegramAndTrustFallPreferences",  # 强制使用此 tool
)

# ================== 准备测试对话 ==================

# 这是一个关于电报和 trust fall 偏好的复杂对话
conversation = """Operator: How may I assist with your telegram, sir?
Customer: I need to send a message about our trust fall exercise.
Operator: Certainly. Morse code or standard encoding?
Customer: Morse, please. I love using a straight key.
Operator: Excellent. What's your message?
Customer: Tell him I'm ready for a higher fall, and I prefer the diamond formation for catching.
Operator: Done. Shall I use our "Daredevil" paper for this daring message?
Customer: Perfect! Send it by your fastest carrier pigeon.
Operator: It'll be there within the hour, sir."""

# ================== 调用 Trustcall 提取 ==================

# Trustcall 如何处理复杂 schema:
# 1. 将复杂 schema 分解为多个简单的子任务
# 2. 迭代处理每个子 schema
# 3. 使用 JSON Patch 合并结果
# 4. 验证最终结果是否符合完整 schema

# 调用 extractor
result = bound.invoke(
    f"""Extract the preferences from the following conversation:
<convo>
{conversation}
</convo>"""
)

# 提取并显示结果
# - result["responses"]: 包含解析后的 Pydantic model 实例列表
# - result["responses"][0]: 第一个(也是唯一的)提取结果
result["responses"][0]

# 为什么 Trustcall 成功而 with_structured_output 失败:
# - with_structured_output: 一次性生成整个 schema,容易出错
# - Trustcall: 分步处理,更容易处理复杂嵌套结构
# - Trustcall: 使用多轮交互和验证,确保完整性

Trace: 

https://smith.langchain.com/public/5cd23009-3e05-4b00-99f0-c66ee3edd06e/r

要获取更多示例,你可以在[这里](https://www.youtube.com/watch?v=-H4s0jQi-QY)查看概述视频。

## 使用 profile schema 更新的 Chatbot

现在,让我们将 Trustcall 引入我们的 chatbot 来*创建和更新*记忆 profile。

In [None]:
from IPython.display import Image, display

from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_core.runnables.config import RunnableConfig
from langgraph.checkpoint.memory import MemorySaver
from langgraph.store.base import BaseStore

# ================== 初始化 Model ==================

# 初始化 LLM
model = ChatOpenAI(model="gpt-4o", temperature=0)

# ================== 定义 Schema ==================

# 扩展的 UserProfile schema
# 新增 user_location 字段,展示如何处理更复杂的 profile
class UserProfile(BaseModel):
    """完整的用户 profile schema"""
    user_name: str = Field(description="The user's preferred name")
    user_location: str = Field(description="The user's location")
    interests: list = Field(description="A list of the user's interests")

# ================== 创建 Trustcall Extractor ==================

# Trustcall 知识点:
# - 这个 extractor 将用于 write_memory 节点
# - 相比 with_structured_output,Trustcall 提供:
#   1. 更好的复杂 schema 支持
#   2. 增量更新能力(使用 JSON Patch)
#   3. 更少的信息丢失
#   4. 更高的 token 效率

trustcall_extractor = create_extractor(
    model,
    tools=[UserProfile],
    tool_choice="UserProfile",  # 强制使用 UserProfile,确保结构化输出
)

# ================== 系统提示词 ==================

# Chatbot 主对话提示
MODEL_SYSTEM_MESSAGE = """You are a helpful assistant with memory that provides information about the user. 
If you have memory for this user, use it to personalize your responses.
Here is the memory (it may be empty): {memory}"""

# Trustcall 提取提示
# 关键词 "Create or update" 告诉 Trustcall 这是创建/更新操作
TRUSTCALL_INSTRUCTION = """Create or update the memory (JSON doc) to incorporate information from the following conversation:"""

# ================== call_model 节点 ==================

def call_model(state: MessagesState, config: RunnableConfig, store: BaseStore):
    """
    主对话节点 - 从 store 加载记忆并生成个性化响应
    
    与 cell-18 的 call_model 基本相同,但使用了扩展的 UserProfile schema
    
    工作流程:
        1. 从 config 获取 user_id
        2. 从 store 检索该用户的记忆
        3. 格式化记忆(现在包含 location)
        4. 注入系统提示并生成响应
    """
    
    # 获取 user_id
    user_id = config["configurable"]["user_id"]

    # 从 store 检索记忆
    namespace = ("memory", user_id)
    existing_memory = store.get(namespace, "user_memory")

    # 格式化记忆 - 现在包含三个字段
    if existing_memory and existing_memory.value:
        memory_dict = existing_memory.value
        formatted_memory = (
            f"Name: {memory_dict.get('user_name', 'Unknown')}\n"
            f"Location: {memory_dict.get('user_location', 'Unknown')}\n"  # 新增字段
            f"Interests: {', '.join(memory_dict.get('interests', []))}"      
        )
    else:
        formatted_memory = None

    # 格式化系统消息
    system_msg = MODEL_SYSTEM_MESSAGE.format(memory=formatted_memory)

    # 生成响应
    response = model.invoke([SystemMessage(content=system_msg)]+state["messages"])

    return {"messages": response}

# ================== write_memory 节点 ==================

def write_memory(state: MessagesState, config: RunnableConfig, store: BaseStore):
    """
    记忆写入节点 - 使用 Trustcall 创建和更新记忆 profile
    
    关键改进 (相比 cell-18):
    - 使用 Trustcall 而不是 with_structured_output
    - 支持增量更新,避免从头重新生成
    - 更不容易丢失信息
    - 更高效的 token 使用
    
    工作流程:
        1. 从 config 获取 user_id
        2. 从 store 检索现有记忆
        3. 将现有记忆转换为 Trustcall 可接受的格式
        4. 调用 Trustcall extractor 进行更新
        5. 保存更新后的 profile
    
    Trustcall vs with_structured_output:
    - with_structured_output: 每次从头生成整个 schema
    - Trustcall: 生成 JSON Patch,只更新变化的部分
    """
    
    # 获取 user_id
    user_id = config["configurable"]["user_id"]

    # 检索现有记忆
    namespace = ("memory", user_id)
    existing_memory = store.get(namespace, "user_memory")
        
    # ========== 准备 existing 参数 ==========
    # Trustcall 知识点:
    # - existing 参数告诉 Trustcall 这是更新操作
    # - 格式: {tool_name: schema_dict}
    # - 如果 existing_memory 存在,使用其值
    # - 如果不存在,设置为 None,Trustcall 会创建新 profile
    
    # Python 知识点:
    # - 三元表达式: value_if_true if condition else value_if_false
    # - {"UserProfile": dict} 创建嵌套字典
    existing_profile = {"UserProfile": existing_memory.value} if existing_memory else None
    
    # ========== 调用 Trustcall Extractor ==========
    # 工作流程:
    # 1. Trustcall 接收对话历史和现有 profile
    # 2. 如果是更新:
    #    a. 分析对话中的新信息
    #    b. 生成 JSON Patch 表示变化
    #    c. 应用 patch 到现有 profile
    # 3. 如果是创建:
    #    a. 从对话中提取所有信息
    #    b. 生成完整的新 profile
    # 4. 验证结果是否符合 UserProfile schema
    
    result = trustcall_extractor.invoke(
        {"messages": [SystemMessage(content=TRUSTCALL_INSTRUCTION)]+state["messages"]}, 
        {"existing": existing_profile}  # 传入现有 profile
    )
    
    # ========== 提取和保存 Profile ==========
    # result["responses"]: Trustcall 返回的 Pydantic model 实例列表
    # [0]: 取第一个(通常是唯一的)结果
    # .model_dump(): 将 Pydantic model 转换为字典
    updated_profile = result["responses"][0].model_dump()

    # 保存到 store
    # Python 知识点: 这是覆盖式保存,但实际内容是增量更新的
    key = "user_memory"
    store.put(namespace, key, updated_profile)

# ================== 构建 Graph ==================

# 定义图 - 与 cell-18 相同的结构
builder = StateGraph(MessagesState)

# 添加节点
builder.add_node("call_model", call_model)
builder.add_node("write_memory", write_memory)

# 添加边 - 简单的线性流程
# START -> call_model -> write_memory -> END
builder.add_edge(START, "call_model")
builder.add_edge("call_model", "write_memory")
builder.add_edge("write_memory", END)

# ================== 初始化双记忆系统 ==================

# 长期记忆 (跨线程) - Store
# - 保存用户 profile
# - 使用 Trustcall 管理
# - 增量更新,避免信息丢失
across_thread_memory = InMemoryStore()

# 短期记忆 (线程内) - Checkpointer  
# - 保存对话历史
# - 每个线程独立
within_thread_memory = MemorySaver()

# ================== 编译 Graph ==================

# LangGraph 知识点:
# - checkpointer + store: 双记忆架构
# - checkpointer: 管理对话历史 (MessagesState)
# - store: 管理长期记忆 (用户 profile)
# - 两者协同工作,提供完整的记忆能力

graph = builder.compile(
    checkpointer=within_thread_memory, 
    store=across_thread_memory
)

# 可视化图结构
display(Image(graph.get_graph(xray=1).draw_mermaid_png()))

# ================== 总结: Trustcall Chatbot 的优势 ==================
#
# 1. 增量更新:
#    - 只更新变化的字段,不重新生成整个 profile
#    - 减少信息丢失的风险
#
# 2. Token 效率:
#    - 不需要每次生成完整 profile
#    - 只生成 JSON Patch,节省 tokens
#
# 3. 复杂 Schema 支持:
#    - 比 with_structured_output 更可靠
#    - 可以处理深度嵌套的 schema
#
# 4. 灵活性:
#    - 同一个 extractor 可以用于创建和更新
#    - 通过 existing 参数控制行为
#
# 5. 标准化:
#    - 使用 JSON Patch 标准
#    - 更容易调试和理解变化

In [23]:
# We supply a thread ID for short-term (within-thread) memory
# We supply a user ID for long-term (across-thread) memory 
config = {"configurable": {"thread_id": "1", "user_id": "1"}}

# User input 
input_messages = [HumanMessage(content="Hi, my name is Lance")]

# Run the graph
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()


Hi, my name is Lance

Hello, Lance! It's nice to meet you. How can I assist you today?


In [24]:
# User input 
input_messages = [HumanMessage(content="I like to bike around San Francisco")]

# Run the graph
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()


I like to bike around San Francisco

That sounds like a great way to explore the city! San Francisco has some beautiful routes and views. Do you have any favorite trails or spots you like to visit while biking?


In [25]:
# Namespace for the memory to save
user_id = "1"
namespace = ("memory", user_id)
existing_memory = across_thread_memory.get(namespace, "user_memory")
existing_memory.dict()

{'value': {'user_name': 'Lance',
  'user_location': 'San Francisco',
  'interests': ['biking']},
 'key': 'user_memory',
 'namespace': ['memory', '1'],
 'created_at': '2024-11-04T23:51:17.662428+00:00',
 'updated_at': '2024-11-04T23:51:41.697652+00:00'}

In [26]:
# The user profile saved as a JSON object
existing_memory.value

{'user_name': 'Lance',
 'user_location': 'San Francisco',
 'interests': ['biking']}

In [27]:
# User input 
input_messages = [HumanMessage(content="I also enjoy going to bakeries")]

# Run the graph
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()


I also enjoy going to bakeries

Biking and visiting bakeries sounds like a delightful combination! San Francisco has some fantastic bakeries. Do you have any favorites, or are you looking for new recommendations to try out?


在新线程中继续对话。

In [28]:
# We supply a thread ID for short-term (within-thread) memory
# We supply a user ID for long-term (across-thread) memory 
config = {"configurable": {"thread_id": "2", "user_id": "1"}}

# User input 
input_messages = [HumanMessage(content="What bakeries do you recommend for me?")]

# Run the graph
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()


What bakeries do you recommend for me?

Since you're in San Francisco and enjoy going to bakeries, here are a few recommendations you might like:

1. **Tartine Bakery** - Known for its delicious bread and pastries, it's a must-visit for any bakery enthusiast.
2. **B. Patisserie** - Offers a delightful selection of French pastries, including their famous kouign-amann.
3. **Arsicault Bakery** - Renowned for its croissants, which have been praised as some of the best in the country.
4. **Craftsman and Wolves** - Known for their inventive pastries and the "Rebel Within," a savory muffin with a soft-cooked egg inside.
5. **Mr. Holmes Bakehouse** - Famous for their cruffins and other creative pastries.

These spots should offer a great variety of treats for you to enjoy. Happy bakery hopping!


Trace:

https://smith.langchain.com/public/f45bdaf0-6963-4c19-8ec9-f4b7fe0f68ad/r

## Studio

![Screenshot 2024-10-30 at 11.26.31 AM.png](https://cdn.prod.website-files.com/65b8cd72835ceeacd4449a53/6732d0437060f1754ea79908_Screenshot%202024-11-11%20at%207.48.53%E2%80%AFPM.png)