fix: periodic reaper revives orphan jobs left stuck by lost events (#24)
Merged
The atomic-claim fix in #23 prevented duplicate processing but introduced a regression: when a worker crashed mid-RUN, the EDA redelivery's `mark_running` was rejected by the fresh lease, the bus cursor advanced past the event, and the job was left stuck in RUNNING forever.

The same orphan shape exists for:

1. QUEUED rows whose submit handler crashed between row INSERT and outbox PUBLISH (the two are not co-transactional);
2. QUEUED rows whose retry-path `_delayed_publish` task was killed before its `asyncio.sleep` completed;
3. RUNNING rows whose claimant crashed past `job_run_lease_s`;
4. PARTIAL_SUCCEEDED rows whose bbox event was never published (main worker crashed between `mark_partial_succeeded` and `publisher.publish` for the refine topic);
5. REFINING_BBOXES rows whose bbox claimant crashed past `bbox_refine_lease_s`.

All five were demonstrated against real Postgres before this change.

Fix: a `JobReaper` (RUNNING + QUEUED) and a `BboxReaper` (REFINING_BBOXES + PARTIAL_SUCCEEDED-pending) periodically query for the stuck rows and republish a fresh EDA event for each. Duplicate publishes across replicas are deduped at claim time by the existing atomic `mark_*` transitions, so running a reaper in every worker container is safe. Recovery time is bounded by `reaper_sweep_interval_s + job_run_lease_s` (default ~22 min for the shipped `async_timeout_s=1200`; falls proportionally for tighter timeouts).

Tightens lease defaults from `2 * timeout` to `timeout + 60s` so crash recovery is faster while still leaving headroom for commit latency on the legitimate finalisation transitions.

Wires each reaper as a sidecar task to its respective worker process via `asyncio.wait(FIRST_COMPLETED)`: worker failure takes the reaper down with it and vice versa, so a container restart fully resets both.

Adds 4 new `find_stale_*` repository helpers, 10 unit tests covering each finder and reaper sweep behaviour, and 3 real-Postgres integration tests proving end-to-end orphan revival.

353 passing (was 340).
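The sweep-and-dedupe behaviour described in this commit message can be illustrated with a small in-memory sketch. It is not the PR's code: `InMemoryRepo`, `Job`, `find_stale_running`, `sweep_once`, and the explicit `now` arguments are hypothetical stand-ins, and the real `mark_running` is presumably a conditional update in Postgres rather than a Python method. The sketch only shows why publishing the same revival event from several replicas is harmless when the claim itself is atomic.

```python
from dataclasses import dataclass, field


@dataclass
class Job:
    job_id: str
    state: str = "QUEUED"
    started_at: float = 0.0
    attempts: int = 0


@dataclass
class InMemoryRepo:
    """Hypothetical stand-in for the Postgres-backed job repository."""

    jobs: dict[str, Job] = field(default_factory=dict)

    def find_stale_running(self, lease_s: float, now: float) -> list[Job]:
        # RUNNING rows whose lease has expired are treated as orphans.
        return [
            job
            for job in self.jobs.values()
            if job.state == "RUNNING" and now - job.started_at > lease_s
        ]

    def mark_running(self, job_id: str, lease_s: float, now: float) -> bool:
        # Atomic claim (assumed semantics): succeeds only for QUEUED rows or
        # RUNNING rows past their lease; a fresh lease rejects the claim.
        job = self.jobs[job_id]
        lease_expired = job.state == "RUNNING" and now - job.started_at > lease_s
        if job.state != "QUEUED" and not lease_expired:
            return False
        job.state = "RUNNING"
        job.started_at = now
        job.attempts += 1
        return True


def sweep_once(repo: InMemoryRepo, publish, lease_s: float, now: float) -> int:
    """One reaper pass: republish an event for every stale RUNNING row."""
    stale = repo.find_stale_running(lease_s, now)
    for job in stale:
        publish(job.job_id)
    return len(stale)


def demo():
    lease_s = 1260.0  # timeout + 60s for async_timeout_s=1200
    repo = InMemoryRepo()
    repo.jobs["j1"] = Job("j1")
    repo.mark_running("j1", lease_s, now=0.0)  # first worker claims, then crashes

    published: list[str] = []
    early = sweep_once(repo, published.append, lease_s, now=100.0)  # lease still fresh
    # Two replicas sweep after the lease expires, so the event is published twice.
    late = sum(sweep_once(repo, published.append, lease_s, now=1300.0) for _ in range(2))
    # Two workers race on the duplicate events; only one claim can win.
    claims = [repo.mark_running(job_id, lease_s, now=1301.0) for job_id in published]
    return early, late, claims, repo.jobs["j1"].attempts


if __name__ == "__main__":
    print(demo())
```

In this sketch the second claim is rejected because the first one refreshed `started_at`, which is the same property that makes it safe to run a reaper in every worker container.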
ancongui added a commit that referenced this pull request on May 19, 2026
* chore: clean up ruff findings introduced by #23 / #24 / #25

  The Lint check failed on all three merged PRs because the new files tripped ``F401`` (unused ``typing.Any``), ``SIM105`` (replace ``try``/``except``/``pass`` with ``contextlib.suppress``), ``UP041`` (replace ``asyncio.TimeoutError`` with builtin ``TimeoutError``), ``I001`` (import ordering), and ``F841`` (unused local). The other CI jobs (Unit tests, SDK Python, SDK Java, Typecheck, Docling) were all green on each PR; the merges weren't gated on Lint. This is the follow-up sweep so ``ruff check`` is clean on ``main``. 11 errors fixed (8 auto-fixed by ``ruff --fix``, 3 manual).

* chore: apply ``ruff format`` to the same files

  The Lint job runs both ``ruff check`` and ``ruff format --check``. The previous commit cleared the ``check`` half; this one runs ``ruff format`` over the 8 files in the same change set so the formatter half passes too. No behaviour change.

---------

Co-authored-by: ancongui <andres.contreras@soon.es>
ancongui added a commit that referenced this pull request on May 31, 2026
ancongui added a commit that referenced this pull request on May 31, 2026
Summary
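A second-pass audit on top of #23 found that the atomic-claim fix had a regression: when a worker crashes mid-RUN, the EDA redelivery's `mark_running` is rejected by the fresh lease, the bus cursor advances past the event, and the job is stuck in `RUNNING` forever. Four other orphan classes shared the same shape.

This adds a periodic reaper (`JobReaper` + `BboxReaper`) that closes the gap.

The five orphan classes (all confirmed against real Postgres before this change)

| Stuck state | How the row is orphaned | Recovered by |
|---|---|---|
| `QUEUED` | Submit handler crashed between row INSERT and outbox PUBLISH | `JobReaper` |
| `RUNNING` | Claimant crashed past `job_run_lease_s` | `JobReaper` |
| `QUEUED` | Retry-path `_delayed_publish` task was killed before its `asyncio.sleep` completed | `JobReaper` |
| `PARTIAL_SUCCEEDED` | Main worker crashed between `mark_partial_succeeded` and the bbox-refine publish | `BboxReaper` |
| `REFINING_BBOXES` | Bbox claimant crashed past `bbox_refine_lease_s` | `BboxReaper` |

How the reaper works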
Each worker process now runs a reaper as a sidecar task alongside its main consume loop. Every `reaper_sweep_interval_s` (default 60s) the reaper queries for rows in stuck states past their threshold and republishes a fresh `IDPJobSubmitted` / `IDPBboxRefineRequested` event. Duplicate publishes from multiple replicas are deduped at claim time by the atomic `mark_*` transitions from #23, so running the reaper in every worker replica is safe.

Lease tweaks
Tightens defaults from `2 * timeout` to `timeout + 60s` for:

- `job_run_lease_s`
- `bbox_refine_lease_s`

Crash-recovery time is now bounded by `reaper_sweep_interval_s + job_run_lease_s` ≈ 22 min by default, and falls proportionally for tighter `async_timeout_s` overrides.

New settings
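A minimal sketch of how the settings named in this description relate to each other, useful mainly as a check on the 22-minute figure. The `ReaperSettings` dataclass, its property names, and `LEASE_HEADROOM_S` are hypothetical; only the setting names `async_timeout_s`, `reaper_sweep_interval_s`, and `job_run_lease_s`, the values 1200 and 60, and the `timeout + 60s` rule come from the text. The real settings may well allow the lease to be overridden independently.

```python
from dataclasses import dataclass

LEASE_HEADROOM_S = 60  # the "+ 60s" in "timeout + 60s" (constant name is an assumption)


@dataclass(frozen=True)
class ReaperSettings:
    async_timeout_s: int = 1200        # shipped default quoted in the description
    reaper_sweep_interval_s: int = 60  # default sweep interval quoted in the description

    @property
    def job_run_lease_s(self) -> int:
        # New default: timeout plus a fixed headroom.
        return self.async_timeout_s + LEASE_HEADROOM_S

    @property
    def old_job_run_lease_s(self) -> int:
        # Previous default: twice the timeout.
        return 2 * self.async_timeout_s

    @property
    def recovery_bound_s(self) -> int:
        # Worst case: the lease expires just after a sweep, so the orphan
        # waits one full sweep interval on top of the lease.
        return self.reaper_sweep_interval_s + self.job_run_lease_s


if __name__ == "__main__":
    defaults = ReaperSettings()
    print(defaults.job_run_lease_s)        # 1260
    print(defaults.old_job_run_lease_s)    # 2400
    print(defaults.recovery_bound_s / 60)  # 22.0 minutes
```

With a tighter override such as `ReaperSettings(async_timeout_s=300)`, the same arithmetic gives a bound of 420 seconds, or 7 minutes.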
Test plan
- `tests/unit/test_reapers.py`: 10 unit tests covering each new repository finder, reaper sweep behaviour, clean shutdown, and publisher-error resilience.
- `tests/integration/test_reaper_postgres.py`: 3 real-Postgres tests proving end-to-end revival of all five orphan classes plus a full crash-recovery cycle (claim → backdate `started_at` → reaper → fresh worker claims with `attempts=2`).
- `cmd_worker` and `cmd_bbox_worker` after the new `asyncio.wait(FIRST_COMPLETED)` wiring.
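For readers unfamiliar with the `asyncio.wait(FIRST_COMPLETED)` pattern mentioned in the last item, here is a rough sketch of how a worker and its reaper sidecar can be tied together so that either one stopping brings the other down. `run_with_sidecar` and the toy coroutines are hypothetical; the actual `cmd_worker` and `cmd_bbox_worker` wiring may differ in its details.

```python
import asyncio
import contextlib


async def run_with_sidecar(worker_coro, reaper_coro):
    """Run both coroutines; when either finishes, cancel the other."""
    worker = asyncio.create_task(worker_coro, name="worker")
    reaper = asyncio.create_task(reaper_coro, name="reaper")
    done, pending = await asyncio.wait(
        {worker, reaper}, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()
        # Wait for the cancelled task to unwind before returning.
        with contextlib.suppress(asyncio.CancelledError):
            await task
    for task in done:
        task.result()  # re-raise the failure, if any, so the process exits


async def demo():
    events = []

    async def crashing_worker():
        await asyncio.sleep(0.01)
        raise RuntimeError("worker crashed")

    async def reaper_loop():
        try:
            while True:
                await asyncio.sleep(0.005)  # stands in for one sweep per interval
        except asyncio.CancelledError:
            events.append("reaper cancelled")
            raise

    try:
        await run_with_sidecar(crashing_worker(), reaper_loop())
    except RuntimeError as exc:
        events.append(str(exc))
    return events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Re-raising the completed task's exception lets the process exit non-zero, which fits the description's point that a container restart then resets both the worker and the reaper.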