Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions .fly/review_apps.worker.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ primary_region = "syd"

[build]

# Override the Dockerfile CMD (./start.sh) to run the worker binary
# directly. start.sh only launches ./main, so without this override the
# worker container would run the API instead of the consumer.
# Override the Dockerfile CMD to run start.sh with the worker binary.
# Passing "worker" as the argument makes start.sh launch ./worker while
# still starting the Alloy metrics sidecar, so review apps emit
# bee.worker.* and bee.broker.* series tagged with
# app=hover-worker-pr-<N> and environment=staging.
[processes]
worker = "./worker"
worker = "./start.sh worker"

# Environment overrides for preview. Scaled down from fly.worker.toml to
# fit within the Supabase preview branch's shared pool (see the
Expand Down
35 changes: 34 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,40 @@ On merge, CI will:

## [Unreleased]

_Add unreleased changes here._
### Added

- `bee.broker.outbox_sweep_total` counter with
`outcome={dispatched, retried, dead_lettered}` labels so partial-failure and
dead-letter rates are visible without a database session.
- `task_outbox_dead` table capturing rows whose `attempts` exceeded the retry
cap (default 10), with `dead_lettered_at` and `last_error` for triage.
- Outbox investigation notes at `docs/diagnostics/outbox-aging-investigation.md`
covering the oldest-age growth pattern, ranked hypotheses, and diagnostic
queries.

### Changed

- `Scheduler.ScheduleBatch` now returns a typed `*BatchError` on partial
pipeline failure, exposing `FailedIndices` so the outbox sweeper can `DELETE`
the succeeded rows and only bump the failed ones. Previously every row in a
500-row batch had attempts bumped whenever any single ZADD failed.
- Outbox sweeper bounds each tick's DB work with `SET LOCAL statement_timeout`
(default 5 s) to keep a wedged backend from holding locks indefinitely.
- `JobManager.CancelJob` now deletes `task_outbox` rows for the cancelled job in
the same transaction, preventing stale rows from contributing to the backlog
and oldest-age gauges.

### Fixed

- Worker Fly processes now launch via `scripts/start.sh` instead of running the
binary directly, so the Alloy metrics sidecar runs on every process. Without
this, `bee.worker.*` and `bee.broker.*` metrics from the prod `hover-worker`
app and every `hover-worker-pr-*` review app were silently dropped before
reaching Grafana Cloud.
- `scripts/start.sh` now accepts the binary name as `$1` (default `main`), so a
single script launches either the API or the worker alongside Alloy. Both
`fly.worker.toml` and `.fly/review_apps.worker.toml` point at
`./start.sh worker`.

## Full changelog history

Expand Down
233 changes: 233 additions & 0 deletions docs/diagnostics/outbox-aging-investigation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
# Outbox Oldest-Age Growth Investigation

Status: historical — investigation notes captured before the fixes landed in PR
#340. Code pointers and line numbers refer to the pre-fix tree. The "Suggested
Comment on lines +3 to +4
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Keep PR #340`` on the same line.

The hard wrap leaves #340 at the start of Line 4, which trips MD018 and can render oddly in Markdown. Fold that sentence onto one line, or escape the hash.

Suggested change
-Status: historical — investigation notes captured before the fixes landed in PR
-#340. Code pointers and line numbers refer to the pre-fix tree. The "Suggested
+Status: historical — investigation notes captured before the fixes landed in PR `#340`. Code pointers and line numbers refer to the pre-fix tree. The "Suggested
 fixes" section has all been implemented; see the Outcome section at the bottom
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Status: historical — investigation notes captured before the fixes landed in PR
#340. Code pointers and line numbers refer to the pre-fix tree. The "Suggested
Status: historical — investigation notes captured before the fixes landed in PR `#340`. Code pointers and line numbers refer to the pre-fix tree. The "Suggested
fixes" section has all been implemented; see the Outcome section at the bottom
🧰 Tools
🪛 markdownlint-cli2 (0.22.1)

[warning] 4-4: No space after hash on atx style heading

(MD018, no-missing-space-atx)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/diagnostics/outbox-aging-investigation.md` around lines 3 - 4, The
markdown breaks the phrase "PR `#340`" across lines causing MD018; in
docs/diagnostics/outbox-aging-investigation.md locate the sentence containing
"Status: historical — investigation notes captured before the fixes landed in
PR" and either join the following line so "PR `#340`." stays on the same line as
that sentence or escape the hash as "\#340" to prevent it being treated as a
header; update the sentence in place (do not change surrounding content) so the
document passes MD018.

fixes" section has all been implemented; see the Outcome section at the bottom
for the mapping. Keep this document for ops context on _why_ those changes
exist; use the current code as the source of truth for _what_ they do.

## Signal

