In [11]:
from langchain_deepseek import ChatDeepSeek

model = ChatDeepSeek(model="deepseek-chat", temperature=0.7)

# SummarizationMiddleware

> 源码：langchain/libs/langchain_v1/langchain/agents/middleware/summarization.py

随着对话轮次的增加，上下文会变得越来越长。这不仅会导致计算成本飙升，还会让模型因为“信息过载”而忘记最初的指令或关键任务。消息摘要中间件就是为了解决这个问题而生的，它的主要作用是在before_model节点，调用压缩模型对历史消息进行压缩，从而保留核心信息，控制上下文窗口的长度。

```python
SummarizationMiddleware(
    model: str | BaseChatModel, # 用于生成摘要的模型
    *,
    trigger: ContextSize | list[ContextSize] | None = None, # 触发摘要压缩的上下文大小阈值，列表的话满足任意一个就触发压缩
    keep: ContextSize = ("messages", _DEFAULT_MESSAGES_TO_KEEP), # 摘要后保持的上下文数量
    token_counter: TokenCounter = count_tokens_approximately, # token计算方法，默认使用基于字符的计数方式
    summary_prompt: str = DEFAULT_SUMMARY_PROMPT, # 用于摘要的自定义提示模板。若未指定，则使用内置模板。
    trim_tokens_to_summarize: int | None = _DEFAULT_TRIM_TOKEN_LIMIT, # 生成摘要时最多包含的令牌数量。在摘要前，消息将被裁剪以适应此限制。
    **deprecated_kwargs: Any,
)
```

## ContextSize对象

ContextSize = ContextFraction | ContextTokens | ContextMessages

用于指定上下文大小的联合类型。

该类型可以是以下任意一种：
- ContextFraction：模型最大输入令牌数的一个分数（比例）。
- ContextTokens：令牌数的绝对值。
- ContextMessages：消息数量的绝对值。

示例：
```python
# ContextFraction - 使用比例
context_size: ContextSize = ("fraction", 0.5)  # 50% 的最大容量

# ContextTokens - 使用令牌数
context_size: ContextSize = ("tokens", 3000)   # 精确到 3000 个令牌

# ContextMessages - 使用消息数
context_size: ContextSize = ("messages", 50)   # 最近 50 条消息
```

## 处理流程

trigger触发，keep确定保留策略（最近的消息或token），剩余的进行摘要。

```python
# 1. 检查 token 数量
total_tokens = self.token_counter(messages)  # 假设是 5000

# 2. 判断是否触发摘要
if total_tokens >= 4000:  # 5000 >= 4000，触发
    return True

# 3. 确定 cutoff_index
cutoff_index = self._determine_cutoff_index(messages)
# 根据 keep 参数决定保留多少消息
# 假设 keep=("messages", 5)，保留最近5条，其他全部进行摘要

# 4. 分割消息
messages_to_summarize = messages[:cutoff_index]  # 前15条
preserved_messages = messages[cutoff_index:]   # 后5条

# 5. 生成摘要
summary = self._create_summary(messages_to_summarize)

# 6. 构建新消息列表
new_messages = [summary, *preserved_messages]

# 7. 返回更新
return {"messages": [RemoveMessage(id=REMOVE_ALL_MESSAGES), *new_messages]}
```

## 示例

In [None]:
from langgraph.checkpoint.memory import InMemorySaver
from langchain.agents import create_agent
from langchain.agents.middleware import SummarizationMiddleware
from langchain_deepseek import ChatDeepSeek
from langchain_core.runnables import RunnableConfig

model = ChatDeepSeek(
    model = "deepseek-chat",
    temperature=0.7
)


checkpointer = InMemorySaver()

agent = create_agent(
    model=model,
    tools=[],
    middleware=[
        SummarizationMiddleware(
            model=model,
            trigger=("tokens", 4000),
            keep=("messages", 20)
        )
    ],
    checkpointer=checkpointer,
)

config: RunnableConfig = {"configurable": {"thread_id": "1"}}
agent.invoke({"messages": "你好，我的名字叫Pet"}, config)
agent.invoke({"messages": "写一首关于猫的诗歌"}, config)
agent.invoke({"messages": "再写一首关于狗的诗歌"}, config)
final_response = agent.invoke({"messages": "我叫什么名字"}, config)

