## Workflow: **Orchestrator-workers**

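This workflow begins with an orchestrator LLM that breaks the task down into subtasks. Unlike a fixed parallelization workflow, the subtasks are not predefined. The orchestrator determines them dynamically from the input.

Multiple worker LLMs then process these subtasks in parallel. Finally, the workers' outputs are synthesized into the final result. In the implementation below, this last step is a separate aggregator call.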


<img src="../images/orchestrator-workers.webp" alt="Orchestrator Workers" width="500">
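The control flow is easier to see without any API calls. The following minimal sketch replaces the three LLM calls with hypothetical stub coroutines: `plan`, `work`, and `synthesize`, none of which appear in the notebook. Only the shape matters here:

- a planning step whose output decides how many workers run,
- a concurrent fan-out with `asyncio.gather`, and
- a single fan-in step.

```python
import asyncio


async def plan(task: str) -> list[str]:
    # Hypothetical stand-in for the orchestrator LLM: the number of subtasks
    # depends on the input (here, one subtask per comma-separated part)
    return [part.strip() for part in task.split(",") if part.strip()]


async def work(subtask: str) -> str:
    # Hypothetical stand-in for a worker LLM call
    await asyncio.sleep(0.01)
    return f"result for {subtask!r}"


async def synthesize(results: list[str]) -> str:
    # Hypothetical stand-in for the final synthesis call
    return "\n".join(results)


async def orchestrate(task: str) -> str:
    subtasks = await plan(task)                                    # 1. decompose dynamically
    results = await asyncio.gather(*(work(s) for s in subtasks))   # 2. fan out in parallel
    return await synthesize(list(results))                         # 3. fan in


if __name__ == "__main__":
    print(asyncio.run(orchestrate("load CSV, handle missing values, write JSON summary")))
```

This runs as a plain script. Inside a notebook, an event loop is already running, so `asyncio.run` only works after `nest_asyncio.apply()` (as in the setup cells below). Alternatively, use `await orchestrate(...)` directly.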

## **Use Cases:**

- Breaking down a coding problem into subtasks, using an LLM to generate code for each subtask, and making a final LLM call to combine the results into a complete solution.
- Searching for information across multiple sources: an LLM identifies the relevant sources, workers search each one, and the findings are synthesized into a cohesive answer.
- Creating a tutorial by splitting it into subtasks such as writing the introduction, outlining the steps, and generating examples. Worker LLMs handle each part, and the orchestrator combines them into a polished final document.
- Dividing a data analysis task into subtasks such as cleaning the data, identifying trends, and generating visualizations. Separate worker LLMs handle each step, and the orchestrator integrates their findings into a complete analytical report.

```python
%pip install openai pydantic nest-asyncio --upgrade
```

```python
# Jupyter already runs an event loop; nest_asyncio lets asyncio.run() be called inside it
import nest_asyncio
nest_asyncio.apply()
```

```python
import asyncio
import os
from typing import List
from pydantic import BaseModel, Field
from openai import AsyncOpenAI
```

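```python
# Replace the placeholder with your own key, or export OPENAI_API_KEY in your shell
# (setdefault keeps a key that is already set in the environment)
os.environ.setdefault("OPENAI_API_KEY", "sk-proj-xxxx")
```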

```python
client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
MODEL = "gpt-4o"  # no trailing comma: `MODEL = "gpt-4o",` would create a tuple
```

```python
class Subtask(BaseModel):
    name: str = Field(..., description="Name of the subtask")
    description: str = Field(..., description="Description of the subtask")

class OrchestratorOutput(BaseModel):
    objective: str = Field(..., description="Summary of the coding task")
    subtasks: List[Subtask] = Field(..., description="List of subtasks needed to solve the coding task")
```
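With `response_format=OrchestratorOutput`, the OpenAI SDK parses the orchestrator's reply into these Pydantic models for you. If you target a model or endpoint without structured-output support, you would have to validate the JSON yourself. The following is a stdlib-only sketch of that check, assuming plain `json` parsing. It mirrors the two models' fields with dataclasses. `SubtaskDC`, `OrchestratorOutputDC`, and `parse_orchestrator_json` are hypothetical names, not part of the notebook.

```python
import json
from dataclasses import dataclass


@dataclass
class SubtaskDC:
    name: str
    description: str


@dataclass
class OrchestratorOutputDC:
    objective: str
    subtasks: list[SubtaskDC]


def parse_orchestrator_json(raw: str) -> OrchestratorOutputDC:
    """Validate raw orchestrator JSON by hand; raises ValueError on a bad shape."""
    data = json.loads(raw)  # json.JSONDecodeError is a subclass of ValueError
    if not isinstance(data, dict):
        raise ValueError("top-level JSON must be an object")
    objective = data.get("objective")
    subtasks = data.get("subtasks")
    if not isinstance(objective, str) or not isinstance(subtasks, list):
        raise ValueError("expected 'objective' (string) and 'subtasks' (array)")
    parsed = []
    for item in subtasks:
        if not (isinstance(item, dict)
                and isinstance(item.get("name"), str)
                and isinstance(item.get("description"), str)):
            raise ValueError(f"malformed subtask: {item!r}")
        parsed.append(SubtaskDC(item["name"], item["description"]))
    return OrchestratorOutputDC(objective, parsed)


raw = '{"objective": "Summarise a CSV", "subtasks": [{"name": "Load", "description": "Read the CSV file"}]}'
print(parse_orchestrator_json(raw))
```

Structured outputs make this boilerplate unnecessary, which is why the notebook relies on `client.beta.chat.completions.parse`.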

We define three main prompts:
1. **`ORCHESTRATOR_PROMPT`**: Tells the LLM to analyse the coding problem and break it down into subtasks in JSON format.
2. **`WORKER_PROMPT`**: Instructs the LLM worker to produce code for a given subtask.
3. **`AGGREGATOR_PROMPT`**: Instructs a final LLM call to merge all worker outputs into a single solution.


```python
ORCHESTRATOR_PROMPT = """
You are a skilled software architect.
Read the coding problem below and:
1. Summarise the objective in your own words.
2. Identify 2-3 essential subtasks required to solve the problem.
3. Provide your answer in JSON format with fields:
   - objective (string)
   - subtasks (an array of objects, each having "name" and "description")

Coding Problem:
{problem}

Return only valid JSON. Do not include any additional text.
"""

WORKER_PROMPT = """
You are a seasoned software engineer.
Here is your subtask for the larger coding problem:
Subtask Name: {name}
Subtask Description: {description}

Write the Python code that accomplishes this subtask.
Return only your code without any Markdown formatting or additional explanation.
"""

AGGREGATOR_PROMPT = """
You are an experienced integrator of code.
We have code snippets from different subtasks.
Your job is to assemble them into a cohesive, working solution that solves the original coding problem in its entirety.

Subtasks Code:
{subtasks_code}

Combine these snippets into a final, complete Python solution.
You can reorder or modify the code slightly if needed for correct integration.
Return only the combined code without any Markdown or extra commentary.
"""
```


```python
async def call_orchestrator(problem: str, model: str = "gpt-4o") -> OrchestratorOutput:
    """
    Calls the orchestrator LLM to break down the coding task into subtasks.
    Uses structured outputs, so the SDK parses the reply into OrchestratorOutput.
    """
    prompt = ORCHESTRATOR_PROMPT.format(problem=problem)

    response = await client.beta.chat.completions.parse(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        temperature=0.0,
        max_tokens=500,
        response_format=OrchestratorOutput,
    )
    message = response.choices[0].message
    # `parsed` is None if the model refused (the reason is then in `message.refusal`)
    if not message.content or not message.parsed:
        raise ValueError("Failed to parse orchestrator JSON.")
    return message.parsed


async def call_worker(name: str, description: str, model: str = "gpt-4o") -> str:
    """
    Calls a worker LLM to produce code for a given subtask.
    Returns only the code as a string.
    """
    prompt = WORKER_PROMPT.format(name=name, description=description)

    response = await client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        temperature=0.2,
        max_tokens=1000,
    )
    content = response.choices[0].message.content
    if not content:
        raise ValueError("Worker returned no code.")
    return content


async def call_aggregator(subtasks_code: str, model: str = "gpt-4o") -> str:
    """
    Calls the aggregator LLM to merge multiple code snippets into a final solution.
    """
    prompt = AGGREGATOR_PROMPT.format(subtasks_code=subtasks_code)

    response = await client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        temperature=0.0,
        max_tokens=1500,
    )
    content = response.choices[0].message.content
    if not content:
        raise ValueError("Aggregator returned no code.")
    return content
```
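The worker and aggregator prompts ask for code without Markdown, but models sometimes wrap their answer in a fenced block anyway. A small post-processing helper makes the pipeline tolerant of that. This is a hypothetical addition, not one of the notebook's functions. It assumes the reply is either bare code or exactly one fenced block.

```python
import re

FENCE = "`" * 3  # built programmatically so this example does not close its own Markdown fence

# Optional language tag after the opening fence, then the body up to the closing fence
_FENCED = re.compile(rf"^\s*{FENCE}[\w+-]*[ \t]*\n(.*?)\n?{FENCE}\s*$", re.DOTALL)


def strip_code_fences(text: str) -> str:
    """Return the code inside a single surrounding fenced block, or the text unchanged."""
    match = _FENCED.match(text)
    return match.group(1) if match else text


print(strip_code_fences(FENCE + "python\nprint('hi')\n" + FENCE))
print(strip_code_fences("print('hi')"))
```

You could apply it to the return values of `call_worker` and `call_aggregator`, for example `return strip_code_fences(content)`.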

---

### Orchestrator-Workers Flow
 
We use the above helper functions in a coordinated workflow:
1. **Orchestrator**: Breaks the main problem into subtasks.
2. **Parallel Workers**: Each subtask is handled by a separate LLM call in parallel.
3. **Aggregator**: Combines the results into the final code.
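The flow below relies on two properties of `asyncio.gather`:

- the awaitables run concurrently, and
- the results come back in the order the awaitables were passed in, not the order they finish.

The second property is what lets each worker's snippet be matched to its subtask by position. Here is a self-contained check with simulated workers; `fake_worker` and its delays are made up for illustration.

```python
import asyncio
import time


async def fake_worker(name: str, delay: float) -> str:
    # Simulated LLM latency; the later subtasks finish first
    await asyncio.sleep(delay)
    return f"code for {name}"


async def run_workers() -> tuple[list[str], float]:
    subtasks = [("load", 0.3), ("clean", 0.2), ("summarise", 0.1)]
    start = time.perf_counter()
    results = await asyncio.gather(*(fake_worker(n, d) for n, d in subtasks))
    elapsed = time.perf_counter() - start
    return list(results), elapsed


results, elapsed = asyncio.run(run_workers())
print(results)                     # ordered like the input, despite finishing in reverse
print(f"elapsed ~{elapsed:.2f}s")  # roughly the longest delay (0.3s), not the sum (0.6s)
```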

```python
async def orchestrator_workers_flow(problem: str) -> str:
    """
    Full orchestrator-workers flow:
    1. Call the orchestrator to extract subtasks.
    2. For each subtask, call a worker LLM in parallel to generate code.
    3. Aggregate all resulting code snippets into a final solution.
    4. Return that final integrated solution.
    """

    # Step 1: Orchestrator analyses the problem and returns subtasks
    orchestrator_result = await call_orchestrator(problem)

    # Print for demonstration
    print("=== ORCHESTRATOR RESPONSE ===")
    print("Objective:", orchestrator_result.objective)
    for i, sub in enumerate(orchestrator_result.subtasks, start=1):
        print(f"Subtask {i} Name:", sub.name)
        print(f"Subtask {i} Description:", sub.description)

    # Step 2: Call workers in parallel; gather() returns results in subtask order
    tasks = [call_worker(sub.name, sub.description) for sub in orchestrator_result.subtasks]
    worker_results = await asyncio.gather(*tasks)

    # Print intermediate worker code results
    print("\n=== WORKER CODE SNIPPETS ===")
    for i, code_snippet in enumerate(worker_results, start=1):
        print(f"--- Subtask {i} Code ---\n{code_snippet}\n")

    # Label each snippet with its subtask name so the aggregator can tell them apart
    all_code = "\n\n".join(
        f"# Subtask: {sub.name}\n{code}"
        for sub, code in zip(orchestrator_result.subtasks, worker_results)
    )

    # Step 3: Aggregate into a final solution
    final_solution = await call_aggregator(all_code)

    # Step 4: Return the integrated code
    return final_solution
```
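As written, if any single worker raises (for example, the `ValueError` in `call_worker`), `asyncio.gather` propagates that exception and the whole flow fails. One possible variation is to pass `return_exceptions=True` and decide what to do with failures before aggregating. The sketch below uses a simulated worker. `flaky_worker`, `gather_tolerant`, and the skip-on-failure policy are assumptions for illustration, not part of the notebook.

```python
import asyncio


async def flaky_worker(name: str) -> str:
    # Simulated worker that fails for one subtask
    if name == "visualise":
        raise ValueError(f"worker failed on {name!r}")
    return f"# code for {name}"


async def gather_tolerant(names: list[str]) -> tuple[list[str], list[str]]:
    # return_exceptions=True puts exceptions into the result list instead of raising
    results = await asyncio.gather(*(flaky_worker(n) for n in names), return_exceptions=True)
    snippets, failures = [], []
    for name, result in zip(names, results):
        if isinstance(result, BaseException):
            failures.append(f"{name}: {result}")
        else:
            snippets.append(result)
    return snippets, failures


snippets, failures = asyncio.run(gather_tolerant(["load", "visualise", "report"]))
print(snippets)
print(failures)
```

Whether to skip, retry, or abort on a failed subtask is a design decision. Skipping silently hands the aggregator an incomplete set of snippets, so at minimum the failures should be surfaced.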

```python
async def main():
    # Define a sample coding problem
    coding_problem = """
    Create a Python script that reads a CSV file, processes the data,
    and outputs a summary in JSON format.
    The solution should handle missing values gracefully and highlight data anomalies.
    """

    # Run the orchestrator-workers flow
    final_code = await orchestrator_workers_flow(coding_problem)

    print("\n=== FINAL AGGREGATED SOLUTION ===")
    print(final_code)

# Execute the main function (works inside Jupyter because nest_asyncio was applied earlier)
asyncio.run(main())
```
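The aggregated solution is unreviewed model output. It is worth at least checking that it parses before saving or running it. The following minimal sketch uses the standard-library `ast` module. `check_syntax` is a hypothetical helper, and parsing only catches syntax errors, not logic bugs.

```python
import ast
from typing import Optional


def check_syntax(code: str) -> Optional[str]:
    """Return None if `code` parses as Python, else a short description of the syntax error."""
    try:
        ast.parse(code)
    except SyntaxError as err:
        return f"line {err.lineno}: {err.msg}"
    return None


print(check_syntax("import json\nprint(json.dumps({'ok': True}))"))  # None
print(check_syntax("def broken(:\n    pass"))
```

In the notebook, this would go in `main()` after the flow returns, for example `problem = check_syntax(final_code)`, before writing the code to disk.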