## Orchestrator-Workers Workflow

This notebook demonstrates the orchestrator-workers workflow, where a central LLM dynamically breaks down tasks, delegates them to worker LLMs, and synthesizes their results. This creates a flexible system that can adapt its approach based on the specific requirements of each task.

### When to use this workflow

This workflow is well-suited for complex tasks where you can't predict the subtasks needed. The key difference from simple parallelization is its flexibility—subtasks aren't pre-defined, but determined by the orchestrator based on the specific input.

### Examples where this workflow is useful

- Content Generation: Creating multiple versions of content tailored to different audiences or platforms
- Data Analysis: Breaking down complex analysis tasks into specific investigative angles
- Problem Solving: Approaching complex problems from multiple perspectives
- Document Processing: Analyzing different aspects of documents (style, content, structure) in parallel

![](https://www.anthropic.com/_next/image?url=https%3A%2F%2Fwww-cdn.anthropic.com%2Fimages%2F4zrzovbb%2Fwebsite%2F8985fc683fae4780fb34eab1365ab78c7e51bc8e-2401x1000.png&w=3840&q=75)

In [None]:
%pip install boto3 litellm aiohttp --quiet --upgrade

## Setup - Configure the LLM to use Amazon Bedrock

To simplify things, we're going to use LiteLLM rather than Boto3.

In [None]:
from litellm import completion
import re
from typing import Dict, List, Optional
import boto3


def llm_call(prompt: str, system_prompt: str = "") -> str:
    """
    Calls the model with the given prompt and returns the response.

    Args:
        prompt (str): The user prompt to send to the model.
        system_prompt (str, optional): The system prompt to send to the model. Defaults to "".

    Returns:
        str: The response from the language model.
    """
    messages = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})
    messages.append({"role": "user", "content": prompt})
    
    response = completion(
        model="bedrock/us.amazon.nova-pro-v1:0",
        aws_region_name=boto3.Session().region_name,
        aws_profile_name="default",
        messages=messages,
        max_tokens=4096,
        temperature=0.1
    )
    return response.choices[0].message.content

def extract_xml(text: str, tag: str) -> str:
    """
    Extracts the content of the specified XML tag from the given text.
    Used for parsing structured responses.

    Args:
        text (str): The text containing the XML.
        tag (str): The XML tag to extract content from.

    Returns:
        str: The content of the specified XML tag, or an empty string if the tag is not found.
    """
    match = re.search(f'<{tag}>(.*?)</{tag}>', text, re.DOTALL)
    return match.group(1) if match else ""

## Implementation

In [2]:
def parse_tasks(tasks_xml: str) -> List[Dict]:
    """Parse XML tasks into a list of task dictionaries."""
    tasks = []
    current_task = {}
    
    for line in tasks_xml.split('\n'):
        line = line.strip()
        if not line:
            continue
            
        if line.startswith("<task>"):
            current_task = {}
        elif line.startswith("<type>"):
            current_task["type"] = line[6:-7].strip()
        elif line.startswith("<description>"):
            current_task["description"] = line[12:-13].strip()
        elif line.startswith("</task>"):
            if "description" in current_task:
                if "type" not in current_task:
                    current_task["type"] = "default"
                tasks.append(current_task)
    
    return tasks

class FlexibleOrchestrator:
    """Break down tasks and run them in parallel using worker LLMs."""
    
    def __init__(
        self,
        orchestrator_prompt: str,
        worker_prompt: str,
    ):
        """Initialize with prompt templates."""
        self.orchestrator_prompt = orchestrator_prompt
        self.worker_prompt = worker_prompt

    def _format_prompt(self, template: str, **kwargs) -> str:
        """Format a prompt template with variables."""
        try:
            return template.format(**kwargs)
        except KeyError as e:
            raise ValueError(f"Missing required prompt variable: {e}")

    def process(self, task: str, context: Optional[Dict] = None) -> Dict:
        """Process task by breaking it down and running subtasks in parallel."""
        context = context or {}
        
        # Step 1: Get orchestrator response
        orchestrator_input = self._format_prompt(
            self.orchestrator_prompt,
            task=task,
            **context
        )
        orchestrator_response = llm_call(orchestrator_input)
        
        # Parse orchestrator response
        analysis = extract_xml(orchestrator_response, "analysis")
        tasks_xml = extract_xml(orchestrator_response, "tasks")
        tasks = parse_tasks(tasks_xml)
        
        print("\n=== ORCHESTRATOR OUTPUT ===")
        print(f"\nANALYSIS:\n{analysis}")
        print(f"\nTASKS:\n{tasks}")
        
        # Step 2: Process each task
        worker_results = []
        for task_info in tasks:
            worker_input = self._format_prompt(
                self.worker_prompt,
                original_task=task,
                task_type=task_info['type'],
                task_description=task_info['description'],
                **context
            )
            
            worker_response = llm_call(worker_input)
            result = extract_xml(worker_response, "response")
            
            worker_results.append({
                "type": task_info["type"],
                "description": task_info["description"],
                "result": result
            })
            
            print(f"\n=== WORKER RESULT ({task_info['type']}) ===\n{result}\n")
        
        return {
            "analysis": analysis,
            "worker_results": worker_results,
        }

