Skip to content

Bound outbox sweep aging#340

Closed
simonsmallchua wants to merge 4 commits into
mainfrom
investigate-outbox-aging
Closed

Bound outbox sweep aging#340
simonsmallchua wants to merge 4 commits into
mainfrom
investigate-outbox-aging

Conversation

@simonsmallchua
Copy link
Copy Markdown
Contributor

@simonsmallchua simonsmallchua commented Apr 23, 2026

Summary

Investigation + fixes for the bee.broker.outbox_age_seconds gauge climbing to 2.78 h during production runs and sawtoothing to 5–11 h afterwards.

See docs/diagnostics/outbox-aging-investigation.md for the full ranked hypotheses and diagnostic queries.

Fixes (all ~90%+ combined reduction in peak age expected)

  • ScheduleBatch per-entry failures — returns typed *BatchError with FailedIndices; sweeper DELETEs the successes and only bumps attempts on the entries that actually failed. Previously a single flaky ZADD bumped all 500 rows.
  • Dead-letter table task_outbox_dead — rows past MaxAttempts (default 10) move atomically with the failing error message attached. Bounds worst-case age to MaxAttempts × MaxBackoff = ~50 min regardless of which hypothesis is the real driver.
  • CancelJob outbox cleanup — deletes task_outbox rows for the cancelled job in the same tx as the status flip.
  • statement_timeout on sweep tx — 5 s budget so a wedged sweeper backend can't hold locks indefinitely (self-heals SKIP LOCKED starvation if the sweeper itself is the offender).
  • bee.broker.outbox_sweep_total counteroutcome={dispatched, retried, dead_lettered} labels so future incidents are diagnosable without a DB session.

What's NOT changed

  • Sweep cadence and batch size — unchanged (150k/min throughput is already ample).
  • ScheduleBatch public contract for non-sweeper callers — still a plain error; they can check errors.As(err, &*BatchError) if they want partial-failure info.

Test plan

  • go test ./internal/broker/ ./internal/jobs/ ./internal/observability/ — pass.
  • scripts/security-check.sh — clean (govulncheck, gosec, ESLint).
  • Integration tests TestOutboxSweeper_DeadLetter, TestOutboxSweeper_PartialFailure, existing HappyPath + ConcurrentClaim + RedisDown_RetriesSucceed against Supabase preview branch in CI.
  • Migration 20260423132003_outbox_dead_letter.sql applies cleanly on preview branch.
  • Observe bee.broker.outbox_sweep_total{outcome} split in Grafana after deploy to confirm the counters are wired.

Summary by CodeRabbit

  • New Features

    • Added outbox sweep metric tracking for dispatched/retried/dead-lettered outcomes
    • Introduced a dead‑letter table for outbox entries exceeding retry limits
    • Added outbox aging investigation diagnostics documentation
  • Changed

    • Sweeper bounds each tick with a statement timeout and records per‑outcome metrics
    • Partial dispatches now allow deleting succeeded rows and only retrying failed ones
    • Job cancellation now removes related outbox rows within the same transaction
    • Startup script/process changes ensure metrics sidecar runs for worker and app containers

@grafana-sync-hover-simon-smallchua
Copy link
Copy Markdown

Hey there! 👋
Grafana spotted some changes to your dashboard.

See the original and preview of hover-overview.json.


Posted by simonsmallchua.grafana.net · Repository: Repository (repository-27dc5a5)

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Apr 23, 2026

📝 Walkthrough

Walkthrough

Implements outbox dead‑lettering and partial‑batch error signaling, bounds sweeper ticks with statement_timeout, moves expired rows to task_outbox_dead, deletes outbox rows on job cancel within the same transaction, adds per‑tick sweep metrics, tests, a migration, docs, and changelog updates.

Changes

Cohort / File(s) Summary
Core Outbox Sweep & Dead‑lettering
internal/broker/outbox.go, internal/broker/outbox_integration_test.go, supabase/migrations/20260423132003_outbox_dead_letter.sql
Adds MaxAttempts and StatementTimeout options; bounds tick with SET LOCAL statement_timeout; partitions claimed rows by scheduler result to delete succeeded vs bump failed; moves exhausted rows to task_outbox_dead atomically; records per‑outcome metrics; adds integration tests.
Scheduler Batch Error Semantics
internal/broker/scheduler.go, internal/broker/scheduler_test.go
Introduces exported BatchError carrying failed indices and total; ScheduleBatch returns *BatchError for partial per‑entry failures and preserves pipeline (non‑Batch) errors; unit test ensures pipeline failures are surfaced as non‑BatchError.
Job Cancellation Cleanup
internal/jobs/manager.go
CancelJob now deletes task_outbox rows for the cancelled job inside the same DB transaction to avoid stale processing.
Observability
internal/observability/observability.go
Adds bee.broker.outbox_sweep_total counter with outcome label and RecordBrokerOutboxSweep(ctx, outcome, count) to emit dispatched/retried/dead_lettered metrics.
Startup & Deployment Scripts
scripts/start.sh, .fly/review_apps.worker.toml, fly.worker.toml
Makes startup script accept an app binary arg and forwards signals to the selected app; changes Fly process commands to use start.sh so the Alloy metrics sidecar runs for worker review/prod apps.
Documentation & Changelog
docs/diagnostics/outbox-aging-investigation.md, CHANGELOG.md
Adds an outbox aging investigation doc with hypotheses and triage SQL; updates changelog with outbox observability, behavior changes, and migration note.

