Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 40 additions & 3 deletions src/deckhand/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from deckhand.agents.mock import MockAgent
from deckhand.config.settings import Settings
from deckhand.logging_config import configure_logging
from deckhand.metrics import Metrics
from deckhand.orchestrator.actions import ActionRegistry
from deckhand.orchestrator.events import build_error_event, build_event
from deckhand.orchestrator.manager import Orchestrator
Expand All @@ -46,6 +47,7 @@
plugin_registry: PluginRegistry | None = None
settings: Settings | None = None
rate_limiter: RateLimiter | None = None
metrics: Metrics | None = None
_service_start_time: float | None = None

SERVICE_VERSION = "0.3.0"
Expand All @@ -61,9 +63,11 @@ async def lifespan(app: FastAPI):
plugin_registry, \
settings, \
rate_limiter, \
metrics, \
_service_start_time

_service_start_time = time.time()
metrics = Metrics(started_at=_service_start_time)

# Startup — load settings first so we can configure logging from them
settings = Settings()
Expand Down Expand Up @@ -93,7 +97,10 @@ async def lifespan(app: FastAPI):
rate_limiter = RateLimiter(settings.rate_limit_rpm)

# Initialize orchestrator
orchestrator = Orchestrator(state_persist_path=settings.state_file_path)
orchestrator = Orchestrator(
state_persist_path=settings.state_file_path,
metrics=metrics,
)
orchestrator.register_agent(
MockAgent(agent_id="mock-1", project_root="/home/dev/project-alpha")
)
Expand All @@ -102,8 +109,8 @@ async def lifespan(app: FastAPI):
)

# Initialize registries
action_registry = ActionRegistry(orchestrator)
signal_registry = SignalRegistry()
action_registry = ActionRegistry(orchestrator, metrics=metrics)
signal_registry = SignalRegistry(metrics=metrics)
plugin_registry = PluginRegistry(
actions=action_registry,
signals=signal_registry,
Expand Down Expand Up @@ -298,6 +305,36 @@ async def health() -> dict[str, object]:
}


# ---------------------------------------------------------------------------
# Metrics (unauthenticated)
# ---------------------------------------------------------------------------


@app.get("/metrics")
async def metrics_endpoint() -> dict[str, object]:
"""Operational metrics snapshot. Unauthenticated for monitoring."""
if orchestrator is None or metrics is None:
raise HTTPException(status_code=503, detail="Service not initialized")

snapshot = metrics.snapshot()

agents = list(orchestrator.list_agents())
status_counts: dict[str, int] = {}
for agent in agents:
status = agent.status.value
status_counts[status] = status_counts.get(status, 0) + 1

snapshot["websocket_clients"] = orchestrator.event_bus.client_count
snapshot["agents"] = {
"count": len(agents),
"by_status": status_counts,
}
snapshot["state_store"] = {
"entry_count": orchestrator.state_store.entry_count(),
}
return snapshot


# ---------------------------------------------------------------------------
# Agent routes (read)
# ---------------------------------------------------------------------------
Expand Down
55 changes: 55 additions & 0 deletions src/deckhand/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from __future__ import annotations

import time
from dataclasses import dataclass, field


@dataclass
class Metrics:
"""Lightweight in-memory operational counters.

Safe to use without external dependencies. Thread safety is not a concern
here: the service runs on a single asyncio event loop and counter
increments are atomic at the bytecode level for simple ints.
"""

started_at: float = field(default_factory=time.time)
events_total: int = 0
actions_total: int = 0
actions_success: int = 0
actions_failure: int = 0
signals_total: int = 0
signals_by_name: dict[str, int] = field(default_factory=dict)

def record_event(self) -> None:
self.events_total += 1

def record_action(self, *, success: bool) -> None:
self.actions_total += 1
if success:
self.actions_success += 1
else:
self.actions_failure += 1

def record_signal(self, name: str) -> None:
self.signals_total += 1
self.signals_by_name[name] = self.signals_by_name.get(name, 0) + 1

def snapshot(self) -> dict[str, object]:
uptime = max(time.time() - self.started_at, 1e-9)
return {
"uptime_seconds": uptime,
"events": {
"total": self.events_total,
"per_second": self.events_total / uptime,
},
"actions": {
"total": self.actions_total,
"success": self.actions_success,
"failure": self.actions_failure,
},
"signals": {
"total": self.signals_total,
"by_name": dict(self.signals_by_name),
},
}
17 changes: 15 additions & 2 deletions src/deckhand/orchestrator/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from typing import Any, Awaitable, Callable, Protocol

from deckhand.metrics import Metrics
from deckhand.orchestrator.metadata import ActionMetadata


Expand All @@ -19,8 +20,13 @@ async def provide_input(self, agent_id: str, text: str) -> None: ...
class ActionRegistry:
"""Maps named actions to orchestrator commands."""

