# 04 - Orchestrator-Worker: Dynamic Task Breakdown

## What is Orchestrator-Worker?

The Orchestrator-Worker pattern uses one LLM (the orchestrator) to analyse a complex task and break it into subtasks at run time. Each subtask is delegated to a worker LLM call, and the orchestrator then combines the workers' results.

## Why use it for privilege review?

Some documents are too complex for fixed pipelines:
- Long email chains with 20+ messages
- Documents with multiple attachments
- Mixed content with privileged and non-privileged sections
- Complex corporate structures with multiple parties

The orchestrator dynamically decides what analysis is needed based on the specific document.

## How it works
```
Complex Document
       ↓
  Orchestrator: "This needs 4 subtasks..."
       ↓
  ┌────┼────┬────┐
  ↓    ↓    ↓    ↓
Worker Worker Worker Worker
  1    2    3    4
  ↓    ↓    ↓    ↓
  └────┼────┴────┘
       ↓
  Orchestrator: "Combining results..."
       ↓
  Final Classification
```
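
The flow in the diagram can be sketched as plain Python before any model is involved. This is a minimal illustration, not the notebook's implementation: `run_orchestrator_worker` and the three `stub_*` functions are hypothetical names, and the stubs stand in for the LLM calls built in later steps.

```python
from typing import Callable, Dict, List


def run_orchestrator_worker(
    document: str,
    plan: Callable[[str], List[str]],
    work: Callable[[str, str], str],
    synthesize: Callable[[str, Dict[str, str]], str],
) -> str:
    """Plan subtasks, run one worker per subtask, then combine the findings."""
    subtasks = plan(document)  # orchestrator, first pass
    findings = {task: work(document, task) for task in subtasks}  # workers
    return synthesize(document, findings)  # orchestrator, second pass


# Stub callables so the control flow can be run without API access.
def stub_plan(document: str) -> List[str]:
    return ["List the parties", "Check for waiver"]


def stub_work(document: str, task: str) -> str:
    return f"finding for: {task}"


def stub_synthesize(document: str, findings: Dict[str, str]) -> str:
    return f"{len(findings)} findings combined"


summary = run_orchestrator_worker("example text", stub_plan, stub_work, stub_synthesize)
print(summary)
```

The number of workers is not fixed in the code: it is whatever length of list the planning step returns.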

## Australian Law Reference

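- Evidence Act 1995 (Cth) ss 118-119 (s 118 covers legal advice privilege; s 119 covers litigation privilege)
- Complex privilege scenarios may require analysis of multiple legal tests, including loss of privilege (waiver) under s 122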

## Step 1: Setup

Import libraries and create the OpenAI client.

**What this does:**
- `from openai import OpenAI` - imports the OpenAI client class
- `from IPython.display import display, Markdown` - renders formatted output in the notebook
- `client = OpenAI()` - creates the API client, which reads the key from the `OPENAI_API_KEY` environment variable
- `MODEL = "gpt-4.1-nano"` - the model used for both orchestrator and workers

In [None]:
from openai import OpenAI
from IPython.display import display, Markdown

client = OpenAI()
MODEL = "gpt-4.1-nano"

print(f"Client configured with model: {MODEL}")
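
Because `OpenAI()` takes its key from the `OPENAI_API_KEY` environment variable when none is passed explicitly, a quick check before creating the client can give a clearer message than an authentication error later. A small sketch; `api_key_present` is a hypothetical helper, not part of the OpenAI library.

```python
import os
from typing import Mapping, Optional


def api_key_present(env: Optional[Mapping[str, str]] = None) -> bool:
    """Return True when OPENAI_API_KEY is set to a non-empty value."""
    env = os.environ if env is None else env
    return bool(env.get("OPENAI_API_KEY", "").strip())


if not api_key_present():
    print("OPENAI_API_KEY is not set; the client will not be able to authenticate.")
```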

## Step 2: Create a Complex Test Document

A long email chain with multiple parties, attachments, and mixed content.

**What this does:**
- Creates a complex email chain with 4 messages
- Recipients are added as the thread progresses (CFO and CEO in email 3, Accounts in email 4)
- Some messages contain legal advice, others are operational
- Attachments are referenced throughout
- This complexity requires dynamic analysis - a fixed pipeline won't work well

In [None]:
complex_document = {
    "id": "DOC001",
    "type": "email_chain",
    "content": """
========== EMAIL CHAIN ==========

--- Email 1 of 4 ---
From: sarah.chen@acmecorp.com.au
To: michael.wong@wongpartners.com.au
Date: 2024-03-10 09:15
Subject: BuildRight dispute - initial advice needed

Michael,

We've received a letter of demand from BuildRight Pty Ltd (attached).
They claim we breached the construction contract and are seeking $2.3M.

Can you please review and provide initial advice on our exposure?

Regards,
Sarah Chen
General Counsel, ACME Corporation

Attachments: BuildRight_Letter_of_Demand.pdf

--- Email 2 of 4 ---
From: michael.wong@wongpartners.com.au
To: sarah.chen@acmecorp.com.au
Date: 2024-03-11 14:30
Subject: RE: BuildRight dispute - initial advice needed

Sarah,

I've reviewed the letter and the contract. My advice is as follows:

1. The limitation clause (cl 14.3) caps liability at $500,000
2. BuildRight's claim appears to overstate damages
3. I recommend we respond with a without prejudice offer of $350,000

This advice is provided in confidence for the purpose of this dispute.

Please don't share this with anyone outside the legal team.

Michael Wong
Partner, Wong & Partners

Attachments: Draft_Response_Letter.docx

--- Email 3 of 4 ---
From: sarah.chen@acmecorp.com.au
To: michael.wong@wongpartners.com.au
CC: john.smith@acmecorp.com.au, jane.doe@acmecorp.com.au
Date: 2024-03-12 10:00
Subject: RE: BuildRight dispute - initial advice needed

Michael,

I've copied in John Smith (CFO) and Jane Doe (CEO) as they need to 
approve any settlement authority.

John and Jane - please see Michael's advice below and attached.
Can we discuss at tomorrow's executive meeting?

Sarah

--- Email 4 of 4 ---
From: john.smith@acmecorp.com.au
To: sarah.chen@acmecorp.com.au, michael.wong@wongpartners.com.au
CC: jane.doe@acmecorp.com.au, accounts@acmecorp.com.au
Date: 2024-03-12 11:45
Subject: RE: BuildRight dispute - initial advice needed

Sarah / Michael,

I've reviewed the financial impact. We have budget capacity for 
a $350k settlement this quarter.

I've copied in Accounts so they can prepare for potential payment.

Also, can someone send me the original BuildRight contract? 
I need it for the board report.

John Smith
CFO, ACME Corporation
"""
}

print(f"Complex document created: {complex_document['id']}")
print(f"Type: {complex_document['type']}")
print("Contains: 4 emails, multiple parties, attachments, mixed privilege status")
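
Since the chain uses `--- Email N of M ---` separators and standard `From:` / `To:` / `CC:` header lines, it can also be split deterministically, which is handy for checking who received each message. A sketch under those format assumptions; `split_email_chain` is a hypothetical helper and the sample text below is made up for illustration.

```python
import re
from typing import Dict, List

EMAIL_MARKER = re.compile(r"^--- Email (\d+) of (\d+) ---[ \t]*$", re.MULTILINE)
HEADER_LINE = re.compile(r"^(From|To|CC):\s*(.+)$", re.MULTILINE)


def split_email_chain(content: str) -> List[Dict[str, object]]:
    """Split a chain on its '--- Email N of M ---' markers and collect addresses."""
    markers = list(EMAIL_MARKER.finditer(content))
    emails = []
    for idx, marker in enumerate(markers):
        end = markers[idx + 1].start() if idx + 1 < len(markers) else len(content)
        body = content[marker.end():end].strip()
        # Headers are the lines before the first blank line of each email.
        header_block = body.split("\n\n", 1)[0]
        headers: Dict[str, List[str]] = {"From": [], "To": [], "CC": []}
        for name, value in HEADER_LINE.findall(header_block):
            headers[name].extend(addr.strip() for addr in value.split(","))
        emails.append({"number": int(marker.group(1)), "headers": headers, "body": body})
    return emails


sample = """--- Email 1 of 2 ---
From: a@client.example
To: b@lawfirm.example

Please advise.

--- Email 2 of 2 ---
From: b@lawfirm.example
To: a@client.example
CC: c@client.example, d@client.example

Advice attached.
"""

parts = split_email_chain(sample)
for part in parts:
    print(part["number"], part["headers"]["CC"])
```

Calling `split_email_chain(complex_document["content"])` should yield four entries for the chain above, with the CC lists showing where new recipients enter the thread.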

## Step 3: Create the Orchestrator

The orchestrator analyses the document and decides what subtasks are needed.

**What this does:**
- Examines the complex document structure
- Identifies what analysis is required (parties, timeline, privilege per email, waiver risks)
- Returns a list of specific subtasks for the workers
- This is dynamic - different documents generate different subtasks

In [None]:
def orchestrator_analyse(document):
    """Orchestrator: analyse document and generate subtasks"""
    
    messages = [
        {"role": "system", "content": """You are a legal document analysis orchestrator.
Your job is to examine complex documents and break them into specific analysis subtasks.
Each subtask should be focused and actionable."""},
        {"role": "user", "content": f"""
Analyse this complex document and identify what subtasks are needed for a thorough 
Australian legal professional privilege assessment.

Document:
{document['content']}

Consider:
- How many separate communications are there?
- Who are all the parties involved?
- Are there attachments that need separate analysis?
- Are there potential waiver issues?
- Is there mixed privileged/non-privileged content?

Return a numbered list of specific subtasks. Each subtask should be a focused 
analysis question that a worker can answer.

Format:
SUBTASK 1: [specific analysis task]
SUBTASK 2: [specific analysis task]
...
DOCUMENT_COMPLEXITY: [Low/Medium/High]
REASONING: [why these subtasks are needed]
"""}
    ]
    
    response = client.chat.completions.create(
        model=MODEL,
        messages=messages
    )
    
    return response.choices[0].message.content

# Run the orchestrator
print("Orchestrator analysing document...")
orchestrator_result = orchestrator_analyse(complex_document)
display(Markdown(f"### Orchestrator Analysis\n\n{orchestrator_result}"))
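
The line-based `SUBTASK n:` format is easy to read but fragile to parse. One alternative, assuming the prompt were changed to request a JSON object, is sketched below. The schema (`subtasks`, `complexity`) and the helper name `parse_plan_json` are assumptions for illustration, not something the notebook's prompt currently produces.

```python
import json
from typing import Dict, List


def parse_plan_json(raw: str) -> Dict[str, object]:
    """Parse a JSON plan, ignoring any prose or code fence around the object."""
    start, end = raw.find("{"), raw.rfind("}")
    if start == -1 or end < start:
        raise ValueError("no JSON object found in orchestrator output")
    plan = json.loads(raw[start:end + 1])
    subtasks: List[str] = [
        str(task).strip() for task in plan.get("subtasks", []) if str(task).strip()
    ]
    if not subtasks:
        raise ValueError("plan contains no subtasks")
    return {"subtasks": subtasks, "complexity": plan.get("complexity", "Unknown")}


# Made-up orchestrator reply, used only to exercise the parser.
example = 'Plan:\n{"subtasks": ["Identify all parties", "Assess waiver risk"], "complexity": "High"}'
print(parse_plan_json(example)["subtasks"])
```

Raising on an empty plan makes a silent failure (zero workers run) visible at the point where it happens.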

## Step 4: Create the Worker Function

Workers execute individual subtasks assigned by the orchestrator.

**What this does:**
- Takes a single subtask and the document
- Performs focused analysis on just that subtask
- Returns a specific finding
- Each worker operates independently on its assigned task

In [None]:
def worker_execute(document, subtask):
    """Worker: execute a single subtask"""
    
    messages = [
        {"role": "system", "content": """You are a legal analyst worker specialising in Australian legal professional privilege.
You will be given a specific analysis task to perform on a document.
Be thorough but focused on your assigned task only."""},
        {"role": "user", "content": f"""
Perform this specific analysis task on the document below.

TASK: {subtask}

DOCUMENT:
{document['content']}

Provide a focused analysis answering only this specific task.
Apply Evidence Act 1995 (Cth) ss 118-119 where relevant.

Format:
FINDING: [your key finding]
DETAILS: [supporting details]
PRIVILEGE_IMPLICATION: [how this affects privilege status]
"""}
    ]
    
    response = client.chat.completions.create(
        model=MODEL,
        messages=messages
    )
    
    return response.choices[0].message.content

print("Worker function created: worker_execute()")
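
Each worker is asked to reply with `FINDING`, `DETAILS` and `PRIVILEGE_IMPLICATION` labels. If those fields are needed separately (for example as CSV columns), the reply can be split on the labels. A minimal sketch; `parse_worker_output` is a hypothetical helper, and it assumes the model keeps each label at the start of a line.

```python
from typing import Dict, List, Optional, Sequence

WORKER_LABELS = ("FINDING", "DETAILS", "PRIVILEGE_IMPLICATION")


def parse_worker_output(text: str, labels: Sequence[str] = WORKER_LABELS) -> Dict[str, str]:
    """Collect the text under each label; text before the first label is ignored."""
    sections: Dict[str, List[str]] = {label: [] for label in labels}
    current: Optional[str] = None
    for raw_line in text.splitlines():
        # Drop leading Markdown decoration such as "**" or "#".
        line = raw_line.strip().lstrip("*# ")
        matched = next((name for name in labels if line.startswith(name + ":")), None)
        if matched:
            current = matched
            line = line[len(matched) + 1:]
        if current and line.strip(" *"):
            sections[current].append(line.strip(" *"))
    return {label: " ".join(parts) for label, parts in sections.items()}


# Made-up worker reply, used only to exercise the parser.
sample_reply = (
    "FINDING: Email 2 contains legal advice\n"
    "DETAILS: Sent by external counsel\n"
    "to the general counsel.\n"
    "PRIVILEGE_IMPLICATION: Likely privileged"
)
parsed = parse_worker_output(sample_reply)
print(parsed["DETAILS"])
```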

## Step 5: Execute Workers on Subtasks

Parse the orchestrator's subtasks and run workers on each.

**What this does:**
- Extracts individual subtasks from the orchestrator's output
- Runs a worker on each subtask
- Collects all findings for the final synthesis
- For efficiency, we'll run only the first 5 subtasks; a full run would execute every subtask the orchestrator generates, and that number varies between documents and runs

In [None]:
def parse_subtasks(orchestrator_output):
    """Extract subtasks from orchestrator output"""
    subtasks = []
    for line in orchestrator_output.split('\n'):
        # Tolerate Markdown decoration the model may add, e.g. "**SUBTASK 1:**"
        cleaned = line.strip().lstrip('*#- ').strip()
        if cleaned.upper().startswith("SUBTASK") and ':' in cleaned:
            # Keep only the task description after the first colon
            task = cleaned.split(':', 1)[1].strip(' *')
            if task:
                subtasks.append(task)
    return subtasks

# Parse subtasks from orchestrator
subtasks = parse_subtasks(orchestrator_result)
print(f"Found {len(subtasks)} subtasks")

# Run workers on first 5 subtasks (for demo efficiency)
worker_results = []
subtasks_to_run = subtasks[:5]

print(f"\nRunning workers on {len(subtasks_to_run)} subtasks...\n")

for i, subtask in enumerate(subtasks_to_run, 1):
    print(f"Worker {i} executing: {subtask[:60]}...")
    result = worker_execute(complex_document, subtask)
    worker_results.append({
        "subtask_num": i,
        "subtask": subtask,
        "result": result
    })
    print(f"Worker {i} complete.\n")

# Display worker results
for wr in worker_results:
    display(Markdown(f"### Worker {wr['subtask_num']} Result\n\n**Task:** {wr['subtask']}\n\n{wr['result']}\n\n---"))
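
Because each worker operates independently, the loop above does not have to run one call at a time. The sketch below runs the workers in a thread pool. `run_workers_concurrently` is a hypothetical helper, and it assumes the worker function is safe to call from several threads and that API rate limits allow the extra concurrency.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List


def run_workers_concurrently(
    document: dict,
    subtasks: List[str],
    worker_fn: Callable[[dict, str], str],
    max_workers: int = 4,
) -> List[Dict[str, object]]:
    """Run worker_fn(document, subtask) for every subtask in a thread pool."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves input order, so results line up with subtasks.
        results = list(pool.map(lambda task: worker_fn(document, task), subtasks))
    return [
        {"subtask_num": i, "subtask": task, "result": result}
        for i, (task, result) in enumerate(zip(subtasks, results), 1)
    ]


# Stub worker so the sketch runs without API access.
def fake_worker(document: dict, subtask: str) -> str:
    return subtask.upper()


demo = run_workers_concurrently({"content": ""}, ["a", "b", "c"], fake_worker, max_workers=2)
print([row["result"] for row in demo])
```

In the notebook this could be called as `run_workers_concurrently(complex_document, subtasks_to_run, worker_execute)`, which returns the same list-of-dicts shape as `worker_results`.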

## Step 6: Orchestrator Synthesizes Results

The orchestrator combines all worker findings into a final determination.

**What this does:**
- Takes all worker findings as input
- Synthesizes the analysis into a coherent privilege assessment
- Identifies which parts are privileged, which are not
- Flags any waiver issues discovered by the workers
- Produces a final recommendation for human review

In [None]:
def orchestrator_synthesize(document, worker_results):
    """Orchestrator: synthesize worker findings into final determination"""
    
    # Format worker findings for the orchestrator
    findings_text = "\n\n".join([
        f"WORKER {wr['subtask_num']} - {wr['subtask']}\n{wr['result']}"
        for wr in worker_results
    ])
    
    messages = [
        {"role": "system", "content": """You are a senior Australian legal privilege expert.
Your job is to synthesize multiple analysis findings into a final privilege determination.
Apply Evidence Act 1995 (Cth) ss 118-119 and relevant case law."""},
        {"role": "user", "content": f"""
Synthesize these worker findings into a final privilege determination for this document.

DOCUMENT TYPE: {document['type']}

WORKER FINDINGS:
{findings_text}

Provide a comprehensive final determination that:
1. Summarises key findings from each worker
2. Identifies which specific parts are privileged vs not privileged
3. Addresses any waiver issues identified
4. Provides a final classification with reasoning

Format:
OVERALL_CLASSIFICATION: [PRIVILEGED/NOT_PRIVILEGED/PARTIAL_PRIVILEGE/UNCERTAIN]
CONFIDENCE_SCORE: [0-100]

PRIVILEGED_CONTENT:
[List specific emails/attachments that are privileged]

NON_PRIVILEGED_CONTENT:
[List specific emails/attachments that are NOT privileged]

WAIVER_ASSESSMENT:
[Analysis of any waiver issues]

LEGAL_BASIS: [Relevant statutes and cases]

REASONING: [3-4 sentence synthesis of the analysis]

RECOMMENDATION_FOR_REVIEW:
[Specific guidance for senior lawyer reviewing this document]
"""}
    ]
    
    response = client.chat.completions.create(
        model=MODEL,
        messages=messages
    )
    
    return response.choices[0].message.content

# Run the orchestrator synthesis
print("Orchestrator synthesizing worker findings...")
final_result = orchestrator_synthesize(complex_document, worker_results)
display(Markdown(f"### Final Orchestrator Determination\n\n{final_result}"))
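
The synthesis prompt asks for one of four classification labels and a 0-100 confidence score, but nothing forces the model to comply. A light validation step can catch malformed output before it reaches the CSV. A sketch only; `validate_determination` is a hypothetical helper and the sample strings are made up.

```python
import re
from typing import Dict, List

ALLOWED_LABELS = {"PRIVILEGED", "NOT_PRIVILEGED", "PARTIAL_PRIVILEGE", "UNCERTAIN"}


def validate_determination(text: str) -> Dict[str, object]:
    """Check the classification label and confidence score in a synthesis output."""
    problems: List[str] = []
    label_match = re.search(r"OVERALL_CLASSIFICATION:[^\w\n]*([A-Z_]+)", text)
    label = label_match.group(1) if label_match else None
    if label not in ALLOWED_LABELS:
        problems.append(f"unexpected classification: {label!r}")
    score_match = re.search(r"CONFIDENCE_SCORE:[^\d\n]*(\d+)", text)
    score = int(score_match.group(1)) if score_match else None
    if score is None or not 0 <= score <= 100:
        problems.append(f"confidence score missing or out of range: {score!r}")
    return {"classification": label, "confidence": score, "problems": problems}


good = validate_determination("OVERALL_CLASSIFICATION: PARTIAL_PRIVILEGE\nCONFIDENCE_SCORE: 85\n")
bad = validate_determination("OVERALL_CLASSIFICATION: MAYBE\nCONFIDENCE_SCORE: 140")
print(good["problems"], len(bad["problems"]))
```

A non-empty `problems` list is a reasonable trigger for re-running the synthesis or flagging the row for manual attention.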

## Step 7: Export to CSV for Senior Lawyer Review

Create a CSV output summarising the orchestrator-worker analysis.

**What this does:**
- Records the orchestrator's final determination
- Lists privileged and non-privileged content separately
- Highlights waiver issues for attention
- Includes blank columns for senior lawyer review and sign-off
- Complex documents like this particularly benefit from human review

In [None]:
import pandas as pd
from datetime import datetime

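def parse_result(result_text, field):
    """Extract a field value from the LLM output.

    The value may sit on the label's line (e.g. CONFIDENCE_SCORE) or on the
    lines below it (e.g. RECOMMENDATION_FOR_REVIEW), ending at a blank line.
    """
    lines = [line.strip() for line in result_text.split('\n')]
    for i, line in enumerate(lines):
        if line.startswith(field + ':'):
            value = line.split(':', 1)[1].strip()
            if not value:
                # Value is on the following lines; collect until a blank line
                block = []
                for following in lines[i + 1:]:
                    if not following:
                        break
                    block.append(following)
                value = " ".join(block)
            return value or "Not found"
    return "Not found"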

# Build the CSV row
csv_row = {
    "doc_id": complex_document['id'],
    "doc_type": complex_document['type'],
    "subtasks_generated": len(subtasks),
    "subtasks_executed": len(worker_results),
    "classification": parse_result(final_result, "OVERALL_CLASSIFICATION"),
    "confidence_score": parse_result(final_result, "CONFIDENCE_SCORE"),
    # Crude keyword flag for the reviewer, not a finding. The WAIVER_ASSESSMENT
    # label is removed first because it appears in every output.
    "waiver_identified": "Yes" if "waiver" in final_result.lower().replace("waiver_assessment", "") else "No",
    "legal_basis": parse_result(final_result, "LEGAL_BASIS"),
    "reasoning": parse_result(final_result, "REASONING"),
    "recommendation": parse_result(final_result, "RECOMMENDATION_FOR_REVIEW"),
    # Blank columns for senior lawyer HITL review
    "reviewer_notes": "",
    "reviewer_decision": "",
    "reviewed_by": "",
    "review_date": ""
}

# Create DataFrame and export
df = pd.DataFrame([csv_row])
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
csv_filename = f"privilege_review_orchestrator_{timestamp}.csv"

# Display preview
display(Markdown("### CSV Preview for HITL Review"))
display(df[['doc_id', 'doc_type', 'subtasks_executed', 'classification', 'waiver_identified']])

# Save to file
df.to_csv(csv_filename, index=False)
display(Markdown(f"**Exported:** `{csv_filename}`"))
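
The CSV above holds one summary row per document. To keep each worker's finding reviewable on its own, a second file with one row per worker can be written alongside it. The sketch below uses the standard-library `csv` module rather than pandas so that it has no dependencies; `worker_audit_csv` and its column names are assumptions for illustration.

```python
import csv
import io
from typing import Dict, List


def worker_audit_csv(doc_id: str, worker_results: List[Dict]) -> str:
    """Render one CSV row per worker result, with a blank reviewer column."""
    buffer = io.StringIO()
    writer = csv.DictWriter(
        buffer,
        fieldnames=["doc_id", "subtask_num", "subtask", "result", "reviewer_notes"],
    )
    writer.writeheader()
    for wr in worker_results:
        writer.writerow({
            "doc_id": doc_id,
            "subtask_num": wr["subtask_num"],
            "subtask": wr["subtask"],
            "result": wr["result"],  # multi-line text is quoted by the csv module
            "reviewer_notes": "",
        })
    return buffer.getvalue()


# Made-up worker results, shaped like the entries in worker_results.
sample_results = [
    {"subtask_num": 1, "subtask": "Parties", "result": "FINDING: x\nDETAILS: y"},
    {"subtask_num": 2, "subtask": "Attachments", "result": "FINDING: z"},
]
audit_text = worker_audit_csv("DOC001", sample_results)
print(audit_text.splitlines()[0])
```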

## Conclusion: Orchestrator-Worker for LPP Classification

### What We Built

A dynamic analysis system where an orchestrator breaks down complex documents and delegates to workers:
```
Complex Email Chain (4 emails, 2 attachments)
                    ↓
         Orchestrator Analysis
      "This needs 10 subtasks..."
                    ↓
            ┌───┬───┼───┬───┐
            ↓   ↓   ↓   ↓   ↓
           W1  W2  W3  W4  W5  (Workers)
            ↓   ↓   ↓   ↓   ↓
            └───┴───┼───┴───┘
                    ↓
      Orchestrator Synthesis
                    ↓
      PARTIAL_PRIVILEGE + Waiver Warning
```

### Why Orchestrator-Worker Works for Privilege

- **Dynamic:** Adapts to document complexity - simple docs get fewer subtasks
- **Thorough:** Each aspect analysed by a focused worker
- **Catches nuance:** Worker 5 identified the waiver issue from CC'ing non-lawyers
- **Auditable:** Each worker's finding is logged separately

### What We Discovered

| Worker | Finding |
|--------|---------|
| 1 | 4 emails, 2 attachments catalogued |
| 2 | Mix of legal and corporate parties |
| 3 | Emails 1&2 privileged, 3&4 factual |
| 4 | Draft response privileged, letter of demand not |
| 5 | **Potential waiver** - advice shared with CFO/CEO |

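A cross-email issue such as waiver, which only shows up when recipients are compared across the whole chain, is the kind of finding this pattern is designed to surface. Because the subtasks are generated dynamically, the worker that raises it (and whether it is raised at all) can differ between runs.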

### Comparison to Other Patterns

| Aspect | Prompt Chaining | Routing | Parallelization | Orchestrator-Worker |
|--------|-----------------|---------|-----------------|---------------------|
| Structure | Fixed steps | Branch by type | Same task, multiple models | Dynamic breakdown |
| Best for | Simple docs | Mixed types | High-stakes decisions | Complex docs |
| Adaptability | None | Limited | None | High |
| Subtask count | Fixed | Fixed per type | 1 | Variable |

### Limitations

**Higher cost and latency**
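- Multiple LLM calls: one orchestrator call, one call per subtask, and one synthesis call (N + 2 calls for N subtasks)
- More expensive than single-pass approaches, partly because every worker receives the full document
- Slower because the stages run in sequence (orchestrator → workers → synthesis) and this notebook also runs the workers one after another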

**Orchestrator quality matters**
- If the orchestrator misses a key subtask, workers won't analyse it
- Garbage in, garbage out
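
One way to reduce the risk of a missed subtask is to compare the orchestrator's plan against a fixed baseline and append anything that is not covered. The sketch below uses simple keyword matching. `ensure_coverage`, the baseline topics and their keywords are all hypothetical and would need tuning for real use.

```python
from typing import Dict, List, Sequence, Tuple

# Hypothetical baseline: topic -> (keywords that indicate coverage, fallback subtask)
BASELINE_TOPICS: Dict[str, Tuple[Tuple[str, ...], str]] = {
    "parties": (("party", "parties", "recipient"),
                "Identify every sender and recipient and their role."),
    "attachments": (("attachment",),
                    "Assess each attachment separately from its covering email."),
    "waiver": (("waiver", "waive", "disclos"),
               "Assess whether disclosure to any recipient risks waiver."),
}


def ensure_coverage(
    subtasks: Sequence[str],
    baseline: Dict[str, Tuple[Tuple[str, ...], str]] = BASELINE_TOPICS,
) -> List[str]:
    """Append a fallback subtask for each baseline topic that no subtask mentions."""
    combined = " ".join(subtasks).lower()
    extra = [
        fallback
        for keywords, fallback in baseline.values()
        if not any(keyword in combined for keyword in keywords)
    ]
    return list(subtasks) + extra


checked = ensure_coverage(["List all parties in the chain", "Review each attachment"])
print(checked[-1])
```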

**Overkill for simple documents**
- A straightforward email doesn't need 10 subtasks
- Use routing to send simple docs to simpler classifiers
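
A routing check of this kind can be as simple as counting emails and attachments before deciding whether the orchestrator is worth its cost. A rough sketch; `needs_orchestrator` and its thresholds are assumptions chosen for illustration, and it relies on the `From:` and `Attachments:` line conventions used in the test document.

```python
import re


def needs_orchestrator(content: str, min_emails: int = 3, min_attachments: int = 2) -> bool:
    """Heuristic: use orchestrator-worker only when the document looks complex."""
    email_count = len(re.findall(r"^From:", content, flags=re.MULTILINE))
    attachment_lines = re.findall(r"^Attachments?:\s*(.+)$", content, flags=re.MULTILINE)
    attachment_count = sum(len(line.split(",")) for line in attachment_lines)
    return email_count >= min_emails or attachment_count >= min_attachments


simple_email = "From: a@client.example\nTo: b@client.example\n\nLunch at noon?"
long_chain = "From: a\n\nFrom: b\n\nFrom: c\n"
print(needs_orchestrator(simple_email), needs_orchestrator(long_chain))
```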

### Next Notebook

`05_evaluator_optimizer.ipynb` - Generate, critique, and improve classifications through self-reflection.