Sequence Diagram

sequenceDiagram
    participant DB as Database
    participant Sweeper as Outbox Sweeper
    participant Scheduler as Scheduler
    participant Observability as Observability
    participant DeadLetter as task_outbox_dead

    Note over Sweeper,DB: sweeper tick begins (tickCtx + SET LOCAL statement_timeout)
    Sweeper->>DB: SELECT ... FOR UPDATE SKIP LOCKED
    DB-->>Sweeper: Claimed rows
    Sweeper->>Scheduler: ScheduleBatch(entries)

    alt Partial per-entry failures (BatchError)
        Scheduler-->>Sweeper: BatchError(FailedIndices, Total, Err)
        Sweeper->>DB: DELETE rows for succeeded indices
        Sweeper->>DB: UPDATE attempts/run_at for failed indices
    else All succeeded
        Scheduler-->>Sweeper: nil
        Sweeper->>DB: DELETE all claimed rows
    else Pipeline error (non-BatchError)
        Scheduler-->>Sweeper: error
        Sweeper->>DB: UPDATE attempts/run_at for all claimed rows
    end

    alt attempts >= MaxAttempts
        Sweeper->>DeadLetter: INSERT row(s) with last_error, dead_lettered_at
        Sweeper->>DB: DELETE from task_outbox
        Sweeper->>Observability: RecordBrokerOutboxSweep("dead_lettered", count)
    else Retried
        Sweeper->>Observability: RecordBrokerOutboxSweep("retried", count)
    else Dispatched
        Sweeper->>Observability: RecordBrokerOutboxSweep("dispatched", count)
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

Poem

🐰 I hopped through SELECTs and SET LOCAL timeouts,
Split succeeded hops from the ones with frowns,
Moved tired attempts to a dead‑letter bed,
Counted each sweep outcome as the metrics said,
Nibble, triage, hop — onward down the runes.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 78.57% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'Bound outbox sweep aging' directly relates to the PR's main objective of addressing the bee.broker.outbox_age_seconds gauge climbing to ~2.78 hours. It specifically describes the primary fix strategy (bounding the worst-case age through dead-lettering at MaxAttempts).
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch investigate-outbox-aging

Comment @coderabbitai help to get the list of available commands and usage tips.

@supabase
Copy link
Copy Markdown

supabase Bot commented Apr 23, 2026

Updates to Preview Branch (investigate-outbox-aging) ↗︎

Deployments Status Updated
Database Fri, 24 Apr 2026 08:29:15 UTC
Services Fri, 24 Apr 2026 08:29:15 UTC
APIs Fri, 24 Apr 2026 08:29:15 UTC

Tasks are run on every commit but only new migration files are pushed.
Close and reopen this PR if you want to apply changes from existing seed or migration files.

Tasks Status Updated
Configurations Fri, 24 Apr 2026 08:29:17 UTC
Migrations Fri, 24 Apr 2026 08:29:19 UTC
Seeding Fri, 24 Apr 2026 08:29:21 UTC
Edge Functions Fri, 24 Apr 2026 08:29:21 UTC

View logs for this Workflow Run ↗︎.
Learn more about Supabase for Git ↗︎.

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Apr 23, 2026

Release Versions

App patch: v0.33.0v0.33.1

Changelog

Added

  • bee.broker.outbox_sweep_total counter with
    outcome={dispatched, retried, dead_lettered} labels so partial-failure and
    dead-letter rates are visible without a database session.
  • task_outbox_dead table capturing rows whose attempts exceeded the retry
    cap (default 10), with dead_lettered_at and last_error for triage.
  • Outbox investigation notes at docs/diagnostics/outbox-aging-investigation.md
    covering the oldest-age growth pattern, ranked hypotheses, and diagnostic
    queries.

Changed

  • Scheduler.ScheduleBatch now returns a typed *BatchError on partial
    pipeline failure, exposing FailedIndices so the outbox sweeper can DELETE
    the succeeded rows and only bump the failed ones. Previously every row in a
    500-row batch had attempts bumped whenever any single ZADD failed.
  • Outbox sweeper bounds each tick's DB work with SET LOCAL statement_timeout
    (default 5 s) to keep a wedged backend from holding locks indefinitely.
  • JobManager.CancelJob now deletes task_outbox rows for the cancelled job in
    the same transaction, preventing stale rows from contributing to the backlog
    and oldest-age gauges.

Fixed

  • Worker Fly processes now launch via scripts/start.sh instead of running the
    binary directly, so the Alloy metrics sidecar runs on every process. Without
    this, bee.worker.* and bee.broker.* metrics from the prod hover-worker
    app and every hover-worker-pr-* review app were silently dropped before
    reaching Grafana Cloud.
  • scripts/start.sh now accepts the binary name as $1 (default main), so a
    single script launches either the API or the worker alongside Alloy. Both
    fly.worker.toml and .fly/review_apps.worker.toml point at
    ./start.sh worker.

@codecov
Copy link
Copy Markdown

codecov Bot commented Apr 23, 2026

Codecov Report

❌ Patch coverage is 4.72973% with 141 lines in your changes missing coverage. Please review.
✅ All tests successful. No failed tests found.

Files with missing lines Patch % Lines
internal/broker/outbox.go 0.00% 114 Missing ⚠️
internal/broker/scheduler.go 38.88% 10 Missing and 1 partial ⚠️
internal/observability/observability.go 0.00% 11 Missing ⚠️
internal/jobs/manager.go 0.00% 5 Missing ⚠️

📢 Thoughts on this report? Let us know!

@github-actions
Copy link
Copy Markdown
Contributor

🐝 Review App Deployed

Homepage: https://hover-pr-340.fly.dev
Dashboard: https://hover-pr-340.fly.dev/dashboard

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
internal/broker/scheduler.go (1)

164-167: ⚠️ Potential issue | 🟠 Major

Don't return on Pipeline.Exec error before inspecting per-command results.

The error return at line 165 prevents per-command error inspection that occurs later (lines 173–186), collapsing partial command failures into a full-batch failure. In go-redis/v9, when some pipelined commands fail server-side, Exec returns a non-nil error and the command slice with per-command Err() values set. The current code ignores this per-command information, treating any Exec error as a complete batch failure instead of distinguishing partial failures where FailedIndices would apply.

Capture the Exec error without returning immediately; inspect per-command results first to properly handle partial-failure cases.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/broker/scheduler.go` around lines 164 - 167, The current early
return on pipe.Exec(ctx) hides per-command results needed to detect partial
failures; change the handling in schedule batch so that you capture the error
from pipe.Exec(ctx) into a variable (e.g., execErr) but do NOT return
immediately, iterate the returned cmds slice and inspect each command's Err() to
build/append FailedIndices and partial errors, and only return a final error
that incorporates execErr plus per-command failures if appropriate; update logic
around cmds, entries, and FailedIndices to distinguish full Exec failures from
server-side per-command errors.
internal/broker/outbox.go (1)

72-88: ⚠️ Potential issue | 🟠 Major

StatementTimeout is documented as default-on, but NewOutboxSweeper never applies that default.

DefaultOutboxSweeperOpts() sets a 5 s statement timeout, but NewOutboxSweeper() only backfills MaxAttempts and leaves StatementTimeout at zero. Any caller that constructs OutboxSweeperOpts{} directly will silently lose the new safeguard.

Suggested fix
 func NewOutboxSweeper(db *sql.DB, scheduler *Scheduler, opts OutboxSweeperOpts) *Sweeper {
 	if opts.Interval <= 0 {
 		opts.Interval = 500 * time.Millisecond
 	}
 	if opts.BatchSize <= 0 {
 		opts.BatchSize = 200
 	}
 	if opts.BaseBackoff <= 0 {
 		opts.BaseBackoff = 2 * time.Second
 	}
 	if opts.MaxBackoff <= 0 {
 		opts.MaxBackoff = 5 * time.Minute
 	}
 	if opts.MaxAttempts <= 0 {
 		opts.MaxAttempts = DefaultOutboxMaxAttempts
 	}
+	if opts.StatementTimeout <= 0 {
+		opts.StatementTimeout = 5 * time.Second
+	}
 	return &Sweeper{db: db, scheduler: scheduler, opts: opts}
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/broker/outbox.go` around lines 72 - 88, NewOutboxSweeper currently
fails to apply the default StatementTimeout from DefaultOutboxSweeperOpts, so
callers that pass an empty OutboxSweeperOpts get StatementTimeout == 0; update
NewOutboxSweeper to detect opts.StatementTimeout <= 0 and set it to the default
(use the value from DefaultOutboxSweeperOpts() or the documented 5s) alongside
the existing backfills for Interval/BatchSize/BaseBackoff/MaxBackoff/MaxAttempts
so the statement timeout safeguard is always applied even when opts is
constructed directly.
🧹 Nitpick comments (2)
supabase/migrations/20260423132003_outbox_dead_letter.sql (1)

