diff --git a/docker-compose.yml b/docker-compose.yml index 2a840fc..4ef8743 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -126,5 +126,36 @@ services: gotenberg: condition: service_healthy + # Second-stage EDA consumer: subscribes to ``flydesk.idp.bbox.refine`` + # events that ``worker`` publishes when an extraction finishes with + # ``options.stages.bbox_refine == true``. Grounds bboxes against the + # PDF text layer / configured OCR engine, persists the refined result, + # transitions the job ``PARTIAL_SUCCEEDED -> SUCCEEDED``, and fires the + # final webhook. Same image as the API + worker; only the entrypoint + # differs. + bbox-worker: + build: *service-build + image: flydesk-idp:latest + container_name: flydesk-idp-bbox-worker + command: ["bbox-worker"] + env_file: + - path: .env + required: false + environment: + FLYDESK_IDP_DATABASE_URL: postgresql+asyncpg://idp:idp@postgres:5432/flydesk_idp + FLYDESK_IDP_REDIS_URL: redis://redis:6379/0 + FLYDESK_IDP_EDA_ADAPTER: postgres + FLYDESK_IDP_MODEL: ${FLYDESK_IDP_MODEL:-anthropic:claude-sonnet-4-6} + FLYDESK_IDP_OFFICE_CONVERTER: ${FLYDESK_IDP_OFFICE_CONVERTER:-gotenberg} + FLYDESK_IDP_GOTENBERG_URL: http://gotenberg:3000 + ANTHROPIC_API_KEY: ${ANTHROPIC_API_KEY:-} + OPENAI_API_KEY: ${OPENAI_API_KEY:-} + RUN_MIGRATIONS: "false" + depends_on: + api: + condition: service_healthy + gotenberg: + condition: service_healthy + volumes: postgres_data: diff --git a/docs/overview.md b/docs/overview.md index 61a4316..6b03d9e 100644 --- a/docs/overview.md +++ b/docs/overview.md @@ -39,7 +39,13 @@ normally have to glue together themselves: the workflow that called us. The same call works synchronously (blocking, sub-minute) or as a -queue-backed async job with an HMAC-signed webhook. +queue-backed async job with an HMAC-signed webhook. 
When bbox grounding +is enabled, async jobs go through a two-stage state machine: the main +extraction lands in ``PARTIAL_SUCCEEDED`` with the LLM-bbox result +immediately readable, then a second EDA worker grounds the coordinates +and flips the job to ``SUCCEEDED``. Callers can poll status, fetch the +partial result, or long-poll ``GET /api/v1/jobs/{id}/result?wait_for_bboxes=true`` +to block until grounding finishes. --- diff --git a/migrations/versions/20260515_0002_bbox_refine_columns.py b/migrations/versions/20260515_0002_bbox_refine_columns.py new file mode 100644 index 0000000..10e7a5f --- /dev/null +++ b/migrations/versions/20260515_0002_bbox_refine_columns.py @@ -0,0 +1,77 @@ +# Copyright 2026 Firefly Software Solutions Inc +"""Add bbox-refine leg columns to ``extraction_jobs``. + +Revision ID: 0002_bbox_refine_columns +Revises: 0001_init +Create Date: 2026-05-15 + +Adds the per-job state for the out-of-band bbox refinement worker: + +* ``bbox_refine_status`` -- ``pending`` / ``running`` / ``succeeded`` + / ``failed`` / ``null``. +* ``bbox_refine_attempts`` -- retry counter for the refine leg, + independent of the main extraction's + ``attempts``. +* ``bbox_refine_started_at`` -- timestamp the refine worker first + transitioned the leg to ``running``. +* ``bbox_refine_finished_at`` -- timestamp the leg reached a terminal + sub-state. +* ``bbox_refine_error_code`` -- stable code if the refine leg failed + permanently; main result still readable. +* ``bbox_refine_error_message`` -- free-form message paired with the code. + +All columns except ``bbox_refine_attempts`` (NOT NULL, server default 0) +are nullable: jobs submitted without ``stages.bbox_refine`` never populate +them and the default flow (``QUEUED -> RUNNING -> SUCCEEDED``) is unchanged. 
+""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + +revision = "0002_bbox_refine_columns" +down_revision = "0001_init" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.add_column( + "extraction_jobs", + sa.Column("bbox_refine_status", sa.String(length=16), nullable=True), + ) + op.add_column( + "extraction_jobs", + sa.Column( + "bbox_refine_attempts", + sa.Integer(), + nullable=False, + server_default="0", + ), + ) + op.add_column( + "extraction_jobs", + sa.Column("bbox_refine_started_at", sa.DateTime(timezone=True), nullable=True), + ) + op.add_column( + "extraction_jobs", + sa.Column("bbox_refine_finished_at", sa.DateTime(timezone=True), nullable=True), + ) + op.add_column( + "extraction_jobs", + sa.Column("bbox_refine_error_code", sa.String(length=64), nullable=True), + ) + op.add_column( + "extraction_jobs", + sa.Column("bbox_refine_error_message", sa.Text(), nullable=True), + ) + + +def downgrade() -> None: + op.drop_column("extraction_jobs", "bbox_refine_error_message") + op.drop_column("extraction_jobs", "bbox_refine_error_code") + op.drop_column("extraction_jobs", "bbox_refine_finished_at") + op.drop_column("extraction_jobs", "bbox_refine_started_at") + op.drop_column("extraction_jobs", "bbox_refine_attempts") + op.drop_column("extraction_jobs", "bbox_refine_status") diff --git a/src/flydesk_idp/cli.py b/src/flydesk_idp/cli.py index 0cdbec6..bcc6aff 100644 --- a/src/flydesk_idp/cli.py +++ b/src/flydesk_idp/cli.py @@ -1,16 +1,20 @@ # Copyright 2026 Firefly Software Solutions Inc """CLI entry point for flydesk-idp. -Three subcommands: +Four subcommands: -* ``flydesk-idp serve`` -- run the FastAPI server on the configured port. -* ``flydesk-idp worker`` -- run the EDA worker that consumes the job topic. -* ``flydesk-idp migrate`` -- run ``alembic upgrade head`` against the DB. +* ``flydesk-idp serve`` -- run the FastAPI server on the configured port. 
+* ``flydesk-idp worker`` -- run the EDA worker that consumes the job topic. +* ``flydesk-idp bbox-worker`` -- run the second-stage EDA worker that grounds + bboxes for jobs whose extraction finished in + ``PARTIAL_SUCCEEDED``. +* ``flydesk-idp migrate`` -- run ``alembic upgrade head`` against the DB. ``serve`` lets uvicorn import ``flydesk_idp.main:app`` (pyfly drives the -lifecycle there). ``worker`` boots a minimal :class:`PyFlyApplication` -and pulls the :class:`JobWorker` out of the DI container; it never -constructs the worker itself, so the container owns every dependency. +lifecycle there). Both workers boot a minimal :class:`PyFlyApplication`: +``worker`` pulls :class:`JobWorker` out of the DI container, while +``bbox-worker`` resolves each collaborator from the container and +constructs :class:`BboxRefineWorker` from them. """ from __future__ import annotations @@ -81,6 +85,41 @@ async def _run() -> None: return 0 +def cmd_bbox_worker(_: argparse.Namespace) -> int: + """Boot pyfly, build :class:`BboxRefineWorker` from container beans, run forever.""" + + async def _run() -> None: + from pyfly.core import PyFlyApplication + from pyfly.eda import EventPublisher + + from flydesk_idp.app import FlydeskIDPApplication + from flydesk_idp.config import IDPSettings + from flydesk_idp.core.services.bbox import BboxRefiner + from flydesk_idp.core.services.binary import BinaryNormalizer + from flydesk_idp.core.services.webhook import WebhookPublisher + from flydesk_idp.core.services.workers.bbox_refine_worker import BboxRefineWorker + from flydesk_idp.models.repositories import ExtractionJobRepository + + pyfly_app = PyFlyApplication(FlydeskIDPApplication) + await pyfly_app.startup() + try: + container = pyfly_app.context.container + worker = BboxRefineWorker( + repository=container.resolve(ExtractionJobRepository), + event_publisher=container.resolve(EventPublisher), + webhook=container.resolve(WebhookPublisher), + normalizer=container.resolve(BinaryNormalizer), + refiner=container.resolve(BboxRefiner), + 
settings=container.resolve(IDPSettings), + ) + await worker.run_forever() + finally: + await pyfly_app.shutdown() + + asyncio.run(_run()) + return 0 + + def cmd_migrate(_: argparse.Namespace) -> int: """Apply Alembic migrations.""" from alembic import command @@ -104,6 +143,9 @@ def build_parser() -> argparse.ArgumentParser: sub_worker = sub.add_parser("worker", help="Run the EDA worker") sub_worker.set_defaults(func=cmd_worker) + sub_bbox_worker = sub.add_parser("bbox-worker", help="Run the second-stage bbox refinement worker") + sub_bbox_worker.set_defaults(func=cmd_bbox_worker) + sub_migrate = sub.add_parser("migrate", help="Apply database migrations") sub_migrate.set_defaults(func=cmd_migrate) diff --git a/src/flydesk_idp/config.py b/src/flydesk_idp/config.py index eca15fc..43abc88 100644 --- a/src/flydesk_idp/config.py +++ b/src/flydesk_idp/config.py @@ -44,6 +44,17 @@ class IDPSettings(BaseSettings): jobs_topic: str = "flydesk.idp.jobs" jobs_event_type: str = "IDPJobSubmitted" jobs_completed_event_type: str = "IDPJobCompleted" + # Second-stage destination for the out-of-band bbox refiner. Triggered + # by ``JobWorker`` after main extraction succeeds AND + # ``options.stages.bbox_refine == true``. Consumed by + # ``BboxRefineWorker``. + bbox_refine_topic: str = "flydesk.idp.bbox.refine" + bbox_refine_event_type: str = "IDPBboxRefineRequested" + # Retry budget + timeout for the bbox refine leg, independent of the + # main extraction. Refinement is CPU-bound (PyMuPDF / OCR) so the + # default ceiling is generous. 
+ bbox_refine_max_attempts: int = 3 + bbox_refine_timeout_s: int = 600 # -- Extraction ----------------------------------------------------- model: str = "anthropic:claude-sonnet-4-6" diff --git a/src/flydesk_idp/core/services/jobs/get_job_handler.py b/src/flydesk_idp/core/services/jobs/get_job_handler.py index afcec82..486e027 100644 --- a/src/flydesk_idp/core/services/jobs/get_job_handler.py +++ b/src/flydesk_idp/core/services/jobs/get_job_handler.py @@ -38,4 +38,10 @@ async def do_handle(self, query: GetJobQuery) -> JobStatusResponse | None: attempts=job.attempts, error_code=job.error_code, error_message=job.error_message, + bbox_refine_status=job.bbox_refine_status, + bbox_refine_attempts=job.bbox_refine_attempts or 0, + bbox_refine_started_at=job.bbox_refine_started_at, + bbox_refine_finished_at=job.bbox_refine_finished_at, + bbox_refine_error_code=job.bbox_refine_error_code, + bbox_refine_error_message=job.bbox_refine_error_message, ) diff --git a/src/flydesk_idp/core/services/jobs/get_job_result_handler.py b/src/flydesk_idp/core/services/jobs/get_job_result_handler.py index cf924d4..7eba340 100644 --- a/src/flydesk_idp/core/services/jobs/get_job_result_handler.py +++ b/src/flydesk_idp/core/services/jobs/get_job_result_handler.py @@ -1,8 +1,24 @@ # Copyright 2026 Firefly Software Solutions Inc -"""``GetJobResultHandler`` -- terminal result for a SUCCEEDED job.""" +"""``GetJobResultHandler`` -- result reader for jobs with a result available. + +Supports both: + +* **Fully complete** -- ``status == SUCCEEDED``: returns the grounded + result (or the LLM-bbox result if bbox_refine was disabled). +* **Partial** -- ``status in {PARTIAL_SUCCEEDED, REFINING_BBOXES}``: + returns the LLM-bbox result so callers don't have to wait for the + out-of-band refiner before consuming field values. Callers that need + the grounded version pass ``wait_for_bboxes=true`` so the handler + polls until the refiner finishes (or a timeout fires). 
+
+Anything earlier than ``PARTIAL_SUCCEEDED`` (``QUEUED`` / ``RUNNING``)
+or terminal-without-result (``FAILED`` / ``CANCELLED``) raises
+:class:`JobNotReady` so the REST controller can surface an RFC 7807 409.
+"""
 
 from __future__ import annotations
 
+import asyncio
 from dataclasses import dataclass
 
 from pyfly.container import service
@@ -13,10 +29,20 @@ from flydesk_idp.interfaces.enums.job_status import JobStatus
 from flydesk_idp.models.repositories import ExtractionJobRepository
 
+# Statuses we never block on -- they will never produce a result no
+# matter how long we wait.
+_TERMINAL_NO_RESULT = (JobStatus.FAILED, JobStatus.CANCELLED)
+
 
 @dataclass(frozen=True)
 class GetJobResultQuery(Query[JobResult | None]):
     job_id: str
+    # Long-poll knobs. ``wait_for_bboxes`` blocks the request until the
+    # refiner finishes (status -> SUCCEEDED) or ``timeout_s`` elapses;
+    # at timeout the handler returns whatever's currently persisted.
+    wait_for_bboxes: bool = False
+    timeout_s: float = 60.0
+    poll_interval_s: float = 1.0
 
 
 class JobNotReady(RuntimeError):
@@ -37,12 +63,56 @@ async def do_handle(self, query: GetJobResultQuery) -> JobResult | None:
         job = await self._repository.get(query.job_id)
         if job is None:
             return None
+
+        # Optional long-poll for callers that want grounded bboxes only.
+        # We block while the refiner is in flight and, at timeout, carry on
+        # with whatever row was loaded last; the status check below then
+        # decides between returning the partial result and raising.
+        if query.wait_for_bboxes:
+            polled = await self._poll_for_terminal(query)
+            if polled is None:
+                # Job was deleted under us mid-poll; treat as not-found.
+                return None
+            job = polled
+
         status = JobStatus(job.status)
-        if status != JobStatus.SUCCEEDED:
+        if not status.has_result:
+            # QUEUED / RUNNING have no result yet and FAILED / CANCELLED never
+            # will; both surface as ``JobNotReady``.
             raise JobNotReady(job.id, status)
         if not job.result_json:
-            raise RuntimeError(f"Job {job.id} is SUCCEEDED but has no result_json")
+            raise RuntimeError(f"Job {job.id} has status {status.value} but no result_json")
         return JobResult(
             job_id=job.id,
             result=ExtractionResult.model_validate(job.result_json),
         )
+
+    async def _poll_for_terminal(self, query: GetJobResultQuery):
+        """Poll the job row until no further change can be expected.
+
+        Stops when the job is ``SUCCEEDED``, ``FAILED`` or ``CANCELLED``,
+        when the refine leg is marked ``failed`` (the job then stays
+        ``PARTIAL_SUCCEEDED``, so waiting longer cannot yield grounded
+        bboxes), or when ``query.timeout_s`` elapses.
+
+        Returns the most recently loaded row, or ``None`` if the row
+        disappeared; never raises on timeout.
+        """
+        loop = asyncio.get_running_loop()
+        deadline = loop.time() + max(0.0, query.timeout_s)
+        interval = max(0.1, query.poll_interval_s)
+        while True:
+            last = await self._repository.get(query.job_id)
+            if last is None:
+                # Row vanished mid-poll; the caller maps this to not-found.
+                return None
+            status = JobStatus(last.status)
+            if status == JobStatus.SUCCEEDED or status in _TERMINAL_NO_RESULT:
+                return last
+            if last.bbox_refine_status == "failed":
+                return last
+            remaining = deadline - loop.time()
+            if remaining <= 0:
+                return last
+            # Never sleep past the deadline so ``timeout_s`` is honoured.
+            await asyncio.sleep(min(interval, remaining))
diff --git a/src/flydesk_idp/core/services/workers/bbox_refine_worker.py b/src/flydesk_idp/core/services/workers/bbox_refine_worker.py
new file mode 100644
index 0000000..0aae59a
--- /dev/null
+++ b/src/flydesk_idp/core/services/workers/bbox_refine_worker.py
@@ -0,0 +1,330 @@
+# Copyright 2026 Firefly Software Solutions Inc
+"""``BboxRefineWorker`` -- second-stage EDA worker for grounded bbox refinement.
+
+Subscribes to ``IDPSettings.bbox_refine_event_type`` on
+``IDPSettings.bbox_refine_topic``. 
Each event carries one ``job_id`` +whose main extraction has already finished with +``JobStatus.PARTIAL_SUCCEEDED`` and whose ``options.stages.bbox_refine`` +was ``true``. + +Per-event lifecycle: + +1. Load the job row. +2. Skip unless the job is still ``PARTIAL_SUCCEEDED`` (idempotent + re-delivery from at-least-once buses is normal). +3. Transition ``PARTIAL_SUCCEEDED -> REFINING_BBOXES`` and bump the + refine attempts counter atomically. +4. Re-run :class:`BinaryNormalizer` on the saved input bytes to recover + the per-file LLM-renderable rows. (Deterministic; cheaper than + persisting the normalised bytes alongside the job.) +5. For each :class:`ExtractedDocument` in the persisted result, find + the matching normalised binary by ``source_file`` and call + :class:`BboxRefiner.refine` against that document's field groups. +6. Re-serialise the mutated result, transition the job to + :class:`JobStatus.SUCCEEDED`, and fire the final webhook. + +Failures degrade gracefully: the partial result is **never** dropped. +Retryable errors (timeouts, transient OCR engine failures) re-publish +the same event with exponential backoff up to +``IDPSettings.bbox_refine_max_attempts``; permanent errors mark the +refine leg ``failed`` and the job reverts to ``PARTIAL_SUCCEEDED`` with +its LLM-bbox result intact. 
+""" + +from __future__ import annotations + +import asyncio +import base64 +import logging +import random +import socket +import time +from datetime import UTC, datetime +from typing import Any + +from pyfly.eda import EventEnvelope, EventPublisher + +from flydesk_idp.config import IDPSettings +from flydesk_idp.core.observability import log_outbound +from flydesk_idp.core.services.bbox import BboxRefiner +from flydesk_idp.core.services.binary import BinaryNormalizer, NormalisedBinary +from flydesk_idp.core.services.webhook import WebhookPublisher +from flydesk_idp.interfaces.dtos.extract import ExtractionResult +from flydesk_idp.interfaces.dtos.webhook import JobWebhookPayload +from flydesk_idp.interfaces.enums.job_status import JobStatus +from flydesk_idp.models.repositories import ExtractionJobRepository + +logger = logging.getLogger(__name__) + + +# Same permanent-error hints the JobWorker uses; the refiner can hit +# the same provider-side failure classes via OCR adapters. +_PERMANENT_ERROR_HINTS: tuple[str, ...] 
= ( + "content policy", + "content_filter", + "moderation", + "invalid api key", + "incorrect api key", + "unsupported model", + "model_not_found", + "input_validation_error", + "invalid_request_error", +) + + +def _is_permanent(exc: Exception) -> bool: + if isinstance(exc, (ValueError, TypeError)): + return True + message = str(exc).lower() + return any(hint in message for hint in _PERMANENT_ERROR_HINTS) + + +class BboxRefineWorker: + """Second-stage EDA consumer: ground bboxes after main extraction.""" + + def __init__( + self, + *, + repository: ExtractionJobRepository, + event_publisher: EventPublisher, + webhook: WebhookPublisher, + normalizer: BinaryNormalizer, + refiner: BboxRefiner, + settings: IDPSettings, + consumer_id: str | None = None, + ) -> None: + self._repository = repository + self._publisher = event_publisher + self._webhook = webhook + self._normalizer = normalizer + self._refiner = refiner + self._settings = settings + self._consumer_id = consumer_id or f"bbox-worker-{socket.gethostname()}" + self._stop = asyncio.Event() + + async def run_forever(self) -> None: + # Subscribe before start() -- the EDA adapters only spin up the + # consumer loop when at least one handler is registered. 
+        self._publisher.subscribe(self._settings.bbox_refine_event_type, self._on_event)
+        await self._publisher.start()
+        logger.info(
+            "BboxRefineWorker %s started (adapter=%s, destination=%s, event_type=%s)",
+            self._consumer_id,
+            self._settings.eda_adapter,
+            self._settings.bbox_refine_topic,
+            self._settings.bbox_refine_event_type,
+        )
+        try:
+            await self._stop.wait()
+        finally:
+            await self._publisher.stop()
+
+    def stop(self) -> None:
+        self._stop.set()
+
+    # ------------------------------------------------------------------
+
+    async def _on_event(self, envelope: EventEnvelope) -> None:
+        job_id = envelope.payload.get("job_id") if isinstance(envelope.payload, dict) else None
+        if not job_id:
+            logger.warning(
+                "Received %s event without job_id: %r -- dropping",
+                envelope.event_type,
+                envelope.payload,
+            )
+            return
+        await self._process(str(job_id))
+
+    async def _process(self, job_id: str) -> None:
+        """Run one refine attempt for ``job_id``.
+
+        Only jobs in ``PARTIAL_SUCCEEDED`` are processed. Refining and
+        persisting the refined result share one failure path (retry or
+        give up). The final webhook is delivered outside that path so a
+        delivery error cannot undo an already persisted ``SUCCEEDED``.
+        """
+        job = await self._repository.get(job_id)
+        if job is None:
+            logger.warning("EDA delivered unknown bbox-refine job %s -- dropping", job_id)
+            return
+        current = JobStatus(job.status)
+        # Idempotent re-delivery guard: only PARTIAL_SUCCEEDED is the
+        # legal entry state. SUCCEEDED / FAILED / CANCELLED / REFINING
+        # mean someone else handled this already.
+        if current != JobStatus.PARTIAL_SUCCEEDED:
+            logger.info(
+                "Skipping bbox refine for job %s: status=%s (not PARTIAL_SUCCEEDED)",
+                job.id,
+                current.value,
+            )
+            return
+
+        claimed = await self._repository.mark_bbox_refining(job.id)
+        if claimed is None:
+            # The repository had nothing to transition (e.g. the row is
+            # gone). Continuing from the stale snapshot would refine a
+            # job this worker never moved to REFINING_BBOXES.
+            logger.warning("Bbox-refine job %s could not be claimed -- dropping", job.id)
+            return
+        job = claimed
+        attempts = job.bbox_refine_attempts or 1
+        log_outbound(
+            "bbox-worker",
+            op="bbox.refine",
+            status="started",
+            latency_ms=0.0,
+            job_id=job.id,
+            attempt=attempts,
+        )
+
+        started = time.monotonic()
+        try:
+            refined = await asyncio.wait_for(
+                self._refine_job_result(job),
+                timeout=self._settings.bbox_refine_timeout_s,
+            )
+            await self._repository.mark_bbox_refined(
+                job.id, result=refined.model_dump(mode="json", by_alias=True)
+            )
+        except Exception as exc:  # noqa: BLE001
+            await self._handle_refine_failure(job, attempts, started, exc)
+            return
+
+        log_outbound(
+            "bbox-worker",
+            op="bbox.refine",
+            status="ok",
+            latency_ms=(time.monotonic() - started) * 1000,
+            job_id=job.id,
+            attempt=attempts,
+        )
+        try:
+            await self._fire_webhook(
+                job_id=job.id,
+                status=JobStatus.SUCCEEDED,
+                result=refined,
+                metadata=job.metadata_json or {},
+                callback_url=job.callback_url,
+                correlation=_extract_correlation(job.metadata_json),
+            )
+        except Exception:  # noqa: BLE001
+            # The refined result is already persisted as SUCCEEDED; a
+            # webhook problem must not be mistaken for a refine failure.
+            logger.exception("Final webhook for bbox-refined job %s failed", job.id)
+
+    async def _handle_refine_failure(
+        self, job: Any, attempts: int, started: float, exc: Exception
+    ) -> None:
+        """Record a failed attempt: give up when terminal, otherwise schedule a retry."""
+        permanent = _is_permanent(exc)
+        exhausted = attempts >= self._settings.bbox_refine_max_attempts
+        error_code = "PERMANENT_ERROR" if permanent else "BBOX_REFINE_FAILED"
+        log_outbound(
+            "bbox-worker",
+            op="bbox.refine",
+            status="error",
+            latency_ms=(time.monotonic() - started) * 1000,
+            job_id=job.id,
+            attempt=attempts,
+            permanent=permanent,
+            exhausted=exhausted,
+            error=type(exc).__name__,
+        )
+        if permanent or exhausted:
+            await self._repository.mark_bbox_refine_failed(job.id, code=error_code, message=str(exc))
+            # No webhook on bbox-refine permanent failure: the caller
+            # already received the ``idp.job.partial`` payload with
+            # the LLM-bbox result; nothing new to deliver.
+            return
+        delay = self._backoff_delay(attempts)
+        logger.warning(
+            "Bbox refine for job %s failed attempt %d (%s); re-publishing in %.1fs",
+            job.id,
+            attempts,
+            exc,
+            delay,
+        )
+        # Revert to PARTIAL_SUCCEEDED so the next delivery's
+        # status check passes.
+        await self._repository.update(
+            job.id,
+            status=JobStatus.PARTIAL_SUCCEEDED.value,
+            bbox_refine_status="pending",
+        )
+        self._schedule_republish(job.id, delay)
+
+    def _schedule_republish(self, job_id: str, delay_s: float) -> None:
+        """Start the delayed re-publish and hold a strong reference to its task.
+
+        The event loop only keeps weak references to tasks, so the task is
+        parked in a set until it finishes. The set is created on first use.
+        """
+        tasks = getattr(self, "_republish_tasks", None)
+        if tasks is None:
+            tasks = self._republish_tasks = set()
+        task = asyncio.create_task(self._delayed_publish(job_id, delay_s))
+        tasks.add(task)
+        task.add_done_callback(tasks.discard)
+
+    # ------------------------------------------------------------------
+
+    async def _refine_job_result(self, job: Any) -> ExtractionResult:
+        """Reconstruct the per-document bytes + run the refiner per doc."""
+        if not job.result_json:
+            raise ValueError(f"job {job.id} has no result_json to refine")
+        result = ExtractionResult.model_validate(job.result_json)
+
+        schema = job.schema_json or {}
+        # ``schema_json`` carries the original document bytes the submit
+        # handler stored. Single-file shape is the only one persisted
+        # today (multi-file requests fan out at orchestrator time).
+        encoded = schema.get("document_content_base64") or ""
+        if not encoded:
+            raise ValueError(f"job {job.id} has no document_content_base64 in schema_json")
+        document_bytes = base64.b64decode(encoded)
+
+        normalised: list[NormalisedBinary] = await self._normalizer.normalise(
+            document_bytes,
+            declared_media_type=schema.get("document_content_type"),
+            filename=job.filename,
+        )
+        # Index by filename for source_file lookups. Multi-row inputs
+        # carry their normalised filename in ``row.filename``.
+        by_filename: dict[str, NormalisedBinary] = {row.filename: row for row in normalised}
+        # Fallback row when ``source_file`` is null (legacy single-doc). 
+ fallback = normalised[0] if normalised else None + + language_hint = (job.options_json or {}).get("language_hint") + + for document in result.documents: + if not document.fields: + continue + row = by_filename.get(document.source_file or "", fallback) + if row is None: + continue + await self._refiner.refine( + document_bytes=row.bytes, + media_type=row.media_type, + page_count=row.page_count, + groups=document.fields, + language_hint=language_hint, + ) + return result + + def _backoff_delay(self, attempts: int) -> float: + base = self._settings.retry_base_delay_s + ceiling = self._settings.retry_max_delay_s + raw = base * (2 ** max(0, attempts - 1)) + capped = min(ceiling, raw) + jitter = capped * 0.2 * random.random() + return capped + jitter + + async def _delayed_publish(self, job_id: str, delay_s: float) -> None: + try: + await asyncio.sleep(delay_s) + await self._publisher.publish( + destination=self._settings.bbox_refine_topic, + event_type=self._settings.bbox_refine_event_type, + payload={"job_id": job_id}, + ) + log_outbound( + "eda", + op="republish.bbox_refine", + status="ok", + latency_ms=delay_s * 1000, + job_id=job_id, + ) + except Exception as exc: # noqa: BLE001 + logger.error("Failed to re-publish bbox refine job %s after backoff: %s", job_id, exc) + + async def _fire_webhook( + self, + *, + job_id: str, + status: JobStatus, + result: ExtractionResult | None, + metadata: dict[str, Any], + callback_url: str | None, + correlation: dict[str, str] | None = None, + ) -> None: + if not callback_url: + return + clean_metadata = {k: v for k, v in (metadata or {}).items() if not k.startswith("_")} + payload = JobWebhookPayload( + job_id=job_id, + status=status, + occurred_at=datetime.now(UTC), + metadata=clean_metadata, + result=result, + ) + await self._webhook.deliver(callback_url, payload, extra_headers=correlation or {}) + + +def _extract_correlation(metadata: dict[str, Any] | None) -> dict[str, str]: + if not metadata: + return {} + raw = 
metadata.get("_correlation") + if not isinstance(raw, dict): + return {} + return {str(k): str(v) for k, v in raw.items() if v} diff --git a/src/flydesk_idp/core/services/workers/job_worker.py b/src/flydesk_idp/core/services/workers/job_worker.py index 7b84683..b8a300c 100644 --- a/src/flydesk_idp/core/services/workers/job_worker.py +++ b/src/flydesk_idp/core/services/workers/job_worker.py @@ -159,9 +159,18 @@ async def _process(self, job_id: str) -> None: result = await asyncio.wait_for( self._orchestrator.execute(request), timeout=self._settings.async_timeout_s ) - await self._repository.mark_succeeded( - job.id, result=result.model_dump(mode="json", by_alias=True) - ) + result_payload = result.model_dump(mode="json", by_alias=True) + # Branch on bbox_refine: when the caller asked for grounded + # coordinates, the job becomes ``PARTIAL_SUCCEEDED`` here and + # the actual grounding is delegated to ``BboxRefineWorker`` via + # a second EDA event. The result is already readable -- only + # the bboxes change between PARTIAL_SUCCEEDED and SUCCEEDED. 
+ wants_bbox_refine = bool(getattr(request.options.stages, "bbox_refine", False)) + terminal_status = JobStatus.PARTIAL_SUCCEEDED if wants_bbox_refine else JobStatus.SUCCEEDED + if wants_bbox_refine: + await self._repository.mark_partial_succeeded(job.id, result=result_payload) + else: + await self._repository.mark_succeeded(job.id, result=result_payload) log_outbound( "worker", op="job.run", @@ -169,15 +178,31 @@ async def _process(self, job_id: str) -> None: latency_ms=(time.monotonic() - started) * 1000, job_id=job.id, attempt=attempts, + terminal=terminal_status.value, ) await self._fire_webhook( job_id=job.id, - status=JobStatus.SUCCEEDED, + status=terminal_status, result=result, metadata=job.metadata_json or {}, callback_url=job.callback_url, correlation=_extract_correlation(job.metadata_json), ) + if wants_bbox_refine: + await self._publisher.publish( + destination=self._settings.bbox_refine_topic, + event_type=self._settings.bbox_refine_event_type, + payload={"job_id": job.id}, + headers=_extract_correlation(job.metadata_json), + ) + log_outbound( + "eda", + op="publish.bbox_refine", + status="ok", + latency_ms=0.0, + job_id=job.id, + destination=self._settings.bbox_refine_topic, + ) except Exception as exc: # noqa: BLE001 permanent = _is_permanent(exc) exhausted = attempts >= self._settings.job_max_attempts diff --git a/src/flydesk_idp/interfaces/dtos/job.py b/src/flydesk_idp/interfaces/dtos/job.py index a8cb544..8b9495a 100644 --- a/src/flydesk_idp/interfaces/dtos/job.py +++ b/src/flydesk_idp/interfaces/dtos/job.py @@ -42,6 +42,20 @@ class JobStatusResponse(BaseModel): attempts: int = 0 error_code: str | None = None error_message: str | None = None + bbox_refine_status: str | None = Field( + default=None, + description=( + "Sub-state of the bbox-refine leg when ``options.stages.bbox_refine`` " + "was enabled at submit time. One of ``pending`` (event published, " + "worker hasn't picked it up), ``running``, ``succeeded``, ``failed``. 
" + "``null`` when the job didn't ask for refinement." + ), + ) + bbox_refine_attempts: int = 0 + bbox_refine_started_at: datetime | None = None + bbox_refine_finished_at: datetime | None = None + bbox_refine_error_code: str | None = None + bbox_refine_error_message: str | None = None class JobResult(BaseModel): diff --git a/src/flydesk_idp/interfaces/enums/job_status.py b/src/flydesk_idp/interfaces/enums/job_status.py index f8981da..76c3191 100644 --- a/src/flydesk_idp/interfaces/enums/job_status.py +++ b/src/flydesk_idp/interfaces/enums/job_status.py @@ -1,12 +1,26 @@ # Copyright 2026 Firefly Software Solutions Inc """Async-job lifecycle states. -The state machine is: +Two state machines live in parallel: + +**Default flow** (``options.stages.bbox_refine == false``):: QUEUED -> RUNNING -> SUCCEEDED | FAILED QUEUED -> CANCELLED (only while still QUEUED) -A job that has already started cannot be cancelled. +**Bbox-refine flow** (``options.stages.bbox_refine == true``):: + + QUEUED -> RUNNING -> PARTIAL_SUCCEEDED -> REFINING_BBOXES -> SUCCEEDED + \\-> (stays PARTIAL_SUCCEEDED if + bbox refine fails -- the + LLM-bbox result is still + readable; bbox_refine_status + column carries the failure) + +A job that has already started cannot be cancelled. ``PARTIAL_SUCCEEDED`` +results are queryable via ``GET /api/v1/jobs/{id}/result`` -- they carry +the full extraction with LLM-estimated bboxes; the grounded bboxes land +once the bbox refiner finishes and the status transitions to ``SUCCEEDED``. 
""" from __future__ import annotations @@ -17,10 +31,40 @@ class JobStatus(StrEnum): QUEUED = "QUEUED" RUNNING = "RUNNING" + PARTIAL_SUCCEEDED = "PARTIAL_SUCCEEDED" + REFINING_BBOXES = "REFINING_BBOXES" SUCCEEDED = "SUCCEEDED" FAILED = "FAILED" CANCELLED = "CANCELLED" @property def is_terminal(self) -> bool: + """True when no further state transition is expected for this job.""" return self in (JobStatus.SUCCEEDED, JobStatus.FAILED, JobStatus.CANCELLED) + + @property + def has_result(self) -> bool: + """True when the job carries a readable ExtractionResult. + + ``PARTIAL_SUCCEEDED`` and ``REFINING_BBOXES`` are readable too -- + the LLM-bbox version of the result is already persisted; bbox + grounding is an additive overlay that lands later. + """ + return self in ( + JobStatus.PARTIAL_SUCCEEDED, + JobStatus.REFINING_BBOXES, + JobStatus.SUCCEEDED, + ) + + +class BboxRefineStatus(StrEnum): + """Sub-state for the out-of-band bbox refinement leg. + + Populated only when ``options.stages.bbox_refine == true``. ``null`` + on the job row means the refiner was never requested (default flow). + """ + + PENDING = "pending" # event published, worker has not picked it up + RUNNING = "running" # worker is grounding bboxes right now + SUCCEEDED = "succeeded" # bboxes grounded, job is now ``SUCCEEDED`` + FAILED = "failed" # refiner failed; job stays ``PARTIAL_SUCCEEDED`` diff --git a/src/flydesk_idp/models/entities/extraction_job.py b/src/flydesk_idp/models/entities/extraction_job.py index f83e4a9..5e2e26e 100644 --- a/src/flydesk_idp/models/entities/extraction_job.py +++ b/src/flydesk_idp/models/entities/extraction_job.py @@ -56,6 +56,16 @@ class ExtractionJob(Base): started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) finished_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + # Bbox-refine leg state -- populated only when the caller enabled + # ``options.stages.bbox_refine``. 
``null`` for jobs that never asked + # for grounding. See ``interfaces/enums/job_status.py::BboxRefineStatus``. + bbox_refine_status: Mapped[str | None] = mapped_column(String(16), nullable=True) + bbox_refine_attempts: Mapped[int] = mapped_column(Integer, nullable=False, default=0) + bbox_refine_started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + bbox_refine_finished_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + bbox_refine_error_code: Mapped[str | None] = mapped_column(String(64), nullable=True) + bbox_refine_error_message: Mapped[str | None] = mapped_column(Text, nullable=True) + __table_args__ = ( Index( "uq_extraction_jobs_idempotency_key", diff --git a/src/flydesk_idp/models/repositories/extraction_job_repository.py b/src/flydesk_idp/models/repositories/extraction_job_repository.py index 7c61648..4ca5be2 100644 --- a/src/flydesk_idp/models/repositories/extraction_job_repository.py +++ b/src/flydesk_idp/models/repositories/extraction_job_repository.py @@ -122,6 +122,70 @@ async def mark_failed(self, job_id: str, *, code: str, message: str) -> Extracti async def mark_cancelled(self, job_id: str) -> ExtractionJob | None: return await self.update(job_id, status="CANCELLED", finished_at=_utcnow()) + # -- bbox-refine leg ---------------------------------------------- + + async def mark_partial_succeeded(self, job_id: str, *, result: dict[str, Any]) -> ExtractionJob | None: + """Main extraction done; bbox refine pending. + + Persists the LLM-bbox result, transitions the job to + ``PARTIAL_SUCCEEDED``, and stamps the bbox leg as ``pending``. + Callers reading ``GET /api/v1/jobs/{id}/result`` get the + ungrounded result immediately; grounded coordinates land once + the refine worker finishes. 
+ """ + return await self.update( + job_id, + status="PARTIAL_SUCCEEDED", + result_json=result, + error_code=None, + error_message=None, + bbox_refine_status="pending", + ) + + async def mark_bbox_refining(self, job_id: str) -> ExtractionJob | None: + """Bbox worker has picked up the event and started grounding. + + Atomically transitions ``PARTIAL_SUCCEEDED`` -> ``REFINING_BBOXES`` + and increments the bbox attempt counter so retries are bounded. + """ + async with self._session_factory() as session: + job = await session.get(ExtractionJob, job_id) + if job is None: + return None + job.status = "REFINING_BBOXES" + job.bbox_refine_status = "running" + job.bbox_refine_started_at = _utcnow() + job.bbox_refine_attempts = (job.bbox_refine_attempts or 0) + 1 + await session.commit() + await session.refresh(job) + return job + + async def mark_bbox_refined(self, job_id: str, *, result: dict[str, Any]) -> ExtractionJob | None: + """Refiner produced grounded coordinates; flip to fully SUCCEEDED.""" + return await self.update( + job_id, + status="SUCCEEDED", + finished_at=_utcnow(), + result_json=result, + bbox_refine_status="succeeded", + bbox_refine_finished_at=_utcnow(), + bbox_refine_error_code=None, + bbox_refine_error_message=None, + ) + + async def mark_bbox_refine_failed(self, job_id: str, *, code: str, message: str) -> ExtractionJob | None: + """Refiner gave up; revert to ``PARTIAL_SUCCEEDED`` so the LLM-bbox + result stays readable. Failure context is captured on the row. 
+ """ + return await self.update( + job_id, + status="PARTIAL_SUCCEEDED", + bbox_refine_status="failed", + bbox_refine_finished_at=_utcnow(), + bbox_refine_error_code=code, + bbox_refine_error_message=message, + ) + def _utcnow() -> datetime: return datetime.now(UTC) diff --git a/src/flydesk_idp/web/controllers/jobs_controller.py b/src/flydesk_idp/web/controllers/jobs_controller.py index 5d6bf97..cc92bb5 100644 --- a/src/flydesk_idp/web/controllers/jobs_controller.py +++ b/src/flydesk_idp/web/controllers/jobs_controller.py @@ -16,6 +16,7 @@ Body, Header, PathVar, + QueryParam, Valid, delete_mapping, get_mapping, @@ -112,15 +113,33 @@ async def get_status(self, job_id: PathVar[str]) -> JobStatusResponse: return status @get_mapping("/{job_id}/result") - async def get_result(self, job_id: PathVar[str]) -> JobResult: - """Fetch the final ``ExtractionResult`` of a finished job. - - Valid only once the job is ``SUCCEEDED`` -- while it's still - running or queued the endpoint returns ``409 job_not_ready``. - Unknown ``job_id`` returns ``404``. + async def get_result( + self, + job_id: PathVar[str], + wait_for_bboxes: QueryParam[bool] = False, + timeout: QueryParam[float] = 60.0, + ) -> JobResult: + """Fetch the ``ExtractionResult`` of a finished or partial job. + + Returns the result when the job is in ``SUCCEEDED``, + ``PARTIAL_SUCCEEDED``, or ``REFINING_BBOXES``. ``QUEUED`` / + ``RUNNING`` / ``FAILED`` / ``CANCELLED`` return + ``409 job_not_ready``. Unknown ``job_id`` returns ``404``. + + ``wait_for_bboxes=true`` long-polls the row until the bbox refiner + finishes (status -> ``SUCCEEDED``) or ``timeout`` (seconds, default + 60) elapses; on timeout the partial result is returned with the + LLM-bbox version intact. Useful for callers that submitted async + but need grounded coordinates inline. 
""" try: - result = await self._queries.query(GetJobResultQuery(job_id=job_id)) + result = await self._queries.query( + GetJobResultQuery( + job_id=job_id, + wait_for_bboxes=bool(wait_for_bboxes), + timeout_s=float(timeout), + ) + ) except JobNotReady as exc: raise _http_problem(409, "job_not_ready", "Job not ready", str(exc)) from exc if result is None: diff --git a/tests/unit/test_bbox_refine_worker.py b/tests/unit/test_bbox_refine_worker.py new file mode 100644 index 0000000..efaaa2a --- /dev/null +++ b/tests/unit/test_bbox_refine_worker.py @@ -0,0 +1,250 @@ +# Copyright 2026 Firefly Software Solutions Inc +"""``BboxRefineWorker`` -- second-stage EDA worker behaviour.""" + +from __future__ import annotations + +import base64 +import io +from dataclasses import dataclass, field +from datetime import UTC, datetime +from typing import Any + +import pytest +from reportlab.pdfgen import canvas + +from flydesk_idp.config import IDPSettings +from flydesk_idp.core.services.bbox import ( + BboxRefiner, + NoneOcrEngine, + PyMuPDFWordExtractor, + ValueMatcher, + WordRouter, +) +from flydesk_idp.core.services.binary import BinaryNormalizer +from flydesk_idp.core.services.binary.archive import ArchiveUnpacker +from flydesk_idp.core.services.binary.email import EmailUnpacker +from flydesk_idp.core.services.binary.image import ImageNormalizer +from flydesk_idp.core.services.binary.libreoffice import LibreOfficeConverter +from flydesk_idp.core.services.binary.pdf_guard import PdfGuard +from flydesk_idp.core.services.workers.bbox_refine_worker import BboxRefineWorker +from flydesk_idp.interfaces.dtos.bbox import BboxSource, BoundingBox +from flydesk_idp.interfaces.dtos.extract import ExtractedDocument, ExtractionResult +from flydesk_idp.interfaces.dtos.field import ExtractedField, ExtractedFieldGroup +from flydesk_idp.interfaces.enums.job_status import JobStatus + + +def _real_pdf() -> bytes: + buf = io.BytesIO() + c = canvas.Canvas(buf) + c.drawString(100, 750, "Customer: Acme 
Corporation Madrid") + c.showPage() + c.save() + return buf.getvalue() + + +def _result_with_field(value: str) -> ExtractionResult: + field_ = ExtractedField( + fieldName="customer_name", + fieldValueFound=value, + pagesFound=[1], + bbox=BoundingBox(xmin=0.05, ymin=0.05, xmax=0.95, ymax=0.95), + ) + group = ExtractedFieldGroup(fieldGroupName="customer", fieldGroupFields=[field_]) + doc = ExtractedDocument( + document_type="invoice", + pages=[1], + fields=[group], + source_file="invoice.pdf", + ) + return ExtractionResult( + request_id="00000000-0000-0000-0000-000000000001", + documents=[doc], + model="anthropic:claude-sonnet-4-6", + latency_ms=1000, + ) + + +# ----------------------------------------------------------------- stubs + + +@dataclass +class _StubJob: + id: str = "job-1" + status: str = JobStatus.PARTIAL_SUCCEEDED.value + filename: str = "invoice.pdf" + schema_json: dict[str, Any] = field(default_factory=dict) + options_json: dict[str, Any] = field(default_factory=dict) + result_json: dict[str, Any] = field(default_factory=dict) + metadata_json: dict[str, Any] = field(default_factory=dict) + callback_url: str | None = None + bbox_refine_status: str | None = "pending" + bbox_refine_attempts: int = 0 + started_at: datetime | None = None + finished_at: datetime | None = None + created_at: datetime = field(default_factory=lambda: datetime.now(UTC)) + + +class _StubRepo: + def __init__(self, job: _StubJob) -> None: + self.job = job + self.calls: list[tuple[str, dict[str, Any]]] = [] + + async def get(self, job_id: str) -> _StubJob | None: + return self.job if self.job.id == job_id else None + + async def mark_bbox_refining(self, job_id: str) -> _StubJob | None: + self.job.status = JobStatus.REFINING_BBOXES.value + self.job.bbox_refine_status = "running" + self.job.bbox_refine_attempts = (self.job.bbox_refine_attempts or 0) + 1 + self.calls.append(("mark_bbox_refining", {"job_id": job_id})) + return self.job + + async def mark_bbox_refined(self, job_id: str, 
*, result: dict[str, Any]) -> _StubJob | None: + self.job.status = JobStatus.SUCCEEDED.value + self.job.bbox_refine_status = "succeeded" + self.job.result_json = result + self.calls.append(("mark_bbox_refined", {"job_id": job_id})) + return self.job + + async def mark_bbox_refine_failed(self, job_id: str, *, code: str, message: str) -> _StubJob | None: + self.job.status = JobStatus.PARTIAL_SUCCEEDED.value + self.job.bbox_refine_status = "failed" + self.calls.append(("mark_bbox_refine_failed", {"code": code, "message": message})) + return self.job + + async def update(self, job_id: str, **changes: Any) -> _StubJob | None: + for k, v in changes.items(): + setattr(self.job, k, v) + self.calls.append(("update", changes)) + return self.job + + +class _StubPublisher: + def __init__(self) -> None: + self.published: list[dict[str, Any]] = [] + + async def publish(self, **kwargs: Any) -> None: + self.published.append(kwargs) + + +class _StubWebhook: + def __init__(self) -> None: + self.delivered: list[tuple[str, Any]] = [] + + async def deliver(self, url: str, payload: Any, *, extra_headers: dict[str, str]) -> None: + self.delivered.append((url, payload)) + + +def _make_normalizer() -> BinaryNormalizer: + settings = IDPSettings(office_converter="libreoffice") + return BinaryNormalizer( + settings=settings, + pdf_guard=PdfGuard(), + image=ImageNormalizer(), + office=LibreOfficeConverter(settings=settings), + archive=ArchiveUnpacker(settings=settings), + email_=EmailUnpacker(), + ) + + +def _make_refiner() -> BboxRefiner: + settings = IDPSettings(bbox_refine_threshold=0.85, bbox_refine_min_text_words=3) + return BboxRefiner( + router=WordRouter(pymupdf=PyMuPDFWordExtractor(settings), ocr=NoneOcrEngine()), + matcher=ValueMatcher(settings), + ) + + +def _make_worker(repo: _StubRepo, publisher: _StubPublisher, webhook: _StubWebhook) -> BboxRefineWorker: + return BboxRefineWorker( + repository=repo, # type: ignore[arg-type] + event_publisher=publisher, # type: ignore[arg-type] + 
webhook=webhook, # type: ignore[arg-type] + normalizer=_make_normalizer(), + refiner=_make_refiner(), + settings=IDPSettings(), + ) + + +# ----------------------------------------------------------------- tests + + +@pytest.mark.asyncio +async def test_grounds_partial_succeeded_job_and_transitions_to_succeeded() -> None: + pdf = _real_pdf() + job = _StubJob( + schema_json={ + "document_content_base64": base64.b64encode(pdf).decode(), + "document_content_type": "application/pdf", + }, + result_json=_result_with_field("Acme Corporation").model_dump(mode="json", by_alias=True), + ) + repo = _StubRepo(job) + publisher = _StubPublisher() + webhook = _StubWebhook() + worker = _make_worker(repo, publisher, webhook) + + await worker._process(job.id) + + assert job.status == JobStatus.SUCCEEDED.value + assert job.bbox_refine_status == "succeeded" + assert [name for name, _ in repo.calls] == ["mark_bbox_refining", "mark_bbox_refined"] + # No webhook delivered because the stub job has no callback_url. + assert webhook.delivered == [] + # No retry was scheduled. + assert publisher.published == [] + # The refined result should now carry source=pdf_text on the field. + refined = ExtractionResult.model_validate(job.result_json) + field_ = refined.documents[0].fields[0].fieldGroupFields[0] + assert field_.bbox.source == BboxSource.PDF_TEXT + + +@pytest.mark.asyncio +async def test_skips_jobs_not_in_partial_succeeded() -> None: + job = _StubJob(status=JobStatus.SUCCEEDED.value, result_json={}) + repo = _StubRepo(job) + publisher = _StubPublisher() + webhook = _StubWebhook() + worker = _make_worker(repo, publisher, webhook) + + await worker._process(job.id) + + # No state transitions, no retries, no webhooks. 
+ assert repo.calls == [] + assert webhook.delivered == [] + assert publisher.published == [] + + +@pytest.mark.asyncio +async def test_drops_unknown_job_id() -> None: + repo = _StubRepo(_StubJob(id="other")) + publisher = _StubPublisher() + webhook = _StubWebhook() + worker = _make_worker(repo, publisher, webhook) + + await worker._process("missing") + + assert repo.calls == [] + assert webhook.delivered == [] + assert publisher.published == [] + + +@pytest.mark.asyncio +async def test_permanent_error_marks_failed_no_republish() -> None: + # Empty schema_json triggers a permanent ValueError ("no document_content_base64"). + job = _StubJob( + schema_json={}, + result_json=_result_with_field("Acme").model_dump(mode="json", by_alias=True), + ) + repo = _StubRepo(job) + publisher = _StubPublisher() + webhook = _StubWebhook() + worker = _make_worker(repo, publisher, webhook) + + await worker._process(job.id) + + assert job.status == JobStatus.PARTIAL_SUCCEEDED.value + assert job.bbox_refine_status == "failed" + names = [name for name, _ in repo.calls] + assert "mark_bbox_refine_failed" in names + assert publisher.published == [] # never republish on permanent diff --git a/tests/unit/test_get_job_result_handler.py b/tests/unit/test_get_job_result_handler.py new file mode 100644 index 0000000..7eb3ec3 --- /dev/null +++ b/tests/unit/test_get_job_result_handler.py @@ -0,0 +1,145 @@ +# Copyright 2026 Firefly Software Solutions Inc +"""``GetJobResultHandler`` -- partial result reads + wait_for_bboxes long-poll.""" + +from __future__ import annotations + +import asyncio +from dataclasses import dataclass, field +from datetime import UTC, datetime +from typing import Any + +import pytest + +from flydesk_idp.core.services.jobs.get_job_result_handler import ( + GetJobResultHandler, + GetJobResultQuery, + JobNotReady, +) +from flydesk_idp.interfaces.dtos.extract import ExtractionResult +from flydesk_idp.interfaces.enums.job_status import JobStatus + + +def _result_payload() -> 
dict[str, Any]: + return ExtractionResult( + request_id="00000000-0000-0000-0000-000000000001", + documents=[], + model="m", + latency_ms=10, + ).model_dump(mode="json", by_alias=True) + + +@dataclass +class _StubJob: + id: str = "job-1" + status: str = JobStatus.SUCCEEDED.value + result_json: dict[str, Any] | None = field(default_factory=_result_payload) + created_at: datetime = field(default_factory=lambda: datetime.now(UTC)) + + +class _StubRepo: + """Repository stub that can flip status mid-poll to simulate the refiner.""" + + def __init__(self, job: _StubJob, *, flip_to: str | None = None, after_calls: int = 0) -> None: + self.job = job + self.flip_to = flip_to + self.after_calls = after_calls + self.calls = 0 + + async def get(self, job_id: str) -> _StubJob | None: + self.calls += 1 + if self.flip_to is not None and self.calls > self.after_calls: + self.job.status = self.flip_to + return self.job if self.job.id == job_id else None + + +@pytest.mark.asyncio +async def test_returns_result_for_succeeded() -> None: + handler = GetJobResultHandler(repository=_StubRepo(_StubJob())) # type: ignore[arg-type] + out = await handler.do_handle(GetJobResultQuery(job_id="job-1")) + assert out is not None + assert out.job_id == "job-1" + + +@pytest.mark.asyncio +async def test_returns_partial_result_when_status_partial_succeeded() -> None: + job = _StubJob(status=JobStatus.PARTIAL_SUCCEEDED.value) + handler = GetJobResultHandler(repository=_StubRepo(job)) # type: ignore[arg-type] + out = await handler.do_handle(GetJobResultQuery(job_id="job-1")) + assert out is not None + assert out.job_id == "job-1" + + +@pytest.mark.asyncio +async def test_returns_partial_result_when_status_refining_bboxes() -> None: + job = _StubJob(status=JobStatus.REFINING_BBOXES.value) + handler = GetJobResultHandler(repository=_StubRepo(job)) # type: ignore[arg-type] + out = await handler.do_handle(GetJobResultQuery(job_id="job-1")) + assert out is not None + + +@pytest.mark.asyncio +async def 
test_raises_job_not_ready_for_queued() -> None: + job = _StubJob(status=JobStatus.QUEUED.value, result_json=None) + handler = GetJobResultHandler(repository=_StubRepo(job)) # type: ignore[arg-type] + with pytest.raises(JobNotReady) as ei: + await handler.do_handle(GetJobResultQuery(job_id="job-1")) + assert ei.value.status == JobStatus.QUEUED + + +@pytest.mark.asyncio +async def test_raises_job_not_ready_for_failed() -> None: + job = _StubJob(status=JobStatus.FAILED.value, result_json=None) + handler = GetJobResultHandler(repository=_StubRepo(job)) # type: ignore[arg-type] + with pytest.raises(JobNotReady) as ei: + await handler.do_handle(GetJobResultQuery(job_id="job-1")) + assert ei.value.status == JobStatus.FAILED + + +@pytest.mark.asyncio +async def test_returns_none_for_unknown_job() -> None: + handler = GetJobResultHandler(repository=_StubRepo(_StubJob(id="other"))) # type: ignore[arg-type] + out = await handler.do_handle(GetJobResultQuery(job_id="missing")) + assert out is None + + +@pytest.mark.asyncio +async def test_wait_for_bboxes_returns_partial_at_timeout() -> None: + # Job stays in PARTIAL_SUCCEEDED throughout; poll should return that + # state (with its result) once the deadline elapses. + job = _StubJob(status=JobStatus.PARTIAL_SUCCEEDED.value) + handler = GetJobResultHandler(repository=_StubRepo(job)) # type: ignore[arg-type] + started = asyncio.get_running_loop().time() + out = await handler.do_handle( + GetJobResultQuery( + job_id="job-1", + wait_for_bboxes=True, + timeout_s=0.3, + poll_interval_s=0.1, + ) + ) + elapsed = asyncio.get_running_loop().time() - started + assert out is not None + assert 0.2 < elapsed < 1.0 # respected the timeout, didn't return instantly + + +@pytest.mark.asyncio +async def test_wait_for_bboxes_returns_early_when_status_flips_to_succeeded() -> None: + # Job starts in PARTIAL_SUCCEEDED; after 2 polls the stub flips it to + # SUCCEEDED -- handler should return before the timeout fires. 
+ job = _StubJob(status=JobStatus.PARTIAL_SUCCEEDED.value) + handler = GetJobResultHandler( + repository=_StubRepo(job, flip_to=JobStatus.SUCCEEDED.value, after_calls=2) # type: ignore[arg-type] + ) + started = asyncio.get_running_loop().time() + out = await handler.do_handle( + GetJobResultQuery( + job_id="job-1", + wait_for_bboxes=True, + timeout_s=10.0, + poll_interval_s=0.1, + ) + ) + elapsed = asyncio.get_running_loop().time() - started + assert out is not None + assert job.status == JobStatus.SUCCEEDED.value + assert elapsed < 5.0 # well under the 10s timeout diff --git a/tests/unit/test_job_status_enum.py b/tests/unit/test_job_status_enum.py new file mode 100644 index 0000000..1750cec --- /dev/null +++ b/tests/unit/test_job_status_enum.py @@ -0,0 +1,53 @@ +# Copyright 2026 Firefly Software Solutions Inc +"""``JobStatus`` semantic predicates -- terminal + has_result invariants.""" + +from __future__ import annotations + +import pytest + +from flydesk_idp.interfaces.enums.job_status import BboxRefineStatus, JobStatus + + +@pytest.mark.parametrize( + "status", + [JobStatus.SUCCEEDED, JobStatus.FAILED, JobStatus.CANCELLED], +) +def test_terminal_statuses(status: JobStatus) -> None: + assert status.is_terminal + + +@pytest.mark.parametrize( + "status", + [ + JobStatus.QUEUED, + JobStatus.RUNNING, + JobStatus.PARTIAL_SUCCEEDED, + JobStatus.REFINING_BBOXES, + ], +) +def test_non_terminal_statuses(status: JobStatus) -> None: + assert not status.is_terminal + + +@pytest.mark.parametrize( + "status", + [JobStatus.SUCCEEDED, JobStatus.PARTIAL_SUCCEEDED, JobStatus.REFINING_BBOXES], +) +def test_statuses_with_readable_result(status: JobStatus) -> None: + assert status.has_result + + +@pytest.mark.parametrize( + "status", + [JobStatus.QUEUED, JobStatus.RUNNING, JobStatus.FAILED, JobStatus.CANCELLED], +) +def test_statuses_without_readable_result(status: JobStatus) -> None: + assert not status.has_result + + +def test_bbox_refine_status_values() -> None: + # Stable wire 
values -- the migration + repository depend on these strings. + assert BboxRefineStatus.PENDING.value == "pending" + assert BboxRefineStatus.RUNNING.value == "running" + assert BboxRefineStatus.SUCCEEDED.value == "succeeded" + assert BboxRefineStatus.FAILED.value == "failed"