Skip to content

feat(bbox): async out-of-band refinement via second-stage EDA worker#8

Merged
ancongui merged 1 commit into
mainfrom
feat/bbox-refinement-async
May 15, 2026
Merged

feat(bbox): async out-of-band refinement via second-stage EDA worker#8
ancongui merged 1 commit into
mainfrom
feat/bbox-refinement-async

Conversation

@ancongui
Copy link
Copy Markdown
Contributor

Summary

Phase 2 of the bbox refinement initiative. The async (queue-backed) API now supports grounded bboxes without blocking the main extraction. When options.stages.bbox_refine == true:

  1. The main pipeline finishes in PARTIAL_SUCCEEDED with the LLM-bbox result immediately readable via GET /api/v1/jobs/{id}/result.
  2. JobWorker fires the partial webhook + publishes IDPBboxRefineRequested on a new EDA destination (flydesk.idp.bbox.refine).
  3. A new BboxRefineWorker — its own container, own consumer loop — subscribes to that destination, runs BboxRefiner against the persisted result, transitions PARTIAL_SUCCEEDED → REFINING_BBOXES → SUCCEEDED, and fires the final webhook.
  4. On refine failure: the partial result stays readable, the job stays PARTIAL_SUCCEEDED, and the bbox-refine sub-state column carries the failure context.

When bbox_refine == false (default), today's QUEUED → RUNNING → SUCCEEDED flow is unchanged.

State machine

JobStatus enum:

  • New: PARTIAL_SUCCEEDED, REFINING_BBOXES
  • New predicate: has_result (true for SUCCEEDED + PARTIAL_SUCCEEDED + REFINING_BBOXES)