## Example 1: Marketing Variation Generation

In [3]:
ORCHESTRATOR_PROMPT = """
Analyze this task and break it down into 2-3 distinct approaches:

Task: {task}

Return your response in this format:

<analysis>
Explain your understanding of the task and which variations would be valuable.
Focus on how each approach serves different aspects of the task.
</analysis>

<tasks>
    <task>
    <type>formal</type>
    <description>Write a precise, technical version that emphasizes specifications</description>
    </task>
    <task>
    <type>conversational</type>
    <description>Write an engaging, friendly version that connects with readers</description>
    </task>
</tasks>
"""

WORKER_PROMPT = """
Generate content based on:
Task: {original_task}
Style: {task_type}
Guidelines: {task_description}

Return your response in this format:

<response>
Your content here, maintaining the specified style and fully addressing requirements.
</response>
"""

orchestrator = FlexibleOrchestrator(
    orchestrator_prompt=ORCHESTRATOR_PROMPT,
    worker_prompt=WORKER_PROMPT,
)

results = orchestrator.process(
    task="Write a product description for a new eco-friendly water bottle",
    context={
        "target_audience": "environmentally conscious millennials",
        "key_features": ["plastic-free", "insulated", "lifetime warranty"]
    }
)


=== ORCHESTRATOR OUTPUT ===

ANALYSIS:

The task requires crafting a product description for a new eco-friendly water bottle. To cater to different audiences and purposes, it's valuable to approach this task from multiple angles. A formal, technical description would appeal to consumers who prioritize detailed specifications and environmental impact data. On the other hand, a conversational, friendly description would resonate with a broader audience, fostering an emotional connection and encouraging engagement. Both approaches are valuable as they target different consumer preferences and decision-making processes.


TASKS:
[{'type': 'formal', 'description': '>Write a precise, technical version that emphasizes specifications, materials, and environmental impact.<'}, {'type': 'conversational', 'description': '>Write an engaging, friendly version that highlights the benefits and eco-friendly features in a relatable manner.<'}]

=== WORKER RESULT (formal) ===

Introducing the EcoHydrate

## Example 2: Technical Documentation Generation

In [None]:
TECH_ORCHESTRATOR_PROMPT = """
Analyze this technical documentation task and break it down into distinct sections:

Task: {task}

Return your response in this format:

<analysis>
Explain how you'll divide the documentation into focused sections that serve different user needs.
</analysis>

<tasks>
    <task>
    <type>overview</type>
    <description>Write a high-level introduction and system overview</description>
    </task>
    <task>
    <type>technical</type>
    <description>Provide detailed technical specifications and implementation details</description>
    </task>
    <task>
    <type>tutorial</type>
    <description>Create a step-by-step tutorial with practical examples</description>
    </task>
</tasks>
"""

TECH_WORKER_PROMPT = """
Generate documentation content based on:
Task: {original_task}
Section Type: {task_type}
Guidelines: {task_description}

Return your response in this format:

<response>
Your documentation content here, following the section type guidelines.
</response>
"""

tech_orchestrator = FlexibleOrchestrator(
    orchestrator_prompt=TECH_ORCHESTRATOR_PROMPT,
    worker_prompt=TECH_WORKER_PROMPT,
)

tech_results = tech_orchestrator.process(
    task="Create documentation for a new REST API endpoint that handles user authentication",
    context={
        "api_version": "v2",
        "auth_methods": ["JWT", "OAuth2"],
        "rate_limits": "100 requests per minute"
    }
)