<a href="https://colab.research.google.com/drive/15KslE_orxpqRTdKvSkJprJJVqUDDErOH?usp=sharing" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"></a>

### Orchestrator-Worker Pattern

In [1]:
!pip install -qU google-generativeai

In [2]:
import google.generativeai as genai
import getpass
from datetime import datetime

Get free-tier Google's Gemini API Key here: https://aistudio.google.com/app/apikey

In [3]:
API_KEY = getpass.getpass("Enter your Google API key: ")

Enter your Google API key: ··········


In [4]:
genai.configure(api_key=API_KEY)

In [5]:
class Worker:
    """Specialized worker agent with specific tools"""
    def __init__(self, name, role, tools):
        self.name = name
        self.role = role
        self.tools = tools
        self.model = genai.GenerativeModel("gemini-2.0-flash")

    def can_handle(self, task):
        """Check if worker can handle this task"""
        task_lower = task.lower()
        return any(tool in task_lower for tool in self.tools.keys())

    def execute(self, task):
        """Execute task using available tools"""
        print(f"  👷 {self.name} working on: {task[:60]}...")

        # Find relevant tool
        task_lower = task.lower()
        tool_name = None
        for tool in self.tools.keys():
            if tool in task_lower:
                tool_name = tool
                break

        if not tool_name:
            return {"error": f"{self.name} has no tool for this task"}

        # Execute tool
        try:
            result = self.tools[tool_name](task)
            print(f"     ✅ Completed with {tool_name}\n")
            return {
                "worker": self.name,
                "task": task,
                "tool": tool_name,
                "result": result,
                "status": "success"
            }
        except Exception as e:
            print(f"     ❌ Error: {e}\n")
            return {
                "worker": self.name,
                "task": task,
                "error": str(e),
                "status": "failed"
            }


class Orchestrator:
    """Coordinates workers and synthesizes results"""
    def __init__(self):
        self.model = genai.GenerativeModel("gemini-2.0-flash-exp")
        self.workers = []

    def add_worker(self, worker):
        """Register a worker"""
        self.workers.append(worker)
        print(f"➕ Registered: {worker.name} ({worker.role})")

    def decompose_task(self, task):
        """Break down complex task into subtasks"""
        workers_desc = "\n".join([
            f"- {w.name}: {w.role} (tools: {', '.join(w.tools.keys())})"
            for w in self.workers
        ])

        prompt = f"""Break this task into subtasks for available workers.

        Task: {task}

        Available Workers:
        {workers_desc}

        Create 2-5 subtasks (one per line):"""

        response = self.model.generate_content(prompt).text
        subtasks = [line.strip() for line in response.split("\n") if line.strip()]
        return subtasks[:5]

    def assign_workers(self, subtasks):
        """Assign each subtask to appropriate worker"""
        assignments = []

        for subtask in subtasks:
            # Find capable worker
            worker = None
            for w in self.workers:
                if w.can_handle(subtask):
                    worker = w
                    break

            if worker:
                assignments.append({
                    "subtask": subtask,
                    "worker": worker
                })
            else:
                print(f"  ⚠️  No worker found for: {subtask}")

        return assignments

    def execute_parallel(self, assignments):
        """Execute assignments (simulated parallel)"""
        results = []

        for assignment in assignments:
            result = assignment["worker"].execute(assignment["subtask"])
            results.append(result)

        return results

    def synthesize(self, task, results):
        """Combine worker results into final answer"""
        successful_results = [r for r in results if r.get("status") == "success"]

        if not successful_results:
            return "No workers successfully completed their tasks."

        results_summary = "\n\n".join([
            f"Worker: {r['worker']}\nTask: {r['task']}\nResult: {r['result']}"
            for r in successful_results
        ])

        prompt = f"""Synthesize these worker results into a comprehensive answer.

        Original Task: {task}

        Worker Results:
        {results_summary}

        Final Answer:"""

        response = self.model.generate_content(prompt).text
        return response.strip()

    def execute(self, task):
        """Full orchestration pipeline"""
        print(f"\n{'='*60}")
        print(f"🎯 Task: {task}")
        print(f"{'='*60}\n")

        # Step 1: Decompose
        print("📋 Step 1: Decomposing task...")
        subtasks = self.decompose_task(task)
        print(f"   Created {len(subtasks)} subtasks:\n")
        for i, st in enumerate(subtasks, 1):
            print(f"   {i}. {st}")
        print()

        # Step 2: Assign
        print("👥 Step 2: Assigning workers...")
        assignments = self.assign_workers(subtasks)
        print(f"   Assigned {len(assignments)} tasks:\n")
        for a in assignments:
            print(f"   - {a['worker'].name}: {a['subtask'][:50]}...")
        print()

        # Step 3: Execute
        print("⚡ Step 3: Executing in parallel...\n")
        results = self.execute_parallel(assignments)

        # Step 4: Synthesize
        print("🔗 Step 4: Synthesizing results...\n")
        final_answer = self.synthesize(task, results)

        print(f"{'='*60}")
        print(f"📊 FINAL ANSWER")
        print(f"{'='*60}")
        print(final_answer)
        print()

        return final_answer

In [6]:
# Define worker tools
def search_web(query):
    """Simulated web search"""
    results = {
        "climate": "Climate change: Global temperatures rising 1.1°C since pre-industrial times.",
        "ai": "AI developments: Large language models showing reasoning capabilities.",
        "economy": "Economic outlook: Inflation stabilizing, GDP growth at 2.5%.",
        "tech": "Tech trends: AI, quantum computing, and renewable energy leading innovation.",
    }
    for key, val in results.items():
        if key in query.lower():
            return val
    return f"Search results for: {query}"