BboxRefineStatus enum (sub-state, null for jobs that didn't ask):

  • pending (event published, worker hasn't picked it up)
  • running
  • succeeded
  • failed
QUEUED -> RUNNING -> SUCCEEDED                                          (refine=false)
QUEUED -> RUNNING -> PARTIAL_SUCCEEDED -> REFINING_BBOXES -> SUCCEEDED  (refine=true)
                                       \\-> stays PARTIAL_SUCCEEDED on refine failure
                                           (LLM-bbox result remains readable)

Persistence

Alembic migration 0002_bbox_refine_columns adds six nullable columns to extraction_jobs:

  • bbox_refine_status (text)
  • bbox_refine_attempts (int, default 0)
  • bbox_refine_started_at (timestamptz)
  • bbox_refine_finished_at (timestamptz)
  • bbox_refine_error_code (text)
  • bbox_refine_error_message (text)

Repository helpers: mark_partial_succeeded, mark_bbox_refining, mark_bbox_refined, mark_bbox_refine_failed.

API

  • GET /api/v1/jobs/{id}/result now serves PARTIAL_SUCCEEDED + REFINING_BBOXES too.
  • New query params:
    • wait_for_bboxes (bool): long-poll until the refiner finishes or the timeout elapses.
    • timeout (seconds, default 60.0): poll deadline.
    • On timeout the partial result is returned with the LLM-bbox version intact (never 408 — partial is a valid response shape).
  • GET /api/v1/jobs/{id} (status) now exposes the bbox refine sub-state, attempts, timestamps, and error context for callers that want fine-grained observability without polling the result endpoint.

Framework wiring

  • New CLI command flydesk-idp bbox-worker mirroring worker.
  • New bbox-worker service in docker-compose.yml (same image as api + worker, only entrypoint differs).
  • BboxRefineWorker follows the exact EventPublisher.subscribe(event_type, handler) + .start() pattern as JobWorker — pyfly EDA primitives, nothing bespoke.
  • Retry policy: exponential backoff with jitter up to bbox_refine_max_attempts (default 3); permanent errors (ValueError, content-policy, invalid-model) mark the leg failed immediately without retry.
  • Refiner reconstructs the document by re-running BinaryNormalizer.normalise on the saved input bytes — deterministic, idempotent, no need to persist normalised bytes alongside the job (keeps the "never store document bytes" invariant; the original base64 is already there for the worker's resume flow).

Test plan

  • pytest tests/unit197 passed, 1 skipped
  • ruff format --check + ruff check clean
  • pyright src/flydesk_idp 0 errors
  • 27 new unit tests:
    • test_job_status_enum.py (15): terminal + has_result predicates per status
    • test_bbox_refine_worker.py (4): happy path (grounding flips status), idempotent skip on non-PARTIAL_SUCCEEDED, drops unknown job, permanent-error path marks failed without republish
    • test_get_job_result_handler.py (8): returns result on SUCCEEDED / PARTIAL_SUCCEEDED / REFINING_BBOXES, raises JobNotReady on QUEUED / FAILED, returns None on missing job, long-poll respects timeout, long-poll exits early when status flips

Phase status

Phase Status PR
0: Binary normalization (DOCX/EML/ZIP/HEIC) merged #6
1: Sync bbox refinement (PyMuPDF + matcher) merged #7
2: Async out-of-band refinement this PR #8

OCR engine adapters (Paddle / Mistral) remain follow-up work — they swap into the existing OcrEngine Protocol without touching this PR's contract.

Phase 2 of the bbox refinement initiative. The async (queue-backed) API
now supports grounded bboxes without blocking the main extraction. When
``options.stages.bbox_refine == true``:

1. The main pipeline finishes in ``PARTIAL_SUCCEEDED`` with the
   LLM-bbox result immediately readable via ``GET .../{id}/result``.
2. ``JobWorker`` fires the partial webhook + publishes
   ``IDPBboxRefineRequested`` on a new EDA destination
   (``flydesk.idp.bbox.refine``).
3. A new ``BboxRefineWorker`` (own container, own consumer loop)
   subscribes to that destination, runs ``BboxRefiner`` against the
   persisted result, transitions ``PARTIAL_SUCCEEDED -> REFINING_BBOXES
   -> SUCCEEDED``, and fires the final webhook.
4. On refine failure: the partial result stays readable, the job stays
   ``PARTIAL_SUCCEEDED``, and the bbox-refine sub-state column carries
   the failure context for observability.

When ``bbox_refine == false`` (default), today's ``QUEUED -> RUNNING ->
SUCCEEDED`` flow is unchanged.

State machine:
* ``JobStatus`` enum gains ``PARTIAL_SUCCEEDED`` + ``REFINING_BBOXES``.
* ``has_result`` predicate (new) lets the result endpoint return the
  LLM-bbox payload while the refiner is still in flight.
* ``BboxRefineStatus`` enum tracks the sub-state of the refine leg
  (``pending`` / ``running`` / ``succeeded`` / ``failed``).

Persistence:
* Alembic migration ``0002_bbox_refine_columns`` adds the bbox-refine
  per-leg columns (status, attempts, started_at, finished_at, error
  code + message). All nullable so jobs that don't request refinement
  never populate them.
* Repository helpers: ``mark_partial_succeeded``, ``mark_bbox_refining``,
  ``mark_bbox_refined``, ``mark_bbox_refine_failed``.

API:
* ``GET /api/v1/jobs/{id}/result`` now serves PARTIAL_SUCCEEDED +
  REFINING_BBOXES too. New query params
  ``wait_for_bboxes`` (bool) + ``timeout`` (seconds) long-poll until the
  refiner finishes or the deadline elapses; on timeout the partial
  result is returned with the LLM-bbox version intact.
* ``GET /api/v1/jobs/{id}`` exposes the bbox refine sub-state, attempts,
  timestamps, and error context.

Framework wiring:
* New CLI command ``flydesk-idp bbox-worker`` mirroring ``worker``.
* New ``bbox-worker`` service in ``docker-compose.yml``.
* ``BboxRefineWorker`` follows the same ``EventPublisher.subscribe`` +
  ``.start()`` pattern as ``JobWorker``. Retry policy: exponential
  backoff with jitter up to ``bbox_refine_max_attempts``; permanent
  errors mark the leg failed without retry.

Tests: 27 new unit tests covering JobStatus predicates (is_terminal,
has_result), BboxRefineWorker happy path / idempotent skip / unknown
job / permanent error, GetJobResultHandler partial reads + long-poll +
not-ready branches. Full suite (197 tests) green; pyright + ruff clean.
@ancongui ancongui merged commit 3f19063 into main May 15, 2026
4 checks passed
@ancongui ancongui deleted the feat/bbox-refinement-async branch May 15, 2026 09:01
ancongui added a commit that referenced this pull request May 31, 2026
)

Phase 2 of the bbox refinement initiative. The async (queue-backed) API
now supports grounded bboxes without blocking the main extraction. When
``options.stages.bbox_refine == true``:

1. The main pipeline finishes in ``PARTIAL_SUCCEEDED`` with the
   LLM-bbox result immediately readable via ``GET .../{id}/result``.
2. ``JobWorker`` fires the partial webhook + publishes
   ``IDPBboxRefineRequested`` on a new EDA destination
   (``flydesk.idp.bbox.refine``).
3. A new ``BboxRefineWorker`` (own container, own consumer loop)
   subscribes to that destination, runs ``BboxRefiner`` against the
   persisted result, transitions ``PARTIAL_SUCCEEDED -> REFINING_BBOXES
   -> SUCCEEDED``, and fires the final webhook.
4. On refine failure: the partial result stays readable, the job stays
   ``PARTIAL_SUCCEEDED``, and the bbox-refine sub-state column carries
   the failure context for observability.

When ``bbox_refine == false`` (default), today's ``QUEUED -> RUNNING ->
SUCCEEDED`` flow is unchanged.

State machine:
* ``JobStatus`` enum gains ``PARTIAL_SUCCEEDED`` + ``REFINING_BBOXES``.
* ``has_result`` predicate (new) lets the result endpoint return the
  LLM-bbox payload while the refiner is still in flight.
* ``BboxRefineStatus`` enum tracks the sub-state of the refine leg
  (``pending`` / ``running`` / ``succeeded`` / ``failed``).

Persistence:
* Alembic migration ``0002_bbox_refine_columns`` adds the bbox-refine
  per-leg columns (status, attempts, started_at, finished_at, error
  code + message). All nullable so jobs that don't request refinement
  never populate them.
* Repository helpers: ``mark_partial_succeeded``, ``mark_bbox_refining``,
  ``mark_bbox_refined``, ``mark_bbox_refine_failed``.

API:
* ``GET /api/v1/jobs/{id}/result`` now serves PARTIAL_SUCCEEDED +
  REFINING_BBOXES too. New query params
  ``wait_for_bboxes`` (bool) + ``timeout`` (seconds) long-poll until the
  refiner finishes or the deadline elapses; on timeout the partial
  result is returned with the LLM-bbox version intact.
* ``GET /api/v1/jobs/{id}`` exposes the bbox refine sub-state, attempts,
  timestamps, and error context.

Framework wiring:
* New CLI command ``flydesk-idp bbox-worker`` mirroring ``worker``.
* New ``bbox-worker`` service in ``docker-compose.yml``.
* ``BboxRefineWorker`` follows the same ``EventPublisher.subscribe`` +
  ``.start()`` pattern as ``JobWorker``. Retry policy: exponential
  backoff with jitter up to ``bbox_refine_max_attempts``; permanent
  errors mark the leg failed without retry.

Tests: 27 new unit tests covering JobStatus predicates (is_terminal,
has_result), BboxRefineWorker happy path / idempotent skip / unknown
job / permanent error, GetJobResultHandler partial reads + long-poll +
not-ready branches. Full suite (197 tests) green; pyright + ruff clean.

Co-authored-by: ancongui <andres.contreras@soon.es>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant