# Workflow

在本节中，你将学习如何使用 `GraphFlow`（简称 “flow”）创建一个多智能体工作流。GraphFlow 采用结构化执行方式，可以精确控制各个代理之间的交互流程，以完成特定任务。

## 什么是 GraphFlow？

GraphFlow 是 AutoGen 中的一种团队（Team）结构，基于有向图（DiGraph）来控制代理（Agent）之间的执行流程。它支持以下复杂行为：

- 顺序执行（Sequential）

- 并行执行（Parallel）

- 条件分支（Conditional）

- 循环（Looping）

> langgraph 可以类比学习


## 何时应该使用 GraphFlow？
你应该在以下场景中使用 GraphFlow：

- 你需要严格控制代理的执行顺序

- 不同结果需要触发不同后续步骤

- 任务是一个多步骤流程，并可能包含条件分支或循环

- 需要确定性执行，而非随意聊天风格

👉 如果你的任务是非结构化对话，建议优先使用更简单的团队结构，如：

- `RoundRobinGroupChat`

- `SelectorGroupChat`

在任务变得复杂或需要结构控制时，再切换到 `GraphFlow`。




## 配置 Client

In [4]:
import os
from dotenv import load_dotenv
from autogen_ext.models.openai import OpenAIChatCompletionClient

load_dotenv()
siliconflow_api_key = os.getenv("SILICONFLOW_API_KEY") # 读取你的 OPENAI API key

# 初始化 OpenAIChatCompletionClient 客户端，连接到硅基流动平台的 Qwen3-8B 模型
model_client = OpenAIChatCompletionClient(
    model="Qwen/Qwen3-14B",                # 指定要调用的模型名称，硅基流动平台上 Qwen 3-8B 模型
    base_url="https://api.siliconflow.cn/v1",  # 硅基流动平台的 API 访问地址
    api_key=siliconflow_api_key,  # 你的 API 密钥
    model_info={                        
        "family": "qwen",              
        "context_length": 8192,        
        "max_output_tokens": 2048,     
        "tool_choice_supported": True, 
        "tool_choice_required": False,  
        "structured_output": True,     
        "vision": False,                
        "function_calling": True,      
        "json_output": True           
    },
)

## Creating and Running a Flow

`DiGraphBuilder` 是一个流式接口工具，能够让你轻松构建用于工作流的执行图。它支持构建：

- 顺序链 Sequential chains
- 并行分支 Parallel fan-outs
- 条件分支 Conditional branching
- 带安全退出条件的循环 Loops with safe exit conditions

图中的每个节点代表一个智能体，边定义了允许的执行路径。边还可以根据智能体的消息设置执行条件。

参考：https://microsoft.github.io/autogen/stable/reference/python/autogen_agentchat.teams.html#autogen_agentchat.teams.DiGraphBuilder


### Sequential Flow

我们将从创建一个简单的工作流开始：一个写作者起草一段文字，随后一个审阅者提供反馈。该流程在审阅者完成评论后结束。

注意，流程会自动计算图中的所有源节点和叶节点，执行从所有源节点开始，当没有剩余节点可执行时，流程结束。

![绘图2.png](resources\asset\绘图2.png)


In [2]:
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_ext.models.openai import OpenAIChatCompletionClient

# 创建 OpenAI 模型客户端
client = model_client

# 创建写作智能体
writer = AssistantAgent(
    "writer",
    model_client=client,
    system_message="Draft a short paragraph on climate change."
)

# 创建审稿智能体
reviewer = AssistantAgent(
    "reviewer",
    model_client=client,
    system_message="Review the draft and suggest improvements."
)

# 构建有向图（工作流）
builder = DiGraphBuilder()
builder.add_node(writer).add_node(reviewer) # 添加智能体节点, 注意添加的时候直接输入 智能体对象即可，该方法会自动读取智能体的名称
builder.add_edge(writer, reviewer)  # writer 执行后交给 reviewer


# 构建并校验图结构
graph = builder.build()

# 创建 GraphFlow 工作流
flow = GraphFlow([writer, reviewer], graph=graph)

In [3]:
# 如果在脚本中运行，请用 asyncio.run(...) 包裹主函数
# 推荐用 Console(flow.run_stream(...)) 获得更好的控制台格式化输出

stream = flow.run_stream(task="Write a short paragraph about climate change.")
async for event in stream:  # type: ignore
    print(event)

