# 01 â€” Patron Orquestador-Workers

**Objetivo**: Implementar el patron donde un agente orquestador descompone tareas complejas en subtareas y las despacha a workers especializados.

## Contenido
1. Diseño del orquestador
2. Workers especializados (Batman, Spider-Man, Sintetizador)
3. Despacho con `Send()` de LangGraph
4. Sintetizacion de resultados

In [None]:
import os
import json
import time
from typing import TypedDict, Annotated, Literal
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
from pydantic import BaseModel, Field
import chromadb

load_dotenv()

llm = ChatOpenAI(model="gpt-5-mini", temperature=0)
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

print("=" * 60)
print("PATRON ORQUESTADOR-WORKERS")
print("=" * 60)

In [None]:
# ============================================================
# SETUP: Cargar comics en ChromaDB
# ============================================================

from langchain_text_splitters import RecursiveCharacterTextSplitter

chroma_client = chromadb.Client()
splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)

for personaje in ["batman", "spiderman"]:
    with open(f"../data/{personaje}_comics.json") as f:
        comics = json.load(f)
    
    chunks, metas = [], []
    for comic in comics:
        for i, chunk in enumerate(splitter.split_text(comic["contenido"])):
            chunks.append(chunk)
            metas.append({"personaje": personaje, "arco": comic["arco"], "tema": comic["tema"], "titulo": comic["titulo"]})
    
    embs = embeddings.embed_documents(chunks)
    col = chroma_client.create_collection(name=f"orch_{personaje}", metadata={"hnsw:space": "cosine"})
    col.add(ids=[f"{personaje}_{i}" for i in range(len(chunks))], embeddings=embs, documents=chunks, metadatas=metas)
    print(f"Coleccion orch_{personaje}: {col.count()} chunks")

## 1. Diseño del Orquestador

El orquestador recibe una pregunta compleja y la descompone en subtareas.

```
                    ┌──────────────┐
                    │ Orquestador  │
                    │ (descompone) │
                    └──────┬───────┘
                           │
              ┌────────────┼────────────┐
              ▼            ▼            ▼
        ┌──────────┐ ┌──────────┐ ┌──────────┐
        │ Worker   │ │ Worker   │ │ Worker   │
        │ Batman   │ │ Spider   │ │ General  │
        └────┬─────┘ └────┬─────┘ └────┬─────┘
             │             │            │
             └─────────────┼────────────┘
                           ▼
                    ┌──────────────┐
                    │ Sintetizador │
                    └──────────────┘
```

In [None]:
# ============================================================
# MODELO DE DESCOMPOSICION
# ============================================================

class Subtarea(BaseModel):
    """Una subtarea generada por el orquestador."""
    id: int = Field(description="ID de la subtarea")
    descripcion: str = Field(description="Descripcion de lo que debe investigar")
    worker: Literal["batman", "spiderman", "general"] = Field(description="Worker asignado")

class PlanOrquestacion(BaseModel):
    """Plan de descomposicion del orquestador."""
    pregunta_original: str = Field(description="La pregunta original del usuario")
    subtareas: list[Subtarea] = Field(description="Lista de subtareas a ejecutar")
    estrategia: str = Field(description="Breve descripcion de la estrategia de descomposicion")

orchestrator_llm = llm.with_structured_output(PlanOrquestacion)

# Test
plan = orchestrator_llm.invoke(
    "Descompone esta pregunta en subtareas para workers especializados en Batman y Spider-Man:\n\n"
    "Quien seria mejor lider en una crisis: Batman o Spider-Man? Considera su experiencia con equipos, "
    "su estilo de liderazgo, y momentos donde demostraron liderazgo."
)

print("Plan de orquestacion:")
print(f"  Estrategia: {plan.estrategia}")
for st in plan.subtareas:
    print(f"  [{st.worker:10s}] Subtarea {st.id}: {st.descripcion}")

In [None]:
# ============================================================
# ESTADOS Y WORKERS
# ============================================================

class OrchestratorState(TypedDict):
    pregunta: str
    plan: dict | None
    resultados_workers: list[dict]
    respuesta_final: str


class WorkerState(TypedDict):
    subtarea: str
    worker_type: str
    resultado: str


def nodo_orquestador(state: OrchestratorState) -> dict:
    """Descompone la pregunta en subtareas."""
    plan = orchestrator_llm.invoke(
        f"Descompone esta pregunta en subtareas para workers especializados en Batman y Spider-Man:\n\n{state['pregunta']}"
    )
    return {"plan": plan.model_dump()}


def dispatch_workers(state: OrchestratorState) -> list[Send]:
    """Despacha subtareas a workers usando Send()."""
    plan = state["plan"]
    sends = []
    for subtarea in plan["subtareas"]:
        sends.append(Send("worker", {
            "subtarea": subtarea["descripcion"],
            "worker_type": subtarea["worker"],
            "resultado": "",
        }))
    return sends