`bee.broker.outbox_age_seconds` (the age-of-oldest-due-row gauge on
`public.task_outbox`) climbs linearly up to ~2.78 hours during a 3-hour
production run, then sawtooths with spikes to 5–11 hours afterwards.

Sweep throughput is otherwise fine: `task_outbox` row count stays bounded, so
the issue is a subset of rows that fail to drain rather than a volume problem.

## Baseline facts

- Sweep cadence (`fly.worker.toml`): `OUTBOX_SWEEP_INTERVAL_MS=200`,
`OUTBOX_SWEEP_BATCH_SIZE=500` → 150 k rows/minute headroom.
- Sweep query:
`SELECT … FROM task_outbox WHERE run_at <= NOW() ORDER BY run_at LIMIT $1 FOR UPDATE SKIP LOCKED`
([`internal/broker/outbox.go:120`][outbox-select]).
- On `ScheduleBatch` success: `DELETE` the claimed rows.
- On `ScheduleBatch` failure: `bumpAttempts` sets
`run_at = NOW() + LEAST(base * 2^attempts, MaxBackoff)`; `MaxBackoff = 5 min`
([`internal/broker/outbox.go:209`][outbox-bump]).
- Gauge sampler filters `run_at <= NOW()`, so future-dated retry rows cannot
inflate it ([`internal/broker/probe.go:120`][probe]).

Because the backoff is capped at 5 minutes, legitimate retry aging cannot
explain an 11-hour oldest-age. Some rows are either (a) repeatedly claimed and
re-bumped, or (b) locked/skipped by SKIP LOCKED across every sweep.

[outbox-select]: ../../internal/broker/outbox.go
[outbox-bump]: ../../internal/broker/outbox.go
[probe]: ../../internal/broker/probe.go

## Lifecycle summary

Writers to `task_outbox`:

| Path | Location | On-conflict |
| -------------------------------------- | -------------------------------------------------------------------- | ---------------------------------- |
| Bulk enqueue (pending + waiting tasks) | `internal/db/queue.go:1251` | `ON CONFLICT (task_id) DO NOTHING` |
| Single-row helper (manual root task) | `internal/db/outbox.go:29` | none |
| Waiting → pending promotion | `supabase/migrations/20260423000001_promote_waiting_with_outbox.sql` | `ON CONFLICT (task_id) DO NOTHING` |

Deleters:

| Path | Location | Trigger |
| ------------------------------------ | ------------------------------- | ------------------------------------------------------------------ |
| Sweeper success | `internal/broker/outbox.go:192` | after `ScheduleBatch` returns nil |
| _(none on cancel)_ | `internal/jobs/manager.go:654` | `CancelJob` marks tasks `skipped` but does not touch `task_outbox` |
| _(none on archive / pause / delete)_ | — | no job-lifecycle cleanup of `task_outbox` anywhere |

Retries do **not** create new outbox rows: `ScheduleAndAck` writes the retry
directly to the Redis ZSET and XACKs the original stream message in a single
`MULTI/EXEC` ([`internal/broker/scheduler.go:169`][sched-and-ack]).

[sched-and-ack]: ../../internal/broker/scheduler.go

## Hypotheses (in priority order)

### H1 — ScheduleBatch partial-failure amplifier (strong)

`Scheduler.ScheduleBatch` collects ZADD results from the pipeline and, if
**any** command returns an error, returns a single aggregate error without
per-entry information ([`internal/broker/scheduler.go:140`][sched-batch]).

The sweeper treats that aggregate as a full-batch failure and calls
`bumpAttempts` on every claimed id, even the ones whose ZADDs actually succeeded
([`internal/broker/outbox.go:182`][outbox-fail]).

Consequence:

- A single flaky ZADD (oversized ZSET, network hiccup, OOM on one shard) pushes
the other 499 rows forward by 2 s … 5 min.
- On the next tick those 499 rows are claimed again. If the flakiness persists,
they keep being re-bumped without ever being properly dispatched (they are
already in the ZSET, but the outbox row survives and will happily re-ZADD them
next sweep).
- The oldest-age gauge then ticks upward in lockstep with the backoff cap.
- When the flakiness clears, the whole backlog deletes in one sweep — a sawtooth
drop.

This matches the observed pattern: linear climb during the problem window, sharp
drop when dispatch succeeds, repeats.

[sched-batch]: ../../internal/broker/scheduler.go
[outbox-fail]: ../../internal/broker/outbox.go

### H2 — SKIP LOCKED starvation by a long transaction (possible)

`pg_stat_activity` showed two ~30 s transactions at 17:15 and 19:50 during the
observed window. A single long-running transaction holding an exclusive lock on
the oldest row would cause SKIP LOCKED to repeatedly skip it — the sweeper
always prefers younger rows (`ORDER BY run_at` takes whichever 500 rows it can
lock), so one sticky row at the head can pin the gauge at its age indefinitely.

30 s would not produce an 11-hour reading on its own; but if a supervisor
process (e.g. a migration, an ad-hoc query, an airlocked sweep tx) held a lock
for longer and was not captured in the two samples, that would suffice.

Tested against current prod: `task_outbox` is empty and there are no
long-running transactions right now, so the state that produced the spike has
already cleared. Diagnostic queries below will be the only way to catch it next
time.

### H3 — Orphan rows from cancelled / archived jobs (weak, not root cause)

`CancelJob` does not clean `task_outbox`, so cancelled jobs leave rows behind.
However, those rows still drain on the next sweep: `ScheduleBatch` does not
verify the job exists, the ZADD against the dead ZSET succeeds, and the row is
deleted.

Orphans may contribute transient age but not sustained aging — so this is not
the primary cause. Worth fixing for hygiene once H1/H2 are resolved.

### H4 — Metric definition (ruled out)

Previously listed as a hypothesis. The probe query filters
`WHERE run_at <= NOW()`, so future-dated rows cannot inflate the gauge
([`internal/broker/probe.go:120`][probe]). The 11-hour reading is real.

## Diagnostic queries (run next time the gauge climbs)

Save the output — without a live snapshot we cannot distinguish H1 from H2.

```sql
-- 1. How many rows are due and how old is the oldest?
SELECT count(*) AS due_rows,
min(run_at) AS oldest_run_at,
EXTRACT(EPOCH FROM NOW() - min(run_at)) AS oldest_age_sec,
max(attempts) AS max_attempts,
count(*) FILTER (WHERE attempts = 0) AS never_attempted,
count(*) FILTER (WHERE attempts > 0) AS attempted
FROM task_outbox
WHERE run_at <= NOW();

-- 2. Attempts distribution among due rows.
-- H1 predicts a long tail of rows with attempts > 5 (they have been
-- bumped repeatedly). Few rows with attempts = 0 argues against H2.
SELECT attempts, count(*)
FROM task_outbox
WHERE run_at <= NOW()
GROUP BY attempts
ORDER BY attempts DESC;

-- 3. Which jobs own the oldest rows, and are those jobs still active?
-- H3 predicts rows concentrated on jobs with status IN
-- ('cancelled', 'archived', 'failed'). H1 predicts rows concentrated on
-- *running* jobs whose ZSET is oversized or whose ZADD is failing.
SELECT o.job_id,
j.status,
count(*) AS rows_due,
min(o.run_at) AS oldest,
max(o.attempts) AS worst_attempts
FROM task_outbox o
LEFT JOIN jobs j ON j.id = o.job_id
WHERE o.run_at <= NOW()
GROUP BY o.job_id, j.status
ORDER BY min(o.run_at) ASC
LIMIT 20;

-- 4. Are the oldest rows being skipped by locks?
-- H2 predicts the oldest rows appear in pg_locks with granted=true held
-- by a long-running backend.
SELECT a.pid,
a.state,
NOW() - a.xact_start AS xact_age,
a.wait_event_type,
a.wait_event,
LEFT(a.query, 200) AS query_snippet
FROM pg_stat_activity a
JOIN pg_locks l ON l.pid = a.pid
JOIN pg_class c ON c.oid = l.relation
WHERE c.relname = 'task_outbox'
AND a.state <> 'idle'
ORDER BY a.xact_start NULLS LAST;
```

## Suggested fixes (do NOT implement until root cause is confirmed)

1. **Per-entry failure tracking in `ScheduleBatch`** (addresses H1). Return the
list of failed indices so the sweeper can `bumpAttempts(failed_ids)` and
`DELETE` the successes in the same tx. This is the narrowest change and the
most likely to matter.

2. **Cap attempts and dead-letter stuck rows** (defensive). After N attempts
(e.g. 10) move the row to a `task_outbox_dead` table with the error so it
stops contributing to the gauge and is visible for triage. Do not silently
drop.

3. **Clean up outbox on job cancel/archive** (addresses H3, hygiene). Delete
`task_outbox` rows for the job in the same transaction as the job status
update. Low-risk; rows would be dispatched anyway, this just avoids the extra
round-trip through Redis.

4. **Emit per-outcome counters from the sweeper**. Today we only record
`outbox_backlog` and `outbox_age_seconds`. A `bee.broker.outbox_sweep_total`
counter with `outcome={dispatched,retried,failed}` labels would tell us which
case dominates without needing an ad-hoc SQL session.

## Not in scope here

- Crawler 10 s timeout ceiling / high cancellation rate (orthogonal; upstream
site behaviour).
- `pg_stat_activity` 30 s+ transactions from the incident dashboards (may
confirm H2 if seen again, but do not expand scope unless the diagnostics above
point there).
- ScheduleBatch producing duplicate ZADDs on retry (idempotent — ZADD overwrites
the score, same member).

## Outcome (PR #340)

Every suggested fix above was implemented; this section records what landed so
the doc stays useful as an ops reference.

| Suggested fix | Implemented as |
| ----------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------- |
| 1. Per-entry failure tracking | `*broker.BatchError` (`internal/broker/scheduler.go`) + partition-by-index in `Sweeper.Tick` (`internal/broker/outbox.go`). |
| 2. Dead-letter cap | `OutboxSweeperOpts.MaxAttempts` (default 10) + `task_outbox_dead` table (migration `20260423132003_outbox_dead_letter.sql`). |
| 3. Cancel/archive cleanup | `jobs.CancelJob` now deletes the job's `task_outbox` rows in the same tx (`internal/jobs/manager.go`). |
Comment on lines +218 to +225
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Don’t imply archive cleanup landed if this PR only wires CancelJob.

Suggested fix 3 is still phrased as cancel/archive, but the Outcome table only maps jobs.CancelJob. With the Line 218 lead-in, ops readers can easily infer archive cleanup exists when the table says otherwise.

Suggested change
-Every suggested fix above was implemented; this section records what landed so
+Most suggested fixes above were implemented; this section records what landed so
 the doc stays useful as an ops reference.
@@
-| 3. Cancel/archive cleanup     | `jobs.CancelJob` now deletes the job's `task_outbox` rows in the same tx (`internal/jobs/manager.go`).                                    |
+| 3. Cancel cleanup             | `jobs.CancelJob` now deletes the job's `task_outbox` rows in the same tx (`internal/jobs/manager.go`).                                    |
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Every suggested fix above was implemented; this section records what landed so
the doc stays useful as an ops reference.
| Suggested fix | Implemented as |
| ----------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------- |
| 1. Per-entry failure tracking | `*broker.BatchError` (`internal/broker/scheduler.go`) + partition-by-index in `Sweeper.Tick` (`internal/broker/outbox.go`). |
| 2. Dead-letter cap | `OutboxSweeperOpts.MaxAttempts` (default 10) + `task_outbox_dead` table (migration `20260423132003_outbox_dead_letter.sql`). |
| 3. Cancel/archive cleanup | `jobs.CancelJob` now deletes the job's `task_outbox` rows in the same tx (`internal/jobs/manager.go`). |
Most suggested fixes above were implemented; this section records what landed so
the doc stays useful as an ops reference.
| Suggested fix | Implemented as |
| ----------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------- |
| 1. Per-entry failure tracking | `*broker.BatchError` (`internal/broker/scheduler.go`) + partition-by-index in `Sweeper.Tick` (`internal/broker/outbox.go`). |
| 2. Dead-letter cap | `OutboxSweeperOpts.MaxAttempts` (default 10) + `task_outbox_dead` table (migration `20260423132003_outbox_dead_letter.sql`). |
| 3. Cancel cleanup | `jobs.CancelJob` now deletes the job's `task_outbox` rows in the same tx (`internal/jobs/manager.go`). |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/diagnostics/outbox-aging-investigation.md` around lines 218 - 225, The
doc currently implies both cancel and archive cleanup landed, but only
jobs.CancelJob was implemented; update the Outcome table and surrounding text
(the "3. Cancel/archive cleanup" row and the Line 218 lead-in) to say only
"Cancel cleanup" or similar and reference jobs.CancelJob and its behavior
(deletes task_outbox rows in the same tx) so ops readers aren't misled into
thinking archive cleanup exists when it does not.

| 4. Per-outcome sweep counter | `bee.broker.outbox_sweep_total{outcome=dispatched\|retried\|dead_lettered}` via `observability.RecordBrokerOutboxSweep`. |
| (additional) Tick budget | `OutboxSweeperOpts.StatementTimeout` (default 5 s) wraps the whole tick in `context.WithTimeout` and `SET LOCAL statement_timeout` in-tx. |

The outcome counter is the primary signal for future triage: a rising `retried`
rate with flat `dispatched` reproduces the H1 symptom without needing the SQL
queries above. If `retried > 0` but age is still climbing and `dead_lettered` is
flat, suspect H2 (SKIP LOCKED starvation) and run the
`pg_stat_activity`/`pg_locks` query.
5 changes: 4 additions & 1 deletion fly.worker.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,10 @@ primary_region = 'syd'
strategy = "immediate"

[processes]
worker = "./worker"
# Launch via start.sh so the Alloy metrics sidecar runs alongside the
# worker binary. Running ./worker directly skips Alloy, which silently
# drops every bee.worker.* and bee.broker.* metric from this process.
worker = "./start.sh worker"

# Always restart on exit. The worker has no [http_service] block, so Fly
# has no health-check hook to wake it up, and the default on-failure
Expand Down
Loading
Loading