# 或者更推荐这样写，输出更美观：
# await Console(flow.run_stream(task="Write a short paragraph about climate change."))

id='48d5c65f-d96f-4c87-ac1c-3bfb6cb24059' source='user' models_usage=None metadata={} created_at=datetime.datetime(2025, 7, 19, 8, 21, 25, 285063, tzinfo=datetime.timezone.utc) content='Write a short paragraph about climate change.' type='TextMessage'
id='2facc959-9229-4072-a2ef-a63b94766804' source='writer' models_usage=None metadata={} created_at=datetime.datetime(2025, 7, 19, 8, 21, 30, 299144, tzinfo=datetime.timezone.utc) content="\nOkay, the user wants a short paragraph on climate change. Let me start by recalling the key points. Climate change refers to long-term changes in temperature and weather patterns, primarily due to human activities like burning fossil fuels. That increases greenhouse gases, leading to global warming. I should mention the main consequences: rising temperatures, extreme weather, sea level rise, and impacts on ecosystems and human societies.\n\nWait, the user might need this for a school project or a general overview. They probably want concise and clear i

### Parallel Flow with Join

我们将创建一个稍微复杂的工作流，流程如下：

1. 写作者撰写一段文字；

2. 然后流程并行分发给两位编辑：

3. 一位负责语法修改；

4. 一位负责文风优化；

5. 最后，两位编辑的修改结果被传递给最终审阅者进行汇总与评审。

该流程从写作者节点启动，随后并行执行两个编辑节点，最后在审阅者节点完成汇总并终止。

In [4]:
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

# 创建 OpenAI 模型客户端
client = model_client

# 创建写作智能体
writer = AssistantAgent("writer", model_client=client, system_message="Draft a short paragraph on climate change.")

# 创建两个编辑智能体
editor1 = AssistantAgent("editor1", model_client=client, system_message="Edit the paragraph for grammar.")
editor2 = AssistantAgent("editor2", model_client=client, system_message="Edit the paragraph for style.")

# 创建最终审稿智能体
final_reviewer = AssistantAgent(
    "final_reviewer",
    model_client=client,
    system_message="Consolidate the grammar and style edits into a final version.",
)

# 构建工作流图
builder = DiGraphBuilder()
builder.add_node(writer).add_node(editor1).add_node(editor2).add_node(final_reviewer)

# 从 writer 节点分发到 editor1 和 editor2（并行）
builder.add_edge(writer, editor1)
builder.add_edge(writer, editor2)

# 两个编辑节点的结果汇总到 final_reviewer
builder.add_edge(editor1, final_reviewer)
builder.add_edge(editor2, final_reviewer)

# 构建并校验图结构
graph = builder.build()

# 创建 GraphFlow 工作流
flow = GraphFlow(
    participants=builder.get_participants(),
    graph=graph,
)

# 运行工作流，并将消息流式输出到控制台
await Console(flow.run_stream(task="Write a short paragraph about climate change."))

---------- TextMessage (user) ----------
Write a short paragraph about climate change.
---------- ThoughtEvent (writer) ----------

Okay, the user asked me to draft a short paragraph on climate change. Let me start by understanding what they need. They probably want a concise overview that covers the main points without getting too technical.

First, I should mention the definition—climate change refers to long-term shifts in temperatures and weather patterns. Then, the primary cause is human activities, especially the burning of fossil fuels, which increases greenhouse gases. I need to include the effects like global warming, rising sea levels, extreme weather events, and impacts on ecosystems and human societies. It's also important to touch on the urgency and the need for global cooperation. Wait, maybe I should specify which greenhouse gases, like CO2 and methane. Also, mention things like deforestation and industrial processes as contributors. Don't forget to note the consequences