def nodo_worker(state: WorkerState) -> dict:
    """Worker que ejecuta una subtarea."""
    worker_type = state["worker_type"]
    subtarea = state["subtarea"]
    
    # Buscar en ChromaDB segun el tipo de worker
    if worker_type in ("batman", "spiderman"):
        col = chroma_client.get_collection(f"orch_{worker_type}")
        emb = embeddings.embed_query(subtarea)
        results = col.query(query_embeddings=[emb], n_results=3, include=["documents", "metadatas"])
        contexto = "\n".join([f"[{m['arco']}]: {d[:300]}" for d, m in zip(results["documents"][0], results["metadatas"][0])])
    else:
        # Worker general busca en ambas
        contexto_parts = []
        for p in ["batman", "spiderman"]:
            col = chroma_client.get_collection(f"orch_{p}")
            emb = embeddings.embed_query(subtarea)
            results = col.query(query_embeddings=[emb], n_results=2, include=["documents", "metadatas"])
            contexto_parts.extend([f"[{m['personaje']}/{m['arco']}]: {d[:200]}" for d, m in zip(results["documents"][0], results["metadatas"][0])])
        contexto = "\n".join(contexto_parts)
    
    response = llm.invoke([
        SystemMessage(content=f"Eres un worker especializado en {worker_type}. Responde esta subtarea basandote en el contexto.\n\nContexto:\n{contexto}"),
        HumanMessage(content=subtarea),
    ])
    
    return {"resultado": response.content}


def collect_results(state: OrchestratorState) -> dict:
    """No-op: los resultados se acumulan automaticamente."""
    return {}


def nodo_sintetizador(state: OrchestratorState) -> dict:
    """Sintetiza los resultados de todos los workers."""
    resultados = state.get("resultados_workers", [])
    
    resumen_workers = "\n\n".join([
        f"Worker {i+1} ({r.get('worker_type', '?')}): {r.get('resultado', 'sin resultado')[:500]}"
        for i, r in enumerate(resultados)
    ])
    
    response = llm.invoke([
        SystemMessage(content="Eres un sintetizador. Combina las investigaciones de los workers en una respuesta coherente y completa. Cita las fuentes cuando sea posible. Responde en espa\u00f1ol."),
        HumanMessage(content=f"Pregunta original: {state['pregunta']}\n\nResultados de investigacion:\n{resumen_workers}"),
    ])
    
    return {"respuesta_final": response.content}


print("Workers y sintetizador definidos")

In [None]:
# ============================================================
# CONSTRUCCION Y EJECUCION
# ============================================================

# Simplified version without Send() for compatibility
def run_orchestrator(pregunta: str) -> dict:
    """Ejecuta el patron orquestador-workers manualmente."""
    t0 = time.time()
    
    # 1. Orquestador descompone
    plan = orchestrator_llm.invoke(
        f"Descompone esta pregunta en subtareas para workers de Batman y Spider-Man:\n\n{pregunta}"
    )
    
    print(f"Plan: {plan.estrategia}")
    for st in plan.subtareas:
        print(f"  [{st.worker}] {st.descripcion}")
    
    # 2. Workers ejecutan en secuencia (en prod seria paralelo)
    resultados = []
    for subtarea in plan.subtareas:
        worker_state = {"subtarea": subtarea.descripcion, "worker_type": subtarea.worker, "resultado": ""}
        result = nodo_worker(worker_state)
        resultados.append({**worker_state, **result})
        print(f"  Worker {subtarea.worker} completado ({len(result['resultado'])} chars)")
    
    # 3. Sintetizador combina
    synth_state = {"pregunta": pregunta, "resultados_workers": resultados, "plan": plan.model_dump(), "respuesta_final": ""}
    final = nodo_sintetizador(synth_state)
    
    latencia = (time.time() - t0) * 1000
    
    return {
        "pregunta": pregunta,
        "plan": plan.model_dump(),
        "num_workers": len(resultados),
        "respuesta": final["respuesta_final"],
        "latencia_ms": round(latencia, 1),
    }


print("=" * 60)
print("EJECUCION DEL ORQUESTADOR")
print("=" * 60)

result = run_orchestrator(
    "Quien seria mejor lider en una crisis: Batman o Spider-Man? "
    "Considera su experiencia con equipos, estilo de liderazgo, y momentos clave."
)

print(f"\nRESPUESTA FINAL:")
print(result["respuesta"][:600])
print(f"\nMetricas: {result['num_workers']} workers, {result['latencia_ms']}ms")

## Takeaways

1. El **orquestador** descompone preguntas complejas en subtareas manejables
2. Los **workers** son especialistas que acceden a su propia base de conocimiento
3. El **sintetizador** combina resultados en una respuesta coherente
4. Este patron escala bien: agregar un nuevo especialista es agregar un worker
5. En produccion, los workers pueden ejecutarse en **paralelo** usando `Send()`
6. El costo es proporcional al numero de subtareas (mas descomposicion = mas caro)