# Orchestrator-Workers 工作流程

## 简介

您是否曾经在同一任务上需要多个视角，但无法提前预测哪些视角最有价值？Orchestrator-worker模式通过让 中央 LLM 分析每个独特的任务并动态确定最佳子任务来委托给专门的 worker LLM 来解决这个问题。
传统方法要么需要多次手动提示，要么使用硬编码并行化，无论上下文如何都会生成相同的变化。

通过这种方法，Orchestrator-LLM 分析任务，确定哪些变体对于该特定案例最有价值，然后委托给生成每个变体的 worker-LLM。

### 你将构建什么

一个接受产品描述请求的系统，并且：

1. 分析哪些类型的营销文案有价值
2. 为工人动态生成专门的任务描述
3. 制作针对不同受众优化的多种内容变体
4. 返回所有worker的协调结果

### 先决条件

- Python 3.9 或更高版本
- Anthropic API 密钥设置为环境变量：`export ANTHROPIC_API_KEY='your-key'`
- 对即时工程的基本了解
- 熟悉Python类和类型提示


### 何时使用此工作流程
此工作流程非常适合您无法提前预测所需子任务的复杂任务。与简单并行化的主要区别在于其灵活性——子任务不是预先定义的，而是由编排器根据特定输入确定。

**在以下情况下使用此模式：**

- 任务需要多种不同的方法或观点
- 最佳子任务取决于具体输入
- 你需要比较不同的策略或风格

**在以下情况下不要使用此模式：**

- 您有简单的单输出任务（不必要的复杂性）
- 延迟至关重要（多个 LLM 调用会增加开销）
- 子任务是可预测的并且可以预先定义（使用更简单的并行化）

## 它是如何工作的

Orchestrator-Worker 模式分两个阶段运行：

1. **分析和规划阶段**：Orchestrator-LLM 接收任务和上下文，分析哪些方法有价值，并生成 XML 格式的结构化子任务描述。
2. **执行阶段**：每个 worker-LLM 收到：
   - 上下文的原始任务
   - 其具体子任务类型和描述
   - 提供的任何其他上下文

编排器决定“在运行时”创建哪些子任务，使其比预定义的并行工作流程更具适应性。

## Setup

### 安装依赖
```bash
pip install anthropic
```

### Helper Functions
This example uses helper functions from `util.py` for making LLM calls and parsing XML responses:

- `llm_call(prompt, system_prompt="", model="claude-sonnet-4-5")`: Sends a prompt to Claude and returns the text response
- `extract_xml(text, tag)`: Extracts content from XML tags using regex

## 实现

The `FlexibleOrchestrator` class coordinates the two-phase workflow:

**关键设计决策：**
- 提示是接受运行时变量（“任务”、“上下文”）以实现灵活性的模板
- XML 用于结构化输出解析（可靠且语言模型友好的格式）
- 工作人员会收到原始任务和具体指示，以获得更好的背景信息
- 错误处理验证工作人员返回非空响应

**实现包括：**
- parse_tasks()：将协调器的 XML 输出解析为结构化的任务字典
- FlexibleOrchestrator.process()：主要协调逻辑，调用协调器，然后调用工作者
- 响应验证，用于捕获并处理空的工作者输出

In [None]:
from patterns.agents.util import extract_xml, llm_call

# Model configuration
MODEL = "google/gemini-3-flash-preview"  # Fast, capable model for both orchestrator and workers


def parse_tasks(tasks_xml: str) -> list[dict]:
    """Parse XML tasks into a list of task dictionaries."""
    tasks = []
    current_task = {}

    for line in tasks_xml.split("\n"):
        line = line.strip()
        if not line:
            continue

        if line.startswith("<task>"):
            current_task = {}
        elif line.startswith("<type>"):
            current_task["type"] = line[6:-7].strip()
        elif line.startswith("<description>"):
            current_task["description"] = line[12:-13].strip()
        elif line.startswith("</task>"):
            if "description" in current_task:
                if "type" not in current_task:
                    current_task["type"] = "default"
                tasks.append(current_task)

    return tasks


