Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,5 +126,36 @@ services:
gotenberg:
condition: service_healthy

# Second-stage EDA consumer: subscribes to ``flydesk.idp.bbox.refine``
# events that ``worker`` publishes when an extraction finishes with
# ``options.stages.bbox_refine == true``. Grounds bboxes against the
# PDF text layer / configured OCR engine, persists the refined result,
# transitions the job ``PARTIAL_SUCCEEDED -> SUCCEEDED``, and fires the
# final webhook. Same image as the API + worker; only the entrypoint
# differs.
bbox-worker:
build: *service-build
image: flydesk-idp:latest
container_name: flydesk-idp-bbox-worker
command: ["bbox-worker"]
env_file:
- path: .env
required: false
environment:
FLYDESK_IDP_DATABASE_URL: postgresql+asyncpg://idp:idp@postgres:5432/flydesk_idp
FLYDESK_IDP_REDIS_URL: redis://redis:6379/0
FLYDESK_IDP_EDA_ADAPTER: postgres
FLYDESK_IDP_MODEL: ${FLYDESK_IDP_MODEL:-anthropic:claude-sonnet-4-6}
FLYDESK_IDP_OFFICE_CONVERTER: ${FLYDESK_IDP_OFFICE_CONVERTER:-gotenberg}
FLYDESK_IDP_GOTENBERG_URL: http://gotenberg:3000
ANTHROPIC_API_KEY: ${ANTHROPIC_API_KEY:-}
OPENAI_API_KEY: ${OPENAI_API_KEY:-}
RUN_MIGRATIONS: "false"
depends_on:
api:
condition: service_healthy
gotenberg:
condition: service_healthy

volumes:
postgres_data:
8 changes: 7 additions & 1 deletion docs/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,13 @@ normally have to glue together themselves:
the workflow that called us.

The same call works synchronously (blocking, sub-minute) or as a
queue-backed async job with an HMAC-signed webhook.
queue-backed async job with an HMAC-signed webhook. When bbox grounding
is enabled, async jobs go through a two-stage state machine: the main
extraction lands in ``PARTIAL_SUCCEEDED`` with the LLM-bbox result
immediately readable, then a second EDA worker grounds the coordinates
and flips the job to ``SUCCEEDED``. Callers can poll status, fetch the
partial result, or long-poll ``GET /api/v1/jobs/{id}/result?wait_for_bboxes=true``
to block until grounding finishes.

---

Expand Down
77 changes: 77 additions & 0 deletions migrations/versions/20260515_0002_bbox_refine_columns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Copyright 2026 Firefly Software Solutions Inc
"""Add bbox-refine leg columns to ``extraction_jobs``.

Revision ID: 0002_bbox_refine_columns
Revises: 0001_init
Create Date: 2026-05-15

Adds the per-job state for the out-of-band bbox refinement worker:

* ``bbox_refine_status`` -- ``pending`` / ``running`` / ``succeeded``
/ ``failed`` / ``null``.
* ``bbox_refine_attempts`` -- retry counter for the refine leg,
independent of the main extraction's
``attempts``.
* ``bbox_refine_started_at`` -- timestamp the refine worker first
transitioned the leg to ``running``.
* ``bbox_refine_finished_at`` -- timestamp the leg reached a terminal
sub-state.
* ``bbox_refine_error_code`` -- stable code if the refine leg failed
permanently; main result still readable.
* ``bbox_refine_error_message`` -- free-form message paired with the code.

All columns are nullable: jobs submitted without ``stages.bbox_refine``
never populate them and the default flow (``QUEUED -> RUNNING ->
SUCCEEDED``) is unchanged.
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

revision = "0002_bbox_refine_columns"
down_revision = "0001_init"
branch_labels = None
depends_on = None


def upgrade() -> None:
op.add_column(
"extraction_jobs",
sa.Column("bbox_refine_status", sa.String(length=16), nullable=True),
)
op.add_column(
"extraction_jobs",
sa.Column(
"bbox_refine_attempts",
sa.Integer(),
nullable=False,
server_default="0",
),
)
op.add_column(
"extraction_jobs",
sa.Column("bbox_refine_started_at", sa.DateTime(timezone=True), nullable=True),
)
op.add_column(
"extraction_jobs",
sa.Column("bbox_refine_finished_at", sa.DateTime(timezone=True), nullable=True),
)
op.add_column(
"extraction_jobs",
sa.Column("bbox_refine_error_code", sa.String(length=64), nullable=True),
)
op.add_column(
"extraction_jobs",
sa.Column("bbox_refine_error_message", sa.Text(), nullable=True),
)


def downgrade() -> None:
op.drop_column("extraction_jobs", "bbox_refine_error_message")
op.drop_column("extraction_jobs", "bbox_refine_error_code")
op.drop_column("extraction_jobs", "bbox_refine_finished_at")
op.drop_column("extraction_jobs", "bbox_refine_started_at")
op.drop_column("extraction_jobs", "bbox_refine_attempts")
op.drop_column("extraction_jobs", "bbox_refine_status")
56 changes: 49 additions & 7 deletions src/flydesk_idp/cli.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
# Copyright 2026 Firefly Software Solutions Inc
"""CLI entry point for flydesk-idp.