20-20: Consider a unique index on original_id for dead-letter integrity and lookup speed.

You already query by original_id in tests/triage paths; a unique index would both accelerate that and guarantee one dead-letter row per source outbox row.

Suggested migration addition
+CREATE UNIQUE INDEX IF NOT EXISTS idx_task_outbox_dead_original_id
+  ON public.task_outbox_dead (original_id);

Also applies to: 39-43

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@supabase/migrations/20260423132003_outbox_dead_letter.sql` at line 20, Add a
unique index on the dead-letter table's original_id to enforce one dead-letter
row per source outbox row and speed lookups: modify the outbox_dead_letter
migration to create a unique index/constraint on original_id (e.g., CREATE
UNIQUE INDEX or ALTER TABLE ... ADD CONSTRAINT UNIQUE on original_id) and
include the same change where original_id is defined/used (lines referenced
around 39-43) so queries and tests that filter by original_id benefit from the
uniqueness and performance guarantee.
internal/broker/outbox_integration_test.go (1)

283-314: TestOutboxSweeper_PartialFailure currently validates only healthy dispatch.

The test name/comment says partial failure, but the assertions only prove successful sweep+delete. Consider renaming it to reflect healthy multi-row dispatch, or add a dedicated case that actually drives *BatchError handling.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/broker/outbox_integration_test.go` around lines 283 - 314, The test
TestOutboxSweeper_PartialFailure currently only asserts successful dispatch (no
attempts bumped) and doesn't exercise the partial-failure code path; either
rename the test to reflect a healthy dispatch scenario (e.g.,
TestOutboxSweeper_HealthyDispatch) or extend it to simulate a partial
ScheduleBatch failure and assert BatchError handling: locate the test function
TestOutboxSweeper_PartialFailure and modify it to (A) inject a scheduler
stub/mock where ScheduleBatch returns a *BatchError indicating some failed
entries and successful ones, then assert that only failed rows have attempts
incremented while successful rows are DELETEd after NewOutboxSweeper(...,
OutboxSweeperOpts{BatchSize: 50}).Tick(ctx), or (B) change the test name and
message to match the current healthy dispatch assertions.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@docs/diagnostics/outbox-aging-investigation.md`:
- Line 3: This doc is ambiguous about whether it describes the pre-fix state or
the post-fix behavior: either explicitly label the whole document (or the noted
sections) as a "pre-fix investigation snapshot" and keep the proposed fixes as
historical notes, OR update the text to reflect the merged behavior (remove/mark
as implemented the proposed fixes) so the doc matches current code; specifically
call out CancelJob and task_outbox behavior (e.g., state that CancelJob now
cleans task_outbox if that is true) and update the descriptions of the
counter/label changes to match the implemented counter names and semantics so
the file is accurate for ops.

In `@internal/broker/outbox.go`:
- Around line 247-255: The dead-lettering logic checks terminality with
"r.attempts+1 >= s.opts.MaxAttempts" but then appends the row unchanged so the
stored attempts value is one less than the terminal count; update the code path
that moves items to dead-letter (the deadLetter append and the
moveToDeadLetter() call sites) to record the bumped attempt count (i.e., use
r.attempts+1 or set attempts = r.attempts+1 before appending) so the persisted
dead-letter rows reflect the same attempt number used for the terminal decision;
apply the same fix to the other occurrence around moveToDeadLetter() referenced
in the review.
- Around line 140-150: The Tick transaction currently sets only PostgreSQL's
statement_timeout which won't cancel the whole transaction while the code waits
on ScheduleBatch; wrap the entire Tick operation in a context.WithTimeout (using
a sensible deadline, e.g. s.opts.StatementTimeout or a new s.opts.TickTimeout)
and use that derived ctx for beginning the transaction, all tx.ExecContext calls
and the call to ScheduleBatch so that a hung ScheduleBatch cancels the
transaction; remember to defer cancel() and propagate the timeout error back so
locks are released promptly.

---

Outside diff comments:
In `@internal/broker/outbox.go`:
- Around line 72-88: NewOutboxSweeper currently fails to apply the default
StatementTimeout from DefaultOutboxSweeperOpts, so callers that pass an empty
OutboxSweeperOpts get StatementTimeout == 0; update NewOutboxSweeper to detect
opts.StatementTimeout <= 0 and set it to the default (use the value from
DefaultOutboxSweeperOpts() or the documented 5s) alongside the existing
backfills for Interval/BatchSize/BaseBackoff/MaxBackoff/MaxAttempts so the
statement timeout safeguard is always applied even when opts is constructed
directly.

In `@internal/broker/scheduler.go`:
- Around line 164-167: The current early return on pipe.Exec(ctx) hides
per-command results needed to detect partial failures; change the handling in
schedule batch so that you capture the error from pipe.Exec(ctx) into a variable
(e.g., execErr) but do NOT return immediately, iterate the returned cmds slice
and inspect each command's Err() to build/append FailedIndices and partial
errors, and only return a final error that incorporates execErr plus per-command
failures if appropriate; update logic around cmds, entries, and FailedIndices to
distinguish full Exec failures from server-side per-command errors.

---

Nitpick comments:
In `@internal/broker/outbox_integration_test.go`:
- Around line 283-314: The test TestOutboxSweeper_PartialFailure currently only
asserts successful dispatch (no attempts bumped) and doesn't exercise the
partial-failure code path; either rename the test to reflect a healthy dispatch
scenario (e.g., TestOutboxSweeper_HealthyDispatch) or extend it to simulate a
partial ScheduleBatch failure and assert BatchError handling: locate the test
function TestOutboxSweeper_PartialFailure and modify it to (A) inject a
scheduler stub/mock where ScheduleBatch returns a *BatchError indicating some
failed entries and successful ones, then assert that only failed rows have
attempts incremented while successful rows are DELETEd after
NewOutboxSweeper(..., OutboxSweeperOpts{BatchSize: 50}).Tick(ctx), or (B) change
the test name and message to match the current healthy dispatch assertions.

In `@supabase/migrations/20260423132003_outbox_dead_letter.sql`:
- Line 20: Add a unique index on the dead-letter table's original_id to enforce
one dead-letter row per source outbox row and speed lookups: modify the
outbox_dead_letter migration to create a unique index/constraint on original_id
(e.g., CREATE UNIQUE INDEX or ALTER TABLE ... ADD CONSTRAINT UNIQUE on
original_id) and include the same change where original_id is defined/used
(lines referenced around 39-43) so queries and tests that filter by original_id
benefit from the uniqueness and performance guarantee.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: ef44366e-3e61-4dbe-b85e-025e9dea7b2f

📥 Commits

Reviewing files that changed from the base of the PR and between 0d2aff6 and d0d387b.

📒 Files selected for processing (9)
  • CHANGELOG.md
  • docs/diagnostics/outbox-aging-investigation.md
  • internal/broker/outbox.go
  • internal/broker/outbox_integration_test.go
  • internal/broker/scheduler.go
  • internal/broker/scheduler_test.go
  • internal/jobs/manager.go
  • internal/observability/observability.go
  • supabase/migrations/20260423132003_outbox_dead_letter.sql

Comment thread docs/diagnostics/outbox-aging-investigation.md Outdated
Comment thread internal/broker/outbox.go Outdated
Comment thread internal/broker/outbox.go
@grafana-sync-hover-simon-smallchua
Copy link
Copy Markdown

Hey there! 👋
Grafana spotted some changes to your dashboard.

See the original and preview of hover-overview.json.


Posted by simonsmallchua.grafana.net · Repository: Repository (repository-27dc5a5)

@github-actions
Copy link
Copy Markdown
Contributor

🐝 Review App Deployed

Homepage: https://hover-pr-340.fly.dev
Dashboard: https://hover-pr-340.fly.dev/dashboard

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/broker/outbox.go (1)

72-88: ⚠️ Potential issue | 🟠 Major

Default StatementTimeout when callers leave it unset.

NewOutboxSweeper(...) now normalises MaxAttempts, but it still leaves StatementTimeout at zero. Any zero-value or partially-filled OutboxSweeperOpts will therefore run unbounded ticks, which undercuts the new lock-release guard.

Suggested change
 func NewOutboxSweeper(db *sql.DB, scheduler *Scheduler, opts OutboxSweeperOpts) *Sweeper {
 	if opts.Interval <= 0 {
 		opts.Interval = 500 * time.Millisecond
 	}
@@
 	if opts.MaxAttempts <= 0 {
 		opts.MaxAttempts = DefaultOutboxMaxAttempts
 	}
+	if opts.StatementTimeout <= 0 {
+		opts.StatementTimeout = DefaultOutboxSweeperOpts().StatementTimeout
+	}
 	return &Sweeper{db: db, scheduler: scheduler, opts: opts}
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/broker/outbox.go` around lines 72 - 88, NewOutboxSweeper currently
normalizes several fields on OutboxSweeperOpts but leaves StatementTimeout at
zero, allowing unbounded DB statements; update NewOutboxSweeper to set a
sensible default (e.g., 30s) when opts.StatementTimeout <= 0 so the Sweeper uses
a bounded statement timeout, referencing NewOutboxSweeper, OutboxSweeperOpts,
StatementTimeout and Sweeper in your change.
🧹 Nitpick comments (1)
internal/broker/outbox_integration_test.go (1)