def __init__(self, orchestrator: OrchestratorActions) -> None:
def __init__(
self,
orchestrator: OrchestratorActions,
metrics: Metrics | None = None,
) -> None:
self._orchestrator = orchestrator
self._metrics = metrics
self._actions: dict[str, ActionHandler] = {}
self._metadata: dict[str, ActionMetadata] = {}
self._register_defaults()
Expand All @@ -44,7 +50,14 @@ async def run(self, name: str, payload: dict[str, object]) -> None:
handler = self._actions.get(name)
if handler is None:
raise KeyError(name)
await handler(payload)
try:
await handler(payload)
except Exception:
if self._metrics is not None:
self._metrics.record_action(success=False)
raise
if self._metrics is not None:
self._metrics.record_action(success=True)

def list_actions(self) -> list[ActionMetadata]:
"""List all registered actions with metadata."""
Expand Down
7 changes: 6 additions & 1 deletion src/deckhand/orchestrator/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from fastapi import WebSocket

from deckhand.metrics import Metrics
from deckhand.orchestrator.schemas import EventEnvelope, EventSource


Expand Down Expand Up @@ -85,8 +86,9 @@ def build_error_event(
class EventBus:
"""In-memory pub/sub for Deckhand events."""

def __init__(self) -> None:
def __init__(self, metrics: Metrics | None = None) -> None:
self._subscribers: set[WebSocket] = set()
self._metrics = metrics

@property
def client_count(self) -> int:
Expand Down Expand Up @@ -124,6 +126,9 @@ async def emit(self, event: dict[str, Any]) -> None:
):
raise ValueError("Event source must have 'kind' and 'id' fields")

if self._metrics is not None:
self._metrics.record_event()

dead: list[WebSocket] = []
for websocket in self._subscribers:
try:
Expand Down
10 changes: 8 additions & 2 deletions src/deckhand/orchestrator/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,22 @@
from typing import Iterable

from deckhand.agents.base import AgentBase
from deckhand.metrics import Metrics
from deckhand.orchestrator.events import EventBus
from deckhand.orchestrator.state import StateStore


class Orchestrator:
"""Tracks agent lifecycle and routes commands to agents."""

def __init__(self, state_persist_path: str | None = None) -> None:
def __init__(
self,
state_persist_path: str | None = None,
metrics: Metrics | None = None,
) -> None:
self.agents: dict[str, AgentBase] = {}
self.event_bus = EventBus()
self.metrics = metrics
self.event_bus = EventBus(metrics=metrics)
self.state_store = StateStore(self.event_bus, persist_path=state_persist_path)

def register_agent(self, agent: AgentBase) -> None:
Expand Down
6 changes: 5 additions & 1 deletion src/deckhand/orchestrator/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from typing import Any, Awaitable, Callable

from deckhand.metrics import Metrics
from deckhand.orchestrator.metadata import SignalMetadata


Expand All @@ -11,9 +12,10 @@
class SignalRegistry:
"""Maps named signals to handlers."""

def __init__(self) -> None:
def __init__(self, metrics: Metrics | None = None) -> None:
self._signals: dict[str, SignalHandler] = {}
self._metadata: dict[str, SignalMetadata] = {}
self._metrics = metrics

def register(
self,
Expand All @@ -35,6 +37,8 @@ async def handle(self, name: str, payload: dict[str, object]) -> None:
if handler is None:
raise KeyError(name)
await handler(payload)
if self._metrics is not None:
self._metrics.record_signal(name)

def list_signals(self) -> list[SignalMetadata]:
"""List all registered signals with metadata."""
Expand Down
32 changes: 32 additions & 0 deletions tests/test_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,38 @@ async def test_health_endpoint(client: AsyncClient) -> None:
assert data["state_store"]["writable"] is True


async def test_metrics_endpoint(client: AsyncClient) -> None:
"""GET /metrics returns operational counters and reflects activity."""
# Trigger some activity: one successful action and one failing action.
resp = await client.post("/actions/agent.start", json={"agent_id": "mock-1"})
assert resp.status_code == 200

resp = await client.post(
"/actions/agent.start",
json={}, # missing agent_id -> ValidationError
)
assert resp.status_code in (400, 422)

# Metrics is unauthenticated.
transport = ASGITransport(app=client._transport.app)
async with AsyncClient(transport=transport, base_url="http://test") as no_auth:
resp = await no_auth.get("/metrics")
assert resp.status_code == 200
data = resp.json()

assert data["uptime_seconds"] > 0
assert data["events"]["total"] >= 1
assert data["events"]["per_second"] >= 0
# At least the successful action ran; the failing one was rejected before
# the handler by payload validation, so only success is guaranteed.
assert data["actions"]["total"] >= 1
assert data["actions"]["success"] >= 1
assert "by_status" in data["agents"]
assert data["agents"]["count"] >= 2
assert data["websocket_clients"] == 0
assert "entry_count" in data["state_store"]


async def test_agent_without_context_uses_id_as_label(client: AsyncClient) -> None:
"""An agent with no project_root falls back to its ID for display_label."""
resp = await client.post(
Expand Down