
Return Pydantic model instances through XCom for structured output#67644

Merged
kaxil merged 8 commits into apache:main from astronomer:llm-typed-output
May 29, 2026

@kaxil
Member

@kaxil kaxil commented May 28, 2026

Summary

When you pass output_type=SomePydanticModel to LLMOperator / LLMAgentOperator / LLMFileAnalysisOperator (or the matching @task.llm / @task.agent / @task.llm_file_analysis decorators), the operator used to call model_dump() on the result before pushing it to XCom. So even though the upstream task declared a type, the downstream task received a plain dict and had to use analysis["priority"] instead of analysis.priority.

This PR drops that model_dump() call. The Pydantic instance flows through XCom as-is. Downstream tasks can type-hint the class and use attribute access.

Motivated by Jed's question :) "It would be nice if that could be List[TicketAnalysis] instead" of list[dict].

What it looks like

from pydantic import BaseModel

from airflow.sdk import task


class TicketAnalysis(BaseModel):
    priority: str
    category: str

@task.llm(llm_conn_id="pydanticai_default", output_type=TicketAnalysis)
def analyze(ticket: str) -> str:
    return f"Analyze: {ticket}"

@task
def store(analyses: list[TicketAnalysis]):       # was: list[dict]
    for a in analyses:
        print(a.priority, a.category)            # was: a["priority"]

How it works (and why this design)

The Pydantic deserializer that turns the wire bytes back into a class instance already exists (see task-sdk/.../serde/__init__.py:286). What gates it is the allow-list check at serde/__init__.py:264, which only lets through classnames that match [core] allowed_deserialization_classes or live in the process-local _extra_allowed set.

Lifting that gate for all Pydantic models is what bolkedebruin pushed back on in PR #51059: import_string runs before any type check, and Pydantic validators (@field_validator, @model_validator) can execute arbitrary code during model_validate. So "trust any class as long as it inherits BaseModel" reopens an attack surface that was deliberately closed.

But in his same review he flagged the door:

"We might need to think of a mechanism that allows serializers to register 'allowed' classes, but that's probably out of scope for now (let's not include it now)."

This PR walks through that door. New helper in airflow.sdk.serde:

def allow_class(cls: type) -> None:
    """Add qualname(cls) to the process-local _extra_allowed set."""

Each LLM operator calls allow_class(output_type) from __init__. The threat model is the same as a config edit: the DAG author put output_type=MyModel in code that's already trusted. An attacker who can change that argument already has write access to the DAG file, which is itself equivalent to RCE.

The reason same-DAG downstream tasks just work without any config: every worker that runs any task in the DAG parses the DAG file at startup, which re-runs every operator's __init__, which calls allow_class again. Process-local, idempotent.

output_type can be a single class, a Union, an Optional, a list[Model], etc. (pydantic-ai accepts all of those). The new iter_base_model_classes helper walks the type tree and registers each reachable BaseModel so Union/Optional outputs work too.
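
One way such a type-tree walk can be written with only the standard `typing` helpers is sketched below. This is not the PR's iter_base_model_classes; `iter_classes` is a hypothetical name, and `Model` stands in for pydantic.BaseModel so the snippet runs without dependencies.

```python
from collections.abc import Iterator
from typing import Optional, Union, get_args, get_origin


def iter_classes(tp: object, base: type) -> Iterator[type]:
    """Yield every subclass of ``base`` reachable from a type annotation."""
    # Plain classes have no origin; list[X] and friends do, so they fall through.
    if isinstance(tp, type) and get_origin(tp) is None:
        if issubclass(tp, base):
            yield tp
        return
    # Union[A, B], Optional[A], list[A], dict[str, A], ...
    for arg in get_args(tp):
        yield from iter_classes(arg, base)


class Model:
    """Stand-in for pydantic.BaseModel."""


class Ticket(Model):
    pass


class Refund(Model):
    pass


found = list(iter_classes(Optional[Union[Ticket, list[Refund]]], Model))
print([cls.__name__ for cls in found])  # ['Ticket', 'Refund']
```

The sketch does not de-duplicate; that is harmless when the results feed a set-backed registration.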

Demo

Local run with a minimal DAG that mirrors what LLMOperator(output_type=TicketAnalysis) does internally -- registers the class via allow_class, returns the instance from one task, attribute-accesses it from the downstream task.

[Screenshots: dag_run_grid, produce_xcom_wide]

The producer task's log line confirms the value flows as a Pydantic instance, not a dict:

INFO - Done. Returned value was: priority='high' category='bug'
       summary='Nightly ETL failing with Postgres connection timeout'
       suggested_action='Page the on-call DBA and check connection pool size'

The consumer task receives it as TicketAnalysis and uses attribute access. The UI XCom viewer renders it via the existing stringify path:

<module>.TicketAnalysis@version=1(priority=high,category=bug,summary=...,suggested_action=...)

Not pretty (no field-by-field rendering today), but the value shows without any allow-list edit on the deployment.

What doesn't get auto-registered

One path still needs [core] allowed_deserialization_classes updated: xcom_pull from a different DAG. The consumer DAG's worker only parses its own DAG file, so the producer's LLMOperator.__init__ never runs there. That case is the same as today and is called out in the operator guides.

The UI XCom display goes through stringify (not deserialize), so it works without config -- I confirmed this in the live demo above. Earlier drafts of this PR overstated the limitation; the docs in this version match the actual behaviour.

Fail-fast on classes that can't round-trip

allow_class rejects classes whose qualname can't be re-imported:

  • defined inside a function body (<locals> in __qualname__)
  • nested inside another class (dotted __qualname__)
  • dynamically built with a mismatched __name__ (e.g. MyModel = pydantic.create_model("Different", ...))
  • parametrised generics (Result[int])

Without this guard the failure shows up at the downstream consumer's import_string() call with no hint at the root cause. With the guard it raises a clear ValueError at DAG parse time, pointing at the operator that owns the bad output_type. The example DAGs that previously defined their Pydantic class inside the @dag body are updated to put them at module scope.
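
A rough sketch of what a guard like this can look like. `check_reimportable` is a hypothetical name, and the bracket check for parametrised generics assumes the generic's `__qualname__` contains `[`, which I have not verified against the real helper.

```python
import importlib
from fractions import Fraction


def check_reimportable(cls: type) -> None:
    """Raise ValueError if ``cls`` cannot be found again via module + qualname."""
    qualname = cls.__qualname__
    # "<locals>" qualnames also contain dots, so test for them first.
    if "<locals>" in qualname:
        raise ValueError(f"{qualname} is defined inside a function body")
    if "[" in qualname:
        raise ValueError(f"{qualname} is a parametrised generic")
    if "." in qualname:
        raise ValueError(f"{qualname} is nested inside another class")
    module = importlib.import_module(cls.__module__)
    if getattr(module, qualname, None) is not cls:
        raise ValueError(f"{cls.__module__}.{qualname} does not resolve back to the class")


class Outer:
    class Inner:
        pass


def make_local() -> type:
    class Local:
        pass

    return Local


# The class name differs from the variable it is bound to.
Renamed = type("Different", (), {})


def rejected(cls: type) -> bool:
    """Return True if the guard refuses the class."""
    try:
        check_reimportable(cls)
    except ValueError:
        return True
    return False


print(rejected(Fraction))      # a normal module-level class passes
print(rejected(make_local()))  # function-local class is refused
print(rejected(Outer.Inner))   # nested class is refused
print(rejected(Renamed))       # mismatched __name__ is refused
```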

Backwards compatibility

The provider is in the incubation lifecycle stage (0.3.0), so the breaking change to the XCom value shape is permitted under the API contract reservation. A migration note is added at the top of providers/common/ai/docs/changelog.rst.

The allow_class helper itself is new to airflow.sdk.serde. The provider still declares apache-airflow>=3.0.0, so I added a try/except import: when running against an older Airflow that doesn't have allow_class, the operators fall back to model_dump() (the previous behaviour). Users on the new Airflow get the typed path; users on older versions keep the dict path.
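
The fallback pattern, sketched under the assumption that a missing helper surfaces as an ImportError. `xcom_value` and `FakeResult` are hypothetical names for illustration; only the import path comes from this PR.

```python
from typing import Any, Callable, Optional

allow_class: Optional[Callable[[type], None]]
try:
    # Present only on Airflow versions that ship the new helper.
    from airflow.sdk.serde import allow_class
except ImportError:
    allow_class = None


def xcom_value(result: Any) -> Any:
    """Choose what to push to XCom for a structured result."""
    if allow_class is None:
        # Older Airflow: keep the previous dict behaviour.
        return result.model_dump()
    # Newer Airflow: pass the instance through (its class was registered in __init__).
    return result


class FakeResult:
    """Stand-in for a Pydantic instance; only model_dump() matters here."""

    def model_dump(self) -> dict[str, str]:
        return {"priority": "high"}


value = xcom_value(FakeResult())
print(type(value).__name__)  # "dict" on older Airflow, "FakeResult" on newer
```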

kaxil added 3 commits May 28, 2026 12:44
…d output

`LLMOperator`, `LLMAgentOperator`, `LLMFileAnalysisOperator`, and their
`@task.llm` / `@task.agent` / `@task.llm_file_analysis` decorators stop
calling `model_dump()` on Pydantic outputs before pushing to XCom. Downstream
tasks now receive the model instance directly, so they can type-hint the
class (`def downstream(result: MyModel)`) and use attribute access
(`result.field`) instead of subscript access on a dict.

To avoid forcing every DAG author to edit `[core] allowed_deserialization_classes`,
the operators auto-register their `output_type` (and any `BaseModel`
reachable from `Union`/`Optional`/`list` shapes) via a new
`airflow.sdk.serde.allow_class(cls)` helper. The registration is process-local
and runs in each worker's `__init__` -- same-DAG downstream tasks parse the
DAG file when they start up, which re-runs the constructor and re-populates
the per-process allow-list.

The helper rejects classes that cannot be re-imported by qualname (defined
in a function body, nested in another class, dynamically built with a
mismatched `__name__`, or parametrised generics) so the failure surfaces at
DAG parse time rather than at XCom-consume time.

UI XCom viewer and cross-DAG `xcom_pull` are still gated by
`[core] allowed_deserialization_classes` because the API server and other
DAGs' workers don't import the producing DAG. Documented explicitly in the
operator guides.

Older Airflow versions that lack `allow_class` continue to get the dict
form via a try/except fallback in each operator, so the provider keeps
working on `apache-airflow>=3.0.0`.
…puts

The UI's XCom viewer renders structured-output Pydantic instances via the
``stringify`` path (``airflow.serialization.stringify``) rather than the
``deserialize`` path, so user classes outside the ``airflow.*`` glob do not
hit the allow-list gate -- they show up as ``module.MyModel@version=1(...)``
without any config change. Only cross-DAG ``xcom_pull`` is still gated.

Also hoist a ``pydantic.create_model`` import to module scope in the serde
test that was using it inline.
Strip DagBag's ``unusual_prefix_<sha>_`` module prefix from the displayed
classname and repr-quote string field values inside the ``classname@version=N(...)``
form. Before this change, an XCom value carrying a user-defined Pydantic class
rendered in the UI as:

   unusual_prefix_9ce9eb..._typed_xcom_demo.TicketAnalysis@version=1(
     priority=high,category=bug,summary=Nightly ETL...)

After:

   typed_xcom_demo.TicketAnalysis@version=1(
     priority='high', category='bug', summary='Nightly ETL...')

The prefix is a DagBag artifact (added to avoid ``sys.modules`` clashes
between same-named DAG files in different bundles) and has no value in the
human-readable XCom display. Quoting strings disambiguates ``field=value``
from a bare token and matches Pydantic/dataclass repr conventions.
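
A sketch of the two display tweaks described in this commit message. The `display` function is a hypothetical illustration rather than the actual stringify code, the prefix regex assumes a hex digest, and the digest in the sample input is made up.

```python
import re

# Assumption: the prefix is "unusual_prefix_" + hex digest + "_".
_PREFIX = re.compile(r"^unusual_prefix_[0-9a-f]+_")


def display(classname: str, version: int, fields: dict[str, object]) -> str:
    """Render the classname@version=N(...) form with the prefix stripped."""
    name = _PREFIX.sub("", classname)
    # repr() puts quotes around string values and leaves numbers readable.
    body = ", ".join(f"{key}={value!r}" for key, value in fields.items())
    return f"{name}@version={version}({body})"


print(
    display(
        "unusual_prefix_9ce9eb01_typed_xcom_demo.TicketAnalysis",  # made-up digest
        1,
        {"priority": "high", "category": "bug"},
    )
)
# typed_xcom_demo.TicketAnalysis@version=1(priority='high', category='bug')
```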
kaxil added 4 commits May 28, 2026 15:47
Three CI failures fixed:

1. Compat tests against Airflow 3.0.6 / 3.1.8: new tests assumed
   allow_class is importable and asserted on Pydantic instance shape.
   Gate the new tests behind a requires_allow_class marker so they skip
   cleanly on older Airflow (operators already fall back to model_dump
   there via the import-safe import).

2. Docs build failed with 12 RST errors in autoapi-generated index.rst
   for example_dags modules. Pydantic BaseModel's inherited docstring
   leaks through autoapi rendering and breaks the Definition list. An
   explicit docstring on each module-level Pydantic class overrides the
   inherited one and keeps the RST valid.

3. Spell-check: qualname is a Python attribute name; backtick it in
   prose so the spell-checker treats it as code. Switched 'parametrised'
   (British) to 'parameterized' (American) to match wordlist.
Per Jed's review: some downstream consumers want the dict shape (e.g.
forwarding the value to an external system that expects JSON-style payloads).

Add serialize_output: bool = False to LLMOperator and AgentOperator (and via
inheritance, LLMFileAnalysisOperator). When True the operator calls
model_dump() before pushing to XCom, restoring the pre-PR behavior on demand
without giving up the typed default. The class is not registered in
_extra_allowed in that mode since the wire carries a plain dict and never
hits the allow-list gate.
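
A sketch of how the flag can gate both the registration and the XCom value. `SketchOperator`, its `register` parameter and `FakeResult` are hypothetical stand-ins; only the `serialize_output` name and its default come from this commit.

```python
from typing import Any, Callable


class SketchOperator:
    """Hypothetical operator showing the serialize_output switch."""

    def __init__(
        self,
        output_type: type,
        register: Callable[[type], None],
        serialize_output: bool = False,
    ) -> None:
        self.output_type = output_type
        self.serialize_output = serialize_output
        # A plain dict never reaches the allow-list gate, so skip registration.
        if not serialize_output:
            register(output_type)

    def xcom_value(self, result: Any) -> Any:
        """Return the dict form on request, otherwise the instance itself."""
        return result.model_dump() if self.serialize_output else result


class FakeResult:
    """Stand-in for a Pydantic instance."""

    def model_dump(self) -> dict[str, str]:
        return {"priority": "high"}


registered: list[type] = []
typed = SketchOperator(FakeResult, registered.append)
plain = SketchOperator(FakeResult, registered.append, serialize_output=True)

print(len(registered))                   # 1: only the typed operator registered
print(plain.xcom_value(FakeResult()))    # {'priority': 'high'}
```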
mypy was inferring the type from the first try-branch assignment
(Callable[[type], None]) and then rejecting the except-branch's None
fallback. Annotate the variable as object | None explicitly so both
branches type-check.
Three more tests assert isinstance(result, BaseModel) from the
execute_complete rehydration path. On older Airflow (no allow_class), the
operator falls back to model_dump() and returns a dict, so those tests
need the same compat gate.
Comment thread providers/common/ai/src/airflow/providers/common/ai/operators/llm.py Outdated
Comment thread providers/common/ai/tests/unit/common/ai/operators/test_agent.py
Member

@gopidesupavan gopidesupavan left a comment


LGTM, thanks for adding this. Only a nit.

Per gopidesupavan's review: the model_validate_json + try/except + serialize
logic was duplicated between LLMOperator.execute_complete and the AgentOperator
HITL branch. Move it to a single rehydrate_pydantic_output helper in
output_type.py and call from both sites.
@kaxil kaxil merged commit 9318bd6 into apache:main May 29, 2026
138 checks passed
@kaxil kaxil deleted the llm-typed-output branch May 29, 2026 01:06