
🕸️ Cobweb (AgentPM)

The orchestration layer that treats agents like a project — critical path, earned value, budget gates, stage gates, built in.

Author: Barney (barnros89@gmail.com) | License: Apache 2.0


Why "Cobweb"?

A cobweb is a network of interconnected threads — every node connected, every path traceable, the whole structure visible at a glance. That's what this runtime does: connects agents into a managed web where every thread (task), every junction (dependency), and every path (critical or not) is tracked and controlled.

Cobwebs also catch things. This runtime catches failures, cost overruns, scope drift, and permission violations before they propagate.


The Problem

Multi-agent systems have four unsolved problems:

| Problem | Current state | Cobweb solution |
|---|---|---|
| Orchestration failure | Ad-hoc task graphs | WBS + CPM (dependency analysis with critical path) |
| No observability | "It's running somewhere" | EVM (CPI, SPI, EAC — real-time health) |
| No identity/permissions | All agents are admin | RACI+G (Responsible, Accountable, Consulted, Informed, Gatekeeper) |
| Cost explosion | "We burned $400 overnight" | Budget gates + stage gates (alert → abort → failover) |

Core insight: These "new" agent problems are old Project Management problems.


Installation

# From PyPI (when published)
pip install cobweb

# From source (development)
git clone https://github.com/321barney/cobweb.git
cd cobweb
pip install -e ".[dev]"

Requirements: Python ≥ 3.11 — no external dependencies for the core runtime.


Quick Start

1. Decorator-based (simplest)

import asyncio
from cobweb import PMRuntime, task, gate, get_registered_tasks, clear_registry

@task(id="WP-1", model="gpt-4o", est_cost=0.05,
      raci={"R": "research-agent", "A": "orchestrator"})
async def research(query: str = "", **kw):
    # Call your LLM here — report real costs via __cost__ protocol
    result = await your_llm_call(query)
    return {
        "output": result.text,
        "__cost__": result.usage.total_cost,   # real USD
        "__tokens__": result.usage.total_tokens,
    }

@task(id="WP-2", deps=["WP-1"], model="claude-3-sonnet", est_cost=0.08,
      raci={"R": "writer-agent", "A": "orchestrator"})
async def write_report(**kw):
    return {"output": "Report written", "__cost__": 0.06, "__tokens__": 2000}

@gate(id="WP-3", deps=["WP-2"],
      raci={"R": "eval-agent", "A": "orchestrator", "G": "human"})
async def quality_gate(**kw):
    return True

async def main():
    runtime = PMRuntime(budget=0.50, alert_threshold=0.6)
    for spec in get_registered_tasks():
        runtime.register_task(spec)

    results = await runtime.execute(query="quantum computing")

    evm = runtime.evm_report()
    print(f"CPI={evm.cpi:.2f}, SPI={evm.spi:.2f}, Health={evm.budget_health.value}")
    print(f"Critical path: {runtime.critical_path()}")
    print(f"Lessons: {runtime.lessons_learned()['recommendations']}")

asyncio.run(main())
clear_registry()  # reset the global registry after the run (e.g. between test cases)

2. Programmatic (no decorators)

from cobweb import PMRuntime, TaskSpec, RACI

runtime = PMRuntime(budget=1.0)
runtime.register_task(TaskSpec(
    id="STEP-1", name="Collect Data", model="gpt-4o",
    est_cost=0.10, raci=RACI("collector", "orchestrator"),
))
runtime.register_task(TaskSpec(
    id="STEP-2", name="Analyze", deps=["STEP-1"], model="sonnet",
    est_cost=0.15, raci=RACI("analyst", "orchestrator"),
))
results = await runtime.execute()  # run inside an async function (e.g. via asyncio.run)

3. Custom adapter (framework integration)

from cobweb import PMRuntime, CobwebAdapter, TaskSpec, TaskResult, TaskStatus

class MyFrameworkAdapter(CobwebAdapter):
    async def execute_task(self, task_spec, context):
        result = await my_framework.run(task_spec.func, **context)
        return TaskResult(
            status=TaskStatus.COMPLETED,
            actual_cost=result.cost,
            actual_tokens=result.tokens,
            output=result.output,
        )

    def get_supported_models(self):
        return ["gpt-4o", "claude-3-sonnet"]

    def estimate_cost(self, model, input_tokens, output_tokens):
        return (input_tokens + output_tokens) * 0.00001

runtime = PMRuntime(budget=2.0, adapter=MyFrameworkAdapter())

What the Runtime Does

┌────────────────────────────────────────────────────────────┐
│ 1. PLAN     │ Parse WBS → Register tasks → Compute CPM     │
│ 2. SCHEDULE │ Topological sort → Fast-track parallel tasks │
│ 3. EXECUTE  │ Dispatch via adapter → Budget gate pre-check │
│ 4. MONITOR  │ EVM (CPI/SPI) → Cost variance → Stage gates  │
│ 5. CLOSE    │ Lessons learned → Recommendations → Reports  │
└────────────────────────────────────────────────────────────┘
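The SCHEDULE phase above can be sketched as a Kahn-style topological sort that groups ready tasks into waves for parallel dispatch (fast-tracking). This is a minimal illustration of the idea, not Cobweb's actual scheduler; the task IDs follow the Quick Start example:

```python
def schedule_waves(tasks: dict[str, list[str]]) -> list[list[str]]:
    """Group tasks into waves: every task in a wave has all of its
    dependencies satisfied, so the wave can run in parallel."""
    indegree = {t: len(deps) for t, deps in tasks.items()}
    dependents: dict[str, list[str]] = {t: [] for t in tasks}
    for t, deps in tasks.items():
        for d in deps:
            dependents[d].append(t)

    wave = [t for t, n in indegree.items() if n == 0]
    waves = []
    while wave:
        waves.append(sorted(wave))
        nxt = []
        for t in wave:
            for child in dependents[t]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    nxt.append(child)
        wave = nxt
    return waves

# WP-1 has no deps; WP-2 depends on WP-1; WP-3 depends on WP-2
print(schedule_waves({"WP-1": [], "WP-2": ["WP-1"], "WP-3": ["WP-2"]}))
# → [['WP-1'], ['WP-2'], ['WP-3']]
```

Independent tasks land in the same wave, which is exactly what makes fast-tracking safe: nothing in a wave depends on anything else in it.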

PM Methodology → Agent Mechanics

| PM Concept | Agent Implementation |
|---|---|
| WBS | `@task` decorator / `TaskSpec` registration |
| CPM | Dependency graph → forward/backward pass → float analysis |
| EVM | Real-time CPI, SPI, EAC, TCPI from actual costs |
| RACI | Role assignment (R=agent, A=orchestrator, G=human) |
| Budget gates | Alert at 75%, abort at 100% |
| Stage gates | `@gate` decorator → human-in-the-loop approval |
| Fast-tracking | Parallel dispatch of independent tasks |
| Crashing | Failover to `backup_model` on failure |
| Float analysis | Tasks with slack → safe to use cheaper models |
| 3-point est. | PERT: (O + 4M + P) / 6 for duration uncertainty |
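The EVM and PERT rows above use the standard project-management formulas. A small sketch of the arithmetic (textbook definitions, independent of Cobweb's internal `EVMReport`):

```python
def evm_metrics(bac: float, pv: float, ev: float, ac: float) -> dict[str, float]:
    """Standard EVM metrics (all inputs in USD).
    bac: budget at completion, pv: planned value,
    ev: earned value, ac: actual cost."""
    cpi = ev / ac                    # cost efficiency: > 1 means under budget
    spi = ev / pv                    # schedule efficiency: > 1 means ahead of plan
    eac = bac / cpi                  # estimate at completion
    tcpi = (bac - ev) / (bac - ac)   # efficiency required to finish on budget
    return {"CPI": cpi, "SPI": spi, "EAC": eac, "TCPI": tcpi}

def pert(optimistic: float, most_likely: float, pessimistic: float) -> float:
    """3-point (PERT) estimate: (O + 4M + P) / 6."""
    return (optimistic + 4 * most_likely + pessimistic) / 6

m = evm_metrics(bac=0.50, pv=0.20, ev=0.18, ac=0.24)
print(f"CPI={m['CPI']:.2f} SPI={m['SPI']:.2f} EAC=${m['EAC']:.2f}")
# → CPI=0.75 SPI=0.90 EAC=$0.67
```

Here CPI < 1 means each earned dollar of work cost more than a dollar, so the EAC projection exceeds the original budget, which is what trips the alert threshold.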

Real Cost Tracking Protocol

Task functions report real API costs by returning a dict with special keys:

@task(id="WP-1", model="gpt-4o", est_cost=0.05)
async def my_task(**kw):
    response = await openai_client.chat.completions.create(...)
    return {
        "output": response.choices[0].message.content,
        "__cost__": response.usage.total_tokens * 0.000005,  # real USD
        "__tokens__": response.usage.total_tokens,           # real count
        "__model__": "gpt-4o",                               # model used
    }

If your function returns a plain value, the adapter uses the estimated cost.
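How the runtime might consume that return value can be sketched as follows. `normalize_result` is an illustrative helper showing the fallback behavior described above, not part of Cobweb's API:

```python
def normalize_result(raw, est_cost: float) -> dict:
    """Extract real cost/tokens from a task's return value,
    falling back to the estimated cost for plain values."""
    if isinstance(raw, dict) and "__cost__" in raw:
        return {
            "output": raw.get("output"),
            "cost": float(raw["__cost__"]),
            "tokens": int(raw.get("__tokens__", 0)),
            "model": raw.get("__model__"),
        }
    # Plain return value: charge the estimated cost
    return {"output": raw, "cost": est_cost, "tokens": 0, "model": None}

print(normalize_result({"output": "hi", "__cost__": 0.031, "__tokens__": 900}, 0.05)["cost"])
# → 0.031
print(normalize_result("plain text", 0.05)["cost"])
# → 0.05
```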


Event System (Observability)

from cobweb import PMRuntime, EventType

runtime = PMRuntime(budget=1.0)

# Subscribe to specific events
runtime.events.subscribe(EventType.TASK_COMPLETED, lambda e:
    print(f"Task {e.data['task_id']} done — ${e.data['actual_cost']:.3f}")
)

# Subscribe to ALL events (audit log)
runtime.events.subscribe_all(lambda e:
    my_logger.info(f"[{e.type.value}] {e.data}")
)

# Post-execution: query event history
for event in runtime.events.history:
    print(event)

22 event types covering the full lifecycle: workflow.started, task.started, task.completed, task.failed, gate.pending, gate.approved, budget.alert, budget.exceeded, etc.
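As a usage sketch, the audit history can be folded into a per-task spend summary after a run. The stand-in events below use plain strings where the runtime emits `EventType` values, so this illustrates the shape of the data, not Cobweb's actual objects:

```python
from collections import defaultdict
from types import SimpleNamespace

def spend_by_task(history) -> dict[str, float]:
    """Sum actual cost per task from completed-task events."""
    totals: dict[str, float] = defaultdict(float)
    for event in history:
        if event.type == "task.completed":  # runtime events compare via event.type.value
            totals[event.data["task_id"]] += event.data["actual_cost"]
    return dict(totals)

# Stand-in events with the same payload shape as the handler examples above
history = [
    SimpleNamespace(type="task.completed", data={"task_id": "WP-1", "actual_cost": 0.04}),
    SimpleNamespace(type="task.completed", data={"task_id": "WP-2", "actual_cost": 0.06}),
    SimpleNamespace(type="budget.alert", data={"spent": 0.10}),
]
print(spend_by_task(history))
# → {'WP-1': 0.04, 'WP-2': 0.06}
```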


API Reference

PMRuntime

PMRuntime(
    budget=1.0,              # BAC in USD
    alert_threshold=0.75,    # alert at 75% of budget
    abort_threshold=1.0,     # abort at 100% of budget
    max_concurrent=10,       # resource leveling
    mode=RuntimeMode.STANDARD,
    adapter=CustomAdapter(),  # or your custom adapter
    on_budget_alert=callback,
    on_gate=async_callback,
    on_failure=callback,
)
| Method | Returns | Description |
|---|---|---|
| `register_task(spec)` | `None` | Add task to WBS |
| `register_risk(risk)` | `None` | Add to risk register |
| `await execute(**kw)` | `dict[str, TaskResult]` | Run full PM lifecycle |
| `evm_report()` | `EVMReport` | Current EVM snapshot |
| `critical_path()` | `list[str]` | Critical task IDs |
| `baseline_comparison()` | `list[dict]` | Est. vs actual per task |
| `float_analysis()` | `list[dict]` | Slack + model recommendations |
| `lessons_learned()` | `dict` | Analysis + recommendations |
| `get_change_log()` | `list[ChangeEntry]` | Full change control log |

Package Structure

cobweb/
├── models.py       # TaskSpec, TaskResult, RACI, Risk, enums
├── cpm.py          # Critical Path Method engine
├── evm.py          # Earned Value Management engine
├── runtime.py      # PMRuntime orchestrator
├── decorators.py   # @task, @gate
├── events.py       # EventBus, 22 event types
├── exceptions.py   # 10 specific exception types
├── adapters.py     # CobwebAdapter ABC + CustomAdapter
└── py.typed        # PEP 561 marker

Development

git clone https://github.com/321barney/cobweb.git
cd cobweb
pip install -e ".[dev]"
python -m pytest tests/ -v           # 78 tests
python -m examples.research_agent    # full demo

Roadmap

  • v0.1 ✅ Python MVP — WBS, CPM, EVM, RACI, budget/stage gates, events
  • v0.2 ✅ External adapters (LangGraph, CrewAI) & Monte Carlo simulation
  • v0.3 🔜 JavaScript/TypeScript port (same architecture)
  • v0.4 Dashboard UI, resource leveling, lessons-learned DB
  • v0.5 OpenTelemetry trace export integration

Contributing

Cobweb is in early alpha. If you're interested in the intersection of PM methodology and agentic AI, contributions are welcome. See CONTRIBUTING.md for guidelines.

Areas where help is needed:

  • Framework adapters (LangGraph, CrewAI, AutoGen)
  • OpenTelemetry trace export
  • Monte Carlo simulation engine
  • Dashboard UI
  • Documentation and examples