246-280: Assert the persisted terminal attempt count too.

This test already hits the terminal retry boundary, but it only checks row movement and last_error. Adding an attempts assertion would lock in the off-by-one fix and stop the 9/10 literals drifting away from DefaultOutboxMaxAttempts.

Suggested change
-	_, err := db.ExecContext(context.Background(),
-		`UPDATE task_outbox SET attempts = $1 WHERE id = $2`, 9, id)
+	const maxAttempts = DefaultOutboxMaxAttempts
+	_, err := db.ExecContext(context.Background(),
+		`UPDATE task_outbox SET attempts = $1 WHERE id = $2`, maxAttempts-1, id)
 	require.NoError(t, err)
@@
-		MaxAttempts: 10,
+		MaxAttempts: maxAttempts,
 	})
@@
-	var dead int
-	var lastErr string
+	var dead int
+	var attempts int
+	var lastErr string
 	require.NoError(t, db.QueryRowContext(ctx,
-		`SELECT COUNT(*), COALESCE(MAX(last_error), '')
+		`SELECT COUNT(*), COALESCE(MAX(attempts), 0), COALESCE(MAX(last_error), '')
 		   FROM task_outbox_dead WHERE original_id = $1`, id,
-	).Scan(&dead, &lastErr))
+	).Scan(&dead, &attempts, &lastErr))
 	assert.Equal(t, 1, dead, "dead-lettered row must appear in task_outbox_dead")
+	assert.Equal(t, maxAttempts, attempts, "dead-lettered row must record the terminal attempt")
 	assert.NotEmpty(t, lastErr, "last_error must capture the ScheduleBatch failure")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/broker/outbox_integration_test.go` around lines 246 - 280, The test
seeds a row at attempts = 9 and checks movement to task_outbox_dead and
last_error but doesn't assert the persisted attempts value, risking an
off-by-one regressions against DefaultOutboxMaxAttempts; update the test (around
insertOutboxFixture, NewOutboxSweeper/OutboxSweeperOpts and the SELECT from
task_outbox_dead) to also query and assert that the attempts column in the
dead-letter row equals the expected terminal attempts (e.g.,
DefaultOutboxMaxAttempts or the MaxAttempts value passed into OutboxSweeper) so
the terminal count is locked in by the test.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@docs/diagnostics/outbox-aging-investigation.md`:
- Around line 3-4: The markdown breaks the phrase "PR `#340`" across lines causing
MD018; in docs/diagnostics/outbox-aging-investigation.md locate the sentence
containing "Status: historical — investigation notes captured before the fixes
landed in PR" and either join the following line so "PR `#340`." stays on the same
line as that sentence or escape the hash as "\#340" to prevent it being treated
as a header; update the sentence in place (do not change surrounding content) so
the document passes MD018.
- Around line 218-225: The doc currently implies both cancel and archive cleanup
landed, but only jobs.CancelJob was implemented; update the Outcome table and
surrounding text (the "3. Cancel/archive cleanup" row and the Line 218 lead-in)
to say only "Cancel cleanup" or similar and reference jobs.CancelJob and its
behavior (deletes task_outbox rows in the same tx) so ops readers aren't misled
into thinking archive cleanup exists when it does not.

---

Outside diff comments:
In `@internal/broker/outbox.go`:
- Around line 72-88: NewOutboxSweeper currently normalizes several fields on
OutboxSweeperOpts but leaves StatementTimeout at zero, allowing unbounded DB
statements; update NewOutboxSweeper to set a sensible default (e.g., 30s) when
opts.StatementTimeout <= 0 so the Sweeper uses a bounded statement timeout,
referencing NewOutboxSweeper, OutboxSweeperOpts, StatementTimeout and Sweeper in
your change.

---

Nitpick comments:
In `@internal/broker/outbox_integration_test.go`:
- Around line 246-280: The test seeds a row at attempts = 9 and checks movement
to task_outbox_dead and last_error but doesn't assert the persisted attempts
value, risking an off-by-one regressions against DefaultOutboxMaxAttempts;
update the test (around insertOutboxFixture, NewOutboxSweeper/OutboxSweeperOpts
and the SELECT from task_outbox_dead) to also query and assert that the attempts
column in the dead-letter row equals the expected terminal attempts (e.g.,
DefaultOutboxMaxAttempts or the MaxAttempts value passed into OutboxSweeper) so
the terminal count is locked in by the test.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 8f72022b-003f-4b93-aa38-f9597bd5b368

📥 Commits

Reviewing files that changed from the base of the PR and between d0d387b and ef98367.

📒 Files selected for processing (3)
  • docs/diagnostics/outbox-aging-investigation.md
  • internal/broker/outbox.go
  • internal/broker/outbox_integration_test.go

Comment on lines +3 to +4
Status: historical — investigation notes captured before the fixes landed in PR
#340. Code pointers and line numbers refer to the pre-fix tree. The "Suggested
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Keep PR #340`` on the same line.

The hard wrap leaves #340 at the start of Line 4, which trips MD018 and can render oddly in Markdown. Fold that sentence onto one line, or escape the hash.

Suggested change
-Status: historical — investigation notes captured before the fixes landed in PR
-#340. Code pointers and line numbers refer to the pre-fix tree. The "Suggested
+Status: historical — investigation notes captured before the fixes landed in PR `#340`. Code pointers and line numbers refer to the pre-fix tree. The "Suggested
 fixes" section has all been implemented; see the Outcome section at the bottom
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Status: historical — investigation notes captured before the fixes landed in PR
#340. Code pointers and line numbers refer to the pre-fix tree. The "Suggested
Status: historical — investigation notes captured before the fixes landed in PR `#340`. Code pointers and line numbers refer to the pre-fix tree. The "Suggested
fixes" section has all been implemented; see the Outcome section at the bottom
🧰 Tools
🪛 markdownlint-cli2 (0.22.1)

[warning] 4-4: No space after hash on atx style heading

(MD018, no-missing-space-atx)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/diagnostics/outbox-aging-investigation.md` around lines 3 - 4, The
markdown breaks the phrase "PR `#340`" across lines causing MD018; in
docs/diagnostics/outbox-aging-investigation.md locate the sentence containing
"Status: historical — investigation notes captured before the fixes landed in
PR" and either join the following line so "PR `#340`." stays on the same line as
that sentence or escape the hash as "\#340" to prevent it being treated as a
header; update the sentence in place (do not change surrounding content) so the
document passes MD018.

Comment on lines +218 to +225
Every suggested fix above was implemented; this section records what landed so
the doc stays useful as an ops reference.

| Suggested fix | Implemented as |
| ----------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------- |
| 1. Per-entry failure tracking | `*broker.BatchError` (`internal/broker/scheduler.go`) + partition-by-index in `Sweeper.Tick` (`internal/broker/outbox.go`). |
| 2. Dead-letter cap | `OutboxSweeperOpts.MaxAttempts` (default 10) + `task_outbox_dead` table (migration `20260423132003_outbox_dead_letter.sql`). |
| 3. Cancel/archive cleanup | `jobs.CancelJob` now deletes the job's `task_outbox` rows in the same tx (`internal/jobs/manager.go`). |
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Don’t imply archive cleanup landed if this PR only wires CancelJob.

Suggested fix 3 is still phrased as cancel/archive, but the Outcome table only maps jobs.CancelJob. With the Line 218 lead-in, ops readers can easily infer archive cleanup exists when the table says otherwise.

Suggested change
-Every suggested fix above was implemented; this section records what landed so
+Most suggested fixes above were implemented; this section records what landed so
 the doc stays useful as an ops reference.
@@
-| 3. Cancel/archive cleanup     | `jobs.CancelJob` now deletes the job's `task_outbox` rows in the same tx (`internal/jobs/manager.go`).                                    |
+| 3. Cancel cleanup             | `jobs.CancelJob` now deletes the job's `task_outbox` rows in the same tx (`internal/jobs/manager.go`).                                    |
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Every suggested fix above was implemented; this section records what landed so
the doc stays useful as an ops reference.
| Suggested fix | Implemented as |
| ----------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------- |
| 1. Per-entry failure tracking | `*broker.BatchError` (`internal/broker/scheduler.go`) + partition-by-index in `Sweeper.Tick` (`internal/broker/outbox.go`). |
| 2. Dead-letter cap | `OutboxSweeperOpts.MaxAttempts` (default 10) + `task_outbox_dead` table (migration `20260423132003_outbox_dead_letter.sql`). |
| 3. Cancel/archive cleanup | `jobs.CancelJob` now deletes the job's `task_outbox` rows in the same tx (`internal/jobs/manager.go`). |
Most suggested fixes above were implemented; this section records what landed so
the doc stays useful as an ops reference.
| Suggested fix | Implemented as |
| ----------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------- |
| 1. Per-entry failure tracking | `*broker.BatchError` (`internal/broker/scheduler.go`) + partition-by-index in `Sweeper.Tick` (`internal/broker/outbox.go`). |
| 2. Dead-letter cap | `OutboxSweeperOpts.MaxAttempts` (default 10) + `task_outbox_dead` table (migration `20260423132003_outbox_dead_letter.sql`). |
| 3. Cancel cleanup | `jobs.CancelJob` now deletes the job's `task_outbox` rows in the same tx (`internal/jobs/manager.go`). |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/diagnostics/outbox-aging-investigation.md` around lines 218 - 225, The
doc currently implies both cancel and archive cleanup landed, but only
jobs.CancelJob was implemented; update the Outcome table and surrounding text
(the "3. Cancel/archive cleanup" row and the Line 218 lead-in) to say only
"Cancel cleanup" or similar and reference jobs.CancelJob and its behavior
(deletes task_outbox rows in the same tx) so ops readers aren't misled into
thinking archive cleanup exists when it does not.

@grafana-sync-hover-simon-smallchua
Copy link
Copy Markdown

Hey there! 👋
Grafana spotted some changes to your dashboard.

See the original and preview of hover-overview.json.


Posted by simonsmallchua.grafana.net · Repository: Repository (repository-27dc5a5)

@github-actions
Copy link
Copy Markdown
Contributor

🐝 Review App Deployed

Homepage: https://hover-pr-340.fly.dev
Dashboard: https://hover-pr-340.fly.dev/dashboard

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
scripts/start.sh (1)

12-32: ⚠️ Potential issue | 🟡 Minor

Validate APP_BIN before starting Alloy.

If ./${APP_BIN} is invalid, the script exits at Line 31 after Alloy has already been spawned. Reorder this so startup fails cleanly before any sidecar process is launched.

Proposed diff
 APP_BIN="${1:-main}"
 
+if [ ! -x "./${APP_BIN}" ]; then
+  echo "start.sh: ./${APP_BIN} is not executable in $(pwd)" >&2
+  exit 127
+fi
+
 # Start Alloy metrics agent in background (skipped if either credential is absent)
 alloy_pid=""
 if [ -n "$GRAFANA_CLOUD_API_KEY" ] && [ -n "$GRAFANA_CLOUD_USER" ]; then
   echo "Starting Alloy metrics agent for ${APP_BIN}"
   /usr/local/bin/alloy run --storage.path=/tmp/alloy-wal /app/alloy.river &
   alloy_pid=$!
 else
   echo "Grafana Cloud credentials not fully set, skipping metrics agent"
 fi
@@
-if [ ! -x "./${APP_BIN}" ]; then
-  echo "start.sh: ./${APP_BIN} is not executable in $(pwd)" >&2
-  exit 127
-fi
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@scripts/start.sh` around lines 12 - 32, Validate the application binary
before launching the Alloy sidecar: move the executable check for ./${APP_BIN}
(the if [ ! -x "./${APP_BIN}" ] block) so it runs before the Alloy spawn code
that starts /usr/local/bin/alloy run and sets alloy_pid; ensure the script exits
(exit 127) if APP_BIN is not executable and only then proceed to start Alloy,
preserving the term() function and trap INT TERM behavior so no alloy process is
launched when the startup should fail.
🧹 Nitpick comments (1)
scripts/start.sh (1)

6-10: Consider restricting APP_BIN to known roles (main/worker).

Allowing arbitrary values for $1 makes misconfiguration easier; a small guard gives deterministic failures and a tighter startup contract.

Proposed diff
 APP_BIN="${1:-main}"
+case "$APP_BIN" in
+  main|worker) ;;
+  *)
+    echo "start.sh: unsupported binary '$APP_BIN' (allowed: main, worker)" >&2
+    exit 64
+    ;;
+esac
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@scripts/start.sh` around lines 6 - 10, The script currently assigns
APP_BIN="${1:-main}" allowing arbitrary binaries; add a validation step after
reading $1 to restrict allowed values to "main" or "worker" (default to "main"
when empty) and exit non-zero with a clear error message if an invalid role is
supplied. Update the start.sh logic that sets APP_BIN to normalize/validate the
input and fail fast, referencing the APP_BIN variable to enforce this tight
startup contract.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In `@scripts/start.sh`:
- Around line 12-32: Validate the application binary before launching the Alloy
sidecar: move the executable check for ./${APP_BIN} (the if [ ! -x
"./${APP_BIN}" ] block) so it runs before the Alloy spawn code that starts
/usr/local/bin/alloy run and sets alloy_pid; ensure the script exits (exit 127)
if APP_BIN is not executable and only then proceed to start Alloy, preserving
the term() function and trap INT TERM behavior so no alloy process is launched
when the startup should fail.

---

Nitpick comments:
In `@scripts/start.sh`:
- Around line 6-10: The script currently assigns APP_BIN="${1:-main}" allowing
arbitrary binaries; add a validation step after reading $1 to restrict allowed
values to "main" or "worker" (default to "main" when empty) and exit non-zero
with a clear error message if an invalid role is supplied. Update the start.sh
logic that sets APP_BIN to normalize/validate the input and fail fast,
referencing the APP_BIN variable to enforce this tight startup contract.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 1a0bddbb-64df-43d5-b95d-aecb1282dcd3

📥 Commits

Reviewing files that changed from the base of the PR and between ef98367 and 59ba992.

📒 Files selected for processing (4)
  • .fly/review_apps.worker.toml
  • CHANGELOG.md
  • fly.worker.toml
  • scripts/start.sh
✅ Files skipped from review due to trivial changes (1)
  • CHANGELOG.md

@simonsmallchua
Copy link
Copy Markdown
Contributor Author

Superseded by #342, which now contains all of this plus the Alloy sidecar fix and the 42P05 counter-sync fix on one branch.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant