# LangGraph Orchestrator-Worker Framework

## Architecture Overview

This notebook implements a **LangGraph-driven orchestrator** that coordinates multiple **worker agents deployed as independent microservices**, following the orchestrator-worker pattern shown below:

```
┌─────────────────────────────────────────────────────────────────────┐
│                      ORCHESTRATOR (LangGraph)                       │
│                                                                     │
│  ┌───────────┐   ┌───────────┐   ┌───────────┐   ┌───────────┐      │
│  │ Planning  │──▶│ Discovery │──▶│ Execution │──▶│ Response  │      │
│  │  Agent    │   │  Service  │   │  Router   │   │ Synthesis │      │
│  └───────────┘   └─────┬─────┘   └─────┬─────┘   └───────────┘      │
│                        │               │                            │
│                        ▼               ▼                            │
│                 ┌─────────────┐ ┌─────────────┐                     │
│                 │    Agent    │ │  Streaming  │                     │
│                 │  Registry   │ │ Multiplexer │                     │
│                 └─────────────┘ └─────────────┘                     │
└────────────────────────┬───────────────┬────────────────────────────┘
                         │               │
         ┌───────────────┼───────────────┼───────────────┐
         ▼               ▼               ▼               ▼
  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
  │  Worker A   │ │  Worker B   │ │  Worker C   │ │  Worker N   │
  │  (Dynamic   │ │  (Static    │ │  (Dynamic   │ │  (Static    │
  │  Streaming) │ │  Response)  │ │  Streaming) │ │  Response)  │
  │   :8001     │ │   :8002     │ │   :8003     │ │   :800N     │
  └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
```

### Core Components

1. **Planning Agent** — Decomposes user queries into executable sub-tasks
2. **Discovery Service** — Microservice that provides agent metadata (endpoints, payloads, capabilities, streaming mode)
3. **Execution Router** — Dispatches sub-tasks to appropriate worker agents via HTTP
4. **Response Synthesis** — Aggregates results from multiple workers into a coherent response
5. **Streaming Multiplexer** — Handles both dynamic (LLM-streamed) and static (hardcoded) responses

### Streaming Modes

| Mode | Description | Use Case |
|------|-------------|----------|
| **Dynamic (event-level)** | SSE stream of token-by-token LLM output | Real-time generative agents |
| **Dynamic (chat-level)** | SSE stream of complete chat messages | Conversational agents |
| **Static** | Pre-defined response returned immediately | Deterministic/rule-based agents |
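
Regardless of mode, each worker's output is eventually reduced to a single string for synthesis. The sketch below illustrates that reduction on already-decoded frames. `assemble_output` is a hypothetical helper, and the frame shapes are assumptions mirroring the SSE examples in section 4. Note that complete messages need a separator, while tokens concatenate directly.

```python
from typing import Dict, List


def assemble_output(mode: str, frames: List[Dict[str, str]]) -> str:
    """Collapse one worker's decoded frames into a single response string.

    Assumed frame shapes (hypothetical, mirroring the section 4 examples):
      dynamic_events   -> {"type": "token", "token": "..."}
      dynamic_messages -> {"role": "assistant", "content": "..."}
      static           -> {"response": "..."}
    A {"type": "done"} frame ends a dynamic stream and carries no text.
    """
    if mode == "dynamic_events":
        # Tokens are fragments of one answer, so join them with no separator.
        return "".join(
            f.get("token", "") for f in frames if f.get("type", "token") == "token"
        )
    if mode == "dynamic_messages":
        # Each frame is a complete message; keep messages on separate lines.
        return "\n".join(f["content"] for f in frames if "content" in f)
    if mode == "static":
        # A static worker answers with a single body.
        return frames[0].get("response", "") if frames else ""
    raise ValueError(f"unknown streaming mode: {mode!r}")


print(assemble_output(
    "dynamic_events",
    [{"type": "token", "token": "Hel"}, {"type": "token", "token": "lo"}, {"type": "done"}],
))  # Hello
```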

## 1. Dependencies & Configuration

In [None]:
import os
import json
import asyncio
import logging
from enum import Enum
from typing import (
    Any,
    AsyncGenerator,
    Dict,
    List,
    Literal,
    Optional,
    Sequence,
    TypedDict,
    Union,
    Annotated,
)
from dataclasses import dataclass, field
from datetime import datetime

import httpx
from pydantic import BaseModel, Field
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import END, StateGraph
from langgraph.graph.message import add_messages

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
logger = logging.getLogger("orchestrator")

In [None]:
# -------------------------------------------------------------------
# Configuration — swap these for your deployment
# -------------------------------------------------------------------

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "sk-...")
DISCOVERY_SERVICE_URL = os.getenv("DISCOVERY_SERVICE_URL", "http://localhost:9000")

ORCHESTRATOR_LLM_MODEL = "gpt-4o"
ORCHESTRATOR_LLM_TEMPERATURE = 0.2

WORKER_TIMEOUT_SECONDS = 120
WORKER_MAX_RETRIES = 2

llm = ChatOpenAI(
    model=ORCHESTRATOR_LLM_MODEL,
    temperature=ORCHESTRATOR_LLM_TEMPERATURE,
    api_key=OPENAI_API_KEY,
    streaming=True,
)

## 2. Data Models

Pydantic models define the contract between the orchestrator, discovery service, and worker agents. Every worker is described by an `AgentDescriptor` that the discovery service returns.
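
For concreteness, a discovery response might look like the payload below. Everything in it (agent IDs, URLs, the `{"agents": [...]}` envelope) is invented for illustration, and the snippet uses plain `json` so it runs on its own. In the notebook, each dict would be validated with `AgentDescriptor(**d)`, which fills omitted fields such as `invoke_endpoint` with their defaults.

```python
import json
from typing import Any, Dict, List

# Invented example of a GET {DISCOVERY_SERVICE_URL}/agents response body.
SAMPLE_REGISTRY = """
{
  "agents": [
    {
      "agent_id": "summarizer-v1",
      "name": "Summarizer",
      "description": "Summarizes long documents",
      "endpoint": "http://worker-a:8001",
      "capabilities": ["summarization"],
      "streaming_mode": "dynamic_events",
      "priority": 10
    },
    {
      "agent_id": "faq-bot",
      "name": "FAQ Bot",
      "description": "Returns canned answers",
      "endpoint": "http://worker-b:8002",
      "capabilities": ["custom"],
      "streaming_mode": "static",
      "static_response": "Support hours are 9:00-17:00 UTC."
    }
  ]
}
"""


def extract_agent_dicts(raw: Any) -> List[Dict[str, Any]]:
    """Accept either {"agents": [...]} or a bare JSON list of descriptors."""
    if isinstance(raw, dict):
        return list(raw.get("agents", []))
    if isinstance(raw, list):
        return raw
    raise TypeError(f"unexpected registry payload: {type(raw).__name__}")


agents = extract_agent_dicts(json.loads(SAMPLE_REGISTRY))

# Index agent IDs by capability, the same lookup the planner's tags drive.
by_capability: Dict[str, List[str]] = {}
for agent in agents:
    for capability in agent.get("capabilities", []):
        by_capability.setdefault(capability, []).append(agent["agent_id"])

print(by_capability)  # {'summarization': ['summarizer-v1'], 'custom': ['faq-bot']}
```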

In [None]:
class StreamingMode(str, Enum):
    """How a worker agent delivers its output back to the orchestrator."""
    DYNAMIC_EVENTS = "dynamic_events"    # SSE token-level stream
    DYNAMIC_MESSAGES = "dynamic_messages"  # SSE chat-message-level stream
    STATIC = "static"                     # Pre-canned / hardcoded response


class AgentCapability(str, Enum):
    """Broad capability tags so the planner can match tasks to agents."""
    TEXT_GENERATION = "text_generation"
    CODE_GENERATION = "code_generation"
    DATA_ANALYSIS = "data_analysis"
    SUMMARIZATION = "summarization"
    TRANSLATION = "translation"
    SEARCH = "search"
    CUSTOM = "custom"


class PayloadSchema(BaseModel):
    """Describes the JSON payload a worker agent expects."""
    content_type: str = "application/json"
    required_fields: Dict[str, str] = Field(
        default_factory=dict,
        description="Map of field name to expected type, e.g. {'query': 'string'}",
    )
    optional_fields: Dict[str, str] = Field(default_factory=dict)
    example: Dict[str, Any] = Field(default_factory=dict)


class AgentDescriptor(BaseModel):
    """
    Full metadata for a single worker agent.
    The discovery service returns a list of these.
    """
    agent_id: str = Field(..., description="Unique identifier")
    name: str
    description: str
    endpoint: str = Field(..., description="Base URL, e.g. http://worker-a:8001")
    health_endpoint: str = Field(default="/health")
    invoke_endpoint: str = Field(default="/invoke")
    stream_endpoint: str = Field(default="/stream")
    capabilities: List[AgentCapability] = Field(default_factory=list)
    streaming_mode: StreamingMode = StreamingMode.STATIC
    payload_schema: PayloadSchema = Field(default_factory=PayloadSchema)
    static_response: Optional[str] = Field(
        None,
        description="If streaming_mode is STATIC, this is the hardcoded response.",
    )
    priority: int = Field(default=0, description="Higher = preferred when multiple agents match")
    max_tokens: Optional[int] = None
    timeout_seconds: int = 60
    metadata: Dict[str, Any] = Field(default_factory=dict)


class SubTask(BaseModel):
    """A single unit of work the planner produces."""
    task_id: str
    description: str
    required_capability: AgentCapability
    input_data: Dict[str, Any] = Field(default_factory=dict)
    depends_on: List[str] = Field(
        default_factory=list,
        description="task_ids this sub-task depends on (for DAG execution)",
    )


class ExecutionPlan(BaseModel):
    """Output of the planning agent."""
    plan_id: str
    original_query: str
    sub_tasks: List[SubTask]
    reasoning: str = ""


class WorkerResult(BaseModel):
    """Result returned by a single worker invocation."""
    agent_id: str
    task_id: str
    success: bool
    response: str = ""
    streaming_mode: StreamingMode = StreamingMode.STATIC
    events: List[Dict[str, Any]] = Field(default_factory=list)
    error: Optional[str] = None
    latency_ms: float = 0.0


class StreamEvent(BaseModel):
    """
    A single event yielded during streaming.
    Carries enough metadata so the consumer knows which agent it came from.
    """
    agent_id: str
    task_id: str
    event_type: Literal["token", "message", "status", "error", "done"]
    data: str
    timestamp: str = Field(default_factory=lambda: datetime.utcnow().isoformat())


print("Data models loaded")

## 3. Discovery Service Client

The Discovery Service is itself a microservice. The orchestrator queries it during the discovery step of each run to learn which worker agents are available, what payloads they expect, and whether they stream dynamically or return static responses. The client below caches the registry for a configurable TTL (300 seconds by default) and falls back to the stale cache if the service is unreachable or returns an error status.

> **Recommendation:** In production, refresh the cache from a background task instead of on the request path, so an expired cache never adds discovery latency to a user request.
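
One way to keep discovery off the request path is a small refresher that fetches once at startup and then updates a snapshot on a timer. This is a minimal sketch, not part of the notebook: `BackgroundRefreshedRegistry` is a hypothetical class, and `fetch` stands in for something like `lambda: discovery_client.discover_agents(force_refresh=True)`.

```python
import asyncio
import logging
from typing import Any, Awaitable, Callable, List, Optional

log = logging.getLogger("registry_refresher")


class BackgroundRefreshedRegistry:
    """Serve the last known agent list while a background task refreshes it."""

    def __init__(self, fetch: Callable[[], Awaitable[List[Any]]], interval_s: float):
        self._fetch = fetch  # any zero-argument coroutine function returning the agent list
        self._interval = interval_s
        self._snapshot: Optional[List[Any]] = None
        self._task: Optional["asyncio.Task[None]"] = None

    async def start(self) -> None:
        # One blocking fetch at startup, then all refreshes happen off the request path.
        self._snapshot = await self._fetch()
        self._task = asyncio.create_task(self._refresh_loop())

    async def _refresh_loop(self) -> None:
        while True:
            await asyncio.sleep(self._interval)
            try:
                self._snapshot = await self._fetch()
            except Exception:
                # Keep serving the previous snapshot if discovery is down.
                log.warning("registry refresh failed; keeping stale snapshot", exc_info=True)

    def agents(self) -> List[Any]:
        if self._snapshot is None:
            raise RuntimeError("call start() before agents()")
        return self._snapshot

    async def stop(self) -> None:
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass


async def demo():
    calls = 0

    async def fake_fetch():
        nonlocal calls
        calls += 1
        return [{"agent_id": f"snapshot-{calls}"}]

    registry = BackgroundRefreshedRegistry(fake_fetch, interval_s=0.01)
    await registry.start()
    first = registry.agents()
    await asyncio.sleep(0.1)  # let a few background refreshes run
    later = registry.agents()
    await registry.stop()
    return first, later


first, later = asyncio.run(demo())
print(first, later)
```

If a refresh fails, the previous snapshot keeps being served, which mirrors the stale-cache fallback already built into `discover_agents`.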

In [None]:
class DiscoveryServiceClient:
    """
    Client for the Discovery microservice.

    Responsibilities:
      - Fetch the full agent registry
      - Filter agents by capability
      - Health-check individual agents
      - Cache results to reduce network chatter
    """

    def __init__(self, base_url: str, cache_ttl_seconds: int = 300):
        self._base_url = base_url.rstrip("/")
        self._cache_ttl = cache_ttl_seconds
        self._cache: Optional[List[AgentDescriptor]] = None
        self._cache_ts: Optional[datetime] = None
        self._client = httpx.AsyncClient(timeout=30)

    def _cache_is_valid(self) -> bool:
        if self._cache is None or self._cache_ts is None:
            return False
        elapsed = (datetime.utcnow() - self._cache_ts).total_seconds()
        return elapsed < self._cache_ttl

    async def discover_agents(self, force_refresh: bool = False) -> List[AgentDescriptor]:
        """Fetch all registered agents from the discovery service."""
        if self._cache_is_valid() and not force_refresh:
            logger.info("Returning %d agents from cache", len(self._cache))
            return self._cache

        url = f"{self._base_url}/agents"
        logger.info("Fetching agent registry from %s", url)
        try:
            resp = await self._client.get(url)
            resp.raise_for_status()
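            raw = resp.json()
            # Accept either {"agents": [...]} or a bare JSON list of descriptors
            items = raw.get("agents", []) if isinstance(raw, dict) else raw
            agents = [AgentDescriptor(**a) for a in items]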
            self._cache = agents
            self._cache_ts = datetime.utcnow()
            logger.info("Discovered %d agents", len(agents))
            return agents
        except httpx.HTTPError as exc:
            logger.error("Discovery service error: %s", exc)
            if self._cache is not None:
                logger.warning("Falling back to stale cache (%d agents)", len(self._cache))
                return self._cache
            raise

    async def get_agents_by_capability(
        self, capability: AgentCapability, force_refresh: bool = False
    ) -> List[AgentDescriptor]:
        """Return agents that advertise a specific capability, sorted by priority."""
        agents = await self.discover_agents(force_refresh=force_refresh)
        matched = [a for a in agents if capability in a.capabilities]
        return sorted(matched, key=lambda a: a.priority, reverse=True)

    async def health_check(self, agent: AgentDescriptor) -> bool:
        """Ping a single agent's health endpoint."""
        url = f"{agent.endpoint.rstrip('/')}{agent.health_endpoint}"
        try:
            resp = await self._client.get(url, timeout=5)
            return resp.status_code == 200
        except httpx.HTTPError:
            return False

    async def close(self):
        await self._client.aclose()


discovery_client = DiscoveryServiceClient(DISCOVERY_SERVICE_URL)
print("Discovery service client ready")

## 4. Streaming Infrastructure

The streaming multiplexer handles the two fundamentally different output modes:

- **Dynamic streaming** — The worker returns an SSE stream. We consume it token-by-token (event-level) or message-by-message (chat-level) and re-yield `StreamEvent` objects so the orchestrator can forward them to the caller in real time.
- **Static responses** — The worker's response is hardcoded in the agent descriptor (or returned as a plain JSON body). We wrap it into a single `StreamEvent` for uniform handling.

> **Recommendation:** Use `httpx` with `aiter_lines()` for SSE parsing rather than pulling in a dedicated SSE library — it keeps dependencies lean and gives you full control over reconnection logic.
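
The multiplexer below treats each `data:` line as a complete event, which works when workers send one-line JSON frames. The SSE format itself allows an event's data to span several `data:` lines, with a blank line ending the event. A parser closer to the WHATWG event-stream rules might look like the following sketch. `iter_sse_data` is hypothetical; it keeps only the `data` field and, unlike the spec, also flushes an unterminated final event.

```python
from typing import Iterable, Iterator, List


def iter_sse_data(lines: Iterable[str]) -> Iterator[str]:
    """Yield the `data` payload of each server-sent event.

    `lines` are the stream's lines with line terminators removed.
    Only the `data` field is collected; `event:`, `id:` and `retry:` are ignored.
    """
    buffer: List[str] = []
    for line in lines:
        if line == "":
            # A blank line dispatches the pending event (empty data is not dispatched).
            data = "\n".join(buffer)
            buffer = []
            if data:
                yield data
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # strip exactly one optional space after the colon
        if field == "data":
            buffer.append(value)  # multiple data lines are joined with "\n"
    # Lenient: flush a final event even if the stream ended without a blank line.
    data = "\n".join(buffer)
    if data:
        yield data


frames = [": keep-alive", 'data: {"token": "Hel"}', "", "data: line one", "data: line two", ""]
print(list(iter_sse_data(frames)))
# ['{"token": "Hel"}', 'line one\nline two']
```

With httpx, the same logic would run inside an `async for line in resp.aiter_lines()` loop, emitting an event on each blank line.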

In [None]:
class StreamingMultiplexer:
    """
    Unified streaming layer that abstracts over dynamic SSE streams and static
    responses, exposing a single async-generator interface to the orchestrator.
    """

    def __init__(self):
        self._client = httpx.AsyncClient(timeout=WORKER_TIMEOUT_SECONDS)

    # ------------------------------------------------------------------
    # Dynamic event-level streaming  (token by token from LLM workers)
    # ------------------------------------------------------------------

    async def stream_events(
        self, agent: AgentDescriptor, task_id: str, payload: Dict[str, Any]
    ) -> AsyncGenerator[StreamEvent, None]:
        """
        Connect to a worker's SSE /stream endpoint and yield StreamEvent
        objects as tokens arrive.

        Expected SSE format from the worker:
            data: {"token": "Hello", "type": "token"}
            data: {"token": " world", "type": "token"}
            data: {"type": "done"}
        """
        url = f"{agent.endpoint.rstrip('/')}{agent.stream_endpoint}"
        logger.info("[stream_events] POST %s  task=%s", url, task_id)

        yield StreamEvent(
            agent_id=agent.agent_id,
            task_id=task_id,
            event_type="status",
            data=f"Connecting to {agent.name}...",
        )

        async with self._client.stream("POST", url, json=payload) as resp:
            resp.raise_for_status()
            async for line in resp.aiter_lines():
                if not line.startswith("data:"):
                    continue
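                # Per the SSE spec, remove only the single optional space after
                # "data:", so leading whitespace inside plain-text tokens survives
                raw = line[len("data:"):]
                if raw.startswith(" "):
                    raw = raw[1:]
                if not raw:
                    continue
                try:
                    chunk = json.loads(raw)
                except json.JSONDecodeError:
                    chunk = raw  # not JSON: treat the frame as a plain-text token
                if not isinstance(chunk, dict):
                    yield StreamEvent(
                        agent_id=agent.agent_id,
                        task_id=task_id,
                        event_type="token",
                        data=str(chunk),
                    )
                    continue

                event_type = chunk.get("type", "token")
                if event_type == "done":
                    yield StreamEvent(
                        agent_id=agent.agent_id,
                        task_id=task_id,
                        event_type="done",
                        data="",
                    )
                    return

                # StreamEvent only accepts a fixed set of event types; fold any
                # worker-specific type into "token" rather than failing validation
                if event_type not in ("token", "message", "status", "error"):
                    event_type = "token"

                yield StreamEvent(
                    agent_id=agent.agent_id,
                    task_id=task_id,
                    event_type=event_type,
                    data=chunk.get("token", chunk.get("content", "")),
                )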

    # ------------------------------------------------------------------
    # Dynamic message-level streaming  (complete messages from LLM workers)
    # ------------------------------------------------------------------

    async def stream_messages(
        self, agent: AgentDescriptor, task_id: str, payload: Dict[str, Any]
    ) -> AsyncGenerator[StreamEvent, None]:
        """
        Like stream_events but each SSE frame is a complete chat message.

        Expected SSE format:
            data: {"role": "assistant", "content": "Full sentence one."}
            data: {"role": "assistant", "content": "Full sentence two."}
            data: {"type": "done"}
        """
        url = f"{agent.endpoint.rstrip('/')}{agent.stream_endpoint}"
        logger.info("[stream_messages] POST %s  task=%s", url, task_id)

        yield StreamEvent(
            agent_id=agent.agent_id,
            task_id=task_id,
            event_type="status",
            data=f"Streaming messages from {agent.name}...",
        )

        async with self._client.stream("POST", url, json=payload) as resp:
            resp.raise_for_status()
            async for line in resp.aiter_lines():
                if not line.startswith("data:"):
                    continue
                raw = line[len("data:"):].strip()
                if not raw:
                    continue
                try:
                    chunk = json.loads(raw)
                except json.JSONDecodeError:
                    continue

                if chunk.get("type") == "done":
                    yield StreamEvent(
                        agent_id=agent.agent_id,
                        task_id=task_id,
                        event_type="done",
                        data="",
                    )
                    return

                yield StreamEvent(
                    agent_id=agent.agent_id,
                    task_id=task_id,
                    event_type="message",
                    data=chunk.get("content", json.dumps(chunk)),
                )

    # ------------------------------------------------------------------
    # Static (hardcoded) responses
    # ------------------------------------------------------------------

    async def emit_static(
        self, agent: AgentDescriptor, task_id: str, payload: Dict[str, Any]
    ) -> AsyncGenerator[StreamEvent, None]:
        """
        For agents with StreamingMode.STATIC: either return the hardcoded
        static_response from the descriptor, or POST to /invoke and wrap
        the JSON body as a single message event.
        """
        if agent.static_response:
            logger.info("[static] Using hardcoded response for %s", agent.agent_id)
            yield StreamEvent(
                agent_id=agent.agent_id,
                task_id=task_id,
                event_type="message",
                data=agent.static_response,
            )
        else:
            url = f"{agent.endpoint.rstrip('/')}{agent.invoke_endpoint}"
            logger.info("[static] POST %s  task=%s", url, task_id)
            resp = await self._client.post(url, json=payload)
            resp.raise_for_status()
            body = resp.json()
            yield StreamEvent(
                agent_id=agent.agent_id,
                task_id=task_id,
                event_type="message",
                data=body.get("response", json.dumps(body)),
            )

        yield StreamEvent(
            agent_id=agent.agent_id,
            task_id=task_id,
            event_type="done",
            data="",
        )

    # ------------------------------------------------------------------
    # Unified dispatcher — picks the right method based on streaming mode
    # ------------------------------------------------------------------

    async def stream(
        self, agent: AgentDescriptor, task_id: str, payload: Dict[str, Any]
    ) -> AsyncGenerator[StreamEvent, None]:
        """
        Single entry-point: route to the correct streaming strategy based
        on the agent's declared streaming_mode.
        """
        if agent.streaming_mode == StreamingMode.DYNAMIC_EVENTS:
            async for event in self.stream_events(agent, task_id, payload):
                yield event
        elif agent.streaming_mode == StreamingMode.DYNAMIC_MESSAGES:
            async for event in self.stream_messages(agent, task_id, payload):
                yield event
        else:
            async for event in self.emit_static(agent, task_id, payload):
                yield event

    async def close(self):
        await self._client.aclose()


streaming_mux = StreamingMultiplexer()
print("Streaming multiplexer ready")

## 5. LangGraph State Definition

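The orchestrator's shared state flows through every node in the graph. It is a `TypedDict`: each node returns a partial update, and LangGraph merges it into the state key by key. The `messages` field is `Annotated` with the `add_messages` reducer, so new messages are appended instead of replacing the list; every other field is simply overwritten by the latest node that writes it.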

> **Recommendation:** Keep the state flat rather than deeply nested. LangGraph checkpoints serialize the full state — flat structures are cheaper to persist and easier to debug.
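
The merge rule can be modeled in a few lines. The sketch below is a simplified illustration of per-key reducers, not LangGraph's implementation: `apply_update` is hypothetical, and `operator.add` stands in for `add_messages` (which additionally coerces inputs to message objects and replaces messages that share an ID).

```python
import operator
from typing import Any, Callable, Dict

Reducer = Callable[[Any, Any], Any]


def apply_update(state: Dict[str, Any], update: Dict[str, Any],
                 reducers: Dict[str, Reducer]) -> Dict[str, Any]:
    """Merge one node's partial update into the state (simplified model).

    Keys with a reducer combine the old and new values; every other key is
    overwritten. This mimics the semantics only; it is not LangGraph code.
    """
    merged = dict(state)  # leave the input state untouched
    for key, value in update.items():
        if key in reducers and key in state:
            merged[key] = reducers[key](state[key], value)
        else:
            merged[key] = value
    return merged


state = {"messages": ["user: summarize X"], "current_phase": "planning", "worker_results": []}
# operator.add appends lists, roughly what add_messages does for messages without shared IDs.
state = apply_update(
    state,
    {"messages": ["ai: plan ready"], "current_phase": "discovery"},
    {"messages": operator.add},
)
print(state["messages"], state["current_phase"])
# ['user: summarize X', 'ai: plan ready'] discovery
```

One practical consequence: a second node that returns `worker_results` replaces the list rather than extending it, so a retry node would need to return the full merged list (or the field would need its own reducer).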

In [None]:
class OrchestratorState(TypedDict):
    """
    Shared state that travels through the LangGraph orchestrator.
    Every node reads from and writes to this dict.
    """

    messages: Annotated[Sequence[BaseMessage], add_messages]
    user_query: str
    execution_plan: Optional[ExecutionPlan]
    available_agents: List[AgentDescriptor]
    task_agent_map: Dict[str, AgentDescriptor]
    worker_results: List[WorkerResult]
    stream_buffer: List[StreamEvent]
    final_response: str
    current_phase: str
    error: Optional[str]


def make_initial_state(query: str) -> OrchestratorState:
    """Factory for a clean initial state."""
    return OrchestratorState(
        messages=[HumanMessage(content=query)],
        user_query=query,
        execution_plan=None,
        available_agents=[],
        task_agent_map={},
        worker_results=[],
        stream_buffer=[],
        final_response="",
        current_phase="planning",
        error=None,
    )


print("State schema defined")

## 6. Orchestrator Nodes

Each function below is a **LangGraph node**. The graph wires them in sequence:

```
planning -> discovery -> execution -> response_synthesis
```

Conditional edges handle error recovery and re-planning.

### 6a. Planning Agent Node

The planning agent uses the orchestrator's own LLM to decompose the user query into discrete sub-tasks, each tagged with a required capability. This allows the discovery node to match tasks to the best available worker.

> **Recommendation:** Use structured output (function calling / tool use) to get the LLM to return a typed `ExecutionPlan` rather than parsing free-form text. This eliminates brittle regex parsing.
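
Structured output (for example LangChain's `with_structured_output`) constrains the *shape* of the plan, but not whether its `depends_on` references are consistent. A post-parse check such as the sketch below catches duplicate IDs, references to unknown tasks, and cycles before any worker is called. `validate_plan_dependencies` is a hypothetical helper operating on the plan's `sub_tasks` as plain dicts.

```python
from typing import Dict, List


def validate_plan_dependencies(sub_tasks: List[Dict]) -> None:
    """Raise ValueError if the plan's depends_on references do not form a DAG.

    `sub_tasks` are dicts shaped like the planner's JSON output
    (each with "task_id" and an optional "depends_on" list).
    """
    ids = [t["task_id"] for t in sub_tasks]
    if len(ids) != len(set(ids)):
        raise ValueError("duplicate task_id in plan")

    pending = {t["task_id"]: set(t.get("depends_on", [])) for t in sub_tasks}
    known = set(pending)
    for task_id, deps in pending.items():
        unknown = deps - known
        if unknown:
            raise ValueError(f"{task_id} depends on unknown task(s): {sorted(unknown)}")

    # Kahn-style peeling: repeatedly remove tasks whose dependencies are all
    # satisfied. If at some point nothing can be removed, the rest form a cycle.
    while pending:
        ready = {tid for tid, deps in pending.items() if not deps}
        if not ready:
            raise ValueError(f"dependency cycle among: {sorted(pending)}")
        for tid in ready:
            del pending[tid]
        for deps in pending.values():
            deps.difference_update(ready)


validate_plan_dependencies([{"task_id": "t1"}, {"task_id": "t2", "depends_on": ["t1"]}])
try:
    validate_plan_dependencies([
        {"task_id": "t1", "depends_on": ["t2"]},
        {"task_id": "t2", "depends_on": ["t1"]},
    ])
except ValueError as exc:
    print(exc)  # dependency cycle among: ['t1', 't2']
```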

In [None]:
PLANNING_SYSTEM_PROMPT = """\
You are the Planning Agent inside an orchestration framework.

Your job:
1. Analyse the user's query.
2. Decompose it into 1-N independent sub-tasks.
3. For each sub-task, assign one of these capabilities:
   text_generation, code_generation, data_analysis,
   summarization, translation, search, custom.
4. Identify dependencies between sub-tasks (if any).

Return ONLY valid JSON matching this schema (no markdown fences):
{
  "plan_id": "<uuid-like string>",
  "original_query": "<the user query>",
  "reasoning": "<brief explanation of the decomposition>",
  "sub_tasks": [
    {
      "task_id": "t1",
      "description": "...",
      "required_capability": "text_generation",
      "input_data": {"query": "..."},
      "depends_on": []
    }
  ]
}
"""


async def planning_node(state: OrchestratorState) -> dict:
    """
    LangGraph node: invoke the LLM to produce an ExecutionPlan.
    """
    logger.info("=== PLANNING NODE ===")
    user_query = state["user_query"]

    messages = [
        SystemMessage(content=PLANNING_SYSTEM_PROMPT),
        HumanMessage(content=f"User query:\n{user_query}"),
    ]

    resp = await llm.ainvoke(messages)
    raw_text = resp.content.strip()

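    # Strip markdown code fences (with an optional language tag) if the LLM wraps its JSON
    if raw_text.startswith("```"):
        # Drop the whole opening fence line, including any language tag
        raw_text = raw_text.split("\n", 1)[1] if "\n" in raw_text else ""
    raw_text = raw_text.strip()
    if raw_text.endswith("```"):
        # Also handles a closing fence on the same line as the last brace
        raw_text = raw_text[:-3].strip()

    try:
        plan_data = json.loads(raw_text)
        plan = ExecutionPlan(**plan_data)
    except Exception as exc:  # JSONDecodeError, pydantic ValidationError, TypeError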
        logger.error("Planning failed to parse: %s", exc)
        return {
            "error": f"Planning parse error: {exc}",
            "current_phase": "error",
            "messages": [AIMessage(content=f"Planning failed: {exc}")],
        }

    logger.info("Plan: %d sub-tasks, reasoning=%s", len(plan.sub_tasks), plan.reasoning[:80])

    return {
        "execution_plan": plan,
        "current_phase": "discovery",
        "messages": [
            AIMessage(content=f"Plan created with {len(plan.sub_tasks)} sub-task(s): {plan.reasoning}")
        ],
    }


print("Planning node defined")

### 6b. Discovery Node

This node calls the Discovery Service microservice to fetch the live agent registry, then maps each sub-task from the plan to the best-matching agent based on capability and priority.

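> **Recommendation:** The node below already falls back to the next-highest-priority agent with the same capability when a health check fails, but it probes candidates one at a time. When a capability has many candidate agents or slow health endpoints, consider probing them concurrently.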
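
A concurrent variant of the health-check loop can be sketched with `asyncio.gather`. `first_healthy` is a hypothetical helper; in the notebook, `is_healthy` could be `discovery_client.health_check` and `candidates` the priority-sorted list already built in `discovery_node`.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional, TypeVar

T = TypeVar("T")


async def first_healthy(candidates: List[T],
                        is_healthy: Callable[[T], Awaitable[bool]]) -> Optional[T]:
    """Probe every candidate at once; return the highest-priority healthy one.

    `candidates` must already be sorted by priority (highest first).
    A probe that raises is treated as unhealthy.
    """
    results = await asyncio.gather(*(is_healthy(c) for c in candidates),
                                   return_exceptions=True)
    for candidate, ok in zip(candidates, results):
        if ok is True:
            return candidate
    return None


async def demo() -> Optional[str]:
    status = {"summarizer-primary": False, "summarizer-backup": True, "summarizer-old": True}

    async def fake_probe(agent_id: str) -> bool:
        await asyncio.sleep(0.01)  # stand-in for an HTTP GET to the health endpoint
        if agent_id == "summarizer-old":
            raise ConnectionError("timeout")
        return status[agent_id]

    return await first_healthy(list(status), fake_probe)


print(asyncio.run(demo()))  # summarizer-backup
```

The trade-off: every candidate is probed even when the top one is healthy, and the call waits for the slowest probe (roughly bounded by the 5-second timeout `health_check` passes to httpx).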

In [None]:
async def discovery_node(state: OrchestratorState) -> dict:
    """
    LangGraph node: call the Discovery Service to resolve agents for every
    sub-task in the execution plan.
    """
    logger.info("=== DISCOVERY NODE ===")
    plan: ExecutionPlan = state["execution_plan"]

    if plan is None:
        return {
            "error": "No execution plan available",
            "current_phase": "error",
            "messages": [AIMessage(content="Discovery skipped — no plan.")],
        }

    agents = await discovery_client.discover_agents()
    task_agent_map: Dict[str, AgentDescriptor] = {}
    unmatched_tasks: List[str] = []

    for task in plan.sub_tasks:
        candidates = [
            a for a in agents if task.required_capability in a.capabilities
        ]
        candidates.sort(key=lambda a: a.priority, reverse=True)

        if not candidates:
            logger.warning("No agent for capability=%s (task=%s)", task.required_capability, task.task_id)
            unmatched_tasks.append(task.task_id)
            continue

        # Health-check the top candidate; fall back to next if unhealthy
        assigned = False
        for candidate in candidates:
            healthy = await discovery_client.health_check(candidate)
            if healthy:
                task_agent_map[task.task_id] = candidate
                logger.info("Task %s -> %s (%s)", task.task_id, candidate.name, candidate.streaming_mode)
                assigned = True
                break
            else:
                logger.warning("Agent %s unhealthy, trying next", candidate.agent_id)

        if not assigned:
            unmatched_tasks.append(task.task_id)

    if unmatched_tasks:
        logger.warning("Unmatched tasks: %s", unmatched_tasks)

    return {
        "available_agents": agents,
        "task_agent_map": task_agent_map,
        "current_phase": "execution",
        "messages": [
            AIMessage(
                content=(
                    f"Discovery complete: {len(task_agent_map)} tasks matched, "
                    f"{len(unmatched_tasks)} unmatched."
                )
            )
        ],
    }


print("Discovery node defined")

### 6c. Execution Node

The execution node dispatches sub-tasks to worker agents in dependency order. For independent tasks it fans out concurrently using `asyncio.gather`. The streaming multiplexer handles the actual HTTP/SSE communication and normalizes all output into `StreamEvent` objects.

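> **Recommendation:** For production workloads, replace the two-pass loop with a proper DAG executor (topological sort plus parallel dispatch per level). The simplified approach used here runs all dependency-free tasks concurrently, then runs the remaining tasks one at a time in the order the planner listed them; a task whose dependency appears later in that list runs without that dependency's output.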
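
A level-based DAG executor might look like the sketch below. `topological_levels` and `run_dag` are hypothetical names; `run_task` stands in for a wrapper around `_execute_single_task`, and `deps` could be built as `{t.task_id: t.depends_on for t in plan.sub_tasks}`. Unlike the two-pass loop, every task's dependencies finish before it starts, regardless of list order.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


def topological_levels(deps: Dict[str, List[str]]) -> List[List[str]]:
    """Group task IDs into levels; each task's dependencies sit in earlier levels."""
    for tid, ds in deps.items():
        unknown = set(ds) - set(deps)
        if unknown:
            raise ValueError(f"{tid} depends on unknown task(s): {sorted(unknown)}")
    remaining = {tid: set(ds) for tid, ds in deps.items()}
    levels: List[List[str]] = []
    while remaining:
        ready = sorted(tid for tid, ds in remaining.items() if not ds)
        if not ready:
            raise ValueError(f"dependency cycle among: {sorted(remaining)}")
        levels.append(ready)
        for tid in ready:
            del remaining[tid]
        for ds in remaining.values():
            ds.difference_update(ready)
    return levels


async def run_dag(
    deps: Dict[str, List[str]],
    run_task: Callable[[str, Dict[str, Any]], Awaitable[Any]],
) -> Dict[str, Any]:
    """Run each level concurrently, passing every task its upstream results."""
    results: Dict[str, Any] = {}
    for level in topological_levels(deps):
        outputs = await asyncio.gather(
            *(run_task(tid, {d: results[d] for d in deps[tid]}) for tid in level)
        )
        results.update(zip(level, outputs))
    return results


async def demo() -> Dict[str, Any]:
    deps = {"t1": [], "t2": [], "t3": ["t1", "t2"], "t4": ["t3"]}

    async def fake_worker(task_id: str, upstream: Dict[str, Any]) -> str:
        await asyncio.sleep(0)  # stand-in for a worker HTTP call
        return f"{task_id}({','.join(upstream[d] for d in sorted(upstream))})"

    return await run_dag(deps, fake_worker)


print(topological_levels({"t1": [], "t2": [], "t3": ["t1", "t2"], "t4": ["t3"]}))
# [['t1', 't2'], ['t3'], ['t4']]
print(asyncio.run(demo())["t4"])
# t4(t3(t1(),t2()))
```

Running level by level means a task waits for the slowest task in the previous level even if its own dependencies finished early; a scheduler that starts each task as soon as its own dependencies complete removes that wait at the cost of more bookkeeping. Tasks without an assigned agent would also need a policy (skip them, or run dependents without that context).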

In [None]:
async def _execute_single_task(
    task: SubTask,
    agent: AgentDescriptor,
    prior_results: Dict[str, WorkerResult],
) -> WorkerResult:
    """
    Execute one sub-task against its assigned worker agent.
    Collects all stream events and assembles them into a WorkerResult.
    """
    import time

    payload = {
        **task.input_data,
        "task_id": task.task_id,
        "description": task.description,
    }

    # Inject outputs from upstream tasks this task depends on
    if task.depends_on:
        payload["context"] = {
            dep_id: prior_results[dep_id].response
            for dep_id in task.depends_on
            if dep_id in prior_results
        }

    t0 = time.perf_counter()
    collected_events: List[Dict[str, Any]] = []
    full_response_parts: List[str] = []

    try:
        async for event in streaming_mux.stream(agent, task.task_id, payload):
            collected_events.append(event.model_dump())
            if event.event_type in ("token", "message"):
                full_response_parts.append(event.data)

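        elapsed = (time.perf_counter() - t0) * 1000
        # Tokens are fragments and concatenate directly; complete chat messages
        # need a separator so consecutive messages don't run together
        separator = "\n" if agent.streaming_mode == StreamingMode.DYNAMIC_MESSAGES else ""
        return WorkerResult(
            agent_id=agent.agent_id,
            task_id=task.task_id,
            success=True,
            response=separator.join(full_response_parts),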
            streaming_mode=agent.streaming_mode,
            events=collected_events,
            latency_ms=round(elapsed, 1),
        )

    except Exception as exc:
        elapsed = (time.perf_counter() - t0) * 1000
        logger.error("Task %s failed: %s", task.task_id, exc)
        return WorkerResult(
            agent_id=agent.agent_id,
            task_id=task.task_id,
            success=False,
            error=str(exc),
            latency_ms=round(elapsed, 1),
        )


async def execution_node(state: OrchestratorState) -> dict:
    """
    LangGraph node: execute all sub-tasks via worker agents.

    Execution strategy:
      Pass 1 — tasks with no dependencies (fan-out with asyncio.gather)
      Pass 2 — tasks that depend on Pass-1 results (sequential for simplicity)
    """
    logger.info("=== EXECUTION NODE ===")
    plan: ExecutionPlan = state["execution_plan"]
    task_agent_map: Dict[str, AgentDescriptor] = state["task_agent_map"]

    if not plan or not task_agent_map:
        return {
            "error": "Nothing to execute",
            "current_phase": "error",
            "messages": [AIMessage(content="Execution skipped — no plan or agents.")],
        }

    results_by_id: Dict[str, WorkerResult] = {}
    all_results: List[WorkerResult] = []
    all_events: List[StreamEvent] = []

    independent = [t for t in plan.sub_tasks if not t.depends_on and t.task_id in task_agent_map]
    dependent = [t for t in plan.sub_tasks if t.depends_on and t.task_id in task_agent_map]

    # --- Pass 1: fan-out independent tasks concurrently ---
    if independent:
        logger.info("Pass 1: executing %d independent tasks concurrently", len(independent))
        coros = [
            _execute_single_task(task, task_agent_map[task.task_id], results_by_id)
            for task in independent
        ]
        pass1_results = await asyncio.gather(*coros)
        for r in pass1_results:
            results_by_id[r.task_id] = r
            all_results.append(r)

    # --- Pass 2: dependent tasks sequentially ---
    for task in dependent:
        logger.info("Pass 2: executing dependent task %s", task.task_id)
        r = await _execute_single_task(task, task_agent_map[task.task_id], results_by_id)
        results_by_id[r.task_id] = r
        all_results.append(r)

    for r in all_results:
        for evt_dict in r.events:
            all_events.append(StreamEvent(**evt_dict))

    successes = sum(1 for r in all_results if r.success)
    failures = len(all_results) - successes
    logger.info("Execution done: %d succeeded, %d failed", successes, failures)

    return {
        "worker_results": all_results,
        # Matches the List[StreamEvent] annotation on OrchestratorState
        "stream_buffer": all_events,
        "current_phase": "synthesis",
        "messages": [
            AIMessage(
                content=f"Execution complete: {successes} succeeded, {failures} failed."
            )
        ],
    }


print("Execution node defined")

### 6d. Response Synthesis Node

After all workers have returned their results, the synthesis node uses the orchestrator's LLM to weave the individual outputs into a single, coherent response.

> **Recommendation:** For latency-sensitive applications, consider streaming the synthesis LLM call itself so the user sees tokens arriving while synthesis is in progress — a "stream of streams" pattern.
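
A sketch of that pattern, assuming all worker streams are independent and can run at once: `merge_streams` interleaves several async iterators as items arrive, and `orchestrate_stream` forwards worker events before streaming synthesis tokens. Both are hypothetical helpers. In the notebook, the worker streams could be `streaming_mux.stream(...)` generators, and `synthesize` could wrap `llm.astream(messages)`, yielding each chunk's `.content`.

```python
import asyncio
from typing import Any, AsyncIterator, Callable, List, Tuple

_DONE = object()  # marks the end of one source stream


async def merge_streams(streams: List[AsyncIterator[Any]]) -> AsyncIterator[Any]:
    """Interleave several async iterators, yielding items as soon as they arrive."""
    queue: asyncio.Queue = asyncio.Queue()

    async def pump(stream: AsyncIterator[Any]) -> None:
        try:
            async for item in stream:
                await queue.put((None, item))
        except Exception as exc:  # forward the failure to the consumer
            await queue.put((exc, None))
        finally:
            await queue.put((None, _DONE))

    tasks = [asyncio.create_task(pump(s)) for s in streams]
    remaining = len(tasks)
    try:
        while remaining:
            error, item = await queue.get()
            if error is not None:
                raise error
            if item is _DONE:
                remaining -= 1
            else:
                yield item
    finally:
        for t in tasks:
            t.cancel()  # no-op for finished pumps; stops the rest if the consumer bails out


async def orchestrate_stream(
    worker_streams: List[AsyncIterator[Any]],
    synthesize: Callable[[List[Any]], AsyncIterator[str]],
) -> AsyncIterator[Tuple[str, Any]]:
    """Forward worker events live, then stream the synthesis tokens."""
    collected: List[Any] = []
    async for event in merge_streams(worker_streams):
        collected.append(event)
        yield ("worker", event)
    async for token in synthesize(collected):
        yield ("synthesis", token)


async def fake_worker(name: str, n: int) -> AsyncIterator[str]:
    for i in range(n):
        await asyncio.sleep(0)
        yield f"{name}:{i}"


async def fake_synthesis(events: List[Any]) -> AsyncIterator[str]:
    for token in ["Combined", " answer", f" from {len(events)} events"]:
        yield token


async def demo() -> List[Tuple[str, Any]]:
    return [pair async for pair in orchestrate_stream(
        [fake_worker("A", 2), fake_worker("B", 2)], fake_synthesis)]


out = asyncio.run(demo())
print(out[-3:])
# [('synthesis', 'Combined'), ('synthesis', ' answer'), ('synthesis', ' from 4 events')]
```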

In [None]:
SYNTHESIS_SYSTEM_PROMPT = """\
You are the Response Synthesis Agent.

You receive the outputs of multiple worker agents that each handled a sub-task
of the user's original query. Your job:
1. Merge the outputs into a single coherent response.
2. Resolve any contradictions by preferring higher-confidence answers.
3. Maintain the user's original intent and tone.
4. If any sub-task failed, acknowledge it gracefully and provide
   the best partial answer you can.

Do NOT mention internal task IDs, agent names, or infrastructure details.
"""


async def synthesis_node(state: OrchestratorState) -> dict:
    """
    LangGraph node: synthesize all worker results into one final answer.
    """
    logger.info("=== SYNTHESIS NODE ===")
    results: List[WorkerResult] = state["worker_results"]
    user_query = state["user_query"]

    if not results:
        return {
            "final_response": "No results to synthesize.",
            "current_phase": "done",
            "messages": [AIMessage(content="No worker results available.")],
        }

    result_sections = []
    for r in results:
        status = "SUCCESS" if r.success else f"FAILED ({r.error})"
        section = (
            f"--- Sub-task {r.task_id} [{status}] "
            f"(agent={r.agent_id}, mode={r.streaming_mode.value}, "
            f"latency={r.latency_ms:.0f}ms) ---\n"
            f"{r.response if r.success else '[no output]'}\n"
        )
        result_sections.append(section)

    context = "\n".join(result_sections)

    messages = [
        SystemMessage(content=SYNTHESIS_SYSTEM_PROMPT),
        HumanMessage(
            content=(
                f"Original user query:\n{user_query}\n\n"
                f"Worker results:\n{context}\n\n"
                "Synthesize these into a single, helpful response."
            )
        ),
    ]

    resp = await llm.ainvoke(messages)
    final = resp.content.strip()
    logger.info("Synthesis complete (%d chars)", len(final))

    return {
        "final_response": final,
        "current_phase": "done",
        "messages": [AIMessage(content=final)],
    }


print("Synthesis node defined")
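The "stream of streams" recommendation above comes down to two things: consume the synthesis LLM's output chunk by chunk, and still keep the full text for `final_response`. Below is a minimal, framework-agnostic sketch. `stream_and_accumulate` and `fake_token_stream` are hypothetical names introduced here. The sketch assumes the chunks arrive as plain strings and that the caller supplies an async `emit` callback that forwards each chunk to the client.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, List


async def stream_and_accumulate(
    chunks: AsyncIterator[str],
    emit: Callable[[str], Awaitable[None]],
) -> str:
    """Forward each text chunk to `emit` as soon as it arrives, then return the full text."""
    parts: List[str] = []
    async for chunk in chunks:
        if not chunk:  # skip empty chunks; there is nothing to show the user
            continue
        parts.append(chunk)
        await emit(chunk)  # e.g. wrap the chunk in a "token" StreamEvent for the client
    return "".join(parts).strip()


async def fake_token_stream(text: str) -> AsyncIterator[str]:
    """Stand-in for an LLM token stream, for local testing only."""
    for word in text.split(" "):
        await asyncio.sleep(0)  # yield control, as a real network stream would
        yield word + " "
```

In the notebook, a LangChain chat model could drive this, for example `final = await stream_and_accumulate((c.content async for c in llm.astream(messages)), emit)`. That call assumes text-only message content. Because the accumulated string is returned, `synthesis_node` can still set `final_response` exactly as it does now.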

## 7. Error Handling Node

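A dedicated error-handling node receives failures reported by the upstream nodes. The strategy implemented here is deliberately simple: log the error, return a user-facing message, and end the run. Retrying or re-planning are natural extensions of this node.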

In [None]:
MAX_RETRIES = 2


async def error_handler_node(state: OrchestratorState) -> dict:
    """
    LangGraph node: handle errors from any upstream node.
    Simple strategy: log, attach an error message, and move to done.
    """
    # `error` may be present but None, so fall back explicitly
    error_msg = state.get("error") or "Unknown error"
    logger.error("=== ERROR HANDLER === error: %s", error_msg)

    return {
        "final_response": f"The orchestrator encountered an error: {error_msg}",
        "current_phase": "done",
        "messages": [
            AIMessage(content=f"I encountered an issue while processing your request: {error_msg}")
        ],
    }


print("Error handler node defined")
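The error handler above ends the run on the first failure. Transient worker errors, such as dropped connections or timeouts, can often be absorbed at the call site with a retry and exponential backoff, so the run never reaches the error handler. The sketch below is hypothetical: `retry_with_backoff` is not part of LangGraph or of this notebook. Its default of two retries mirrors `MAX_RETRIES`, and it adds a little random jitter so that many callers do not retry in lockstep.

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    fn: Callable[[], Awaitable[T]],
    max_retries: int = 2,
    base_delay: float = 0.5,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, asyncio.TimeoutError),
) -> T:
    """Await `fn()`, retrying exceptions listed in `retry_on` with exponential backoff.

    At most 1 + max_retries attempts are made. The delay doubles after each
    failure (base_delay, 2 * base_delay, ...), plus up to 10% random jitter.
    Any other exception propagates immediately.
    """
    attempt = 0
    while True:
        try:
            return await fn()
        except retry_on:
            if attempt >= max_retries:
                raise  # out of retries: let the caller (or error handler) deal with it
            delay = base_delay * (2 ** attempt)
            delay += random.uniform(0, delay * 0.1)  # jitter
            await asyncio.sleep(delay)
            attempt += 1
```

With an `httpx.AsyncClient`, a call might look like `resp = await retry_with_backoff(lambda: client.post(url, json=payload), retry_on=(httpx.TransportError,))`. The lambda matters because each attempt needs a fresh coroutine. Only idempotent worker calls should be retried this way.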

## 8. LangGraph — Wiring the Orchestrator Graph

Here we assemble the full graph. Conditional edges route to the error handler whenever the planning, discovery, or execution node sets a non-empty `error` field in the state.

```
            +----------+
            | planning |
            +----+-----+
                 |
          +------v------+
          |  discovery  |
          +------+------+
                 |
          +------v------+
          |  execution  |
          +------+------+
                 |
          +------v------+
          |  synthesis  |
          +------+------+
                 |
               (END)

     planning / discovery / execution -> error_handler -> (END)
```

In [None]:
def route_after_planning(state: OrchestratorState) -> str:
    if state.get("error"):
        return "error_handler"
    return "discovery"


def route_after_discovery(state: OrchestratorState) -> str:
    if state.get("error"):
        return "error_handler"
    return "execution"


def route_after_execution(state: OrchestratorState) -> str:
    if state.get("error"):
        return "error_handler"
    return "synthesis"


# -------------------------------------------------------------------
# Build the graph
# -------------------------------------------------------------------

workflow = StateGraph(OrchestratorState)

workflow.add_node("planning", planning_node)
workflow.add_node("discovery", discovery_node)
workflow.add_node("execution", execution_node)
workflow.add_node("synthesis", synthesis_node)
workflow.add_node("error_handler", error_handler_node)

workflow.set_entry_point("planning")

workflow.add_conditional_edges("planning", route_after_planning, {
    "discovery": "discovery",
    "error_handler": "error_handler",
})

workflow.add_conditional_edges("discovery", route_after_discovery, {
    "execution": "execution",
    "error_handler": "error_handler",
})

workflow.add_conditional_edges("execution", route_after_execution, {
    "synthesis": "synthesis",
    "error_handler": "error_handler",
})

# Synthesis always ends the run, so a plain edge is enough here.
workflow.add_edge("synthesis", END)

workflow.add_edge("error_handler", END)

orchestrator_graph = workflow.compile()

print("LangGraph orchestrator compiled")
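`route_after_planning`, `route_after_discovery`, and `route_after_execution` differ only in the node they route to on success. A small factory could generate them instead. `make_error_router` is a hypothetical helper, not a LangGraph API. The sketch assumes, as in this notebook, that the state is dict-like with an optional `error` key.

```python
from typing import Any, Callable, Mapping


def make_error_router(
    next_node: str, error_node: str = "error_handler"
) -> Callable[[Mapping[str, Any]], str]:
    """Build a routing function that goes to `error_node` if the state carries an error."""

    def route(state: Mapping[str, Any]) -> str:
        # Any truthy `error` value diverts the run to the error handler.
        return error_node if state.get("error") else next_node

    return route


# Possible usage with the graph above:
# workflow.add_conditional_edges(
#     "planning", make_error_router("discovery"),
#     {"discovery": "discovery", "error_handler": "error_handler"},
# )
```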

## 9. Streaming Pass-Through Wrapper

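This is the top-level API the caller (e.g. a FastAPI endpoint) uses. It runs the LangGraph orchestrator through `astream_events` and emits a uniform stream of `StreamEvent`s in three stages. First come phase-status events as each node starts. Next come the worker events (token-by-token for dynamic workers, wrapped single responses for static ones) once the execution node returns. Last comes the final answer. Worker events travel through the `stream_buffer` state field, so they are replayed in order after execution finishes rather than forwarded live.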

> **Recommendation:** In a real deployment, expose this as an SSE endpoint via FastAPI's `StreamingResponse`. The caller receives a uniform stream of `StreamEvent` JSON lines regardless of which workers are dynamic vs. static.

In [None]:
async def run_orchestrator(query: str) -> Dict[str, Any]:
    """
    Non-streaming entry point — runs the full graph to completion
    and returns the final state.
    """
    initial = make_initial_state(query)
    final_state = await orchestrator_graph.ainvoke(initial)
    return final_state


async def run_orchestrator_streaming(query: str) -> AsyncGenerator[StreamEvent, None]:
    """
    Streaming entry point — yields StreamEvent objects as the orchestrator
    progresses through planning -> discovery -> execution -> synthesis.

    Uses LangGraph's astream_events to report node transitions as they happen
    and to replay the worker events buffered by the execution node once it finishes.
    """
    initial = make_initial_state(query)

    yield StreamEvent(
        agent_id="orchestrator",
        task_id="root",
        event_type="status",
        data="Starting orchestration...",
    )

    async for graph_event in orchestrator_graph.astream_events(initial, version="v2"):
        event_kind = graph_event.get("event", "")
        event_name = graph_event.get("name", "")

        if event_kind == "on_chain_start" and event_name in (
            "planning", "discovery", "execution", "synthesis", "error_handler"
        ):
            yield StreamEvent(
                agent_id="orchestrator",
                task_id="root",
                event_type="status",
                data=f"Entering {event_name} phase...",
            )

        # When the execution node completes, forward all buffered stream events
        if event_kind == "on_chain_end" and event_name == "execution":
            output = graph_event.get("data", {}).get("output", {})
            buffered = output.get("stream_buffer", [])
            for evt_data in buffered:
                if isinstance(evt_data, dict):
                    yield StreamEvent(**evt_data)

        # When synthesis (or the error handler) completes, yield the final answer
        if event_kind == "on_chain_end" and event_name in ("synthesis", "error_handler"):
            output = graph_event.get("data", {}).get("output", {})
            final = output.get("final_response", "")
            if final:
                yield StreamEvent(
                    agent_id="orchestrator",
                    task_id="root",
                    event_type="message",
                    data=final,
                )

    yield StreamEvent(
        agent_id="orchestrator",
        task_id="root",
        event_type="done",
        data="",
    )


print("Orchestrator wrappers defined")
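As written, `run_orchestrator_streaming` forwards worker events only after the execution node returns. Forwarding tokens while workers are still running needs a side channel outside the node's return value. One common approach merges the per-worker streams through a bounded `asyncio.Queue`, which also provides the back-pressure discussed in the design notes. The sketch below does this without LangGraph. It assumes each worker's output is available as an async iterator, and `merge_streams` is a hypothetical name.

```python
import asyncio
from typing import Any, AsyncIterator, List


async def merge_streams(
    streams: List[AsyncIterator[Any]], maxsize: int = 64
) -> AsyncIterator[Any]:
    """Yield items from several async iterators as soon as any one of them produces one.

    Items from the same stream keep their order. The bounded queue makes a
    fast producer wait in `queue.put` until the consumer catches up.
    """
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)

    async def pump(stream: AsyncIterator[Any]) -> None:
        try:
            async for item in stream:
                await queue.put(("item", item))
        except Exception as exc:  # hand worker failures to the consumer
            await queue.put(("error", exc))
        await queue.put(("done", None))

    tasks = [asyncio.create_task(pump(s)) for s in streams]
    remaining = len(tasks)
    try:
        while remaining:
            kind, payload = await queue.get()
            if kind == "item":
                yield payload
            elif kind == "error":
                raise payload
            else:
                remaining -= 1
    finally:
        # Stop any producers still running (early exit or error) and reap them.
        for t in tasks:
            t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
```

The merged iterator could feed the SSE generator directly. This sketch aborts the whole stream when one worker fails. An orchestrator would more likely convert the failure into an `"error"` `StreamEvent` and keep the other workers running.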

## 10. Mock Discovery Service & Workers (for local testing)

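To run this notebook end-to-end without deploying real microservices, we stand up lightweight mock servers with `aiohttp`, backed by an in-memory list of agent descriptors (`MOCK_AGENTS`).

These mocks simulate:
- A **discovery service** that returns agent descriptors from `GET /agents`
- **Dynamic-streaming workers** that answer `POST /stream` with SSE, either token by token (`DYNAMIC_EVENTS`) or as complete chat messages (`DYNAMIC_MESSAGES`)
- **Static workers** that answer `POST /invoke` with a fixed JSON response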

In [None]:
MOCK_AGENTS = [
    AgentDescriptor(
        agent_id="writer-01",
        name="Creative Writer",
        description="Generates creative text, stories, and marketing copy",
        endpoint="http://localhost:8001",
        capabilities=[AgentCapability.TEXT_GENERATION],
        streaming_mode=StreamingMode.DYNAMIC_EVENTS,
        payload_schema=PayloadSchema(
            required_fields={"query": "string"},
            example={"query": "Write a haiku about AI"},
        ),
        priority=10,
        timeout_seconds=60,
    ),
    AgentDescriptor(
        agent_id="coder-01",
        name="Code Assistant",
        description="Generates and explains code",
        endpoint="http://localhost:8002",
        capabilities=[AgentCapability.CODE_GENERATION],
        streaming_mode=StreamingMode.DYNAMIC_MESSAGES,
        payload_schema=PayloadSchema(
            required_fields={"query": "string", "language": "string"},
            example={"query": "Fibonacci function", "language": "python"},
        ),
        priority=10,
    ),
    AgentDescriptor(
        agent_id="lookup-01",
        name="Knowledge Lookup",
        description="Returns pre-indexed factual answers",
        endpoint="http://localhost:8003",
        capabilities=[AgentCapability.SEARCH],
        streaming_mode=StreamingMode.STATIC,
        static_response="Paris is the capital of France. Population: ~2.1 million (city), ~12 million (metro).",
        priority=5,
    ),
    AgentDescriptor(
        agent_id="summarizer-01",
        name="Summarizer",
        description="Summarizes long text into concise bullet points",
        endpoint="http://localhost:8004",
        capabilities=[AgentCapability.SUMMARIZATION],
        streaming_mode=StreamingMode.STATIC,
        static_response="Point 1: Key insight extracted.\nPoint 2: Supporting detail.\nPoint 3: Conclusion.",
        priority=8,
    ),
    AgentDescriptor(
        agent_id="translator-01",
        name="Translator",
        description="Translates text between languages using an LLM",
        endpoint="http://localhost:8005",
        capabilities=[AgentCapability.TRANSLATION],
        streaming_mode=StreamingMode.DYNAMIC_EVENTS,
        payload_schema=PayloadSchema(
            required_fields={"text": "string", "target_language": "string"},
            example={"text": "Hello world", "target_language": "Spanish"},
        ),
        priority=7,
    ),
    AgentDescriptor(
        agent_id="analyst-01",
        name="Data Analyst",
        description="Performs data analysis and generates insights",
        endpoint="http://localhost:8006",
        capabilities=[AgentCapability.DATA_ANALYSIS],
        streaming_mode=StreamingMode.DYNAMIC_MESSAGES,
        payload_schema=PayloadSchema(
            required_fields={"query": "string"},
            example={"query": "Analyze sales trends"},
        ),
        priority=9,
    ),
]

print(f"Mock registry loaded with {len(MOCK_AGENTS)} agents:")
for a in MOCK_AGENTS:
    print(f"  - {a.name:20s}  mode={a.streaming_mode.value:20s}  capabilities={[c.value for c in a.capabilities]}")

### Mock HTTP Servers

We use `aiohttp` to spin up lightweight servers that simulate the discovery service and individual workers. In a real deployment these would be separate Docker containers / Kubernetes pods.

In [None]:
from aiohttp import web


async def handle_discovery(request: web.Request) -> web.Response:
    """GET /agents -> return all registered mock agents."""
    agents_data = [a.model_dump() for a in MOCK_AGENTS]
    return web.json_response({"agents": agents_data})


async def handle_agent_health(request: web.Request) -> web.Response:
    """GET /health -> always healthy in the mock."""
    return web.json_response({"status": "ok"})


async def handle_dynamic_event_stream(request: web.Request) -> web.StreamResponse:
    """
    POST /stream -> SSE response that emits tokens one by one.
    Simulates a generative LLM worker.
    """
    body = await request.json()
    query = body.get("query", body.get("description", "default task"))

    simulated_tokens = f"Here is a generated response for: '{query}'. ".split()
    simulated_tokens += "This output is streamed token-by-token from the worker microservice.".split()

    resp = web.StreamResponse(
        status=200,
        headers={"Content-Type": "text/event-stream", "Cache-Control": "no-cache"},
    )
    await resp.prepare(request)

    for token in simulated_tokens:
        chunk = json.dumps({"token": token + " ", "type": "token"})
        await resp.write(f"data: {chunk}\n\n".encode())
        await asyncio.sleep(0.05)

    await resp.write(b'data: {"type": "done"}\n\n')
    return resp


async def handle_dynamic_message_stream(request: web.Request) -> web.StreamResponse:
    """
    POST /stream -> SSE response that emits complete chat messages.
    """
    body = await request.json()
    query = body.get("query", body.get("description", "default task"))

    messages_to_send = [
        f"Analyzing your request: '{query}'",
        "Processing with the language model...",
        f"Result: Here is the complete response for '{query}'. This was streamed as full messages.",
    ]

    resp = web.StreamResponse(
        status=200,
        headers={"Content-Type": "text/event-stream", "Cache-Control": "no-cache"},
    )
    await resp.prepare(request)

    for msg in messages_to_send:
        chunk = json.dumps({"role": "assistant", "content": msg})
        await resp.write(f"data: {chunk}\n\n".encode())
        await asyncio.sleep(0.2)

    await resp.write(b'data: {"type": "done"}\n\n')
    return resp


async def handle_static_invoke(request: web.Request) -> web.Response:
    """POST /invoke -> plain JSON response (no streaming)."""
    body = await request.json()
    return web.json_response({
        "response": f"Static result for task '{body.get('task_id', '?')}': "
                    "This is a deterministic, hardcoded answer.",
    })


_mock_servers = []


async def start_mock_discovery(port: int = 9000):
    app = web.Application()
    app.router.add_get("/agents", handle_discovery)
    app.router.add_get("/health", handle_agent_health)
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, "localhost", port)
    await site.start()
    _mock_servers.append(runner)
    logger.info("Mock Discovery Service running on :%d", port)


async def start_mock_worker(port: int, streaming_mode: StreamingMode):
    app = web.Application()
    app.router.add_get("/health", handle_agent_health)

    if streaming_mode == StreamingMode.DYNAMIC_EVENTS:
        app.router.add_post("/stream", handle_dynamic_event_stream)
    elif streaming_mode == StreamingMode.DYNAMIC_MESSAGES:
        app.router.add_post("/stream", handle_dynamic_message_stream)
    else:
        app.router.add_post("/invoke", handle_static_invoke)

    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, "localhost", port)
    await site.start()
    _mock_servers.append(runner)
    logger.info("Mock Worker running on :%d  mode=%s", port, streaming_mode.value)


async def start_all_mocks():
    """Spin up the discovery service and one worker per mock agent."""
    await start_mock_discovery(9000)
    for agent in MOCK_AGENTS:
        port = int(agent.endpoint.split(":")[-1])
        await start_mock_worker(port, agent.streaming_mode)
    print(f"All mock servers running ({1 + len(MOCK_AGENTS)} total)")


async def stop_all_mocks():
    for runner in _mock_servers:
        await runner.cleanup()
    _mock_servers.clear()
    print("All mock servers stopped")


print("Mock server helpers defined")
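The mock workers write each event as a `data: {...}` line followed by a blank line. For reference, a client-side parser for that framing can be quite small. `SSEDataParser` is a hypothetical helper, and the notebook's own streaming multiplexer, defined earlier, may parse differently. It handles only `data:` fields and assumes every payload is JSON, which holds for these mocks.

```python
import json
from typing import Any, Dict, List, Optional


class SSEDataParser:
    """Incrementally turn SSE text lines into decoded JSON payloads.

    Feed one line at a time. A blank line ends an event, and multiple
    `data:` lines within one event are joined with newlines, as the SSE
    format specifies. Other fields (`event:`, `id:`, `retry:`) and comment
    lines starting with ':' are ignored in this sketch.
    """

    def __init__(self) -> None:
        self._data: List[str] = []

    def feed(self, line: str) -> Optional[Dict[str, Any]]:
        line = line.rstrip("\r\n")
        if line == "":
            if not self._data:
                return None  # stray blank line: no event to dispatch
            payload = "\n".join(self._data)
            self._data = []
            return json.loads(payload)
        if line.startswith("data:"):
            value = line[len("data:"):]
            # A single leading space after the colon is not part of the value.
            self._data.append(value[1:] if value.startswith(" ") else value)
        return None
```

With httpx, the lines could come from `async with client.stream("POST", url, json=payload) as resp:` followed by `async for line in resp.aiter_lines():`. Each line goes to `parser.feed(line)`, and the caller acts on any result that is not `None`.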

## 11. End-to-End Demo

Start the mock servers, run the orchestrator with a sample query, and observe the full pipeline in action: planning -> discovery -> execution (with streaming) -> synthesis.

In [None]:
# Start mock infrastructure
await start_all_mocks()

In [None]:
# -------------------------------------------------------------------
# Demo 1: Non-streaming run (full result at the end)
# -------------------------------------------------------------------

demo_query = "Write a short poem about artificial intelligence and also look up the capital of France."

print("=" * 70)
print(f"QUERY: {demo_query}")
print("=" * 70)

result = await run_orchestrator(demo_query)

print("\n--- Final Response ---")
print(result.get("final_response", "(no response)"))
print("\n--- Execution Plan ---")
plan = result.get("execution_plan")
if plan:
    print(f"  Plan ID: {plan.plan_id}")
    print(f"  Reasoning: {plan.reasoning}")
    for t in plan.sub_tasks:
        print(f"  - {t.task_id}: {t.description} [{t.required_capability.value}]")

print("\n--- Worker Results ---")
for wr in result.get("worker_results", []):
    status = "OK" if wr.success else "FAIL"
    print(f"  [{status}] {wr.task_id} (agent={wr.agent_id}, mode={wr.streaming_mode.value}, {wr.latency_ms:.0f}ms)")
    preview = wr.response[:120] + ('...' if len(wr.response) > 120 else '')
    print(f"    Response: {preview}")

In [None]:
# -------------------------------------------------------------------
# Demo 2: Streaming run (events arrive in real time)
# -------------------------------------------------------------------

demo_query_2 = "Generate a Python function for binary search and summarize its time complexity."

print("=" * 70)
print(f"STREAMING QUERY: {demo_query_2}")
print("=" * 70)
print()

async for event in run_orchestrator_streaming(demo_query_2):
    prefix = f"[{event.agent_id}/{event.task_id}]"
    if event.event_type == "token":
        print(event.data, end="", flush=True)
    elif event.event_type == "message":
        print(f"\n{prefix} MSG: {event.data[:200]}")
    elif event.event_type == "status":
        print(f"\n{prefix} STATUS: {event.data}")
    elif event.event_type == "error":
        print(f"\n{prefix} ERROR: {event.data}")
    elif event.event_type == "done":
        print(f"\n{prefix} DONE")

print("\n" + "=" * 70)

In [None]:
# Cleanup mock servers
await stop_all_mocks()
await discovery_client.close()
await streaming_mux.close()

## 12. FastAPI Integration Example

Below is a reference sketch showing how the orchestrator could be exposed as an HTTP API with SSE streaming. It is not executed in the notebook and omits production concerns such as authentication and rate limiting.

In [None]:
FASTAPI_EXAMPLE = '''
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI(title="LangGraph Orchestrator API")


class OrchestrateRequest(BaseModel):
    """Request body; FastAPI answers 422 if `query` is missing or not a string."""
    query: str


@app.post("/orchestrate")
async def orchestrate_non_streaming(req: OrchestrateRequest):
    """Non-streaming: returns the full synthesized response."""
    result = await run_orchestrator(req.query)
    return {
        "response": result["final_response"],
        "plan": result["execution_plan"].model_dump() if result.get("execution_plan") else None,
        "worker_results": [r.model_dump() for r in result.get("worker_results", [])],
    }


@app.post("/orchestrate/stream")
async def orchestrate_streaming(req: OrchestrateRequest):
    """
    Streaming: returns SSE events as the orchestrator executes.

    Each frame is:  data: {"agent_id": "...", "task_id": "...", "event_type": "token|message|status|error|done", "data": "..."}

    Because this is a POST endpoint, browsers cannot use EventSource (GET only);
    read it with fetch() and a stream reader, or an SSE client that supports POST.
    """
    query = req.query

    async def event_generator():
        async for event in run_orchestrator_streaming(query):
            yield f"data: {event.model_dump_json()}\\n\\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
'''

print("FastAPI integration example (reference only — not executed in notebook):")
print(FASTAPI_EXAMPLE)

## 13. Recommendations & Design Notes

### Architecture Recommendations

| Area | Recommendation | Why |
|------|----------------|-----|
| **Discovery caching** | Cache agent descriptors with a 5-min TTL and background refresh | Avoids hammering the discovery service on every request; stale cache is acceptable for short windows |
| **Circuit breakers** | Wrap worker calls in a circuit breaker (e.g. `aiobreaker`) | Prevents cascading failures when a worker microservice goes down |
| **Retry with backoff** | Use exponential backoff (2x delay, max 3 retries) for transient HTTP errors | Handles network blips without overwhelming workers |
| **Structured LLM output** | Use OpenAI function-calling / tool-use for the planner | Eliminates brittle JSON parsing from free-form text |
| **Observability** | Add OpenTelemetry spans per node and per worker call | Critical for debugging latency in a distributed system |
| **DAG execution** | Replace the 2-pass executor with a topological-sort DAG scheduler | Enables maximum parallelism when tasks have complex dependency trees |
| **Streaming protocol** | Standardize on SSE frames that each carry one JSON-encoded `StreamEvent` (`data: {...}` followed by a blank line) | Uniform protocol across all consumers (web, mobile, CLI) |
| **Idempotency** | Give each orchestration run a unique `run_id`; workers should be idempotent | Safe retries and deduplication |
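
The DAG-execution recommendation can be prototyped with the standard library's `graphlib.TopologicalSorter` (Python 3.9+). This sketch assumes the plan can be reduced to a mapping from each `task_id` to the ids it depends on, and that each task is run by an async callable. `run_dag` and its parameters are hypothetical names. Unlike a fixed two-pass schedule, a task starts the moment its last dependency finishes.

```python
import asyncio
from graphlib import TopologicalSorter
from typing import Any, Awaitable, Callable, Dict, Iterable, Mapping


async def run_dag(
    deps: Mapping[str, Iterable[str]],
    run_task: Callable[[str, Dict[str, Any]], Awaitable[Any]],
) -> Dict[str, Any]:
    """Start each task as soon as all of its dependencies have finished.

    `deps` maps task_id -> ids it depends on. `run_task(task_id, results)`
    can read the results of already-finished tasks from `results`.
    """
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the plan has a cycle
    results: Dict[str, Any] = {}
    running: Dict[asyncio.Task, str] = {}
    try:
        while sorter.is_active():
            # Launch every task whose dependencies are now satisfied.
            for task_id in sorter.get_ready():
                running[asyncio.create_task(run_task(task_id, results))] = task_id
            done, _ = await asyncio.wait(set(running), return_when=asyncio.FIRST_COMPLETED)
            for t in done:
                task_id = running.pop(t)
                results[task_id] = t.result()  # re-raises if that task failed
                sorter.done(task_id)           # may unlock dependent tasks
    finally:
        # On failure, cancel whatever is still in flight and reap it.
        for t in running:
            t.cancel()
        await asyncio.gather(*running, return_exceptions=True)
    return results
```

If any task raises, `run_dag` cancels the tasks still in flight and re-raises. An orchestrator might instead record a failed `WorkerResult` and skip only that task's dependents.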

### Streaming Design Notes

1. **Dynamic vs. Static is per-agent, not per-request.** The discovery service declares the mode. The orchestrator doesn't guess — it reads the agent descriptor.

2. **Event-level streaming** (token-by-token) is best for:
   - Real-time UX where users see text appearing
   - Long-running generative tasks

3. **Message-level streaming** (full messages) is best for:
   - Multi-turn conversational agents
   - Agents that produce structured intermediate steps

4. **Static responses** are best for:
   - Lookup / retrieval agents
   - Rule-based / deterministic agents
   - Agents that call external APIs and return a fixed-format result

5. **Back-pressure:** If the client can't consume events fast enough, consider adding a bounded async queue between the streaming multiplexer and the SSE endpoint.
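
The three modes can share one client-facing protocol if every worker output is normalized into `StreamEvent`-shaped records. The sketch below is hypothetical. It keys off the payload shapes the mock workers in section 10 emit (`{"type": "token", ...}`, `{"role": ..., "content": ...}`, `{"type": "done"}`) and treats a plain string as a static response. Real workers may use other shapes.

```python
from typing import Any, Dict, Iterable, Iterator, Union


def normalize(
    agent_id: str,
    task_id: str,
    payloads: Iterable[Union[str, Dict[str, Any]]],
) -> Iterator[Dict[str, str]]:
    """Yield StreamEvent-shaped dicts for any worker's output.

    - token payloads   {"type": "token", "token": ...}  -> "token" events
    - message payloads {"role": ..., "content": ...}     -> "message" events
    - a static response (plain str)                      -> one "message" event
    Exactly one "done" event is emitted at the end.
    """

    def event(kind: str, data: str) -> Dict[str, str]:
        return {"agent_id": agent_id, "task_id": task_id, "event_type": kind, "data": data}

    for p in payloads:
        if isinstance(p, str):  # static worker: whole answer at once
            yield event("message", p)
        elif p.get("type") == "token":
            yield event("token", p.get("token", ""))
        elif "content" in p:
            yield event("message", p["content"])
        elif p.get("type") == "done":
            continue  # replaced by the single terminal event below
        else:
            yield event("error", f"unrecognized payload: {p!r}")
    yield event("done", "")
```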

### Security Considerations

- **mTLS between orchestrator and workers** — microservices should authenticate each other
- **API key rotation** via the discovery service metadata
- **Rate limiting** at the orchestrator level to prevent abuse
- **Input validation** — the planner's output should be schema-validated before dispatching to workers
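
One way to act on the input-validation point is to check each payload against the agent's `PayloadSchema.required_fields` before dispatch. This sketch assumes `required_fields` maps field names to JSON-style type names. Only `"string"` appears in this notebook's mock agents, so the other entries in `_TYPE_MAP` are assumptions. `validate_payload` is a hypothetical helper.

```python
from typing import Any, Dict, List

# Assumed mapping from schema type names to Python types.
_TYPE_MAP: Dict[str, Any] = {
    "string": str,
    "integer": int,
    "number": (int, float),
    "boolean": bool,
    "object": dict,
    "array": list,
}


def validate_payload(payload: Dict[str, Any], required_fields: Dict[str, str]) -> List[str]:
    """Return a list of problems; an empty list means the payload passes."""
    problems: List[str] = []
    for field, type_name in required_fields.items():
        if field not in payload:
            problems.append(f"missing required field '{field}'")
            continue
        expected = _TYPE_MAP.get(type_name)
        value = payload[field]
        if expected is None:
            problems.append(f"unknown type '{type_name}' for field '{field}'")
        # bool is a subclass of int in Python, so reject it explicitly for numbers
        elif not isinstance(value, expected) or (
            type_name in ("integer", "number") and isinstance(value, bool)
        ):
            problems.append(
                f"field '{field}' should be {type_name}, got {type(value).__name__}"
            )
    return problems
```

An empty list means the payload can be dispatched. Otherwise, the problems could be written to the state's `error` field so that the graph routes to the error handler.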

### Scalability Path

```
Phase 1: Single orchestrator process, mock workers (this notebook)
Phase 2: Docker Compose with real workers, shared Redis for state
Phase 3: Kubernetes with auto-scaling workers, LangGraph Cloud for orchestration
Phase 4: Multi-region with geo-routed discovery service
```

## 14. Dependencies

Install the required packages:

```bash
pip install langchain-openai langgraph langchain-core pydantic httpx aiohttp
```

### Version Matrix (tested)

| Package | Version |
|---------|---------|
| langchain-openai | >= 0.1.0 |
| langgraph | >= 0.2.0 |
| langchain-core | >= 0.2.0 |
| pydantic | >= 2.0 |
| httpx | >= 0.27 |
| aiohttp | >= 3.9 |

---

*End of notebook.*