class FlexibleOrchestrator:
    """Break down tasks and run them in parallel using worker LLMs."""

    def __init__(
        self,
        orchestrator_prompt: str,
        worker_prompt: str,
        model: str = MODEL,
    ):
        """Initialize with prompt templates and model selection."""
        self.orchestrator_prompt = orchestrator_prompt
        self.worker_prompt = worker_prompt
        self.model = model

    def _format_prompt(self, template: str, **kwargs) -> str:
        """Format a prompt template with variables."""
        try:
            return template.format(**kwargs)
        except KeyError as e:
            raise ValueError(f"Missing required prompt variable: {e}") from e

    def process(self, task: str, context: dict | None = None) -> dict:
        """Process task by breaking it down and running subtasks in parallel."""
        context = context or {}

        # Step 1: Get orchestrator response
        orchestrator_input = self._format_prompt(self.orchestrator_prompt, task=task, **context)
        orchestrator_response = llm_call(orchestrator_input, model=self.model)

        # Parse orchestrator response
        analysis = extract_xml(orchestrator_response, "analysis")
        tasks_xml = extract_xml(orchestrator_response, "tasks")
        tasks = parse_tasks(tasks_xml)

        print("\n" + "=" * 80)
        print("ORCHESTRATOR ANALYSIS")
        print("=" * 80)
        print(f"\n{analysis}\n")

        print("\n" + "=" * 80)
        print(f"IDENTIFIED {len(tasks)} APPROACHES")
        print("=" * 80)
        for i, task_info in enumerate(tasks, 1):
            print(f"\n{i}. {task_info['type'].upper()}")
            print(f"   {task_info['description']}")

        print("\n" + "=" * 80)
        print("GENERATING CONTENT")
        print("=" * 80 + "\n")

        # Step 2: Process each task
        worker_results = []
        for i, task_info in enumerate(tasks, 1):
            print(f"[{i}/{len(tasks)}] Processing: {task_info['type']}...")

            worker_input = self._format_prompt(
                self.worker_prompt,
                original_task=task,
                task_type=task_info["type"],
                task_description=task_info["description"],
                **context,
            )

            worker_response = llm_call(worker_input, model=self.model)
            worker_content = extract_xml(worker_response, "response")

            # Validate worker response - handle empty outputs
            if not worker_content or not worker_content.strip():
                print(f"⚠️  Warning: Worker '{task_info['type']}' returned no content")
                worker_content = f"[Error: Worker '{task_info['type']}' failed to generate content]"

            worker_results.append(
                {
                    "type": task_info["type"],
                    "description": task_info["description"],
                    "result": worker_content,
                }
            )

        # Display results
        print("\n" + "=" * 80)
        print("RESULTS")
        print("=" * 80)
        for i, result in enumerate(worker_results, 1):
            print(f"\n{'-' * 80}")
            print(f"Approach {i}: {result['type'].upper()}")
            print(f"{'-' * 80}")
            print(f"\n{result['result']}\n")

        return {
            "analysis": analysis,
            "worker_results": worker_results,
        }

## 示例用例：营销变体生成

现在，让我们通过一个实际示例来了解 Orchestrator-Worker 模式的实际应用：为产品生成多种风格的营销文案。

**为什么这个例子很好地展示了该模式：**

- 不同的产品受益于不同的营销角度
- “最佳”变体取决于具体的产品功能和目标受众
- Orchestrator 可以根据输入调整其策略，而不是使用固定的模板

**提示设计说明：**

- Orchestrator 提示要求 2-3 种方法并提供 XML 结构指导
- Worker prompts 为 worker 提供完整的背景信息（原始任务、他们的风格和指南）
- 两个提示都使用清晰的 XML 格式以确保可靠的解析

In [4]:
ORCHESTRATOR_PROMPT = """
Your answer should in CHINESE.

Analyze this task and break it down into 2-3 distinct approaches:

Task: {task}

Return your response in this format:

<analysis>
Explain your understanding of the task and which variations would be valuable.
Focus on how each approach serves different aspects of the task.
</analysis>

<tasks>
    <task>
    <type>formal</type>
    <description>Write a precise, technical version that emphasizes specifications</description>
    </task>
    <task>
    <type>conversational</type>
    <description>Write an engaging, friendly version that connects with readers</description>
    </task>
</tasks>
"""

WORKER_PROMPT = """
Generate content based on:
Task: {original_task}
Style: {task_type}
Guidelines: {task_description}

Return your response in this format:

<response>
Your content here, maintaining the specified style and fully addressing requirements.
</response>
"""


orchestrator = FlexibleOrchestrator(
    orchestrator_prompt=ORCHESTRATOR_PROMPT,
    worker_prompt=WORKER_PROMPT,
)

