diff --git a/src/deckhand/main.py b/src/deckhand/main.py index ea7ded8..556df84 100644 --- a/src/deckhand/main.py +++ b/src/deckhand/main.py @@ -23,6 +23,7 @@ from deckhand.agents.mock import MockAgent from deckhand.config.settings import Settings from deckhand.logging_config import configure_logging +from deckhand.metrics import Metrics from deckhand.orchestrator.actions import ActionRegistry from deckhand.orchestrator.events import build_error_event, build_event from deckhand.orchestrator.manager import Orchestrator @@ -46,6 +47,7 @@ plugin_registry: PluginRegistry | None = None settings: Settings | None = None rate_limiter: RateLimiter | None = None +metrics: Metrics | None = None _service_start_time: float | None = None SERVICE_VERSION = "0.3.0" @@ -61,9 +63,11 @@ async def lifespan(app: FastAPI): plugin_registry, \ settings, \ rate_limiter, \ + metrics, \ _service_start_time _service_start_time = time.time() + metrics = Metrics(started_at=_service_start_time) # Startup — load settings first so we can configure logging from them settings = Settings() @@ -93,7 +97,10 @@ async def lifespan(app: FastAPI): rate_limiter = RateLimiter(settings.rate_limit_rpm) # Initialize orchestrator - orchestrator = Orchestrator(state_persist_path=settings.state_file_path) + orchestrator = Orchestrator( + state_persist_path=settings.state_file_path, + metrics=metrics, + ) orchestrator.register_agent( MockAgent(agent_id="mock-1", project_root="/home/dev/project-alpha") ) @@ -102,8 +109,8 @@ async def lifespan(app: FastAPI): ) # Initialize registries - action_registry = ActionRegistry(orchestrator) - signal_registry = SignalRegistry() + action_registry = ActionRegistry(orchestrator, metrics=metrics) + signal_registry = SignalRegistry(metrics=metrics) plugin_registry = PluginRegistry( actions=action_registry, signals=signal_registry, @@ -298,6 +305,36 @@ async def health() -> dict[str, object]: } +# --------------------------------------------------------------------------- +# Metrics (unauthenticated) +# --------------------------------------------------------------------------- + + +@app.get("/metrics") +async def metrics_endpoint() -> dict[str, object]: + """Operational metrics snapshot. Unauthenticated for monitoring.""" + if orchestrator is None or metrics is None: + raise HTTPException(status_code=503, detail="Service not initialized") + + snapshot = metrics.snapshot() + + agents = list(orchestrator.list_agents()) + status_counts: dict[str, int] = {} + for agent in agents: + status = agent.status.value + status_counts[status] = status_counts.get(status, 0) + 1 + + snapshot["websocket_clients"] = orchestrator.event_bus.client_count + snapshot["agents"] = { + "count": len(agents), + "by_status": status_counts, + } + snapshot["state_store"] = { + "entry_count": orchestrator.state_store.entry_count(), + } + return snapshot + + # --------------------------------------------------------------------------- # Agent routes (read) # --------------------------------------------------------------------------- diff --git a/src/deckhand/metrics.py b/src/deckhand/metrics.py new file mode 100644 index 0000000..e4829b4 --- /dev/null +++ b/src/deckhand/metrics.py @@ -0,0 +1,55 @@ +from __future__ import annotations + +import time +from dataclasses import dataclass, field + + +@dataclass +class Metrics: + """Lightweight in-memory operational counters. + + Safe to use without external dependencies. Thread safety is not a concern + here: the service runs on a single asyncio event loop and counter + increments are atomic at the bytecode level for simple ints. + """ + + started_at: float = field(default_factory=time.time) + events_total: int = 0 + actions_total: int = 0 + actions_success: int = 0 + actions_failure: int = 0 + signals_total: int = 0 + signals_by_name: dict[str, int] = field(default_factory=dict) + + def record_event(self) -> None: + self.events_total += 1 + + def record_action(self, *, success: bool) -> None: + self.actions_total += 1 + if success: + self.actions_success += 1 + else: + self.actions_failure += 1 + + def record_signal(self, name: str) -> None: + self.signals_total += 1 + self.signals_by_name[name] = self.signals_by_name.get(name, 0) + 1 + + def snapshot(self) -> dict[str, object]: + uptime = max(time.time() - self.started_at, 1e-9) + return { + "uptime_seconds": uptime, + "events": { + "total": self.events_total, + "per_second": self.events_total / uptime, + }, + "actions": { + "total": self.actions_total, + "success": self.actions_success, + "failure": self.actions_failure, + }, + "signals": { + "total": self.signals_total, + "by_name": dict(self.signals_by_name), + }, + } diff --git a/src/deckhand/orchestrator/actions.py b/src/deckhand/orchestrator/actions.py index 3d175dd..e75732f 100644 --- a/src/deckhand/orchestrator/actions.py +++ b/src/deckhand/orchestrator/actions.py @@ -2,6 +2,7 @@ from typing import Any, Awaitable, Callable, Protocol +from deckhand.metrics import Metrics from deckhand.orchestrator.metadata import ActionMetadata @@ -19,8 +20,13 @@ async def provide_input(self, agent_id: str, text: str) -> None: ... class ActionRegistry: """Maps named actions to orchestrator commands.""" - def __init__(self, orchestrator: OrchestratorActions) -> None: + def __init__( + self, + orchestrator: OrchestratorActions, + metrics: Metrics | None = None, + ) -> None: self._orchestrator = orchestrator + self._metrics = metrics self._actions: dict[str, ActionHandler] = {} self._metadata: dict[str, ActionMetadata] = {} self._register_defaults() @@ -44,7 +50,14 @@ async def run(self, name: str, payload: dict[str, object]) -> None: handler = self._actions.get(name) if handler is None: raise KeyError(name) - await handler(payload) + try: + await handler(payload) + except Exception: + if self._metrics is not None: + self._metrics.record_action(success=False) + raise + if self._metrics is not None: + self._metrics.record_action(success=True) def list_actions(self) -> list[ActionMetadata]: """List all registered actions with metadata.""" diff --git a/src/deckhand/orchestrator/events.py b/src/deckhand/orchestrator/events.py index 6b62edc..17c6a9e 100644 --- a/src/deckhand/orchestrator/events.py +++ b/src/deckhand/orchestrator/events.py @@ -5,6 +5,7 @@ from fastapi import WebSocket +from deckhand.metrics import Metrics from deckhand.orchestrator.schemas import EventEnvelope, EventSource @@ -85,8 +86,9 @@ def build_error_event( class EventBus: """In-memory pub/sub for Deckhand events.""" - def __init__(self) -> None: + def __init__(self, metrics: Metrics | None = None) -> None: self._subscribers: set[WebSocket] = set() + self._metrics = metrics @property def client_count(self) -> int: @@ -124,6 +126,9 @@ async def emit(self, event: dict[str, Any]) -> None: ): raise ValueError("Event source must have 'kind' and 'id' fields") + if self._metrics is not None: + self._metrics.record_event() + dead: list[WebSocket] = [] for websocket in self._subscribers: try: diff --git a/src/deckhand/orchestrator/manager.py b/src/deckhand/orchestrator/manager.py index 60aed64..05353b1 100644 --- a/src/deckhand/orchestrator/manager.py +++ b/src/deckhand/orchestrator/manager.py @@ -3,6 +3,7 @@ from typing import Iterable from deckhand.agents.base import AgentBase +from deckhand.metrics import Metrics from deckhand.orchestrator.events import EventBus from deckhand.orchestrator.state import StateStore @@ -10,9 +11,14 @@ class Orchestrator: """Tracks agent lifecycle and routes commands to agents.""" - def __init__(self, state_persist_path: str | None = None) -> None: + def __init__( + self, + state_persist_path: str | None = None, + metrics: Metrics | None = None, + ) -> None: self.agents: dict[str, AgentBase] = {} - self.event_bus = EventBus() + self.metrics = metrics + self.event_bus = EventBus(metrics=metrics) self.state_store = StateStore(self.event_bus, persist_path=state_persist_path) def register_agent(self, agent: AgentBase) -> None: diff --git a/src/deckhand/orchestrator/signals.py b/src/deckhand/orchestrator/signals.py index d8c7dcf..50b86c0 100644 --- a/src/deckhand/orchestrator/signals.py +++ b/src/deckhand/orchestrator/signals.py @@ -2,6 +2,7 @@ from typing import Any, Awaitable, Callable +from deckhand.metrics import Metrics from deckhand.orchestrator.metadata import SignalMetadata @@ -11,9 +12,10 @@ class SignalRegistry: """Maps named signals to handlers.""" - def __init__(self) -> None: + def __init__(self, metrics: Metrics | None = None) -> None: self._signals: dict[str, SignalHandler] = {} self._metadata: dict[str, SignalMetadata] = {} + self._metrics = metrics def register( self, @@ -35,6 +37,8 @@ async def handle(self, name: str, payload: dict[str, object]) -> None: if handler is None: raise KeyError(name) await handler(payload) + if self._metrics is not None: + self._metrics.record_signal(name) def list_signals(self) -> list[SignalMetadata]: """List all registered signals with metadata.""" diff --git a/tests/test_bridge.py b/tests/test_bridge.py index 7fdcac6..4c47578 100644 --- a/tests/test_bridge.py +++ b/tests/test_bridge.py @@ -225,6 +225,38 @@ async def test_health_endpoint(client: AsyncClient) -> None: assert data["state_store"]["writable"] is True +async def test_metrics_endpoint(client: AsyncClient) -> None: + """GET /metrics returns operational counters and reflects activity.""" + # Trigger some activity: one successful action and one failing action. + resp = await client.post("/actions/agent.start", json={"agent_id": "mock-1"}) + assert resp.status_code == 200 + + resp = await client.post( + "/actions/agent.start", + json={}, # missing agent_id -> ValidationError + ) + assert resp.status_code in (400, 422) + + # Metrics is unauthenticated. + transport = ASGITransport(app=client._transport.app) + async with AsyncClient(transport=transport, base_url="http://test") as no_auth: + resp = await no_auth.get("/metrics") + assert resp.status_code == 200 + data = resp.json() + + assert data["uptime_seconds"] > 0 + assert data["events"]["total"] >= 1 + assert data["events"]["per_second"] >= 0 + # At least the successful action ran; the failing one was rejected before + # the handler by payload validation, so only success is guaranteed. + assert data["actions"]["total"] >= 1 + assert data["actions"]["success"] >= 1 + assert "by_status" in data["agents"] + assert data["agents"]["count"] >= 2 + assert data["websocket_clients"] == 0 + assert "entry_count" in data["state_store"] + + async def test_agent_without_context_uses_id_as_label(client: AsyncClient) -> None: """An agent with no project_root falls back to its ID for display_label.""" resp = await client.post(