Claude/monitoring dashboard worker tt4 x7#87
Merged
Conversation
Introduces a new ``shepherd_monitor`` service that aggregates real-time
operational state for the rest of the platform and serves it via a
single-page dashboard with websocket updates.
- Worker heartbeats: every consumer that goes through
``shepherd_utils.shared.get_tasks`` now self-registers in Redis under
``worker:heartbeat:{stream}:{consumer}`` with a 15s TTL. The monitor
reads these to count alive/stale workers per type, so worker counts
work under both docker compose and kubernetes autoscaling without
introspecting deployment config.
- Poller: snapshots stream depths (XLEN, XPENDING, XINFO CONSUMERS),
Postgres query/callback state, and Redis health every few seconds.
- Rolling history: per-metric Redis sorted sets, retention configurable
via ``MONITOR_HISTORY_DAYS`` (default 3).
- Alerts: YAML rule config with threshold / heartbeat_lost /
oldest_callback_age types, per-rule cooldowns in Redis, Slack webhook
+ SMTP dispatch (both off by default until creds are set).
- Dashboard: vanilla HTML + Chart.js served at port 5440, websocket
push at ``/ws`` with polling fallback.
The monitor surfaced three problems with how shepherd cleans up after itself. Fix each at the source and add a periodic janitor in the monitor to sweep up existing leakage. 1. Redis streams: ``mark_task_as_complete`` only acked messages, so XLEN grew without bound. Delete each message right after the XACK. 2. Postgres ``callbacks`` table: only the timeout path in lookup workers and the happy path in ``merge_message`` ever removed rows, so any other completion path left them behind. ``finish_query`` now reaps callback rows for every query it terminates. Adds a ``reap_completed_callbacks`` helper for the janitor. 3. Per-ARA dashboard panel: the ``domain`` column was never written. ``add_query`` now accepts ``target`` and stores it; ``run_query`` passes it through. 4. Monitor janitor: new ``workers/monitor/janitor.py`` runs every 5 minutes and (a) trims each stream via XTRIM MINID using either the smallest pending id or last-delivered-id+1 so in-flight messages are never deleted, and (b) reaps any callback rows whose parent query is already COMPLETED. ``POST /api/admin/cleanup`` triggers it on demand.
Three follow-on cleanups for issues the dashboard surfaced: * Stale consumer entries: every worker restart picks a new UUID for CONSUMER, leaving dozens of phantom entries inside each Redis Streams consumer group. The janitor now cross-references heartbeat keys (the authoritative "alive" set) against XINFO CONSUMERS and runs XGROUP DELCONSUMER on entries that have no heartbeat, no pending messages, and have been idle for more than an hour. This also fixes the inflated "max idle" values in the dashboard, which were dominated by these phantoms. * History memory: writing a snapshot every poll tick (default 3s) was the dominant Redis consumer. New ``monitor_history_interval_sec`` setting (default 30s) decouples history persistence from the live UI tick rate. Each series is also now capped by sample count (``HISTORY_MAX_SAMPLES = 10000``) in addition to the existing time cutoff -- so the next write after deploying will shrink any oversized series in place.
Workers that get killed mid-task leave pending messages stuck in the consumer group's pending-entries list, referencing a consumer that will never come back. The earlier ``cleanup_stale_consumers`` pass deliberately leaves these alone (it only deletes consumers with zero pending) because dropping in-flight work is destructive. New ``POST /api/admin/reclaim_dead_consumers`` endpoint enumerates those messages, ACKs and XDELs them, then removes the dead consumer. Defaults to ``min_idle_seconds=3600`` and exposes a ``dry_run`` flag so the caller can preview the impact first. Intentionally not wired into the periodic janitor -- this is for manual triage. Note: in production a real fix is XAUTOCLAIM at worker startup so in-flight tasks get retried instead of dropped on restart. That's a separate, larger change.
Every worker now periodically scans its stream's pending-entries list and uses XCLAIM to take over messages whose owner is no longer alive. This closes the gap where a crashed or restarted worker would strand in-flight tasks forever -- they now get retried by another worker on the same stream. Multi-consumer safety is enforced by two independent checks: 1. **Heartbeat filter**: messages whose owner has a live heartbeat key are never claimed, even if their idle time is high. 2. **Idle floor** (``reclaim_min_idle_sec``, default 300s): the message must have been idle longer than the longest plausible legitimate task processing time. Even a momentary heartbeat refresh failure on an actively-working consumer cannot trigger a claim within this buffer. Reclaim runs once per ``reclaim_interval_sec`` (default 30s) inside the existing ``get_tasks`` poll loop. Reclaimed messages are fed through the normal task pipeline so the existing wrap_up_task / handle_task_failure error paths apply. Concurrent reclaim by multiple consumers is safe because XCLAIM is atomic per message; at most one caller wins each message.
A single global ``reclaim_min_idle_sec`` of 300s was wrong on both ends: too long for fast filter workers (orphans wouldn't be retried until well past the upstream 5-minute query budget), and barely enough for lookup workers that legitimately run up to ~210s. Replace it with a per-stream table in ``reclaim.PER_STREAM_MIN_IDLE_SEC`` keyed by worker stream name. Each entry sits just above that worker's worst-case legitimate processing time: - aragorn.lookup / bte.lookup / pathfinder / omnicorp / score: 240s - merge_message / arax.rank / score_paths: 60s - example.score: 30s - everything else (fast filter, entry, finish workers): 30s default Streams not in the table fall back to ``settings.reclaim_min_idle_sec``, which now defaults to 30s. ``reclaim_interval_sec`` drops to 10s so a crashed fast-worker task can be retried with time to spare. ``get_tasks`` also accepts an explicit ``reclaim_min_idle_sec`` override for workers that prefer to declare their threshold inline.
Three related improvements to how the dashboard handles workers coming
and going:
1. Persistent worker visibility. The dashboard used to derive its
worker list purely from live heartbeats, so a worker that died
silently disappeared from the UI. We now persist every worker type
we've ever seen in ``monitor:known_workers`` plus a
``monitor:worker_state:{stream}`` hash. The snapshot always includes
the union of current heartbeats and historical workers, and each
card carries a ``state`` (alive / scaled_down / crashed / unknown)
that drives its color and the new state pill in the corner.
2. Clean shutdown signalling. Workers now trap SIGTERM and SIGINT in
``Heartbeat`` and synchronously write a
``worker:shutdown:{stream}:{consumer}`` marker (TTL 120s, well over
the heartbeat TTL) before re-raising the signal. The poller uses
that marker at the moment a worker type transitions from alive to
zero: marker present means a clean scale-down, marker absent means a
crash. The decision is locked in at the transition so a marker
expiring later doesn't flip the state back.
3. Crash-only critical alerts. The Slack/email alert path that fires
when a worker hits zero alive now checks ``ev["kind"] == "crashed"``
and stays silent for clean scale-downs. The dashboard's new "Recent
Scaling Events" panel still surfaces both kinds, color-coded.
Cards also show backlog and a utilization bar (backlog / capacity) so
that when autoscaling kicks in you can see whether worker count is
keeping up with load. New admin endpoint
``POST /api/admin/forget_worker?name=X`` clears a retired worker type
from the known set.
A clean scale-down to zero is still a problem -- the user wants at least one instance of every worker type running at all times. Fire a critical alert in both cases; the message text differentiates a crash from a clean shutdown so the operator knows which one happened.
New durable historical view that complements the live dashboard. Live reads stay in Redis (recent, fast); historical reads go to Postgres (30-day retention, survives Redis flushes). Schema (three new tables, added to init_db.sql and self-healed on monitor startup via ``storage.ensure_schema``): * ``monitor_metrics(ts, metric, value)`` -- generic time-series. Poller now dual-writes every tick: Redis (live) + Postgres (history). Also captures Redis used memory, Postgres DB size, and per-worker capacity/backlog, so the History tab can show infra trends. * ``monitor_events(ts, type, worker, severity, detail, payload)`` -- scale_up/scale_down events from the poller and fired alerts from the alert engine are mirrored here. * ``monitor_task_latency(ts, stream, count, mean/p50/p90/p95/p99/min/max _ms)`` -- per-worker task durations aggregated into 30s buckets. Latency capture is automatic: ``shared.py:get_tasks`` stamps each delivered task with ``_started_at``, and ``wrap_up_task`` / ``handle_task_failure`` push the elapsed ms onto a bounded Redis list. A new ``latency.aggregator_loop`` in the monitor drains those lists every 30s, sorts and percentiles in-process, then inserts one row per stream into Postgres. No worker files need to change. New endpoints under ``/api/historical/``: - ``metrics`` (comma-separated names) - ``metrics_by_prefix`` - ``latency`` - ``events`` - ``summary`` All read helpers auto-downsample to ~200 points per series via an ``epoch / N`` floor bucket, so a 30-day query never returns 86k points per metric. History tab UI at ``/history`` is a separate page with tab navigation in the header. Range bar (1h/6h/24h/7d/30d + custom datetime) drives a manual refresh of: - Summary cards (queries / crashes / scale events / alerts) - Throughput line chart - Worker fleet stacked-style chart - Queue backlog per stream - Per-worker latency small-multiples (p50 / p95 / p99) - Utilization heatmap (workers x time) - Infra (Redis memory + Postgres DB size, PG connections + Redis ops) - Events timeline (color-coded dots) - Incidents table (severity >= warning) Retention: ``janitor.sweep_history_retention`` runs daily (rate-limited via a Redis flag) to DELETE rows older than 30 days from all three tables.
Two issues that showed up bringing the stack up cold. * Startup alert spam: when ``docker compose up`` runs, persistent worker state from the previous run says ``alive`` for every worker type, but current heartbeats haven't arrived yet. The poller's first tick classified every worker as transitioning to scaled_down (or crashed if no marker existed), which immediately fired worker-down alerts to Slack. The AlertEngine now captures its boot time and suppresses worker-down dispatches for ``MONITOR_STARTUP_GRACE_SEC`` (default 90s). Real worker losses that happen after grace still alert; the events themselves still flow through the poller and the History event log. * Redis healthcheck silently passing during ``LOADING``. The original ``redis-cli ping`` returned exit 0 in two cases that aren't healthy: unauthenticated (NOAUTH error) and dataset still loading (LOADING error). Dependent services then started up and got connection errors. The healthcheck now authenticates and requires an exact ``PONG`` response; ``start_period: 60s`` gives a large RDB time to load before failure retries start counting against it.
``add_query`` now accepts an optional ``target`` arg and writes it into the ``shepherd_brain.domain`` column so the History tab can report per-ARA query volume. The existing test pinned the exact INSERT params to the old 5-tuple. Update it to expect the trailing ``domain`` value (None by default) and add a second test that exercises the populated target path.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
No description provided.