feat(transform): post-extraction transformation stage + docs refresh#11
Merged
Conversation
## New stage A new ``transform`` pipeline node runs after judge / judge_escalation and before rules / assemble, applying caller-declared transformations to the extracted field groups. Two transformation types ship; the DTO is a Pydantic discriminated union so new declarative types extend the public API without breaking changes. * ``EntityResolutionTransformation`` (declarative, free, ms-scale) Deterministic two-phase dedup of array-field rows: DNI-first exact match falls back to NFKD-folded token-subset matching for rows without DNI. Canonical-row picks the most complete value per sub-field. Subsumes the bastanteo POC's hand-rolled persona normalisation. * ``LlmTransformation`` (free-form, ~one LLM call per group) Caller provides a one-sentence ``intention``; the transformer serialises the target group, runs a focused structured-output call, and replaces (or augments, via ``output_group``) the original group. Use for role classification, language translation, free-text normalisation, schema migration, summarisation, etc. Each transformation declares a ``scope``: * ``task`` (default) — mutates that task's groups in place. * ``request`` — concatenates the matching group across every task, applies the transformation once, emits the result under ``ExtractionResult.request_transformations``. Per-task groups stay untouched. Right for cross-document entity resolution. The stage is gated by ``stages.transform`` and is a no-op when ``options.transformations`` is empty. Per-transformation failures degrade — the engine catches and logs them so a bad transformation never poisons the pipeline. ## Wiring * New package ``core/services/transformations/`` with ``EntityResolutionTransformer`` (``@service``-autoscanned), ``LlmTransformer`` (``@bean``), and ``TransformationEngine`` (``@bean``, dispatches on the discriminated union). * New prompt template ``transform.yaml`` registered in ``PromptCatalog`` (default for ``LlmTransformation`` without ``prompt_id``). * ``IDPSettings.transform_timeout_s = 600`` (env: ``FLYDESK_IDP_TRANSFORM_TIMEOUT_S``). * ``PipelineOrchestrator`` wires the node only when both ``stages.transform`` and a non-empty ``options.transformations`` list are present. * ``ExtractionResult.request_transformations`` (new field) carries the cross-doc output. ## Documentation refresh * **``docs/transformations.md``** (new) — full deep dive: type reference, scope semantics, configuration tables, worked examples, guide for adding new declarative types. * **``docs/pipeline.md``** — stage table updated with ``bbox_refine`` (was missing) and ``transform``; new section on per-stage timeouts; new section on sync vs. async ``bbox_refine`` paths. * **``docs/api-reference.md``** — request example includes ``transform`` + a sample ``transformations`` array; ``StageToggles`` schema updated with every toggle; new ``Transformation`` DTO section. * **``README.md``** — ASCII pipeline diagram backfilled with ``bbox_refine`` + ``transform``; doc map adds the new doc. ## Tests * ``test_entity_resolution_transformer.py`` — 5 tests: DNI match, name-variant match, single-token guard, ``output_group`` preserves original, missing target is no-op. * ``test_transformation_engine.py`` — 3 tests: dispatch to declarative, dispatch to LLM, request-scope consolidates across tasks. Full unit suite: 231 passed, 1 skipped. Ruff clean. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
## Typed EDA events (uuid + timestamps + correlation)
Every event the service publishes now carries a typed Pydantic
envelope rather than a raw ``{"job_id": ...}`` dict. Each event has:
* ``event_id`` -- fresh UUID v4 per publication. Clients can dedupe
at-least-once deliveries by this field.
* ``event_type`` -- routing constant identical to what pyfly's bus uses.
* ``version`` -- semver of the payload shape; consumers branch on the
major component if they need to.
* ``occurred_at`` -- UTC ISO-8601, when the producing service emitted.
* ``correlation_id`` / ``tenant_id`` -- echoes inbound
``X-Correlation-Id`` / ``X-Tenant-Id`` headers for end-to-end audit.
* Type-specific lifecycle fields (``submitted_at`` for submitted,
``started_at`` + ``finished_at`` + ``attempts`` for completed,
``attempt`` for retry republishes, ...).
New module ``interfaces/dtos/event.py`` exposes:
* ``IDPJobSubmittedEvent`` -- published by ``SubmitJobHandler`` and
re-published by ``JobWorker._delayed_publish`` during retry backoff.
* ``IDPJobCompletedEvent`` -- terminal state envelope; surfaced to
webhook subscribers via :class:`JobWebhookPayload`.
* ``IDPBboxRefineRequestedEvent`` -- fan-out signal the main worker
emits when ``stages.bbox_refine`` is on (async path).
* ``IDPBboxRefineCompletedEvent`` -- emitted by the bbox worker once
refinement settles (success or terminal failure).
* ``IDPEvent`` -- discriminated union for consumer-side parsing.
Publishers swap ``{"job_id": ...}`` -> ``envelope_for_publish(event)``;
backwards-compatible because the typed envelope still has ``job_id`` at
the top level. Workers' existing ``_on_event(envelope)`` handlers read
``job_id`` from the payload dict unchanged.
## Webhook payload audit fields
``JobWebhookPayload`` mirrors the typed envelope: ``event_id``,
``event_type``, ``version``, ``correlation_id``, ``tenant_id``,
``started_at``, ``finished_at``, ``attempts``. The webhook publisher
now threads these through from the worker so external clients can
dedupe by ``event_id`` and reconstruct the full lifecycle without
needing repository access.
## pyfly conventions sweep
* ``TransformationEngine`` is now ``@service`` (its two dependencies
-- ``EntityResolutionTransformer`` and ``LlmTransformer`` -- are
both DI-managed) and is no longer registered as a ``@bean`` in
``IDPCoreConfiguration``. This is the pyfly idiom: ``@service`` for
services whose deps autowire by type; ``@bean`` for things that
need explicit constructor values (settings, prompt templates...).
* ``LlmTransformer`` remains a ``@bean`` because its constructor
takes the ``transform`` prompt template + the default model id,
values pyfly can't autoresolve.
* Confirmed every existing CQRS handler carries ``@command_handler``
/ ``@query_handler`` + ``@service`` and every controller carries
``@rest_controller`` + ``@request_mapping``.
## ``transformations: list[]`` clarification
``ExtractionOptions.transformations`` is and has always been a
``list[Transformation]``; the docs now spell this out explicitly and
include a chained-transformation example (declarative dedup followed
by an LLM step on the deduped survivors).
## Tests
* ``test_event_envelopes.py`` -- 4 tests: default population, JSON
serialisation, discriminated-union round-trip, status enum
serialisation.
* ``test_multi_transformation.py`` -- 3 tests: two declarative
transformations chain in order, entity_resolution-then-llm sees
the deduped rows, empty list is a no-op.
Unit suite: 238 passed, 1 skipped. Ruff clean. Pyright clean.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
ancongui
added a commit
that referenced
this pull request
May 31, 2026
…11) * feat(transform): post-extraction transformation stage + docs refresh ## New stage A new ``transform`` pipeline node runs after judge / judge_escalation and before rules / assemble, applying caller-declared transformations to the extracted field groups. Two transformation types ship; the DTO is a Pydantic discriminated union so new declarative types extend the public API without breaking changes. * ``EntityResolutionTransformation`` (declarative, free, ms-scale) Deterministic two-phase dedup of array-field rows: DNI-first exact match falls back to NFKD-folded token-subset matching for rows without DNI. Canonical-row picks the most complete value per sub-field. Subsumes the bastanteo POC's hand-rolled persona normalisation. * ``LlmTransformation`` (free-form, ~one LLM call per group) Caller provides a one-sentence ``intention``; the transformer serialises the target group, runs a focused structured-output call, and replaces (or augments, via ``output_group``) the original group. Use for role classification, language translation, free-text normalisation, schema migration, summarisation, etc. Each transformation declares a ``scope``: * ``task`` (default) — mutates that task's groups in place. * ``request`` — concatenates the matching group across every task, applies the transformation once, emits the result under ``ExtractionResult.request_transformations``. Per-task groups stay untouched. Right for cross-document entity resolution. The stage is gated by ``stages.transform`` and is a no-op when ``options.transformations`` is empty. Per-transformation failures degrade — the engine catches and logs them so a bad transformation never poisons the pipeline. ## Wiring * New package ``core/services/transformations/`` with ``EntityResolutionTransformer`` (``@service``-autoscanned), ``LlmTransformer`` (``@bean``), and ``TransformationEngine`` (``@bean``, dispatches on the discriminated union). * New prompt template ``transform.yaml`` registered in ``PromptCatalog`` (default for ``LlmTransformation`` without ``prompt_id``). * ``IDPSettings.transform_timeout_s = 600`` (env: ``FLYDESK_IDP_TRANSFORM_TIMEOUT_S``). * ``PipelineOrchestrator`` wires the node only when both ``stages.transform`` and a non-empty ``options.transformations`` list are present. * ``ExtractionResult.request_transformations`` (new field) carries the cross-doc output. ## Documentation refresh * **``docs/transformations.md``** (new) — full deep dive: type reference, scope semantics, configuration tables, worked examples, guide for adding new declarative types. * **``docs/pipeline.md``** — stage table updated with ``bbox_refine`` (was missing) and ``transform``; new section on per-stage timeouts; new section on sync vs. async ``bbox_refine`` paths. * **``docs/api-reference.md``** — request example includes ``transform`` + a sample ``transformations`` array; ``StageToggles`` schema updated with every toggle; new ``Transformation`` DTO section. * **``README.md``** — ASCII pipeline diagram backfilled with ``bbox_refine`` + ``transform``; doc map adds the new doc. ## Tests * ``test_entity_resolution_transformer.py`` — 5 tests: DNI match, name-variant match, single-token guard, ``output_group`` preserves original, missing target is no-op. * ``test_transformation_engine.py`` — 3 tests: dispatch to declarative, dispatch to LLM, request-scope consolidates across tasks. Full unit suite: 231 passed, 1 skipped. Ruff clean. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com> * feat(eda): typed event envelopes + pyfly conventions sweep ## Typed EDA events (uuid + timestamps + correlation) Every event the service publishes now carries a typed Pydantic envelope rather than a raw ``{"job_id": ...}`` dict. Each event has: * ``event_id`` -- fresh UUID v4 per publication. Clients can dedupe at-least-once deliveries by this field. * ``event_type`` -- routing constant identical to what pyfly's bus uses. * ``version`` -- semver of the payload shape; consumers branch on the major component if they need to. * ``occurred_at`` -- UTC ISO-8601, when the producing service emitted. * ``correlation_id`` / ``tenant_id`` -- echoes inbound ``X-Correlation-Id`` / ``X-Tenant-Id`` headers for end-to-end audit. * Type-specific lifecycle fields (``submitted_at`` for submitted, ``started_at`` + ``finished_at`` + ``attempts`` for completed, ``attempt`` for retry republishes, ...). New module ``interfaces/dtos/event.py`` exposes: * ``IDPJobSubmittedEvent`` -- published by ``SubmitJobHandler`` and re-published by ``JobWorker._delayed_publish`` during retry backoff. * ``IDPJobCompletedEvent`` -- terminal state envelope; surfaced to webhook subscribers via :class:`JobWebhookPayload`. * ``IDPBboxRefineRequestedEvent`` -- fan-out signal the main worker emits when ``stages.bbox_refine`` is on (async path). * ``IDPBboxRefineCompletedEvent`` -- emitted by the bbox worker once refinement settles (success or terminal failure). * ``IDPEvent`` -- discriminated union for consumer-side parsing. Publishers swap ``{"job_id": ...}`` -> ``envelope_for_publish(event)``; backwards-compatible because the typed envelope still has ``job_id`` at the top level. Workers' existing ``_on_event(envelope)`` handlers read ``job_id`` from the payload dict unchanged. ## Webhook payload audit fields ``JobWebhookPayload`` mirrors the typed envelope: ``event_id``, ``event_type``, ``version``, ``correlation_id``, ``tenant_id``, ``started_at``, ``finished_at``, ``attempts``. The webhook publisher now threads these through from the worker so external clients can dedupe by ``event_id`` and reconstruct the full lifecycle without needing repository access. ## pyfly conventions sweep * ``TransformationEngine`` is now ``@service`` (its two dependencies -- ``EntityResolutionTransformer`` and ``LlmTransformer`` -- are both DI-managed) and is no longer registered as a ``@bean`` in ``IDPCoreConfiguration``. This is the pyfly idiom: ``@service`` for services whose deps autowire by type; ``@bean`` for things that need explicit constructor values (settings, prompt templates...). * ``LlmTransformer`` remains a ``@bean`` because its constructor takes the ``transform`` prompt template + the default model id, values pyfly can't autoresolve. * Confirmed every existing CQRS handler carries ``@command_handler`` / ``@query_handler`` + ``@service`` and every controller carries ``@rest_controller`` + ``@request_mapping``. ## ``transformations: list[]`` clarification ``ExtractionOptions.transformations`` is and has always been a ``list[Transformation]``; the docs now spell this out explicitly and include a chained-transformation example (declarative dedup followed by an LLM step on the deduped survivors). ## Tests * ``test_event_envelopes.py`` -- 4 tests: default population, JSON serialisation, discriminated-union round-trip, status enum serialisation. * ``test_multi_transformation.py`` -- 3 tests: two declarative transformations chain in order, entity_resolution-then-llm sees the deduped rows, empty list is a no-op. Unit suite: 238 passed, 1 skipped. Ruff clean. Pyright clean. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com> --------- Co-authored-by: ancongui <andres.contreras@soon.es> Co-authored-by: Claude <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
A new
transformpipeline stage that lets callers declare post-extraction transformations directly in the request, instead of re-implementing them in every consumer. Also backfills documentation that had drifted since the bbox_refine / multi-file async / job-filtering changes shipped.Why
The bastanteo POC had to hand-roll persona normalisation (NFKD-fold + token-subset dedup) outside the service. That logic is generic enough to belong in the IDP. A first-class transformation stage:
What ships
Two transformation types
entity_resolutionllmintention.Scopes:
task(per-document, default) andrequest(cross-document; emits a consolidated group underresult.request_transformations).New package:
core/services/transformations/withEntityResolutionTransformer,LlmTransformer,TransformationEngine. Pipeline node wired inPipelineOrchestrator._step_transformbetweenjudge_escalationandrules. Prompt template registered inPromptCatalog. Timeout env-tunable viaFLYDESK_IDP_TRANSFORM_TIMEOUT_S.Documentation
docs/transformations.md— deep dive with examples + config reference + guide for adding new declarative types.docs/pipeline.md— stage table updated (added the missingbbox_refinerow and the newtransformrow); new sections on per-stage timeouts and the sync-vs-asyncbbox_refinepaths.docs/api-reference.md— request example coverstransform+transformations;StageTogglesschema completed; newTransformationDTO section.README.md— ASCII pipeline diagram backfilled; doc map adds the new page.API example
{ "options": { "stages": { "transform": true }, "transformations": [ { "type": "entity_resolution", "target_group": "personas", "match_by": ["dni", "nombre"], "scope": "request" }, { "type": "llm", "target_group": "personas", "intention": "Normalize each cargo to a closed taxonomy: administrador_unico, consejero, apoderado, otros." } ] } }Test plan
tests/unit/test_entity_resolution_transformer.py— 5 tests (DNI match · accent+subset name match · single-token guard ·output_grouppreserves original · missing-target no-op).tests/unit/test_transformation_engine.py— 3 tests (dispatch declarative · dispatch LLM · request-scope consolidates).checkandformat --checkboth clean.🤖 Generated with Claude Code