def analyze_data(task):
    """Simulated data analysis"""
    return "Analysis complete: Found 3 key trends, 85% confidence level, recommend action A."

def fetch_database(task):
    """Simulated database query"""
    return "Database query returned 127 records. Top entry: ID=1001, Value=250, Status=Active."

def api_call(task):
    """Simulated API request"""
    return "API Response: {status: 200, data: {users: 1500, active: 1200, growth: 15%}}"

def generate_report(task):
    """Simulated report generation"""
    return "Report generated: 5 pages, includes executive summary, 3 charts, recommendations."

def send_notification(task):
    """Simulated notification"""
    return "Notification sent to 5 stakeholders via email and Slack."

def calculate(task):
    """Simulated calculations"""
    return "Calculation: Total=1,250, Average=208.33, Growth Rate=12.5%"

In [None]:
# Example 1: Research Task
print("="*60)
print("EXAMPLE 1: Multi-Domain Research")
print("="*60)

orchestrator1 = Orchestrator()

# Add specialized workers
research_worker = Worker(
    "ResearchBot",
    "Web research specialist",
    {"search": search_web}
)

data_worker = Worker(
    "DataAnalyst",
    "Data analysis specialist",
    {"analyze": analyze_data, "calculate": calculate}
)

report_worker = Worker(
    "ReportWriter",
    "Report generation specialist",
    {"report": generate_report}
)

orchestrator1.add_worker(research_worker)
orchestrator1.add_worker(data_worker)
orchestrator1.add_worker(report_worker)
print()

orchestrator1.execute(
    "Research climate change trends, analyze the data, and generate a report"
)


# Example 2: Enterprise Workflow
print("\n" + "="*60)
print("EXAMPLE 2: Enterprise Data Pipeline")
print("="*60)

orchestrator2 = Orchestrator()

db_worker = Worker(
    "DatabaseAgent",
    "Database operations",
    {"database": fetch_database, "fetch": fetch_database}
)

api_worker = Worker(
    "APIAgent",
    "External API integration",
    {"api": api_call, "call": api_call}
)

analytics_worker = Worker(
    "AnalyticsAgent",
    "Data analytics",
    {"analyze": analyze_data, "calculate": calculate}
)

notification_worker = Worker(
    "NotificationAgent",
    "Communication services",
    {"notify": send_notification, "send": send_notification}
)

orchestrator2.add_worker(db_worker)
orchestrator2.add_worker(api_worker)
orchestrator2.add_worker(analytics_worker)
orchestrator2.add_worker(notification_worker)
print()

orchestrator2.execute(
    "Fetch user data from database, call the analytics API, analyze growth trends, and notify the team"
)


# Example 3: Customer Support Triage
print("\n" + "="*60)
print("EXAMPLE 3: Customer Support Automation")
print("="*60)

orchestrator3 = Orchestrator()

# Specialized support workers
tech_support = Worker(
    "TechSupport",
    "Technical troubleshooting",
    {"search": search_web, "database": fetch_database}
)

billing_support = Worker(
    "BillingAgent",
    "Billing and payments",
    {"database": fetch_database, "calculate": calculate}
)

account_support = Worker(
    "AccountAgent",
    "Account management",
    {"api": api_call, "notify": send_notification}
)

orchestrator3.add_worker(tech_support)
orchestrator3.add_worker(billing_support)
orchestrator3.add_worker(account_support)
print()

orchestrator3.execute(
    "Customer needs help with a billing issue, wants to check their account status, and needs a technical search for VPN setup"
)


# Example 4: Complex Multi-step Project
print("\n" + "="*60)
print("EXAMPLE 4: Project Execution with Multiple Specialists")
print("="*60)

orchestrator4 = Orchestrator()

# Add all types of workers
orchestrator4.add_worker(research_worker)
orchestrator4.add_worker(data_worker)
orchestrator4.add_worker(db_worker)
orchestrator4.add_worker(api_worker)
orchestrator4.add_worker(report_worker)
orchestrator4.add_worker(notification_worker)
print()

orchestrator4.execute(
    "Research AI trends, fetch database metrics, call growth API, analyze all data, generate comprehensive report, and notify stakeholders"
)

print("✅ Orchestrator-Worker Pattern Complete!")

EXAMPLE 1: Multi-Domain Research
➕ Registered: ResearchBot (Web research specialist)
➕ Registered: DataAnalyst (Data analysis specialist)
➕ Registered: ReportWriter (Report generation specialist)


🎯 Task: Research climate change trends, analyze the data, and generate a report

📋 Step 1: Decomposing task...
   Created 4 subtasks:

   1. 1. ResearchBot: Research and gather climate change data from reputable sources (e.g., NASA, NOAA, IPCC).
   2. 2. DataAnalyst: Analyze the collected data to identify key trends, anomalies, and correlations.
   3. 3. DataAnalyst: Calculate statistical significance of observed trends and create visualizations.
   4. 4. ReportWriter: Generate a comprehensive report summarizing the research findings, data analysis, and key conclusions.

👥 Step 2: Assigning workers...
   Assigned 4 tasks:

   - ResearchBot: 1. ResearchBot: Research and gather climate change...
   - DataAnalyst: 2. DataAnalyst: Analyze the collected data to iden...
   - DataAnalyst: 3. DataAn