results = orchestrator.process(
    task="Write a product description for a new eco-friendly water bottle",
    context={
        "target_audience": "environmentally conscious millennials",
        "key_features": ["plastic-free", "insulated", "lifetime warranty"],
    },
)


ORCHESTRATOR ANALYSIS


这项任务的核心是为一款新型环保水瓶撰写产品描述。为了有效地推向市场，我们需要针对不同的受众心理和购物场景制定策略。

第一种变体应侧重于“理性说服”，通过具体的环保参数和材质规格来建立信任感，适合那些对产品质量和可持续性有严格要求的专业消费者。
第二种变体应侧重于“感性连接”，将产品融入日常生活场景，强调使用体验和品牌价值观，适合社交媒体推广或日常消费群体。
第三种变体（可选）可以侧重于“极简主义”，用精炼的语言突出核心卖点，适合移动端快速浏览。

通过这几种不同的切入点，我们可以覆盖从技术控到生活方式追求者的全方位受众。



IDENTIFIED 3 APPROACHES

1. 专业技术型 (FORMAL/TECHNICAL)
   >撰写一个精确且专业的版本，重点强调材质规格（如不含BPA、可回收材料比例）、保温性能参数以及耐用性测试数据，以体现产品的卓越品质和环保标准。<

2. 生活对话型 (CONVERSATIONAL/LIFESTYLE)
   >撰写一个亲切且具吸引力的版本，通过描述日常使用场景（如健身、办公、户外旅行）来引起读者共鸣，强调产品如何提升生活品质并减少塑料足迹。<

3. 极简卖点型 (MINIMALIST/PUNCHY)
   >采用精炼的短句和排版，直接列出核心优势（如“轻量化”、“零渗漏”、“终身保修”），旨在快速抓住快节奏消费者的注意力。<

GENERATING CONTENT

[1/3] Processing: 专业技术型 (Formal/Technical)...
[2/3] Processing: 生活对话型 (Conversational/Lifestyle)...
[3/3] Processing: 极简卖点型 (Minimalist/Punchy)...

RESULTS

--------------------------------------------------------------------------------
Approach 1: 专业技术型 (FORMAL/TECHNICAL)
-------------------------------------------------------------------------------

## 总结

您现在已经实现了一个 Orchestrator-Workers 模式，该模式可以根据特定输入动态调整其任务分解。这种模式生成了多个营销文案变体，每个变体都针对不同的受众和环境量身定制，而无需您预先定义这些变体应该是什么。

### 要点

**模式优点：**
- **适应性**：编排器确定每个独特输入的最佳方法
- **灵活性**：通过更改提示轻松应用于不同的域
- **结构化协调**：基于XML的通信确保可靠的解析
- **错误恢复**：验证捕获并处理工作人员故障

**当这种模式表现出色时：**
- 内容生成需要多个视角（营销、文档、创意写作）
- 受益于不同分析镜头的分析任务
- 分解策略取决于问题的问题解决

### 限制和注意事项

**成本和延迟：**
- 需要 N+1 个 LLM 通话（1 个协调器 + N 个工作人员）
- 此实现中的顺序处理（工作线程一次运行一个）
- 为了获得更好的性能，请考虑使用“asyncio”或线程池并行工作线程调用

**何时不使用此模式：**
- 具有单一、清晰输出的简单任务（增加的复杂性是不合理的）
- 延迟关键型应用程序（多个 API 调用会增加开销）
- 子任务始终相同的任务（改为使用预定义的并行化）

**要考虑的故障模式：**
- Orchestrator 可能无法以最佳方式分解任务（及时的工程设计至关重要）
- 工作人员可能会返回空的或格式错误的响应（我们通过验证来处理此问题）
- 如果模型不完全遵循格式，XML 解析可能会失败（考虑使用 JSON 作为替代方案）

### 后续步骤

**增强此实施：**
1. 使用 `asyncio` 添加并行工作执行以获得更好的性能
2. 为失败的 worker 实现重试逻辑
3. 添加一个综合阶段，其中 LLM 结合了工作人员的输出
4. 尝试不同的协调器策略（例如，要求更多/更少的子任务）

**适应您的用例：**
- 修改协调器提示以指导您域的任务分解
- 调整工作人员提示以提供特定于域的说明
- 添加与您的应用程序相关的上下文参数
- 考虑使用 Claude Opus 作为协调器，使用 Claude Haiku 作为工作人员，以优化成本与质量