final_response["messages"][-1].pretty_print()

# ModelCallLimitMiddleware

限制模型调用的次数（线程级别和运行级别），以防止无限循环或产生过高成本。模型调用限制适用于以下场景：
- 防止失控的智能体发起过多 API 调用；
- 在生产环境中实施成本控制；
- 在特定调用预算内测试智能体的行为。

## 参数

| 参数 | 类型 | 默认值 | 说明 |
|------|------|--------|------|
| `thread_limit` | int \| None | None | 每个线程允许的最大模型调用次数。`None` 表示无限制 |
| `run_limit` | int \| None | None | 每次运行（调用）允许的最大模型调用次数。`None` 表示无限制 |
| `exit_behavior` | Literal["end", "error"] | "end" | 超过限制时的处理方式：`"end"` 跳转到结束，`"error"` 抛出异常 |



## 限制类型

Thread-level（线程级别）
- 跟踪模型调用次数
- 跨多次运行（调用）持久化调用计数

Run-level（运行级别）
- 跟踪单次运行（调用）期间的模型调用次数


## 退出行为

| exit_behavior | 行为 |
|---------------|------|
| `"end"` | 跳转到代理执行结束，并注入一条 AI 消息表示限制已超过 |
| `"error"` | 抛出 `ModelCallLimitExceededError` 异常 |


## 工作逻辑
在state中新增两个字段，分别是`thread_model_call_count`和`run_model_call_count`，用于记录线程级别的模型调用次数和运行级别的模型调用次数，由中间件维护，初始值为0。通过在`before_model`中检查这两个字段是否超过限制，来决定是否抛出异常或跳转到结束。

```python
# 1. 在 before_model 中检查限制
thread_count = state.get("thread_model_call_count", 0)
run_count = state.get("run_model_call_count", 0)

# 2. 检查是否超过限制
if thread_count >= thread_limit or run_count >= run_limit:
    if exit_behavior == "error":
        raise ModelCallLimitExceededError(...)
    if exit_behavior == "end":
        return {"jump_to": "end", "messages": [AIMessage(content="...")]}

# 3. 在 after_model 中增加计数
return {
    "thread_model_call_count": thread_count + 1,
    "run_model_call_count": run_count + 1,
}
```




## 异常类型

```python
class ModelCallLimitExceededError(Exception):
    """当模型调用限制超过时抛出的异常"""
```

## 示例

In [None]:
from langchain.agents import create_agent
from langchain.agents.middleware import ModelCallLimitMiddleware

agent = create_agent(
    model=model,
    tools=[],
    middleware=[
        ModelCallLimitMiddleware(
            thread_limit=10, # 整个会话最多10次
            run_limit=5, # 单个会话最多5次
            exit_behavior="end",
        ),
    ],
)

# ToolCallLimitMiddleware

用于跟踪工具调用次数并在达到限制时终止或限制代理执行。支持线程级别和运行级别的调用计数，并具有可配置的退出行为。



## 参数

| 参数 | 类型 | 默认值 | 说明 |
|------|------|--------|------|
| `tool_name` | str \| None | None | 要限制的特定工具名称。`None` 表示限制所有工具 |
| `thread_limit` | int \| None | None | 每个线程允许的最大工具调用次数。`None` 表示无限制 |
| `run_limit` | int \| None | None | 每次运行（调用）允许的最大工具调用次数。`None` 表示无限制 |
| `exit_behavior` | ExitBehavior | "continue" | 超过限制时的处理方式 |



## 退出行为

| exit_behavior | 行为 |
|---------------|------|
| `"continue"` | 阻止超限的工具，让其他工具继续执行（默认）。模型决定何时结束 |
| `"error"` | 抛出 `ToolCallLimitExceededError` 异常 |
| `"end"` | 立即停止执行，为超限的工具调用生成 `ToolMessage` + AI 消息。如果有多个并行的工具调用，抛出 `NotImplementedError` |


## 工作逻辑

```python
# 1. 在 after_model 中检查工具调用
last_ai_message = state["messages"][-1]  # 获取最后一条 AI 消息
tool_calls = last_ai_message.tool_calls

# 2. 分割工具调用为允许和阻止的
allowed_calls, blocked_calls, new_thread_count, new_run_count = self._separate_tool_calls(
    tool_calls, current_thread_count, current_run_count
)

# 3. 更新计数
thread_counts[count_key] = new_thread_count  # 只计算允许的调用
run_counts[count_key] = new_run_count + len(blocked_calls)  # 包括阻止的调用

# 4. 处理阻止的工具调用
if exit_behavior == "error":
    raise ToolCallLimitExceededError(...)
elif exit_behavior == "end":
    # 生成错误消息
    tool_msg = ToolMessage(content="...", tool_call_id=blocked_call["id"])
    ai_msg = AIMessage(content="...")
    return {"messages": [tool_msg, ai_msg], "jump_to": "end"}
elif exit_behavior == "continue":
    # 生成错误消息，让模型继续
    tool_msgs = [ToolMessage(content="...", tool_call_id=call["id"]) for call in blocked_calls]
    return {"messages": tool_msgs}
```



## 状态字段

| 字段 | 类型 | 说明 |
|------|------|------|
| `thread_tool_call_count` | dict[str, int] | 线程级别的工具调用计数，key 为工具名称或 `__all__` |
| `run_tool_call_count` | dict[str, int] | 运行级别的工具调用计数，key 为工具名称或 `__all__` |



## 异常

```python
class ToolCallLimitExceededError(Exception):
    """当工具调用限制超过时抛出的异常"""
```

一个中间件只能指定一个限制的tool，如果要指定2个以上，需要创建对应数量的中间件。

In [None]:
from langchain.agents import create_agent
from langchain.agents.middleware import ToolCallLimitMiddleware

agent = create_agent(
    model=model,
    tools=[search_tool, database_tool],
    middleware=[
        # 不指定具体tool name则全局限制
        ToolCallLimitMiddleware(thread_limit=20, run_limit=10),
        # 指定具体的工具
        ToolCallLimitMiddleware(
            tool_name="search",
            thread_limit=5,
            run_limit=3,
            exit_behavior='continue'|'error'|'end'
        ),
    ],
)

# ModelFallbackMiddleware

模型降级：当主模型失效时，会按顺序自动切换到备用模型，直到成功或所有模型都失败。

## 参数

| 参数 | 类型 | 默认值 | 说明 |
|------|------|--------|------|
| `first_model` | str \| BaseChatModel | - | 第一个备用模型（字符串名称或实例）|
| `*additional_models` | str \| BaseChatModel | - | 额外的备用模型，按顺序尝试 |


## 工作逻辑

```python
# 1. 尝试主模型
try:
    return handler(request)
except Exception as e:
    last_exception = e

# 2. 按顺序尝试备用模型
for fallback_model in self.models:
    try:
        return handler(request.override(model=fallback_model))
    except Exception as e:
        last_exception = e
        continue

# 3. 所有模型都失败，抛出最后的异常
raise last_exception
```

## 模型尝试顺序

1. 主模型（`create_agent` 中指定的）
2. 第一个备用模型（`first_model`）
3. 第二个备用模型（`additional_models[0]`）
4. 第三个备用模型（`additional_models[1]`）
5. ...


## 异常处理

| 场景 | 行为 |
|------|------|
| 主模型成功 | 返回结果 |
| 主模型失败，备用模型成功 | 返回备用模型的结果 |
| 所有模型都失败 | 抛出最后一个异常 |


In [None]:
from langchain.agents import create_agent
from langchain.agents.middleware import ModelFallbackMiddleware

agent = create_agent(
    model=model,
    tools=[],
    middleware=[
        ModelFallbackMiddleware(
            "gpt-4o-mini",
            "claude-3-5-sonnet-20241022", # 按顺序调用备用模型
        ),
    ],
)

# PIIMiddleware

> PII——Personally Identifiable Information

`PIIMiddleware` 用于检测和处理对话中的个人身份信息（PII）。可以检测电子邮件、信用卡号、IP 地址、MAC 地址和 URL，并应用可配置的策略来处理它们。

## 内置 PII 类型

| 类型 | 说明 | 验证方式 |
|------|------|---------|
| `email` | 电子邮件地址 | 正则表达式 |
| `credit_card` | 信用卡号（Luhn 算法验证）| Luhn 算法 |
| `ip` | IP 地址（stdlib 验证）| stdlib |
| `mac_address` | MAC 地址 | 正则表达式 |
| `url` | URL（http/https 和裸 URL）| 正则表达式 |

## 处理策略
保留身份：处理后的信息是否还能追溯到原始的个人身份 。

| 策略 | 保留身份？| 适用场景 |
|------|---------|---------|
| `block` | N/A | 完全避免PII，检测到抛出异常 |
| `redact` | 否 | 一般合规、日志清理 |
| `mask` | 否 | 人类可读性、客户服务 UI |
| `hash` | 是（假名）| 分析、调试 |


## 参数

| 参数 | 类型 | 默认值 | 说明 |
|------|------|--------|------|
| `pii_type` | Literal["email", "credit_card", "ip", "mac_address", "url"] \| str | - | 要检测的 PII 类型。可以是内置类型或自定义类型名称 |
| `strategy` | Literal["block", "redact", "mask", "hash"] | "redact" | 如何处理检测到的 PII |
| `detector` | Callable[[str], list[PIIMatch]] \| str \| None | None | 自定义检测器函数或正则表达式模式。如果是 `None`，使用 `pii_type` 的内置检测器 |
| `apply_to_input` | bool | True | 是否在模型调用前检查用户消息 |
| `apply_to_output` | bool | False | 是否在模型调用后检查 AI 消息 |
| `apply_to_tool_results` | bool | False | 是否在工具执行后检查工具结果消息 |

## 工作逻辑

```python
# 1. before_model：检查用户输入和工具结果
if apply_to_input:
    # 获取最后一条用户消息
    last_user_msg = state["messages"][-1]
    
    # 检测 PII
    content = str(last_user_msg.content)
    new_content, matches = self._process_content(content)
    
    # 根据策略处理
    if strategy == "block":
        raise PIIDetectionError(...)
    elif strategy == "redact":
        new_content = "[REDACTED_email]"
    elif strategy == "mask":
        new_content = "****@example.com"
    elif strategy == "hash":
        new_content = "<email_hash:a1b2c3d4>"

# 2. after_model：检查 AI 输出
if apply_to_output:
    last_ai_msg = state["messages"][-1]
    # 同样的检测和处理逻辑
```

## 异常

```python
class PIIDetectionError(Exception):
    """当检测到 PII 且策略为 'block' 时抛出的异常"""
```

## 案例

In [None]:
from langchain.agents import create_agent
from langchain.agents.middleware import PIIMiddleware

agent = create_agent(
    model=model,
    tools=[],
    middleware=[
        PIIMiddleware("email", strategy="redact", apply_to_input=True),
        PIIMiddleware("credit_card", strategy="mask", apply_to_input=True),
    ],
)

### 自定义PII类型

可以通过以下方式创建自定义检测器：
- 正则表达式字符串：用于简单的模式匹配
- 自定义函数：用于实现带有验证逻辑的复杂检测

In [None]:
from langchain.agents import create_agent
from langchain.agents.middleware import PIIMiddleware
import re


# 方法 1: 使用正则表达式字符串模式
agent1 = create_agent(
    model=model,
    tools=[],
    middleware=[
        PIIMiddleware(
            "api_key", # 检测目标的名称
            detector=r"sk-[a-zA-Z0-9]{32}", # 匹配 OpenAI API Key 的正则表达式
            strategy="block", # 策略：发现即阻止
        ),
    ],
)

# 方法 2: 使用已编译的正则表达式模式
agent2 = create_agent(
    model=model,
    tools=[],
    middleware=[
        PIIMiddleware(
            "phone_number", # 检测目标的名称
            detector=re.compile(r"\+?\d{1,3}[\s.-]?\d{3,4}[\s.-]?\d{4}"), # 匹配电话号码的正则
            strategy="mask", # 策略：发现后掩码（打码）
        ),
    ],
)

# 方法 3: 使用自定义检测函数
def detect_ssn(content: str) -> list[dict[str, str | int]]:
    """检测 SSN（社会保障号）并进行校验。
    
    返回包含 'text'（文本）、'start'（起始位置）和 'end'（结束位置）键的字典列表。
    """
    # ssn：美国社会保障号
    import re
    matches = []
    pattern = r"\d{3}-\d{2}-\d{4}"
    for match in re.finditer(pattern, content):
        ssn = match.group(0)
        # 校验：前三位不能是 000, 666 或 900-999
        first_three = int(ssn[:3])
        if first_three not in [0, 666] and not (900 <= first_three <= 999):
            matches.append({
                "text": ssn,
                "start": match.start(),
                "end": match.end(),
            })
    return matches

agent3 = create_agent(
    model="gpt-4o",
    tools=[],
    middleware=[
        PIIMiddleware(
            "ssn", # 检测目标的名称
            detector=detect_ssn, # 使用自定义的检测函数
            strategy="hash", # 策略：发现后哈希化
        ),
    ],
)

### 完整案例

In [None]:
from langchain.agents import create_agent
from langchain_deepseek import ChatDeepSeek
from langchain.agents.middleware import PIIMiddleware
from langgraph.checkpoint.memory import InMemorySaver


model = ChatDeepSeek(
    model="deepseek-chat",
    temperature=0.7
)

agent = create_agent(
    model=model,
    tools=[],
    middleware=[
        PIIMiddleware(
            "phone_number",
            detector=r"\+?\d{1,3}[\s.-]?\d{3,4}[\s.-]?\d{4}",
            strategy="redact",
            apply_to_input=True, # 默认就是True
            # apply_to_output=True, 
        ),
    ],
    checkpointer=InMemorySaver()
)

agent.invoke(
    input={
        "messages": [{"role": "user", "content": "你好，我叫pet，手机号是13888888888"}]
    },
    config={
        "configurable": {"thread_id": "1"}
    }
)
final_response = agent.invoke(
    input={
        "messages": [{"role": "user", "content": "我叫什么名字？手机号是多少"}]
    },
    config={
        "configurable": {"thread_id": "1"}
    }
)

for msg in final_response['messages']:
    msg.pretty_print()

# TodoListMiddleware

`TodoListMiddleware` 为Agent提供待办事项列表管理功能。该中间件添加了 `write_todos` 工具，允许代理创建和管理结构化的任务列表，用于复杂的多步骤操作。它旨在帮助代理跟踪进度、组织复杂任务，并为用户提供任务完成状态的可见性。

## 参数

| 参数 | 类型 | 默认值 | 说明 |
|------|------|--------|------|
| `system_prompt` | str | `WRITE_TODOS_SYSTEM_PROMPT` | 指导代理使用 todo 工具的自定义系统提示 |
| `tool_description` | str | `WRITE_TODOS_TOOL_DESCRIPTION` | `write_todos` 工具的自定义描述 |

## Todo 类型

```python
class Todo(TypedDict):
    content: str  # 待办事项的内容/描述
    status: Literal["pending", "in_progress", "completed"]  # 当前状态
```

## 任务状态

| 状态 | 说明 |
|------|------|
| `pending` | 任务尚未开始 |
| `in_progress` | 正在处理（可以同时有多个任务）|
| `completed` | 任务成功完成 |

## write_todos 工具

代理可以调用 `write_todos` 工具来创建和管理任务列表：

```python
# 代理调用示例
write_todos([
    {"content": "Analyze codebase", "status": "in_progress"},
    {"content": "Refactor module A", "status": "pending"},
    {"content": "Write tests", "status": "pending"},
])

## 工作逻辑

```python
# 1. 在 wrap_model_call 中更新系统消息
if request.system_message is not None:
    # 在现有系统提示后追加 todo 系统提示
    new_system_content = [
        *request.system_message.content_blocks,
        {"type": "text", "text": f"\n\n{self.system_prompt}"},
    ]
else:
    # 创建新的系统消息
    new_system_content = [{"type": "text", "text": self.system_prompt}]

new_system_message = SystemMessage(content=new_system_content)
return handler(request.override(system_message=new_system_message))
```

In [None]:

from langchain.agents.middleware.todo import TodoListMiddleware
from langchain.agents import create_agent

# 创建中间件
agent = create_agent(
    model=model,
    middleware=[TodoListMiddleware()],
)

# 代理现在可以访问 write_todos 工具和 todo 状态跟踪
result = await agent.invoke({
    "messages": [HumanMessage("Help me refactor my codebase")]
})

print(result["todos"])  # 包含状态跟踪的待办事项数组
