fix: atomic state transitions + race-loser handling for concurrent workers #23

Merged: ancongui merged 1 commit into main from fix/concurrency-atomic-state-transitions on May 19, 2026

@ancongui

Summary

  • Every state-changing repository method is now a single atomic UPDATE … WHERE id=? AND status IN (legal_predecessors) RETURNING *. Concurrent writers get serialised by Postgres' row-level UPDATE lock; the WHERE precondition picks one winner and gives None to the loser. The lost-update + duplicate-processing bugs are gone.
  • mark_running and mark_bbox_refining accept a lease_seconds window so a worker crashing mid-run is recoverable on redelivery (stale lease → reclaim) but a fresh redelivery to a peer worker is rejected (fresh lease → bail).
  • Cancel handler is a single conditional UPDATE: the cancel/worker-claim race is settled by Postgres, not by a Python-side check.
  • Submit handler catches the IntegrityError from concurrent same-idempotency-key submits and returns the idempotent shape instead of a 500.
  • Workers (job + bbox) silently bail when their atomic claim returns None, and skip webhooks / republishes when finalisation didn't actually win.
  • 25 new unit tests + 5 real-Postgres integration tests. 340 passing total (was 286).
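
The conditional-UPDATE pattern from the first bullet can be sketched as follows. This is a minimal illustration against in-memory SQLite, not the repository's code: the `jobs` table, its columns, and the body of `mark_running` are assumptions, and it uses the cursor's `rowcount` in place of `RETURNING *` so that it also runs on older SQLite builds.

```python
import sqlite3


def mark_running(conn: sqlite3.Connection, job_id: int) -> bool:
    """Claim a job with one conditional UPDATE; True only for the winner.

    Hypothetical helper: the real method returns the updated row or None.
    """
    cur = conn.execute(
        "UPDATE jobs SET status = 'RUNNING', attempts = attempts + 1 "
        "WHERE id = ? AND status IN ('QUEUED')",
        (job_id,),
    )
    conn.commit()
    # rowcount is 1 when the precondition matched, 0 when another writer won.
    return cur.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE jobs (id INTEGER PRIMARY KEY, status TEXT, attempts INTEGER)"
)
conn.execute("INSERT INTO jobs VALUES (1, 'QUEUED', 0)")
conn.commit()

first = mark_running(conn, 1)   # the precondition matches, so this claim wins
second = mark_running(conn, 1)  # the row is already RUNNING, so this one loses
attempts = conn.execute("SELECT attempts FROM jobs WHERE id = 1").fetchone()[0]
```

Because the status check and the write happen in one statement, there is no window between reading the status and changing it, and the attempt counter is incremented only by the winner.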

What was broken

| Bug | Before | After |
| --- | --- | --- |
| Two replicas dispatch the same event in parallel | both run LLM extraction, both mark_running increments lose updates, both mark_succeeded last-writer-wins, webhook delivered twice | exactly one worker claims; loser bails silently |
| Cancel races with worker claim | TOCTOU window between SELECT + mark_cancelled; could clobber a RUNNING job to CANCELLED then worker still finishes | atomic mark_cancelled WHERE status='QUEUED'; exactly one of cancel/claim wins |
| Concurrent submit with same idempotency_key | second submit hits unique-index violation → 500 | catches IntegrityError, re-resolves winner, returns idempotent 202 |
| Worker crash mid-run | redelivered event re-claims unconditionally, attempts double-counted | stale lease → atomic re-claim with one attempt increment; fresh lease excludes peers |
| update(status='QUEUED') retry path | could clobber a concurrent cancel | atomic requeue_for_retry WHERE status='RUNNING' |

Companion change

Requires fireflyframework-pyfly#13 (per-group advisory lock on PostgresEventBus._drain). Merged.

Test plan

  • tests/unit/test_extraction_job_repository.py — 18 tests against SQLite covering happy path + race losses + stale-lease reclaim + concurrent winners.
  • tests/unit/test_worker_concurrency.py — 5 tests verifying workers bail / don't double-fire webhooks when claims or finalisations return None.
  • tests/unit/test_submit_job_handler.py — 2 new tests for the IntegrityError recovery and re-raise paths.
  • tests/integration/test_postgres_concurrency.py — 4 real-Postgres tests for atomic transitions, cancel-vs-claim race, stale-lease reclaim, finalisation idempotency.
  • tests/integration/test_eda_advisory_lock.py — 2 concurrent PostgresEventBus instances against real Postgres deliver each of 20 events exactly once across the pair (no duplicates).
  • Full unit suite: 293 passed, 1 skipped (was 286, +7 net; +25 new minus refactored stubs).
  • Combined with pyfly EDA + integration: 340 passed, 1 skipped.

…rkers

Audit found that with multiple `worker` / `bbox-worker` replicas the
job pipeline duplicated work, lost updates, and let cancels race with
the worker -- the repository was all read-modify-write without any
row-level lock, and the cancel/submit handlers had TOCTOU windows.

Repository (`extraction_job_repository.py`):
- Every `mark_*` is now a single `UPDATE … WHERE id=? AND status IN
  (legal_predecessors) RETURNING *`. Postgres serialises concurrent
  writers via row-level UPDATE locks; the WHERE precondition picks
  exactly one winner. The loser gets None.
- `mark_running` and `mark_bbox_refining` accept `lease_seconds`. A
  fresh lease excludes redeliveries; a stale lease lets a crashed
  claim be reclaimed -- crash recovery without herd duplicates.
- New `requeue_for_retry` / `requeue_bbox_refine` for atomic
  RUNNING→QUEUED and REFINING_BBOXES→PARTIAL_SUCCEEDED transitions.
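
A rough sketch of the lease rule described in the list above: a claim succeeds on a QUEUED row, or on a RUNNING row whose lease has expired. The schema, the `lease_expires_at` column, and the explicit `now` argument (used here to keep the example deterministic) are assumptions, and `rowcount` again stands in for `RETURNING *`.

```python
import sqlite3


def mark_running(
    conn: sqlite3.Connection, job_id: int, lease_seconds: float, now: float
) -> bool:
    """Claim a QUEUED job, or reclaim a RUNNING one whose lease is stale."""
    cur = conn.execute(
        "UPDATE jobs "
        "SET status = 'RUNNING', attempts = attempts + 1, lease_expires_at = ? "
        "WHERE id = ? AND ("
        "  status = 'QUEUED' "
        "  OR (status = 'RUNNING' AND lease_expires_at <= ?)"
        ")",
        (now + lease_seconds, job_id, now),
    )
    conn.commit()
    return cur.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE jobs ("
    "id INTEGER PRIMARY KEY, status TEXT, attempts INTEGER, lease_expires_at REAL)"
)
conn.execute("INSERT INTO jobs VALUES (1, 'QUEUED', 0, NULL)")
conn.commit()

claimed = mark_running(conn, 1, lease_seconds=100, now=0)        # first claim
redelivered = mark_running(conn, 1, lease_seconds=100, now=10)   # fresh lease
reclaimed = mark_running(conn, 1, lease_seconds=100, now=200)    # stale lease
attempts = conn.execute("SELECT attempts FROM jobs WHERE id = 1").fetchone()[0]
```

The redelivery at `now=10` is rejected because the lease runs until 100, while the one at `now=200` reclaims the row and counts as a second attempt.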

Config (`config.py`): adds `job_run_lease_s` (2x `async_timeout_s`)
and `bbox_refine_lease_s` (2x `bbox_refine_timeout_s`).

Workers (`job_worker.py`, `bbox_refine_worker.py`): pass lease into
the claim, bail silently when the atomic claim returns None, and
gate the webhook / republish on the finalisation actually winning.
Retry paths use the new atomic requeues.
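
The worker-side behaviour can be sketched like this, assuming a toy in-memory repository whose `mark_*` methods return None to the loser. `FakeRepo`, `handle_event`, and the webhook list are hypothetical stand-ins, not the real worker code.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FakeRepo:
    """Toy repository: each transition only succeeds from its legal predecessor."""

    status: str = "QUEUED"

    def mark_running(self) -> Optional[str]:
        if self.status != "QUEUED":
            return None  # another worker already claimed the job
        self.status = "RUNNING"
        return self.status

    def mark_succeeded(self) -> Optional[str]:
        if self.status != "RUNNING":
            return None  # e.g. the job was cancelled or finalised elsewhere
        self.status = "SUCCEEDED"
        return self.status


def handle_event(repo: FakeRepo, webhooks: List[str]) -> str:
    if repo.mark_running() is None:
        return "bailed"  # lost the claim: do no work, raise no error
    # ... the extraction work would run here ...
    if repo.mark_succeeded() is None:
        return "lost-finalisation"  # skip the webhook: the write did not win
    webhooks.append("job.succeeded")
    return "done"


repo = FakeRepo()
webhooks: List[str] = []
outcomes = [handle_event(repo, webhooks), handle_event(repo, webhooks)]
```

The second delivery of the same event bails before doing any work, so the webhook fires once.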

Cancel (`cancel_job_handler.py`): single conditional UPDATE.
Disambiguates 404 vs 409 only after the atomic write decides.
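
One way to picture the cancel path: run the conditional UPDATE first, and only if it matched nothing look at the row to choose between 404 and 409. The table layout and the 200 success code are assumptions made for this sketch.

```python
import sqlite3


def cancel_job(conn: sqlite3.Connection, job_id: int) -> int:
    """Return an HTTP-style status code for a cancel request."""
    cur = conn.execute(
        "UPDATE jobs SET status = 'CANCELLED' WHERE id = ? AND status = 'QUEUED'",
        (job_id,),
    )
    conn.commit()
    if cur.rowcount == 1:
        return 200  # ASSUMPTION: the real success code is not stated in the PR
    # The write lost or matched nothing; only now decide why.
    row = conn.execute("SELECT status FROM jobs WHERE id = ?", (job_id,)).fetchone()
    return 404 if row is None else 409


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, status TEXT)")
conn.executemany("INSERT INTO jobs VALUES (?, ?)", [(1, "QUEUED"), (2, "RUNNING")])
conn.commit()

results = [cancel_job(conn, 1), cancel_job(conn, 2), cancel_job(conn, 3)]
```

The follow-up SELECT is only used to pick an error code; it never decides whether the cancel happens.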

Submit (`submit_job_handler.py`): catches `IntegrityError` on the
partial-unique-index collision two concurrent same-key submits cause,
re-reads the winning row, and returns the idempotent response shape.
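
A small sketch of the submit recovery path, using SQLite's partial unique index and `sqlite3.IntegrityError` as stand-ins for the Postgres index and the driver's exception. The schema and the response dictionary are hypothetical, and the two sequential calls only simulate what the loser of a real race would see.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE jobs (id INTEGER PRIMARY KEY, idempotency_key TEXT, status TEXT)"
)
# Partial unique index: only rows that carry a key are constrained.
conn.execute(
    "CREATE UNIQUE INDEX uq_jobs_idem ON jobs (idempotency_key) "
    "WHERE idempotency_key IS NOT NULL"
)
conn.commit()


def submit(conn: sqlite3.Connection, key: str) -> dict:
    try:
        cur = conn.execute(
            "INSERT INTO jobs (idempotency_key, status) VALUES (?, 'QUEUED')",
            (key,),
        )
        conn.commit()
        return {"job_id": cur.lastrowid, "replayed": False}
    except sqlite3.IntegrityError:
        conn.rollback()
        # Another submit with the same key won; re-read its row.
        row = conn.execute(
            "SELECT id FROM jobs WHERE idempotency_key = ?", (key,)
        ).fetchone()
        if row is None:
            raise  # not an idempotency collision after all: surface the error
        return {"job_id": row[0], "replayed": True}


winner = submit(conn, "key-1")
loser = submit(conn, "key-1")
```

Both callers end up with the same job id, and the table holds a single row for the key.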

Tests: 25 new unit tests + 5 real-Postgres integration tests covering
atomic transitions, race-loser bail behaviour, idempotency-key races,
stale-lease reclaim, and the per-group EDA advisory lock dispatching
each event exactly once across two concurrent bus instances. 340
passing total (was 286).
@ancongui ancongui merged commit 9c19ad3 into main May 19, 2026
5 of 6 checks passed
@ancongui ancongui deleted the fix/concurrency-atomic-state-transitions branch May 19, 2026 09:31
ancongui added a commit that referenced this pull request May 19, 2026
The atomic-claim fix in #23 prevented duplicate processing but
introduced a regression: when a worker crashed mid-RUN, the EDA
redelivery's `mark_running` was rejected by the fresh lease, the bus
cursor advanced past the event, and the job was left stuck in RUNNING
forever. Orphans of the same shape exist for:

  1. QUEUED rows whose submit handler crashed between row INSERT and
     outbox PUBLISH (the two are not co-transactional);
  2. QUEUED rows whose retry-path `_delayed_publish` task was killed
     before its `asyncio.sleep` completed;
  3. RUNNING rows whose claimant crashed past `job_run_lease_s`;
  4. PARTIAL_SUCCEEDED rows whose bbox event was never published
     (main worker crashed between `mark_partial_succeeded` and
     `publisher.publish` for the refine topic);
  5. REFINING_BBOXES rows whose bbox claimant crashed past
     `bbox_refine_lease_s`.

All five were demonstrated against real Postgres before this change.

Fix: a `JobReaper` (RUNNING + QUEUED) and `BboxReaper`
(REFINING_BBOXES + PARTIAL_SUCCEEDED-pending) periodically query for
the stuck rows and republish a fresh EDA event for each. Duplicate
publishes across replicas are deduped at claim time by the existing
atomic `mark_*` transitions, so running a reaper in every worker
container is safe. Recovery time is bounded by
`reaper_sweep_interval_s + job_run_lease_s` (default ~22 min for the
shipped `async_timeout_s=1200`; falls proportionally for tighter
timeouts).
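
As a rough illustration of one sweep, assuming in-memory `Job` records and a `publish` callback: the finder below follows the `find_stale_*` naming but is hypothetical, and only the stale-RUNNING case is shown.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class Job:
    id: int
    status: str
    lease_expires_at: Optional[float] = None


def find_stale_running(jobs: List[Job], now: float) -> List[Job]:
    """RUNNING rows whose lease has already expired."""
    return [
        job
        for job in jobs
        if job.status == "RUNNING"
        and job.lease_expires_at is not None
        and job.lease_expires_at <= now
    ]


def sweep(jobs: List[Job], now: float, publish: Callable[[int], None]) -> int:
    """Republish one event per stale job; the atomic claim dedupes later."""
    stale = find_stale_running(jobs, now)
    for job in stale:
        publish(job.id)
    return len(stale)


jobs = [Job(1, "RUNNING", 500.0), Job(2, "RUNNING", 100.0), Job(3, "SUCCEEDED")]
published: List[int] = []
revived = sweep(jobs, now=200.0, publish=published.append)
```

The sweep itself never changes a row's status. It only republishes, which is why several replicas sweeping at once do not conflict.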

Tightens lease defaults from `2 * timeout` to `timeout + 60s` so
crash recovery is faster while still leaving headroom for commit
latency on the legitimate finalisation transitions.
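
The arithmetic behind these figures, as a worked example. The 60-second sweep interval is an assumption chosen to match the "~22 min" figure quoted earlier; the actual default of `reaper_sweep_interval_s` is not stated here.

```python
async_timeout_s = 1200                  # shipped default quoted in this message

old_lease_s = 2 * async_timeout_s       # previous default: 2400 s (40 min)
new_lease_s = async_timeout_s + 60      # tightened default: 1260 s (21 min)

reaper_sweep_interval_s = 60            # ASSUMPTION, not taken from the source
recovery_bound_s = reaper_sweep_interval_s + new_lease_s
recovery_bound_min = recovery_bound_s / 60
```

Under that assumption the bound is 1320 seconds, or 22 minutes. With the old `2 * timeout` lease the same calculation gives 41 minutes.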

Wires each reaper as a sidecar task to its respective worker
process via `asyncio.wait(FIRST_COMPLETED)` -- a worker failure
takes the reaper down with it and vice versa, so a container restart
fully resets both.
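
A minimal sketch of the sidecar wiring, assuming two coroutine functions named `worker` and `reaper`. The helper `run_with_sidecar` and the toy coroutines are hypothetical; only `asyncio.wait` with `FIRST_COMPLETED` comes from the description above.

```python
import asyncio

state = {"reaper_cancelled": False}


async def worker() -> None:
    await asyncio.sleep(0.01)
    raise RuntimeError("worker crashed")


async def reaper() -> None:
    try:
        while True:
            await asyncio.sleep(3600)  # stand-in for the periodic sweep
    except asyncio.CancelledError:
        state["reaper_cancelled"] = True
        raise


async def run_with_sidecar(main, sidecar) -> None:
    tasks = [asyncio.create_task(main()), asyncio.create_task(sidecar())]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()  # whichever task is still running goes down too
    await asyncio.gather(*pending, return_exceptions=True)
    for task in done:
        task.result()  # re-raise the failure so the process exits


def demo() -> str:
    try:
        asyncio.run(run_with_sidecar(worker, reaper))
    except RuntimeError as exc:
        return str(exc)
    return "clean exit"


outcome = demo()
```

Re-raising from the finished task lets the process exit with an error, which leaves the restart to the container supervisor.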

Adds 4 new `find_stale_*` repository helpers, 10 unit tests
covering each finder and the reaper sweep behaviour, and 3 real-Postgres
integration tests proving end-to-end orphan revival. 353 passing
(was 340).

Co-authored-by: ancongui <andres.contreras@soon.es>
ancongui added a commit that referenced this pull request May 19, 2026
* chore: clean up ruff findings introduced by #23 / #24 / #25

The Lint check failed on all three merged PRs because the new files
tripped ``F401`` (unused ``typing.Any``), ``SIM105`` (replace
``try``/``except``/``pass`` with ``contextlib.suppress``), ``UP041``
(replace ``asyncio.TimeoutError`` with builtin ``TimeoutError``),
``I001`` (import ordering), and ``F841`` (unused local).
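
For illustration, the kind of rewrite ``SIM105`` asks for looks roughly like this. The functions are made-up examples, not code from the affected files.

```python
import contextlib
from typing import Optional


def parse_before(text: str) -> Optional[int]:
    value = None
    try:  # the try/except/pass shape that SIM105 flags
        value = int(text)
    except ValueError:
        pass
    return value


def parse_after(text: str) -> Optional[int]:
    value = None
    with contextlib.suppress(ValueError):  # same behaviour, one block
        value = int(text)
    return value
```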

The other CI jobs (Unit tests, SDK Python, SDK Java, Typecheck, Docling)
were all green on each PR; the merges weren't gated on Lint. This is
the follow-up sweep so ``ruff check`` is clean on ``main``.

11 errors fixed (8 auto-fixed by ``ruff --fix``, 3 manual).

* chore: apply ``ruff format`` to the same files

The Lint job runs both ``ruff check`` and ``ruff format --check``.
The previous commit cleared the ``check`` half; this one runs
``ruff format`` over the 8 files in the same change set so the
formatter half passes too.

No behaviour change.

---------

Co-authored-by: ancongui <andres.contreras@soon.es>
ancongui added a commit that referenced this pull request May 31, 2026
ancongui added a commit that referenced this pull request May 31, 2026
ancongui added a commit that referenced this pull request May 31, 2026