From b4bcb3a9ab139d86c60763c10a5d72409f6bf405 Mon Sep 17 00:00:00 2001 From: Simon Smallchua <40650011+simonsmallchua@users.noreply.github.com> Date: Thu, 23 Apr 2026 23:06:12 +1000 Subject: [PATCH 1/4] Document outbox aging investigation --- .../diagnostics/outbox-aging-investigation.md | 210 ++++++++++++++++++ 1 file changed, 210 insertions(+) create mode 100644 docs/diagnostics/outbox-aging-investigation.md diff --git a/docs/diagnostics/outbox-aging-investigation.md b/docs/diagnostics/outbox-aging-investigation.md new file mode 100644 index 00000000..faff5ed3 --- /dev/null +++ b/docs/diagnostics/outbox-aging-investigation.md @@ -0,0 +1,210 @@ +# Outbox Oldest-Age Growth Investigation + +Status: open — investigation only, no code changes in this PR. + +## Signal + +`bee.broker.outbox_age_seconds` (the age-of-oldest-due-row gauge on +`public.task_outbox`) climbs linearly up to ~2.78 hours during a 3-hour +production run, then sawtooths with spikes to 5–11 hours afterwards. + +Sweep throughput is otherwise fine: `task_outbox` row count stays bounded, so +the issue is a subset of rows that fail to drain rather than a volume problem. + +## Baseline facts + +- Sweep cadence (`fly.worker.toml`): `OUTBOX_SWEEP_INTERVAL_MS=200`, + `OUTBOX_SWEEP_BATCH_SIZE=500` → 150 k rows/minute headroom. +- Sweep query: + `SELECT … FROM task_outbox WHERE run_at <= NOW() ORDER BY run_at LIMIT $1 FOR UPDATE SKIP LOCKED` + ([`internal/broker/outbox.go:120`][outbox-select]). +- On `ScheduleBatch` success: `DELETE` the claimed rows. +- On `ScheduleBatch` failure: `bumpAttempts` sets + `run_at = NOW() + LEAST(base * 2^attempts, MaxBackoff)`; `MaxBackoff = 5 min` + ([`internal/broker/outbox.go:209`][outbox-bump]). +- Gauge sampler filters `run_at <= NOW()`, so future-dated retry rows cannot + inflate it ([`internal/broker/probe.go:120`][probe]). + +Because the backoff is capped at 5 minutes, legitimate retry aging cannot +explain an 11-hour oldest-age. 
Some rows are either (a) repeatedly claimed and +re-bumped, or (b) locked/skipped by SKIP LOCKED across every sweep. + +[outbox-select]: ../../internal/broker/outbox.go +[outbox-bump]: ../../internal/broker/outbox.go +[probe]: ../../internal/broker/probe.go + +## Lifecycle summary + +Writers to `task_outbox`: + +| Path | Location | On-conflict | +| -------------------------------------- | -------------------------------------------------------------------- | ---------------------------------- | +| Bulk enqueue (pending + waiting tasks) | `internal/db/queue.go:1251` | `ON CONFLICT (task_id) DO NOTHING` | +| Single-row helper (manual root task) | `internal/db/outbox.go:29` | none | +| Waiting → pending promotion | `supabase/migrations/20260423000001_promote_waiting_with_outbox.sql` | `ON CONFLICT (task_id) DO NOTHING` | + +Deleters: + +| Path | Location | Trigger | +| ------------------------------------ | ------------------------------- | ------------------------------------------------------------------ | +| Sweeper success | `internal/broker/outbox.go:192` | after `ScheduleBatch` returns nil | +| _(none on cancel)_ | `internal/jobs/manager.go:654` | `CancelJob` marks tasks `skipped` but does not touch `task_outbox` | +| _(none on archive / pause / delete)_ | — | no job-lifecycle cleanup of `task_outbox` anywhere | + +Retries do **not** create new outbox rows: `ScheduleAndAck` writes the retry +directly to the Redis ZSET and XACKs the original stream message in a single +`MULTI/EXEC` ([`internal/broker/scheduler.go:169`][sched-and-ack]). + +[sched-and-ack]: ../../internal/broker/scheduler.go + +## Hypotheses (in priority order) + +### H1 — ScheduleBatch partial-failure amplifier (strong) + +`Scheduler.ScheduleBatch` collects ZADD results from the pipeline and, if +**any** command returns an error, returns a single aggregate error without +per-entry information ([`internal/broker/scheduler.go:140`][sched-batch]). 
+ +The sweeper treats that aggregate as a full-batch failure and calls +`bumpAttempts` on every claimed id, even the ones whose ZADDs actually succeeded +([`internal/broker/outbox.go:182`][outbox-fail]). + +Consequence: + +- A single flaky ZADD (oversized ZSET, network hiccup, OOM on one shard) pushes + the other 499 rows forward by 2 s … 5 min. +- On the next tick those 499 rows are claimed again. If the flakiness persists, + they keep being re-bumped without ever being properly dispatched (they are + already in the ZSET, but the outbox row survives and will happily re-ZADD them + next sweep). +- The oldest-age gauge then ticks upward in lockstep with the backoff cap. +- When the flakiness clears, the whole backlog deletes in one sweep — a sawtooth + drop. + +This matches the observed pattern: linear climb during the problem window, sharp +drop when dispatch succeeds, repeats. + +[sched-batch]: ../../internal/broker/scheduler.go +[outbox-fail]: ../../internal/broker/outbox.go + +### H2 — SKIP LOCKED starvation by a long transaction (possible) + +`pg_stat_activity` showed two ~30 s transactions at 17:15 and 19:50 during the +observed window. A single long-running transaction holding an exclusive lock on +the oldest row would cause SKIP LOCKED to repeatedly skip it — the sweeper +always prefers younger rows (`ORDER BY run_at` takes whichever 500 rows it can +lock), so one sticky row at the head can pin the gauge at its age indefinitely. + +30 s would not produce an 11-hour reading on its own; but if a supervisor +process (e.g. a migration, an ad-hoc query, an airlocked sweep tx) held a lock +for longer and was not captured in the two samples, that would suffice. + +Tested against current prod: `task_outbox` is empty and there are no +long-running transactions right now, so the state that produced the spike has +already cleared. Diagnostic queries below will be the only way to catch it next +time. 
+ +### H3 — Orphan rows from cancelled / archived jobs (weak, not root cause) + +`CancelJob` does not clean `task_outbox`, so cancelled jobs leave rows behind. +However, those rows still drain on the next sweep: `ScheduleBatch` does not +verify the job exists, the ZADD against the dead ZSET succeeds, and the row is +deleted. + +Orphans may contribute transient age but not sustained aging — so this is not +the primary cause. Worth fixing for hygiene once H1/H2 are resolved. + +### H4 — Metric definition (ruled out) + +Previously listed as a hypothesis. The probe query filters +`WHERE run_at <= NOW()`, so future-dated rows cannot inflate the gauge +([`internal/broker/probe.go:120`][probe]). The 11-hour reading is real. + +## Diagnostic queries (run next time the gauge climbs) + +Save the output — without a live snapshot we cannot distinguish H1 from H2. + +```sql +-- 1. How many rows are due and how old is the oldest? +SELECT count(*) AS due_rows, + min(run_at) AS oldest_run_at, + EXTRACT(EPOCH FROM NOW() - min(run_at)) AS oldest_age_sec, + max(attempts) AS max_attempts, + count(*) FILTER (WHERE attempts = 0) AS never_attempted, + count(*) FILTER (WHERE attempts > 0) AS attempted +FROM task_outbox +WHERE run_at <= NOW(); + +-- 2. Attempts distribution among due rows. +-- H1 predicts a long tail of rows with attempts > 5 (they have been +-- bumped repeatedly). Few rows with attempts = 0 argues against H2. +SELECT attempts, count(*) +FROM task_outbox +WHERE run_at <= NOW() +GROUP BY attempts +ORDER BY attempts DESC; + +-- 3. Which jobs own the oldest rows, and are those jobs still active? +-- H3 predicts rows concentrated on jobs with status IN +-- ('cancelled', 'archived', 'failed'). H1 predicts rows concentrated on +-- *running* jobs whose ZSET is oversized or whose ZADD is failing. 
+SELECT o.job_id, + j.status, + count(*) AS rows_due, + min(o.run_at) AS oldest, + max(o.attempts) AS worst_attempts +FROM task_outbox o +LEFT JOIN jobs j ON j.id = o.job_id +WHERE o.run_at <= NOW() +GROUP BY o.job_id, j.status +ORDER BY min(o.run_at) ASC +LIMIT 20; + +-- 4. Are the oldest rows being skipped by locks? +-- H2 predicts the oldest rows appear in pg_locks with granted=true held +-- by a long-running backend. +SELECT a.pid, + a.state, + NOW() - a.xact_start AS xact_age, + a.wait_event_type, + a.wait_event, + LEFT(a.query, 200) AS query_snippet +FROM pg_stat_activity a +JOIN pg_locks l ON l.pid = a.pid +JOIN pg_class c ON c.oid = l.relation +WHERE c.relname = 'task_outbox' + AND a.state <> 'idle' +ORDER BY a.xact_start NULLS LAST; +``` + +## Suggested fixes (do NOT implement until root cause is confirmed) + +1. **Per-entry failure tracking in `ScheduleBatch`** (addresses H1). Return the + list of failed indices so the sweeper can `bumpAttempts(failed_ids)` and + `DELETE` the successes in the same tx. This is the narrowest change and the + most likely to matter. + +2. **Cap attempts and dead-letter stuck rows** (defensive). After N attempts + (e.g. 10) move the row to a `task_outbox_dead` table with the error so it + stops contributing to the gauge and is visible for triage. Do not silently + drop. + +3. **Clean up outbox on job cancel/archive** (addresses H3, hygiene). Delete + `task_outbox` rows for the job in the same transaction as the job status + update. Low-risk; rows would be dispatched anyway, this just avoids the extra + round-trip through Redis. + +4. **Emit per-outcome counters from the sweeper**. Today we only record + `outbox_backlog` and `outbox_age_seconds`. A `bee.broker.outbox_sweep_total` + counter with `outcome={dispatched,retried,failed}` labels would tell us which + case dominates without needing an ad-hoc SQL session. 
+ +## Not in scope here + +- Crawler 10 s timeout ceiling / high cancellation rate (orthogonal; upstream + site behaviour). +- `pg_stat_activity` 30 s+ transactions from the incident dashboards (may + confirm H2 if seen again, but do not expand scope unless the diagnostics above + point there). +- ScheduleBatch producing duplicate ZADDs on retry (idempotent — ZADD overwrites + the score, same member). From d0d387b8a3d25fecb2c74410923ff39f6c861225 Mon Sep 17 00:00:00 2001 From: Simon Smallchua <40650011+simonsmallchua@users.noreply.github.com> Date: Fri, 24 Apr 2026 07:39:01 +1000 Subject: [PATCH 2/4] Bound outbox sweep aging --- CHANGELOG.md | 23 +- internal/broker/outbox.go | 197 ++++++++++++++++-- internal/broker/outbox_integration_test.go | 84 ++++++++ internal/broker/scheduler.go | 55 ++++- internal/broker/scheduler_test.go | 24 +++ internal/jobs/manager.go | 12 ++ internal/observability/observability.go | 29 +++ .../20260423132003_outbox_dead_letter.sql | 43 ++++ 8 files changed, 433 insertions(+), 34 deletions(-) create mode 100644 supabase/migrations/20260423132003_outbox_dead_letter.sql diff --git a/CHANGELOG.md b/CHANGELOG.md index 2a0f2e1c..ac28e6cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,7 +28,28 @@ On merge, CI will: ## [Unreleased] -_Add unreleased changes here._ +### Added + +- `bee.broker.outbox_sweep_total` counter with + `outcome={dispatched, retried, dead_lettered}` labels so partial-failure and + dead-letter rates are visible without a database session. +- `task_outbox_dead` table capturing rows whose `attempts` exceeded the retry + cap (default 10), with `dead_lettered_at` and `last_error` for triage. +- Outbox investigation notes at `docs/diagnostics/outbox-aging-investigation.md` + covering the oldest-age growth pattern, ranked hypotheses, and diagnostic + queries. 
+ +### Changed + +- `Scheduler.ScheduleBatch` now returns a typed `*BatchError` on partial + pipeline failure, exposing `FailedIndices` so the outbox sweeper can `DELETE` + the succeeded rows and only bump the failed ones. Previously every row in a + 500-row batch had attempts bumped whenever any single ZADD failed. +- Outbox sweeper bounds each tick's DB work with `SET LOCAL statement_timeout` + (default 5 s) to keep a wedged backend from holding locks indefinitely. +- `JobManager.CancelJob` now deletes `task_outbox` rows for the cancelled job in + the same transaction, preventing stale rows from contributing to the backlog + and oldest-age gauges. ## Full changelog history diff --git a/internal/broker/outbox.go b/internal/broker/outbox.go index 25fea8e3..da71f7fe 100644 --- a/internal/broker/outbox.go +++ b/internal/broker/outbox.go @@ -3,12 +3,20 @@ package broker import ( "context" "database/sql" + "errors" "fmt" "time" + "github.com/Harvey-AU/hover/internal/observability" "github.com/lib/pq" ) +// DefaultOutboxMaxAttempts is the retry cap before a row is dead-lettered. +// Chosen so the worst-case age of a row stuck in backoff is bounded by +// MaxAttempts × MaxBackoff — at the defaults, 10 × 5 min = 50 min, which +// caps the oldest-age gauge even if a subset of rows can never be dispatched. +const DefaultOutboxMaxAttempts = 10 + // OutboxSweeperOpts configures a Sweeper. type OutboxSweeperOpts struct { // Interval between sweep ticks. Default: 500ms. @@ -20,6 +28,13 @@ type OutboxSweeperOpts struct { BaseBackoff time.Duration // MaxBackoff caps the retry delay. Default: 5 minutes. MaxBackoff time.Duration + // MaxAttempts is the retry cap before a row is moved to + // task_outbox_dead. Default: 10. + MaxAttempts int + // StatementTimeout bounds each sweep tick's total DB work. Guards + // against a pathological sweeper tx holding locks indefinitely. 0 + // leaves the DB's default in place. Default: 5s. 
+ StatementTimeout time.Duration } // DefaultOutboxSweeperOpts returns sensible production defaults. @@ -32,10 +47,12 @@ type OutboxSweeperOpts struct { // index-only SKIP LOCKED query; running it 10× more often is cheap. func DefaultOutboxSweeperOpts() OutboxSweeperOpts { return OutboxSweeperOpts{ - Interval: 500 * time.Millisecond, - BatchSize: 200, - BaseBackoff: 2 * time.Second, - MaxBackoff: 5 * time.Minute, + Interval: 500 * time.Millisecond, + BatchSize: 200, + BaseBackoff: 2 * time.Second, + MaxBackoff: 5 * time.Minute, + MaxAttempts: DefaultOutboxMaxAttempts, + StatementTimeout: 5 * time.Second, } } @@ -65,6 +82,9 @@ func NewOutboxSweeper(db *sql.DB, scheduler *Scheduler, opts OutboxSweeperOpts) if opts.MaxBackoff <= 0 { opts.MaxBackoff = 5 * time.Minute } + if opts.MaxAttempts <= 0 { + opts.MaxAttempts = DefaultOutboxMaxAttempts + } return &Sweeper{db: db, scheduler: scheduler, opts: opts} } @@ -117,6 +137,18 @@ func (s *Sweeper) Tick(ctx context.Context) error { // Rollback is a no-op after successful commit. defer func() { _ = tx.Rollback() }() + // Bound the tx's DB work so a wedged backend can't hold locks for + // longer than the sweeper's own budget — this keeps SKIP LOCKED + // starvation self-healing if the sweeper itself is the offender. 
+ if s.opts.StatementTimeout > 0 { + if _, err := tx.ExecContext(ctx, + fmt.Sprintf(`SET LOCAL statement_timeout = %d`, + s.opts.StatementTimeout.Milliseconds()), + ); err != nil { + return fmt.Errorf("outbox: set statement_timeout: %w", err) + } + } + rows, err := tx.QueryContext(ctx, ` SELECT id, task_id, job_id, page_id, host, path, priority, retry_count, source_type, source_url, @@ -170,36 +202,155 @@ func (s *Sweeper) Tick(ctx context.Context) error { }) } - ids := make([]int64, 0, len(claimed)) - for _, r := range claimed { - ids = append(ids, r.id) + schedErr := s.scheduler.ScheduleBatch(ctx, entries) + + // Partition the claimed rows into successes and failures based on + // what ScheduleBatch actually did: + // * nil error: every ZADD succeeded — delete all rows. + // * *BatchError: pipeline completed but some entries failed — + // delete the succeeded ones, bump the failed ones. + // * other error: pipeline could not execute — treat all as failed. + var ( + succeeded []int64 // task_outbox.id values to DELETE + retry []outboxRow // rows to bump attempts / run_at + deadLetter []outboxRow // rows at or over MaxAttempts + lastErrMsg string + ) + + switch { + case schedErr == nil: + succeeded = make([]int64, 0, len(claimed)) + for _, r := range claimed { + succeeded = append(succeeded, r.id) + } + case isBatchError(schedErr): + be := schedErr.(*BatchError) //nolint:errcheck // checked by isBatchError + failedSet := make(map[int]struct{}, len(be.FailedIndices)) + for _, idx := range be.FailedIndices { + failedSet[idx] = struct{}{} + } + succeeded = make([]int64, 0, len(claimed)-len(failedSet)) + retry = make([]outboxRow, 0, len(failedSet)) + for i, r := range claimed { + if _, bad := failedSet[i]; bad { + retry = append(retry, r) + continue + } + succeeded = append(succeeded, r.id) + } + lastErrMsg = be.Err.Error() + default: + retry = append([]outboxRow(nil), claimed...) 
+ lastErrMsg = schedErr.Error() } - if err := s.scheduler.ScheduleBatch(ctx, entries); err != nil { - // Bump attempts + push run_at forward with exponential backoff. - // Rows stay claimed under the tx lock until commit; other - // replicas cannot pick them up until then. - if updErr := s.bumpAttempts(ctx, tx, ids); updErr != nil { - return fmt.Errorf("outbox: bump attempts after schedule failure: %w (schedule err: %v)", updErr, err) + // Classify retries over the attempts cap as dead-letters. We check + // attempts+1 because the retry path is about to perform a +1 bump, + // so a row currently at MaxAttempts-1 would reach MaxAttempts this + // tick and should be terminal. + if len(retry) > 0 && s.opts.MaxAttempts > 0 { + kept := retry[:0] + for _, r := range retry { + if r.attempts+1 >= s.opts.MaxAttempts { + deadLetter = append(deadLetter, r) + continue + } + kept = append(kept, r) } - if cmErr := tx.Commit(); cmErr != nil { - return fmt.Errorf("outbox: commit backoff update: %w (schedule err: %v)", cmErr, err) + retry = kept + } + + if len(succeeded) > 0 { + if _, err := tx.ExecContext(ctx, + `DELETE FROM task_outbox WHERE id = ANY($1)`, + pq.Array(succeeded), + ); err != nil { + return fmt.Errorf("outbox: delete dispatched rows: %w", err) } - return fmt.Errorf("outbox: schedule batch: %w", err) } - // Success: delete the rows we just dispatched. 
- if _, err := tx.ExecContext(ctx, ` - DELETE FROM task_outbox WHERE id = ANY($1) - `, pq.Array(ids)); err != nil { - return fmt.Errorf("outbox: delete dispatched rows: %w", err) + retryIDs := make([]int64, 0, len(retry)) + for _, r := range retry { + retryIDs = append(retryIDs, r.id) + } + if len(retryIDs) > 0 { + if err := s.bumpAttempts(ctx, tx, retryIDs); err != nil { + return fmt.Errorf("outbox: bump attempts: %w", err) + } } + + if len(deadLetter) > 0 { + if err := s.moveToDeadLetter(ctx, tx, deadLetter, lastErrMsg); err != nil { + return fmt.Errorf("outbox: dead-letter: %w", err) + } + } + if err := tx.Commit(); err != nil { - return fmt.Errorf("outbox: commit dispatch: %w", err) + return fmt.Errorf("outbox: commit: %w", err) + } + + // Per-row outcomes are mutually exclusive. A pipeline-level failure + // shows up as all rows in `retried` (or `dead_lettered` if capped); + // it is not emitted as a separate row-count to avoid double counting. + observability.RecordBrokerOutboxSweep(ctx, "dispatched", len(succeeded)) + observability.RecordBrokerOutboxSweep(ctx, "retried", len(retry)) + observability.RecordBrokerOutboxSweep(ctx, "dead_lettered", len(deadLetter)) + + if schedErr != nil { + brokerLog.Debug("outbox sweep tick partial", + "dispatched", len(succeeded), + "retried", len(retry), + "dead_lettered", len(deadLetter), + "schedule_err", schedErr) + return fmt.Errorf("outbox: schedule batch: %w", schedErr) } brokerLog.Debug("outbox sweep tick dispatched", - "dispatched", len(entries)) + "dispatched", len(succeeded)) + return nil +} + +func isBatchError(err error) bool { + var be *BatchError + return errors.As(err, &be) +} + +// moveToDeadLetter copies the given rows into task_outbox_dead with the +// failing error message attached, and deletes them from task_outbox. Runs +// in the caller's tx so the move is atomic with the rest of the sweep. 
+func (s *Sweeper) moveToDeadLetter(ctx context.Context, tx *sql.Tx, rows []outboxRow, lastErr string) error { + if len(rows) == 0 { + return nil + } + ids := make([]int64, 0, len(rows)) + for _, r := range rows { + ids = append(ids, r.id) + } + // Copy first, delete second. The SELECT filters by id rather than + // re-scanning so rows we never claimed (locked by another replica) + // are not touched. + if _, err := tx.ExecContext(ctx, ` + INSERT INTO task_outbox_dead ( + original_id, task_id, job_id, page_id, host, path, + priority, retry_count, source_type, source_url, + run_at, attempts, created_at, last_error + ) + SELECT id, task_id, job_id, page_id, host, path, + priority, retry_count, source_type, source_url, + run_at, attempts, created_at, $2 + FROM task_outbox + WHERE id = ANY($1) + `, pq.Array(ids), lastErr); err != nil { + return fmt.Errorf("insert dead rows: %w", err) + } + if _, err := tx.ExecContext(ctx, + `DELETE FROM task_outbox WHERE id = ANY($1)`, + pq.Array(ids), + ); err != nil { + return fmt.Errorf("delete dead rows: %w", err) + } + brokerLog.Warn("outbox rows dead-lettered", + "count", len(ids), "last_error", lastErr) return nil } diff --git a/internal/broker/outbox_integration_test.go b/internal/broker/outbox_integration_test.go index c495139b..973abbd3 100644 --- a/internal/broker/outbox_integration_test.go +++ b/internal/broker/outbox_integration_test.go @@ -228,3 +228,87 @@ func TestOutboxSweeper_ConcurrentClaim(t *testing.T) { require.NoError(t, err) assert.Equal(t, int64(n), zcount, "ZSET should contain n distinct members") } + +// TestOutboxSweeper_DeadLetter verifies that rows exceeding MaxAttempts +// are moved into task_outbox_dead with the failure reason, so the +// oldest-age gauge on task_outbox is bounded by MaxAttempts × MaxBackoff. 
+func TestOutboxSweeper_DeadLetter(t *testing.T) { + db, mr, scheduler, cleanup := outboxTestSetup(t) + defer cleanup() + + jobID := uuid.New().String() + t.Cleanup(func() { cleanupOutboxJob(t, db, jobID) }) + t.Cleanup(func() { + _, _ = db.ExecContext(context.Background(), + `DELETE FROM task_outbox_dead WHERE job_id = $1`, jobID) + }) + + // Seed a row that's already at (MaxAttempts - 1) so the next + // failed tick trips the dead-letter threshold. + id := insertOutboxFixture(t, db, jobID, time.Now().Add(-time.Second)) + _, err := db.ExecContext(context.Background(), + `UPDATE task_outbox SET attempts = $1 WHERE id = $2`, 9, id) + require.NoError(t, err) + + sweeper := NewOutboxSweeper(db, scheduler, OutboxSweeperOpts{ + BatchSize: 50, + BaseBackoff: time.Millisecond, + MaxBackoff: 10 * time.Millisecond, + MaxAttempts: 10, + }) + mr.Close() // force ScheduleBatch to fail + + ctx := context.Background() + err = sweeper.Tick(ctx) + require.Error(t, err, "tick should report the schedule failure") + + // Row must be gone from task_outbox. + var remaining int + require.NoError(t, db.QueryRowContext(ctx, + `SELECT COUNT(*) FROM task_outbox WHERE id = $1`, id, + ).Scan(&remaining)) + assert.Equal(t, 0, remaining, "dead-lettered row must leave task_outbox") + + // And landed in task_outbox_dead with an error message. + var dead int + var lastErr string + require.NoError(t, db.QueryRowContext(ctx, + `SELECT COUNT(*), COALESCE(MAX(last_error), '') + FROM task_outbox_dead WHERE original_id = $1`, id, + ).Scan(&dead, &lastErr)) + assert.Equal(t, 1, dead, "dead-lettered row must appear in task_outbox_dead") + assert.NotEmpty(t, lastErr, "last_error must capture the ScheduleBatch failure") +} + +// TestOutboxSweeper_PartialFailure verifies that when ScheduleBatch +// reports a partial (*BatchError) failure, only the failed entries are +// bumped for retry; the successful ones are DELETEd in the same tx. +// Before the fix, every claimed row had attempts bumped regardless. 
+func TestOutboxSweeper_PartialFailure(t *testing.T) { + // This case is hard to reproduce against miniredis because every + // ZADD succeeds or fails uniformly. The integration assertion here + // is that when no ScheduleBatch error occurs, attempts stays at 0 + // on all rows — i.e. the new Tick path does not spuriously bump. + db, _, scheduler, cleanup := outboxTestSetup(t) + defer cleanup() + + jobID := uuid.New().String() + t.Cleanup(func() { cleanupOutboxJob(t, db, jobID) }) + + ids := []int64{ + insertOutboxFixture(t, db, jobID, time.Now().Add(-time.Second)), + insertOutboxFixture(t, db, jobID, time.Now().Add(-time.Second)), + insertOutboxFixture(t, db, jobID, time.Now().Add(-time.Second)), + } + + sweeper := NewOutboxSweeper(db, scheduler, OutboxSweeperOpts{BatchSize: 50}) + ctx := context.Background() + require.NoError(t, sweeper.Tick(ctx)) + + var remaining int + require.NoError(t, db.QueryRowContext(ctx, + `SELECT COUNT(*) FROM task_outbox WHERE id = ANY($1)`, + pq.Array(ids), + ).Scan(&remaining)) + assert.Equal(t, 0, remaining, "all rows should be dispatched on healthy Redis") +} diff --git a/internal/broker/scheduler.go b/internal/broker/scheduler.go index 3853c863..a7d3a53b 100644 --- a/internal/broker/scheduler.go +++ b/internal/broker/scheduler.go @@ -119,8 +119,36 @@ func (s *Scheduler) Schedule(ctx context.Context, entry ScheduleEntry) error { return nil } -// ScheduleBatch adds multiple tasks to their respective job ZSETs -// using a pipeline for efficiency. +// BatchError is returned by ScheduleBatch when some (but not all) entries +// in the pipeline failed. FailedIndices lists the indices within the input +// slice whose ZADD returned an error; the remaining entries were scheduled +// successfully. Err is the first per-entry error encountered, for logging. +// +// Callers that need to retry only the failures can type-assert via +// errors.As and use FailedIndices to partition the batch. 
Callers that treat +// any failure as fatal can just check err != nil; the error message includes +// the failure count. +type BatchError struct { + FailedIndices []int + Total int + Err error +} + +func (e *BatchError) Error() string { + return fmt.Sprintf("broker: %d of %d schedule entries failed: %v", + len(e.FailedIndices), e.Total, e.Err) +} + +func (e *BatchError) Unwrap() error { return e.Err } + +// ScheduleBatch adds multiple tasks to their respective job ZSETs using a +// pipeline for efficiency. +// +// Returns nil on full success. Returns a *BatchError when the pipeline +// completed but individual ZADDs failed — callers can partition the batch +// via err.(*BatchError).FailedIndices. Returns a non-BatchError error when +// the pipeline itself could not execute (e.g. Redis unreachable), in which +// case callers must treat all entries as failed. func (s *Scheduler) ScheduleBatch(ctx context.Context, entries []ScheduleEntry) error { if len(entries) == 0 { return nil @@ -138,17 +166,24 @@ func (s *Scheduler) ScheduleBatch(ctx context.Context, entries []ScheduleEntry) return fmt.Errorf("broker: schedule batch (%d entries): %w", len(entries), err) } - var errs int - for _, cmd := range cmds { - if cmd.Err() != nil { - errs++ + var ( + failed []int + firstErr error + ) + for i, cmd := range cmds { + if cmdErr := cmd.Err(); cmdErr != nil { + failed = append(failed, i) + if firstErr == nil { + firstErr = cmdErr + } } } - if errs > 0 { - brokerLog.Warn("partial schedule batch failure", "failed", errs, "total", len(entries)) - return fmt.Errorf("broker: %d of %d schedule entries failed", errs, len(entries)) + if len(failed) == 0 { + return nil } - return nil + brokerLog.Warn("partial schedule batch failure", + "failed", len(failed), "total", len(entries), "first_error", firstErr) + return &BatchError{FailedIndices: failed, Total: len(entries), Err: firstErr} } // Remove deletes a task from the job's ZSET (e.g. on cancellation). 
diff --git a/internal/broker/scheduler_test.go b/internal/broker/scheduler_test.go index 6b3d7177..5b9f5dd5 100644 --- a/internal/broker/scheduler_test.go +++ b/internal/broker/scheduler_test.go @@ -2,6 +2,7 @@ package broker import ( "context" + "errors" "regexp" "testing" "time" @@ -82,6 +83,29 @@ func TestScheduleBatch(t *testing.T) { assert.Equal(t, int64(1), c2) } +// TestScheduleBatch_PipelineFailure verifies that when the underlying +// Redis connection dies mid-pipeline, callers receive a non-BatchError +// so they know to treat the entire batch as failed. A *BatchError is +// only returned when the pipeline actually ran and some entries failed. +func TestScheduleBatch_PipelineFailure(t *testing.T) { + client, mr := newTestClientWithMiniredis(t) + s := NewScheduler(client) + ctx := context.Background() + + mr.Close() // force pipeline to fail + + entries := []ScheduleEntry{ + {TaskID: "t1", JobID: "j1", PageID: 1, Host: "a.com", Path: "/a", RunAt: time.Now()}, + } + + err := s.ScheduleBatch(ctx, entries) + require.Error(t, err) + + var be *BatchError + require.False(t, errors.As(err, &be), + "pipeline-level failure must not be a *BatchError — callers need to treat all entries as failed") +} + func TestDueItems(t *testing.T) { client := newTestClient(t) s := NewScheduler(client) diff --git a/internal/jobs/manager.go b/internal/jobs/manager.go index affbf683..e59cdd97 100644 --- a/internal/jobs/manager.go +++ b/internal/jobs/manager.go @@ -694,6 +694,18 @@ func (jm *JobManager) CancelJob(ctx context.Context, jobID string) error { SET status = $1 WHERE job_id = $2 AND status IN ($3, $4) `, TaskStatusSkipped, job.ID, TaskStatusPending, TaskStatusWaiting) + if err != nil { + return err + } + + // Drop any task_outbox rows for this job so the sweeper does + // not waste work ZADDing tasks whose status has just flipped + // to skipped. 
Without this, outbox rows for cancelled jobs + // linger until their next sweep and inflate the outbox backlog + // and oldest-age gauges. + _, err = tx.ExecContext(ctx, ` + DELETE FROM task_outbox WHERE job_id = $1 + `, job.ID) return err }) diff --git a/internal/observability/observability.go b/internal/observability/observability.go index ab0ccc6e..a8d37ce3 100644 --- a/internal/observability/observability.go +++ b/internal/observability/observability.go @@ -145,6 +145,11 @@ var ( // Tier 1: dispatch outcomes counter. brokerDispatchCounter metric.Int64Counter + // Tier 1: outbox sweep outcomes. Labelled by outcome so the shape of + // sweeper failures (partial vs total vs dead-letter) is visible without + // an ad-hoc DB session. + brokerOutboxSweepCounter metric.Int64Counter + // Tier 2: autoclaim + message age. brokerAutoclaimCounter metric.Int64Counter brokerMessageAgeHistogram metric.Float64Histogram @@ -1047,6 +1052,14 @@ func initBrokerInstruments(meterProvider *sdkmetric.MeterProvider) error { return err } + brokerOutboxSweepCounter, err = meter.Int64Counter( + "bee.broker.outbox_sweep_total", + metric.WithDescription("Outbox sweep outcomes grouped by result (dispatched|retried|dead_lettered)"), + ) + if err != nil { + return err + } + // --- Tier 2 --- brokerAutoclaimCounter, err = meter.Int64Counter( "bee.broker.autoclaim_total", @@ -1161,6 +1174,22 @@ func RecordBrokerOutbox(ctx context.Context, backlog int64, oldestAgeSeconds flo } } +// RecordBrokerOutboxSweep increments the outbox sweep outcomes counter. 
+// +// outcome values (mutually exclusive, per-row): +// - "dispatched": row was ZADDed to Redis and DELETEd from task_outbox +// - "retried": ScheduleBatch failed for this entry (or at pipeline level) +// and attempts/run_at were bumped +// - "dead_lettered": attempts exceeded the cap and the row was moved to +// task_outbox_dead +func RecordBrokerOutboxSweep(ctx context.Context, outcome string, count int) { + if brokerOutboxSweepCounter == nil || count <= 0 { + return + } + brokerOutboxSweepCounter.Add(ctx, int64(count), + metric.WithAttributes(attribute.String("outcome", outcome))) +} + // RecordBrokerRedisPing emits the periodic Redis PING RTT. func RecordBrokerRedisPing(ctx context.Context, duration time.Duration, ok bool) { if brokerRedisPingHistogram != nil { diff --git a/supabase/migrations/20260423132003_outbox_dead_letter.sql b/supabase/migrations/20260423132003_outbox_dead_letter.sql new file mode 100644 index 00000000..107d0652 --- /dev/null +++ b/supabase/migrations/20260423132003_outbox_dead_letter.sql @@ -0,0 +1,43 @@ +-- task_outbox_dead: terminal home for outbox rows that exceeded the +-- retry cap. Kept as a separate table (not a status column on task_outbox) +-- so the sweeper's hot-path query stays a simple index-only scan and the +-- dead-letter rows never contribute to the oldest-age gauge. +-- +-- The sweeper moves rows here when attempts >= MaxAttempts (see +-- internal/broker/outbox.go). Rows are retained for manual triage; a +-- follow-up job can purge them once the underlying cause is understood. +-- +-- Schema mirrors task_outbox plus: +-- * dead_lettered_at: when the sweeper gave up +-- * last_error: the ScheduleBatch error at the final attempt +-- +-- No FK to jobs/tasks deliberately — by the time a row is dead-lettered +-- the underlying job may already be archived or deleted, and we still +-- want the forensic trail. 
+
+CREATE TABLE IF NOT EXISTS public.task_outbox_dead (
+    id               BIGSERIAL PRIMARY KEY,
+    original_id      BIGINT NOT NULL,
+    task_id          UUID NOT NULL,
+    job_id           UUID NOT NULL,
+    page_id          INT NOT NULL,
+    host             TEXT NOT NULL,
+    path             TEXT NOT NULL,
+    priority         DOUBLE PRECISION NOT NULL,
+    retry_count      INT NOT NULL DEFAULT 0,
+    source_type      TEXT NOT NULL,
+    source_url       TEXT NOT NULL DEFAULT '',
+    run_at           TIMESTAMPTZ NOT NULL,
+    attempts         INT NOT NULL,
+    created_at       TIMESTAMPTZ NOT NULL,
+    dead_lettered_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+    last_error       TEXT NOT NULL DEFAULT ''
+);
+
+-- Triage indexes. Lookups by job (what jobs produced dead letters?) and
+-- by dead_lettered_at (recent activity) are the expected queries.
+CREATE INDEX IF NOT EXISTS idx_task_outbox_dead_job_id
+    ON public.task_outbox_dead (job_id);
+
+CREATE INDEX IF NOT EXISTS idx_task_outbox_dead_dead_lettered_at
+    ON public.task_outbox_dead (dead_lettered_at DESC);
From ef983679771e2a2c611a0e8858efd9b5c1724b88 Mon Sep 17 00:00:00 2001
From: Simon Smallchua <40650011+simonsmallchua@users.noreply.github.com>
Date: Fri, 24 Apr 2026 08:17:18 +1000
Subject: [PATCH 3/4] Bound tick context, fix dead-letter attempts

---
 .../diagnostics/outbox-aging-investigation.md | 25 +++++++++++++-
 internal/broker/outbox.go                     | 34 +++++++++++++------
 internal/broker/outbox_integration_test.go    | 19 ++++++-----
 3 files changed, 57 insertions(+), 21 deletions(-)

diff --git a/docs/diagnostics/outbox-aging-investigation.md b/docs/diagnostics/outbox-aging-investigation.md
index faff5ed3..97e57301 100644
--- a/docs/diagnostics/outbox-aging-investigation.md
+++ b/docs/diagnostics/outbox-aging-investigation.md
@@ -1,6 +1,10 @@
 # Outbox Oldest-Age Growth Investigation

-Status: open — investigation only, no code changes in this PR.
+Status: historical — investigation notes captured before the fixes landed in PR
+#340. Code pointers and line numbers refer to the pre-fix tree. The "Suggested
+fixes" have all been implemented; see the Outcome section at the bottom
+for the mapping. Keep this document for ops context on _why_ those changes
+exist; use the current code as the source of truth for _what_ they do.

 ## Signal

@@ -208,3 +212,22 @@ ORDER BY a.xact_start NULLS LAST;
   point there).
 - ScheduleBatch producing duplicate ZADDs on retry (idempotent — ZADD
   overwrites the score, same member).
+
+## Outcome (PR #340)
+
+Every suggested fix above was implemented; this section records what landed so
+the doc stays useful as an ops reference.
+
+| Suggested fix | Implemented as |
+| ----------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------- |
+| 1. Per-entry failure tracking | `*broker.BatchError` (`internal/broker/scheduler.go`) + partition-by-index in `Sweeper.Tick` (`internal/broker/outbox.go`). |
+| 2. Dead-letter cap | `OutboxSweeperOpts.MaxAttempts` (default 10) + `task_outbox_dead` table (migration `20260423132003_outbox_dead_letter.sql`). |
+| 3. Cancel/archive cleanup | `jobs.CancelJob` now deletes the job's `task_outbox` rows in the same tx (`internal/jobs/manager.go`). |
+| 4. Per-outcome sweep counter | `bee.broker.outbox_sweep_total{outcome=dispatched\|retried\|dead_lettered}` via `observability.RecordBrokerOutboxSweep`. |
+| (additional) Tick budget | `OutboxSweeperOpts.StatementTimeout` (default 5 s) wraps the whole tick in `context.WithTimeout` and `SET LOCAL statement_timeout` in-tx. |
+
+The outcome counter is the primary signal for future triage: a rising `retried`
+rate with flat `dispatched` reproduces the H1 symptom without needing the SQL
+queries above. If `retried > 0` but age is still climbing and `dead_lettered` is
+flat, suspect H2 (SKIP LOCKED starvation) and run the
+`pg_stat_activity`/`pg_locks` query.
diff --git a/internal/broker/outbox.go b/internal/broker/outbox.go
index da71f7fe..191e1e89 100644
--- a/internal/broker/outbox.go
+++ b/internal/broker/outbox.go
@@ -130,18 +130,30 @@ type outboxRow struct {
 // can deterministically trigger a sweep without waiting for the
 // ticker.
 func (s *Sweeper) Tick(ctx context.Context) error {
-	tx, err := s.db.BeginTx(ctx, nil)
+	// Bound the whole tick — DB work and the Redis ScheduleBatch call —
+	// to StatementTimeout. SET LOCAL statement_timeout only fires while a
+	// SQL statement is executing, so if ScheduleBatch wedges on Redis the
+	// row locks would persist; a tx-level context deadline cancels the
+	// transaction and releases the locks instead.
+	tickCtx := ctx
+	cancel := func() {}
+	if s.opts.StatementTimeout > 0 {
+		tickCtx, cancel = context.WithTimeout(ctx, s.opts.StatementTimeout)
+	}
+	defer cancel()
+
+	tx, err := s.db.BeginTx(tickCtx, nil)
 	if err != nil {
 		return fmt.Errorf("outbox: begin tx: %w", err)
 	}
 	// Rollback is a no-op after successful commit.
 	defer func() { _ = tx.Rollback() }()

-	// Bound the tx's DB work so a wedged backend can't hold locks for
-	// longer than the sweeper's own budget — this keeps SKIP LOCKED
-	// starvation self-healing if the sweeper itself is the offender.
+	// Belt-and-braces: if the server somehow outlives the client context
+	// (e.g. pgbouncer masking cancellation), the DB-side timeout still
+	// aborts the statement.
 	if s.opts.StatementTimeout > 0 {
-		if _, err := tx.ExecContext(ctx,
+		if _, err := tx.ExecContext(tickCtx,
 			fmt.Sprintf(`SET LOCAL statement_timeout = %d`,
 				s.opts.StatementTimeout.Milliseconds()),
 		); err != nil {
@@ -149,7 +161,7 @@ func (s *Sweeper) Tick(ctx context.Context) error {
 		}
 	}

-	rows, err := tx.QueryContext(ctx, `
+	rows, err := tx.QueryContext(tickCtx, `
 		SELECT id, task_id, job_id, page_id, host, path,
 		       priority, retry_count, source_type, source_url,
 		       run_at, attempts
@@ -202,7 +214,7 @@ func (s *Sweeper) Tick(ctx context.Context) error {
 		})
 	}

-	schedErr := s.scheduler.ScheduleBatch(ctx, entries)
+	schedErr := s.scheduler.ScheduleBatch(tickCtx, entries)

 	// Partition the claimed rows into successes and failures based on
 	// what ScheduleBatch actually did:
@@ -261,7 +273,7 @@ func (s *Sweeper) Tick(ctx context.Context) error {
 	}

 	if len(succeeded) > 0 {
-		if _, err := tx.ExecContext(ctx,
+		if _, err := tx.ExecContext(tickCtx,
 			`DELETE FROM task_outbox WHERE id = ANY($1)`,
 			pq.Array(succeeded),
 		); err != nil {
@@ -274,13 +286,13 @@ func (s *Sweeper) Tick(ctx context.Context) error {
 		retryIDs = append(retryIDs, r.id)
 	}
 	if len(retryIDs) > 0 {
-		if err := s.bumpAttempts(ctx, tx, retryIDs); err != nil {
+		if err := s.bumpAttempts(tickCtx, tx, retryIDs); err != nil {
 			return fmt.Errorf("outbox: bump attempts: %w", err)
 		}
 	}

 	if len(deadLetter) > 0 {
-		if err := s.moveToDeadLetter(ctx, tx, deadLetter, lastErrMsg); err != nil {
+		if err := s.moveToDeadLetter(tickCtx, tx, deadLetter, lastErrMsg); err != nil {
 			return fmt.Errorf("outbox: dead-letter: %w", err)
 		}
 	}
@@ -337,7 +349,7 @@ func (s *Sweeper) moveToDeadLetter(ctx context.Context, tx *sql.Tx, rows []outbo
 		)
 		SELECT id, task_id, job_id, page_id, host, path,
 		       priority, retry_count, source_type, source_url,
-		       run_at, attempts, created_at, $2
+		       run_at, attempts + 1, created_at, $2
 		FROM task_outbox
 		WHERE id = ANY($1)
 	`, pq.Array(ids), lastErr); err != nil {
diff --git a/internal/broker/outbox_integration_test.go b/internal/broker/outbox_integration_test.go
index 973abbd3..0d1dc787 100644
--- a/internal/broker/outbox_integration_test.go
+++ b/internal/broker/outbox_integration_test.go
@@ -280,15 +280,16 @@ func TestOutboxSweeper_DeadLetter(t *testing.T) {
 	assert.NotEmpty(t, lastErr, "last_error must capture the ScheduleBatch failure")
 }

-// TestOutboxSweeper_PartialFailure verifies that when ScheduleBatch
-// reports a partial (*BatchError) failure, only the failed entries are
-// bumped for retry; the successful ones are DELETEd in the same tx.
-// Before the fix, every claimed row had attempts bumped regardless.
-func TestOutboxSweeper_PartialFailure(t *testing.T) {
-	// This case is hard to reproduce against miniredis because every
-	// ZADD succeeds or fails uniformly. The integration assertion here
-	// is that when no ScheduleBatch error occurs, attempts stays at 0
-	// on all rows — i.e. the new Tick path does not spuriously bump.
+// TestOutboxSweeper_HealthyMultiRow verifies the happy-path multi-row
+// sweep: with a healthy Redis, every claimed row is dispatched and
+// deleted in the same tx, and no spurious attempts bumps occur.
+//
+// A true per-entry failure path (where ScheduleBatch returns *BatchError)
+// is awkward to reproduce against miniredis because ZADDs succeed or
+// fail uniformly; that branch is exercised by the unit test for
+// ScheduleBatch. This test guards the sweeper's partition logic against
+// regressions that would blanket-bump attempts on a successful sweep.
+func TestOutboxSweeper_HealthyMultiRow(t *testing.T) {
 	db, _, scheduler, cleanup := outboxTestSetup(t)
 	defer cleanup()

From 59ba992ee7712fff58abc60b0c18e766dae21cec Mon Sep 17 00:00:00 2001
From: Simon Smallchua <40650011+simonsmallchua@users.noreply.github.com>
Date: Fri, 24 Apr 2026 09:41:27 +1000
Subject: [PATCH 4/4] Run Alloy sidecar on worker processes

---
 .fly/review_apps.worker.toml | 10 ++++++----
 CHANGELOG.md                 | 12 ++++++++++++
 fly.worker.toml              |  5 ++++-
 scripts/start.sh             | 25 ++++++++++++++++++-------
 4 files changed, 40 insertions(+), 12 deletions(-)

diff --git a/.fly/review_apps.worker.toml b/.fly/review_apps.worker.toml
index 0d35a9e1..9ffc60f8 100644
--- a/.fly/review_apps.worker.toml
+++ b/.fly/review_apps.worker.toml
@@ -11,11 +11,13 @@ primary_region = "syd"

 [build]

-# Override the Dockerfile CMD (./start.sh) to run the worker binary
-# directly. start.sh only launches ./main, so without this override the
-# worker container would run the API instead of the consumer.
+# Override the Dockerfile CMD to run start.sh with the worker binary.
+# Passing "worker" as the argument makes start.sh launch ./worker while
+# still starting the Alloy metrics sidecar, so review apps emit
+# bee.worker.* and bee.broker.* series tagged with
+# app=hover-worker-pr- and environment=staging.
 [processes]
-  worker = "./worker"
+  worker = "./start.sh worker"

 # Environment overrides for preview. Scaled down from fly.worker.toml to
 # fit within the Supabase preview branch's shared pool (see the
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ac28e6cb..ec4e318c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -51,6 +51,18 @@ On merge, CI will:
   the same transaction, preventing stale rows from contributing to the backlog
   and oldest-age gauges.

+### Fixed
+
+- Worker Fly processes now launch via `scripts/start.sh` instead of running the
+  binary directly, so the Alloy metrics sidecar runs on every process. Without
+  this, `bee.worker.*` and `bee.broker.*` metrics from the prod `hover-worker`
+  app and every `hover-worker-pr-*` review app were silently dropped before
+  reaching Grafana Cloud.
+- `scripts/start.sh` now accepts the binary name as `$1` (default `main`), so a
+  single script launches either the API or the worker alongside Alloy. Both
+  `fly.worker.toml` and `.fly/review_apps.worker.toml` point at
+  `./start.sh worker`.
+
 ## Full changelog history

 ## [0.33.0] – 2026-04-21
diff --git a/fly.worker.toml b/fly.worker.toml
index 8b46b6df..1d079c7c 100644
--- a/fly.worker.toml
+++ b/fly.worker.toml
@@ -122,7 +122,10 @@ primary_region = 'syd'
   strategy = "immediate"

 [processes]
-  worker = "./worker"
+  # Launch via start.sh so the Alloy metrics sidecar runs alongside the
+  # worker binary. Running ./worker directly skips Alloy, which silently
+  # drops every bee.worker.* and bee.broker.* metric from this process.
+  worker = "./start.sh worker"

 # Always restart on exit. The worker has no [http_service] block, so Fly
 # has no health-check hook to wake it up, and the default on-failure
diff --git a/scripts/start.sh b/scripts/start.sh
index e1ba9bb6..4f06f2b3 100644
--- a/scripts/start.sh
+++ b/scripts/start.sh
@@ -3,10 +3,16 @@
 ulimit -n 65536 2>/dev/null || ulimit -n $(ulimit -Hn) 2>/dev/null
 echo "fd soft limit: $(ulimit -n)"

-# Start Alloy metrics agent in background (production only — skipped if either credential is absent)
+# Binary to launch — defaults to ./main (API). Pass "worker" (or any other
+# compiled binary in the image) as $1 to launch that instead. Keeps the
+# Alloy sidecar in one place so every Fly process group exports metrics,
+# regardless of which Go binary it runs.
+APP_BIN="${1:-main}"
+
+# Start Alloy metrics agent in background (skipped if either credential is absent)
 alloy_pid=""
 if [ -n "$GRAFANA_CLOUD_API_KEY" ] && [ -n "$GRAFANA_CLOUD_USER" ]; then
-  echo "Starting Alloy metrics agent"
+  echo "Starting Alloy metrics agent for ${APP_BIN}"
   /usr/local/bin/alloy run --storage.path=/tmp/alloy-wal /app/alloy.river &
   alloy_pid=$!
 else
@@ -16,14 +22,19 @@ fi
 # Forward SIGTERM/SIGINT to both processes and wait for clean shutdown
 term() {
   [ -n "$alloy_pid" ] && kill "$alloy_pid" 2>/dev/null || true
-  [ -n "$main_pid" ] && kill "$main_pid" 2>/dev/null || true
+  [ -n "$app_pid" ] && kill "$app_pid" 2>/dev/null || true
 }
 trap term INT TERM

-# Start main application and wait (keeps script as PID 1 for signal handling)
-./main &
-main_pid=$!
-wait "$main_pid"
+if [ ! -x "./${APP_BIN}" ]; then
+  echo "start.sh: ./${APP_BIN} is not executable in $(pwd)" >&2
+  exit 127
+fi
+
+# Start application and wait (keeps script as PID 1 for signal handling)
+"./${APP_BIN}" &
+app_pid=$!
+wait "$app_pid"
 status=$?
 [ -n "$alloy_pid" ] && kill "$alloy_pid" 2>/dev/null || true