Three subcommands:
Four subcommands:

* ``flydesk-idp serve`` -- run the FastAPI server on the configured port.
* ``flydesk-idp worker`` -- run the EDA worker that consumes the job topic.
* ``flydesk-idp migrate`` -- run ``alembic upgrade head`` against the DB.
* ``flydesk-idp serve`` -- run the FastAPI server on the configured port.
* ``flydesk-idp worker`` -- run the EDA worker that consumes the job topic.
* ``flydesk-idp bbox-worker`` -- run the second-stage EDA worker that grounds
bboxes for jobs whose extraction finished in
``PARTIAL_SUCCEEDED``.
* ``flydesk-idp migrate`` -- run ``alembic upgrade head`` against the DB.

``serve`` lets uvicorn import ``flydesk_idp.main:app`` (pyfly drives the
lifecycle there). ``worker`` boots a minimal :class:`PyFlyApplication`
and pulls the :class:`JobWorker` out of the DI container; it never
constructs the worker itself, so the container owns every dependency.
lifecycle there). The workers boot minimal :class:`PyFlyApplication`
instances and pull their concrete worker classes out of the DI
container; they never construct the workers themselves, so the
container owns every dependency.
"""

from __future__ import annotations
Expand Down Expand Up @@ -81,6 +85,41 @@ async def _run() -> None:
return 0


def cmd_bbox_worker(_: argparse.Namespace) -> int:
"""Boot pyfly, resolve :class:`BboxRefineWorker`, run forever."""

async def _run() -> None:
from pyfly.core import PyFlyApplication
from pyfly.eda import EventPublisher

from flydesk_idp.app import FlydeskIDPApplication
from flydesk_idp.config import IDPSettings
from flydesk_idp.core.services.bbox import BboxRefiner
from flydesk_idp.core.services.binary import BinaryNormalizer
from flydesk_idp.core.services.webhook import WebhookPublisher
from flydesk_idp.core.services.workers.bbox_refine_worker import BboxRefineWorker
from flydesk_idp.models.repositories import ExtractionJobRepository

pyfly_app = PyFlyApplication(FlydeskIDPApplication)
await pyfly_app.startup()
try:
container = pyfly_app.context.container
worker = BboxRefineWorker(
repository=container.resolve(ExtractionJobRepository),
event_publisher=container.resolve(EventPublisher),
webhook=container.resolve(WebhookPublisher),
normalizer=container.resolve(BinaryNormalizer),
refiner=container.resolve(BboxRefiner),
settings=container.resolve(IDPSettings),
)
await worker.run_forever()
finally:
await pyfly_app.shutdown()

asyncio.run(_run())
return 0


def cmd_migrate(_: argparse.Namespace) -> int:
"""Apply Alembic migrations."""
from alembic import command
Expand All @@ -104,6 +143,9 @@ def build_parser() -> argparse.ArgumentParser:
sub_worker = sub.add_parser("worker", help="Run the EDA worker")
sub_worker.set_defaults(func=cmd_worker)

sub_bbox_worker = sub.add_parser("bbox-worker", help="Run the second-stage bbox refinement worker")
sub_bbox_worker.set_defaults(func=cmd_bbox_worker)

sub_migrate = sub.add_parser("migrate", help="Apply database migrations")
sub_migrate.set_defaults(func=cmd_migrate)

Expand Down
11 changes: 11 additions & 0 deletions src/flydesk_idp/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ class IDPSettings(BaseSettings):
jobs_topic: str = "flydesk.idp.jobs"
jobs_event_type: str = "IDPJobSubmitted"
jobs_completed_event_type: str = "IDPJobCompleted"
# Second-stage destination for the out-of-band bbox refiner. Triggered
# by ``JobWorker`` after main extraction succeeds AND
# ``options.stages.bbox_refine == true``. Consumed by
# ``BboxRefineWorker``.
bbox_refine_topic: str = "flydesk.idp.bbox.refine"
bbox_refine_event_type: str = "IDPBboxRefineRequested"
# Retry budget + timeout for the bbox refine leg, independent of the
# main extraction. Refinement is CPU-bound (PyMuPDF / OCR) so the
# default ceiling is generous.
bbox_refine_max_attempts: int = 3
bbox_refine_timeout_s: int = 600

# -- Extraction -----------------------------------------------------
model: str = "anthropic:claude-sonnet-4-6"
Expand Down
6 changes: 6 additions & 0 deletions src/flydesk_idp/core/services/jobs/get_job_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,10 @@ async def do_handle(self, query: GetJobQuery) -> JobStatusResponse | None:
attempts=job.attempts,
error_code=job.error_code,
error_message=job.error_message,
bbox_refine_status=job.bbox_refine_status,
bbox_refine_attempts=job.bbox_refine_attempts or 0,
bbox_refine_started_at=job.bbox_refine_started_at,
bbox_refine_finished_at=job.bbox_refine_finished_at,
bbox_refine_error_code=job.bbox_refine_error_code,
bbox_refine_error_message=job.bbox_refine_error_message,
)
69 changes: 66 additions & 3 deletions src/flydesk_idp/core/services/jobs/get_job_result_handler.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,24 @@
# Copyright 2026 Firefly Software Solutions Inc
"""``GetJobResultHandler`` -- terminal result for a SUCCEEDED job."""
"""``GetJobResultHandler`` -- result reader for jobs with a result available.

Supports both:

* **Fully complete** -- ``status == SUCCEEDED``: returns the grounded
result (or the LLM-bbox result if bbox_refine was disabled).
* **Partial** -- ``status in {PARTIAL_SUCCEEDED, REFINING_BBOXES}``:
returns the LLM-bbox result so callers don't have to wait for the
out-of-band refiner before consuming field values. Callers that need
the grounded version pass ``wait_for_bboxes=true`` so the handler
polls until the refiner finishes (or a timeout fires).

Anything earlier than ``PARTIAL_SUCCEEDED`` (``QUEUED`` / ``RUNNING``)
or terminal-without-result (``FAILED`` / ``CANCELLED``) raises
:class:`JobNotReady` so the REST controller can surface an RFC 7807 409.
"""

from __future__ import annotations

import asyncio
from dataclasses import dataclass

from pyfly.container import service
Expand All @@ -13,10 +29,20 @@
from flydesk_idp.interfaces.enums.job_status import JobStatus
from flydesk_idp.models.repositories import ExtractionJobRepository

# Statuses we never block on -- they will never produce a result no
# matter how long we wait.
_TERMINAL_NO_RESULT = (JobStatus.FAILED, JobStatus.CANCELLED)


@dataclass(frozen=True)
class GetJobResultQuery(Query[JobResult | None]):
job_id: str
# Long-poll knobs. ``wait_for_bboxes`` blocks the request until the
# refiner finishes (status -> SUCCEEDED) or ``timeout_s`` elapses;
# at timeout the handler returns whatever's currently persisted.
wait_for_bboxes: bool = False
timeout_s: float = 60.0
poll_interval_s: float = 1.0


class JobNotReady(RuntimeError):
Expand All @@ -37,12 +63,49 @@ async def do_handle(self, query: GetJobResultQuery) -> JobResult | None:
job = await self._repository.get(query.job_id)
if job is None:
return None

# Optional long-poll for callers that want grounded bboxes only.
# We block while the refiner is in flight, returning whatever's
# in the row at timeout (which is always the partial result --
# never None, since PARTIAL_SUCCEEDED requires result_json).
if query.wait_for_bboxes:
polled = await self._poll_for_terminal(query)
if polled is None:
# Job was deleted under us mid-poll; treat as not-found.
return None
job = polled

status = JobStatus(job.status)
if status != JobStatus.SUCCEEDED:
if not status.has_result:
if status in _TERMINAL_NO_RESULT:
raise JobNotReady(job.id, status)
raise JobNotReady(job.id, status)
if not job.result_json:
raise RuntimeError(f"Job {job.id} is SUCCEEDED but has no result_json")
raise RuntimeError(f"Job {job.id} has status {status.value} but no result_json")
return JobResult(
job_id=job.id,
result=ExtractionResult.model_validate(job.result_json),
)

async def _poll_for_terminal(self, query: GetJobResultQuery):
"""Block until status hits SUCCEEDED / FAILED / CANCELLED or timeout.

Returns whatever's currently persisted; never raises on timeout
(the partial result is still a valid response shape).
"""
deadline = asyncio.get_running_loop().time() + max(0.0, query.timeout_s)
interval = max(0.1, query.poll_interval_s)
last = await self._repository.get(query.job_id)
while last is not None:
status = JobStatus(last.status)
if status.is_terminal:
return last
if asyncio.get_running_loop().time() >= deadline:
return last
await asyncio.sleep(interval)
last = await self._repository.get(query.job_id)
# Job was deleted under us; fall through to outer handler which
# will raise JobNotReady on the stale status. ``last`` is None
# here only when the job disappeared, which today never happens
# but is correctly defensive.
return last # type: ignore[return-value]
Loading
Loading