TaskResult(messages=[TextMessage(id='b2d22e5f-73cc-4991-9cc9-8290d16ac845', source='user', models_usage=None, metadata={}, created_at=datetime.datetime(2025, 7, 19, 8, 31, 30, 987966, tzinfo=datetime.timezone.utc), content='Write a short paragraph about climate change.', type='TextMessage'), ThoughtEvent(id='3e82c7f1-f9de-441b-9037-31b0325b101b', source='writer', models_usage=None, metadata={}, created_at=datetime.datetime(2025, 7, 19, 8, 31, 35, 229885, tzinfo=datetime.timezone.utc), content="\nOkay, the user asked me to draft a short paragraph on climate change. Let me start by understanding what they need. They probably want a concise overview that covers the main points without getting too technical.\n\nFirst, I should mention the definition—climate change refers to long-term shifts in temperatures and weather patterns. Then, the primary cause is human activities, especially the burning of fossil fuels, which increases greenhouse gases. I need to include the effects like global war

## Message Filtering

Execution Graph vs. Message Graph

在 `GraphFlow` 中，执行图是通过 `DiGraph` 定义的，用于控制各个智能体的执行顺序。然而，执行图**并不控制**一个智能体从其他智能体接收哪些消息。默认情况下，图中的**所有消息**都会被发送给**所有智能体**。

**消息过滤**是一个独立的功能，它允许你为每个智能体筛选其接收的消息，从而使其模型上下文中仅包含**相关信息**。一组消息过滤规则共同定义了流程中的**消息图**。

指定消息图有助于：

* 减少幻觉（hallucinations）
* 控制上下文内存负载
* 使智能体聚焦于与其任务相关的信息

> 简单来说，执行图 Execution Graph 是定义工作流执行规则，消息图 Message Graph 定义了工作流的消息传递规则。

你可以使用 `MessageFilterAgent`，结合 `MessageFilterConfig` 和 `PerSourceFilter` 来定义这些规则。


下面是一个消息图的示例

![绘图1.png](resources\asset\绘图1.png)

In [7]:
from autogen_agentchat.agents import AssistantAgent, MessageFilterAgent, MessageFilterConfig, PerSourceFilter
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

# 创建模型客户端
client = model_client

# 创建智能体
researcher = AssistantAgent(
    "researcher", model_client=client, system_message="Summarize key facts about climate change."
)
analyst = AssistantAgent("analyst", model_client=client, system_message="Review the summary and suggest improvements.")
presenter = AssistantAgent(
    "presenter", model_client=client, system_message="Prepare a presentation slide based on the final summary."
)

# 应用消息过滤器：只让 analyst 看到 researcher 的最后一条消息
filtered_analyst = MessageFilterAgent(
    name="analyst",
    wrapped_agent=analyst,
    filter=MessageFilterConfig(per_source=[PerSourceFilter(source="researcher", position="last", count=1)]),
)

# 只让 presenter 看到 analyst 的最后一条消息
filtered_presenter = MessageFilterAgent(
    name="presenter",
    wrapped_agent=presenter,
    filter=MessageFilterConfig(per_source=[PerSourceFilter(source="analyst", position="last", count=1)]),
)

# 构建工作流图
builder = DiGraphBuilder()
builder.add_node(researcher).add_node(filtered_analyst).add_node(filtered_presenter)
builder.add_edge(researcher, filtered_analyst).add_edge(filtered_analyst, filtered_presenter)

# 创建 GraphFlow 工作流
flow = GraphFlow(
    participants=builder.get_participants(),
    graph=builder.build(),
)

# 运行工作流，并将消息流式输出到控制台
await Console(flow.run_stream(task="Summarize key facts about climate change."))

---------- TextMessage (user) ----------
Summarize key facts about climate change.
---------- ThoughtEvent (researcher) ----------

Okay, so I need to summarize key facts about climate change. Let me start by recalling what I know. Climate change refers to long-term changes in temperature and weather patterns, right? It's mostly caused by human activities, especially the burning of fossil fuels which releases greenhouse gases. Carbon dioxide is a big one, along with methane and nitrous oxide. These gases trap heat in the atmosphere, leading to the greenhouse effect.

I remember that the Earth's average temperature has been rising. Scientists have data from ice cores, temperature records, and satellites. The past few decades have seen significant increases, and there's a lot of consensus that this is mainly due to human activities. The Intergovernmental Panel on Climate Change (IPCC) reports are important here, but I need to check if that's accurate.

Then there's the impact on ecosyste



## Advanced Example: Conditional Loop + Filtered Summary

本示例综合展示了以下内容的结合：

- **条件循环(Conditional Loop)：** 在 `generator` 和 `reviewer` 之间的一个循环（当 `reviewer` 回复“APPROVE”时退出）

- **过滤摘要(iltered Summary)：** 一个 `summarizer` 智能体，它仅接收用户的首次输入和 `reviewer` 的最后一条消息

In [None]:
from autogen_agentchat.agents import AssistantAgent, MessageFilterAgent, MessageFilterConfig, PerSourceFilter
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient
from typing import Dict

# 定义全局状态存储迭代次数
workflow_state: Dict[str, int] = {"iteration": 0}

# 定义迭代次数检查函数
def check_iteration_limit(msg):
    workflow_state["iteration"] += 1
    return workflow_state["iteration"] <= 2  # 最大迭代5次
    
# 初始化模型客户端
model_client = model_client

# 创建各角色智能体
generator = AssistantAgent(
    "generator",
    model_client=model_client,
    system_message="Generate a list of creative ideas."
)
reviewer = AssistantAgent(
    "reviewer",
    model_client=model_client,
    system_message="Review ideas and provide feedbacks, or just 'APPROVE' for final approval.",
)
summarizer_core = AssistantAgent(
    "summary",
    model_client=model_client,
    system_message="Summarize the user request and the final feedback."
)

# 创建带消息过滤的 summary 智能体，只看 user 的首条消息和 reviewer 的最后一条消息
filtered_summarizer = MessageFilterAgent(
    name="summary",
    wrapped_agent=summarizer_core,
    filter=MessageFilterConfig(
        per_source=[
            PerSourceFilter(source="user", position="first", count=1),
            PerSourceFilter(source="reviewer", position="last", count=1),
        ]
    ),
)

# 构建带条件循环的工作流图
builder = DiGraphBuilder()
builder.add_node(generator).add_node(reviewer).add_node(filtered_summarizer)
builder.add_edge(generator, reviewer)
# 如果 reviewer 回复中包含 "APPROVE"，则进入 summary，否则回到 generator 继续生成
builder.add_edge(reviewer, filtered_summarizer, condition='"APPROVE" in msg.to_model_text()')
builder.add_edge(reviewer, generator, condition='"APPROVE" not in msg.to_model_text() and check_iteration_limit(msg)')
builder.set_entry_point(generator)  # 显式设置入口节点

graph = builder.build()

# 创建 GraphFlow 工作流
flow = GraphFlow(
    participants=builder.get_participants(),
    graph=graph,
)

# 运行工作流，并将消息流式输出到控制台
await Console(flow.run_stream(task="Brainstorm ways to reduce plastic waste."))

# (Todo)

`builder.set_entry_point(agent)` 当图的执行顺序不好直接看出来的时候，可以通过这个方法设置图的入口点，然后通过 `builder.run()` 运行图。即，入口可能是个循环。


In [8]:
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent, MessageFilterAgent, MessageFilterConfig, PerSourceFilter
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_agentchat.ui import Console
from typing import Dict

writer = AssistantAgent("writer", model_client=model_client, system_message="你是一个专业的ai写作家，你对于科普描述ai有很多的经验。")
reviewer = AssistantAgent("reviewer", model_client=model_client, system_message="请审阅并提出改进建议。")

builder = DiGraphBuilder()
builder.add_node(writer).add_node(reviewer)
builder.add_edge(writer, reviewer)
builder.set_entry_point("writer")  # 从 writer 开始执行流程

graph = builder.build()
flow = GraphFlow([writer, reviewer], graph=graph)

await Console(flow.run_stream(task="请写一段关于auto的短文，字数在一百个字以内"))


---------- TextMessage (user) ----------
请写一段关于auto的短文，字数在一百个字以内
---------- ThoughtEvent (writer) ----------

好的，用户让我写一段关于“auto”的短文，字数在一百个字以内。首先，我需要确定用户所说的“auto”具体指的是什么。Auto这个词有多种含义，可能指汽车、自动化，或者某个特定品牌名称。

结合用户提到的科普描述AI的经验，可能他们想了解的是与AI相关的自动化技术。比如自动驾驶汽车，也就是Auto在自动驾驶领域的应用。但不确定，所以可能需要考虑其他可能性。

如果用户需要的是关于汽车的短文，可能涉及历史、技术发展等。但考虑到AI背景，更可能是指自动化，特别是自动驾驶。常见的是特斯拉、Waymo等公司的自动驾驶技术，或者更一般的自动化概念。

另一个角度是“auto”作为前缀，比如“auto-”代表自动，可以解释自动化技术如何赋能各个领域，如工业、医疗、交通等，推动效率提升和创新。

需要保持简洁，在一百字以内。要确保信息准确，涵盖关键点，比如历史背景、技术应用、对社会的影响。同时语言要通俗易懂，适合科普读者。

可能的结构：定义auto，提到汽车和自动化，简述发展历史，举例应用，最后总结影响。注意不要超过字数，可能需要删减细节，比如具体例子可能只提一个，或者概括说明。

检查是否有歧义，确保用户的需求被满足。如果“auto”指代其他内容，可能需要进一步询问，但当前信息有限，所以选择最可能的自动化方向。

---------- TextMessage (writer) ----------


**Auto：自动化之魂**  
Auto，源自希腊语“autos”（自我），象征自主与高效。从19世纪蒸汽机车到现代自动驾驶，它推动人类交通革命。如今，Auto技术融合AI与传感器，实现车辆自主感知、决策与操控，重塑出行方式。从工业流水线到智能家居，Auto赋能万物，让科技更贴近生活。它是机械的智慧，更是未来移动的钥匙。
---------- ThoughtEvent (reviewer) ----------

好的，用户让我写一段关于auto的短文，字数在一百字以内。首先，我需要明确“auto”在这里指的是什么。可能用户是指“自动驾驶”（autonomou

TaskResult(messages=[TextMessage(id='2b91697a-30e4-435c-a374-a5f2f909b3f6', source='user', models_usage=None, metadata={}, created_at=datetime.datetime(2025, 7, 19, 9, 36, 41, 393298, tzinfo=datetime.timezone.utc), content='请写一段关于auto的短文，字数在一百个字以内', type='TextMessage'), ThoughtEvent(id='0807864d-e41f-4945-9f48-5f6c3065f97c', source='writer', models_usage=None, metadata={}, created_at=datetime.datetime(2025, 7, 19, 9, 36, 46, 618522, tzinfo=datetime.timezone.utc), content='\n好的，用户让我写一段关于“auto”的短文，字数在一百个字以内。首先，我需要确定用户所说的“auto”具体指的是什么。Auto这个词有多种含义，可能指汽车、自动化，或者某个特定品牌名称。\n\n结合用户提到的科普描述AI的经验，可能他们想了解的是与AI相关的自动化技术。比如自动驾驶汽车，也就是Auto在自动驾驶领域的应用。但不确定，所以可能需要考虑其他可能性。\n\n如果用户需要的是关于汽车的短文，可能涉及历史、技术发展等。但考虑到AI背景，更可能是指自动化，特别是自动驾驶。常见的是特斯拉、Waymo等公司的自动驾驶技术，或者更一般的自动化概念。\n\n另一个角度是“auto”作为前缀，比如“auto-”代表自动，可以解释自动化技术如何赋能各个领域，如工业、医疗、交通等，推动效率提升和创新。\n\n需要保持简洁，在一百字以内。要确保信息准确，涵盖关键点，比如历史背景、技术应用、对社会的影响。同时语言要通俗易懂，适合科普读者。\n\n可能的结构：定义auto，提到汽车和自动化，简述发展历史，举例应用，最后总结影响。注意不要超过字数，可能需要删减细节，比如具体例子可能只提一个，或者概括说明。\n\n检查是否有歧义，确保用户的需求被

`builder.get_participants()`  
返回一个builder中的所有参与的agent。***返回值是形如<autogen_agentchat.agents._assistant_agent.AssistantAgent object at 0x106af1e40>的对象，我们可以通过`.name`来得到每个agent的名字***


In [None]:
participants = builder.get_participants()
for agent in participants:
    print(agent.name)

writer
reviewer



`add_conditional_edges(source, condition_to_target)`

- 复杂的环形执行图
- 并行的复杂环形图
- 一些常用的工作流代码

In [None]:
###伪代码：
def decide_next(state):
    # 根据状态中 last tool call 决定走哪条分支
    return "tools" if state["messages"][-1].tool_calls else END

builder = DiGraphBuilder()
builder.add_node("agent", call_model_fn)
builder.add_node("tools", tool_node)
builder.set_entry_point("agent")

builder.add_conditional_edges("agent", decide_next)
builder.add_edge("tools", "agent")

## Reference


1. https://microsoft.github.io/autogen/stable/user-guide/agentchat-user-guide/graph-flow.html
2. https://microsoft.github.io/autogen/stable/reference/python/autogen_agentchat.teams.html#autogen_agentchat.teams.GraphFlow